# Mutation Parsing

In [1]:
with open("./panman/build/panman/small.mutations") as file:
    data = file.read()

In [2]:
import pandas as pd

file_path = "./panman/build/panman/small.mutations"
mut_df = pd.read_csv(file_path, sep='\t')

mut_df.columns = ['type', 'node_name', 'mutations']


In [None]:
s = mut_df[(mut_df['node_name'] == 'node_96') & (mut_df['type'] == 'Substitutions:')]


In [None]:
s['mutations'].to_list()[0].split(">")[1:][0].strip()

JSONL 

# Panman to Taxonium Visualization

In [3]:
import panman_pb2
import lzma
from google.protobuf.json_format import MessageToJson
import treeswift
from dataclasses import dataclass
import datetime
import numpy as np

In [4]:
config = {}
file_path = "./panman/build/panman/small.panman"

In [5]:
# Parsing the panman pb file. 
xz_file_data = lzma.open(file_path, 'rb').read()

# Parse the serialized message
tree_group = panman_pb2.treeGroup()
tree_group.ParseFromString(xz_file_data)
panmats = []
for tree in tree_group.trees:
    panmats.append(tree)

In [6]:
# Tree data
newick_data = panmats[0].newick
nodes = panmats[0].nodes

tree = treeswift.read_tree(newick_data, schema="newick")

In [8]:
# @dataclass(eq=True, frozen=True)
# class NucMutation:  #hashable
#     nucPosition: int
#     mutInfo: int
#     type: str = "nt"

@dataclass(eq=True, frozen=True)
class NucMutation:  #hashable
    one_indexed_position: int
    par_nuc: str
    mut_nuc: str
    chromosome: str = "chrom"
    type: str = "nt"


def make_aa_object(i, aa_mutation):
    # Tuple format is gene, position, prev, next

    return {
        "gene": aa_mutation.gene,
        "previous_residue": aa_mutation.initial_aa,
        "residue_pos": aa_mutation.one_indexed_codon,
        "new_residue": aa_mutation.final_aa,
        "mutation_id": i,
        "nuc_for_codon": aa_mutation.nuc_for_codon,
        "type": "aa"
    }


def make_nuc_object(i, nuc_mut):
    return {
        "gene": "nt",
        "previous_residue": nuc_mut.par_nuc,
        "residue_pos": nuc_mut.one_indexed_position,
        "new_residue": nuc_mut.mut_nuc,
        "mutation_id": i,
        "type": "nt"
    }  

In [9]:
def preorder_traversal(node):
    yield node
    for clade in node.children:
        yield from preorder_traversal(clade)

# NUC_ENUM = "ACGT"

def convert_nuc_mutation(substitution):
        par_nuc = substitution[0]
        mut_nuc = substitution[-1]
        position = substitution[1:-1]
        
        new_mut = NucMutation(one_indexed_position=position,
                              par_nuc=par_nuc,
                              mut_nuc=mut_nuc)
        return new_mut


In [36]:
def preorder_traversal(node):
    yield node
    for clade in node.children:
        yield from preorder_traversal(clade)

def annotate_mutations():
    for i, node in enumerate(preorder_traversal(tree.root)):
        s = mut_df[(mut_df['node_name'] == node.label) & (mut_df['type'] == 'Substitutions:')]
        if s['mutations'].any():
            substitute_mutations = s['mutations'].to_list()[0].split(">")[1:]

            node.nuc_mutations = [
                convert_nuc_mutation(sub_mut.strip()) for sub_mut in substitute_mutations
                ]
                # for x in self.data.node_mutations[i].mutation

                # NucMutation(mutations.nucPosition, mutations.mutInfo)
                #  for node_mutations in nodes[i].mutations 
                #  for mutations in node_mutations.nucMutation
                # ]
        else:
            print(node)
            node.nuc_mutations = []

def assign_num_tips():
    for node in tree.traverse_postorder():
        for node in tree.traverse_postorder():
            if node.is_leaf():
                node.num_tips = 1
            else:
                node.num_tips = sum(child.num_tips for child in node.children)

def set_branch_lengths():
    for node in list(preorder_traversal(tree.root)):
        node.edge_length = len(node.nuc_mutations)

In [47]:
mut_df[(mut_df['node_name'] == "node_11")]

