# Simulating MSK-IMPACT 410 Gene Panel Mutational Spectra

**Goal**: simulate an MSK-IMPACT 410 gene panel mutational spectra matrix from ICGC whole genome mutational annotation file (MAF) datasets.

This notebook assumes that the notebooks— `Consuming ICGC Data.ipynb` and `Processing ICGC Data.ipynb` have already been run, and in that order. Among other things, `Processing ICGC Data.ipynb` generates a set of MAF files from the simple somatic mutation (SSM) data, generated from whole genome sequencing (WGS) analysis, which we downloaded from the ICGC Data Portal. In this notebook, we will—
* Downsample the WGS MAF files to contain only those mutations that are covered by [the MSK-IMPACT 410 targeted gene panel](https://pubmed.ncbi.nlm.nih.gov/25801821/).
* Convert the downsampled MSK-IMPACT 410 gene panel data into a mutational spectra matrix for mutational signature analysis.


In [39]:
from collections import OrderedDict
from pathlib import Path
from typing import List

from natsort import natsorted
import numpy as np
import pandas as pd
import pyfaidx

In [2]:
dir_data = Path.cwd().parent / "data"
dir_maf_dirs = dir_data / "WGS_MAFs"

fpath_grch37 = dir_data / "hg19.fa"
fpath_msk_impact_bed = dir_data / "MSK-IMPACT410.bed"

In [11]:
dir_panel_maf_dirs = dir_data / "MSK_IMPACT_410_MAFs"
if not dir_panel_maf_dirs.exists():
        dir_panel_maf_dirs.mkdir()

In [3]:
msk_impact_df = pd.read_csv(fpath_msk_impact_bed, sep="\t")

In [4]:
msk_impact_df.head()

Unnamed: 0,Chromosome,Start_Position,End_Position,Hugo_Symbol,ID,SEQ_ASSAY_ID,Feature_Type,includeInPanel
0,9,133710831,133710915,ABL1,ABL1,MSK-IMPACT410,exon,True
1,9,133729448,133729627,ABL1,ABL1,MSK-IMPACT410,exon,True
2,9,133730185,133730486,ABL1,ABL1,MSK-IMPACT410,exon,True
3,9,133738147,133738425,ABL1,ABL1,MSK-IMPACT410,exon,True
4,9,133747513,133747603,ABL1,ABL1,MSK-IMPACT410,exon,True


In [43]:
def read_sbs_maf_file(filepath: Path) -> pd.DataFrame:
    """
    Reads only single base substitutions from an MAF file generated from an
    ICGC SSM dataset.

    :param filepath: file path to the MAF file.
    :return: Pandas dataframe with only single base substitutions.
    """
    data = pd.read_csv(filepath, sep="\t")
    data = data.loc[data["mutation_type"] == "single base substitution"].reset_index(drop=True)
    return data


def is_mutation_in_gene_panel(panel_df,
                              chromosome: str,
                              position_start: int,
                              position_end: int) -> bool:
        """
        Checks if the defined mutation is measured by the gene panel.

        :param panel_df: gene panel BED file.
        :param chromosome: chromosome name of the mutation.
        :param position_start: starting position of the mutation.
        :param position_end: end position of the mutation.
        :return: True if the mutation region is measured in the gene panel.
            False otherwise.
        """
        panel_matches = panel_df.loc[((panel_df["Chromosome"] == chromosome) &
                                      (panel_df["Start_Position"] <= position_start) &
                                      (panel_df["End_Position"] >= position_end))]
        return len(panel_matches) > 0


def return_panel_rows(panel_df: pd.DataFrame,
                      wg_maf: pd.DataFrame) -> pd.DataFrame:
    """
    Takes an MAF dataframe from WGS analysis and downsamples it to only return the
    mutations covered by the gene panel (as described by its BED file).

    :param panel_df: gene panel's BED file.
    :param wg_maf: MAF from WGS analysis.
    :return:
    """
    panel_rows = wg_maf.apply(lambda row: is_mutation_in_gene_panel(panel_df,
                                                                    row["chromosome"],
                                                                    row["chromosome_start"],
                                                                    row["chromosome_end"]),
                              axis=1)
    return wg_maf.loc[panel_rows]


def convert_wg_mafs_to_panel_mafs(dir_wg_mafs: Path,
                                  panel_df: pd.DataFrame,
                                  dir_panel_mafs: Path) -> None:
        """
        Converts each MAF file, generated from WGS analysis, in a directory and
        downsamples it to only MAF files containing only the mutations covered
        by the gene panel (as described by its BED file).

        :param dir_wg_mafs: a directory containing a set of MAF files from WGS
            analysis.
        :param panel_df: BED file of the gene panel.
        :param dir_panel_mafs: output directory.
        """
        maf_filepaths = list(dir_wg_mafs.glob("*"))

        for maf_filepath in maf_filepaths:
            filepath_panel = dir_panel_mafs / f"{maf_filepath.stem}"
            data_maf = read_sbs_maf_file(maf_filepath)
            data_maf = return_panel_rows(panel_df, data_maf)
            if len(data_maf) > 0:
                data_maf.to_csv(filepath_panel, sep="\t", index=False)


def panel_maf_dirs_from_wgs_maf_dirs(dir_wg_maf_dirs: Path,
                                     panel_df: pd.DataFrame,
                                     dir_panel_maf_dirs: Path) -> None:
        """
        For each directory within the specified directory, this method iterates through
        all MAF files, generated from WGS analysis, and downsamples it to create MAF
        files containing mutations covered by the gene panel (as described by its BED
        file).

        :param dir_wg_maf_dirs: a directory of directories, each containing a set of MAF files
            from WGS analysis.
        :param panel_df: BED file of the gene panel.
        :param dir_panel_maf_dirs: output directory.
        """
        for dir_wg_mafs in dir_wg_maf_dirs.iterdir():
                dir_panel_mafs = dir_panel_maf_dirs / dir_wg_mafs.name
                if not dir_panel_mafs.exists():
                        dir_panel_mafs.mkdir()
                convert_wg_mafs_to_panel_mafs(dir_wg_mafs, panel_df, dir_panel_mafs)

In [44]:
panel_maf_dirs_from_wgs_maf_dirs(dir_maf_dirs, msk_impact_df, dir_panel_maf_dirs)

  data = pd.read_csv(filepath, sep="\t")
  data = pd.read_csv(filepath, sep="\t")
  data = pd.read_csv(filepath, sep="\t")
  data = pd.read_csv(filepath, sep="\t")
  data = pd.read_csv(filepath, sep="\t")
  data = pd.read_csv(filepath, sep="\t")


In [45]:
def get_sbs_trinucleotide_contexts() -> List[str]:
    """
    Returns a list of trinucleotide context for single base substitutions (SBS)
    for constructing a COSMIC mutational spectra matrix.

    :return: a list of SBS trinucleotide contexts.
    """
    sbs_trinucleotide_contexts = []
    nucleotide_bases = ["A", "C", "G", "T"]
    substitution_types = ["C>A", "C>G", "C>T", "T>A", "T>C", "T>G"]

    for base_5 in nucleotide_bases:
        for substitution in substitution_types:
            for base_3 in nucleotide_bases:
                sbs_trinucleotide_contexts.append(f"{base_5}[{substitution}]{base_3}")

    return sbs_trinucleotide_contexts


def init_sbs_mutational_spectra(n_records: int) -> OrderedDict[str, List[int]]:
    """
    Initilizes an ordered dictionary with SBS trinucleotide context as keys and
    a list of counts, one for each sample.

    :param n_records: number of samples to record in the mutational spectra matrix.
    :return: an ordered dictionary of trinucleotide context and a list of counts
        initialized to zeros.
    """
    sbs_mutational_spectra = OrderedDict()
    sbs_trinucleotide_contexts = get_sbs_trinucleotide_contexts()

    for context in sbs_trinucleotide_contexts:
        sbs_mutational_spectra[context] = [0]*n_records

    return sbs_mutational_spectra


def index_reference_genome(ref_fasta_filepath: Path) -> pyfaidx.Fasta:
    """
    Returns an indexed FASTA file to quickly lookup subsequences in a genome.

    :param ref_fasta_filepath: filepath of the FASTA file of the reference genome.
    :return: an indexed FASTA file
    """
    return pyfaidx.Fasta(ref_fasta_filepath.as_posix())


def get_trinucleotide_ref_from_fasta(row: pd.Series,
                                     ref_fasta: pyfaidx.Fasta) -> str:
    """
    Returns the trinucleotides (5' base, reference allele, 3' base) around the
    mutation described by the row.

    :param row: a pandas row of the MAF file.
    :param ref_fasta: an indexed FASTA of the reference genome.
    :return: trinucleotide context for the mutation described by the row.
    """
    pointer = int(row["chromosome_start"])
    """
    '-2' and not '-1' because genomes are indexed starting from 1 but Python data
    structures are indexed starting from 0.
    """
    return ref_fasta[f"chr{row['chromosome']}"][(pointer-2):(pointer+1)].seq.upper()


def standardize_trinucleotide(trinucleotide_ref: str) -> str:
    """
    COSMIC signatures define mutations from a pyrimidine allele (C, T) to any
    other base (C>A, C>G, C>T, T>A, T>C, T>G). If a mutation in the MAF file
    is defined from a purine allele (A, G), then we infer the trinucleotide
    context in the complementary sequence, which would be from a pyrimidine
    allele due to purines and pyrimidines complementing each other in a
    double-stranded DNA.

    :param trinucleotide_ref: trinucleotide sequence seen in the reference genome.
    :return: a pyrimidine-centric trinucleotide sequence.
    """
    complement_seq = {
        'A': 'T',
        'C': 'G',
        'T': 'A',
        'G': 'C'
    }
    purines = ["A", "G"]
    if trinucleotide_ref[1] in purines:
        return f"{complement_seq[trinucleotide_ref[2]]}" \
               f"{complement_seq[trinucleotide_ref[1]]}" \
               f"{complement_seq[trinucleotide_ref[0]]}"
    else:
        return trinucleotide_ref


def standardize_substitution(ref_allele: str,
                             mut_allele: str) -> str:
    """
    COSMIC signatures define mutations from a pyrimidine allele (C, T) to any
    other base (C>A, C>G, C>T, T>A, T>C, T>G). If a mutation in the MAF file
    is defined from a reference purine allele (A, G), then we infer the substituted
    base in the complementary sequence, which would be from a pyrimidine
    allele due to purines and pyrimidines complementing each other in a
    double-stranded DNA.

    :param ref_allele: base in the reference genome.
    :param mut_allele: base in the mutated genome
    :return: substitution string from pyrimidine to any other base.
    """
    complement_seq = {
        'A': 'T',
        'C': 'G',
        'T': 'A',
        'G': 'C'
    }
    purines = ["A", "G"]
    if ref_allele in purines:
        return f"{complement_seq[ref_allele]}>{complement_seq[mut_allele]}"
    else:
        return f"{ref_allele}>{mut_allele}"


def add_instance_to_mutational_spectra(maf_df: pd.DataFrame,
                                       mutational_spectra: OrderedDict[str, List[int]],
                                       ref_fasta: pyfaidx.Fasta,
                                       index: int) -> None:
    """
    Parses each row in a MAF dataframe generated from an ICGC SSM dataset and tabulates a
    mutational spectra count matrix in the form of an ordered dictionary.

    :param maf_df: MAF dataframe generated from an ICGC SSM dataset.
    :param mutational_spectra: an ordered dictionary to tabulat the mutational spectra matrix.
    :param ref_fasta: an indexed reference genome.
    :param index: row index in the mutational spectra matrix to tabulate in the counts.
    """
    nucleotide_bases = ["A", "C", "G", "T"]
    pyrimidine = ["C", "T"]

    for _, row in maf_df.iterrows():
        if((row["chromosome_start"] != row["chromosome_end"]) or
                (row["reference_genome_allele"] not in nucleotide_bases) or
                (row["mutated_to_allele"] not in nucleotide_bases)):
            continue
        trinucleotide_ref = standardize_trinucleotide(
            get_trinucleotide_ref_from_fasta(row, ref_fasta))
        substitution = standardize_substitution(row["reference_genome_allele"],
                                                row["mutated_to_allele"])

        # sanity checks
        try:
            assert (trinucleotide_ref is not None)
            assert (trinucleotide_ref[1] == substitution[0])
            assert (trinucleotide_ref[1] in pyrimidine)
            assert (substitution[0] in pyrimidine)
        except AssertionError:
            print(f"MAF row: {row['chromosome']}, "
                  f"{row['chromosome_start']}, "
                  f"{row['chromosome_end']}, "
                  f"{row['reference_genome_allele']}, "
                  f"{row['mutated_to_allele']}")
            print(f"FASTA context: {get_trinucleotide_ref_from_fasta(row, ref_fasta)}")
            print(f"Pyrimidine-centric context: {trinucleotide_ref}")
            raise

        mutational_spectra[f"{trinucleotide_ref[0]}[{substitution}]{trinucleotide_ref[2]}"][index] += 1


def write_mutational_spectra(mutational_spectra: OrderedDict,
                             sample_names: List[str],
                             filepath: Path) -> None:
    """
    Writes the mutational spectra matrix data, stored in an ordered dictionary, to a CSV file.

    :param mutational_spectra: mutational spectra matrix data stored in an ordered dictionary.
    :param sample_names: a list of names of the samples.
    :param filepath: name of the CSV file to save the data.
    """
    data = np.stack([np.array(mutational_spectra[substitution]) for substitution in mutational_spectra.keys()])
    index = pd.Series(
        data=mutational_spectra.keys(),
        name="Mutation Types"
    )
    mutational_spectra_df = pd.DataFrame(
        data=data,
        index=index,
        columns=sample_names,
        dtype=int,
    )
    mutational_spectra_df.to_csv(filepath, sep=",", index=True)


def convert_mafs_to_sbs_mutational_spectra(dir_mafs: Path,
                                           ref_fasta_filepath: Path,
                                           filepath_output: Path) -> None:
    """
    Converts all MAF files (one file per sample) in a directory into a mutational spectra
    matrix and saves it as a CSV file.

    :param dir_mafs: a directory containing MAF files.
    :param ref_fasta_filepath: filepath to the reference genome FASTA file.
    :param filepath_output: file path to save the mutational spectra CSV file.
    """
    maf_filepaths = natsorted(list(dir_mafs.glob("*")))
    n_samples = len(maf_filepaths)
    mutational_spectra = init_sbs_mutational_spectra(n_samples)
    ref_fasta = index_reference_genome(ref_fasta_filepath)
    donors = list()

    donor_index = 0
    for maf_filepath in maf_filepaths:
        data_maf = read_sbs_maf_file(maf_filepath)
        add_instance_to_mutational_spectra(data_maf, mutational_spectra, ref_fasta, donor_index)
        donors.append(maf_filepath.name)
        donor_index += 1
    write_mutational_spectra(mutational_spectra, donors, filepath_output)


def convert_maf_dirs_to_sbs_mutational_spectra(dir_maf_dirs: Path,
                                               ref_fasta_filepath: Path,
                                               dir_output: Path) -> None:
    """
    For each directory within the specified directory, this method iterates through all
    MAF files and creates a mutational spectra matrix and saves them as a CSV file.

    :param dir_maf_dirs: a directory of directories, each containing a set of MAF files.
    :param ref_fasta_filepath: filepath to the reference genome FASTA file.
    :param dir_output: directory to save the mutational spectra CSV files.
    """
    for dir_mafs in dir_maf_dirs.iterdir():
        if dir_mafs.is_dir():
            filepath_output = dir_output / f"{dir_mafs.name}.csv"
            convert_mafs_to_sbs_mutational_spectra(dir_mafs, ref_fasta_filepath, filepath_output)

In [46]:
fpath_grch37 = dir_data / "hg19.fa"
dir_panel_spectra = dir_data / "mutational_spectra_panel"
if not dir_panel_spectra.exists():
    dir_panel_spectra.mkdir()

convert_maf_dirs_to_sbs_mutational_spectra(dir_panel_maf_dirs, fpath_grch37, dir_panel_spectra)