# ETL Ingestion

## Setup environment and import libraries

### Upload the JDBC driver to S3 and declare its path

In [1]:
%%configure -f

{
    "conf": {
        "spark.jars": "s3://aws-glue-assets-130835040051-ap-southeast-1/jars/postgresql-42.3.3.jar,s3://aws-glue-assets-130835040051-ap-southeast-1/jars/emr_redshift_spark/minimal-json.jar,s3://aws-glue-assets-130835040051-ap-southeast-1/jars/emr_redshift_spark/RedshiftJDBC.jar,s3://aws-glue-assets-130835040051-ap-southeast-1/jars/emr_redshift_spark/spark-avro.jar,s3://aws-glue-assets-130835040051-ap-southeast-1/jars/emr_redshift_spark/spark-redshift.jar"
    }
}



In [2]:

import os
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, col, udf

# Request the spark-avro and spark-redshift packages (Scala 2.11 builds) through
# the PySpark submit arguments.
# NOTE(review): this is assigned after the pyspark imports, and the %%configure
# cell already passes Redshift/Avro jars through spark.jars — confirm whether
# this variable still has any effect on the session and whether both are needed.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.2,io.github.spark-redshift-community:spark-redshift_2.11:4.0.1 pyspark-shell'


VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
88,application_1667433809375_0087,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Install boto3 on the cluster first:
`sudo pip3 install -U boto3`

In [3]:

#sc.install_pypi_package("boto3")
import boto3
import botocore

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Find latest event time from a dataframe

In [4]:
def find_latest_event_time(inputDF, column_name, spark):
    """Return the maximum value of ``column_name`` in ``inputDF``.

    The aggregate is computed directly on the DataFrame, so no temporary view
    is registered in the session and no SQL text is assembled from
    ``column_name``.

    Args:
        inputDF: Spark DataFrame to scan.
        column_name: Name of the column whose maximum is wanted.
        spark: Spark session. Unused; kept so existing callers keep working.

    Returns:
        The maximum value of the column, or ``None`` when the aggregate is
        NULL (for example when the DataFrame has no rows).
    """
    # The dict form of agg() is used instead of functions.max so the result
    # does not depend on which `max` name is currently in scope.
    max_row = inputDF.agg({column_name: "max"}).collect()[0]
    return max_row[0]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Uppercase function as a UDF

In [5]:

from pyspark.sql.functions import udf
@udf
def my_uppercase(my_string):
    """Spark UDF that upper-cases a string column value.

    Args:
        my_string: The column value for one row; may be ``None`` for NULLs.

    Returns:
        The upper-cased string, or ``None`` when the input is ``None``.
    """
    # NULL column values reach the UDF as None; pass them through instead of
    # failing the whole job with AttributeError.
    if my_string is None:
        return None
    return my_string.upper()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Get bookmark value from Redshift Bookmark table

In [6]:

