# VCF File Parser (cyvcf2 Implementation)

This notebook implements a VCF parser using the **cyvcf2** library, a fast Cython wrapper around htslib.

## Advantages over Manual Parsing

| Feature | Manual Parser | cyvcf2 Parser |
|---------|---------------|---------------|
| Speed | Slower (pure Python) | Fast (C/Cython) |
| Dependencies | Standard library + pandas | cyvcf2 + numpy |
| Compressed files | gzip module | Native support (including bgzip) |
| Index support | None | Supports .tbi/.csi indexes |
| Memory | Loads all to DataFrame | Can iterate lazily |
| Random access | Not possible | Query specific regions instantly |

## Features
- Parse standard VCF files using cyvcf2
- Handle gzip/bgzip-compressed VCF files (.vcf.gz)
- **Tabix indexing for fast random access queries**
- Exception handling for robust error management
- Unit tests for validation
- Visualizations of variant data
- Performance comparison with manual parser

In [None]:
import os
import unittest
import tempfile
import time
from typing import Dict, List, Optional

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# cyvcf2 for fast VCF parsing
from cyvcf2 import VCF

# Plot defaults for the figures below: seaborn's "whitegrid" theme and a
# default figure size of 10 x 6.
sns.set_style("whitegrid")
plt.rc('figure', figsize=(10, 6))

## Custom Exceptions

In [None]:
class VCFError(Exception):
    """Root of the VCF exception hierarchy; catch this to handle any parser error."""


class VCFFileNotFoundError(VCFError):
    """The path handed to the parser does not exist."""


class VCFFormatError(VCFError):
    """The file exists but could not be read as valid VCF."""

## CyVCF2 Parser Class

In [None]:
class CyVCF2Parser:
    """
    A fast VCF parser using the cyvcf2 library.

    cyvcf2 is a Cython wrapper around htslib, providing significantly
    better performance than pure Python parsing for large VCF files.

    ``parse`` reads every record eagerly into a pandas DataFrame and closes
    the cyvcf2 reader before returning, so no file handle outlives the call.

    Attributes:
        file_path: Path to the VCF file
        samples: List of sample names
        variants: DataFrame containing variant records
    """

    # Column order of the variants table; also used when a file has no records.
    _COLUMNS = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO']

    def __init__(self, file_path: Optional[str] = None):
        """Initialize the cyvcf2 parser.

        Args:
            file_path: Optional path to a VCF file. When given (and non-empty)
                it is parsed immediately.

        Raises:
            VCFFileNotFoundError: If ``file_path`` is given but does not exist.
            VCFFormatError: If ``file_path`` is given but cannot be parsed.
        """
        self.file_path = file_path
        self.samples: List[str] = []
        self.variants: pd.DataFrame = pd.DataFrame()
        # Kept for backward compatibility; parse() no longer leaves a reader
        # open, so this stays None.
        self._vcf_reader: Optional[VCF] = None

        if file_path:
            self.parse(file_path)

    def _format_info_value(self, value):
        """Format an INFO field value for string representation.

        Returns:
            ``None`` for a flag (``value is True``), a comma-joined string for
            a tuple/list, otherwise ``str(value)``.
        """
        if value is True:
            return None  # Flag field, no value needed
        if isinstance(value, (tuple, list)):
            # Handle array values (e.g., AF with multiple alleles)
            return ','.join(str(v) for v in value)
        return str(value)

    def _variant_to_record(self, variant) -> Dict:
        """Convert one cyvcf2 variant into a row dict keyed by ``_COLUMNS``."""
        info_parts = []
        for key, value in variant.INFO:
            formatted = self._format_info_value(value)
            # A flag is written as the bare key, everything else as key=value.
            info_parts.append(key if formatted is None else f"{key}={formatted}")

        return {
            'CHROM': variant.CHROM,
            'POS': variant.POS,
            'ID': variant.ID if variant.ID else '.',
            'REF': variant.REF,
            'ALT': ','.join(str(a) for a in variant.ALT) if variant.ALT else '.',
            'QUAL': variant.QUAL if variant.QUAL is not None else np.nan,
            # A falsy FILTER value is recorded as 'PASS'.
            'FILTER': variant.FILTER if variant.FILTER else 'PASS',
            'INFO': ';'.join(info_parts),
        }

    def parse(self, file_path: str) -> 'CyVCF2Parser':
        """Parse a VCF file and store the data.

        ``samples`` and ``variants`` are only replaced once the whole file has
        been read successfully, so a failed parse leaves earlier results intact.

        Args:
            file_path: Path to a plain or compressed VCF file.

        Returns:
            This parser, to allow call chaining.

        Raises:
            VCFFileNotFoundError: If ``file_path`` does not exist.
            VCFFormatError: If cyvcf2 fails to open or iterate the file; the
                original exception is attached as ``__cause__``.
        """
        self.file_path = file_path

        # Checked before the try block so this error is never re-wrapped below.
        if not os.path.exists(file_path):
            raise VCFFileNotFoundError(f"VCF file not found: {file_path}")

        reader = None
        try:
            reader = VCF(file_path)
            samples = list(reader.samples)
            records = [self._variant_to_record(variant) for variant in reader]
        except Exception as e:
            raise VCFFormatError(f"Error parsing VCF file: {e}") from e
        finally:
            # Release the htslib handle whether or not parsing succeeded;
            # parse_string() deletes the file right after this returns.
            if reader is not None:
                reader.close()

        self.samples = samples
        if records:
            self.variants = pd.DataFrame(records)
        else:
            self.variants = pd.DataFrame(columns=self._COLUMNS)

        return self

    def parse_string(self, vcf_string: str) -> 'CyVCF2Parser':
        """Parse VCF data from a string (useful for testing).

        The text is written to a temporary ``.vcf`` file, parsed, and the file
        is removed again. ``file_path`` is left pointing at that removed file.

        Args:
            vcf_string: Complete VCF document, header included.

        Returns:
            This parser, to allow call chaining.

        Raises:
            VCFFormatError: If the text cannot be parsed as VCF.
        """
        with tempfile.NamedTemporaryFile(mode='w', suffix='.vcf', delete=False) as f:
            f.write(vcf_string)
            temp_path = f.name

        try:
            self.parse(temp_path)
        finally:
            os.unlink(temp_path)

        return self

    def get_variant_count(self) -> int:
        """Return the total number of variants."""
        return len(self.variants)

    def get_chromosomes(self) -> List[str]:
        """Return a list of unique chromosomes."""
        if self.variants.empty:
            return []
        return self.variants['CHROM'].unique().tolist()

    def get_variants_by_chromosome(self, chrom: str) -> pd.DataFrame:
        """Return a copy of the variants located on ``chrom``.

        An empty table is returned when nothing has been parsed yet, matching
        the empty-input handling of the other accessors.
        """
        if self.variants.empty:
            return self.variants.copy()
        return self.variants[self.variants['CHROM'] == chrom].copy()

    @staticmethod
    def _classify_allele(ref: str, alt: str) -> str:
        """Classify one ALT allele against REF by comparing their lengths."""
        if alt == '.':
            return 'NO_VARIATION'
        if len(ref) == 1 and len(alt) == 1:
            return 'SNP'
        if len(ref) > len(alt):
            return 'DELETION'
        if len(ref) < len(alt):
            return 'INSERTION'
        return 'MNP'  # Multi-nucleotide polymorphism (equal length, longer than 1)

    def get_variant_types(self) -> pd.Series:
        """Classify variants by type (SNP, insertion, deletion, etc.).

        Returns:
            One string per variant. Multi-allelic records yield a
            comma-separated label per ALT allele, e.g. ``'SNP,INSERTION'``.
        """
        if self.variants.empty:
            return pd.Series(dtype=str)

        def classify(row):
            # Handle multiple alternates
            return ','.join(
                self._classify_allele(row['REF'], alt) for alt in row['ALT'].split(',')
            )

        return self.variants.apply(classify, axis=1)

    def get_info_field(self, field: str) -> pd.Series:
        """Extract a specific field from the INFO column.

        Returns:
            One entry per variant: the raw string value for ``key=value``
            entries, ``True`` for a flag, ``None`` when the field is absent.
        """
        def extract(info_str):
            if pd.isna(info_str) or info_str == '.':
                return None

            for item in info_str.split(';'):
                if '=' in item:
                    key, value = item.split('=', 1)
                    if key == field:
                        return value
                elif item == field:
                    return True  # Flag field
            return None

        if self.variants.empty:
            return pd.Series(dtype=object)

        return self.variants['INFO'].apply(extract)

    def summary(self) -> Dict:
        """Return a summary of the VCF file."""
        chromosomes = self.get_chromosomes()
        return {
            'file_path': self.file_path,
            'total_variants': self.get_variant_count(),
            'chromosomes': chromosomes,
            'num_chromosomes': len(chromosomes),
            'samples': self.samples,
            'num_samples': len(self.samples),
        }

