### Historical and CDC loading 
###### database: Insurance
###### table: Policy

In [None]:
%help

In [None]:
%iam_role arn:aws:iam::331504768406:role/service-role/AWSGlueServiceRole
%region us-east-1
%idle_timeout 5
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2


In [None]:

%%configure -f
{
    "conf": "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "datalake-formats":"delta",
    'enable-auto-scaling': 'true',
    'JOB_NAME': 'glue-job-policy-insurance-full-load', 
    's3_bucket': 's3://jamil-datalake-dev/',
    'start_date': '2020-01-01',
    'final_date': '2024-12-31',
    'environment': 'prd' ## ['prd', 'dev']
}


In [None]:
import sys
import pyspark.sql.functions as F
from pyspark.context import SparkContext
from pyspark.sql import DataFrame
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from delta import DeltaTable
from datetime import datetime, timedelta
from dateutil.parser import parse
from re import sub

# Create (or reuse) the SparkContext and wrap it in a GlueContext.
glueContext = GlueContext(SparkContext.getOrCreate())
# Session handle used by the functions below as the module-level `spark`.
spark = glueContext.spark_session


In [None]:
# convert a text to a snake case string
def to_snake_case(text):
    """Return *text* as a lower-case, underscore-separated string.

    Hyphens are treated as word separators, and a word boundary is inserted
    before each run of capitals and before each capitalised word, e.g.
    ``"HTTPResponse"`` -> ``"http_response"``.
    """
    spaced = text.replace('-', ' ')
    # First break before every run of capitals ...
    spaced = sub('([A-Z]+)', r' \1', spaced)
    # ... then before every capital that starts a lower-case word.
    spaced = sub('([A-Z][a-z]+)', r' \1', spaced)
    words = spaced.split()
    return '_'.join(words).lower()


In [None]:
def dedup_keys_str(primary_keys: list) -> str:
    """Build the merge join condition for the given key columns.

    Each key becomes ``target.<key> = delta.<key>``; the clauses are joined
    with `` AND ``. The aliases ``target`` and ``delta`` are the ones the
    merge in ``delta_load`` assigns to the two sides.

    Args:
        primary_keys: Column names that identify a row.

    Returns:
        The condition string, or ``''`` when no keys are given.
    """
    # str.join already covers the zero-, one- and many-key cases.
    return ' AND '.join(f'target.{key} = delta.{key}' for key in primary_keys)


In [None]:
def table_exists(database, table) -> bool:
    """Report whether ``database.table`` can be queried and holds at least one row.

    A table that cannot be resolved yields ``False`` instead of propagating
    Spark's ``AnalysisException``. An existing but empty table also yields
    ``False``, as before.

    Args:
        database: Catalog database name.
        table: Table name inside ``database``.

    Returns:
        ``True`` when a one-row probe of the table returns a row.
    """
    # Local import: keeps this block self-contained without touching the
    # notebook's import cell.
    from pyspark.sql.utils import AnalysisException

    try:
        # count() is evaluated once; every call launches a Spark job.
        row_count = spark.sql(f""" select * 
                            from {database}.{table} 
                            limit 1
                        """).count()
    except AnalysisException:
        # Raised when the database or table cannot be resolved.
        print('table exists:', 0)
        return False

    print('table exists:', row_count)

    return row_count > 0


In [None]:
def delta_table_exists(path) -> bool:
    """Report whether *path* holds a Delta table.

    Uses ``DeltaTable.isDeltaTable`` rather than attempting
    ``DeltaTable.forPath`` inside a catch-all ``except``. Unrelated failures
    (for example access errors) now surface instead of being reported as
    "no table", which would send the job down the overwrite path.

    Args:
        path: Storage location to probe.

    Returns:
        ``True`` when a Delta table is found at *path*.
    """
    return DeltaTable.isDeltaTable(spark, path)

In [None]:

def is_valid_date(date_ymd) -> bool:
    """Report whether *date_ymd* can be parsed as a date.

    Parsing is delegated to ``dateutil``'s ``parse`` with ``yearfirst`` and
    ``dayfirst`` enabled, so any format that parser accepts counts as valid,
    not only ``YYYY-MM-DD``.

    Args:
        date_ymd: Candidate date string; falsy values are rejected.

    Returns:
        ``True`` when the value parses, ``False`` otherwise.
    """
    if not date_ymd:
        return False

    try:
        parse(timestr=date_ymd, yearfirst=True, dayfirst=True)
    except (ValueError, OverflowError, TypeError):
        # ValueError/OverflowError: unparseable or out-of-range input.
        # TypeError: the value is not a string. Anything else is a real bug
        # and is allowed to propagate.
        return False

    return True


