In [None]:
import boto3
import botocore
import functools
from IPython.core.display import display, HTML
from iterdub import iterdub as ib
from iterpop import iterpop as ip
import itertools as it
import json
import matplotlib
import matplotlib.pyplot as plt
import math
import numpy as np
import pandas as pd
from pandas.util import hash_pandas_object
import seaborn as sns
from teeplot import teeplot as tp


In [None]:
from dishpylib.pyanalysis import calc_loglikelihoods_by_num_sets
from dishpylib.pyanalysis import count_hands_with_k_or_more_sets
from dishpylib.pyanalysis import count_hands_without_k_or_more_sets
from dishpylib.pyanalysis import estimate_interpolation_complexity
from dishpylib.pyanalysis import calc_loglikelihoods_over_set_sizes
from dishpylib.pyhelpers import get_control_t_distns
from dishpylib.pyhelpers import get_env_context
from dishpylib.pyhelpers import get_git_revision_hash
from dishpylib.pyhelpers import make_timestamp
from dishpylib.pyhelpers import NumpyEncoder
from dishpylib.pyhelpers import preprocess_competition_fitnesses
from dishpylib.pyhelpers import print_runtime


In [None]:
print_runtime()


In [None]:
teeplot_subdir = "2025-09-05-interpolation_complexity"


# get data


In [None]:
s3_handle = boto3.resource(
    's3',
    region_name="us-east-2",
    config=botocore.config.Config(
        signature_version=botocore.UNSIGNED,
    ),
)
bucket_handle = s3_handle.Bucket('prq49')

dfs = []
for stint in range(0, 101, 20):
    series_profiles, = bucket_handle.objects.filter(
        Prefix=f'endeavor=16/noncritical-phenotypeneutral-nopinterpolation-competitions/stage=6+what=collated/stint={stint}',
    )
    control_fits_df = get_control_t_distns('prq49', 16, stint)
    df = pd.read_csv(
        f's3://prq49/{series_profiles.key}',
        compression='xz',
    )
    df["Stint"] = stint
    dfdigest = '{:x}'.format( hash_pandas_object( df ).sum() )
    print(dfdigest)
    df = preprocess_competition_fitnesses(df, control_fits_df)
    dfs.append(df)


In [None]:
df = pd.concat(dfs, ignore_index=True)


# summarize data and model fitting


In [None]:
def log_lineplot(*args, **kwargs):
    sns.lineplot(*args, **kwargs)
    plt.yscale('log')
    plt.autoscale()

def lineplot_scatterplot(*args, **kwargs):
    sns.lineplot(
        *args,
        **{k : v for k, v in kwargs.items() if k != 'hue'},
        color='gray',
        zorder=1,
    )
    data = kwargs.pop("data")
    sns.scatterplot(
        *args,
        data=data[
            data[kwargs["hue"]] == "Significantly Deleterious"
        ],
        **kwargs,
        palette={
            'Significantly Advantageous' : sns.color_palette()[2],
            'Neutral' : sns.color_palette()[0],
            'Significantly Deleterious' : sns.color_palette()[1],
        },
        zorder=2,
        alpha=0.7,
    )
    sns.scatterplot(
        *args,
        data=data[
            data[kwargs["hue"]] != "Significantly Deleterious"
        ],
        **kwargs,
        palette={
            'Significantly Advantageous' : sns.color_palette()[2],
            'Neutral' : sns.color_palette()[0],
            'Significantly Deleterious' : sns.color_palette()[1],
        },
        zorder=2,
        alpha=0.7,
    )
    print(
       data["Relative Fitness"].value_counts().to_dict()
    )

    plt.legend(
        handles=[
            matplotlib.patches.Patch(
                color=sns.color_palette()[2],
                label='Significantly Advantageous',
            ),
            matplotlib.patches.Patch(
                color=sns.color_palette()[0],
                label='Neutral',
            ),
            matplotlib.patches.Patch(
                color=sns.color_palette()[1],
                label='Significantly Deleterious',
            ),
        ],
    )
    plt.gca().figure.set_size_inches(3.5, 2.5)
    sns.move_legend(
        plt.gca(), "lower center",
        bbox_to_anchor=(.6, 1), ncol=1, title=None, frameon=False,
    )
    annotation = str(
        data["Relative Fitness"].value_counts().to_dict()
    ).replace(",", "\n")
    plt.gca().annotate(
        annotation,
        xy=(1.1, 0.5), xycoords="axes fraction",
        ha="center", va="center", fontsize=8,
        rotation=90
    )


