# Tabulate phenotypes of SARS-CoV-2 spike that might have predictive power in understanding evolution for different mutations and clades

Import Python modules:

In [1]:
import gzip
import json
import os
import urllib.request

import altair as alt

import numpy

import pandas as pd

import ruamel.yaml as yaml

_ = alt.data_transformers.disable_max_rows()

Read the configuration YAML:

In [2]:
# This cell is tagged as parameters. So if you run the notebook using papermill
# (https://papermill.readthedocs.io/en/latest/usage-cli.html) with 
# `-p config_yaml <config_yaml_file>` then the file defined here will be replaced
# with the one you pass
config_yaml = "config.yaml"

In [3]:
with open(config_yaml) as f:
    config = yaml.YAML().load(f)

## Mutation effects on phenotypes
Read all the specified mutation-level phenotypic effects into a Data Frame.
Write that data frame to a file.
Also created data frame and file in which the mutation-level phenotypic data are randomized among mutations.

In [4]:
mutation_phenotypes = []
req_cols = ["site", "wildtype", "mutant"]
ref_clades = []
for ref_clade, ref_clade_d in config["mutation_phenotype_csvs"].items():
    ref_clades.append(ref_clade)
    for phenotype_category, category_d in ref_clade_d.items():
        pheno_cols = category_d["columns"]
        print(
            f"Reading {phenotype_category} phenotypes from {category_d['csv']}\n  "
            + "\n  ".join(pheno_cols)
        )
        if set(req_cols).intersection(pheno_cols):
            raise ValueError(f"{phenotype_category} columns {pheno_cols} include {req_cols}")
        df = pd.read_csv(category_d["csv"])
        if not set(req_cols + pheno_cols).issubset(df.columns):
            raise ValueError(f"cannot find expected columns for {phenotype_category}")
        mutation_phenotypes.append(
            df
            .melt(
                id_vars=req_cols,
                value_vars=pheno_cols,
                var_name="phenotype",
                value_name="mutation_effect",
            )
            .assign(
                ref_clade=ref_clade,
                phenotype=lambda x: (
                    phenotype_category
                    if len(pheno_cols) == 1 and pheno_cols[0] == phenotype_category
                    else phenotype_category + " " + x["phenotype"]
                ),
            )
            [["ref_clade", "phenotype", *req_cols, "mutation_effect"]]
            .query("mutation_effect.notnull()")
            .query("wildtype != mutant")  # drop any wildtype to wildtype mutations
        )

mutation_phenotypes = pd.concat(mutation_phenotypes, ignore_index=True)

mutation_phenotypes_csv = config["mutation_phenotypes_csv"]
os.makedirs(os.path.dirname(mutation_phenotypes_csv), exist_ok=True)
print(f"\nWriting mutation phenotypes to {mutation_phenotypes_csv}")
mutation_phenotypes.to_csv(mutation_phenotypes_csv, index=False, float_format="%.4g")

n_randomizations = config["n_randomizations"]
print(f"\nNow randomizing each mutation phenotype {n_randomizations} times")
mutation_phenotypes_randomized = []
for _, df in mutation_phenotypes.groupby(["ref_clade", "phenotype"]):
    for random_seed in range(n_randomizations):
        numpy.random.seed(random_seed)
        mutation_phenotypes_randomized.append(
            df
            .assign(
                mutation_effect=lambda x: numpy.random.permutation(x["mutation_effect"]),
                random_seed=random_seed
            )
            [["random_seed"] + df.columns.tolist()]
        )
mutation_phenotypes_randomized = pd.concat(mutation_phenotypes_randomized, ignore_index=True)
mutation_phenotypes_randomized_csv = config["mutation_phenotypes_randomized_csv"]
os.makedirs(os.path.dirname(mutation_phenotypes_randomized_csv), exist_ok=True)
print(f"Writing randomized mutation phenotypes to {mutation_phenotypes_randomized_csv}")
mutation_phenotypes_randomized.to_csv(mutation_phenotypes_randomized_csv, index=False, float_format="%.4g")

Reading spike pseudovirus DMS phenotypes from data/spike_pseudovirus_DMS_XBB.1.5.csv
  human sera escape
  spike mediated entry
  ACE2 binding
Reading RBD yeast-display DMS phenotypes from data/yeast_RBD_DMS_XBB.1.5.csv
  ACE2 affinity
  RBD expression
  escape
Reading EVEscape phenotypes from data/EVEscape_XBB_single_mutation_predictions.csv
  EVEscape

