In [45]:
from os import environ as env
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [46]:
# Create (or reuse) the notebook-wide Spark session, running locally on one thread.
# The SQL Server JDBC driver is requested through spark.jars.packages; any
# extra jar(s) are read from the SPARK_JAR_PATH environment variable
# (a missing variable raises KeyError here).
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("My ETL")
    .config("spark.jars.packages", "com.microsoft.sqlserver:mssql-jdbc:12.4.2.jre11")
    .config("spark.jars", env["SPARK_JAR_PATH"])
    .getOrCreate()
)

# Echo the runtime details so the reader can confirm which session is active.
print(f"Spark Version: {spark.version}")
print(f"Spark App Name: {spark.sparkContext.appName}")

Spark Version: 3.4.0
Spark App Name: My ETL


Read SQL Server Table to PySpark DataFrame

Extract

In [47]:
def extract(tbl_name, schema="dbo", output_path="/tmp"):
    """Copy one SQL Server table into a semicolon-separated CSV directory.

    Connection settings are read from the MSSQL_HOST, MSSQL_PORT, MSSQL_DB,
    MSSQL_SA_USER and MSSQL_SA_PASSWORD environment variables.

    Args:
        tbl_name: Name of the source table; also used as the output
            sub-directory name.
        schema: Schema that owns the table.
        output_path: Base directory under which the CSV directory is written.

    Returns:
        The path ``{output_path}/{tbl_name}`` on success, or ``None`` when the
        read or the write fails (the error is printed, not raised).

    Raises:
        KeyError: If one of the MSSQL_* environment variables is missing
            (the lookups happen before the ``try`` block).
    """
    server_name = env["MSSQL_HOST"]
    database_name = env["MSSQL_DB"]
    username = env["MSSQL_SA_USER"]
    password = env["MSSQL_SA_PASSWORD"]
    port = env["MSSQL_PORT"]

    # Credentials are intentionally kept out of the URL and handed to the JDBC
    # source as separate options: a password containing ';', '{' or '}' would
    # otherwise corrupt the connection string, and the URL is more likely to
    # show up in logs and error messages than the option values.
    jdbc_url = f"jdbc:sqlserver://{server_name}:{port};database={database_name};trustServerCertificate=true;"

    try:
        # NOTE: schema and tbl_name are interpolated verbatim into the SQL
        # text, so only trusted identifiers must be passed in.
        sql_query = f"""SELECT * FROM {schema}.{tbl_name}"""

        df_src = (
            spark.read.format("jdbc")
            .option("url", jdbc_url)
            .option("query", sql_query)
            .option("user", username)
            .option("password", password)
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .load()
        )

        final_output_path = f"{output_path}/{tbl_name}"

        # Overwrite any previous extract of the same table.
        (
            df_src.write.format("csv")
            .option("header", "true")
            .option("sep", ";")
            .mode("overwrite")
            .save(final_output_path)
        )

        print(f"Datos cargados existosamente en {final_output_path}")
        return final_output_path
    except Exception as e:
        print("Error al extraer datos: " + str(e))
        # Make the failure result explicit so callers can test for it.
        return None

Transform

In [48]:
def transform(input_path):
    """Read the extracted customer CSV and derive the staging columns.

    The CSV at ``input_path`` is read with a header row and ``;`` as the
    separator, narrowed to the customer columns, and extended with:

    * ``FullName``      - first and last name joined by a single space
    * ``DBSource``      - the constant ``"AdventureWorks"``
    * ``IngestionDate`` - the current date

    Args:
        input_path: Location of the CSV data to read.

    Returns:
        The transformed DataFrame, or ``None`` if any step raises (the error
        is printed instead of propagated).
    """
    source_columns = (
        "CustomerKey",
        "CustomerAlternateKey",
        "FirstName",
        "LastName",
        "BirthDate",
        "EmailAddress",
    )
    # Final column order: FullName sits right after LastName, audit columns last.
    output_columns = (
        "CustomerKey",
        "CustomerAlternateKey",
        "FirstName",
        "LastName",
        "FullName",
        "BirthDate",
        "EmailAddress",
        "DBSource",
        "IngestionDate",
    )

    try:
        reader = spark.read.format("csv").option("header", "true").option("sep", ";")
        df = reader.load(input_path).select(*source_columns)

        full_name = concat_ws(" ", df["FirstName"], df["LastName"])
        enriched_df = (
            df.withColumn("FullName", full_name)
            .withColumn("DBSource", lit("AdventureWorks"))
            .withColumn("IngestionDate", current_date())
        )

        return enriched_df.select(*output_columns)
    except Exception as e:
        print("Error al transformar los datos: " + str(e))

Load

In [None]:
def load(df, tbl_name):
    """Write a DataFrame to the PostgreSQL staging table ``stg_<tbl_name>``.

    Connection settings are read from the POSTGRES_HOST, POSTGRES_PORT,
    POSTGRES_DB, POSTGRES_USER and POSTGRES_PASSWORD environment variables.
    The write uses ``overwrite`` mode, so existing contents of the staging
    table are replaced.

    Args:
        df: DataFrame to persist.
        tbl_name: Target table name without the ``stg_`` prefix.

    Returns:
        None. Any failure (including a missing environment variable, since the
        lookups happen inside the ``try``) is printed rather than raised.
    """
    try:
        # Credentials are passed as JDBC options instead of URL query
        # parameters: a password containing '&', '=', '%' or similar would
        # need URL-encoding to survive in the query string, and keeping it out
        # of the URL also keeps it out of messages that echo the URL.
        jdbc_url = f"jdbc:postgresql://{env['POSTGRES_HOST']}:{env['POSTGRES_PORT']}/{env['POSTGRES_DB']}"

        (
            df.write.mode("overwrite")
            .format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", "stg_" + tbl_name)
            .option("user", env["POSTGRES_USER"])
            .option("password", env["POSTGRES_PASSWORD"])
            .option("driver", "org.postgresql.Driver")
            .save()
        )
        print(f"Datos cargados existosamente en la tabla stg_{tbl_name}")
    except Exception as e:
        print("Error al cargar los datos: " + str(e))

In [50]:
def run_etl(src_table, tgt_table):
    """Run extract -> transform -> load for one table, stopping at the first failure.

    ``extract`` and ``transform`` report errors by printing them and returning
    ``None``. Without a guard the pipeline would keep going and emit confusing
    follow-up errors (for example a ``'NoneType' object has no attribute
    'write'`` from the load step), so each stage result is checked before the
    next stage runs.

    Args:
        src_table: Name of the source table to extract.
        tgt_table: Target table name handed to ``load`` (which adds its own
            ``stg_`` prefix).

    Returns:
        None.
    """
    input_path = extract(src_table)
    if input_path is None:
        print(f"ETL abortado: fallo la extraccion de {src_table}")
        return

    df = transform(input_path)
    if df is None:
        print(f"ETL abortado: fallo la transformacion de {input_path}")
        return

    load(df, tgt_table)

In [51]:
# Run the full pipeline for the DimCustomer source table, targeting dim_customer.
run_etl(src_table="DimCustomer", tgt_table="dim_customer")

Datos cargados existosamente en /tmp/DimCustomer


In [52]:
# Stop the Spark session. SparkSession.stop() shuts down the underlying
# SparkContext and also clears the cached default/active session, so a later
# SparkSession.builder.getOrCreate() in this kernel starts from a clean state.
spark.stop()