In [2]:
%%configure -f
{ "conf":{    
          "spark.databricks.hive.metastore.glueCatalog.enabled" : "true",
          "spark.jars.packages": "io.delta:delta-core_2.12:0.8.0",
          "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
          "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
          "spark.sql.warehouse.dir": "s3://aws-poc-serverless-analytics/delta_lake_demo/clean/delta_dw/"
    
         }
}


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
12,application_1622352033254_0013,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
10,application_1622352033254_0011,pyspark,idle,Link,Link,
12,application_1622352033254_0013,pyspark,idle,Link,Link,✔


In [3]:
import logging

# Root-logger setup: INFO threshold, records rendered as "<LEVEL>: <message>".
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
# Notebook-wide logger used by the helper functions and cells below.
logger = logging.getLogger(__name__)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def get_source_df_with_custom_schema(file_format, custom_schema, raw_file_location):
    """Read a raw file into a Spark DataFrame, applying ``custom_schema``.

    Relies on the notebook-level ``spark`` session and ``logger``.

    Args:
        file_format: Format selector; one of ``'csv'``, ``'txt'``,
            ``'parquet'`` or ``'json'``. ``'csv'`` and ``'txt'`` are both read
            as comma-separated files with a header row.
        custom_schema: Schema handed to the Spark reader (this notebook passes
            a DDL-style string). ``None`` lets the reader use its default
            schema handling.
        raw_file_location: Path of the file(s) to read.

    Returns:
        The DataFrame produced by the matching Spark reader.

    Raises:
        ValueError: If ``file_format`` is not one of the supported values.
    """
    logger.info("Checking the file extension and reading as dataframe")

    if file_format in ('csv', 'txt'):
        logger.info("Reading csv/txt file")
        return spark.read.csv(raw_file_location, header=True, sep=',', schema=custom_schema)

    if file_format == 'parquet':
        logger.info("Reading parquet file")
        # DataFrameReader.parquet() has no `schema` parameter: depending on the
        # PySpark version a `schema=` keyword is either rejected or silently
        # dropped. Attach the schema through DataFrameReader.schema() instead.
        reader = spark.read
        if custom_schema is not None:
            reader = reader.schema(custom_schema)
        return reader.parquet(raw_file_location)

    if file_format == 'json':
        logger.info("Reading json file")
        return spark.read.json(raw_file_location, schema=custom_schema)

    # Fail loudly: previously this path fell through to `return raw_df` with
    # the variable unassigned, surfacing as an unrelated UnboundLocalError.
    logger.error("File format not supported")
    raise ValueError(
        f"Unsupported file format {file_format!r}; expected one of 'csv', 'txt', 'parquet', 'json'"
    )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
def delta_save(rawDF, clean_table, scd_type, partition_key_list, pk):
    """Write ``rawDF`` into the Delta table ``clean_table`` using an SCD strategy.

    Relies on the notebook-level ``spark`` session and ``logger``. Table and
    column names are interpolated directly into SQL text, so they must come
    from trusted configuration.

    Strategies:
        * ``'scd0'`` - overwrite the table with ``rawDF`` (optionally
          partitioned by ``partition_key_list``).
        * ``'scd1'`` - MERGE on ``pk``: matching rows are updated in place,
          new rows are inserted.
        * ``'scd2'`` - MERGE on ``pk`` keeping history: an active target row
          whose ``updated_at`` differs from the incoming row is flagged
          ``active_flag = 'false'`` and the incoming row is inserted with
          ``active_flag = 'true'``. Both the incoming data and the target
          table must expose an ``updated_at`` column.

    Args:
        rawDF: Source DataFrame to persist.
        clean_table: Name of the target table. For scd1/scd2 the MERGE is
            issued against it, so it presumably has to exist already - confirm.
        scd_type: ``'scd0'``, ``'scd1'`` or ``'scd2'``.
        partition_key_list: Partition column(s) for the scd0 overwrite; an
            empty or ``None`` value writes the table unpartitioned. Not used
            by scd1/scd2.
        pk: Name of the primary-key column used as the merge key.

    Raises:
        ValueError: If ``scd_type`` is not one of the supported values.
    """
    logger.info("Checking the SCD Type given")
    # Name of the temp view that exposes the incoming rows to the MERGE SQL.
    stg_table = f"{clean_table}_stg"

    if scd_type == 'scd0':
        writer = rawDF.write.mode("overwrite").format("delta")
        if partition_key_list:
            # Only partition when keys were actually configured.
            writer = writer.partitionBy(partition_key_list)
        writer.saveAsTable(clean_table)

    elif scd_type == 'scd1':
        logger.info("Executing scd type 1 ")
        rawDF.createOrReplaceTempView(stg_table)
        spark.sql(f"""
            MERGE INTO {clean_table}
            USING {stg_table}
            on {clean_table}.{pk} = {stg_table}.{pk}
            WHEN MATCHED THEN
                UPDATE SET *
            WHEN NOT MATCHED
                THEN INSERT *
        """)

    elif scd_type == 'scd2':
        logger.info("Executing scd type 2 ")
        # Imported here so this branch does not depend on a wildcard import
        # having been executed in some other notebook cell.
        from pyspark.sql.functions import lit

        # Every incoming row is staged as the active version.
        staged_df = rawDF.withColumn("active_flag", lit("true"))
        staged_df.createOrReplaceTempView(stg_table)
        # staged_updates is the union of:
        #   1. every incoming row keyed by its pk (mergeKey = pk): matches the
        #      existing rows so a changed active row can be deactivated, and
        #      inserts rows whose key is not in the target yet;
        #   2. incoming rows that changed an active target row, with
        #      mergeKey = NULL so they never match and are always inserted as
        #      the new active version.
        spark.sql(f"""
            MERGE INTO {clean_table}
            USING (
                SELECT {stg_table}.{pk} as mergeKey, {stg_table}.*
                FROM {stg_table}

                UNION ALL

                SELECT NULL as mergeKey, {stg_table}.*
                FROM {stg_table} JOIN {clean_table}
                ON {stg_table}.{pk} = {clean_table}.{pk}
                WHERE {clean_table}.active_flag = 'true'
                  AND {stg_table}.updated_at <> {clean_table}.updated_at
            ) staged_updates
            ON {clean_table}.{pk} = staged_updates.mergeKey
            WHEN MATCHED AND {clean_table}.active_flag = 'true'
                 AND {clean_table}.updated_at <> staged_updates.updated_at THEN
                UPDATE SET active_flag = 'false'
            WHEN NOT MATCHED THEN
                INSERT *
        """)

    else:
        # Previously an unknown type was a silent no-op, which made a
        # misconfigured run look like a successful load.
        raise ValueError(
            f"Unsupported scd_type {scd_type!r}; expected 'scd0', 'scd1' or 'scd2'"
        )
        

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# S3 location of the JSON configuration document that is read below.
config_file_path = "s3://aws-poc-serverless-analytics/delta_lake_demo/config/config.json"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Load the multi-line JSON configuration document into a DataFrame.
logger.info("Reading configuration file")
config_json = (
    spark.read
    .option("multiline", "true")
    .json(config_file_path)
)
# Turn each collected Row into a dict and keep the first one as the run constants.
run_constants = [config_row.asDict() for config_row in config_json.collect()][0]


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Pull the settings for one environment and one dataset out of the loaded configuration.
logger.info("Reading constants from configuration file")

