In [3]:
import ete3
import re
import itertools
import multiprocessing
import random
import pandas as pd
import numpy  as np
import igraph as ig
import pickle as pkl
from scipy.spatial.distance import squareform, pdist
from scipy.stats            import mannwhitneyu
from collections            import Counter

# Handle to ete3's NCBI taxonomy interface; later cells query it for rank and lineage lookups.
ncbi = ete3.NCBITaxa()
# Change the working directory so that the relative paths used in later cells resolve from here.
# NOTE(review): hard-coded absolute path -- adjust when running on another machine.
%cd /work/eggNOG/

/work/eggNOG


In [4]:
# Tab-separated genome table; its first column is used as the row index.
sampled_genomes = pd.read_csv('../kelsey/genomes.tab', sep='\t', index_col=0)

In [5]:
# Collect one {rank: taxid} record per sampled species and build the frame once.
# Growing a DataFrame with `DataFrame.append` inside a loop copies the whole frame
# on every iteration, and the method no longer exists in pandas >= 2.0.
lineage_records = []
for taxid in sampled_genomes.species_taxid.unique():
    if pd.isna(taxid):
        continue
    # `get_rank` maps taxid -> rank for every taxon on the lineage; invert it so the
    # rank becomes the column name. A rank that occurs more than once on a lineage
    # keeps the last taxid seen.
    rank_by_taxid = ncbi.get_rank(ncbi.get_lineage(taxid))
    lineage_records.append({tax_rank: tmp_taxid
                            for tmp_taxid, tax_rank in rank_by_taxid.items()})

lineages = pd.DataFrame(lineage_records)
# Keep only the ranks of interest; ranks missing from a lineage become NaN.
lineages = lineages.reindex(columns=['class', 'family',  'genus', 'phylum',
                                     'order', 'species', 'superkingdom']).copy()
# Keep only lineages whose superkingdom taxid is 2.
# NOTE(review): taxid 2 is presumably Bacteria in the NCBI taxonomy -- confirm.
lineages = lineages.query('superkingdom == 2').copy()

In [6]:
# Load the three pre-computed eggNOG tables from the current working directory.
working_groups, working_trees, eggNOG_taxonomy = (
    pd.read_parquet(parquet_path)
    for parquet_path in ('working_eggNOG_groups.parquet',
                         'working_eggNOG_trees.parquet',
                         'eggNOG_taxonomy.parquet')
)

In [18]:
def get_pairwise_distances(group_id):
    """Return the leaf-by-leaf path-length matrix of one group's tree.

    The tree stored in ``working_trees.loc[group_id, 'tree']`` is parsed with
    ``ete3.Tree`` (presumably a Newick string -- confirm against the table),
    turned into an undirected igraph graph whose edge weights are the branch
    lengths, and the weighted shortest path between every pair of leaves is
    computed.

    Parameters
    ----------
    group_id : label
        Row label in ``working_trees``.

    Returns
    -------
    pandas.DataFrame
        Square frame indexed and labelled by leaf name (in traversal order)
        holding the summed branch lengths between leaves.
    """
    tree = ete3.Tree(working_trees.loc[group_id, 'tree'])

    # Pass 1: remember leaf names and give every internal node a unique name.
    # This must finish before edges are collected, because the edge list below
    # refers to children by name.
    leaves = []
    for position, node in enumerate(tree.traverse()):
        if node.is_leaf():
            leaves.append(node.name)
        else:
            node.name = 'node_%i' % position
    leaf_names = np.array(leaves)

    # Pass 2: one (parent, child, branch length) edge per parent/child link.
    weighted_edges = [(parent.name, child.name, child.dist)
                      for parent in tree.traverse()
                      if not parent.is_leaf()
                      for child in parent.get_children()]

    tree_graph = ig.Graph.TupleList(edges=weighted_edges,
                                    directed=False,
                                    weights=True)

    path_lengths = tree_graph.shortest_paths(source=leaf_names,
                                             target=leaf_names,
                                             weights='weight')

    dist_matrix = pd.DataFrame(data=np.array(path_lengths),
                               index=leaf_names,
                               columns=leaf_names)
    return dist_matrix

In [14]:
def create_taxa_graph(dist_matrix, phyla):
    """Turn a square distance matrix into an edge table and a weighted graph.

    Parameters
    ----------
    dist_matrix : pandas.DataFrame
        Square matrix of pairwise distances; its index supplies sequence names.
    phyla : array
        Indexed positionally with the same row/column positions as
        ``dist_matrix``, so it must be aligned with the matrix rows.

    Returns
    -------
    (pandas.DataFrame, igraph.Graph)
        The edge table has one row per unordered pair (strict upper triangle)
        with columns ``phylum1``, ``phylum2``, ``sequence1``, ``sequence2``,
        ``distance`` and ``inverse_dist`` (``e ** -distance``). The undirected
        graph connects the sequence pairs and uses ``inverse_dist`` as weight.
    """
    # Positions of the strict upper triangle: every unordered pair exactly once.
    rows, cols = np.triu_indices_from(dist_matrix, k=1)
    sequence_names = dist_matrix.index

    edge_list = pd.DataFrame({
        'phylum1':   phyla[rows],
        'phylum2':   phyla[cols],
        'sequence1': sequence_names[rows],
        'sequence2': sequence_names[cols],
        'distance':  dist_matrix.values[rows, cols],
    })
    # Similarity weight: decays exponentially with distance.
    edge_list['inverse_dist'] = np.e ** np.negative(edge_list.distance)

    weighted_pairs = edge_list[['sequence1', 'sequence2', 'inverse_dist']]
    graph = ig.Graph.TupleList(edges=weighted_pairs.itertuples(index=False),
                               directed=False,
                               weights=True)

    return (edge_list, graph)