## Unit Tests

In [None]:
class TestCyVCF2Parser(unittest.TestCase):
    """Checks CyVCF2Parser against two small in-memory VCF documents."""

    def setUp(self):
        """Create the VCF text fixtures used by the individual tests."""
        # Sites-only document: three records on chr1/chr2. The ##contig lines
        # are declared to avoid htslib warnings.
        self.valid_vcf = """##fileformat=VCFv4.2
##contig=<ID=chr1,length=1000000>
##contig=<ID=chr2,length=1000000>
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">
##FILTER=<ID=PASS,Description="All filters passed">
#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
chr1\t100\trs123\tA\tG\t30\tPASS\tDP=100
chr1\t200\trs456\tC\tT\t25\tPASS\tDP=50
chr2\t300\t.\tGG\tG\t20\tPASS\tDP=75
"""

        # Single record carrying genotype columns for two samples.
        self.vcf_with_samples = """##fileformat=VCFv4.2
##contig=<ID=chr1,length=1000000>
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tSAMPLE1\tSAMPLE2
chr1\t100\trs123\tA\tG\t30\tPASS\tDP=100\tGT:DP\t0/1:30\t1/1:40
"""

    @staticmethod
    def _load(vcf_text):
        """Return a new parser that has parsed ``vcf_text``."""
        loaded = CyVCF2Parser()
        loaded.parse_string(vcf_text)
        return loaded

    def test_parse_valid_vcf(self):
        """A valid document yields every record and both chromosomes."""
        loaded = self._load(self.valid_vcf)

        self.assertEqual(loaded.get_variant_count(), 3)
        self.assertEqual(set(loaded.get_chromosomes()), {'chr1', 'chr2'})

    def test_parse_samples(self):
        """Sample names are read from the header in column order."""
        loaded = self._load(self.vcf_with_samples)

        self.assertEqual(loaded.samples, ['SAMPLE1', 'SAMPLE2'])

    def test_get_variants_by_chromosome(self):
        """Filtering by chromosome keeps only that chromosome's records."""
        loaded = self._load(self.valid_vcf)

        on_chr1 = loaded.get_variants_by_chromosome('chr1')
        self.assertEqual(len(on_chr1), 2)

    def test_variant_types(self):
        """Records are labelled by comparing REF and ALT lengths."""
        labels = self._load(self.valid_vcf).get_variant_types()

        self.assertEqual(labels.iloc[0], 'SNP')  # A->G
        self.assertEqual(labels.iloc[2], 'DELETION')  # GG->G

    def test_get_info_field(self):
        """INFO values come back as their raw string form."""
        depths = self._load(self.valid_vcf).get_info_field('DP')

        self.assertEqual(depths.iloc[0], '100')

    def test_file_not_found_error(self):
        """A non-existent path raises VCFFileNotFoundError."""
        unloaded = CyVCF2Parser()
        with self.assertRaises(VCFFileNotFoundError):
            unloaded.parse('/nonexistent/path/file.vcf')

    def test_summary(self):
        """The summary reports record and chromosome totals."""
        report = self._load(self.valid_vcf).summary()

        self.assertEqual(report['total_variants'], 3)
        self.assertEqual(report['num_chromosomes'], 2)