Unnamed: 0,type,node_name,mutations
164,Substitutions:,node_11,
165,Insertions:,node_11,
166,Deletions:,node_11,> 1- > 1- > 1- > 1- > 1- > 1- > 1- > 1- > 1- ...


In [37]:
annotate_mutations()
assign_num_tips()
set_branch_lengths()

node_1
node_2
node_3
node_4
node_8
node_9
node_24
node_22
node_10
node_26
node_5
node_7
node_19
node_17
node_6
node_15
node_14
node_11
node_15
node_17
node_45
node_16
node_41
node_12
node_14
node_36
node_13
node_34
node_18
node_26
node_30
node_85
node_83
node_31
node_79
node_27
node_29
node_74
node_19
node_23
node_64
node_24
node_62
node_60
node_20
node_22
node_57
node_55
node_21
node_52
node_33
node_34
node_92
node_36
node_94
node_37
node_99


In [15]:
def set_x_coords(root, chronumental_enabled):
    """ Set x coordinates for the tree"""
    root.x_dist = 0
    root.x_time = 0
    for node in root.traverse_preorder():
        if node.parent:
            node.x_dist = node.parent.x_dist + node.edge_length
            if chronumental_enabled:
                node.x_time = node.parent.x_time + node.time_length

def set_terminal_y_coords(root):
    for i, node in enumerate(root.traverse_leaves()):
        node.y = i
        node.y = i

def set_internal_y_coords(root):
    # Each node should be halfway between the min and max y of its children. 
    for node in root.traverse_postorder(leaves=False, internal=True):
        child_ys = [child.y for child in node.children]
        node.y = (min(child_ys) + max(child_ys))/2

def sort_on_y(tree):
    """
    """
    def return_y(node):
        return node.y
    
    nodes_sorted_by_y = sorted(tree.root.traverse_preorder(),
                               key=lambda x: return_y(x))
    return nodes_sorted_by_y


def get_node_object(node, node_to_index, metadata, input_to_index, columns, chronumental_enabled):
    object = {}
    object['name'] = node.label if node.label else ""
    
    # round to 5 dp
    object["x_dist"] = round(node.x_dist, 5)
    if chronumental_enabled:
        object["x_time"] = round(node.x_time, 5)
    object["y"] = node.y
    object["mutations"] = []
    if hasattr(node, 'aa_muts'):
        object['mutations'] += [
            input_to_index[my_input] for my_input in node.aa_muts]

    if hasattr(node, 'nuc_mutations'):
        object['mutations'] += [
            input_to_index[my_input] for my_input in node.nuc_mutations
        ]
    if node.is_leaf():
        object['is_tip'] = True
    else:
        object['is_tip'] = False
    
    try:
        my_dict = metadata[node.label]
        for key in my_dict:
            value = my_dict[key]

            # if value is pd.NaN then set to empty string
            if pd.isna(value):
                value = ""
            object['meta_'+key] = value
    except KeyError:
        for key in columns:
            object["meta_"+key] = ""
    
    object['parent_id'] = node_to_index[node.parent] if node.parent else node_to_index[node]
    object['node_id'] = node_to_index[node] # We don't strictly need this, but it doesn't add much to the space

    object['num_tips'] = node.num_tips

    if hasattr(node, 'clades'):
        object['clades'] = node.clades
    
    return object
    
def get_all_aa_muts(root):
    all_aa_muts = set()
    for node in list(root.traverse_preorder()):
        if hasattr(node, 'aa_muts'):
            all_aa_muts.update(node.aa_muts)
    return list(all_aa_muts)

def get_all_nuc_muts(root):
    all_nuc_muts = set()
    for node in list(root.traverse_preorder()):
        if node.nuc_mutations:
            # mut_df[mut_df['node_name']== node]
            # all_nuc_muts.add(tuple(node.nuc_mutations))
            all_nuc_muts.update(node.nuc_mutations)
    return list(all_nuc_muts)


In [16]:
tree.ladderize(ascending=False)
total_tips = tree.root.num_tips
total_tips

38

In [17]:
set_x_coords(tree.root, False)

In [18]:
set_terminal_y_coords(tree.root)
set_internal_y_coords(tree.root)

In [19]:
nodes_sorted_by_y = sort_on_y(tree)

In [20]:
all_aa_muts_objects = get_all_aa_muts(tree.root)

