# core

> Fill in a module description here

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import subprocess
import os
import pandas as pd

In [None]:
#| export

def count_variants(vcf_file):
    """Count the number of variants in a VCF file using subprocess."""
    if vcf_file.endswith('.gz'):
        cmd = f"bcftools view -H {vcf_file} | wc -l"
    else:
        cmd = f"grep -v '^#' {vcf_file} | wc -l"
    
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    return int(result.stdout.strip())

def filter_variants():
    # Define input and output VCF files
    INPUT_VCF = "../data/freebayes.annotated_pc1.vcf.gz"
    QUAL_FILTERED_VCF = "../data/filtered_qual.vcf"
    DP_FILTERED_VCF = "../data/filtered_dp.vcf"
    SNP_FILTERED_VCF = "../data/filtered_snp.vcf"
    FINAL_VCF = "../data/filtered_final.vcf"
    
    print("======================================")
    print("Starting Variant Filtering Process")
    print("======================================")
    
    # Count initial number of variants
    START_COUNT = count_variants(INPUT_VCF)
    print(f"Total variants before filtering: {START_COUNT}")
    
    # Step 1: Filter out low-quality variants (QUAL < 30)
    subprocess.run(f"bcftools filter -e 'QUAL < 30' {INPUT_VCF} -o {QUAL_FILTERED_VCF}", shell=True)
    QUAL_FILTERED_COUNT = count_variants(QUAL_FILTERED_VCF)
    print(f"Stage 1: QUAL filtering: {START_COUNT - QUAL_FILTERED_COUNT} Variants removed and {QUAL_FILTERED_COUNT} variants left")

    # Step 2: Filter variants based on per-sample depth (FORMAT/DP < 10 or > 150)
    subprocess.run(f"bcftools view -i 'FMT/DP >= 30 & FMT/DP <= 150' {QUAL_FILTERED_VCF} -o {DP_FILTERED_VCF}", shell=True)
    DP_FILTERED_COUNT = count_variants(DP_FILTERED_VCF)
    print(f"Stage 2: FORMAT/DP filtering, DP >= 30 & DP <= 150: {QUAL_FILTERED_COUNT - DP_FILTERED_COUNT} Variants removed and {DP_FILTERED_COUNT} variants left")

    # Step 3: Retain SNPs and indels (Remove other variant types if any)
    subprocess.run(f"bcftools view -v snps,indels {DP_FILTERED_VCF} -o {SNP_FILTERED_VCF}", shell=True)
    SNP_FILTERED_COUNT = count_variants(SNP_FILTERED_VCF)
    print(f"Stage 3: After keeping SNPs and indels: {DP_FILTERED_COUNT - SNP_FILTERED_COUNT} Variants removed and {SNP_FILTERED_COUNT} variants left")

    # Rename final output
    os.rename(SNP_FILTERED_VCF, FINAL_VCF)
    FINAL_COUNT = count_variants(FINAL_VCF)


In [None]:
#| export

def read_vcf(vcf_file):
    """Reads a VCF file, automatically detecting the header and using correct column names."""
    
    # Find the header line dynamically
    with open(vcf_file, "r") as f:
        for line in f:
            if line.startswith("#CHROM"):
                header = line.strip().split("\t")  # Extract column names
                break  # Stop searching after finding header
    
    # Read VCF using Pandas, skipping comment lines
    df = pd.read_csv(vcf_file, sep="\t", comment="#", header=None, names=header)
    return df


In [None]:
#| export

def find_index(format_value, field):
    """Finds the index of a specific field (e.g., RO, AO, DP) in the FORMAT column."""
    item_list = format_value.split(':')
    return item_list.index(field) if field in item_list else None

def expand_multiallelic_variants(df_vcf):
    """
    Extracts allele counts (RO, AO, DP) for each sample from the VCF DataFrame
    and expands multi-allelic variants into separate rows.
    """

    # Extract sample names (everything after FORMAT column)
    sample_names = df_vcf.columns[9:]  # Skip CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO, FORMAT

    # Find the index positions of RO, AO, and DP in the FORMAT field
    format_example = df_vcf['FORMAT'].values[0]  # Take the first row as an example
    ro_index = find_index(format_example, 'RO')
    ao_index = find_index(format_example, 'AO')
    dp_index = find_index(format_example, 'DP')

    # Ensure indices exist
    if None in [ro_index, ao_index, dp_index]:
        raise ValueError("RO, AO, or DP field not found in FORMAT column.")

    # Initialize an empty list to store expanded rows
    expanded_rows = []

    for _, row in df_vcf.iterrows():
        # Split ALT alleles (multi-allelic sites will have multiple ALT values)
        alt_alleles = row['ALT'].split(',')

        # Process each ALT allele separately
        for i, alt in enumerate(alt_alleles):
            new_row = {
                "#CHROM": row["#CHROM"],
                "POS": row["POS"],
                "REF": row["REF"],
                "ALT": alt  # Assign each alternate allele to a separate row
            }

            for sample in sample_names:
                # Split FORMAT fields for the sample
                sample_values = row[sample].split(':')
                
                # Extract and store RO and DP
                new_row[f"RO_{sample}"] = int(sample_values[ro_index]) if sample_values[ro_index] != '.' else 0
                new_row[f"DP_{sample}"] = int(sample_values[dp_index]) if sample_values[dp_index] != '.' else 0

                # Extract AO for the specific ALT allele
                ao_values = sample_values[ao_index].split(',')  # Multiple values for multiple ALT alleles
                new_row[f"AO_{sample}"] = int(ao_values[i]) if i < len(ao_values) and ao_values[i] != '.' else 0

            # Append expanded row
            expanded_rows.append(new_row)

    # Convert list of dictionaries into DataFrame
    expanded_df = pd.DataFrame(expanded_rows)

    return expanded_df

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()