# Execute the parser test case inline and report the totals
print("Running unit tests...\n")
suite = unittest.TestLoader().loadTestsFromTestCase(TestCyVCF2Parser)
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)

print("\n" + "=" * 50)
print(
    f"Tests run: {result.testsRun}",
    f"Failures: {len(result.failures)}",
    f"Errors: {len(result.errors)}",
    sep="\n",
)

## Create Sample VCF Files for Demonstration

In [None]:
import gzip

# Sample VCF text used by the demonstration cells below.
# Note: ##contig lines are included to avoid htslib warnings.
#
# Contents: 15 records across chr1-chr5 with three sample columns and the
# INFO fields DP, AF and the DB flag. Besides plain SNPs it contains a
# deletion (AA>A), an insertion (G>GT), an MNP (GG>TT), one multi-allelic
# site (A>T,G with two AF values), one record without an ID and three
# records filtered as LowQual.
sample_vcf_content = """##fileformat=VCFv4.2
##fileDate=20240101
##source=VCFParserDemo
##reference=GRCh38
##contig=<ID=chr1,length=248956422>
##contig=<ID=chr2,length=242193529>
##contig=<ID=chr3,length=198295559>
##contig=<ID=chr4,length=190214555>
##contig=<ID=chr5,length=181538259>
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">
##INFO=<ID=AF,Number=A,Type=Float,Description="Allele Frequency">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership">
##FILTER=<ID=PASS,Description="All filters passed">
##FILTER=<ID=LowQual,Description="Low quality">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tSAMPLE1\tSAMPLE2\tSAMPLE3
chr1\t10000\trs001\tA\tG\t50\tPASS\tDP=120;AF=0.25;DB\tGT:DP\t0/1:40\t0/0:35\t0/1:45
chr1\t20000\trs002\tC\tT\t45\tPASS\tDP=95;AF=0.15\tGT:DP\t0/0:30\t0/1:32\t0/0:33
chr1\t35000\trs003\tG\tA\t35\tLowQual\tDP=40;AF=0.05\tGT:DP\t0/0:12\t0/0:15\t0/1:13
chr1\t50000\trs004\tT\tC\t55\tPASS\tDP=150;AF=0.30\tGT:DP\t0/1:50\t0/1:48\t1/1:52
chr2\t15000\trs005\tAA\tA\t40\tPASS\tDP=85;AF=0.10\tGT:DP\t0/1:28\t0/0:27\t0/0:30
chr2\t30000\trs006\tG\tGT\t42\tPASS\tDP=90;AF=0.20\tGT:DP\t0/0:30\t0/1:28\t0/1:32
chr2\t45000\t.\tC\tA\t48\tPASS\tDP=110;AF=0.18\tGT:DP\t0/1:38\t0/1:35\t0/0:37
chr3\t10000\trs008\tT\tG\t52\tPASS\tDP=130;AF=0.22\tGT:DP\t0/0:42\t0/1:44\t0/1:44
chr3\t25000\trs009\tA\tC\t38\tLowQual\tDP=55;AF=0.08\tGT:DP\t0/0:18\t0/0:17\t0/1:20
chr3\t40000\trs010\tGG\tTT\t60\tPASS\tDP=145;AF=0.35;DB\tGT:DP\t0/1:48\t0/1:47\t1/1:50
chr4\t5000\trs011\tC\tG\t44\tPASS\tDP=100;AF=0.12\tGT:DP\t0/1:33\t0/0:32\t0/0:35
chr4\t20000\trs012\tT\tA\t46\tPASS\tDP=105;AF=0.16\tGT:DP\t0/0:35\t0/1:34\t0/0:36
chr4\t35000\trs013\tA\tT,G\t58\tPASS\tDP=140;AF=0.28,0.12\tGT:DP\t0/1:46\t0/2:45\t1/2:49
chr5\t12000\trs014\tG\tC\t36\tLowQual\tDP=48;AF=0.06\tGT:DP\t0/0:16\t0/0:15\t0/1:17
chr5\t28000\trs015\tC\tT\t50\tPASS\tDP=115;AF=0.24\tGT:DP\t0/1:38\t0/1:36\t0/0:41
"""

# Destinations for the plain-text and the gzip-compressed copies
sample_vcf_path = 'sample.vcf'
sample_vcf_gz_path = 'sample.vcf.gz'

