In [None]:
from Bio import AlignIO, SeqIO
from Bio.Seq import Seq
import os
import glob
import pandas as pd
from Bio.SeqRecord import SeqRecord


In [79]:
def cross_check(fasta_path, region):
    SPIKE_COORDS = (21563, 25321)
    RBD_COORDS = (22553, 23155)
    (start, end) = SPIKE_COORDS if region == "spike" else RBD_COORDS

    rec = SeqIO.read(fasta_path, format="fasta")

    wt = []
    aa_pos_ls = []
    coords = []
    aa_pos = 1 if region == "spike" else 331 # 1-based idx

    for pos in range(start-1, end, 3): # 0-based idx
        try:
            ref_seq = str(rec.seq)
            aa = str(Seq(ref_seq[pos: pos+3]).translate())

            if aa  == "*" or aa == "-":
                aa_pos += 1
                continue
            wt.append(aa)
            aa_pos_ls.append(aa_pos)
            coords.append((int(pos), int(pos+3))) # 0-based idx
            aa_pos += 1
        except:
            aa_pos += 1
            continue
        
    df = pd.DataFrame({'wildtype': wt, 'site': aa_pos_ls, "coords": coords}) if region == "spike" else pd.DataFrame({'wt': wt, 'pos': aa_pos_ls, "coords": coords})

    return df


In [None]:
fasta_path = "/home/yutianc/sars-cov-2-dms-ref/output/NC_045512.2_escape_ba.2_spike.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/ba.2/spike/BA.2_spike_summary.csv"