# Environment-level settings.
Env = 'dev'
env_constants = run_constants[Env]
database = env_constants['glue_database']
datasets = env_constants['dataset']

# Dataset-level settings for the dataset processed by this notebook.
dataset = 'merchants'
dataset_constants = env_constants[dataset]
file_format = dataset_constants['file_format']
scd_type = dataset_constants['scd_type']
clean_table = dataset_constants['glue_table']
custom_schema = dataset_constants['custom_schema']
partition_key_list = dataset_constants['partition_key']
pk = dataset_constants['primary_key']


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Load file from raw location into the clean Delta table

In [9]:
# Manual inputs for this run; file_format and custom_schema replace the values
# that were read from the configuration file.
file_format = 'csv'
# DDL-style schema string: every column is read as STRING.
custom_schema = "id STRING, company_name STRING, telephone STRING, state STRING, created_at STRING, updated_at STRING"
# Alternative input file, kept for reference (presumably the SCD1 test data - confirm).
#raw_file_location = "s3://aws-poc-serverless-analytics/delta_lake_demo/test_data/merchants_SCD1.csv"
raw_file_location = "s3://aws-poc-serverless-analytics/delta_lake_demo/test_data/merchants.csv"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Read the raw file into a DataFrame using the format, schema and location set above.
raw_df = get_source_df_with_custom_schema( file_format, custom_schema, raw_file_location)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Print a preview of the loaded rows as a sanity check.
raw_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+--------------------+------------+-----------+--------------------+--------------------+
|    id|        company_name|   telephone|      state|          created_at|          updated_at|
+------+--------------------+------------+-----------+--------------------+--------------------+
|178307|            MSI Kart|  9319919988|trial_ended|2021-01-02 09:33:...|2021-01-02 09:39:...|
|177863|  Bellelyse Boutique|000-000-0000|  suspended|2020-12-31 04:56:...|2021-02-19 16:53:...|
|144524|     Rachid hamzaoui|  0639756718|       free|2020-09-21 17:18:...|2020-10-22 11:15:...|
| 50742|Thenaricalicollec...|  5049394269|  suspended|2019-03-22 03:57:...|2019-05-15 00:54:...|
|176321|   CLICK MY CART LTD|  6479863690|  suspended|2020-12-23 06:20:...|2020-12-24 04:53:...|
|140294|              myself|  7149259700|       free|2020-09-05 17:41:...|2020-10-06 11:15:...|
|165163|  JeeJee’s Boutique |208-202-9314|trial_ended|2020-11-20 16:31:...|2020-11-20 16:33:...|
|156444|     Only The Flyest| 

In [12]:
# Inputs for delta_save: reuse the loaded DataFrame and override the scd_type
# and primary key that were read from the configuration file.
rawDF = raw_df
scd_type = 'scd2'
pk = 'id'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Wildcard import: brings the pyspark.sql.functions helpers (e.g. `lit`) into the notebook namespace.
from pyspark.sql.functions import *
# Make the configured database the current one so unqualified table names resolve against it.
spark.sql(f"USE {database}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[]

In [19]:
# Persist the DataFrame into the clean table using the selected SCD strategy.
delta_save(rawDF,clean_table,scd_type,partition_key_list,pk)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [81]:
# Stop the Spark session once the load has finished.
spark.stop()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…