# Same text goes into both files; only the container differs
with open(sample_vcf_path, 'w') as plain_out:
    plain_out.write(sample_vcf_content)

with gzip.open(sample_vcf_gz_path, 'wt', encoding='utf-8') as gz_out:
    gz_out.write(sample_vcf_content)

for description, written_path in (
    ("sample VCF file", sample_vcf_path),
    ("compressed VCF file", sample_vcf_gz_path),
):
    print(f"Created {description}: {written_path}")

## Parse the Sample VCF File

In [None]:
# Load the plain-text sample and collect its summary
parser = CyVCF2Parser(sample_vcf_path)
summary = parser.summary()

# Report one "key: value" line per summary entry under a ruled heading
print("VCF File Summary (cyvcf2)", "=" * 50, sep="\n")
for key, value in summary.items():
    print(f"{key}: {value}")

In [None]:
# The gzip-compressed copy goes through the same parser class
parser_gz = CyVCF2Parser(sample_vcf_gz_path)
gz_variant_count = parser_gz.get_variant_count()
print("\nCompressed file parsed successfully!")
print(f"Variants in compressed file: {gz_variant_count}")

In [None]:
# Heading (preceded by a blank line), then the parsed table as the cell's
# final expression
print()
print("Variant Data:")
parser.variants

In [None]:
def _first_allele_frequency(raw_af):
    """Return the first comma-separated AF value as a float, or NaN if absent."""
    if raw_af and raw_af != 'None':
        return float(raw_af.split(',')[0])
    return np.nan


# Derived columns used by the visualizations: variant class, first AF value
# and INFO/DP as a number (unparseable or missing depths become NaN)
parser.variants['VARIANT_TYPE'] = parser.get_variant_types()
parser.variants['AF'] = parser.get_info_field('AF').apply(_first_allele_frequency)
parser.variants['DP_INFO'] = pd.to_numeric(parser.get_info_field('DP'), errors='coerce')

preview_columns = ['CHROM', 'POS', 'REF', 'ALT', 'QUAL', 'VARIANT_TYPE', 'AF', 'DP_INFO']
parser.variants[preview_columns]

## Tabix Indexing and Random Access

One of the key advantages of cyvcf2 over manual parsing is support for **tabix indexes**. 
Tabix enables fast random access to specific genomic regions without scanning the entire file.

### Requirements
- **bgzip**: Block-gzip compression (NOT regular gzip)
- **tabix**: Creates the `.tbi` index file

### How it works
1. Compress VCF with `bgzip` (creates `.vcf.gz` with block compression)
2. Index with `tabix` (creates `.vcf.gz.tbi`)
3. Query specific regions with cyvcf2: `vcf("chr1:10000-50000")`

In [None]:
import subprocess
import shutil

# Look up the two htslib command-line tools on PATH
bgzip_path = shutil.which('bgzip')
tabix_path = shutil.which('tabix')

print("Checking for required tools...")
for tool_name, tool_location in (('bgzip', bgzip_path), ('tabix', tabix_path)):
    tool_status = f"Found at {tool_location}" if tool_location else "NOT FOUND"
    print(f"  {tool_name}: {tool_status}")

# Both tools are required; later cells are gated on this flag
TABIX_AVAILABLE = bool(bgzip_path and tabix_path)

if TABIX_AVAILABLE:
    print("\nAll tools available!")
else:
    print("\nInstall htslib to get bgzip and tabix:")
    print("  macOS:  brew install htslib")
    print("  Ubuntu: apt-get install tabix")
    print("  conda:  conda install -c bioconda htslib")

In [None]:
if TABIX_AVAILABLE:
    # Create a bgzip-compressed VCF file with tabix index
    indexed_vcf_path = 'sample_indexed.vcf.gz'
    index_path = indexed_vcf_path + '.tbi'

    # Remove earlier output so a stale index is never paired with new data
    for f in (indexed_vcf_path, index_path):
        if os.path.exists(f):
            os.remove(f)

    # Step 1: Compress with bgzip (reads from stdin, writes to file)
    print("Step 1: Compressing VCF with bgzip...")
    with open(sample_vcf_path, 'rb') as vcf_in, open(indexed_vcf_path, 'wb') as vcf_out:
        result = subprocess.run(
            ['bgzip', '-c'],
            stdin=vcf_in,
            stdout=vcf_out,
            stderr=subprocess.PIPE,  # keep the tool's diagnostics for the error message
            text=True,
        )

    compressed_ok = result.returncode == 0
    if compressed_ok:
        print(f"  Created: {indexed_vcf_path}")
    else:
        print(f"  Error compressing file: {result.stderr.strip()}")
        # Do not leave a partial archive behind
        if os.path.exists(indexed_vcf_path):
            os.remove(indexed_vcf_path)

    # Step 2: Create tabix index (only meaningful if step 1 produced a file)
    indexed_ok = False
    if compressed_ok:
        print("\nStep 2: Creating tabix index...")
        result = subprocess.run(
            ['tabix', '-p', 'vcf', indexed_vcf_path],
            capture_output=True,
            text=True,
        )
        indexed_ok = result.returncode == 0
        if indexed_ok:
            print(f"  Created: {index_path}")
        else:
            print(f"  Error: {result.stderr}")

    if not (compressed_ok and indexed_ok):
        # The cells below are gated on this flag; clear it so they skip
        # cleanly instead of failing on a missing or unindexed file.
        TABIX_AVAILABLE = False
        print("\nIndexed VCF could not be built - random access demos will be skipped")

    # Verify files exist
    print("\nGenerated files:")
    for f in (indexed_vcf_path, index_path):
        if os.path.exists(f):
            size = os.path.getsize(f)
            print(f"  {f}: {size} bytes")
