# 5. MSA cleanup and collapse by organism, Li et al.
___
Dr. Raffael lab <br>
2024

In [1]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import multiprocessing as mp

from Bio.Align import MultipleSeqAlignment
from Bio import SeqIO, AlignIO, Seq, SeqRecord

## 1. Functions

In [11]:
def create_consensus_seq(msa, enzyme, organism, growth_temp):
    """
    Create consensus sequence for a set of sequences. In this instance,
    we calculate a consensus sequence from an MSA of sequences for a 
    given enzyme and corresponding to a single organism.
    
    Arguments:
        (Bio.Aling.MultipleSeqAlignment) msa:
        (str) enzyme: enzyme name
        (str) organism: organism corresponding to the supplied MSA
        (float) growth_temp: organismal growth temp from Li et al.
    
    Returns:
        (Bio.SeqRecord) organism consensus sequence
    """
    np.random.seed(0)
    consensus_list = []
    for index in range(msa.get_alignment_length()):
        # If the residues are of equal abundance 
        unique_res, counts = np.unique(list(msa[:,index]), return_counts=True)
        if all([count==1 for count in counts]):
            rep_res = np.random.choice(unique_res)
        else:
            rep_res = unique_res[np.argmax(counts)]
        if rep_res == '-': #Gap is most common ident
            # get the next most common residue
            sorted_residue_idents = unique_res[np.argsort(counts)]
            if len(sorted_residue_idents) == 1: 
                # All idents gapped, it's really gapped (will catch in logistic regression)
                consensus_list.append('-')
            else:
                # Get most common non-gap ident in that case
                consensus_list.append(sorted_residue_idents[1])
        else:
            consensus_list.append(rep_res)
    consensus_str = ''.join(consensus_list)
    consensus_record = SeqRecord.SeqRecord(Seq.Seq(consensus_str), id = '', name = '', 
                                           description = '{}|{}|{}'.format(enzyme, organism, growth_temp))
    return consensus_record


def create_consensus_recs(organism_msas_dict, enzyme):
    """
    Collapse every per-organism MSA into a single consensus sequence.

    Arguments:
        (dict) organism_msas_dict: maps (organism, growth_temp) keys to a
            Bio.Align.MultipleSeqAlignment of NR DB seqs from that organism.
            NOTE: these seqs are not necessarily unique because although
            they're from NR.gz, many subspecies share the same sequence but
            have different accessions.
        (str) enzyme: enzyme name, embedded in each consensus description

    Returns:
        (list) one Bio.SeqRecord consensus per dict entry, in dict order
    """
    consensus_recs = []
    for org_temp, organism_msa in organism_msas_dict.items():
        organism, growth_temp = org_temp[0], org_temp[1]
        consensus_recs.append(
            create_consensus_seq(organism_msa, enzyme, organism, growth_temp))
    return consensus_recs


def create_msas_by_organism(fasta_handle, aln_score_thresh):
    """
    Group the records of a per-enzyme FASTA by (organism, growth temperature),
    keeping only records whose alignment score exceeds a threshold.

    Each record description is split on '|'; field 3 is the organism name,
    field 4 the growth temperature and field 5 the alignment score
    (0-based). Records scoring <= aln_score_thresh are discarded.

    Arguments:
        (str | pathlib.Path) fasta_handle: path of the per-enzyme FASTA/MSA
        (float) aln_score_thresh: records must score strictly above this
            value to be retained

    Returns:
        (dict) maps (organism, growth_temp) tuples to lists of
            Bio.SeqRecord objects from that organism
    """
    retained_records = {}
    for rec in SeqIO.parse(fasta_handle, 'fasta'):
        fields = rec.description.split('|')
        organism = fields[3]
        growth_temp = float(fields[4])
        aln_score = float(fields[5])
        if aln_score > aln_score_thresh:
            # setdefault checks and inserts with the same (organism,
            # growth_temp) key, so records accumulate instead of overwriting
            retained_records.setdefault((organism, growth_temp), []).append(rec)
    return retained_records


