# Variant Calling Analysis (Improved)
Based on [Circulating tumor DNA sequencing in colorectal cancer patients treated with first-line chemotherapy with anti-EGFR](https://www.nature.com/articles/s41598-021-95345-4).
___

Analysis by: Luis Aguilera. December 2, 2025
_____

## Project Description:
This pipeline analyzes circulating tumor DNA sequencing data to track KRAS mutations in colorectal cancer patients during anti-EGFR treatment.
This code aims to identify variant allele frequency changes that serve as biomarkers for monitoring treatment response and tumor evolution.

**Improvements in this version:**
- **Quality Control**: FastQC for raw read assessment.
- **Trimming**: fastp for adapter and quality trimming.
- **Somatic Calling**: Lofreq for high-sensitivity detection of low-frequency variants (ctDNA).
- **Annotation**: SnpEff for functional annotation of variants.
_____

### Import libraries
___

In [None]:
# importing libraries
import subprocess
from pathlib import Path
import pandas as pd
import numpy as np
import re
from pysradb import SRAweb
import requests
import pysam
import matplotlib.pyplot as plt
from dna_features_viewer import GraphicFeature, GraphicRecord
from matplotlib.lines import Line2D

### Paths to data
___

In [None]:
# current working directory
cwd = Path.cwd()
print("Current working directory:", cwd)
# Data directory
data_dir = cwd / "data"
data_dir.mkdir(parents=True, exist_ok=True)
# Genome reference directory
genome_ref_dir = data_dir / "genome_reference"
genome_ref_dir.mkdir(parents=True, exist_ok=True)
# QC directory
qc_dir = data_dir / "qc"
qc_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Utility function to run shell commands
def run_cmd(cmd):
    subprocess.run(cmd, check=True)

### Downloading the reference genome (GRCh38)
___

In [None]:
def download_reference_genome(fasta_url, genome_ref_directory):
    """
    Download and decompress human reference genome.
    Input: fasta_url (str), genome_ref_directory (Path)
    Output: Path to decompressed FASTA file
    """
    # get the name from the URL
    gz_filename = Path(fasta_url).name
    fa_filename = gz_filename.replace(".gz", "")
    gz_path = genome_ref_directory / gz_filename # Path to compressed file
    fa_path = genome_ref_directory / fa_filename # Path to decompressed FASTA file
    if not fa_path.exists():
        if not gz_path.exists():
            # Download the gzipped fasta file
            run_cmd(["wget", "-O", str(gz_path), fasta_url])
        # Decompress the fasta file
        run_cmd(["gunzip", "-k", str(gz_path)])
    else:
        print("Reference genome already exists.")
    return fa_path

# Download reference genome
human_genome_fasta_url = ("https://ftp.ensembl.org/pub/release-109/fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.primary_assembly.fa.gz")
fa_ref_genome_path = download_reference_genome(human_genome_fasta_url, genome_ref_dir)
fa_ref_genome_path

### Get metadata for bioproject: PRJNA714799
___

In [None]:
# Download SRA metadata for project PRJNA714799
metadata_file_path = data_dir.joinpath("SRA_metadata_PRJNA714799_full.csv")
if metadata_file_path.exists():
    metadata = pd.read_csv(metadata_file_path)
else:
    db = SRAweb()
    metadata = db.sra_metadata("PRJNA714799", detailed=True)
    metadata.to_csv(metadata_file_path, index=False)
metadata.head()

In [None]:
# Extract sample type and ID from 'tissue' column
metadata['sample_type'] = metadata['tissue'].str.extract(r'^([A-Za-z]+)_')[0]
metadata['sample_id'] = metadata['tissue'].str.extract(r'_(\d+)$')[0].astype(int)
print(f"{metadata['sample_type'].value_counts().to_string()}\n")
print(f"Unique patient IDs: {metadata['sample_id'].nunique()}\n")
print(f"{metadata['isolate'].value_counts().to_string()}")

### Extracting patient_id and timepoints from metadata
___

The SRA metadata use a consistent `run_alias` pattern:

- PBMC samples: `PBMC_<ID>.final.bam`  
  patient_id = <ID>, sample_type = PBMC, no timepoint

- FFPE tumor samples: `FFPE_<ID>.final.bam`  
  patient_id = <ID>, sample_type = Tissue, no timepoint

- ctDNA samples: `<ID>.final.bam` or `<ID>-<N>.final.bam`  
  patient_id = <ID>, sample_type = ctDNA, timepoint 0 (no suffix) or timepoint N (suffix -N)

In [None]:
def extract_patient_info_from_alias(run_alias):
    """
    Parse run_alias to extract patient_id, timepoint, and sample_type.
    Input: run_alias (str)
    Output: dict with keys patient_id, timepoint, sample_type
    """
    if pd.isna(run_alias):
        return {"patient_id": None, "timepoint": None, "sample_type": None}
    if run_alias.startswith("PBMC_"):
        patient_id = run_alias.replace("PBMC_", "").replace(".final.bam", "")
        return {"patient_id": patient_id, "timepoint": None, "sample_type": "PBMC"}
    if run_alias.startswith("FFPE_"):
        patient_id = run_alias.replace("FFPE_", "").replace(".final.bam", "")
        return {"patient_id": patient_id, "timepoint": None, "sample_type": "Tissue"}
    ctdna_pattern = re.match(r'^(CTC\d+|C_fw\d+)(?:-(\d+))?\.final\.bam$', run_alias)
    if ctdna_pattern:
        patient_id = ctdna_pattern.group(1)
        timepoint = 0 if ctdna_pattern.group(2) is None else int(ctdna_pattern.group(2))
        return {"patient_id": patient_id, "timepoint": timepoint, "sample_type": "ctDNA"}
    return {"patient_id": None, "timepoint": None, "sample_type": None}

def add_patient_info_to_metadata(input_metadata):
    """
    Add patient_id, timepoint, and sample_type columns to metadata.
    Input: input_metadata (pd.DataFrame)
    Output: pd.DataFrame with added patient information columns
    """
    output_metadata = input_metadata.copy()
    parsed_info = output_metadata['run_alias'].apply(extract_patient_info_from_alias)
    parsed_df = pd.DataFrame(parsed_info.tolist())
    columns_to_drop = ["patient_id", "timepoint", "sample_type"]
    existing_columns = [col for col in columns_to_drop if col in output_metadata.columns]
    if existing_columns:
        output_metadata = output_metadata.drop(columns=existing_columns)
    output_metadata = pd.concat([output_metadata, parsed_df], axis=1)
    return output_metadata
# Parse metadata and add patient information
metadata = add_patient_info_to_metadata(metadata)
metadata.head()

In [None]:
def filter_ctdna_samples(metadata_dataframe):
    """
    Filter metadata to only ctDNA samples.
    Input: metadata_dataframe (pd.DataFrame)
    Output: pd.DataFrame with ctDNA samples and integer timepoints
    """
    ctdna_samples = metadata_dataframe[metadata_dataframe["sample_type"] == "ctDNA"].copy()
    ctdna_samples["timepoint"] = ctdna_samples["timepoint"].astype(int)
    return ctdna_samples
# Filter to ctDNA samples only
ctdna = filter_ctdna_samples(metadata)

In [None]:
def find_eligible_patients(ctdna_dataframe):
    """
    Find patients with at least 3 timepoints including baseline (0).
    Input: ctdna_dataframe (pd.DataFrame)
    Output: pd.DataFrame with eligible patients and their timepoints
    """
    eligible_patients_list = []
    for patient_id, patient_group in ctdna_dataframe.groupby("patient_id"):
        timepoints = sorted(patient_group["timepoint"].dropna().unique())
        timepoints = [int(tp) for tp in timepoints]
        has_baseline = 0 in timepoints
        has_followup = any(tp > 0 for tp in timepoints)
        has_three_timepoints = len(timepoints) >= 3
        if has_baseline and has_followup and has_three_timepoints:
            eligible_patients_list.append((patient_id, timepoints))
    
    eligible_df = pd.DataFrame(eligible_patients_list, columns=["patient_id", "timepoints"])
    eligible_df["n_timepoints"] = eligible_df["timepoints"].apply(len)
    return eligible_df
# Find eligible patients
eligible_df = find_eligible_patients(ctdna)
eligible_df.sort_values("n_timepoints", ascending=True).head(10)

In [None]:
def annotate_clinical_timepoints(patient_dataframe):
    """
    Annotate timepoints with clinical phase labels.
    Input: patient_dataframe (pd.DataFrame)
    Output: pd.DataFrame with added tp_label column
    """
    output_df = patient_dataframe.copy()
    output_df["timepoint"] = output_df["timepoint"].astype(int)
    unique_timepoints = sorted(output_df["timepoint"].unique())
    baseline_timepoint = 0
    first_followup_timepoint = min(tp for tp in unique_timepoints if tp > 0)
    last_timepoint = max(tp for tp in unique_timepoints if tp > 0)
    def get_clinical_label(timepoint_value):
        if timepoint_value == baseline_timepoint:
            return "pre_treatment"
        elif timepoint_value == first_followup_timepoint:
            return "during_treatment"
        elif timepoint_value == last_timepoint:
            return "post_treatment"
        return None
    output_df["tp_label"] = output_df["timepoint"].apply(get_clinical_label)
    return output_df.sort_values("timepoint")
# Select and annotate one patient
selected_patient_id = "CTC030"
patient_data = ctdna[ctdna["patient_id"] == selected_patient_id]
patient_data_annotated = annotate_clinical_timepoints(patient_data)
# Display results
patient_data_annotated[["run_accession", "run_alias", "timepoint", "tp_label"]]

In [None]:
def download_single_fastq_file(download_url, output_file_path):
    """
    Download a FASTQ file if it doesn't already exist.
    Input: download_url (str), output_file_path (Path)
    Output: bool (True if downloaded, False if skipped)
    """
    if output_file_path.exists():
        print(f"Skipping {output_file_path.name} (already exists)")
        return False
    print(f"Downloading {output_file_path.name}...")
    run_cmd(["wget", "-O", str(output_file_path), download_url])
    return True

def download_fastq_files_for_patient(patient_annotated_dataframe, patient_identifier, 
                                      data_directory, timepoints_to_include=None):
    """
    Download paired FASTQ files (R1 and R2) for a patient.
    Input: patient_annotated_dataframe (pd.DataFrame), patient_identifier (str), 
           data_directory (Path), timepoints_to_include (list, optional)
    Output: Path (directory containing downloaded files)
    """
    if timepoints_to_include is None:
        timepoints_to_include = ["pre_treatment", "during_treatment", "post_treatment"]
    patient_directory = data_directory / patient_identifier
    patient_directory.mkdir(parents=True, exist_ok=True)
    for _, sample_row in patient_annotated_dataframe.iterrows():
        timepoint_label = sample_row["tp_label"]
        if timepoint_label not in timepoints_to_include:
            continue
        r1_url = sample_row["ena_fastq_http_1"]
        r2_url = sample_row["ena_fastq_http_2"]
        if pd.notna(r1_url) and r1_url != "":
            r1_file_path = patient_directory / f"{patient_identifier}_{timepoint_label}_R1.fastq.gz"
            download_single_fastq_file(r1_url, r1_file_path)
        if pd.notna(r2_url) and r2_url != "":
            r2_file_path = patient_directory / f"{patient_identifier}_{timepoint_label}_R2.fastq.gz"
            download_single_fastq_file(r2_url, r2_file_path)
    return patient_directory

# Download FASTQ files for the selected patient
patient_data_directory = download_fastq_files_for_patient(
    patient_annotated_dataframe=patient_data_annotated,
    patient_identifier=selected_patient_id,
    data_directory=data_dir
)

In [None]:
def organize_fastq_files_by_timepoint(patient_directory, patient_identifier):
    """
    Organize FASTQ files into nested dictionary by timepoint.
    Input: patient_directory (Path), patient_identifier (str)
    Output: dict with structure {timepoint: {'R1': Path, 'R2': Path}}
    """
    organized_fastq_paths = {}
    filename_pattern = re.compile(
        rf"^{re.escape(patient_identifier)}_(.+)_R([12])\.fastq\.gz$"
    )
    for file_path in patient_directory.iterdir():
        if not file_path.name.endswith(".fastq.gz"):
            continue
        match = filename_pattern.match(file_path.name)
        if match is None:
            continue
        timepoint_label = match.group(1)
        read_number = match.group(2)
        if timepoint_label not in organized_fastq_paths:
            organized_fastq_paths[timepoint_label] = {}
        organized_fastq_paths[timepoint_label][f"R{read_number}"] = file_path
    return organized_fastq_paths

# Organize FASTQ files for alignment
fastq_paths = organize_fastq_files_by_timepoint(
    patient_directory=patient_data_directory,
    patient_identifier=selected_patient_id
)
fastq_paths

### Quality Control and Trimming
___

In [None]:
def run_fastqc(fastq_files, qc_directory):
    """
    Run FastQC on a list of FASTQ files.
    Input: fastq_files (list of Path), qc_directory (Path)
    Output: None
    """
    qc_directory.mkdir(parents=True, exist_ok=True)
    command = ["fastqc", "-t", "4", "-o", str(qc_directory)] + [str(f) for f in fastq_files]
    print("Running FastQC...")
    run_cmd(command)

def trim_reads_with_fastp(r1_path, r2_path, output_r1, output_r2, qc_directory, sample_name):
    """
    Trim adapters and low-quality bases using fastp.
    Input: r1_path, r2_path, output_r1, output_r2 (Path), qc_directory (Path), sample_name (str)
    Output: None
    """
    if output_r1.exists() and output_r2.exists():
        print(f"Skipping trimming for {sample_name} (already trimmed)")
        return
    
    print(f"Trimming {sample_name} with fastp...")
    html_report = qc_directory / f"{sample_name}_fastp.html"
    json_report = qc_directory / f"{sample_name}_fastp.json"
    
    command = [
        "fastp", 
        "-i", str(r1_path), "-I", str(r2_path),
        "-o", str(output_r1), "-O", str(output_r2),
        "-h", str(html_report), "-j", str(json_report),
        "--detect_adapter_for_pe"
    ]
    run_cmd(command)

# Run QC and Trimming
patient_qc_dir = qc_dir / selected_patient_id
trimmed_fastq_paths = {}

for timepoint_label, files in fastq_paths.items():
    # Run FastQC on raw data
    run_fastqc([files['R1'], files['R2']], patient_qc_dir)
    
    # Trim reads
    trimmed_r1 = files['R1'].with_name(files['R1'].name.replace(".fastq.gz", ".trimmed.fastq.gz"))
    trimmed_r2 = files['R2'].with_name(files['R2'].name.replace(".fastq.gz", ".trimmed.fastq.gz"))
    
    trim_reads_with_fastp(
        files['R1'], files['R2'], 
        trimmed_r1, trimmed_r2, 
        patient_qc_dir, f"{selected_patient_id}_{timepoint_label}"
    )
    
    trimmed_fastq_paths[timepoint_label] = {'R1': trimmed_r1, 'R2': trimmed_r2}
    
trimmed_fastq_paths

### Perform alignment
___

In [None]:
def build_bwa_index(reference_fasta_path):
    """
    Build BWA index for reference genome.
    Input: reference_fasta_path (Path or str)
    Output: None
    """
    reference_fasta_path = Path(reference_fasta_path)
    index_extensions = [".amb", ".ann", ".bwt", ".pac", ".sa"]
    index_file_paths = [
        reference_fasta_path.with_suffix(reference_fasta_path.suffix + ext)
        for ext in index_extensions
    ]
    if all(index_file.exists() for index_file in index_file_paths):
        print("BWA index already exists.")
        return
    print("Building BWA index...")
    run_cmd(["bwa", "index", str(reference_fasta_path)])

def align_reads_to_reference(sample_label, forward_reads_path, reverse_reads_path,
                              reference_fasta_path, output_directory, patient_identifier):
    """
    Align paired-end reads to reference genome using BWA-MEM.
    Input: sample_label (str), forward_reads_path (Path), reverse_reads_path (Path),
           reference_fasta_path (Path), output_directory (Path), patient_identifier (str)
    Output: Path to sorted BAM file
    """
    output_directory = Path(output_directory)
    output_directory.mkdir(parents=True, exist_ok=True)
    sorted_bam_path = output_directory / f"{patient_identifier}_{sample_label}.sorted.bam"
    bam_index_path = sorted_bam_path.with_suffix(".bam.bai")
    if sorted_bam_path.exists() and bam_index_path.exists():
        print(f"Skipping {sorted_bam_path.name} (already aligned)")
        return sorted_bam_path
    print(f"Aligning {sample_label}...")
    bwa_mem_command = [
        "bwa", "mem", "-t", "10",
        str(reference_fasta_path),
        str(forward_reads_path),
        str(reverse_reads_path)
    ]
    process_bwa = subprocess.Popen(bwa_mem_command, stdout=subprocess.PIPE)
    process_view = subprocess.Popen(
        ["samtools", "view", "-b", "-@", "4"],
        stdin=process_bwa.stdout,
        stdout=subprocess.PIPE
    )
    process_sort = subprocess.Popen(
        ["samtools", "sort", "-@", "4", "-o", str(sorted_bam_path)],
        stdin=process_view.stdout
    )
    process_sort.communicate()
    process_bwa.stdout.close()
    process_view.stdout.close()
    run_cmd(["samtools", "index", "-@", "4", str(sorted_bam_path)])
    return sorted_bam_path

# Build BWA index for reference genome
build_bwa_index(fa_ref_genome_path)
# Align all samples using TRIMMED reads
bam_output_directory = data_dir / "bam" / selected_patient_id
aligned_bam_paths = {}
for timepoint_label, fastq_file_paths in trimmed_fastq_paths.items():
    aligned_bam_paths[timepoint_label] = align_reads_to_reference(
        sample_label=timepoint_label,
        forward_reads_path=fastq_file_paths["R1"],
        reverse_reads_path=fastq_file_paths["R2"],
        reference_fasta_path=fa_ref_genome_path,
        output_directory=bam_output_directory,
        patient_identifier=selected_patient_id
    )
aligned_bam_paths

### Duplicate removal
___

In [None]:
def mark_duplicate_reads(input_bam_path, output_bam_path):
    """
    Mark duplicate reads in BAM file using samtools.
    Input: input_bam_path (Path), output_bam_path (Path)
    Output: Path to deduplicated BAM file
    """
    output_bam_path = Path(output_bam_path)
    output_bam_index = output_bam_path.with_suffix(".bam.bai")
    if output_bam_path.exists() and output_bam_index.exists():
        print(f"Skipping {output_bam_path.name} (already marked)")
        return output_bam_path
    print(f"Marking duplicates for {input_bam_path.name}...")
    name_sorted_bam = str(input_bam_path).replace(".bam", ".name_sorted.bam")
    fixmate_bam = str(input_bam_path).replace(".bam", ".fixmate.bam")
    run_cmd(["samtools", "sort", "-n", "-@", "4", "-o", name_sorted_bam, str(input_bam_path)])
    run_cmd(["samtools", "fixmate", "-m", "-@", "4", name_sorted_bam, fixmate_bam])
    run_cmd(["samtools", "sort", "-@", "4", "-o", str(output_bam_path), fixmate_bam])
    run_cmd(["samtools", "markdup", "-@", "4", str(output_bam_path), str(output_bam_path) + ".tmp"])
    run_cmd(["mv", str(output_bam_path) + ".tmp", str(output_bam_path)])
    run_cmd(["samtools", "index", "-@", "4", str(output_bam_path)])
    Path(name_sorted_bam).unlink(missing_ok=True)
    Path(fixmate_bam).unlink(missing_ok=True)
    return output_bam_path

deduplicated_bam_paths = {}
for timepoint_label, bam_path in aligned_bam_paths.items():
    output_bam = bam_path.with_name(bam_path.stem + ".markdup.bam")
    deduplicated_bam_paths[timepoint_label] = mark_duplicate_reads(bam_path, output_bam)
deduplicated_bam_paths

### Gene Selection
___

In [None]:
def get_gene_coordinates_from_ensembl(gene_name):
    """
    Retrieve genomic coordinates for a gene from Ensembl REST API.
    Input: gene_name (str)
    Output: str (genomic region in format chromosome:start-end)
    """
    ensembl_api_url = f"https://rest.ensembl.org/lookup/symbol/homo_sapiens/{gene_name}"
    api_response = requests.get(ensembl_api_url, headers={"Content-Type": "application/json"})
    if not api_response.ok:
        raise ValueError(f"Gene '{gene_name}' not found")
    gene_data = api_response.json()
    genomic_region = f"{gene_data['seq_region_name']}:{gene_data['start']}-{gene_data['end']}"
    print(f"{gene_name}: {genomic_region}")
    return genomic_region

kras_genomic_region = get_gene_coordinates_from_ensembl("KRAS")

### Somatic Variant Calling (Lofreq)
___
Replacing `bcftools` with `Lofreq`, a sensitive somatic variant caller optimized for detecting low-frequency variants in ctDNA.

In [None]:
def call_variants_with_lofreq(bam_file_path, reference_fasta_path, genomic_region, output_vcf_path):
    """
    Call somatic variants using Lofreq.
    Input: bam_file_path (Path), reference_fasta_path (Path), genomic_region (str), output_vcf_path (Path)
    Output: None
    """
    # Lofreq requires indel qualities to be inserted into the BAM file
    bam_indel_path = bam_file_path.with_suffix(".indel.bam")
    
    if not bam_indel_path.exists():
        print(f"Adding indel qualities to {bam_file_path.name}...")
        run_cmd(["lofreq", "indelqual", "--dindel", "-f", str(reference_fasta_path), "-o", str(bam_indel_path), str(bam_file_path)])
        run_cmd(["samtools", "index", str(bam_indel_path)])
    
    if output_vcf_path.exists():
        print(f"Skipping variant calling for {output_vcf_path.name} (already exists)")
        return

    print(f"Calling variants with Lofreq for {bam_file_path.name}...")
    run_cmd([
        "lofreq", "call",
        "-f", str(reference_fasta_path),
        "-r", genomic_region,
        "-o", str(output_vcf_path),
        "--call-indels",
        str(bam_indel_path)
    ])

vcf_output_directory = data_dir / "vcf" / selected_patient_id
vcf_output_directory.mkdir(parents=True, exist_ok=True)
kras_vcf_file_paths = {}

for timepoint_label, bam_file_path in deduplicated_bam_paths.items():
    output_vcf_file = vcf_output_directory / f"{selected_patient_id}_{timepoint_label}_KRAS.lofreq.vcf"
    call_variants_with_lofreq(
        bam_file_path=bam_file_path,
        reference_fasta_path=fa_ref_genome_path,
        genomic_region=kras_genomic_region,
        output_vcf_path=output_vcf_file
    )
    kras_vcf_file_paths[timepoint_label] = output_vcf_file
kras_vcf_file_paths

### Variant Annotation (SnpEff)
___
Annotating variants with functional information using SnpEff.

In [None]:
def annotate_variants_with_snpeff(input_vcf_path, output_vcf_path, snpeff_db="GRCh38.86"):
    """
    Annotate VCF file using SnpEff, then compress and index with pysam.
    Input: input_vcf_path (Path), output_vcf_path (Path), snpeff_db (str)
    Output: Path to the compressed and indexed VCF file (.vcf.gz)
    """
    gz_output_path = output_vcf_path.with_suffix(".vcf.gz")
    
    if gz_output_path.exists():
        if gz_output_path.stat().st_size > 0:
            print(f"Skipping annotation for {gz_output_path.name} (already exists)")
            return gz_output_path
        else:
            print(f"Found empty file {gz_output_path.name}, re-running annotation...")
    
    print(f"Annotating {input_vcf_path.name}...")
    
    # Set Java heap size to 10GB to avoid OutOfMemoryError
    import os
    os.environ["_JAVA_OPTIONS"] = "-Xmx10g"
    
    # Run SnpEff to uncompressed file
    with open(output_vcf_path, "w") as f:
        subprocess.run(["snpEff", "-v", snpeff_db, str(input_vcf_path)], stdout=f, check=True)
    
    # Compress and index with pysam
    print(f"Indexing {output_vcf_path.name}...")
    pysam.tabix_index(str(output_vcf_path), preset="vcf", force=True)
    
    # Clean up uncompressed file if it exists (pysam might keep it or not depending on version/flags, usually keeps input)
    if output_vcf_path.exists():
        output_vcf_path.unlink()
        
    return gz_output_path

annotated_vcf_paths = {}
for timepoint_label, vcf_path in kras_vcf_file_paths.items():
    output_ann_vcf = vcf_path.with_name(vcf_path.stem + ".ann.vcf")
    # annotate_variants_with_snpeff returns the .vcf.gz path
    annotated_vcf_paths[timepoint_label] = annotate_variants_with_snpeff(vcf_path, output_ann_vcf)
annotated_vcf_paths

### Gene Visualization
___

In [None]:
from plots_sequences import plot_protein_mutations, plot_gene_and_variants, get_protein_features

# gff_path is optional. If set to None, the script will automatically fetch gene coordinates from Ensembl.
gff_path = None
# If you have a local GFF file, you can provide it here:
# gff_path = "data/reference/Homo_sapiens.GRCh38.113.chromosome.12.gff3.gz"

plot_gene_and_variants("KRAS", annotated_vcf_paths, fa_ref_genome_path, gff_path=gff_path, genomic_region=kras_genomic_region)


In [None]:
# Visualize Protein Mutations

features, length = get_protein_features("KRAS")
img = plot_protein_mutations("KRAS", annotated_vcf_paths, protein_features=features, protein_length=length)