In [None]:
# display(HTML("<style>div.output_scroll { height: 1000em; }</style>"))

for stint in df['Stint'].unique():

    dfx = df[df['Stint'] == stint].copy()
    dfx["Is Deleterious"] = (dfx["Relative Fitness"] == "Significantly Deleterious").astype(float)

    display(HTML(f'<h1>stint {stint}</h1>'))


    display(HTML('<h2>interpolation competition</h2>'))
    tp.tee(
        lineplot_scatterplot,
        x='genome nop_interpolation_num_nopped',
        y='Fitness Differential',
        hue='Relative Fitness',
        data=dfx.dropna(subset=['genome nop_interpolation_num_nopped']),
        teeplot_subdir=teeplot_subdir,
        teeplot_outattrs={
            'stint' : str(stint),
            'bucket' : ib.dub( dfx['Treatment bucket'] ),
            'endeavor' : ib.dub( dfx['Competition Endeavor'] ),
        },
    )
    plt.show()

    with tp.teed(
        sns.regplot,
        data=dfx.dropna(subset=['genome nop_interpolation_num_nopped']),
        x='genome nop_interpolation_num_nopped',
        y='Fitness Differential',
        line_kws={"color": "black"},
        teeplot_subdir=teeplot_subdir,
    ):
        plt.gca().figure.set_size_inches(4, 3)

    plt.show()

    with tp.teed(
        sns.regplot,
        data=dfx.dropna(subset=['genome nop_interpolation_num_nopped']),
        x='genome nop_interpolation_num_nopped',
        y='Is Deleterious',
        logistic=True,
        line_kws={"color": "black"},
        teeplot_outattrs={
            'stint': str(stint),
            'bucket': ib.dub(dfx['Treatment bucket']),
            'endeavor': ib.dub(dfx['Competition Endeavor']),
        },
        teeplot_subdir=teeplot_subdir,
    ):
        plt.gca().figure.set_size_inches(4, 3)

    plt.show()

    display(HTML('<h2>model fit results</h2>'))
    dfxp = dfx.dropna(subset=['genome nop_interpolation_num_nopped', 'Is Deleterious'])


    # print genome size and max non-deleterious genome nop
    print(
        dfxp.loc[
            ~dfxp['Is Deleterious'].astype(bool),
            'genome nop_interpolation_num_nopped',
        ].max()
    )

    display(HTML('<h2>num_sets fitting</h2>'))
    def tee(plotter):
        tp.tee(
            plotter,
            x='num_sets',
            y='likelihood',
            hue='set_size',
            data=calc_loglikelihoods_over_set_sizes(
                series=16005,
                interpolation_competitions_df=dfx,
            ).astype({
                # so seaborn will color as categorical, not quantitative
                'set_size': 'str',
            }),
            teeplot_subdir=teeplot_subdir,
            teeplot_outattrs={
                'stint': str(stint),
                'bucket': ib.dub(dfx['Treatment bucket']),
                'endeavor': ib.dub(dfx['Competition Endeavor']),
            },
        )

    try:
        tee(log_lineplot)
    except ValueError:
        plt.clf()
        tee(sns.lineplot)
    plt.show()

    display(HTML('<h2>model fit results</h2>'))
    # print(json.dumps(
    #     estimate_interpolation_complexity(
    #         series=16005,
    #         interpolation_competitions_df=dfx,
    #     ),
    #     sort_keys=True,
    #     indent=4,
    #     cls=NumpyEncoder,
    # ))


