In [1]:
import os
import re
import csv

import pandas as pd
from Bio import SeqIO, AlignIO
from Bio.Align import MultipleSeqAlignment

SNP_DIR = "SNP"
REFSEQ_DIR = "RefSeq"
METADATA = "Metadata"

REFERENCE = "EPI_ISL_402124"

# Spike protein AA

## 20aa

In [2]:
seqs = SeqIO.index(os.path.join(SNP_DIR, "spike_AA_selected_20.fasta"), "fasta")

reference = seqs[REFERENCE]

In [3]:
annotation = pd.read_csv(os.path.join(REFSEQ_DIR, "SARS_CoV_2.csv"), index_col=0)
siteMapping = {str(peptidePos): str(info['aaPos'].values[0]) for peptidePos, info in annotation.groupby("peptidePos")}

In [4]:
mutMapping = {}

with open(os.path.join(SNP_DIR, "variant_site_AA.csv")) as f:
    for row in csv.DictReader(f):
        site = row.pop('')
        row = {aa: int(freq) for aa, freq in row.items()}
        if site != "7711":
            preMut = max(row, key=row.get)
        else:
            preMut = "D"
        row.pop(preMut)
        postMut = max(row, key=row.get)
        mutMapping[site] = preMut + siteMapping[site] + postMut

In [5]:
variant_AA = pd.read_csv(os.path.join(SNP_DIR, "variant_AA_S_20.csv"), header=None, index_col=0)

rowNames = tuple(mutMapping[str(siteName)] for siteName in variant_AA.index)

In [7]:
metadata = pd.read_csv(os.path.join(METADATA, "SARS_CoV_2.csv"), index_col=0)
metadata.loc[pd.isna(metadata['Nextstrain_clade']), "Nextstrain_clade"] = "unknown"

In [8]:
ac2clade = {}

for clade, info in metadata.groupby("Nextstrain_clade"):
    if pd.isna(clade):
        print(clade)
    for ac in info.index:
        ac2clade[ac] = clade

In [32]:
D614G_index = -1

for n, i in enumerate(rowNames):
    if i == "D614G":
        D614G_index = n

aligned = AlignIO.read(os.path.join(SNP_DIR, "spike_AA_selected_20.fasta"), "fasta")

df = []

for record in aligned:
    snp = { rowNames[n]: 0 if base == ref or base in ("-", "X") else 1
           for n, (ref, base) in enumerate(zip(reference, record)) }
    snpFreq = list(snp.values())
    if sum(snpFreq):
        if snpFreq[D614G_index] == 1 and sum(snpFreq) == 1:
            pass
        else:
            df.append({"gisaid_id": ac2clade[record.id] + "_" + record.id, **snp})
            
df = pd.DataFrame(df)
df = df.sort_values('gisaid_id')
df = df.set_index("gisaid_id")
df.to_csv(os.path.join(SNP_DIR, "snp_AA.csv"))

# Trimmed by cluster

## 50aa

# Cluster comparison

In [9]:
trimmedRow = []

with open(os.path.join(SNP_DIR, "trimmed_AA_cls.csv"), "w") as csvfile:
    spamwriter = csv.DictWriter(csvfile, delimiter='\t', fieldnames = ["rep", "trimmed"])
    spamwriter.writeheader()
    with open(os.path.join(SNP_DIR, "Trim_S_AA.fasta.clstr")) as f:
        repID = None
        accessions = []
        for line in f:
            if line.startswith('>'):
                if len(accessions) > 1:
                    accessions = set(accessions)
                    if len(accessions) > 1:
                        spamwriter.writerow({"rep": ac2clade[repID], "trimmed": ', '.join(accessions) })
                if REFERENCE in accessions:
                    print(repID)
                accessions = []
            else:
                m = re.findall(r">(\w+)", line)[0]
                if '*' in line:
                    repID = m
                accessions.append(ac2clade[m])
        if len(accessions) > 1:
            accessions = set(accessions)
            if len(accessions) > 1:
                spamwriter.writerow({"rep": ac2clade[repID], "trimmed": ', '.join(accessions) })
        if REFERENCE in accessions:
            print(repID)

FileNotFoundError: [Errno 2] No such file or directory: 'SNP\\Trim_S_AA.fasta.clstr'

In [None]:
seqs = SeqIO.index(os.path.join(SNP_DIR, "spike_AA_selected.fasta"), "fasta")

reference = seqs[REFERENCE]

In [None]:
trimmedRow = []
repSeqs = []

