# Test HAIL table creation from JSON files

In [1]:
import hail as hl
import pandas as pd

hl.init(spark_conf={
        'spark.driver.memory': '320g',
        'spark.executor.memory': '320g',
        'spark.driver.maxResultSize': '100g',
        'spark.kryoserializer.buffer.max': '2047G'
    })

# Load your table
ht = hl.read_table("/dados/bioinfo.time/repos/gnomad-fc/data/variants.exo.ht/")

25/11/17 12:06:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Running on Apache Spark version 3.5.7
SparkUI available at http://r860:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.136-c32f88309ab0
LOGGING: writing to /dados/bioinfo.time/repos/gnomad-fc/code/hail-20251117-1206-0.2.136-c32f88309ab0.log


In [None]:
# # Quick checks
# print(f"Total variants: {ht.count()}")
# print(f"Chromosomes: {ht.aggregate(hl.agg.collect_as_set(ht.chromosome))}")
# print(f"Fields: {list(ht.row)}")

# # Look at first few rows
# ht.show(5)

# # Get summary statistics
# ht.describe()

In [2]:
ht = ht.flatten()
ht.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'chromosome': str 
    'position': int32 
    'ref': str 
    'alt': str 
    'vid': str 
    'hgvsg': str 
    'variant_type': str 
    'begin': int32 
    'end': int32 
    'filters': str 
    'mapping_quality': float64 
    'fisher_strand_bias': float64 
    'quality': float64 
    'cytogenetic_band': str 
    'phylop_score': float64 
    'phylop_primate_score': float64 
    'gerp_score': float64 
    'dann_score': float64 
    'rsid': str 
    'gnomad_af': float64 
    'gnomad_ac': int32 
    'gnomad_an': int32 
    'gnomad_hc': int32 
    'gnomad_afr_af': float64 
    'gnomad_amr_af': float64 
    'gnomad_eas_af': float64 
    'gnomad_fin_af': float64 
    'gnomad_nfe_af': float64 
    'gnomad_asj_af': float64 
    'gnomad_sas_af': float64 
    'gnomad_oth_af': float64 
    'gnomad_failed_filter': bool 
    'gnomad_exome_af': float64 
    'gnomad_exome_ac': int

In [4]:
# es_host = 'localhost'
es_host = "172.17.0.2"
es_port = 9200
es_index = 'fiocruz_variants'
hl.export_elasticsearch(
    ht,
    host=es_host,
    port=es_port,
    index=es_index,
    index_type='_doc',
    block_size=10000,
    config={
        "es.nodes.wan.only": "true"   # necessário quando acessar container via IP
    }

)

Config Map(es.nodes.wan.only -> true, es.batch.size.entries -> 10000, es.index.auto.create -> true, es.port -> 9200, es.nodes -> 172.17.0.2)




