In [3]:
import pickle
import pandas as pd
import multiprocessing as mp
import numpy as np
import pickle
import matplotlib.pyplot as plt
from Bio import SeqIO
import os
import subprocess
import ast
import warnings
from pathlib import Path
from ete3 import NCBITaxa
warnings.filterwarnings('ignore')



In [8]:
##load filtered round2 blast chimeras
import pickle
file_path = 'outputs/clustered_ankyrin_transposon_secondary_filtered_chimeras.pickle'
with open(file_path, 'rb') as file:
    chimeras=pickle.load(file)
##append to intervals
intervals=[]
for c in chimeras:
    for i in chimeras[c]:
        intervals.append(c+";"+chimeras[c][i]+"_"+str(i).replace(" ",""))


In [24]:
##Load a dictionary of primary:secondary chimera mappings for PCR-validated secondary chimeras from a previous pipeline iteration
## this is to prioritize selection of pcr'd secondary chimeras 
import pickle
file_path = 'outputs/previous_iteration_secondary_chimeras.pickle'
with open(file_path, 'rb') as file:
    previous_iteration_secondary=pickle.load(file)

In [25]:
##load dictionary of secondary chimeras
file_path = 'outputs/secondary_chimera_adjacency_list.pickle'
with open(file_path, 'rb') as file:
    secondary_chimera_adjacency_list=pickle.load(file)

In [9]:

blast_hits=[]
for chimera in chimeras:
    ints=[x for x in intervals if chimera in x]
    for x in ints:
        a2=SeqIO.to_dict(SeqIO.parse(f'outputs/hmmbuild/{chimera}/{x}/sub_seq.fasta', 'fasta'))
        if len(a2.keys())<=1:
            ##extract sequences that have only self blast-hits (bit-score>min(bit-score non-arthropod))
            ##blast hits instead of hmmsearch hits are used for phylogenetic dataset contstruction
            blast_hits.append(x)
hmmer_hits=set(intervals)-set(blast_hits)

In [27]:
##load a dataframe of genome taxids from genome accessions
df1=pd.read_csv('Data/genbank_genomes_4_22_2025.tsv',sep='\t')
df2=pd.read_csv('Data/refseq_genomes_scaffold_plus_4_19_2025.tsv',sep='\t')
dftax=pd.concat([df1,df2]).set_index('Assembly Accession')
dftax.loc['GCF_006496715.1',['Organism Name','Organism Taxonomic ID']]=['Aedes albopictus',7160]

In [28]:
"""
for a given primary representative chimera, finds a single secondary chimera  per taxid
selects hits with maximum bitscore to the max length hgt interval per taxid
"""
def pick_secondaries(c):
    
    ##pick HGT interval with the maximum length 
    interval_map={x:x[1]-x[0] for x in chimeras[c] if chimeras[c][x]=='HGT'}
    max_len=max(interval_map.values())
    max_len_interval=str([k for k, v in interval_map.items () if v==max_len][0]).replace(" ","")
    
    ##selection if using hmmer hits
    if max_len_interval not in blast_hits:
    
        arth_path = Path(f"outputs/hmmsearch_v_arthropod/{c};HGT_{max_len_interval}.tsv")
        
        arth = pd.read_csv(arth_path, sep="\t")
        
        self_taxid=arth[arth.target_name==c].taxid.values[0]
        arth=arth[arth.taxid!=self_taxid]
        arth=arth[arth.target_name.isin(secondary_chimera_adjacency_list[c])]
        secondaries=set()
        ## add previous iteration secondary chimeras to selection
        prev=[]
        if c in previous_iteration_secondary:
            prev=previous_iteration_secondary[c]
        if arth.shape[0]>0:
            
            if len(prev)>0:
                for p in prev:
                    if p!=c:
                        parth=arth[arth.target_name==p]
                        ptaxid=parth.taxid.values[0]
                        secondaries.add(parth.target_name.values[0])
                        arth=arth[arth.taxid!=ptaxid]
            ##add maximum bit score per taxid
            secondaries=secondaries|set(arth.loc[arth.groupby('taxid')['domain_score'].idxmax()].target_name)
    ##selection if using blast hits
    else:
        arth_path = Path(f"outputs/round2_diamond_v_arthropod_output_split/{c};HGT_{max_len_interval}.tsv")
        arth = pd.read_csv(arth_path, sep="\t")
        arth['taxid']=[int(dftax.loc[x.split(";")[0],'Organism Taxonomic ID']) for x in arth['sseqid']]
        self_taxid=arth[arth.sseqid==c].taxid.values[0]
        arth=arth[arth.taxid!=self_taxid]
        arth=arth[arth.target_name.isin(secondary_chimera_adjacency_list[c])]
        secondaries=set()
        prev=[]
        if c in previous_iteration_secondary:
            prev=previous_iteration_secondary[c]
        if arth.shape[0]>0:
            if len(prev)>0:
                for p in prev:
                    if p!=c:
                        parth=arth[arth.sseqid==p]
                        ptaxid=parth.taxid.values[0]
                        secondaries.add(parth.target_name.values[0])
                        arth=arth[arth.taxid!=ptaxid]
        secondaries=secondaries|set(arth.loc[arth.groupby('taxid')['bitscore'].idxmax()].target_name)
        
    return secondaries

