In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os
import sys
from pyspark.sql.functions import *

Set the Java home and related environment variables

In [2]:
# Point the JVM lookup at the local JDK install.
# A raw string keeps the Windows backslashes literal; in a normal string
# "\P", "\J" and "\j" are invalid escape sequences, which newer Python
# versions warn about and will eventually reject.
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-19"

Set the Spark configuration details

In [3]:
# Spark settings for this job, built as one parenthesized chain.
conf = (
    SparkConf()
    .setAppName("ETLPipeline")
    .setMaster("local")
    # Adds everything under C:/pyspark to the driver classpath
    # (presumably the JDBC driver jars live there — confirm).
    .set("spark.driver.extraClassPath", "C:/pyspark/*")
)

Initialize the Spark session

In [4]:
# Reuse an already-running SparkContext if there is one; otherwise start one from `conf`.
sc = SparkContext.getOrCreate(conf=conf)
# Wrap the context in a SparkSession; the later cells read and write through `etl`.
etl = SparkSession(sc)

In [5]:
# Bare expression: the notebook renders the SparkSession object as the cell output.
etl

# Set the database details

In [6]:
# Credentials are read from environment variables so they never appear in the notebook.
# A missing variable raises KeyError right here.
pwd = os.environ["PGPASS"]
uid = os.environ["PGUID"]

# Host name used for both JDBC URLs.
server = "localhost"

# Source side: database name and the SQL Server JDBC driver class.
src_db = "AdventureWorksDW2019"
src_driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Target side: database name and the PostgreSQL JDBC driver class.
target_db = "AdventureWorks"
target_driver = "org.postgresql.Driver"


In [25]:
# JDBC URLs carry only the location of each database. Credentials are supplied
# separately through the "user"/"password" options on every read and write, so:
#   * no password is embedded in a string that can surface in logs or error text,
#   * a password containing ';', '&' or '?' cannot corrupt the URL,
#   * the target no longer uses a hard-coded account with a malformed query string.

# Source connection (SQL Server on port 1433).
src_url = f"jdbc:sqlserver://{server}:1433;databaseName={src_db};trustServerCertificate=true;"

# Target connection (PostgreSQL on port 5432).
target_url = f"jdbc:postgresql://{server}:5432/{target_db}"

# SQL statement and connection test

In [16]:
# Query against the SQL Server catalog view sys.tables that returns, as
# `table_name`, those of the five listed tables that exist in the source database.
sql = (
    "select  t.name as table_name from sys.tables t \n"
    "where t.name in ('DimProduct','DimProductSubcategory','DimProductCategory','DimSalesTerritory','FactInternetSales') "
)

In [17]:
# Smoke-test the source connection by running the table-list query over JDBC
# and showing the rows it returns.
dfs = (
    etl.read.format("jdbc")
    .options(driver=src_driver, user=uid, password=pwd, url=src_url, query=sql)
    .load()
)
dfs.show()

+--------------------+
|          table_name|
+--------------------+
|          DimProduct|
|  DimProductCategory|
|DimProductSubcate...|
|   DimSalesTerritory|
|   FactInternetSales|
+--------------------+



In [18]:
# Bring the query result to the driver as a list of Row objects,
# then print the table_name field of each one on its own line.
data_collect = dfs.collect()
for table_row in data_collect:
    print(table_row["table_name"])

DimProduct
DimProductCategory
DimProductSubcategory
DimSalesTerritory
FactInternetSales


In [19]:
def extract():
    """Read every listed source table over JDBC and hand each one to ``load``.

    Runs the module-level ``sql`` query through the source connection
    (``src_driver`` / ``src_url`` / ``uid`` / ``pwd``) to get the table names,
    then for each name reads ``dbo.<name>`` into a DataFrame and calls
    ``load(df, name)``.

    Returns:
        None. Progress is reported with ``print``.

    Any exception raised while listing or reading is caught, printed as
    "Data extract error: ..." and stops the remaining tables from being
    processed; nothing is re-raised.
    """
    try:
        # One JDBC round trip to find out which tables to copy.
        table_listing = (
            etl.read.format("jdbc")
            .options(driver=src_driver, user=uid, password=pwd, url=src_url, query=sql)
            .load()
        )
        # The listing is collected to the driver and walked one table at a time.
        for listing_row in table_listing.collect():
            tbl_name = listing_row["table_name"]
            print(tbl_name)
            table_reader = etl.read.format("jdbc").options(
                driver=src_driver,
                user=uid,
                password=pwd,
                url=src_url,
                dbtable=f"dbo.{tbl_name}",
            )
            load(table_reader.load(), tbl_name)
            # NOTE(review): load() catches and prints its own errors, so this
            # line is printed even when the write failed.
            print("Data loaded successfully")
    except Exception as e:
        print("Data extract error: " + str(e))

In [20]:
def load(df, tbl):
    """Write ``df`` to the target database as table ``src_<tbl>``.

    Args:
        df: DataFrame to write.
        tbl: Source table name; the target table is named ``"src_" + tbl``.

    Returns:
        None. Progress is reported with ``print``.

    The write goes through JDBC using the module-level ``target_url``,
    ``target_driver``, ``uid`` and ``pwd``, in "overwrite" mode. Any exception
    is caught and printed as "Data load error: ..."; nothing is re-raised.
    """
    try:
        # Count once. Each count() is a separate Spark action, and the previous
        # second call only updated a local that was never read afterwards.
        row_count = df.count()
        print(f'importing rows 0 to {row_count}... for table {tbl}')
        (
            df.write.mode("overwrite")
            .format("jdbc")
            .option("url", target_url)
            .option("user", uid)
            .option("password", pwd)
            .option("driver", target_driver)
            .option("dbtable", "src_" + tbl)
            .save()
        )
        print("Data imported successful")
    except Exception as e:
        print("Data load error: " + str(e))


In [27]:
# Run the pipeline: extract() reads each listed source table and passes it to load().
extract()

DimProduct
importing rows 0 to 606... for table DimProduct
Data imported successful
Data loaded successfully
DimProductCategory
importing rows 0 to 4... for table DimProductCategory
Data imported successful
Data loaded successfully
DimProductSubcategory
importing rows 0 to 37... for table DimProductSubcategory
Data imported successful
Data loaded successfully
DimSalesTerritory
importing rows 0 to 11... for table DimSalesTerritory
Data imported successful
Data loaded successfully
FactInternetSales
importing rows 0 to 60398... for table FactInternetSales
Data imported successful
Data loaded successfully