def get_bookmark_value(v_bookmark_job_id):
    """Fetch the stored bookmark value for a job from ``etlfw_rs.bookmark_table``.

    Runs the lookup through the Redshift Data API, waits for the statement to
    finish and returns the ``bookmark_val`` column of the first row.

    Args:
        v_bookmark_job_id: Value matched against the ``job_id`` column.

    Returns:
        The ``stringValue`` of the third selected column (``bookmark_val``)
        of the first result row.

    Raises:
        RuntimeError: The statement ended with status FAILED or ABORTED.
        TimeoutError: The statement did not finish within the polling window.
        LookupError: The statement finished but returned no rows.
    """
    import time  # local import so this cell does not depend on another cell

    v_cluster_identifier = 'lakehouse-redshift-cluster'
    v_secret_arn = 'arn:aws:secretsmanager:ap-southeast-1:130835040051:secret:prod/redshift-DqEEI9'
    v_database_name = 'dev'
    poll_interval_seconds = 0.5
    timeout_seconds = 300

    # The job id is bound as a named parameter instead of being concatenated
    # into the SQL text.
    sql = "select job_id,bookmark_col, bookmark_val from etlfw_rs.bookmark_table where job_id = :job_id"

    rsd = boto3.client('redshift-data', region_name='ap-southeast-1')

    resp = rsd.execute_statement(
        Database=v_database_name,
        ClusterIdentifier=v_cluster_identifier,
        SecretArn=v_secret_arn,
        Sql=sql,
        Parameters=[{"name": "job_id", "value": str(v_bookmark_job_id)}],
    )
    qid = resp["Id"]

    # Poll until the statement reaches a terminal state; sleep between polls
    # and give up after the deadline so a stuck statement cannot hang the run.
    deadline = time.monotonic() + timeout_seconds
    while True:
        desc = rsd.describe_statement(Id=qid)
        status = desc["Status"]
        if status == "FINISHED":
            break
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(
                "Bookmark lookup statement %s ended with status %s: %s"
                % (qid, status, desc.get("Error"))
            )
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Bookmark lookup statement %s did not finish within %s seconds"
                % (qid, timeout_seconds)
            )
        time.sleep(poll_interval_seconds)

    if desc.get("ResultRows", 0) <= 0:
        raise LookupError("No bookmark row found for job_id %r" % (v_bookmark_job_id,))

    result = rsd.get_statement_result(Id=qid)
    return result["Records"][0][2]["stringValue"]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Update bookmark value with new information

In [7]:
def update_bookmark_value(v_bookmark_job_id, v_new_bookmark_val):
    """Store a new bookmark value for a job in ``etlfw_rs.bookmark_table``.

    Does nothing when ``v_new_bookmark_val`` is empty or ``None``. Otherwise
    runs the UPDATE through the Redshift Data API and waits for it to finish,
    so a failed update is reported instead of going unnoticed.

    Args:
        v_bookmark_job_id: Value matched against the ``job_id`` column.
        v_new_bookmark_val: New value for the ``bookmark_val`` column.

    Returns:
        None.

    Raises:
        RuntimeError: The statement ended with status FAILED or ABORTED.
        TimeoutError: The statement did not finish within the polling window.
    """
    import time  # local import so this cell does not depend on another cell

    if not v_new_bookmark_val:
        return

    v_cluster_identifier = 'lakehouse-redshift-cluster'
    v_secret_arn = 'arn:aws:secretsmanager:ap-southeast-1:130835040051:secret:prod/redshift-DqEEI9'
    v_database_name = 'dev'
    poll_interval_seconds = 0.5
    timeout_seconds = 300

    # Both values are bound as named parameters instead of being concatenated
    # into the SQL text.
    sql = "update etlfw_rs.bookmark_table set bookmark_val = :bookmark_val where job_id = :job_id"

    rsd = boto3.client('redshift-data', region_name='ap-southeast-1')

    resp = rsd.execute_statement(
        Database=v_database_name,
        ClusterIdentifier=v_cluster_identifier,
        SecretArn=v_secret_arn,
        Sql=sql,
        Parameters=[
            {"name": "bookmark_val", "value": str(v_new_bookmark_val)},
            {"name": "job_id", "value": str(v_bookmark_job_id)},
        ],
    )
    qid = resp["Id"]

    # execute_statement only submits the statement; poll until it reaches a
    # terminal state so failures surface here.
    deadline = time.monotonic() + timeout_seconds
    while True:
        desc = rsd.describe_statement(Id=qid)
        status = desc["Status"]
        if status == "FINISHED":
            return
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(
                "Bookmark update statement %s ended with status %s: %s"
                % (qid, status, desc.get("Error"))
            )
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Bookmark update statement %s did not finish within %s seconds"
                % (qid, timeout_seconds)
            )
        time.sleep(poll_interval_seconds)
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Get bookmark value from previous run

In [8]:
# Load the bookmark stored for this job; the source query below uses it as the
# exclusive lower bound on event_time.
previous_bookmark_val = get_bookmark_value("ecommerce_customer_activity_ts")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Read from Postgres database with customized filtering