In [None]:
def s3_bucket_exists(s3_bucket) -> bool:
    """Placeholder bucket check that always reports the bucket as present.

    The argument is accepted but not inspected, and no S3 call is made.
    """
    bucket_found = True
    return bucket_found


In [None]:
# reading source data file

def read_source(path, start_dt, final_dt) -> DataFrame:
    """Read the CSV files under *path*, keeping rows inside the expiry window.

    Every ``*.csv`` directly under *path* is read with a header row. Rows are
    kept when ``start_dt <= expiry_date <= final_dt`` (both ends inclusive).

    Args:
        path: Folder prefix; ``'*.csv'`` is appended to it as-is.
        start_dt: Lower bound for ``expiry_date``.
        final_dt: Upper bound for ``expiry_date``.

    Returns:
        The filtered source DataFrame.
    """
    expiry_date = F.col('expiry_date')

    # The bounds are passed as literals rather than spliced into SQL text, so
    # a quote or stray characters in a job argument cannot alter the predicate.
    # No schema is supplied to the reader, so this is the same comparison the
    # interpolated SQL performed.
    in_window = (expiry_date >= F.lit(start_dt)) & (expiry_date <= F.lit(final_dt))

    return (
        spark.read
        .format('csv')
        .option('header', True)
        .load(path + '*.csv')
        .filter(in_window)
    )


In [None]:
# transforming
def transform(data_frame) -> DataFrame:
    """Cast the source columns and add file-derived partition columns.

    Steps:
      1. Apply the Glue column mapping (casts, and ``earthquake`` is renamed
         to ``earth_quake``).
      2. Add ``file_name`` from ``input_file_name()`` and slice
         ``year_month_day``, ``year``, ``month`` and ``day`` out of it.
      3. Drop ``operation`` and remove fully duplicated rows.

    Args:
        data_frame: DataFrame with the string columns named in the mapping.

    Returns:
        The transformed DataFrame.
    """
    # (source column, source type, target column, target type)
    # NOTE: 'update_date' is deliberately not mapped here.
    column_mappings = [
        ('operation', 'string', 'operation', 'char(1)'),
        ('policy_id', 'string', 'policy_id', 'bigint'),
        ('expiry_date', 'string', 'expiry_date', 'date'),
        ('location_name', 'string', 'location_name', 'string'),
        ('state_code', 'string', 'state_code', 'string'),
        ('region_name', 'string', 'region_name', 'string'),
        ('insured_value', 'string', 'insured_value', 'double'),
        ('business_type', 'string', 'business_type', 'string'),
        ('earthquake', 'string', 'earth_quake', 'char(1)'),
        ('flood', 'string', 'flood', 'char(1)'),
    ]

    # Round-trip through a DynamicFrame to use Glue's apply_mapping.
    source_dyf = DynamicFrame.fromDF(data_frame, glueContext, "dyf")
    mapped_df = source_dyf.apply_mapping(column_mappings).toDF()

    # Eight characters starting eleven before the end of the file name;
    # presumably the YYYYMMDD stamp ahead of '.csv' (confirm against the
    # file naming convention).
    enriched_df = mapped_df.withColumn('file_name', F.input_file_name())
    enriched_df = enriched_df.withColumn(
        'year_month_day', F.expr("substring(file_name, length(file_name) -11, 8)"))
    enriched_df = enriched_df.withColumn('year', F.expr("substring(year_month_day, 1, 4)"))
    enriched_df = enriched_df.withColumn('month', F.expr("substring(year_month_day, 5, 2)"))
    enriched_df = enriched_df.withColumn('day', F.expr("substring(year_month_day, 7, 2)"))

    deduped_df = enriched_df.drop('operation').dropDuplicates()

    return deduped_df.select([F.col(name) for name in deduped_df.columns])


In [None]:
# loading

def historical_load(target_df, path):
    """Overwrite the Delta table at *path* with *target_df*.

    The data is written in Delta format, partitioned by ``year``, ``month``
    and ``day``, replacing both existing data and schema.

    Args:
        target_df: DataFrame to persist; must contain the partition columns.
        path: Destination location of the Delta table.

    Raises:
        SystemExit: With code ``-1`` when the write fails. The underlying
            error is printed first.
    """
    try:
        (target_df.write
            .format('delta')
            .mode('overwrite')
            .partitionBy(['year', 'month', 'day'])
            .option("overwriteSchema", "true")
            .option("path", path)
            .save()
        )
    except Exception as err:
        print(f"**** Error saving into the bucket {path}")
        # Surface the cause: sys.exit would otherwise discard it.
        print(f"**** Cause: {err!r}")
        sys.exit(-1)
        