else:
    print("Skipping bgzip/tabix - tools not available")

### Random Access Queries

With an indexed VCF file, cyvcf2 can jump directly to specific regions without reading the entire file. This is critical for large VCF files (e.g., whole genome sequencing data).

**Query syntax:**
- Single region: `vcf("chr1:10000-50000")`
- Whole chromosome: `vcf("chr1")`
- Multiple regions: run one query per region, e.g. `for region in ["chr1:10000-20000", "chr2:15000-30000"]: vcf(region)`

In [None]:
if TABIX_AVAILABLE:
    # One reader serves every query in this cell: each call such as
    # indexed_vcf("chr1") starts its own region lookup, so the file does not
    # have to be reopened between queries. The reader is closed at the end.
    indexed_vcf = VCF(indexed_vcf_path)
    try:
        print("Random Access Query Examples")
        print("=" * 60)

        # Examples 1 and 2 share one output format: (heading, region)
        for heading, region in (
            ("1. Query chr1:10000-40000", "chr1:10000-40000"),
            ("2. Query all of chr2", "chr2"),
        ):
            print(f"\n{heading}")
            print("-" * 40)
            for variant in indexed_vcf(region):
                print(f"   {variant.CHROM}:{variant.POS} {variant.REF}>{','.join(variant.ALT)} (ID: {variant.ID})")

        # Example 3: several regions are fetched with one query per region
        print("\n3. Query multiple regions (chr1:10000-25000 and chr3:20000-50000)")
        print("-" * 40)
        regions = ["chr1:10000-25000", "chr3:20000-50000"]
        for region in regions:
            print(f"   Region: {region}")
            for variant in indexed_vcf(region):
                print(f"      {variant.CHROM}:{variant.POS} {variant.REF}>{','.join(variant.ALT)}")
    finally:
        indexed_vcf.close()
else:
    print("Skipping random access demo - tabix not available")

In [None]:
if TABIX_AVAILABLE:
    def query_region_to_dataframe(vcf_path: str, region: str) -> pd.DataFrame:
        """Query a region and return results as a DataFrame.

        Args:
            vcf_path: Path to the indexed VCF file to query.
            region: Region string passed to the cyvcf2 reader, e.g.
                ``"chr1:10000-50000"`` or ``"chr1"``.

        Returns:
            One row per variant with the columns CHROM, POS, ID, REF, ALT,
            QUAL and FILTER. The columns are present even when the region
            contains no variants.
        """
        columns = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER']
        vcf = VCF(vcf_path)
        try:
            variants_data = [
                {
                    'CHROM': variant.CHROM,
                    'POS': variant.POS,
                    'ID': variant.ID if variant.ID else '.',
                    'REF': variant.REF,
                    'ALT': ','.join(str(a) for a in variant.ALT) if variant.ALT else '.',
                    # Missing quality becomes NaN, as in CyVCF2Parser.parse
                    'QUAL': variant.QUAL if variant.QUAL is not None else np.nan,
                    # A falsy FILTER value is recorded as 'PASS'
                    'FILTER': variant.FILTER if variant.FILTER else 'PASS',
                }
                for variant in vcf(region)
            ]
        finally:
            # Rows hold plain Python values, so the reader can be released here
            vcf.close()

        return pd.DataFrame(variants_data, columns=columns)

    # Query chr1 and get as DataFrame
    print("Query chr1:10000-50000 as DataFrame:")
    chr1_df = query_region_to_dataframe(indexed_vcf_path, "chr1:10000-50000")
    display(chr1_df)
else:
    print("Skipping DataFrame query demo - tabix not available")

### Random Access vs Full Scan Performance

For large VCF files, random access is dramatically faster than scanning the entire file.
The benefit scales with file size - for a 10GB VCF, a region query can be 1000x+ faster.

In [None]:
if TABIX_AVAILABLE:
    import time

    n_iterations = 50

    def _scan_for_chr1():
        """Read every record and keep the ones on chr1."""
        vcf = VCF(indexed_vcf_path)
        return [v for v in vcf if v.CHROM == "chr1"]

    def _query_chr1():
        """Fetch chr1 through a region query."""
        vcf = VCF(indexed_vcf_path)
        return list(vcf("chr1"))

    def _time_runs(fetch):
        """Call ``fetch`` n_iterations times; return (durations, last result)."""
        durations = []
        fetched = None
        for _ in range(n_iterations):
            began = time.perf_counter()
            fetched = fetch()
            durations.append(time.perf_counter() - began)
        return durations, fetched

    # Each timed run includes opening the file
    full_scan_times, chr1_variants = _time_runs(_scan_for_chr1)
    random_access_times, chr1_variants = _time_runs(_query_chr1)

    print("Performance: Full Scan vs Random Access")
    print("=" * 50)
    print(f"Task: Retrieve all chr1 variants ({len(chr1_variants)} variants)")
    print(f"Iterations: {n_iterations}\n")

    # Seconds -> milliseconds
    full_scan_mean = np.mean(full_scan_times) * 1000
    full_scan_std = np.std(full_scan_times) * 1000
    random_mean = np.mean(random_access_times) * 1000
    random_std = np.std(random_access_times) * 1000

    print(f"Full Scan:     {full_scan_mean:.3f} ms ± {full_scan_std:.3f} ms")
    print(f"Random Access: {random_mean:.3f} ms ± {random_std:.3f} ms")

    if random_mean < full_scan_mean:
        speedup = full_scan_mean / random_mean
        print(f"\nRandom access is {speedup:.1f}x faster")
    else:
        print("\n(For small files, the difference is minimal)")

    print("\nNote: The speedup is dramatic for large files (GB-scale VCFs)")
