In [12]:
"""Module to annotate TSV againts a Panel of Normals."""
import gzip
from os.path import basename, dirname, join
import pandas as pd
import logging

LOGGER = logging.getLogger("annotvcf")
LOGGER_LEVEL = logging.DEBUG
LOGGER_FORMAT = "[%(levelname)s] - %(asctime)s: %(message)s"
LOGGER_DATEFMT = "%m/%d/%Y %I:%M:%S %p"
LOGGER.setLevel(LOGGER_LEVEL)

# Create console handler and set level to debug.
HANDLER = logging.StreamHandler()
HANDLER.setLevel(LOGGER_LEVEL)

# Add formatter to handler.
FORMATTER = logging.Formatter(LOGGER_FORMAT, datefmt=LOGGER_DATEFMT)
HANDLER.setFormatter(FORMATTER)
LOGGER.addHandler(HANDLER)
LOGGER.propagate = False

LOGGING_LEVELS = {
    "critical": logging.CRITICAL,
    "error": logging.ERROR,
    "warning": logging.WARNING,
    "debug": logging.DEBUG,
    "info": logging.INFO,
}

def annot_normals(
        input_file_name, normal_tsv, threshold, output_file_name, loglevel
    ):
    """Annotate variants with internal normal controls."""
    LOGGER.setLevel(LOGGING_LEVELS.get(loglevel))
    LOGGER.info("Running script for annotating against panel of normals...")

    # Create aggregated file for Pool of Panel of Normals
    normals = __import_pool_normals(normal_tsv)
    normals = __aggregate_pool_normals(normals)
    outdir = dirname(output_file_name)
    normal_tsv_name = basename(normal_tsv[0]).split('.tsv')[0]
    normals_file = join(outdir, f"{normal_tsv_name}.pon.normals.tsv.gz")
    nfile = gzip.open(normals_file, "wt")
    normals.to_csv(
        path_or_buf=nfile,
        sep="\t",
        float_format="%.3f",
        na_rep="NA",
    )
    nfile.close()
    LOGGER.info("Created pool of normals file: %s", normals_file)
    normals["CHR"] = normals["CHR"].astype(str)
    normals = normals.set_index(["CHR", "START"]).sort_index()

    # Get columns to annotate
    common_columns = ["Match_Class", "Counts", "Position", "Change"]
    extra_columns = list(set(normals.columns) - {'REF', 'ALT', 'COUNT'})
    output_columns = [
        f"NORMALS_{field}" for field in common_columns + extra_columns
    ]

    # Annotate Input File against Panel of Normals
    if not output_file_name.endswith('.gz'):
        output_file_name += ".gz"

    ifile = gzip.open(input_file_name, "rt")
    ofile = gzip.open(output_file_name, "wt")

    data = []
    write_column_names = True
    for line in ifile.readlines():
        if line.startswith("#"):
            ofile.write(line)
        elif line.startswith("ID_VARIANT"):
            input_columns = line.strip().split("\t")
        else:
            values = line.strip().split("\t")
            data.append(values)

        # Write Data by chunks of 10000 records
        if len(data) >= 10000:
            variants = pd.DataFrame(data=data, columns=input_columns)
            variants[output_columns] = variants.apply(
                lambda row: __annotate_line(
                    row, normals, extra_columns, threshold
                ),
                axis=1
            )
            variants.to_csv(
                path_or_buf=ofile,
                header=write_column_names,
                index=False,
                sep="\t",
                float_format="%.3f",
                na_rep="NA",
            )
            data = []

            # Flag to only write column names once in the file
            if write_column_names:
                write_column_names = False

    # Write the last chunk of data when all lines are parsed
    if data:
        variants = pd.DataFrame(data=data, columns=input_columns)
        variants[output_columns] = variants.apply(
            lambda row: __annotate_line(
                row, normals, extra_columns, threshold
            ),
            axis=1
        )
        variants.to_csv(
            path_or_buf=ofile,
            header=write_column_names,
            index=False,
            sep="\t",
            float_format="%.3f",
            na_rep="NA",
        )

    ifile.close()
    ofile.close()
    LOGGER.info(
        "Created file with Pool of normals annotation: %s", output_file_name
    )


def __annotate_line(line, normals, extra_columns, threshold):
    """
    Annotate each variant with found variants in panel of normals.
    A variant is annotated if there is a match, with 4 fixed columns, plus the
    extra columns information regarding to VAF found in the normals file.
    Fixed columns:
        Match_class:    genomic_exact: exact match with variant.
                        genomic_same_pos: other variants in same position.
                        genomic_close_pos: other variants found close.
        Counts:         Number of matches.
        Positions:      START position.
        Change:         REF > ALT.
    Arguments:
        line (pd.Series): Dataframe row containing a variant information.
        normals (pd.Dataframe): Dataframe with panel of normals info.
        extra_columns (list): extra columns of panel of normals to annotate.
        threshold (int): number of base pairs to define the search region.
    Return:
        pd.Series: Annotation added to each variant row.
    """
    match_class = ""
    chrom = line["CHR"]
    start = int(line["START"])

    if (chrom, start) in normals.index:
        # variants in same position.
        same_pos = normals.loc[(chrom, start):(chrom, start)]
        # variants that match exactly both position and change.
        exact_match = same_pos[
            (same_pos["REF"] == line["REF"]) &
            (same_pos["ALT"] == line["ALT"])
        ]
        if not exact_match.empty:
            match = exact_match
            match_class = "genomic_exact"
        else:
            match = same_pos
            match_class = "genomic_same_pos"
    else:
        # variants that are located close in matchition,
        close_pos = normals.loc[
            (chrom, start - threshold):(chrom, start + threshold)
        ]
        if not close_pos.empty:
            match = close_pos
            match_class = "genomic_close_pos"

    if match_class:
        annotation = [
            match_class,
            ", ".join(match["COUNT"].astype(str)),
            ", ".join([
                str(i) for i in match.index.get_level_values("START")
            ]),
            ", ".join([
                f"{a}>{b}" for a, b in zip(match["REF"], match["ALT"])
            ])
        ]
        annotation += [
            ", ".join(match[column].astype(str))
            for column in extra_columns
        ]
    else:
        annotation = (4 + len(extra_columns)) * [None]
    return pd.Series(annotation)