def create_export_consensus_aln(fasta_handle, out_root):
    """
    Build and write a pseudo-alignment of per-organism consensus sequences
    for one enzyme, from an MSA derived from pairwise alignments.

    The first two lines of the input carry the reference header and
    reference sequence; the last '|'-field of that header is the
    alignment-score threshold used to filter the remaining records. Passing
    records are grouped by (organism, growth temperature), each group is
    collapsed to a consensus sequence, and the result is written to
    <out_root>/<enzyme>.consensus.aln, preceded by the reference header and
    sequence as ';' lines. An existing output file is overwritten.

    Arguments:
        (str | pathlib.Path) fasta_handle: MSA path; its stem is used as the
            enzyme name
        (str | pathlib.Path) out_root: root folder to which consensus pseudo
            alignments are written (created if missing)

    Returns:
        None
    """
    fasta_path = Path(fasta_handle)
    # Drop the first character of each of the first two lines (presumably
    # the same ';' marker written back below -- confirm against the input)
    with open(fasta_path, 'r') as msa:
        reference_header = msa.readline().strip()[1:]
        reference_seq = msa.readline().strip()[1:]

    aln_threshold = float(reference_header.split('|')[-1])
    enzyme_name = fasta_path.stem

    retained_records = create_msas_by_organism(fasta_path, aln_threshold)
    organism_msas = {org_temp: MultipleSeqAlignment(recs)
                     for org_temp, recs in retained_records.items()}
    consensus_recs = create_consensus_recs(organism_msas, enzyme_name)

    out_dir = Path(out_root)
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir.joinpath('{}.consensus.aln'.format(enzyme_name))
    # 'w' rather than append: re-running must replace the file, not stack a
    # second header and duplicate consensus records onto it
    with open(out_path, 'w') as consensus_output:
        consensus_output.write(';{}\n;{}\n'.format(reference_header, reference_seq))
        SeqIO.write(consensus_recs, consensus_output, 'fasta')


def create_export_consensus_aln_chunked(fasta_handle_list, out_root):
    """
    Build and export consensus alignments for a group of per-enzyme MSAs,
    one after another. Grouping lets a parallel driver submit one task per
    group and track progress per group.

    Arguments:
        (list) fasta_handle_list: paths of per-enzyme MSAs
        (str) out_root: folder in which to export consensus alignments

    Returns:
        None
    """
    for msa_path in fasta_handle_list:
        create_export_consensus_aln(msa_path, out_root)
    return None
        

def msa_to_csv(msa_handle, out_root):
    """
    Parses a multiple sequence alignment of consensus sequences into a long-form
    CSV with each entry (row) a single position and organism, with growth temperature.
    
    Arguments:
        (str | pathlib.Path) msa_handle: path of consensus MSA fasta to convert
            to long CSV form
        (str) out_root: folder in which to export consensus alignments
    
    Returns:
        None
    
    """
    consensus_msa = AlignIO.read(msa_handle, 'fasta')

    header_ids = ('Enzyme_Name', 'Organism', 'Temperature')

    msa_range = range(1, consensus_msa.get_alignment_length()+1)
    rec_df = pd.DataFrame([{
            **dict(zip(header_ids, rec.description.strip().split('|'))), 
            **dict(zip(msa_range, list(rec.seq)))} 
            for rec in consensus_msa]).melt(id_vars = header_ids, value_name = 'Consensus_Residue',
                                            value_vars = msa_range, var_name = 'Position')

#     out_path = out_root.joinpath(msa_handle.stem + '_summary.csv.bz2')#for multiple enzymes use this
    out_path = out_root.joinpath(msa_handle.stem + '_summary.csv') # for single enzyme use this
    rec_df.to_csv(out_path)

## 2. Parallelized consensus generation (~75 min)

### Setup

In [3]:
msa_root = Path('Lietal_PairwiseAlns/')
out_root = 'Lietal_ConsensusAlns/'
# Ensure the consensus output folder exists before anything writes to it
Path(out_root).mkdir(parents=True, exist_ok=True)

# Scan the input folder once so each path stays paired with its own size
# (skipping Jupyter checkpoint folders), then order smallest to largest
msa_paths = [p for p in msa_root.iterdir()
             if p.is_file() and '.ipynb_checkpoints' not in str(p)]