In [None]:
# display(HTML("<style>div.output_scroll { height: 1000em; }</style>"))
def plot_logistic_fit():
    xlim = None
    cmap = sns.color_palette("crest", as_cmap=True)
    hues = [
        cmap(i) for i in np.linspace(0, 1, len(df['Stint'].unique()))
    ]
    for stint, hue in zip(df['Stint'].unique(), hues):

        dfx = df[df['Stint'] == stint].copy()
        dfx["Is Deleterious"] = (dfx["Relative Fitness"] == "Significantly Deleterious").astype(float)

        max_neutral = dfx.loc[
            ~dfx['Is Deleterious'].astype(bool),
            'genome nop_interpolation_num_nopped',
        ].max()
        print(max_neutral, dfx['genome nop_interpolation_num_nopped'].max())
        print(
            dfx.loc[
                dfx["genome nop_interpolation_num_nopped"] > max_neutral,
                "genome nop_interpolation_num_nopped",
            ].tolist()
        )


        display(HTML(f'<h1>stint {stint}</h1>'))


        display(HTML('<h2>interpolation competition</h2>'))
        data = dfx.dropna(subset=['genome nop_interpolation_num_nopped'])
        data = data[data['genome nop_interpolation_num_nopped'] <= 100]
        ax = sns.regplot(
            data=data,
            x='genome nop_interpolation_num_nopped',
            y='Is Deleterious',
            logistic=True,
            scatter=False,
            color=hue,
        )
        if xlim is None:
            xlim = ax.get_xlim()

    ax.set_xlim(0, 100)
    ax.figure.set_size_inches(1.5, 2.3)
    ax.set_ylim(0, 1)
    ax.legend(
        handles=[
            plt.Line2D([0], [0], color=hue, lw=4)
            for hue in hues
        ],
        labels=[f"s. {stint}" for stint in df['Stint'].unique()],
        # bbox_to_anchor=(1.05, 1),
        loc='lower right',
        fontsize='small',
        ncol=2,
        handlelength=0.3,
        columnspacing=0.7,
        frameon=False,
    )
    ax.set_ylabel("$p$ Deleterious")
    ax.set_xlabel("Num Sites")
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)


tp.tee(
    plot_logistic_fit,
    teeplot_subdir=teeplot_subdir,
    teeplot_outattrs={
        'bucket' : ib.dub( df['Treatment bucket'] ),
        'endeavor' : ib.dub( df['Competition Endeavor'] ),
    },
)


In [None]:
from typing import Iterable
from math import prod
from scipy.stats import hypergeom

def min_skeleton_size(
    genome_size: int,
    knockout_sizes: Iterable[int],
    confidence: float = 0.95,
) -> int:
    """
    Return the minimum skeleton_size s such that
        Π_i (1 - P_i_success(s)) >= 1 - confidence,
    where P_i_success(s) is the probability that knockout i (size k_i) samples
    ALL of the s skeleton sites from a genome of size n.

    Success probability for one knockout of size k:
        P_success = hypergeom.pmf(s; M=n, n=s, N=k)
                   = C(n - s, k - s) / C(n, k) if k >= s else 0
    Failure probability = 1 - P_success.
    """
    n = int(genome_size)
    ks = [n - int(k) for k in knockout_sizes]

    if n < 0 or any(k < 0 or k > n for k in ks):
        raise ValueError("All knockout_sizes must be integers in [0, genome_size].")
    if not (0.0 < confidence < 1.0):
        raise ValueError("confidence must be in (0, 1).")

    alpha = 1.0 - confidence

    # If any knockout sampled the entire genome, success would be certain for any s <= n.
    # Observing all failures would then have probability 0 under this model.
    if any(k == n for k in ks) and n > 0:
        raise ValueError(
            "At least one knockout_size equals genome_size; observing all failures "
            "has probability 0 for any skeleton size."
        )

    def all_fail_prob(s: int) -> float:
        """Plain-space product Π_i (1 - hypergeom.pmf(s; M=n, n=s, N=k_i))."""
        terms = []
        for k in ks:
            if k < s:
                # Can't include all s skeleton sites when drawing fewer than s sites -> success prob 0
                terms.append(1.0)
            else:
                p_success = hypergeom.pmf(s, M=n, n=s, N=k)
                terms.append(1.0 - float(p_success))
        return prod(terms) if terms else 1.0

    # Monotone in s: as s increases, success gets harder, so all-fail probability increases.
    lo, hi = 0, n

    # Early-out
    if all_fail_prob(0) >= alpha:
        return 0

    while lo < hi:
        mid = (lo + hi) // 2
        if all_fail_prob(mid) >= alpha:
            hi = mid
        else:
            lo = mid + 1
    return lo