In [None]:
def explore_hail_table(ht: hl.Table):
    """
    Comprehensive exploration and testing of a Hail Table
    
    Args:
        ht (hl.Table): Hail Table to explore
    """
    print("="*80)
    print("HAIL TABLE EXPLORATION AND TESTING")
    print("="*80)
    
    # Basic info
    print("\n2. BASIC STATISTICS")
    print("-" * 80)
    n_variants = ht.count()
    print(f"   Total variants: {n_variants:,}")
    print(f"   Key fields: {ht.key}")
    
    # Schema
    print("\n3. SCHEMA OVERVIEW")
    print("-" * 80)
    ht.describe()
    
    # Field names
    print("\n4. ALL FIELD NAMES")
    print("-" * 80)
    fields = list(ht.row)
    for i, field in enumerate(fields, 1):
        print(f"   {i:2d}. {field}")
    
    # Sample data - first 5 rows
    print("\n5. SAMPLE DATA (First 5 variants)")
    print("-" * 80)
    sample_df = ht.head(5).to_pandas()
    print(sample_df[['chromosome', 'position', 'ref', 'alt', 'vid', 'rsid']].to_string())
    
    # Chromosome distribution
    print("\n6. CHROMOSOME DISTRIBUTION")
    print("-" * 80)
    chrom_counts = ht.group_by(ht.chromosome).aggregate(n=hl.agg.count()).collect()
    chrom_df = pd.DataFrame(chrom_counts).sort_values('chromosome')
    print(chrom_df.to_string(index=False))
    
    # Variant type distribution
    print("\n7. VARIANT TYPE DISTRIBUTION")
    print("-" * 80)
    var_types = ht.group_by(ht.variant_type).aggregate(n=hl.agg.count()).collect()
    var_type_df = pd.DataFrame(var_types).sort_values('n', ascending=False)
    print(var_type_df.to_string(index=False))
    
    # gnomAD frequency statistics
    print("\n8. GNOMAD ALLELE FREQUENCY STATISTICS")
    print("-" * 80)
    gnomad_stats = ht.aggregate(hl.struct(
        total_variants=hl.agg.count(),
        with_gnomad_af=hl.agg.count_where(hl.is_defined(ht.gnomad_af)),
        with_gnomad_exome_af=hl.agg.count_where(hl.is_defined(ht.gnomad_exome_af)),
        mean_gnomad_af=hl.agg.mean(ht.gnomad_af),
        median_gnomad_af=hl.agg.approx_median(ht.gnomad_af),
        max_gnomad_af=hl.agg.max(ht.gnomad_af),
    ))
    print(f"   Total variants: {gnomad_stats.total_variants:,}")
    print(f"   With gnomAD genome AF: {gnomad_stats.with_gnomad_af:,}")
    print(f"   With gnomAD exome AF: {gnomad_stats.with_gnomad_exome_af:,}")
    print(f"   Mean gnomAD AF: {gnomad_stats.mean_gnomad_af:.6f}" if gnomad_stats.mean_gnomad_af else "   Mean gnomAD AF: N/A")
    print(f"   Median gnomAD AF: {gnomad_stats.median_gnomad_af:.6f}" if gnomad_stats.median_gnomad_af else "   Median gnomAD AF: N/A")
    print(f"   Max gnomAD AF: {gnomad_stats.max_gnomad_af:.6f}" if gnomad_stats.max_gnomad_af else "   Max gnomAD AF: N/A")
    
    # Frequency bins
    print("\n9. GNOMAD AF FREQUENCY BINS")
    print("-" * 80)
    freq_bins = ht.aggregate(hl.struct(
        rare_0_0001=hl.agg.count_where((ht.max_gnomad_af > 0) & (ht.max_gnomad_af <= 0.0001)),
        rare_0_001=hl.agg.count_where((ht.max_gnomad_af > 0.0001) & (ht.max_gnomad_af <= 0.001)),
        low_freq_0_01=hl.agg.count_where((ht.max_gnomad_af > 0.001) & (ht.max_gnomad_af <= 0.01)),
        common_0_05=hl.agg.count_where((ht.max_gnomad_af > 0.01) & (ht.max_gnomad_af <= 0.05)),
        very_common=hl.agg.count_where(ht.max_gnomad_af > 0.05),
        no_data=hl.agg.count_where(~hl.is_defined(ht.max_gnomad_af)),
    ))
    print(f"   Ultra-rare (AF ≤ 0.0001): {freq_bins.rare_0_0001:,}")
    print(f"   Rare (0.0001 < AF ≤ 0.001): {freq_bins.rare_0_001:,}")
    print(f"   Low freq (0.001 < AF ≤ 0.01): {freq_bins.low_freq_0_01:,}")
    print(f"   Common (0.01 < AF ≤ 0.05): {freq_bins.common_0_05:,}")
    print(f"   Very common (AF > 0.05): {freq_bins.very_common:,}")
    print(f"   No gnomAD data: {freq_bins.no_data:,}")
    
    # Transcript statistics
    print("\n10. TRANSCRIPT STATISTICS")
    print("-" * 80)
    transcript_stats = ht.aggregate(hl.struct(
        with_transcripts=hl.agg.count_where(ht.n_transcripts > 0),
        total_transcripts=hl.agg.sum(ht.n_transcripts),
        with_canonical=hl.agg.count_where(hl.is_defined(ht.canonical_transcript)),
        mean_transcripts=hl.agg.mean(ht.n_transcripts),
        max_transcripts=hl.agg.max(ht.n_transcripts),
    ))
    print(f"   Variants with transcripts: {transcript_stats.with_transcripts:,}")
    print(f"   Total transcript annotations: {transcript_stats.total_transcripts:,}")
    print(f"   Variants with canonical transcript: {transcript_stats.with_canonical:,}")
    print(f"   Mean transcripts per variant: {transcript_stats.mean_transcripts:.2f}")
    print(f"   Max transcripts per variant: {transcript_stats.max_transcripts}")
    
    # ClinVar statistics
    print("\n11. CLINVAR ANNOTATIONS")
    print("-" * 80)
    clinvar_stats = ht.aggregate(hl.struct(
        with_clinvar=hl.agg.count_where(hl.is_defined(ht.clinvar_significance)),
        total=hl.agg.count(),
    ))
    print(f"   Variants with ClinVar: {clinvar_stats.with_clinvar:,}")
    print(f"   Percentage: {100 * clinvar_stats.with_clinvar / clinvar_stats.total:.2f}%")
    
    # Conservation scores
    print("\n12. CONSERVATION SCORE STATISTICS")
    print("-" * 80)
    conservation_stats = ht.aggregate(hl.struct(
        phylop_mean=hl.agg.mean(ht.phylop_score),
        phylop_max=hl.agg.max(ht.phylop_score),
        gerp_mean=hl.agg.mean(ht.gerp_score),
        gerp_max=hl.agg.max(ht.gerp_score),
    ))
    print(f"   PhyloP mean: {conservation_stats.phylop_mean:.3f}" if conservation_stats.phylop_mean else "   PhyloP mean: N/A")
    print(f"   PhyloP max: {conservation_stats.phylop_max:.3f}" if conservation_stats.phylop_max else "   PhyloP max: N/A")
    print(f"   GERP mean: {conservation_stats.gerp_mean:.3f}" if conservation_stats.gerp_mean else "   GERP mean: N/A")
    print(f"   GERP max: {conservation_stats.gerp_max:.3f}" if conservation_stats.gerp_max else "   GERP max: N/A")
    
    # Gene statistics
    print("\n13. GENE STATISTICS")
    print("-" * 80)
    gene_stats = ht.aggregate(hl.struct(
        variants_with_genes=hl.agg.count_where(hl.len(ht.genes) > 0),
    ))
    # Count unique genes by converting set to array and aggregating
    unique_genes_count = ht.aggregate(
        hl.len(hl.agg.explode(lambda gene: hl.agg.collect_as_set(gene), hl.array(ht.genes)))
    )
    print(f"   Variants with gene annotations: {gene_stats.variants_with_genes:,}")
    print(f"   Unique genes: {unique_genes_count:,}")
    
    # Top consequences
    print("\n14. TOP 10 CONSEQUENCES")
    print("-" * 80)
    consequences_ht = ht.explode(ht.all_consequences)
    top_consequences = (consequences_ht
                       .group_by(consequences_ht.all_consequences)
                       .aggregate(n=hl.agg.count())
                       .order_by(hl.desc('n'))
                       .head(10)
                       .collect())
    for i, row in enumerate(top_consequences, 1):
        print(f"   {i:2d}. {row.all_consequences}: {row.n:,}")
    
    # Example queries
    print("\n15. EXAMPLE VARIANT LOOKUPS")
    print("-" * 80)
    
    # Get a few interesting variants
    # Common variant
    common = ht.filter(ht.max_gnomad_af > 0.1).head(1).collect()
    if common:
        v = common[0]
        print(f"\n   Common variant (AF > 0.1):")
        print(f"   {v.chromosome}:{v.position} {v.ref}>{v.alt}")
        print(f"   rsID: {v.rsid}")
        print(f"   gnomAD AF: {v.gnomad_af:.4f}" if v.gnomad_af else "   gnomAD AF: N/A")
        print(f"   Genes: {', '.join(v.genes) if v.genes else 'N/A'}")
    
    # Rare variant
    rare = ht.filter((ht.max_gnomad_af > 0) & (ht.max_gnomad_af < 0.001)).head(1).collect()
    if rare:
        v = rare[0]
        print(f"\n   Rare variant (AF < 0.001):")
        print(f"   {v.chromosome}:{v.position} {v.ref}>{v.alt}")
        print(f"   gnomAD AF: {v.gnomad_af:.6f}" if v.gnomad_af else "   gnomAD AF: N/A")
        print(f"   Consequences: {', '.join(list(v.all_consequences)) if v.all_consequences else 'N/A'}")
    
    # Variant with ClinVar
    clinvar_var = ht.filter(hl.is_defined(ht.clinvar_significance)).head(1).collect()
    if clinvar_var:
        v = clinvar_var[0]
        print(f"\n   ClinVar variant:")
        print(f"   {v.chromosome}:{v.position} {v.ref}>{v.alt}")
        print(f"   ClinVar significance: {v.clinvar_significance}")
        print(f"   ClinVar ID: {v.clinvar_id}")
    
    print("\n" + "="*80)
    print("TESTING COMPLETE!")
    print("="*80)
   

