In [1]:
import pyspark
import glow
from delta import *
from pyspark.sql.functions import explode, col, size

In [2]:
builder = pyspark.sql.SparkSession.builder.appName("GlowDeltalakeETL") \
    .config("spark.hadoop.io.compression.codecs", "io.projectglow.sql.util.BGZFCodec") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

extra_packages = [
    "io.projectglow:glow-spark3_2.12:1.2.1",
]

In [3]:
spark = configure_spark_with_delta_pip(builder, extra_packages=extra_packages).getOrCreate()

In [4]:
spark = glow.register(spark)

In [None]:
spark

In [None]:
# Page Break

# ETL Somatic VCF and Save as Deltatable

In [5]:
bcbio_somatic_src = "./data/bcbio_giab_somatic/na12878-na24385-somatic-hg38-truth.vcf.gz"

In [None]:
bcbio_somatic_df = spark.read.format("vcf").load(bcbio_somatic_src)

In [7]:
bcbio_somatic_df.printSchema()

root
 |-- contigName: string (nullable = true)
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAlleles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- qual: double (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- splitFromMultiAllelic: boolean (nullable = true)
 |-- INFO_platformnames: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_callsetwithotheruniqgenopassing: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_callsetnames: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_AC: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- INFO_FREQ: array (nullable = true)
 |    |-- element: string (containsNull = true)


In [None]:
# Page Break

## ETL ON NESTED GENOTYPES COLUMN

In [8]:
bcbio_somatic_df_exploded = bcbio_somatic_df.withColumn("genotypes", explode("genotypes"))

In [9]:
bcbio_somatic_df_exploded.printSchema()

root
 |-- contigName: string (nullable = true)
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAlleles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- qual: double (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- splitFromMultiAllelic: boolean (nullable = true)
 |-- INFO_platformnames: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_callsetwithotheruniqgenopassing: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_callsetnames: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_AC: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- INFO_FREQ: array (nullable = true)
 |    |-- element: string (containsNull = true)


In [None]:
# Page Break

## FLATTEN NESTED GENOTYPE FIELDS

In [10]:
def flatten_struct_fields(df_):
    """
    REF https://github.com/microsoft/genomicsnotebook
    :param df_:
    :return:
    """
    flat_cols = [c[0] for c in df_.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in df_.dtypes if c[1][:6] =='struct']
    flat_df = df_.select(flat_cols + [col(nc+'.'+c).alias(nc+'_'+c) for nc in nested_cols for c in df_.select(nc+'.*').columns])
    return flat_df

In [11]:
bcbio_somatic_df_exploded_flatten = flatten_struct_fields(bcbio_somatic_df_exploded)

In [12]:
bcbio_somatic_df_exploded_flatten.printSchema()

root
 |-- contigName: string (nullable = true)
 |-- start: long (nullable = true)
 |-- end: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- referenceAllele: string (nullable = true)
 |-- alternateAlleles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- qual: double (nullable = true)
 |-- filters: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- splitFromMultiAllelic: boolean (nullable = true)
 |-- INFO_platformnames: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_callsetwithotheruniqgenopassing: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_callsetnames: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- INFO_AC: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- INFO_FREQ: array (nullable = true)
 |    |-- element: string (containsNull = true)


In [None]:
# Page Break

## QA TRANSFORMED DATAFRAME

In [13]:
bcbio_somatic_df_exploded_flatten.select("genotypes_sampleId").distinct().show()



+------------------+
|genotypes_sampleId|
+------------------+
|           NA12878|
+------------------+



                                                                                

In [14]:
bcbio_somatic_df_exploded_flatten \
    .select("contigName", "start", "end", "INFO_AC", "INFO_SOMTYPE", "genotypes_sampleId", "genotypes_calls", "genotypes_alleleDepths") \
    .where("genotypes_sampleId = 'NA12878'") \
    .show()

+----------+-------+-------+-------+------------------+------------------+---------------+----------------------+
|contigName|  start|    end|INFO_AC|      INFO_SOMTYPE|genotypes_sampleId|genotypes_calls|genotypes_alleleDepths|
+----------+-------+-------+-------+------------------+------------------+---------------+----------------------+
|      chr1| 852046| 852047|    [1]|[mod_freq_somatic]|           NA12878|         [0, 1]|            [133, 139]|
|      chr1| 971789| 971791|    [1]|[mod_freq_somatic]|           NA12878|         [0, 1]|            [192, 148]|
|      chr1| 974038| 974039|    [1]|[mod_freq_somatic]|           NA12878|         [0, 1]|            [143, 129]|
|      chr1| 975013| 975014|    [1]|[mod_freq_somatic]|           NA12878|         [0, 1]|            [103, 100]|
|      chr1|1004110|1004112|    [1]|[mod_freq_somatic]|           NA12878|         [0, 1]|            [152, 141]|
|      chr1|1004624|1004625|    [1]|[mod_freq_somatic]|           NA12878|         [0, 1



In [15]:
bcbio_somatic_df_exploded_flatten \
    .cube("INFO_AC") \
    .count() \
    .show()



+-------+-------+
|INFO_AC|  count|
+-------+-------+
|    [1]| 904974|
|   null|1082945|
| [1, 1]|   2682|
|    [2]| 175289|
+-------+-------+



                                                                                

In [16]:
bcbio_somatic_df_exploded_flatten \
    .cube("alternateAlleles") \
    .count() \
    .show()



+--------------------+-----+
|    alternateAlleles|count|
+--------------------+-----+
|             [TATTC]|   15|
|        [TTTGTTG, T]|    2|
|        [ATGCAGATTT]|    1|
|              [GCAA]|    8|
|     [CAAATAAATAAAT]|    4|
|             [TAAAC]|   39|
|              [TAGC]|   14|
|            [ATTTCT]|    4|
|     [GAAGAAGAGAGGA]|    1|
|    [AGGGCCTCTTCTCT]|    1|
|[TAGCGTAAGTACACGG...|    1|
|             [AACCG]|    1|
|         [TATGAATGA]|    1|
|     [AGATGGATGGATG]|    1|
|            [GCCCCC]|    1|
|            [CTTAAA]|    3|
|             [GT, A]|    2|
|   [GACATTTAGATTACA]|    1|
|             [ATTAT]|   36|
|           [CATTATT]|    7|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [17]:
n = bcbio_somatic_df_exploded_flatten.select("genotypes_sampleId").distinct().count()
print(f"number of samples: {n}")



number of samples: 1


                                                                                

In [18]:
n = bcbio_somatic_df_exploded_flatten.count()
print(f"number of records: {n}")



number of records: 1082945


                                                                                

In [19]:
n = bcbio_somatic_df_exploded_flatten \
    .where(size("alternateAlleles") > 1) \
    .count()

print(f"number of multiallelic sites: {n}")



number of multiallelic sites: 2682


                                                                                

In [None]:
# Page Break

# Write to Deltatable

In [20]:
somatic_table = "./lakehouse/bcbio/somatic_table"

In [21]:
bcbio_somatic_df_exploded_flatten.write.format("delta").mode("overwrite").save(somatic_table)

22/10/07 23:34:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [23]:
!tree ./lakehouse/bcbio/somatic_table

[01;34m./lakehouse/bcbio/somatic_table[0m
├── [01;34m_delta_log[0m
│   └── [00m00000000000000000000.json[0m
├── [00mpart-00000-7abdaeb9-be9f-42e8-98d1-6bf7f0579db8-c000.snappy.parquet[0m
├── [00mpart-00001-2e59bfed-0810-4656-ad5c-c9c59a7f3b73-c000.snappy.parquet[0m
├── [00mpart-00002-be0bca24-e495-4154-9b13-aec6b6ba4e02-c000.snappy.parquet[0m
└── [00mpart-00003-51bc918d-45e0-45d4-8e2a-bffb168ef549-c000.snappy.parquet[0m

1 directory, 5 files


In [None]:
# Page Break

# Summary

* In this example, we read VCF through Spark/Glow.
* We perform data transformation on Genotype column.
* We flatten (i.e. data normalisation) the nested vectors of Genotype column.
* So that it can be performant and ease of use at data query (i.e. trade off for information retrieval time) by its predicates such as Sample ID.
* We then write this dataframe out as multi-parts compressed Parquet files through Deltatable framework.
* We can write Spark dataframe in multiple Deltatable write modes such as:
    * We would use `upsert` mode when/if we were to update or insert new records into existing delta table.
    * Or, `append` mode, if destination table is immutable and, so on so ford.
* If we wish, we could also write as-is Parquet only format and arrange them in traditional Datalake structure.
    * e.g. `bcbio_somatic_df_exploded_flatten.write.format("parquet").mode("overwrite").save("./datalake/pipeline=bcbio/type=somatic/year=2022/month=01")`
    * In this case, we will be responsible for maintaining Datalake structure and its key-value partitioning such as `pipeline=bcbio`, etc
* With Deltatable framework, we abstract away this and leave it up to Deltatable framework; and achieve "Logical Table" that perform like relational database table.
* Hence, Deltatable as such technology underpin "LakeHouse" architecture pattern for BigData data warehousing possibility.

In [None]:
# Page Break

# Stop Spark Session

In [24]:
spark.stop()

In [None]:
# Continue to next notebook