msas = np.array([str(p) for p in msa_paths])
msa_sizes = [p.stat().st_size for p in msa_paths]
sorted_msas = msas[np.argsort(msa_sizes)]

### Execute

In [4]:
# NOTE(review): if re-enabled, call .get() on every entry of result_objects
# after pool.join(); apply_async only re-raises worker exceptions through
# AsyncResult.get(), so failed enzymes would otherwise be skipped silently.
# `results` below is never used.
# numchunks = 80
# for msa_handle_list in tqdm(np.array_split(sorted_msas, numchunks), total = numchunks, 
#                             desc = 'Generating consensus alignments'):

#     numthreads = 24
#     pool = mp.Pool(numthreads)
#     results = []

#     result_objects = [pool.apply_async(create_export_consensus_aln_chunked, args=(fasta_handle_list, out_root)) 
#                       for fasta_handle_list in np.array_split(msa_handle_list, numthreads)
#                      ]

#     pool.close()
#     pool.join()

Generating consensus alignments: 100%|██████████| 80/80 [1:15:51<00:00, 56.90s/it] 


In [4]:
# Single-process run over every MSA in sorted_msas (sufficient for one enzyme).
# For many enzymes, use the commented-out multiprocessing cell above instead.
create_export_consensus_aln_chunked(sorted_msas, out_root)


## 3. Generate "long"-format per-enzyme CSV representations for logistic regression (~15 min)

### Setup

In [3]:
# msa_root = Path('Lietal_ConsensusAlns/')

# out_root = Path('Lietal_PerPositionSummaries')


# MAX_THREADS = 24

# # Obtain msa handles and sort by size to make parallelization more performant
# all_msas = np.array([Path(msa_handle) for msa_handle in msa_root.iterdir() if not str(msa_handle.stem).startswith('.')])
# all_msas_sorted = all_msas[np.argsort([Path(msa).stat().st_size for msa in all_msas])]

# msa_chunks = np.array_split(all_msas_sorted, (len(all_msas_sorted)//MAX_THREADS)+1)
# print('Processing {} MSAs in {} Chunks'.format(sum([len(c) for c in msa_chunks]), len(msa_chunks)))

Processing 2194 MSAs in 92 Chunks


In [5]:
#For multiple enzymes run above cell for setup

msa_root = Path('Lietal_ConsensusAlns/')

out_root = Path('Lietal_PerPositionSummaries')
# DataFrame.to_csv does not create missing folders, so create it up front
out_root.mkdir(parents=True, exist_ok=True)

# Obtain msa handles (skipping hidden entries such as .ipynb_checkpoints)
# and sort by size to make parallelization more performant
all_msas = np.array([p for p in msa_root.iterdir()
                     if p.is_file() and not p.stem.startswith('.')])
all_msas_sorted = all_msas[np.argsort([p.stat().st_size for p in all_msas])]

print(all_msas, all_msas_sorted)

[PosixPath('Lietal_ConsensusAlns/steroid_DELTA-isomerase.consensus.aln')] [PosixPath('Lietal_ConsensusAlns/steroid_DELTA-isomerase.consensus.aln')]


### Execute

In [4]:
# NOTE(review): if re-enabled, call .get() on every entry of result_objects
# after pool.join(); apply_async only re-raises worker exceptions through
# AsyncResult.get(), so failed conversions would otherwise go unnoticed.
# for msa_handle_list in tqdm(msa_chunks, total = len(msa_chunks), 
#                             desc = 'Generating CSV representations'):

#     numthreads = len(msa_handle_list)
#     pool = mp.Pool(numthreads)

#     result_objects = [pool.apply_async(msa_to_csv, args=(msa_handle, out_root)) 
#                       for msa_handle in msa_handle_list]

#     pool.close()
#     pool.join()

Generating CSV representations: 100%|██████████| 92/92 [15:36<00:00, 10.18s/it]


In [12]:
#For multiple enzymes run above cell

# Convert each consensus MSA to its long-form CSV, one file at a time
for msa_handle in all_msas_sorted:
    msa_to_csv(msa_handle, out_root)