else:
    print("Skipping performance comparison - tabix not available")

## Visualizations

### Visualization 1: Variant Distribution by Chromosome

In [None]:
# Tally of variants per chromosome, ordered by chromosome label
chrom_counts = parser.variants['CHROM'].value_counts().sort_index()

fig, ax = plt.subplots(figsize=(10, 6))
colors = sns.color_palette("viridis", len(chrom_counts))
bars = ax.bar(chrom_counts.index, chrom_counts.values, color=colors, edgecolor='black')

# Write each count centred just above its bar
for bar, count in zip(bars, chrom_counts.values):
    label_x = bar.get_x() + bar.get_width() / 2
    label_y = bar.get_height() + 0.1
    ax.text(label_x, label_y, str(count), ha='center', va='bottom', fontweight='bold')

ax.set_title('Variant Distribution by Chromosome (cyvcf2)', fontsize=14, fontweight='bold')
ax.set_xlabel('Chromosome', fontsize=12)
ax.set_ylabel('Number of Variants', fontsize=12)
# One unit of headroom above the tallest bar
ax.set_ylim(0, max(chrom_counts.values) + 1)

plt.tight_layout()
plt.savefig('viz1_variants_by_chromosome_cyvcf2.png', dpi=150, bbox_inches='tight')
plt.show()

### Visualization 2: Variant Types Distribution (Pie Chart)

In [None]:
# How many variants fall into each type label
type_counts = parser.variants['VARIANT_TYPE'].value_counts()
n_types = len(type_counts)

fig, ax = plt.subplots(figsize=(8, 8))
colors = sns.color_palette("Set2", n_types)
explode = [0.05] * n_types  # same small offset for every wedge

wedges, texts, autotexts = ax.pie(
    type_counts.values,
    labels=type_counts.index,
    colors=colors,
    explode=explode,
    autopct='%1.1f%%',
    startangle=90,
    shadow=True,
)

# Bold percentage labels
for pct_label in autotexts:
    pct_label.set_fontweight('bold')

ax.set_title('Distribution of Variant Types (cyvcf2)', fontsize=14, fontweight='bold')

# Legend entries carry the absolute counts
legend_labels = [f"{label} (n={count})" for label, count in type_counts.items()]
ax.legend(
    wedges,
    legend_labels,
    title="Variant Types",
    loc="center left",
    bbox_to_anchor=(1, 0, 0.5, 1),
)

plt.tight_layout()
plt.savefig('viz2_variant_types_cyvcf2.png', dpi=150, bbox_inches='tight')
plt.show()

### Visualization 3: Quality Score Distribution

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
ax1, ax2 = axes[0], axes[1]

# Left panel: histogram of QUAL with mean and median markers
qual_data = parser.variants['QUAL'].dropna()
qual_mean = qual_data.mean()
qual_median = qual_data.median()

ax1.hist(qual_data, bins=15, color='steelblue', edgecolor='black', alpha=0.7)
ax1.axvline(qual_mean, color='red', linestyle='--', linewidth=2, label=f'Mean: {qual_mean:.1f}')
ax1.axvline(qual_median, color='orange', linestyle='--', linewidth=2, label=f'Median: {qual_median:.1f}')
ax1.set_title('Quality Score Distribution (cyvcf2)', fontsize=14, fontweight='bold')
ax1.set_xlabel('Quality Score (QUAL)', fontsize=12)
ax1.set_ylabel('Frequency', fontsize=12)
ax1.legend()

# Right panel: QUAL grouped by FILTER value; a filter with no records
# contributes an empty box
filter_order = ['PASS', 'LowQual']
filter_groups = {
    filter_name: list(quals)
    for filter_name, quals in parser.variants.groupby('FILTER')['QUAL']
}
box_data = [filter_groups.get(filter_name, []) for filter_name in filter_order]
bp = ax2.boxplot(box_data, tick_labels=filter_order, patch_artist=True)

colors_box = ['lightgreen', 'lightcoral']
for box_patch, fill in zip(bp['boxes'], colors_box):
    box_patch.set_facecolor(fill)

ax2.set_title('Quality Score by Filter Status (cyvcf2)', fontsize=14, fontweight='bold')
ax2.set_xlabel('Filter Status', fontsize=12)
ax2.set_ylabel('Quality Score (QUAL)', fontsize=12)

plt.tight_layout()
plt.savefig('viz3_quality_distribution_cyvcf2.png', dpi=150, bbox_inches='tight')
plt.show()

### Visualization 4: Allele Frequency vs Read Depth

In [None]:
fig, ax = plt.subplots(figsize=(10, 7))

# Only rows that have both an allele frequency and a depth can be plotted
plot_data = parser.variants.dropna(subset=['AF', 'DP_INFO'])

# One colour per variant type, in order of first appearance
variant_types = plot_data['VARIANT_TYPE'].unique()
colors = sns.color_palette("husl", len(variant_types))
color_map = dict(zip(variant_types, colors))

for vtype, type_color in color_map.items():
    subset = plot_data[plot_data['VARIANT_TYPE'] == vtype]
    ax.scatter(
        subset['AF'],
        subset['DP_INFO'],
        s=subset['QUAL'] * 3,  # marker area scales with quality
        c=[type_color],
        label=vtype,
        alpha=0.7,
        edgecolors='black',
        linewidth=0.5,
    )