In [9]:
td=list(chimeras.keys())
with mp.Pool(40) as pool:
    results=pool.map(pick_secondaries,td)

In [10]:
secondary_chimera_map={x:y for x,y in zip(td,results)}

In [17]:
##save dictionary representation of filtered chimeras output
file_path = 'outputs/secondary_chimera_selection_for_trees.pickle'
with open(file_path, 'wb') as file:
    pickle.dump(secondary_chimera_map,file)

In [29]:
file_path = 'outputs/secondary_chimera_selection_for_trees.pickle'
with open(file_path, 'rb') as file:
    secondary_chimera_map=pickle.load(file)

In [8]:
!mkdir -p outputs/phylogenetic_dataset

In [30]:
representatives=SeqIO.to_dict(SeqIO.parse('outputs/split_intervals.fasta', 'fasta'))

In [31]:
##Load arthropod protein accessions from NR
ar_accessions=pd.read_csv('outputs/arthropoda.accessions',sep='\t',header=None)
ar_accessions=set(ar_accessions[0])

In [32]:
from Bio import SeqIO

#load fasta w/ all arthropod queries
all_seqs = SeqIO.to_dict(SeqIO.parse('outputs/all_arthropod_concatenated_proteins.fa', 'fasta'))
##add a secondary chimera pcr'd in the first pipeline iteration that is now suppressed in the latest a. albopictus annotation
a2=SeqIO.to_dict(SeqIO.parse('outputs/suppressed_aedes_albopictus.fa', 'fasta'))
all_seqs=all_seqs|a2

In [33]:

##these methods fill in taxonomic information in a dataframe of blast/hmmer hits to be used for phylogenetic inference
def _init_ncbi():
    """Each worker process gets its own NCBITaxa instance."""
    global ncbi
    ncbi = NCBITaxa()                  # downloads DB on first run


def tax_info_list(binomial: str):
    """
    Return [taxid, kingdom, phylum, class, order, lineage] for a species binomial,
    or [nan, nan, nan, nan] if anything goes wrong.
    
    """
    _init_ncbi()
    try:
        taxid = ncbi.get_name_translator([binomial])[binomial][0]

        lineage = ncbi.get_lineage(taxid)
        ranks   = ncbi.get_rank(lineage)
        names   = ncbi.get_taxid_translator(lineage)

        kingdom = next((names[t] for t in lineage if ranks[t] == "kingdom"), np.nan)
        phylum  = next((names[t] for t in lineage if ranks[t] == "phylum"),  np.nan)
        classl  = next((names[t] for t in lineage if ranks[t] == "class"),  np.nan)
        order  = next((names[t] for t in lineage if ranks[t] == "order"),  np.nan)
        lineage = str([names[t] for t in lineage])

        return [int(taxid), kingdom, phylum,classl,order,lineage]

    except Exception:
        return [np.nan, np.nan, np.nan,np.nan]


# ---------- 2.  Annotate an entire DataFrame --------------------------------
def add_taxonomy_columns(df, n_cores=40):
    """
    Enrich *df* (must contain a 'species' column) with three new columns:
       'taxid', 'kingdom', 'phylum'.
    """
    with mp.Pool(processes=n_cores, initializer=_init_ncbi) as pool:
        results = pool.map(tax_info_list, df["species"].tolist())

    # Split the list-of-lists into columns, preserving the original index
    df[["taxid", "kingdom", "phylum","class","order",'lineage']] = pd.DataFrame(results, index=df.index)
    return df




In [34]:

