In [None]:
import os
import re

# findspark.init() is what makes a SPARK_HOME installation of pyspark importable,
# so it must run BEFORE the pyspark / delta imports below; running it afterwards
# (as before) is too late to have any effect on those imports.
import findspark

findspark.init()

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp, col, when, split, explode, array_distinct
from delta import *

# Directory used both as the Spark SQL warehouse and as the Derby system home
# (see the two settings that reference it below). Set the WAREHOUSE_DIRECTORY_PATH
# environment variable instead of editing the notebook; the placeholder remains the default.
warehouse_diretory_path = os.environ.get('WAREHOUSE_DIRECTORY_PATH', '[YOUR_WAREHOUSE]')

# Local-mode Spark session with the Delta Lake SQL extension and catalog registered
# and Hive support enabled, so tables created via saveAsTable are kept in the catalog.
conf = SparkConf()
conf.setAll(
    [
        ('spark.master', 'local[*]'), 
        ('spark.driver.host', 'localhost'),
        ('spark.app.name', 'Transform Illumina Human Methylation 450 Manifest To CPG Island'),
        ('spark.ui.showConsoleProgress', 'true'),
        ('spark.sql.execution.arrow.pyspark.enabled', 'false'),                   
        ('spark.sql.execution.arrow.pyspark.fallback.enabled', 'true'),
        ('spark.dynamicAllocation.enabled', 'false'),
        ('spark.sql.caseSensitive', 'true'),
        ('spark.sql.adaptive.enabled', 'true'),
        ('spark.sql.extensions','io.delta.sql.DeltaSparkSessionExtension'),
        ('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog'),
        ('spark.sql.warehouse.dir', warehouse_diretory_path),
        ('spark.driver.extraJavaOptions', f'-Dderby.system.home={warehouse_diretory_path}')
    ])

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

In [None]:
# Database that every table/view below lives in (created and selected in the next cell).
biological_database_name = 'biological_database'

# Bronze table the raw manifest rows are read from.
bronze_human_methylation_manifest_table_name = 'bronze_illumina_human_methylation_450_Manifest'

# Silver Delta table written by this notebook, and the temporary view used to stage
# the transformed rows as the MERGE source.
silver_cpg_island_table_name, silver_cpg_island_view_name = 'silver_cpg_island', 'silver_cpg_island_view'

# Value stored in the `origin` column (the silver table's partition column) of every row.
origin_data = 'Illumina Infinium HumanMethylation450'

In [None]:
# Make sure the target database exists, then make it the current one so that the
# unqualified table names used in later cells resolve inside it.
create_database_statement = f'CREATE DATABASE IF NOT EXISTS {biological_database_name};'
use_database_statement = f'USE {biological_database_name};'

spark.sql(create_database_statement)
spark.sql(use_database_statement)

In [None]:
# Read every row/column of the bronze manifest table; later cells narrow and rename it.
bronze_manifest_query = f'SELECT * FROM {bronze_human_methylation_manifest_table_name}'
silver_cpg_island_df = spark.sql(bronze_manifest_query)

In [None]:
# The manifest's own column headers are looked up as a data row: the row whose value
# in the probe-ID column is the literal 'IlmnID' is turned into a
# {bronze column name -> manifest column name} mapping used to rename columns next.
# The probe-ID column is taken to be the first column whose name does NOT contain
# 'IlmnID'. NOTE(review): presumably the bronze ingest did not use the manifest's
# header line as column names — confirm against the bronze loader.
ilumina_id_name = next(
    (column for column in silver_cpg_island_df.columns if 'IlmnID' not in column),
    None,
)
if ilumina_id_name is None:
    raise ValueError('Bronze manifest table has no column usable to locate the IlmnID header row')

# Backtick-quote the column (doubling embedded backticks) so the filter expression
# stays valid even if the raw name is not a plain SQL identifier (dots, spaces, ...).
escaped_ilumina_id_name = ilumina_id_name.replace('`', '``')
manifest_header_row = silver_cpg_island_df.where(f"`{escaped_ilumina_id_name}` = 'IlmnID'").first()
if manifest_header_row is None:
    raise ValueError(f"No row with value 'IlmnID' found in column {ilumina_id_name!r} of the bronze manifest table")

cpg_island_columns = manifest_header_row.asDict()