# ---- Example ----
# n = 10_000
# ks = [300, 450, 200, 500]
# print(min_skeleton_size(n, ks, confidence=0.95))


In [None]:
# display(HTML("<style>div.output_scroll { height: 1000em; }</style>"))
xlim = None
cmap = sns.color_palette("crest", as_cmap=True)
hues = [
    cmap(i) for i in np.linspace(0, 1, len(df['Stint'].unique()))
]
stints = []
genome_sizes = []
skeleton_lbs = []
skeleton_ubs = []
for stint, hue in zip(df['Stint'].unique(), hues):
    stints.append(stint)
    dfx = df[df['Stint'] == stint].copy()
    dfx["Is Deleterious"] = (dfx["Relative Fitness"] == "Significantly Deleterious").astype(float)

    max_neutral = dfx.loc[
        ~dfx['Is Deleterious'].astype(bool),
        'genome nop_interpolation_num_nopped',
    ].max()
    genome_sizes.append(
        dfx['genome nop_interpolation_num_nopped'].max(),
    )
    skeleton_ubs.append(
        genome_sizes[-1] - max_neutral,
    )
    past_neutral = dfx.loc[
        dfx["genome nop_interpolation_num_nopped"] > max_neutral,
        "genome nop_interpolation_num_nopped",
    ].tolist()
    skeleton_lbs.append(
        min_skeleton_size(
            dfx['genome nop_interpolation_num_nopped'].max(),
            past_neutral,
            confidence=0.5
        ),
    )

print(stints)
print(genome_sizes)
print(skeleton_lbs)
print(skeleton_ubs)
res_df = pd.DataFrame({
    "Stint": stints,
    "Genome Size": genome_sizes,
    "Skeleton LB": skeleton_lbs,
    "Skeleton UB": skeleton_ubs
})
res_df


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def plot_skeleton():
    fig, axes = plt.subplots(1, 2, figsize=(3.2, 2.5), sharex=True, sharey=False)

    # Common y-ticks
    yticks = [0, 20, 40, 60, 80, 100]

    # --- First plot: full height ---
    sns.lineplot(
        data=res_df,
        x="Stint",
        y="Skeleton LB",
        color="blue",
        marker="o",
        ax=axes[0]
    )
    sns.lineplot(
        data=res_df,
        x="Stint",
        y="Skeleton UB",
        color="blue",
        marker="o",
        ax=axes[0]
    )
    sns.lineplot(
        data=res_df,
        x="Stint",
        y="Genome Size",
        color="green",
        marker="o",
        ax=axes[0]
    )
    axes[0].fill_between(
        res_df["Stint"],
        res_df["Skeleton LB"],
        res_df["Skeleton UB"],
        color="blue",
        alpha=0.2
    )
    axes[0].set_xticks(yticks)
    axes[0].set_xticklabels(yticks, rotation=60)
    axes[0].set_ylabel("Num Sites")
    axes[0].spines["top"].set_visible(False)
    axes[0].spines["right"].set_visible(False)
    # --- Second plot: restricted y-range 0–20 ---
    sns.lineplot(
        data=res_df,
        x="Stint",
        y="Skeleton LB",
        color="blue",
        marker="o",
        ax=axes[1]
    )
    sns.lineplot(
        data=res_df,
        x="Stint",
        y="Skeleton UB",
        color="blue",
        marker="o",
        ax=axes[1]
    )
    sns.lineplot(
        data=res_df,
        x="Stint",
        y="Genome Size",
        color="green",
        marker="o",
        ax=axes[1]
    )
    axes[1].fill_between(
        res_df["Stint"],
        res_df["Skeleton LB"],
        res_df["Skeleton UB"],
        color="blue",
        alpha=0.2
    )
    axes[1].set_yscale("log")
    axes[1].set_xticks(yticks)
    axes[1].set_xticklabels(yticks, rotation=60)
    axes[1].set_ylabel("")
    axes[1].spines["top"].set_visible(False)
    axes[1].spines["right"].set_visible(False)
    plt.tight_layout()

tp.tee(
    plot_skeleton,
    teeplot_subdir=teeplot_subdir,
    teeplot_outattrs={
        'bucket' : ib.dub( df['Treatment bucket'] ),
        'endeavor' : ib.dub( df['Competition Endeavor'] ),
    },
)