In [None]:
# upsert

def delta_load(delta_df, primary_keys, path):
    """Upsert *delta_df* into the Delta table at *path*.

    Rows are matched on *primary_keys*. Matched rows have all columns updated
    and unmatched rows are inserted.

    Args:
        delta_df: DataFrame with the incoming rows.
        primary_keys: Column names used to build the match condition.
        path: Location of the existing Delta table.

    Raises:
        SystemExit: With code ``-1`` when the table cannot be opened or the
            merge fails. The underlying error is printed first.
    """
    try:
        target_table = DeltaTable.forPath(spark, path)
    except Exception as err:
        print('**** Target S3 target folder has not found.')
        print(f"**** Cause: {err!r}")
        sys.exit(-1)

    try:
        # The aliases must match the ones dedup_keys_str puts in the condition.
        (target_table.alias('target')
            .merge(source=delta_df.alias('delta'),
                   condition=F.expr(dedup_keys_str(primary_keys)))
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    except Exception as err:
        print(f"**** Error upserting into bucket {path}")
        print(f"**** Cause: {err!r}")
        sys.exit(-1)


In [None]:
def main(args) -> None:
    """Load the policy table: overwrite on first run, upsert afterwards.

    When no Delta table exists at the target path, the ``full-load`` folder is
    read and written with ``historical_load``. Otherwise the ``cdc-load``
    folder is read and merged with ``delta_load``. In both cases the distinct
    ``year_month_day`` values of the target are shown at the end.

    Args:
        args: Job parameters. ``s3_bucket``, ``start_date`` and ``final_date``
            are read. An empty ``start_date`` defaults to yesterday and an
            empty ``final_date`` to today.

    Raises:
        SystemExit: With code ``-1`` when the bucket check or a date check fails.
    """
    s3_bucket  = args['s3_bucket']
    start_date = args['start_date']
    final_date = args['final_date']

    ingestion    = 'raw-data'
    catalog      = 'glue-catalog'
    database     = 'insurance_db'
    table_name   = 'policy'
    primary_keys = ['policy_id']

    # Paths are built by plain concatenation, so the bucket URI must end in '/'.
    if not s3_bucket.endswith('/'):
        s3_bucket += '/'

    full_source_path = s3_bucket + ingestion +'/'+ database +'/'+ table_name + '/full-load/'
    cdc_source_path  = s3_bucket + ingestion +'/'+ database +'/'+ table_name + '/cdc-load/'
    target_path      = s3_bucket + catalog +'/'+ database +'/'+ table_name + '/'

    if not s3_bucket_exists(s3_bucket):
        print('**** Bucket name is invalid.')
        sys.exit(-1)

    # `datetime` is the class imported from the datetime module, so the
    # defaults are computed with datetime.today() and timedelta(days=1).
    # NOTE(review): is_valid_date accepts any format dateutil can parse, while
    # read_source compares the value as given; non-ISO input may filter wrongly.
    if not start_date:
        start_date = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    elif not is_valid_date(start_date):
        print('**** Start date is invalid.')
        sys.exit(-1)

    if not final_date:
        final_date = datetime.today().strftime('%Y-%m-%d')
    elif not is_valid_date(final_date):
        print('**** Final date is invalid.')
        sys.exit(-1)

    # Decide once, so the chosen source and the load mode cannot disagree.
    is_incremental = delta_table_exists(target_path)
    source_path = cdc_source_path if is_incremental else full_source_path

    print('Start date: ', start_date)
    print('Final date: ', final_date)
    print('Source path:', source_path)
    print('Target path:', target_path)

    src_df = read_source(source_path, start_date, final_date)
    final_df = transform(src_df)

    final_df.show()

    if is_incremental:
        print(' >>> Delta loading')
        delta_load(final_df, primary_keys, target_path)
    else:
        print(' >>> Historiccal loading')
        historical_load(final_df, target_path)

    # Read the target back to show which file dates it now holds.
    delta_df = spark.read.format('delta').load(target_path)
    delta_df.select('year_month_day').distinct().show()
    

In [None]:
   
# Resolve the named job parameters from the command line.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'environment', 's3_bucket', 'start_date', 'final_date'])

# Initialise the Glue job before any work is done.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Run the load.
main(args)

# Commit the job once main() has returned without exiting.
job.commit()

In [None]:
%stop_session