In [None]:
explore_hail_table(ht)

In [None]:
def query_examples(ht: hl.Table):
    """
    Examples of useful queries you can run
    
    Args:
        ht (hl.Table): Hail Table to query
    """
    print("\n" + "="*80)
    print("EXAMPLE QUERIES")
    print("="*80)
    
    # 1. Filter by frequency
    print("\n1. Get rare variants (AF < 0.01)")
    rare_variants = ht.filter(
        (ht.max_gnomad_af > 0) & (ht.max_gnomad_af < 0.01)
    )
    print(f"   Found {rare_variants.count():,} rare variants")
    
    # 2. Filter by gene
    print("\n2. Get variants in specific gene (e.g., first gene in dataset)")
    first_gene = ht.filter(hl.len(ht.genes) > 0).head(1).collect()[0].genes
    if first_gene:
        gene_name = list(first_gene)[0]
        gene_variants = ht.filter(ht.genes.contains(gene_name))
        print(f"   Gene: {gene_name}")
        print(f"   Variants: {gene_variants.count()}")
    
    # 3. Filter by consequence
    print("\n3. Get missense variants")
    missense = ht.filter(ht.all_consequences.contains('missense_variant'))
    print(f"   Found {missense.count():,} missense variants")
    
    # 4. Filter by ClinVar pathogenic
    print("\n4. Get pathogenic variants")
    pathogenic = ht.filter(
        hl.is_defined(ht.clinvar_significance) &
        ht.clinvar_significance.contains('Pathogenic')
    )
    print(f"   Found {pathogenic.count():,} pathogenic variants")
    
    # 5. Filter by conservation
    print("\n5. Get highly conserved variants (PhyloP > 2)")
    conserved = ht.filter(ht.phylop_score > 2)
    print(f"   Found {conserved.count():,} highly conserved variants")
    
    # 6. Complex filter
    print("\n6. Complex filter: Rare + missense + conserved")
    complex_filter = ht.filter(
        (ht.max_gnomad_af < 0.01) &
        ht.all_consequences.contains('missense_variant') &
        (ht.phylop_score > 2)
    )
    print(f"   Found {complex_filter.count():,} variants")
    
    print("\n" + "="*80)

In [None]:
query_examples(ht)