In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    IntegerType,
    FloatType,
    StringType,
    StructType,
    MapType,
)
from pyspark.sql.functions import col, split, udf, explode
import hashlib

ModuleNotFoundError: No module named 'pyspark'

In [2]:
VCF_FILES = [
    "file:///home/richard/tmp/vcfread/HG002_CHD7_AD.vcf.gz",
    "file:///home/richard/tmp/vcfread/HG003_CHD7_AD.vcf.gz",
    "file:///home/richard/tmp/vcfread/HG004_CHD7_AD.vcf.gz"
]

In [3]:
### Show contents of VCF file
import gzip
import os

vcf_file = VCF_FILES[0][7:]
if vcf_file.endswith(".bgz"):
    out_file = os.path.join("/tmp", "{}{}".format(os.path.basename(vcf_file)[0:-3], "gz"))
    print("OUTFILE", out_file)
    
    
with gzip.open(vcf_file, "rt") as vcf_handle:
    for i in range(0, 5):
        line = vcf_handle.readline().strip()
        print(line)
    print("...")
    while line.startswith("##"):
        line = vcf_handle.readline().strip()
        if line.startswith("##FORMAT"):
            print(line)
    print("...\n\n")
    print(line)
    for i in range(0, 10):
        line = vcf_handle.readline().strip()
        print(line)
    

##fileformat=VCFv4.2
##SentieonCommandLine.DNAModelApply=<ID=DNAModelApply,Version="sentieon-genomics-201911",Date="2020-05-17T20:59:21Z",CommandLine="/usr/local/sentieon-genomics-201911/libexec/driver -t 24 -r genome/hs37d5.fa --algo DNAModelApply --model /home/dnanexus/in/ml_model/SentieonDNAscopeModelBeta0.5.model -v dnascope_before_ml.vcf.gz haplotyper.vcf.gz">
##SentieonCommandLine.DNAscope=<ID=DNAscope,Version="sentieon-genomics-201911",Date="2020-05-17T20:34:41Z",CommandLine="/usr/local/sentieon-genomics-201911/libexec/driver -t 24 -r genome/hs37d5.fa -i markdup.bam -q recal_data_Sentieon.table --interval ignore_decoy.bed --algo DNAscope --model /home/dnanexus/in/ml_model/SentieonDNAscopeModelBeta0.5.model -d resources/dbsnp_138.b37.vcf.gz dnascope_before_ml.vcf.gz">
##reference=file://genome/hs37d5.fa
##SentieonModelID=SentieonModelBeta0.5
...
##FORMAT=<ID=AD,Number=R,Type=Integer,Description="Allelic depths for the ref and alt alleles in the order listed">
##FORMAT=<ID=GQ,Numb

In [4]:
### Set up spark context
spark = (
    SparkSession.builder.config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)
sc = spark.sparkContext

In [5]:
### Initial load of vcf file into dataframe
vcf_schema = StructType() \
                        .add('CHROM', StringType(), True) \
                        .add('POS', IntegerType(), True) \
                        .add('ID', StringType(), True) \
                        .add('REF', StringType(), True) \
                        .add('ALT', StringType(), True) \
                        .add('QUAL', FloatType(), True) \
                        .add('FILTER', StringType(), True) \
                        .add('INFO', StringType(), True) \
                        .add('FORMAT', StringType(), True)
for s in range(1, 4):
    vcf_schema.add('SAMPLE{}'.format(s), StringType(), True)                        

In [6]:
### Load a VCF and show its content in a dataframe
vcf_df = spark.read.schema(vcf_schema).option("sep", "\t").csv(VCF_FILES[0]).filter("CHROM not like '##%'")
#vcf_df = spark.read.schema(vcf_schema).format("csv").option("sep", "\t").option("codec", "gzip").load(VCF_FILES[0]).filter("CHROM not like '##%'")
vcf_df.show()
vcf_df.printSchema()