ax.set_title('Allele Frequency vs Read Depth (cyvcf2)\n(point size = quality score)', fontsize=14, fontweight='bold')
ax.set_xlabel('Allele Frequency (AF)', fontsize=12)
ax.set_ylabel('Read Depth (DP)', fontsize=12)
ax.legend(title='Variant Type')

plt.tight_layout()
plt.savefig('viz4_af_vs_depth_cyvcf2.png', dpi=150, bbox_inches='tight')
plt.show()

### Visualization 5: Genomic Position Density Plot

In [None]:
from matplotlib.patches import Patch

fig, ax = plt.subplots(figsize=(12, 6))

# Get unique chromosomes in order
chromosomes = sorted(parser.variants['CHROM'].unique())
colors = sns.color_palette("tab10", len(chromosomes))

# Chromosomes are laid out side by side on one x axis: `offset` is where the
# current chromosome starts, and it grows by that chromosome's largest
# position plus a 10000-unit gap.
offset = 0
tick_positions = []
tick_labels = []

for i, chrom in enumerate(chromosomes):
    chrom_positions = parser.variants.loc[parser.variants['CHROM'] == chrom, 'POS']

    # Plot variants as vertical lines
    for pos in chrom_positions.values + offset:
        ax.axvline(pos, color=colors[i], alpha=0.7, linewidth=2)

    # Store tick position (middle of chromosome region)
    max_pos = chrom_positions.max()
    tick_positions.append(offset + max_pos / 2)
    tick_labels.append(chrom)

    # Add separator (centred in the gap) and update offset
    offset += max_pos + 10000
    if i < len(chromosomes) - 1:
        ax.axvline(offset - 5000, color='gray', linestyle=':', alpha=0.5)

ax.set_xticks(tick_positions)
ax.set_xticklabels(tick_labels)
ax.set_xlabel('Chromosome', fontsize=12)
ax.set_ylabel('Variant Presence', fontsize=12)
ax.set_title('Variant Positions Across Chromosomes (cyvcf2)', fontsize=14, fontweight='bold')
ax.set_yticks([])

# Add legend
legend_elements = [Patch(facecolor=colors[i], label=chrom) for i, chrom in enumerate(chromosomes)]
ax.legend(handles=legend_elements, title='Chromosome', loc='upper right')

plt.tight_layout()
plt.savefig('viz5_genomic_positions_cyvcf2.png', dpi=150, bbox_inches='tight')
plt.show()

## Performance Comparison: Manual Parser vs cyvcf2

In [None]:
# Define a simplified manual parser inline for the comparison,
# so this notebook does not depend on another file

class ManualVCFParser:
    """Simplified manual VCF parser for performance comparison.

    Reads a plain or gzip-compressed VCF line by line and keeps every column
    as text, except POS and QUAL which are converted to numbers.

    Attributes:
        file_path: Path of the most recently parsed file.
        variants: DataFrame with one row per record, columns taken from the
            ``#CHROM`` header line.
        samples: Header columns that follow FORMAT (empty without FORMAT).
    """

    def __init__(self, file_path: Optional[str] = None):
        """Create the parser and parse ``file_path`` right away when given."""
        self.file_path = file_path
        self.variants: pd.DataFrame = pd.DataFrame()
        self.samples: List[str] = []

        if file_path:
            self.parse(file_path)

    def parse(self, file_path: str) -> 'ManualVCFParser':
        """Parse ``file_path`` and replace any previously parsed data.

        Args:
            file_path: Path to a VCF file; a name ending in ``.gz`` is read
                through gzip.

        Returns:
            This parser, to allow call chaining.

        Raises:
            ValueError: If records are present but no ``#CHROM`` header line
                names their columns.
        """
        self.file_path = file_path
        header: List[str] = []
        samples: List[str] = []
        rows = []

        # Handle gzip
        opener = gzip.open if file_path.endswith('.gz') else open

        with opener(file_path, 'rt', encoding='utf-8') as f:
            for raw_line in f:
                # Strip only the line terminator: a trailing tab marks an
                # empty last field and must survive the split below.
                line = raw_line.rstrip('\r\n')
                if not line.strip() or line.startswith('##'):
                    continue
                if line.startswith('#'):
                    header = line.lstrip('#').split('\t')
                    if 'FORMAT' in header:
                        samples = header[header.index('FORMAT') + 1:]
                else:
                    rows.append(line.split('\t'))

        if rows and not header:
            raise ValueError(
                f"VCF file has records but no #CHROM header line: {file_path}"
            )

        variants = pd.DataFrame(rows, columns=header)
        for numeric_column in ('POS', 'QUAL'):
            if numeric_column in variants.columns:
                variants[numeric_column] = pd.to_numeric(
                    variants[numeric_column], errors='coerce'
                )

        # Assign both results together so a re-parse never mixes the new file
        # with data left over from an earlier one.
        self.variants = variants
        self.samples = samples

        return self

    def get_variant_count(self) -> int:
        """Return the number of parsed records."""
        return len(self.variants)

