# Calculate fitness effects of nucleotide mutations from expected versus actual counts

Get parameters and file paths from `snakemake`, or fall back to hard-coded defaults when running interactively:

In [None]:
# Resolve the pseudocount and the input / output paths. When no `snakemake`
# object is defined, hard-coded development values are used instead.
if "snakemake" not in globals() and "snakemake" not in locals():
    # interactive development session
    fitness_pseudocount = 0.5
    expected_vs_actual_counts_csv = "../results_public_2022-12-18/expected_vs_actual_mut_counts/expected_vs_actual_mut_counts.csv"
    output_ntmut_all, output_ntmut_by_clade, output_ntmut_by_subset = (
        "../_temp/ntmut_fitness_all.csv",
        "../_temp/ntmut_fitness_by_clade.csv",
        "../_temp/ntmut_fitness_by_subset.csv",
    )
else:
    # executed as part of the `snakemake` pipeline
    fitness_pseudocount = snakemake.params.fitness_pseudocount
    expected_vs_actual_counts_csv = snakemake.input.csv
    output_ntmut_all, output_ntmut_by_clade, output_ntmut_by_subset = (
        snakemake.output.ntmut_all,
        snakemake.output.ntmut_by_clade,
        snakemake.output.ntmut_by_subset,
    )

Import Python modules:

In [None]:
import numpy

import pandas as pd

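Read data, then:
 - Ignore any nucleotides masked in `UShER`
 - Ignore excluded mutations
 - Drop the filter columns and the clade-founder / codon / amino-acid annotation columns
 - Check that each combination of nucleotide mutation, clade, and subset appears only once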

In [None]:
# Load the expected-vs-actual count table.
expected_vs_actual = pd.read_csv(expected_vs_actual_counts_csv, low_memory=False)

# Keep only rows that are neither masked in UShER nor flagged for exclusion.
expected_vs_actual = expected_vs_actual.query("not masked_in_usher")
expected_vs_actual = expected_vs_actual.query("not exclude")

# Remove the two filter columns (already applied above) together with the
# clade-founder, codon and amino-acid annotation columns.
expected_vs_actual = expected_vs_actual.drop(
    columns=[
        "exclude",
        "masked_in_usher",
        "clade_founder_nt",
        "clade_founder_codon",
        "clade_founder_aa",
        "mutant_codon",
        "mutant_aa",
        "codon_position",
        "codon_site",
        "aa_mutation",
    ],
)

# Sanity check: the number of rows must equal the number of distinct
# (nt_mutation, clade, subset) groups, i.e. no combination is repeated.
n_unique_keys = len(expected_vs_actual.groupby(["nt_mutation", "clade", "subset"]))
assert len(expected_vs_actual) == n_unique_keys

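Now for each output (all clades pooled, by clade, and by subset with clades pooled), compute the fitness effect as the log of actual / expected counts (after adding the pseudocount to both) and write the result to a file: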

In [None]:
# Write one CSV per output granularity. Each tuple is
# (keep clades separate?, keep subsets separate?, output file).
for by_clade, by_subset, outfile in [
    (False, False, output_ntmut_all),
    (True, False, output_ntmut_by_clade),
    (False, True, output_ntmut_by_subset),
]:
    df = expected_vs_actual
    if not by_clade:
        # Sanity check: no clade is already labelled "all".
        assert "all" not in df["clade"].unique()

        # How each per-clade column is combined when pooling over clades.
        # `mean_log_size` is pooled as a mean weighted by `actual_count`, so
        # it is first converted to a weighted sum and divided back afterwards.
        aggregations = {
            "synonymous": "all",
            "four_fold_degenerate": "all",
            "expected_count": "sum",
            "actual_count": "sum",
            "count_terminal": "sum",
            "count_non_terminal": "sum",
            "weighted_mean_log_size": "sum",
        }
        # Every remaining column identifies a row and is used as a grouping key.
        non_key_columns = {"clade", "mean_log_size", *aggregations}
        group_columns = [c for c in df.columns if c not in non_key_columns]

        df = (
            df
            .assign(
                weighted_mean_log_size=lambda x: x["mean_log_size"] * x["actual_count"],
            )
            .drop(columns="mean_log_size")
            # dropna=False: rows with a missing value in any key column would
            # otherwise be silently dropped from the pooled output.
            .groupby(group_columns, as_index=False, dropna=False)
            .aggregate(aggregations)
            .assign(
                # weighted mean; defined as 0 when there are no actual counts
                mean_log_size=lambda x: numpy.where(
                    x["actual_count"] > 0,
                    x["weighted_mean_log_size"] / x["actual_count"],
                    0,
                ),
            )
            .drop(columns="weighted_mean_log_size")
        )

    if not by_subset:
        # keep only the rows for the "all" subset; the column is then constant
        df = df.query("subset == 'all'").drop(columns="subset")

    # Fitness effect: log of (actual + pseudocount) / (expected + pseudocount).
    # `assign` returns a new frame, so neither `expected_vs_actual` nor a
    # filtered slice of it is modified in place.
    df = df.assign(
        delta_fitness=numpy.log(
            (df["actual_count"] + fitness_pseudocount)
            / (df["expected_count"] + fitness_pseudocount)
        ),
    )

    # Sanity check: one row per remaining (nt_mutation, clade, subset) key.
    key_columns = [c for c in ["nt_mutation", "clade", "subset"] if c in df.columns]
    n_groups = len(df.groupby(key_columns))
    assert len(df) == n_groups, f"{len(df)=}, {n_groups=}"

    df.to_csv(outfile, index=False, float_format="%.5g")