In [22]:
def assess_cluster(reference_phylum, minimal_freq_phyla, cluster_edges, cluster_nodes, n_closest=5):
    """Summarise distances from a reference phylum to every other phylum in a cluster.

    For each phylum in ``minimal_freq_phyla`` other than ``reference_phylum``,
    the sequences of that phylum are ranked by their summed distance to all
    reference-phylum sequences, the ``n_closest`` lowest-ranked ones are kept,
    and the distances between every reference sequence and those kept
    sequences are collected.

    Parameters
    ----------
    reference_phylum : int
        Phylum identifier to measure from.
    minimal_freq_phyla : sequence
        Phylum identifiers to consider; pairs not involving the reference are skipped.
    cluster_edges : pandas.DataFrame
        Needs columns ``phylum1``, ``phylum2``, ``sequence1``, ``sequence2``
        and ``distance``.
    cluster_nodes : pandas.DataFrame
        Needs columns ``name`` and ``phylum``.
    n_closest : int, optional
        How many of the other phylum's closest sequences to keep (default 5).

    Returns
    -------
    pandas.DataFrame
        Columns ``phylum``, ``median`` and ``distances`` (a 1-D array per row),
        one row per compared phylum, in the order the pairs are generated.
        Empty (but with these columns) when nothing could be compared.
    """
    result_columns = ['phylum', 'median', 'distances']
    # Rows are gathered in a list and the frame is built once at the end:
    # `DataFrame.append` was removed in pandas 2.0 and copied the frame on each call.
    records = []

    # Loop-invariant: names of the reference phylum's sequences in this cluster.
    reference_names = cluster_nodes.loc[cluster_nodes.phylum == reference_phylum, 'name']

    for phylum1, phylum2 in itertools.combinations(minimal_freq_phyla, 2):
        if   phylum1 == reference_phylum:
            phylum = phylum2
        elif phylum2 == reference_phylum:
            phylum = phylum1
        else:
            continue

        # Edges between the two phyla, whichever side each one is stored on.
        inter_phyla = cluster_edges.loc[((cluster_edges.phylum1 == phylum1) & (cluster_edges.phylum2 == phylum2)) |
                                        ((cluster_edges.phylum2 == phylum1) & (cluster_edges.phylum1 == phylum2))]
        if inter_phyla.empty:
            # No edge links the two phyla, so there is no distance to report.
            continue

        indices = np.unique(inter_phyla[['sequence1', 'sequence2']])
        indexer = pd.Index(indices).get_indexer
        rows    = indexer(inter_phyla.sequence1)
        cols    = indexer(inter_phyla.sequence2)

        # Fill the symmetric matrix as a plain array and wrap it afterwards;
        # writing through `DataFrame.values` only works when pandas happens to
        # hand back a writable view of its internal data.
        matrix              = np.zeros((len(indices), len(indices)))
        matrix[rows, cols]  = inter_phyla.distance.values
        matrix[cols, rows] += inter_phyla.distance.values
        adjacencies = pd.DataFrame(data=matrix, index=indices, columns=indices)

        other_names = cluster_nodes.loc[cluster_nodes.phylum == phylum, 'name']

        # Rank the other phylum's sequences by total distance to the reference
        # sequences and keep the closest few.
        summed_to_reference = adjacencies.loc[reference_names, other_names].sum()
        closest_names       = summed_to_reference.sort_values().index[:n_closest]

        try:
            distances_to_reference_phylum = adjacencies.loc[reference_names,
                                                            closest_names].values.flatten()
        except IndexError:
            continue

        records.append({'phylum':    phylum,
                        'median':    np.median(distances_to_reference_phylum),
                        'distances': distances_to_reference_phylum})

    return pd.DataFrame(records, columns=result_columns)