Writing mutation phenotypes to results/mutation_phenotypes.csv

Now randomizing each mutation phenotype 10 times
Writing randomized mutation phenotypes to results/mutation_phenotypes_randomized.csv


## Read the Pango clades
Read Pango clade definitions come from Cornelius Roemer's GitHub repo ([https://github.com/corneliusroemer/pango-sequences](https://github.com/corneliusroemer/pango-sequences)):

In [5]:
pango_json = config["pango_json"]
print(f"Reading Pango clade definitions from {pango_json}")
with urllib.request.urlopen(pango_json) as url:
    pango_clades = json.load(url)
print(f"Read definitions for {len(pango_clades)} clades")

Reading Pango clade definitions from https://raw.githubusercontent.com/corneliusroemer/pango-sequences/main/data/pango-consensus-sequences_summary.json
Read definitions for 3810 clades


Read Pango clade growth rates from Bedford lab repo ([https://github.com/nextstrain/forecasts-ncov/](https://github.com/nextstrain/forecasts-ncov/)):

In [6]:
pango_growth_json = config["pango_growth_json"]
print(f"Reading Pango clade growth rates from {pango_growth_json}")
with urllib.request.urlopen(pango_growth_json) as url:
    pango_growth = json.loads(gzip.decompress(url.read()))

# convert to data frame
pango_growth = (
    pd.DataFrame(pango_growth["data"])
    .query("location == 'hierarchical'")
    .query("site == 'ga'")
    .pivot_table(index="variant", values="value", columns="ps")
    .reset_index(names="clade")
    .assign(
        clade_growth_HDI_95=lambda x: x.apply(
            lambda row: f"{row['HDI_95_lower']:.3g} to {row['HDI_95_upper']:.3g}",
            axis=1,
        ),
    )
    .rename(columns={"median": "clade growth", "clade_growth_HDI_95": "clade growth HDI 95"})
    [["clade", "clade growth", "clade growth HDI 95"]]
    .query("clade != 'other'")
)
assert len(pango_growth) == pango_growth["clade"].nunique()
assert set(pango_growth["clade"]).issubset(pango_clades)

print(f"Read growth data for {len(pango_growth)} clades")

Reading Pango clade growth rates from https://data.nextstrain.org/files/workflows/forecasts-ncov/gisaid/pango_lineages/global/mlr/latest_results.json
Read growth data for 90 clades


Get a data frame of all Pango clades along with other relevant information:

In [7]:
def is_descendant_of(clade, ancestor):
    """Returns True iff `clade` is a descendant of `ancestor`."""
    if pango_clades[clade]["parent"] == ancestor:
        return True
    elif pango_clades[clade]["parent"]:
        return is_descendant_of(pango_clades[clade]["parent"], ancestor)
    else:
        return False

def relative_mutations(clade_muts, reference_muts):
    """Get mutation in `clade_muts` relative `reference_muts`."""
    shared_muts = set(clade_muts).intersection(reference_muts)
    clade_sites = {
        r: (wt, m) for (wt, r, m) in [tup for tup in clade_muts if tup not in shared_muts]
    }
    reference_sites = {
        r: (wt, m) for (wt, r, m) in [tup for tup in reference_muts if tup not in shared_muts]
    }
    muts = []
    for r, (wt, m) in clade_sites.items():
        if r in reference_sites:
            assert wt == reference_sites[r][0]
            muts.append((r, reference_sites[r][1], m))
        else:
            muts.append((r, wt, m))
    for r, (wt, m) in reference_sites.items():
        if r in clade_sites:
            assert wt == clade_sites[r][0]
            pass  # already counted
        else:
            muts.append((r, m, wt))
    return [(wt, r, m) for (r, wt, m) in sorted(muts)]

def parse_spike_muts(clade_d):
    """Parse spike mutations from dict for a clade."""
    return [
        (mut.split(":")[1][0], int(mut.split(":")[1][1: -1]), mut.split(":")[1][-1])
        for mut in clade_d["aaSubstitutions"] + clade_d["aaDeletions"]
        if mut and mut.startswith("S:")
    ]

clades_df = {}

ref_clade_spike_muts = {
    ref_clade: parse_spike_muts(pango_clades[ref_clade])
    for ref_clade in ref_clades
}

for clade, clade_d in pango_clades.items():
    spike_muts = parse_spike_muts(clade_d)
    clades_df[clade] = {
        "date": clade_d["designationDate"] if clade_d["designationDate"] else pd.NA,
        "parent": clade_d["parent"] if clade_d["parent"] else pd.NA,
        **{
            f"spike muts from {ref_clade}":
                relative_mutations(spike_muts, ref_clade_spike_muts[ref_clade])
            for ref_clade in ref_clades
        },
        **{
            f"descendant of {ancestor}": is_descendant_of(clade, ancestor)
            for ancestor in config["classify_descendants_of"]
        },
        
    }
    
clades_df = pd.DataFrame.from_dict(clades_df, orient="index").reset_index(names="clade")

# add growth information
clades_df = clades_df.merge(pango_growth, on="clade", validate="one_to_one", how="left")

## Assign phenotypes to clades
Phenotypes are just sum of mutation effects of all mutations with respect to the `spike_muts_relative_to` clade.
Mutations without measured phenotype effects are ignored (given values of zero).

In [8]:
class PhenotypeAssigner:
    """Assign phenotypes to sets of mutations.

    Parameters
    ----------
    mutation_phenotypes_df : pandas.DataFrame
        Should have columns `site`, `wildtype`, `mutant`, `mutation_effect`.
        
    """
    def __init__(self, mutation_phenotypes_df):
        assert len(mutation_phenotypes_df) == len(
            mutation_phenotypes_df[["site", "mutant"]].drop_duplicates()
        )
        self.sites = sorted(set(mutation_phenotypes_df["site"]))
        assert len(self.sites) == len(
            mutation_phenotypes_df[["site", "wildtype"]].drop_duplicates()
        )
        self.wts = mutation_phenotypes_df.set_index("site")["wildtype"].to_dict()
        self.effects = {
            site: site_df.set_index("mutant")["mutation_effect"].to_dict()
            for site, site_df in mutation_phenotypes_df.groupby("site")
        }
        for site, wt in self.wts.items():
            assert wt not in self.effects[site]
            self.effects[site][wt] = 0.0

    def phenotype(self, muts):
        """Returns phenotype for list of `muts` as `(wildtype, site, mutant)`."""
        pheno = 0.0
        for wt, site, m in muts:
            if (site in self.effects) and (wt in self.effects[site]) and (m in self.effects[site]):
                pheno += self.effects[site][m] - self.effects[site][wt]
        return pheno

# assign clade phenotypes, Hamming distance from reference, and mutation lists as strings
clade_phenos = clades_df.copy()
new_clade_pheno_cols = []
for (ref_clade, phenotype), df in mutation_phenotypes.groupby(
    ["ref_clade", "phenotype"],
    sort=False,
):
    phenos = PhenotypeAssigner(df)
    new_clade_pheno_cols.append(
        clade_phenos[f"spike muts from {ref_clade}"].map(phenos.phenotype).rename(
            f"{phenotype} relative to {ref_clade}"
        )
    )
for ref_clade in ref_clades:
    col = f"spike muts from {ref_clade}"
    new_clade_pheno_cols.append(
        clade_phenos[col].map(len).rename(f"Hamming distance relative to {ref_clade}")
    )
    clade_phenos[col] = clade_phenos[col].map(
        lambda mutlist: " ".join(f"{wt}{r}{m}" for (wt, r, m) in mutlist)
    )
clade_phenos = pd.concat([clade_phenos] + new_clade_pheno_cols, axis=1)
clade_phenotypes_csv = config["clade_phenotypes_csv"]
os.makedirs(os.path.dirname(clade_phenotypes_csv), exist_ok=True)
print(f"Writing clade phenotypes to {clade_phenotypes_csv}")
clade_phenos.to_csv(clade_phenotypes_csv, index=False, float_format="%.4g")

# assign clade phenotypes from randomized data
print(f"\nComputing clade phenotypes from {n_randomizations} randomizations of mutation phenotypes")
clade_phenos_randomized = clades_df.copy()
new_clade_pheno_randomized_cols = []
for (random_seed, ref_clade, phenotype), df in mutation_phenotypes_randomized.groupby(
    ["random_seed", "ref_clade", "phenotype"]
):
    phenos = PhenotypeAssigner(df)
    new_clade_pheno_randomized_cols.append(
        clade_phenos_randomized[f"spike muts from {ref_clade}"].map(phenos.phenotype).rename(
            f"random_{random_seed} {phenotype} relative to {ref_clade}"
        )
    )
for ref_clade in ref_clades:
    col = f"spike muts from {ref_clade}"
    for random_seed in mutation_phenotypes_randomized["random_seed"].unique():
        new_clade_pheno_randomized_cols.append(
            clade_phenos_randomized[col].map(len).rename(
                f"random_{random_seed} Hamming distance relative to {ref_clade}"
            )
        )
    clade_phenos_randomized[col] = clade_phenos_randomized[col].map(
        lambda mutlist: " ".join(f"{wt}{r}{m}" for (wt, r, m) in mutlist)
    )
clade_phenos_randomized = pd.concat([clade_phenos_randomized] + new_clade_pheno_randomized_cols, axis=1)
clade_phenotypes_randomized_csv = config["clade_phenotypes_randomized_csv"]
os.makedirs(os.path.dirname(clade_phenotypes_randomized_csv), exist_ok=True)
print(f"Writing randomized clade phenotypes to {clade_phenotypes_randomized_csv}")
clade_phenos_randomized.to_csv(clade_phenotypes_randomized_csv, index=False, float_format="%.4g")

Writing clade phenotypes to results/clade_phenotypes.csv

Computing clade phenotypes from 10 randomizations of mutation phenotypes
Writing randomized clade phenotypes to results/clade_phenotypes_randomized.csv


## Make some summary plots
First get data frame appropriate for plotting with Altair:

In [15]:
# for Altair plotting, remove "." in column names as these cause problems,
# and make date column into dates
clade_phenos_alt_df = (
    clade_phenos
    .rename(columns={c: c.replace(".", "_") for c in clade_phenos.columns})
    .assign(date=lambda x: pd.to_datetime(x["date"]))
)

pheno_cols = clade_phenos_alt_df.columns.tolist()[
    clade_phenos_alt_df.columns.tolist().index("clade growth HDI 95") + 1:
]
assert all(col.split()[-3: -1] == ["relative", "to"] for col in pheno_cols)

clade_phenos_alt_df.head()

Unnamed: 0,clade,date,parent,spike muts from XBB_1_5,descendant of XBB,descendant of BA_2,descendant of BA_2_86,clade growth,clade growth HDI 95,spike pseudovirus DMS human sera escape relative to XBB_1_5,spike pseudovirus DMS spike mediated entry relative to XBB_1_5,spike pseudovirus DMS ACE2 binding relative to XBB_1_5,RBD yeast-display DMS ACE2 affinity relative to XBB_1_5,RBD yeast-display DMS RBD expression relative to XBB_1_5,RBD yeast-display DMS escape relative to XBB_1_5,EVEscape relative to XBB_1_5,Hamming distance relative to XBB_1_5
0,A,NaT,,I19T -24L -25P -26P S27A A83V D142G -144Y Q146...,False,False,False,,,-5.284474,-8.119085,3.349975,-4.71,-0.23,1.187948,63.7833,42
1,A.1,NaT,A,I19T -24L -25P -26P S27A A83V D142G -144Y Q146...,False,False,False,,,-5.284474,-8.119085,3.349975,-4.71,-0.23,1.187948,63.7833,42
2,A.2,NaT,A,I19T -24L -25P -26P S27A A83V D142G -144Y Q146...,False,False,False,,,-5.284474,-8.119085,3.349975,-4.71,-0.23,1.187948,63.7833,42
3,A.2.2,NaT,A.2,I19T -24L -25P -26P S27A A83V D142G -144Y Q146...,False,False,False,,,-5.284474,-8.119085,3.349975,-4.71,-0.23,1.187948,63.7833,42
4,A.2.3,NaT,A.2,I19T -24L -25P -26P S27A A83V D142G -144Y Q146...,False,False,False,,,-5.284474,-8.119085,3.349975,-4.71,-0.23,1.187948,63.7833,42


In [26]:
xaxis_param = alt.param(
    value=pheno_cols[0],
    bind=alt.binding_select(
        options=pheno_cols,
        name="Xxaxis variable",
    ),
)

base_chart = (
    alt.Chart(clade_phenos_alt_df)
    .add_params(xaxis_param)
    .transform_calculate(x=f"datum[{xaxis_param.name}]")
    .encode(
        alt.X(
            "x:Q",
            title="x-axis variable (select with dropdown below)",
            scale=alt.Scale(nice=False, zero=False, padding=4),
        ),
        #alt.X("spike pseudovirus DMS human sera escape relative to XBB_1_5"),
        alt.Y("clade growth"),
        tooltip=["clade"],
    )
    .mark_circle(size=50)
)

base_chart