In [11]:
# Incremental read from the Postgres source: only rows whose event_time is
# later than the bookmark from the previous run are selected.
db_url = "jdbc:postgresql://lakehouse-source-db.cluster-cxoaow30vcfp.ap-southeast-1.rds.amazonaws.com:5432/lakehouse_source_db"
db_query = "select item_id,user_id,event_type,event_time,discount from ecommerce.customer_activity_ts where event_time>to_timestamp('" + previous_bookmark_val +"', 'YYYY-MM-DD HH24:MI:SS.US')"

# The database password must not live in the notebook. It is read from the
# environment of the process that executes this cell (the Spark driver).
source_db_password = os.environ.get("SOURCE_DB_PASSWORD")
if not source_db_password:
    raise RuntimeError(
        "Set the SOURCE_DB_PASSWORD environment variable before reading from the source database"
    )

jdbcDF = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", db_url)
    .option("query", db_query)
    .option("user", "postgres")
    .option("password", source_db_password)
    .option("fetchsize", 10000)
    .load()
)

print(db_query)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

select item_id,user_id,event_type,event_time at time zone 'Asia/Jakarta' as event_time,discount from ecommerce.customer_activity_ts where event_time>to_timestamp('2022-11-04 11:02:57.472228', 'YYYY-MM-DD HH24:MI:SS.US')

In [12]:
# Preview up to 10 of the rows read from the source table.
jdbcDF.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-------+----------+----------+--------+
|item_id|user_id|event_type|event_time|discount|
+-------+-------+----------+----------+--------+
+-------+-------+----------+----------+--------+

### Transform: convert `event_type` to uppercase with the UDF

In [None]:
# Replace event_type with its upper-cased form (via the my_uppercase UDF),
# then preview the first 10 rows of the result.
outputDF = jdbcDF.withColumn(
    "event_type",
    my_uppercase(col("event_type")),
)
outputDF.show(n=10)

### Use function to find max value in a dataframe

In [None]:
# The largest event_time in this batch becomes the candidate bookmark for the
# next run.
new_bookmark_val = find_latest_event_time(
    inputDF=outputDF,
    column_name="event_time",
    spark=spark,
)
print(new_bookmark_val)

## Save in Redshift

Reference (Amazon EMR Release Guide, using Spark with Amazon Redshift): https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-redshift.html

In [None]:


# Append the transformed batch to Redshift through the spark-redshift
# connector, which stages data under `tempdir` in S3 and uses the given IAM
# role for the load.
db_url = "jdbc:redshift://lakehouse-redshift-cluster.c8opzqzu3aka.ap-southeast-1.redshift.amazonaws.com:5439/dev"

# The Redshift password must not live in the notebook. It is read from the
# environment of the process that executes this cell (the Spark driver).
redshift_password = os.environ.get("REDSHIFT_PASSWORD")
if not redshift_password:
    raise RuntimeError(
        "Set the REDSHIFT_PASSWORD environment variable before writing to Redshift"
    )

(
    outputDF.write
    .format("io.github.spark_redshift_community.spark.redshift")
    .mode("append")
    .option("url", db_url)
    .option("user", "rs_admin")
    .option("password", redshift_password)
    .option("dbtable", "ecommerce.customer_activity_ts")
    .option("aws_iam_role", "arn:aws:iam::130835040051:role/LakeHouseRedshiftGlueAccessRole")
    .option("tempdir", "s3://aws-emr-resources-130835040051-ap-southeast-1/tempfolder/")
    .save()
)

### Update bookmark value with latest value

In [None]:
# When the batch is empty the maximum event_time is None. str(None) is the
# non-empty string "None", which would be stored as the bookmark and break the
# to_timestamp() filter on the next run, so only advance the bookmark when a
# real value was found.
if new_bookmark_val is not None:
    update_bookmark_value("ecommerce_customer_activity_ts", str(new_bookmark_val))
else:
    print("No new rows were ingested; bookmark left unchanged.")