"""
runs BLAST+ blastdb_cmbdb to return a sliced protein sequence
parameter: (protein accession, start coordinate, stop coordinate, indexed blast db path)
"""
def _fetch_slice(task):
    acc, start, stop, blast_db = task
    cmd = [
        'blastdbcmd',
        '-db', blast_db,
        '-entry', acc,
        '-range', f'{start}-{stop}',
        '-outfmt', '%s'
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0 or not result.stdout.strip():
        
        return None
    seq = result.stdout.replace('\n', '')
    return f'>{acc};({start},{stop})\n{seq}\n'


"""
accepts a data frame with hmmer or blast hits (specified by t)
uses coordinates in the df to write slices of each protein to "out_fasta"
parallelized with mp.pool
"""
def slice_proteins_to_fasta(df, out_fasta, blast_db='../dbs/nr', t='hmmer',workers=40):
    if t=='blast':
         tasks = [
            (acc, int(start), int(stop), str(blast_db))
            for acc, start, stop in df[['target_name', 'envfrom', 'envto']].itertuples(index=False)
        ]
    else:
        tasks = [
            (acc, int(start), int(stop), str(blast_db))
            for acc, start, stop in df[['sseqid', 'sstart', 'send']].itertuples(index=False)
        ]
    with mp.Pool(processes=workers) as pool, open(out_fasta, 'a') as fout:
        for fasta_record in pool.imap_unordered(_fetch_slice, tasks):
            if fasta_record:
                fout.write(fasta_record)


In [42]:


"""
takes an interval name and writes a fasta with hmmsearch-aligned intervals
from secondary chimeras, non-chimeric arthropod sequences, and non-arthropod sequences
selects a single sequence per ncbi taxid in each of the above categories
calls a script to execute MUSCLE, trimAl and IQ-tree for phylogenetic inference
"""
def get_phylogen_dataset_hmmer(interv):
    
    ch=";".join(interv.split(";")[0:2])
    os.makedirs(f'outputs/phylogenetic_dataset/{ch}', exist_ok=True)
    os.makedirs(f'outputs/phylogenetic_dataset/{ch}/{interv}', exist_ok=True)

 
    ##load_secondary chimeras 
    f=open(f'outputs/phylogenetic_dataset/{ch}/{interv}/all_sequences.fa','w')
    sec=pd.read_csv(f"outputs/hmmsearch_v_arthropod/{interv}.tsv", sep="\t")
    sec=sec.loc[sec.groupby('target_name')['domain_score'].idxmax(),:]
    sec=sec[sec['target_name'].isin(secondary_chimera_map[ch])]
    sec['secondary']=True
    if sec.shape[0]>0:
        
        for index, row in sec.iterrows():
            name=row.target_name
            start=row.envfrom
            stop=row.envto
            seq=str(all_seqs[name].seq)[start-1:stop]
            name=name+";"+str((start,stop)).replace(" ","")
            f.write(f'>{name}\n')
            f.write(f'{seq}\n')
    ## add representative primary chimera to fasta and df
    f.write(f">{interv}\n")
    s=str(representatives[interv].seq)
    f.write(f"{s}\n")
    
    n=sec.shape[0]
    sec.loc[n,:]='primary_chimera'
    sec.loc[n,'target_name']=ch
    sec.loc[n,['envfrom','envto']]=ast.literal_eval(interv.split("_")[-1])
    sec.loc[n,'species']=dftax.loc[ch.split(";")[0],'Organism Name']
    sec=add_taxonomy_columns(sec)
    

    ##load other arthropod_hits
    arth_path = Path(f"outputs/hmmsearch_v_arthropod/{interv}.tsv")
    
    arth = pd.read_csv(arth_path, sep="\t",nrows=20000)
    arth = arth[(arth['i-Evalue']<1e-2)]
    arth=arth[~arth['target_name'].isin(secondary_chimera_adjacency_list[ch])]
    arth=arth[arth['target_name']!=ch]
   ##add non-secondary arthropod hits, 1/taxid
    if arth.shape[0]>0:
        arth=add_taxonomy_columns(arth)
        idx = arth.groupby('taxid')['domain_score'].idxmax()   # index of winners by max domain_score/taxid
        arth = arth.loc[idx].reset_index(drop=True)        # the filtered DataFrame
        arth= arth[arth.taxid.astype('str')!='nan']
        for index, row in arth.iterrows():
            name=row.target_name
            start=row.envfrom
            stop=row.envto
            seq=str(all_seqs[name].seq)[start-1:stop]
            name=name+";"+str((start,stop)).replace(" ","")
            f.write(f'>{name}\n')
            f.write(f'{seq}\n')
    f.close()
   
    ##Load non-arthropod hits
    non_arth=pd.read_csv(f"outputs/hmmsearch_v_nr/{interv}.tsv", sep="\t",nrows=20000)
    non_arth=non_arth[~non_arth.target_name.isin(ar_accessions)]
    non_arth=non_arth[non_arth['i-Evalue']<1e-2]
    non_arth=non_arth.sort_values('i-Evalue')
    
    non_arth=non_arth.iloc[0:20000,:]
    if non_arth.shape[0]>0:
        non_arth=add_taxonomy_columns(non_arth)
        ##filter out synthetic sequence hits and double-check arthropod filtering
        non_arth=non_arth[(non_arth.taxid.astype('str')!='nan')&(non_arth.taxid!=32630)&(non_arth.phylum!='Arthropoda')] 
        idx = non_arth.groupby('taxid')['domain_score'].idxmax()   
        non_arth = non_arth.loc[idx].reset_index(drop=True)     
        
        ##add up to 500 non-metazoan hits
        non_meta=non_arth[~non_arth.kingdom.astype(str).str.contains('Metazoa')]
        non_meta=non_meta.iloc[0:500,:]
        slice_proteins_to_fasta(non_meta,f'outputs/phylogenetic_dataset/{ch}/{interv}/all_sequences.fa',t='blast')

        ## add up to 500 non-arthropod metazoan hits 
        non_arth_meta=non_arth[~(non_arth.phylum.astype(str).str.contains('Arthropoda'))&(non_arth.kingdom=='Metazoa')]
        non_arth_meta=non_arth_meta.iloc[0:500,:]
        slice_proteins_to_fasta(non_arth_meta,f'outputs/phylogenetic_dataset/{ch}/{interv}/all_sequences.fa',t='blast')

    combined=pd.concat([sec,arth,non_arth_meta,non_meta])
    combined.to_csv(f'outputs/phylogenetic_dataset/{ch}/{interv}/combined_sequences_data.tsv',sep='\t')
    if combined.shape[0]<500:
        !sbatch scripts/align_iq_pipe.sh "$ch" "$interv"
    else:
        !sbatch scripts/align_iq_pipe_long.sh "$ch" "$interv"

In [1]:
"""
takes an interval name and writes a fasta with DIAMOND BLASTP-aligned intervals
from secondary chimeras, non-chimeric arthropod sequences, and non-arthropod sequences
selects a single sequence per ncbi taxid in each of the above categories
calls a script to execute MUSCLE, trimAl and IQ-tree for phylogenetic inference
"""
def get_phylogen_dataset_blast(interv):
    
    ch=";".join(interv.split(";")[0:2])
    os.makedirs(f'outputs/phylogenetic_dataset/{ch}', exist_ok=True)
    os.makedirs(f'outputs/phylogenetic_dataset/{ch}/{interv}', exist_ok=True)

 
    ##load_secondary chimeras 
    f=open(f'outputs/phylogenetic_dataset/{ch}/{interv}/all_sequences.fa','w')
    
    sec=pd.read_csv(f"outputs/round2_diamond_v_arthropod_output_split/{interv}.tsv", sep="\t")
    sec=sec.loc[sec.groupby('sseqid')['bitscore'].idxmax(),:]
    sec=sec[sec['sseqid'].isin(secondary_chimera_map[ch])]
    sec['species']=[dftax.loc[x.split(";")[0],'Organism Name'] for x in sec['sseqid']]
    sec['secondary']=True
    if sec.shape[0]>0:
        for index, row in sec.iterrows():
            name=row.sseqid
            start=row.sstart
            stop=row.send
            seq=str(all_seqs[name].seq)[start-1:stop]
            name=name+";"+str((start,stop)).replace(" ","")
            f.write(f'>{name}\n')
            f.write(f'{seq}\n')
    
    ### add representative primary chimera to fasta and df
    f.write(f">{interv}\n")
    s=str(representatives[interv].seq)
    f.write(f"{s}\n")
    
    n=sec.shape[0]
    sec.loc[n,:]='primary_chimera'
    sec.loc[n,'sseqid']=ch
    sec.loc[n,['sstart','send']]=ast.literal_eval(interv.split("_")[-1])
    sec.loc[n,'species']=dftax.loc[ch.split(";")[0],'Organism Name']
    sec=add_taxonomy_columns(sec)
    

    ##load other arthropod_hits
    arth_path = Path(f"outputs/round2_diamond_v_arthropod_output_split/{interv}.tsv")
    
    arth = pd.read_csv(arth_path, sep="\t",nrows=20000)
    arth['species']=[dftax.loc[x.split(";")[0],'Organism Name'] for x in arth['sseqid']]
    arth = arth[(arth['evalue']<1e-2)]
    arth=arth[~arth['sseqid'].isin(secondary_chimera_adjacency_list[ch])]
    arth=arth[arth['sseqid']!=ch]
   
    if arth.shape[0]>0:
        arth=add_taxonomy_columns(arth)
        idx = arth.groupby('taxid')['bitscore'].idxmax()   # index of winners by max domain_score/taxid
        arth = arth.loc[idx].reset_index(drop=True)        # the filtered DataFrame
        arth= arth[arth.taxid.astype('str')!='nan']
        for index, row in arth.iterrows():
            name=row.sseqid
            start=row.sstart
            stop=row.send
            seq=str(all_seqs[name].seq)[start-1:stop]
            name=name+";"+str((start,stop)).replace(" ","")
            f.write(f'>{name}\n')
            f.write(f'{seq}\n')

    f.close()
    ##Load non-arthropod hits
    non_arth=pd.read_csv(f"outputs/round2_diamond_output_split/{interv}.tsv", sep="\t",nrows=20000)
    non_arth=non_arth[~non_arth.sphylums.astype(str).str.contains('Arthropoda')]
    non_arth['species']=non_arth['sscinames']
    non_arth=non_arth.drop('sscinames',axis=1)
    non_arth=non_arth[non_arth['evalue']<1e-2]
    non_arth=non_arth.sort_values('evalue')
    
    non_arth=non_arth.iloc[0:20000,:]
    if non_arth.shape[0]>0:
        non_arth=add_taxonomy_columns(non_arth)
        ##filter out synthetic sequence hits
        non_arth=non_arth[(non_arth.staxids.astype('str')!='nan')&(non_arth.staxids!=32630)]
        idx = non_arth.groupby('taxid')['bitscore'].idxmax()   # index of winners by max domain_score/taxid
        non_arth = non_arth.loc[idx].reset_index(drop=True)        # the filtered DataFrame
        ##add up to 500 non-metazoan hits
        non_meta=non_arth[~non_arth.skingdoms.astype(str).str.contains('Metazoa')]
        non_meta=non_meta.iloc[0:500,:]
        slice_proteins_to_fasta(non_meta,f'outputs/phylogenetic_dataset/{ch}/{interv}/all_sequences.fa')

        ##add up to 500 non-arthropod metazoan hits 
        non_arth_meta=non_arth[(non_arth.skingdoms=='Metazoa')]
        non_arth_meta=non_arth_meta.iloc[0:500,:]
        slice_proteins_to_fasta(non_arth_meta,f'outputs/phylogenetic_dataset/{ch}/{interv}/all_sequences.fa')

    combined=pd.concat([sec,arth,non_arth_meta,non_meta])
    combined.to_csv(f'outputs/phylogenetic_dataset/{ch}/{interv}/combined_sequences_data.tsv',sep='\t')
    print(combined.shape[0])
    if combined.shape[0]<500:
        !sbatch scripts/align_iq_pipe.sh "$ch" "$interv"
    else:
        !sbatch scripts/align_iq_pipe_long.sh "$ch" "$interv"

In [24]:
##run phylogenetic inference on HGT intervals with hmmer intervals
td=[x for x in intervals if 'HGT' in x and x in hmmer_hits ]
with mp.Pool(40) as pool:
    results=pool.map(get_phylogen_dataset_hmmer,td)

In [None]:
##run phylogenetic inference on HGT intervals with blast intervals
td=[x for x in intervals if 'HGT' in x and x in blast_hits ]
with mp.Pool(40) as pool:
    results=pool.map(get_phylogen_dataset_blast,td)
    

In [6]:
## load manual inspection results for HGT trees (note--only metazoan intervals from HGT-confirmed trees were inspected)

hgt=pd.read_csv("Tree_manual_inspection_HGT.tsv",sep='\t',index_col=0)
hgt=hgt[hgt.Tree_annot=='Yes']
confirmed_hgt=[";".join(x.split(";")[0:2]) for x in hgt.index]

In [12]:
##run phylogenetic inference on Metazoan intervals from chimeras with confirmed HGT intervals
td=[x for x in intervals if 'Meta' in x and x.split(";")[1] in confirmed_hgt and x in hmmer_hits]
with mp.Pool(40) as pool:
    results=pool.map(get_phylogen_dataset_hmmer,td)

td=[x for x in intervals if 'Meta' in x and x.split(";")[1] in confirmed_hgt and x in blast_hits]
with mp.Pool(40) as pool:
    results=pool.map(get_phylogen_dataset_blast,td)