with open(os.path.join(SNP_DIR, "trimmed_AA.csv"), "w") as csvfile:
    spamwriter = csv.DictWriter(csvfile, delimiter='\t', fieldnames = ["rep", "trimmed"])
    spamwriter.writeheader()
    with open(os.path.join(SNP_DIR, "Trim_S_AA.fasta.clstr")) as f:
        repID = None
        accessions = []
        for line in f:
            if line.startswith('>'):
                if repID:
                    repSeqs.append(seqs[repID])
                if len(accessions) > 1:
                    spamwriter.writerow({"rep": repID, "trimmed": ', '.join(accessions) })
                if REFERENCE in accessions:
                    print(repID)
                accessions = []
            else:
                m = re.findall(r">(\w+)", line)[0]
                if '*' in line:
                    repID = m
                accessions.append(m)
        if len(accessions) > 1:
            repSeqs.append(seqs[repID])
            spamwriter.writerow({"rep": repID, "trimmed": ', '.join(accessions) })
        if REFERENCE in accessions:
            print(repID)

In [None]:
variant_site_AA = pd.read_csv(os.path.join(SNP_DIR, "variant_site_AA.csv"), index_col=0)

In [None]:
annotation = pd.read_csv(os.path.join(REFSEQ_DIR, "SARS_CoV_2.csv"), index_col=0)
siteMapping = {str(peptidePos): str(info['aaPos'].values[0]) for peptidePos, info in annotation.groupby("peptidePos")}

In [None]:
mutMapping = {}

with open(os.path.join(SNP_DIR, "variant_site_AA.csv")) as f:
    for row in csv.DictReader(f):
        site = row.pop('')
        row = {aa: int(freq) for aa, freq in row.items()}
        if site != "7711":
            preMut = max(row, key=row.get)
        else:
            preMut = "D"
        row.pop(preMut)
        postMut = max(row, key=row.get)
        if site == "7598":
            print(row)
        mutMapping[site] = preMut + siteMapping[site] + postMut

In [None]:
variant_AA = pd.read_csv(os.path.join(SNP_DIR, "variant_AA_S_50.csv"), header=None, index_col=0)

rowNames = tuple(mutMapping[str(siteName)] for siteName in variant_AA.index)

In [None]:
aligned = MultipleSeqAlignment(repSeqs)

with open(os.path.join(SNP_DIR, "snp_AA_trimmed.csv"), 'w', newline='') as f:
    spamwriter = csv.writer(f, delimiter=',')
    spamwriter.writerow(("gisaid_id", *rowNames))
    for record in aligned:
        snp = tuple(0 if base == ref or base in ("-", "X") else 1 for ref, base in zip(reference, record))
        if sum(snp):
            if snp[32] == 1 and sum(snp) == 1:
                pass
            else:
                spamwriter.writerow((record.id, *snp))

In [None]:
aligned = AlignIO.read(os.path.join(SNP_DIR, "spike_AA_selected.fasta"), "fasta")

with open(os.path.join(SNP_DIR, "snp_AA.csv"), 'w', newline='') as f:
    spamwriter = csv.writer(f, delimiter=',')
    spamwriter.writerow(("gisaid_id", *rowNames))
    for record in aligned:
        snp = tuple(0 if base == ref or base in ("-", "X") else 1 for ref, base in zip(reference, record))
        if sum(snp):
            if snp[32] == 1 and sum(snp) == 1:
                pass
            else:
                spamwriter.writerow((record.id, *snp))

# Spike gene sequence

In [None]:
seqs = SeqIO.index(os.path.join(SNP_DIR,"spike_selected.fasta"), "fasta")

reference = seqs[REFERENCE]

In [None]:
trimmedRow = []
repSeqs = []

with open(os.path.join(SNP_DIR, "trimmed.csv"), "w") as csvfile:
    spamwriter = csv.DictWriter(csvfile, delimiter='\t', fieldnames = ["rep", "trimmed"])
    spamwriter.writeheader()
    with open(os.path.join(SNP_DIR, "Trim_S.fasta.clstr")) as f:
        repID = None
        accessions = []
        for line in f:
            if line.startswith('>'):
                if repID:
                    repSeqs.append(seqs[repID])
                if len(accessions) > 1:
                    spamwriter.writerow({"rep": repID, "trimmed": ', '.join(accessions) })
                if REFERENCE in accessions:
                    print(repID)
                accessions = []
            else:
                m = re.findall(r">(\w+)", line)[0]
                if '*' in line:
                    repID = m
                accessions.append(m)
        if len(accessions) > 1:
            repSeqs.append(seqs[repID])
            spamwriter.writerow({"rep": repID, "trimmed": ', '.join(accessions) })
        if REFERENCE in accessions:
            print(repID)

In [None]:
aligned = MultipleSeqAlignment(repSeqs)

with open(os.path.join(SNP_DIR, "snp_trimmed.csv"), 'w', newline='') as f:
    spamwriter = csv.writer(f, delimiter=',')
    for record in aligned:
        snp = (0 if base == ref or base in ("-", "N") else 1 for ref, base in zip(reference, record))
        spamwriter.writerow((record.id, *snp))