#####Requirements:-
* Dataset used: AdventureWorks

####Step 1: Extract Table from Azure SQL

In [0]:
jdbcHostname = ""
jdbcPort = 1433
jdbcDatabase = ""
jbdcUsername = ""
jdbcPassword = ""
jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase};user={jdbcUsername};password={jdbcPassword}"


In [0]:
df_product = spark.read.format("jdbc")\
    .option("url", jdbcUrl)\
    .option("dbtable", "SalesLT.Product")\
    .load()

display(df_product)

In [0]:
df_sales = spark.read.format("jdbc")\
    .option("url", jdbcUrl)\
    .option("dbtable", "SalesLT.SalesOrderDetail")\
    .load()

display(df_sales)

####Step 2:Transform the data as per Business rules

In [0]:
df_product_cleansed = df_product.na.fill({"Size" : "M", "Weight" : 100})

display(df_product_cleansed)

In [0]:
df_sales_cleansed = df_sales.dropDuplicates()

display(df_sales_cleansed)

In [0]:
df_join = df_sales_cleansed \
    .join(df_product_cleansed, df_sales_cleansed.ProductID == df_product_cleansed.ProductID, "leftouter")\
    .select(df_sales_cleansed.ProductID,
            df_sales_cleansed.UnitPrice,
            df_sales_cleansed.LineTotal,
            df_product_cleansed.Name,
            df_product_cleansed.Color,
            df_product_cleansed.Size,
            df_product_cleansed.Weight,
        )
    
display(df_join)

In [0]:
df_agg = df.join.groupBy(["ProductID","Name","Color","Size","Weight"])\
    .sum("LineTotal")\
    .withColumnRenamed("sum(LineTotal)", "sum_total_sales")

display(df_agg)

####Step 3: Load Transformed Data into Azure Data Lake Storage (ADLS)

In [0]:
dbutils.fs.mount(
    source = "wasbs://<container name>@<storage account>.blob.core.windows.net",
    mount_point = "/mnt/<mount name>",
    extra_configs = ("fs.azure.account.key.<storage account>.blob.core.windows.net":"<access key>")
)

In [0]:
dbutils.fs.ls("<mount_point>")

In [0]:
df_agg.write.format("parquet")\
    .save("<mount_point>/adv_work_parquet/")

In [0]:
df_agg.write.format("parquet")\
    .save("<mount_point>/adv_work_csv/")

####End of Pipeline