In [21]:
all_nuc_muts = get_all_nuc_muts(tree.root)

In [22]:
all_nuc_muts

[NucMutation(one_indexed_position='1', par_nuc='N', mut_nuc='T', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='27511', par_nuc='T', mut_nuc='C', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='1', par_nuc='A', mut_nuc='T', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='1', par_nuc='C', mut_nuc='T', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='22351', par_nuc='A', mut_nuc='C', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='1', par_nuc='T', mut_nuc='C', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='20596', par_nuc='T', mut_nuc='C', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='1', par_nuc='G', mut_nuc='T', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='1', par_nuc='A', mut_nuc='C', chromosome='chrom', type='nt'),
 NucMutation(one_indexed_position='9423', par_nuc='T', mut_nuc='C', chromosome='chrom', type='nt'),
 NucMutation(on

In [23]:
all_mut_inputs = all_aa_muts_objects + all_nuc_muts
all_mut_objects = [
    make_aa_object(i, input_thing)
        if input_thing.type == "aa" else make_nuc_object(i, input_thing)
        for i, input_thing in enumerate(all_mut_inputs)
    ]

In [30]:
for i in all_mut_objects:
    print(i)

{'gene': 'nt', 'previous_residue': 'N', 'residue_pos': '1', 'new_residue': 'T', 'mutation_id': 0, 'type': 'nt'}
{'gene': 'nt', 'previous_residue': 'T', 'residue_pos': '27511', 'new_residue': 'C', 'mutation_id': 1, 'type': 'nt'}
{'gene': 'nt', 'previous_residue': 'A', 'residue_pos': '1', 'new_residue': 'T', 'mutation_id': 2, 'type': 'nt'}
{'gene': 'nt', 'previous_residue': 'C', 'residue_pos': '1', 'new_residue': 'T', 'mutation_id': 3, 'type': 'nt'}
{'gene': 'nt', 'previous_residue': 'A', 'residue_pos': '22351', 'new_residue': 'C', 'mutation_id': 4, 'type': 'nt'}
{'gene': 'nt', 'previous_residue': 'T', 'residue_pos': '1', 'new_residue': 'C', 'mutation_id': 5, 'type': 'nt'}
{'gene': 'nt', 'previous_residue': 'T', 'residue_pos': '20596', 'new_residue': 'C', 'mutation_id': 6, 'type': 'nt'}
{'gene': 'nt', 'previous_residue': 'G', 'residue_pos': '1', 'new_residue': 'T', 'mutation_id': 7, 'type': 'nt'}
{'gene': 'nt', 'previous_residue': 'A', 'residue_pos': '1', 'new_residue': 'C', 'mutation_id

In [24]:
input_to_index = {
        input_thing: i
        for i, input_thing in enumerate(all_mut_inputs)
    }
config['num_tips'] = total_tips


In [25]:
yyyymmdd = datetime.datetime.now().strftime("%Y-%m-%d")
config['date_created'] = yyyymmdd

In [26]:
first_json = {
        "version": 'dev',
        "mutations": all_mut_objects,
        "total_nodes": len(nodes_sorted_by_y),
        "config": config
    }

In [27]:
node_to_index = {node:i for i, node in enumerate(nodes_sorted_by_y)}

In [28]:
import orjson

import gzip 

output_file = "panman_taxonium_with_subs.jsonl.gz"
if "gz" in output_file:
    output_file = gzip.open(output_file, "wb")
else:
    output_file = open(output_file, 'wb')

output_file.write(orjson.dumps(first_json)+ b"\n") 

remove_after_pipe=False

for node in nodes_sorted_by_y:
    node_object = get_node_object(
        node, 
        node_to_index,
        {}, input_to_index,
        [], 
        chronumental_enabled=False)
    
    if remove_after_pipe and 'name' in node_object and node_object['name']:
        node_object['name'] = node_object['name'].split("|")[0]
    output_file.write(orjson.dumps(node_object) + b"\n")
output_file.close()

print("Done")



Done


# Read Panman-Taxonium

In [None]:
import json 

data = []
with gzip.open("panman_taxonium.jsonl.gz", 'rt', encoding='utf-8') as file:
    for line in file:
        json_obj = json.loads(line)
        data.append(json_obj)


In [None]:
data[0]