In [57]:
def get_phyla_evol_distances(group_id):
    """Cluster one group's sequences and rank phyla by distance to target phyla.

    Steps: build the leaf distance matrix for ``group_id``, look up each
    leaf's phylum in ``eggNOG_taxonomy`` (leaf names are split on ``'.'`` and
    the first field is used as an integer taxid), cluster the similarity graph
    with igraph's multilevel community detection, then, within every cluster
    that holds at least two target phyla with five or more sequences each,
    call ``assess_cluster`` once per such target phylum.

    Returns
    -------
    (group_id, dict)
        ``dict[cluster][reference_phylum]`` is ``{'df': DataFrame, 'significant': bool}``.
        ``df`` holds ``phylum`` and ``median`` sorted by median, with the
        median divided by the cluster's median pairwise distance.
        ``significant`` is True when only one other phylum was compared, or
        when a one-sided Mann-Whitney U test between the two closest phyla
        gives p < 0.01 and U / (n1 * n2) < 0.2.
    """
    dist_matrix = get_pairwise_distances(group_id)

    taxids = [int(leaf.split('.')[0]) for leaf in dist_matrix.index]
    phyla  = eggNOG_taxonomy.loc[taxids, 'phylum'].values.astype(int)

    edge_list, graph = create_taxa_graph(dist_matrix, phyla)

    # Fixed seed set immediately before clustering.
    # NOTE(review): presumably igraph draws from Python's `random` here -- confirm.
    random.seed(12345)
    clusters = graph.community_multilevel(weights='weight')

    # NOTE(review): assumes graph vertex order equals `dist_matrix.index` order,
    # so memberships line up with names and phyla -- confirm.
    node_data = pd.DataFrame(data   =zip(dist_matrix.index, phyla, clusters.membership),
                             columns=['name', 'phylum', 'cluster'])

    target_phyla = {1090, 1117, 1224, 200795, 976, 1134404, 1798710}
    cluster_evol_relations = {}

    for cluster_num in set(clusters.membership):
        cluster_nodes = node_data[node_data.cluster == cluster_num]

        # Phyla with a valid (positive) id and at least five sequences in the cluster.
        phylum_counts      = Counter(cluster_nodes.phylum)
        minimal_freq_phyla = [phylum for phylum, frequency in phylum_counts.items()
                              if frequency >= 5 and phylum > 0]

        reference_phyla = target_phyla.intersection(minimal_freq_phyla)
        if len(reference_phyla) < 2:
            continue

        relations = {}
        cluster_evol_relations[cluster_num] = relations

        # Edges with both ends inside this cluster ...
        both_in_cluster = (edge_list.sequence1.isin(cluster_nodes.name) &
                           edge_list.sequence2.isin(cluster_nodes.name))
        cluster_edges   = edge_list.loc[both_in_cluster,
                                        ['phylum1', 'phylum2', 'sequence1', 'sequence2', 'distance']]

        # ... and both ends belonging to a sufficiently frequent phylum.
        both_frequent = (cluster_edges.phylum1.isin(minimal_freq_phyla) &
                         cluster_edges.phylum2.isin(minimal_freq_phyla))
        cluster_edges = cluster_edges[both_frequent]

        # The normaliser includes within-phylum pairs; they are dropped only afterwards.
        normalizer    = np.median(cluster_edges.distance)
        cluster_edges = cluster_edges[cluster_edges.phylum1 != cluster_edges.phylum2]

        for ref_phylum in reference_phyla:
            cluster_dists = assess_cluster(ref_phylum,
                                           minimal_freq_phyla,
                                           cluster_edges,
                                           cluster_nodes)
            cluster_dists = cluster_dists.sort_values('median')

            relation = {'df':          cluster_dists[['phylum', 'median']].copy(),
                        'significant': False}
            relations[ref_phylum] = relation

            compared = cluster_dists.shape[0]
            if compared == 0:
                continue

            relation['df']['median'] /= normalizer
            if compared == 1:
                # Nothing to compete with: the only other phylum is the closest.
                relation['significant'] = True
                continue

            # Column 2 is 'distances'; rows 0 and 1 are the two closest phyla.
            closest   = cluster_dists.iloc[0, 2]
            runner_up = cluster_dists.iloc[1, 2]
            try:
                hypothesis = mannwhitneyu(closest, runner_up, alternative='less')
            except ValueError:
                continue

            effect_size = hypothesis.statistic / (len(closest) * len(runner_up))
            if hypothesis.pvalue < 0.01 and effect_size < 0.2:
                relation['significant'] = True

    return (group_id, cluster_evol_relations)

In [25]:
# %%time
# get_phyla_evol_distances('COG0499')

CPU times: user 11.4 s, sys: 928 ms, total: 12.3 s
Wall time: 12 s


In [60]:
%%time
# Fan the per-group analysis out over 10 worker processes; each worker is
# replaced after it has handled 5 tasks.
pool = multiprocessing.Pool(processes=10, maxtasksperchild=5)
results = pool.map_async(func=get_phyla_evol_distances,
                         iterable=working_groups.group_id.values)
# No further tasks will be submitted; block until every worker has finished.
pool.close()
pool.join()

CPU times: user 42.7 ms, sys: 57.9 ms, total: 101 ms
Wall time: 1.55 s


In [63]:
# Fetch the results BEFORE opening the output file: `results.get()` re-raises any
# exception from a worker, and opening with 'wb' first would leave an empty or
# truncated all_results.pkl behind in that case.
all_results = results.get()
with open('all_results.pkl', 'wb') as out:
    pkl.dump(all_results, out)
# Drop both references to the (potentially large) result list.
del results, all_results