In [None]:
import anytree
from collections import defaultdict
from interval_search import doubling_search
from iterpop import iterpop as ip
import itertools as it
from hstrat import hstrat
from keyname import keyname as kn
from nbmetalog import nbmetalog as nbm
import opytional as opyt
import pandas as pd
import random
from slugify import slugify
from tqdm import tqdm


In [None]:
random.seed(1) # ensure reproducibility


In [None]:
nbm.print_metadata()


# Retrieve Target Phylogeny from OSF


In [None]:
url = 'https://osf.io/8ycq7/'
target_phylogeny_df = pd.read_csv(
    f'{url}/download',
)

nbm.print_dataframe_synopsis(target_phylogeny_df)

from urllib import request
html = request.urlopen(url).read().decode('utf8')

from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
title = soup.find('title')

print(title.string)


# Create a Tree with Target Phylogeny Structure


In [None]:
# map id to anytree node
nodes = defaultdict(anytree.AnyNode)
for __, row in target_phylogeny_df.iterrows():
    if 'NONE' not in row['ancestor_list']:
        ancestor_id = ip.popsingleton(
            eval(row['ancestor_list'])
        )
        nodes[row['id']].id = row['id']
        nodes[row['id']].parent = nodes[ancestor_id]

roots = {node.root for node in nodes.values()}


In [None]:
root = ip.popsingleton(roots)
root.height


# Pick Parameters for Hereditary Stratigraphic Columns


In [None]:
def make_conditions(num_generations: int) -> pd.DataFrame:
    res = []
    for condemner_factory, target_column_bits, differentia_bit_width in it.product(
        [
            hstrat.StratumRetentionCondemnerTaperedDepthProportionalResolution,
            hstrat.StratumRetentionCondemnerRecencyProportionalResolution,
        ],
        [
            64,
            64 * 8,
            64 * 16,
        ],
        [
            1,
            8,
            64,
        ],
    ):
        policy_param = doubling_search(
            lambda x: \
                condemner_factory(x + 1).CalcNumStrataRetainedExact(num_generations)
                * differentia_bit_width > target_column_bits or x >= num_generations,
            1 if condemner_factory == hstrat.StratumRetentionCondemnerTaperedDepthProportionalResolution else 0,
        )

        actual_column_strata = condemner_factory(policy_param).CalcNumStrataRetainedExact(num_generations)
        actual_column_bits = actual_column_strata * differentia_bit_width

        res.append({
            'Retention Policy' : condemner_factory.__name__[25:],
            'Differentia Bit Width' : differentia_bit_width,
            'Retention Policy Resolution Parameter' : policy_param,
            'Target Retained Bits' : target_column_bits,
            'Actual Retained Bits' : actual_column_bits,
            'Retained Bits Error' : actual_column_bits - target_column_bits,
            'Actual Retained Strata' : actual_column_strata,
            'condemner' : condemner_factory(policy_param),
        })
    return pd.DataFrame.from_records(res)


In [None]:
conditions_df = make_conditions(root.height + 1)
conditions_df.drop('condemner', axis=1)


# Set Up Ancestor Column


In [None]:
bundle = hstrat.HereditaryStratigraphicColumnBundle({
    kn.pack({
        'differentia' : row['Differentia Bit Width'],
        'policy' : row['Retention Policy'],
        'resolution' : row['Retention Policy Resolution Parameter'],
        'target_bits' : row['Target Retained Bits'],
        'actual_bits' : row['Actual Retained Bits'],
        'bits_error' : row['Retained Bits Error'],
        'actual_strata' : row['Actual Retained Strata'],
    }) \
        : hstrat.HereditaryStratigraphicColumn(
            stratum_differentia_bit_width=row['Differentia Bit Width'],
            stratum_retention_condemner=row['condemner'],
    )
    for __, row in conditions_df.iterrows()
})


# Simulate Inheritance of Ancestor Column Down Phylogenetic Tree


In [None]:
root = ip.popsingleton(roots)
root.hstrat_column = bundle

for node in anytree.LevelOrderIter(ip.popsingleton(roots)):
    if node.parent is not None:
        node.hstrat_column = node.parent.hstrat_column.CloneDescendant()


In [None]:
for node in anytree.LevelOrderIter(ip.popsingleton(roots)):
    if node.parent is not None:
        node.hstrat_column = node.parent.hstrat_column.CloneDescendant()


# Extract Pairwise MRCA Estimates for Extant Organisms


In [None]:
res = []
for extant1, extant2 in tqdm([*it.product(root.leaves, root.leaves)]):
    if extant1 != extant2:
        bounds = extant1.hstrat_column.CalcRankOfMrcaBoundsWith(extant2.hstrat_column)
        for impl in extant1.hstrat_column:
            res.append({
                'Column Configuration' \
                    : impl,
                'Differentia Bit Width' \
                    : kn.unpack(impl)['differentia'],
                'Stratum Retention Policy' \
                    : kn.unpack(impl)['policy'],
                'Stratum Retention Policy Resolution Parameter' \
                    : kn.unpack(impl)['resolution'],
                'Stratigraphic Column Actual Retained Bits' \
                    : kn.unpack(impl)['actual_bits'],
                'Stratigraphic Column Target Retained Bits' \
                    : kn.unpack(impl)['target_bits'],
                'Stratigraphic Column Retained Bits Error' \
                    : kn.unpack(impl)['bits_error'],
                'Stratigraphic Column Actual Num Retained Strata' \
                    : kn.unpack(impl)['actual_strata'],
                'Taxon Compared From' \
                    : extant1.id,
                'Taxon Compared To' \
                    : extant2.id,
                'Generation of Taxon Compared From' \
                    : extant1.hstrat_column.GetNumStrataDeposited(),
                'Generation of Taxon Compared To' \
                    : extant2.hstrat_column.GetNumStrataDeposited(),
                'Generation Of MRCA Lower Bound (inclusive)' \
                    : opyt.apply_if(
                        bounds[impl],
                        lambda x: x[0],
                    ),
                'Generation Of MRCA Upper Bound (exclusive)' \
                    : opyt.apply_if(
                        bounds[impl],
                        lambda x: x[0],
                    ),
                'MRCA Bound Confidence' \
                    : extant1.hstrat_column[impl].CalcRankOfMrcaBoundsWithProvidedConfidenceLevel(),
                'Rank of Earliest Detectable Mrca With' \
                    : extant1.hstrat_column[impl].CalcRankOfEarliestDetectableMrcaWith(extant2.hstrat_column[impl]),
            })

res_df = pd.DataFrame.from_records(res)


In [None]:
res_df


# Save Pairwise MRCA Estimates to File


In [None]:
res_df.to_csv(
    f'a=pairwise_mrca_estimates+source={slugify(title.string)}.gz',
    compression='gzip',
)