In [None]:
def benchmark_parsers(file_path: str, n_iterations: int = 10):
    """Time ManualVCFParser against CyVCF2Parser on the same file.

    Each parser is constructed ``n_iterations`` times on ``file_path``; the
    manual parser runs first.

    Returns:
        ``{'manual': stats, 'cyvcf2': stats}`` where each ``stats`` dict holds
        ``'mean'`` and ``'std'`` in milliseconds and ``'times'``, the raw
        per-run durations in seconds.
    """
    def _time_constructor(parser_cls):
        durations = []
        for _ in range(n_iterations):
            began = time.perf_counter()
            parser = parser_cls(file_path)
            durations.append(time.perf_counter() - began)
        return durations

    def _summarise(durations):
        return {
            'mean': np.mean(durations) * 1000,  # seconds -> ms
            'std': np.std(durations) * 1000,
            'times': durations,
        }

    manual_times = _time_constructor(ManualVCFParser)
    cyvcf2_times = _time_constructor(CyVCF2Parser)

    return {
        'manual': _summarise(manual_times),
        'cyvcf2': _summarise(cyvcf2_times),
    }

# Describe the benchmark, run it, then report both parsers
print("Performance Comparison")
print("=" * 50)
print(f"\nBenchmarking with file: {sample_vcf_path}")
print(f"Number of variants: {parser.get_variant_count()}")
print("Number of iterations: 10\n")

results = benchmark_parsers(sample_vcf_path, n_iterations=10)

for heading, result_key in (("Manual Parser:", 'manual'), ("\ncyvcf2 Parser:", 'cyvcf2')):
    print(heading)
    print(f"  Mean time: {results[result_key]['mean']:.2f} ms ± {results[result_key]['std']:.2f} ms")

# Ratio above 1 means cyvcf2 had the lower mean time
speedup = results['manual']['mean'] / results['cyvcf2']['mean']
if speedup > 1:
    print(f"\ncyvcf2 is {speedup:.2f}x faster than manual parsing")
else:
    print(f"\nManual parsing is {1/speedup:.2f}x faster than cyvcf2")
    print("(Note: For small files, overhead may dominate. cyvcf2 shines with large files)")

In [None]:
# Bar chart of mean parse time per parser, with std-dev error bars
fig, ax = plt.subplots(figsize=(8, 5))

parsers = ['Manual Parser', 'cyvcf2 Parser']
means = [results[name]['mean'] for name in ('manual', 'cyvcf2')]
stds = [results[name]['std'] for name in ('manual', 'cyvcf2')]
colors = ['#ff7f0e', '#2ca02c']

bars = ax.bar(parsers, means, yerr=stds, color=colors, edgecolor='black', capsize=5)

# Put each mean above the top of its error bar
for bar, mean, std in zip(bars, means, stds):
    label_x = bar.get_x() + bar.get_width() / 2
    label_y = bar.get_height() + std + 0.2
    ax.text(label_x, label_y, f'{mean:.2f} ms', ha='center', va='bottom', fontweight='bold')

ax.set_title('VCF Parsing Performance Comparison', fontsize=14, fontweight='bold')
ax.set_ylabel('Time (milliseconds)', fontsize=12)
ax.set_ylim(0, max(means) + max(stds) + 2)

plt.tight_layout()
plt.savefig('viz6_performance_comparison.png', dpi=150, bbox_inches='tight')
plt.show()

## Summary Statistics

In [None]:
# Text report: file overview, per-category counts, then numeric summaries
print("VCF Analysis Summary (cyvcf2)")
print("=" * 60)
print(f"\nFile: {parser.file_path}")
print(f"Total Variants: {parser.get_variant_count()}")
print(f"Chromosomes: {', '.join(parser.get_chromosomes())}")
print(f"Samples: {', '.join(parser.samples) if parser.samples else 'None'}")

# Value counts for the two categorical columns
for section_title, column in (("Variant Types", 'VARIANT_TYPE'), ("Filter Status", 'FILTER')):
    print(f"\n{section_title}:")
    for category, count in parser.variants[column].value_counts().items():
        print(f"  - {category}: {count}")

# (label, describe() key) pairs printed for each numeric column
stat_rows = (("Mean", 'mean'), ("Std", 'std'), ("Min", 'min'), ("Max", 'max'))

print("\nQuality Score Statistics:")
qual_stats = parser.variants['QUAL'].describe()
for stat_label, stat_key in stat_rows:
    print(f"  - {stat_label}: {qual_stats[stat_key]:.2f}")

print("\nRead Depth Statistics:")
dp_stats = parser.variants['DP_INFO'].describe()
for stat_label, stat_key in stat_rows:
    print(f"  - {stat_label}: {dp_stats[stat_key]:.2f}")

## Cleanup

In [None]:
# Optional cleanup: uncomment the lines below to delete the generated
# VCF and index files.

# import os
# for f in ['sample.vcf', 'sample.vcf.gz', 'sample_indexed.vcf.gz', 'sample_indexed.vcf.gz.tbi']:
#     if os.path.exists(f):
#         os.remove(f)
#         print(f"Removed {f}")

# Everything this notebook writes to the working directory
generated_files = (
    "sample.vcf (sample VCF file)",
    "sample.vcf.gz (gzip-compressed sample VCF file)",
    "sample_indexed.vcf.gz (bgzip-compressed VCF for random access)",
    "sample_indexed.vcf.gz.tbi (tabix index)",
    "viz1_variants_by_chromosome_cyvcf2.png",
    "viz2_variant_types_cyvcf2.png",
    "viz3_quality_distribution_cyvcf2.png",
    "viz4_af_vs_depth_cyvcf2.png",
    "viz5_genomic_positions_cyvcf2.png",
    "viz6_performance_comparison.png",
)

print("\nGenerated files:")
for entry in generated_files:
    print(f"- {entry}")