df = cross_check(fasta_path, region="spike")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["site"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["site", "wildtype"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wildtype,site,coords,mutant,spike mediated entry,ACE2 binding,sequential_site,region,_merge,codon
418,N,422,"(22825, 22828)",,,,,,left_only,aac
433,N,437,"(22870, 22873)",,,,,,left_only,aac
520,V,524,"(23131, 23134)",,,,,,left_only,gtg
559,Q,563,"(23248, 23251)",,,,,,left_only,caa
716,I,720,"(23719, 23722)",,,,,,left_only,atc
812,S,816,"(24007, 24010)",,,,,,left_only,tcc
814,I,818,"(24013, 24016)",,,,,,left_only,att
970,S,974,"(24481, 24484)",,,,,,left_only,tcc


In [93]:
fasta_path = "/home/yutianc/sars-cov-2-dms-ref/output/NC_045512.2_escape_xbb.1.5_spike.fa"
csv_path = "/home/yutianc/bjorn_rep/data/sc2/muninn_inputs/dms/XBB.1.5_spike_DMS.csv"

df = cross_check(fasta_path, region="spike")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["site"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["site", "wildtype"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]] if pd.notna(x) else None)
m

Unnamed: 0,wildtype,site,coords,mutant,human sera escape,spike mediated entry,ACE2 binding,sequential_site,region,_merge,codon
139,V,143,,-,-0.03895,-0.08258,-0.3188,140.0,NTD,right_only,
417,N,422,"(22825, 22828)",,,,,,,left_only,aac
475,C,480,"(22999, 23002)",,,,,,,left_only,tgt
501,Q,506,"(23077, 23080)",,,,,,,left_only,caa
526,T,531,"(23152, 23155)",,,,,,,left_only,acc
558,Q,563,"(23248, 23251)",,,,,,,left_only,caa
574,P,579,"(23296, 23299)",,,,,,,left_only,cca
715,I,720,"(23719, 23722)",,,,,,,left_only,atc
720,E,725,"(23734, 23737)",,,,,,,left_only,gag
744,C,749,"(23806, 23809)",,,,,,,left_only,tgt


In [83]:
fasta_path = "/home/yutianc/bjorn_rep/data/sc2/NC_045512.2_escape_BA.1_rbd.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/ba.1/mut1_BA.1_Omicron_baseline_EPI_ISL_10000028.csv"

df = cross_check(fasta_path, region="rbd")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["pos"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["pos", "wt"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wt,pos,coords,mut1,num_aa,_merge,codon


In [85]:
fasta_path = "/home/yutianc/bjorn_rep/data/sc2/NC_045512.2_escape_BA.2_rbd.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/ba.2/rbd/mut1_BA.2_Omicron_baseline_EPI_ISL_10000005.csv"

df = cross_check(fasta_path, region="rbd")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["pos"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["pos", "wt"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wt,pos,coords,mut1,num_aa,_merge,codon


In [86]:
fasta_path = "/home/yutianc/bjorn_rep/data/sc2/NC_045512.2_escape_BA.2.75_rbd.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/ba.2.75/mut1_BA.2.75_EPI_ISL_13302209.csv"

df = cross_check(fasta_path, region="rbd")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["pos"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["pos", "wt"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wt,pos,coords,mut1,num_aa,_merge,codon


In [87]:
fasta_path = "/home/yutianc/bjorn_rep/data/sc2/NC_045512.2_escape_BA.4_BA.5_rbd.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/ba.4-ba.5/mut1_BA.4_BA.5_EPI_ISL_11207535.csv"

df = cross_check(fasta_path, region="rbd")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["pos"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["pos", "wt"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wt,pos,coords,mut1,num_aa,_merge,codon


In [None]:
fasta_path = "/home/yutianc/sars-cov-2-dms-ref/output/NC_045512.2_escape_JN.1_rbd.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/jn.1/mut1_JN_1_EPI_ISL_18373905.csv"

df = cross_check(fasta_path, region="rbd")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["pos"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["pos", "wt"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wt,pos,coords,mut1,num_aa,_merge,codon


In [90]:
fasta_path = "/home/yutianc/sars-cov-2-dms-ref/output/NC_045512.2_escape_KP.2_rbd.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/kp.2/mut1_KP_2_EPI_ISL_18916251_del.csv"

df = cross_check(fasta_path, region="rbd")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["pos"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["pos", "wt"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wt,pos,coords,mut1,num_aa,_merge,codon


In [91]:
fasta_path = "/home/yutianc/sars-cov-2-dms-ref/output/NC_045512.2_escape_KP.3_rbd.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/kp.3/mut1_KP_3_EPI_ISL_19036766.csv"

df = cross_check(fasta_path, region="rbd")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["pos"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["pos", "wt"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wt,pos,coords,mut1,num_aa,_merge,codon


In [92]:
fasta_path = "/home/yutianc/sars-cov-2-dms-ref/output/NC_045512.2_escape_XBB.1.5_rbd.fa"
csv_path = "/home/yutianc/sars-cov-2-dms-ref/data/xbb.1.5/rbd/mut1_XBB_1_5_EPI_ISL_17054053.csv"

df = cross_check(fasta_path, region="rbd")
df_true = pd.read_csv(csv_path)
df_true = df_true.drop_duplicates(subset=["pos"])
ref_seq = str(SeqIO.read(fasta_path, format="fasta").seq)

m = pd.merge(df, df_true, how="outer", on=["pos", "wt"], indicator=True)
m = m[m["_merge"] != "both"]
m["codon"] = m["coords"].apply(lambda x: ref_seq[x[0]: x[1]])
m

Unnamed: 0,wt,pos,coords,mut1,num_aa,_merge,codon


In [40]:
def get_gene_from_dms_ref(path_to_dms_ref, out_name, out_path):
    
    dms_ref = SeqIO.read(path_to_dms_ref, format="genbank")
    (start, end) = next((i.location.start, i.location.end) for i in dms_ref.features if i.type == "gene")
    gene_seq = dms_ref.seq[start: end]
    gene_rec = SeqRecord(gene_seq, id=out_name, name=out_name, description=out_name)

    with open(out_path, "a") as handle:   # append
        SeqIO.write(gene_rec, handle, "fasta")



In [43]:
get_gene_from_dms_ref("/home/yutianc/sars-cov-2-dms-ref/data/ba.1/PacBio_amplicon_BA1.gb", "ba.1", "/home/yutianc/sars-cov-2-dms-ref/rbd.fa")
get_gene_from_dms_ref("/home/yutianc/sars-cov-2-dms-ref/data/ba.2/rbd/PacBio_amplicon_BA2.gb", "ba.2", "/home/yutianc/sars-cov-2-dms-ref/rbd.fa")
get_gene_from_dms_ref("/home/yutianc/sars-cov-2-dms-ref/data/xbb.1.5/rbd/PacBio_amplicon.gb", "xbb.1.5", "/home/yutianc/sars-cov-2-dms-ref/rbd.fa")


In [44]:
get_gene_from_dms_ref("/home/yutianc/sars-cov-2-dms-ref/data/ba.2/spike/PacBio_amplicon.gb", "ba.2", "/home/yutianc/sars-cov-2-dms-ref/spike.fa")
get_gene_from_dms_ref("/home/yutianc/sars-cov-2-dms-ref/data/xbb.1.5/spike/PacBio_amplicon.gb", "xbb.1.5", "/home/yutianc/sars-cov-2-dms-ref/spike.fa")


In [None]:
from pathlib import Path

FA_EXTS = {".fa", ".fasta", ".fna"}

def iter_fasta_files(root: str):
    root_path = Path(root)
    for p in root_path.rglob("*"):
        if p.is_file() and p.suffix.lower() in FA_EXTS:
            yield p


def extract_ids_from_fasta_file(fp: Path):
    ids = []
    fname = []
    with fp.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            if line.startswith(">"):
                # first token after '>' (drops description)
                seq_id = line[1:].strip().split()[0]
                if seq_id:
                    try:
                        sra = seq_id.split('/')[2]
                        fa_name = (f.name.split("/")[-1]).split('.')[0]
                        if sra != fa_name:
                            ids.append(sra)
                            fname.append(fa_name)
                    except:
                        continue
    return ids, fname

def extract_ids_from_dir(root: str):
    out_id, out_fname = [], []
    for fp in iter_fasta_files(root):
        ids, fname = extract_ids_from_fasta_file(fp)
        out_id.extend(ids)
        out_fname.extend(fname)
    print(len(out_fname))
    print(len(out_id))

    return pd.DataFrame({"ID": out_id, "fname": out_fname})

if __name__ == "__main__":
    root_dir = "/home/yutianc/bjorn_rep/data/sc2/consensus_sequences/"
    df = extract_ids_from_dir(root_dir)

d = dict(zip(df["fname"], df["ID"]))