+------+-----+-----------+---+---+-------+------+----+--------+---------------+-------+-------+
| CHROM|  POS|         ID|REF|ALT|   QUAL|FILTER|INFO|  FORMAT|        SAMPLE1|SAMPLE2|SAMPLE3|
+------+-----+-----------+---+---+-------+------+----+--------+---------------+-------+-------+
|#CHROM| null|         ID|REF|ALT|   null|FILTER|INFO|  FORMAT|   HG002_237983|   null|   null|
|     1|13302|rs180734498|  C|  T| 334.83|     .|   .|GT:AD:GQ|    1/1:0,13:39|   null|   null|
|     1|13656|          .|CAG|  C|2401.83|     .|   .|GT:AD:GQ|    1/1:8,59:99|   null|   null|
|     1|13896|rs201696125|  C|  A| 336.83|     .|   .|GT:AD:GQ|   0/1:55,19:99|   null|   null|
|     1|13957|rs201747181| TC|  T| 663.96|     .|   .|GT:AD:GQ|    1/1:1,18:15|   null|   null|
|     1|13980|rs151276478|  T|  C| 181.96|     .|   .|GT:AD:GQ|     1/1:0,1:15|   null|   null|
|     1|14513|          .|  G|  A|  84.83|     .|   .|GT:AD:GQ|    0/1:13,5:99|   null|   null|
|     1|14653|rs375086259|  C|  T| 104.8

In [7]:
### Rename columns from header line, and remove empty sample columns
vcf_header = vcf_df.filter("CHROM = '#CHROM'")
vcf_df = vcf_df.subtract(vcf_header)
vcf_header = vcf_header.first()

samples = list()

for column in vcf_df.columns:
    if column.startswith("SAMPLE"):
        if vcf_header[column]:
            vcf_df = vcf_df.withColumnRenamed(column, vcf_header[column])
            samples.append(vcf_header[column])
        else:
            vcf_df = vcf_df.drop(column)
#vcf_df = vcf_df.withColumn("POS", col("POS").cast("integer")).withColumn("QUAL", col("QUAL").cast("double"))

vcf_df.show()
vcf_df.printSchema()

+-----+--------+----------+-----+----+-------+------+----+--------+------------+
|CHROM|     POS|        ID|  REF| ALT|   QUAL|FILTER|INFO|  FORMAT|HG002_237983|
+-----+--------+----------+-----+----+-------+------+----+--------+------------+
|    1| 2118359|  rs262684|    A|   G|2494.83|     .|   .|GT:AD:GQ|0/1:18,19:99|
|    1| 6659872| rs6678681|    A|   G|1513.83|     .|   .|GT:AD:GQ| 1/1:0,43:99|
|    1| 7980364|rs34786825|    C|  CT| 216.83|     .|   .|GT:AD:GQ|0/1:11,11:99|
|    1| 8021778|  rs226249|    T|   C|1439.83|     .|   .|GT:AD:GQ|0/1:21,22:99|
|    1| 9009444| rs2274328|    A|   C|1929.83|     .|   .|GT:AD:GQ| 1/1:0,22:69|
|    1|10511589|  rs666103|    T|   C|1666.83|     .|   .|GT:AD:GQ| 1/1:0,28:84|
|    1|12267265| rs1061624|    A|   G| 837.83|     .|   .|GT:AD:GQ|0/1:13,24:99|
|    1|16011005|         .|    G|GGGC| 859.83|     .|   .|GT:AD:GQ|0/1:12,16:99|
|    1|16736327|rs41269197|    G|   A| 762.83|     .|   .|GT:AD:GQ|0/1:20,20:99|
|    1|22310824| rs1803531| 

In [8]:
### order rows
vcf_df.orderBy(['CHROM', 'POS']).show()