In [None]:
# Keep only probe rows whose ID contains 'cg', rename the raw bronze columns to the
# manifest header names captured in `cpg_island_columns`, and reshape to one row per
# (probe_id, gene_symbol) pair tagged with its origin and load timestamp.
# NOTE(review): `contains('cg')` matches 'cg' anywhere in the ID; if only CpG probe
# IDs of the form 'cg########' are wanted, `startswith('cg')` may be the intent.
#
# Identifiers are backtick-quoted (embedded backticks doubled) so the expressions stay
# valid even when a raw or manifest column name is not a plain SQL identifier.
escaped_probe_id_column = ilumina_id_name.replace('`', '``')
renamed_column_expressions = [
    "`{}` AS `{}`".format(str(raw_name).replace('`', '``'), str(manifest_name).replace('`', '``'))
    for raw_name, manifest_name in cpg_island_columns.items()
]

silver_cpg_island_df = (
    silver_cpg_island_df
    .filter(col(f'`{escaped_probe_id_column}`').contains('cg'))
    .selectExpr(*renamed_column_expressions)
    .selectExpr('IlmnID AS probe_id', 'UCSC_RefGene_Name AS gene_symbol', 'Relation_to_UCSC_CpG_Island AS region')
    # NULL strings become '', so a probe without a gene symbol still yields one row
    # below (split('') gives ['']) instead of being dropped by explode.
    .na.fill('')
    # The gene list is ';'-separated and may repeat a symbol; de-duplicating keeps
    # (origin, probe_id, gene_symbol) unique, which the MERGE in the next cell keys on.
    .withColumn('gene_symbol', array_distinct(split(col('gene_symbol'), ';')))
    .withColumn('gene_symbol', explode('gene_symbol'))
    .withColumn('origin', lit(origin_data))
    .withColumn('timestamp', current_timestamp())
)

In [None]:
# Persist the transformed rows into the silver Delta table:
#   * table missing or empty -> (re)create it, partitioned by `origin`;
#   * otherwise -> upsert the new rows, then delete rows of the origin(s) being loaded
#     that are no longer present in the source.
silver_cpg_island_table_exists = any(
    table.name == silver_cpg_island_table_name
    for table in spark.catalog.listTables(biological_database_name)
)

# limit(1) lets Spark stop after the first row instead of counting the whole table.
if not silver_cpg_island_table_exists or spark.table(silver_cpg_island_table_name).limit(1).count() == 0:

    silver_cpg_island_df.write \
        .format('delta') \
        .mode('overwrite') \
        .option('overwriteSchema', 'true') \
        .option('partitionOverwriteMode', 'dynamic') \
        .partitionBy('origin') \
        .saveAsTable(silver_cpg_island_table_name)
else:
    silver_cpg_island_df.createOrReplaceTempView(silver_cpg_island_view_name)

    try:
        # Upsert: refresh rows already present for the same (origin, probe_id, gene_symbol)
        # key and insert the new ones.
        spark.sql(f"""MERGE INTO {silver_cpg_island_table_name} AS target
                      USING {silver_cpg_island_view_name} AS source
                      ON source.origin = target.origin
                         AND source.probe_id = target.probe_id
                         AND source.gene_symbol = target.gene_symbol
                      WHEN MATCHED THEN
                        UPDATE SET *
                      WHEN NOT MATCHED THEN INSERT * """)

        # Delete stale rows: target rows with no identical (origin, probe_id, gene_symbol,
        # region) row in the source. Only origins present in the source are considered,
        # so rows loaded from other manifests (other `origin` partitions) are kept.
        spark.sql(f"""MERGE INTO {silver_cpg_island_table_name} AS target
                      USING (SELECT existing.origin,
                                    existing.probe_id,
                                    existing.gene_symbol,
                                    existing.region
                             FROM {silver_cpg_island_table_name} AS existing
                             LEFT ANTI JOIN {silver_cpg_island_view_name} AS incoming
                               ON existing.origin = incoming.origin
                                  AND existing.probe_id = incoming.probe_id
                                  AND existing.gene_symbol = incoming.gene_symbol
                                  AND existing.region = incoming.region
                             WHERE existing.origin IN (SELECT DISTINCT origin
                                                       FROM {silver_cpg_island_view_name})) AS stale
                      ON stale.origin = target.origin
                         AND stale.probe_id = target.probe_id
                         AND stale.gene_symbol = target.gene_symbol
                         AND stale.region = target.region
                      WHEN MATCHED THEN
                        DELETE """)
    finally:
        # Drop the staging view even when a MERGE fails.
        spark.catalog.dropTempView(silver_cpg_island_view_name)