def __import_pool_normals(normals_tsv):
    """Read all tsv files and generate an appended dataframe by sample rows."""
    pool = []
    for path in normals_tsv:
        try:
            normals_chunks = pd.read_csv(
                filepath_or_buffer=path,
                compression="gzip",
                chunksize=20000,
                sep="\t",
                comment="#",
                low_memory=False,
                dtype={'CHR': str},
            )
            pool.extend(normals_chunks)
        except NameError:
            raise Exception(f"Error when reading {path}. File must be bgzip.")

    normals = pd.concat(pool)
    LOGGER.info(
        "Creating normals using %s file(s), containing %s variant calls.",
        len(normals_tsv), normals.shape[0]
    )
    return normals


def __aggregate_pool_normals(normals):
    """Aggregate a tsv file with normals of several patients, one by row."""
    callers = ["caveman_", "pindel_", "mutect_", "strelka_", ""]
    targets = {
        (caller + "TARGET_VAF"): caller
        for caller in callers
        if caller + "TARGET_VAF" in normals.columns
    }
    normals = normals[["CHR", "START", "REF", "ALT"] + list(targets)]
    normals = normals.groupby(["CHR", "START", "REF", "ALT"])
    pon_agg = normals['ALT'].size().to_frame(name='COUNT')

    for target, caller in targets.items():
        pon_agg[caller + 'MEDIAN_VAF'] = (
            normals[target].quantile(q=0.5).round(3)
        )
    for target, caller in targets.items():
        pon_agg[caller + 'VAF_Q25'] = normals[target].quantile(q=0.25).round(3)
        pon_agg[caller + 'VAF_Q75'] = normals[target].quantile(q=0.75).round(3)

    pon_agg = pon_agg.reset_index()

    LOGGER.info(
        "Aggregated normals containing %s unique variants.", pon_agg.shape[0]
    )
    return pon_agg


In [13]:
def __import_pool(tsvs):
    """Read all tsv files and generate an appended dataframe by sample rows."""
    for path in tsvs:
        try:
            indels = []
            chunks = pd.read_csv(
                filepath_or_buffer=path,
                compression="gzip",
                chunksize=20000,
                sep="\t",
                comment="#",
                low_memory=False,
                dtype={'CHR': str},
            )
            for i in chunks:
                indels.append(i[i["ANY2_LCC"].astype(bool)])
        except NameError:
            raise Exception(f"Error when reading {path}. File must be bgzip.")

    pool = pd.concat(indels)
    LOGGER.info(
        "Creating normals using %s file(s), containing %s variant calls.",
        len(tsvs), pool.shape[0]
    )
    return pool

WGS_PASS_TSV = "/work/isabl/data/analyses/30/02/173002/annot_indels__2.0.0__32.wg.pass_tsv.gz"
# WGS_PASS_TSV = "/work/isabl/data/analyses/43/24/164324/pass/I-H-135076-T1-1-D1-1_vs_I-H-135076-N1-1-D1-1.indels.output.annot.tsv.gz"

pool = __import_pool([WGS_PASS_TSV])

[INFO] - 08/06/2019 06:23:54 PM: Creating normals using 1 file(s), containing 183425 variant calls.
[INFO] - 08/06/2019 06:23:54 PM: Creating normals using 1 file(s), containing 183425 variant calls.


In [None]:
len(pool)
pool = __aggregate_pool_normals(pool)

In [None]:
pool.head()

In [None]:
with gzip.open("any2lcc_wgs.tsv.gz", "wt") as nfile:
    pool.to_csv(
        path_or_buf=nfile,
        sep="\t",
        float_format="%.3f",
        na_rep="NA",
    )

In [None]:
# with open('indels_pass_p116.txt', 'r') as ifile:
#     for pass_pattern in ifile.readlines()[:2]:
#         pass_path = glob(pass_pattern.strip())[0]
#         filename = basename(pass_path).split('.pass.vcf.gz')[0]
        
#         print(f'📖Reading file {pass_path}')
#         indels = []
#         chunks = pd.read_csv(
#             filepath_or_buffer=pass_path,
#             compression="gzip",
#             chunksize=20000,
#             sep="\t",
#             comment="#",
#             low_memory=False,
#             dtype={'CHR': str},
#         )
#         for i in chunks:
#             indels.append(i[i["ANY2_LCC"].astype(bool)])
        
#         out_file_name = join('P116', f'{filename}.any2lcc.vcf')
#         print(f'📖Saving file {pass_path}')
#         with gzip.open("any2lcc_wgs.tsv.gz", "wt") as ofile:
#             pool.to_csv(
#                 path_or_buf=ofile,
#                 sep="\t",
#                 float_format="%.3f",
#                 na_rep="NA",
#             )