+-----+-----+-----------+---+---+-------+------+----+--------+---------------+
|CHROM|  POS|         ID|REF|ALT|   QUAL|FILTER|INFO|  FORMAT|   HG002_237983|
+-----+-----+-----------+---+---+-------+------+----+--------+---------------+
|    1|13302|rs180734498|  C|  T| 334.83|     .|   .|GT:AD:GQ|    1/1:0,13:39|
|    1|13656|          .|CAG|  C|2401.83|     .|   .|GT:AD:GQ|    1/1:8,59:99|
|    1|13896|rs201696125|  C|  A| 336.83|     .|   .|GT:AD:GQ|   0/1:55,19:99|
|    1|13957|rs201747181| TC|  T| 663.96|     .|   .|GT:AD:GQ|    1/1:1,18:15|
|    1|13980|rs151276478|  T|  C| 181.96|     .|   .|GT:AD:GQ|     1/1:0,1:15|
|    1|14513|          .|  G|  A|  84.83|     .|   .|GT:AD:GQ|    0/1:13,5:99|
|    1|14653|rs375086259|  C|  T| 104.83|     .|   .|GT:AD:GQ|    0/1:20,6:99|
|    1|14907| rs79585140|  A|  G|4392.83|     .|   .|GT:AD:GQ|  0/1:49,116:99|
|    1|14930| rs75454623|  A|  G|4065.83|     .|   .|GT:AD:GQ|  0/1:46,111:99|
|    1|15118| rs71252250|  A|  G|2413.83|     .|   .

In [9]:
### Split FORMAT and sample values into arrays
print("SAMPLES", samples)
for sample in ["FORMAT"] + samples:
    vcf_df = vcf_df.withColumn(sample, split(col(sample), ":"))

vcf_df.show()
vcf_df.printSchema()

