In [0]:
#Importing required libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.functions import col,avg,sum,min,max,desc,dense_rank,current_timestamp

In [0]:
#Spark object
spark = SparkSession.builder.appName("ETL1").getOrCreate()

In [0]:
#Reading data from the given source database table as per provided query
def read_data(source_db_url,source_user,source_pwd,query):
    df=spark.read.format("jdbc")\
        .option("url", source_db_url)\
        .option("user",source_user)\
        .option("password",source_pwd)\
        .option("driver","org.postgresql.Driver")\
        .option("query", query)\
        .load()
    return df

In [0]:
## DQ Checks
## Check Duplicates in df  and nulls in sal
def data_quality(dq_df):
    filtered_df = dq_df.dropna(subset="emp_salary").dropDuplicates()
    return filtered_df

In [0]:
## Transform the df apply aggregations 
def transform_source_df(source_df):
    windowSpecAgg = Window.partitionBy("emp_department").orderBy(desc("emp_salary"))
    transformed_df = source_df.withColumn("rank", dense_rank().over(windowSpecAgg)) \
        .withColumn("avg_sal", avg(col("emp_salary")).over(windowSpecAgg)) \
        .withColumn("sum_sal", sum(col("emp_salary")).over(windowSpecAgg)) \
        .withColumn("min_sal", min(col("emp_salary")).over(windowSpecAgg)) \
        .withColumn("max_sal", max(col("emp_salary")).over(windowSpecAgg)) \
        .where(col("rank") == 1).select("emp_department", "avg_sal", "sum_sal", "min_sal", "max_sal", col("emp_id").alias("highest_sal_emp_id"), col("emp_name").alias("highest_sal_emp_name")) 
    return transformed_df

In [0]:
## Write to the destination database table 
def write_to_postgres(final_df,database_url,postgres_table,properties,mode_of):
    try:
        final_df.write.jdbc(url=database_url, table=postgres_table, mode=mode_of, properties=properties)
        return "Successfully loaded into postgres"
    except Exception as e:
        print(e)
        return "Failed to load into Postgres"

In [0]:
#Get Secrets from Databricks backed key-vault
destination_db_url = dbutils.secrets.get('usecaseScope','destination_database_url')
destination_user =  dbutils.secrets.get('usecaseScope','destination_user')
destination_pwd =   dbutils.secrets.get('usecaseScope','destination_password')
destination_table = dbutils.secrets.get('usecaseScope','destination_table')
checkpoint_table = dbutils.secrets.get('usecaseScope','checkpoint_table')
source_db_url = dbutils.secrets.get('usecaseScope','source_database_url')
source_user = dbutils.secrets.get('usecaseScope','source_user')
source_pwd = dbutils.secrets.get('usecaseScope','source_password')
source_table = dbutils.secrets.get('usecaseScope','source_table')

properties = {
    "user": destination_user ,
    "password":destination_pwd ,
    "driver": "org.postgresql.Driver"}
checkpoint_query = f"select * from {checkpoint_table}"

try :
    max_id_df = read_data(destination_db_url,destination_user,destination_pwd,checkpoint_query)
    if not max_id_df.isEmpty():
        max_id = max_id_df.select("emp_id").collect()[0][0]
        query= f"select * from {source_table} where emp_id>{max_id}"
    else:
        query = f"select * from {source_table}"
        
    #print(query)

    source_df = read_data(source_db_url,source_user,source_pwd,query)

    if not source_df.isEmpty():
        get_max_empid = source_df.select(max("emp_id").alias("emp_id"))
        ckp = write_to_postgres(get_max_empid,destination_db_url,checkpoint_table,properties,"overwrite")
        if ckp:
            print("Successfully updated checkpoint")

        data_quality_df = data_quality(source_df)
        #display(data_quality_df)
        transform_df = transform_source_df(data_quality_df)
        final_df = transform_df.withColumn("timestamp",current_timestamp())
        load = write_to_postgres(final_df,destination_db_url,destination_table,properties,"append")
        print(load)

except Exception as e:
    raise Exception(e)

select * from [REDACTED] where emp_id>10000
Successfully updated checkpoint
Successfully loaded into postgres