SAMPLES ['HG002_237983']
+-----+--------+----------+-----+----+-------+------+----+------------+----------------+
|CHROM|     POS|        ID|  REF| ALT|   QUAL|FILTER|INFO|      FORMAT|    HG002_237983|
+-----+--------+----------+-----+----+-------+------+----+------------+----------------+
|    1| 2118359|  rs262684|    A|   G|2494.83|     .|   .|[GT, AD, GQ]|[0/1, 18,19, 99]|
|    1| 6659872| rs6678681|    A|   G|1513.83|     .|   .|[GT, AD, GQ]| [1/1, 0,43, 99]|
|    1| 7980364|rs34786825|    C|  CT| 216.83|     .|   .|[GT, AD, GQ]|[0/1, 11,11, 99]|
|    1| 8021778|  rs226249|    T|   C|1439.83|     .|   .|[GT, AD, GQ]|[0/1, 21,22, 99]|
|    1| 9009444| rs2274328|    A|   C|1929.83|     .|   .|[GT, AD, GQ]| [1/1, 0,22, 69]|
|    1|10511589|  rs666103|    T|   C|1666.83|     .|   .|[GT, AD, GQ]| [1/1, 0,28, 84]|
|    1|12267265| rs1061624|    A|   G| 837.83|     .|   .|[GT, AD, GQ]|[0/1, 13,24, 99]|
|    1|16011005|         .|    G|GGGC| 859.83|     .|   .|[GT, AD, GQ]|[0/1, 12,16, 9

In [10]:
### Convert sample values into a dictionary
@udf(MapType(StringType(), StringType()))
def create_dict(keys, values):
    res = dict()
    for index, key in enumerate(keys):
        res[key] = values[index]
    return res

for sample in samples:
    vcf_df = vcf_df.withColumn(sample, create_dict(col("FORMAT"), col(sample)))
    
vcf_df.show()
vcf_df.printSchema()

+-----+--------+----------+-----+----+-------+------+----+------------+--------------------+
|CHROM|     POS|        ID|  REF| ALT|   QUAL|FILTER|INFO|      FORMAT|        HG002_237983|
+-----+--------+----------+-----+----+-------+------+----+------------+--------------------+
|    1| 2118359|  rs262684|    A|   G|2494.83|     .|   .|[GT, AD, GQ]|[GQ -> 99, AD -> ...|
|    1| 6659872| rs6678681|    A|   G|1513.83|     .|   .|[GT, AD, GQ]|[GQ -> 99, AD -> ...|
|    1| 7980364|rs34786825|    C|  CT| 216.83|     .|   .|[GT, AD, GQ]|[GQ -> 99, AD -> ...|
|    1| 8021778|  rs226249|    T|   C|1439.83|     .|   .|[GT, AD, GQ]|[GQ -> 99, AD -> ...|
|    1| 9009444| rs2274328|    A|   C|1929.83|     .|   .|[GT, AD, GQ]|[GQ -> 69, AD -> ...|
|    1|10511589|  rs666103|    T|   C|1666.83|     .|   .|[GT, AD, GQ]|[GQ -> 84, AD -> ...|
|    1|12267265| rs1061624|    A|   G| 837.83|     .|   .|[GT, AD, GQ]|[GQ -> 99, AD -> ...|
|    1|16011005|         .|    G|GGGC| 859.83|     .|   .|[GT, AD, GQ]

In [11]:
### Select value from dictionary into columns
vcf_df.select(col("CHROM"), col("POS"), col("ID"), col("REF"), col("ALT"), col("HG002_237983.GQ"), col("HG002_237983.AD"), col("HG002_237983.GT")).show()


+-----+--------+----------+-----+----+---+-----+---+
|CHROM|     POS|        ID|  REF| ALT| GQ|   AD| GT|
+-----+--------+----------+-----+----+---+-----+---+
|    1| 2118359|  rs262684|    A|   G| 99|18,19|0/1|
|    1| 6659872| rs6678681|    A|   G| 99| 0,43|1/1|
|    1| 7980364|rs34786825|    C|  CT| 99|11,11|0/1|
|    1| 8021778|  rs226249|    T|   C| 99|21,22|0/1|
|    1| 9009444| rs2274328|    A|   C| 69| 0,22|1/1|
|    1|10511589|  rs666103|    T|   C| 84| 0,28|1/1|
|    1|12267265| rs1061624|    A|   G| 99|13,24|0/1|
|    1|16011005|         .|    G|GGGC| 99|12,16|0/1|
|    1|16736327|rs41269197|    G|   A| 99|20,20|0/1|
|    1|22310824| rs1803531|    T|   C| 99| 0,40|1/1|
|    1|26110143| rs3767877|    G|   A| 99|17,28|0/1|
|    1|34326409| rs1321626|    T|   C| 99| 0,45|1/1|
|    1|39994490|  rs941281|    T|   C| 99| 0,33|1/1|
|    1|40146417|  rs736009|    G|   T| 99| 0,43|1/1|
|    1|42314756|rs17365632|    C|   T| 99| 0,45|1/1|
|    1|47607785| rs2056900|    G|   A| 99|16,2

In [12]:
%%time
### function to load a vcf into a dataframe
def load_vcf(vcf_file):
    print("LOADING", vcf_file)
    vcf_df = spark.read.schema(vcf_schema).option("sep", "\t").csv(vcf_file).filter("CHROM not like '##%'")
    vcf_header = vcf_df.filter("CHROM = '#CHROM'")
    vcf_df = vcf_df.subtract(vcf_header)
    vcf_header = vcf_header.first()

    samples = list()
    for column in vcf_df.columns:
        if column.startswith("SAMPLE"):
            if vcf_header[column]:
                vcf_df = vcf_df.withColumnRenamed(column, vcf_header[column])
                samples.append(vcf_header[column])
            else:
                vcf_df = vcf_df.drop(column)
    vcf_df = vcf_df.withColumn("POS", col("POS").cast("integer")).withColumn("QUAL", col("QUAL").cast("double"))
    for sample in ["FORMAT"] + samples:
        vcf_df = vcf_df.withColumn(sample, split(col(sample), ":"))
    for sample in samples:
        vcf_df = vcf_df.withColumn(sample, create_dict(col("FORMAT"), col(sample)))
    vcf_df = vcf_df.drop("FILTER").drop("INFO").drop("FORMAT").drop("QUAL")
    return vcf_df

def print_stats(df, title=None):
    if title:
        print(title)
    print("ROWS:", df.count())
    print("COLS:", len(df.columns))

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 8.34 µs


In [13]:
%%time
### Load/prepare 3 vcfs

vcf_dfs = list()
for vcf_file in VCF_FILES:
    vcf_dfs.append(load_vcf(vcf_file))
    
for index, vcf_df in enumerate(vcf_dfs):
    print_stats(vcf_df, VCF_FILES[index])
    vcf_df.orderBy(['CHROM', 'POS']).show()

LOADING file:///home/richard/tmp/vcfread/HG002_CHD7_AD.vcf.gz
LOADING file:///home/richard/tmp/vcfread/HG003_CHD7_AD.vcf.gz
LOADING file:///home/richard/tmp/vcfread/HG004_CHD7_AD.vcf.gz
file:///home/richard/tmp/vcfread/HG002_CHD7_AD.vcf.gz
ROWS: 111945
COLS: 6
+-----+-----+-----------+---+---+--------------------+
|CHROM|  POS|         ID|REF|ALT|        HG002_237983|
+-----+-----+-----------+---+---+--------------------+
|    1|13302|rs180734498|  C|  T|[GQ -> 39, AD -> ...|
|    1|13656|          .|CAG|  C|[GQ -> 99, AD -> ...|
|    1|13896|rs201696125|  C|  A|[GQ -> 99, AD -> ...|
|    1|13957|rs201747181| TC|  T|[GQ -> 15, AD -> ...|
|    1|13980|rs151276478|  T|  C|[GQ -> 15, AD -> ...|
|    1|14513|          .|  G|  A|[GQ -> 99, AD -> ...|
|    1|14653|rs375086259|  C|  T|[GQ -> 99, AD -> ...|
|    1|14907| rs79585140|  A|  G|[GQ -> 99, AD -> ...|
|    1|14930| rs75454623|  A|  G|[GQ -> 99, AD -> ...|
|    1|15118| rs71252250|  A|  G|[GQ -> 99, AD -> ...|
|    1|15211| rs78601809

In [14]:
%%time
### Join 3 vcfs on CHROM, POS, REF, ALT fields

merged_df = vcf_dfs[0].join(vcf_dfs[1], ['CHROM', 'POS', 'REF', 'ALT'], how='full').join(vcf_dfs[2], ['CHROM', 'POS', 'REF', 'ALT'], how='full')
merged_df = merged_df.orderBy(['CHROM', 'POS'])
merged_df.show()
print_stats(merged_df)

+-----+-----+---+---+-----------+--------------------+-----------+--------------------+-----------+--------------------+
|CHROM|  POS|REF|ALT|         ID|        HG002_237983|         ID|        HG003_237983|         ID|        HG004_237983|
+-----+-----+---+---+-----------+--------------------+-----------+--------------------+-----------+--------------------+
|    1|13273|  G|  C|       null|                null|       null|                null|          .|[GQ -> 99, AD -> ...|
|    1|13302|  C|  T|rs180734498|[GQ -> 39, AD -> ...|       null|                null|rs180734498|[GQ -> 99, AD -> ...|
|    1|13656|CAG|  C|          .|[GQ -> 99, AD -> ...|          .|[GQ -> 99, AD -> ...|          .|[GQ -> 99, AD -> ...|
|    1|13896|  C|  A|rs201696125|[GQ -> 99, AD -> ...|       null|                null|       null|                null|
|    1|13957| TC|  T|rs201747181|[GQ -> 15, AD -> ...|rs201747181|[GQ -> 99, AD -> ...|rs201747181|[GQ -> 99, AD -> ...|
|    1|13980|  T|  C|rs151276478