In [0]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import trim, lower,concat_ws, collect_list
from pyspark.sql.functions import collect_set, sort_array

In [0]:
# Obtain the Spark session for this notebook (created if none exists yet),
# registered under the "data_modelling" application name.
spark = (
    SparkSession.builder
    .appName("data_modelling")
    .getOrCreate()
)

In [0]:
# Load the curated source table over JDBC; every table built below starts from df_read.
# Credentials are taken from DESTINATION_DB_USER / DESTINATION_DB_PASSWORD when set.
# NOTE(review): the literal fallbacks only keep existing runs working -- remove them
# and rotate the password once the environment variables are provisioned.
df_read = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://destination-landing-db.database.windows.net:1433;database=destination")
    .option("dbtable", "curated_layer.transformed_data")
    .option("user", os.environ.get("DESTINATION_DB_USER", "adminuser@destination-landing-db"))
    .option("password", os.environ.get("DESTINATION_DB_PASSWORD", "Test1234"))
    .load()
)

In [0]:
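# Exploratory check (kept disabled): lists StockCode/Description combinations
# that occur more than once, alongside the trimmed and lower-cased StockCode.
# df_read.select('StockCode', 'Description') \
#     .withColumn('StockCodeTrimmed', trim('StockCode')) \
#     .withColumn('StockCodeLowerCase', lower('StockCodeTrimmed')) \
#     .groupBy('StockCode', 'StockCodeTrimmed', 'StockCodeLowerCase', 'Description') \
#     .count() \
#     .filter('count > 1') \
#     .show()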



In [0]:
def product_table(df_read):
    """Build one row per normalised ``StockCode`` with its merged descriptions.

    Selects ``StockCode`` and ``Description``, lower-cases and trims
    ``StockCode``, then joins the distinct descriptions seen for each code into
    a single ``", "``-separated string.

    Distinct descriptions are collected (instead of one entry per input row) so
    a description that occurs on many rows appears only once in the result, and
    they are sorted so the joined text is the same on every run. Null
    descriptions are skipped by the aggregate, so a code that only has null
    descriptions ends up with an empty string.

    Args:
        df_read: DataFrame providing at least ``StockCode`` and ``Description``.

    Returns:
        DataFrame with columns ``StockCode`` and ``Description``, one row per
        normalised ``StockCode``.
    """
    products = df_read.select('StockCode', 'Description').withColumn(
        'StockCode', trim(lower(col('StockCode')))
    )
    # groupBy already yields a single row per StockCode, so no further
    # dropDuplicates pass is needed afterwards.
    return products.groupBy('StockCode').agg(
        concat_ws(", ", sort_array(collect_set('Description'))).alias('Description')
    )

# Derive the product frame (StockCode, Description) from the curated source data.
df_product_columns_selected2 = product_table(df_read)

def product_table_mapping(df_product_columns_selected2):
    """Project the product frame onto the target table's column names.

    The source-to-target mapping is currently an identity mapping, so both
    columns keep their names; only the mapped columns are selected.

    Args:
        df_product_columns_selected2: DataFrame with ``StockCode`` and
            ``Description`` columns.

    Returns:
        DataFrame containing the mapped columns, in mapping order.
    """
    source_to_target = {
        "StockCode": "StockCode",
        "Description": "Description",
    }
    aliased = [col(source).alias(target) for source, target in source_to_target.items()]
    return df_product_columns_selected2.select(*aliased)

# Apply the column mapping; product_mapped is the frame handed to write_product.
product_mapped = product_table_mapping(df_product_columns_selected2)

def write_product(product_mapped):
    """Append the mapped product rows to ``data_model.product`` over JDBC.

    Credentials are read from the ``DESTINATION_DB_USER`` and
    ``DESTINATION_DB_PASSWORD`` environment variables when they are set;
    otherwise the previous literal values are used so existing runs keep
    working.

    Args:
        product_mapped: DataFrame whose ``.write`` is used for the append.

    Returns:
        None.
    """
    # NOTE(review): the literal fallbacks are transitional -- remove them and
    # rotate the password once the environment variables are provisioned.
    jdbc_user = os.environ.get("DESTINATION_DB_USER", "adminuser")
    jdbc_password = os.environ.get("DESTINATION_DB_PASSWORD", "Test1234")
    (
        product_mapped.write
        .format("jdbc")
        # The save mode is set through .mode() only; "mode" is not a JDBC
        # data-source option, so the former .option("mode", "append") is gone.
        .mode("append")
        .option("url", "jdbc:sqlserver://destination-landing-db.database.windows.net:1433;database=destination")
        .option("dbtable", "data_model.product")
        .option("user", jdbc_user)
        .option("password", jdbc_password)
        .save()
    )


# Runs the JDBC append as soon as this cell executes; because the save mode is
# "append", re-running the cell appends the same rows again.
write_product(product_mapped)

In [0]:
def customer_table(df_read):
    """Return the customer frame: ``CustomerID`` and ``Country``, one row per ID.

    Rows are de-duplicated on ``CustomerID`` only, so when an ID occurs with
    several ``Country`` values just one of those rows is kept.

    Args:
        df_read: DataFrame providing at least ``CustomerID`` and ``Country``.

    Returns:
        DataFrame with columns ``CustomerID`` and ``Country``.
    """
    key_column = 'CustomerID'
    customers = df_read.select(key_column, 'Country')
    return customers.dropDuplicates([key_column])

# Derive the customer frame (CustomerID, Country) from the curated source data.
df_customer_columns_selected = customer_table(df_read)

def customer_table_mapping(df_customer_columns_selected):
    """Project the customer frame onto the target table's column names.

    The source-to-target mapping is currently an identity mapping, so both
    columns keep their names; only the mapped columns are selected.

    Args:
        df_customer_columns_selected: DataFrame with ``CustomerID`` and
            ``Country`` columns.

    Returns:
        DataFrame containing the mapped columns, in mapping order.
    """
    source_to_target = {
        "CustomerID": "CustomerID",
        "Country": "Country",
    }
    aliased = [col(source).alias(target) for source, target in source_to_target.items()]
    return df_customer_columns_selected.select(*aliased)

# Apply the column mapping; customer_mapped is the frame handed to write_customer.
customer_mapped = customer_table_mapping(df_customer_columns_selected)

def write_customer(customer_mapped):
    """Append the mapped customer rows to ``data_model.customer`` over JDBC.

    Credentials are read from the ``DESTINATION_DB_USER`` and
    ``DESTINATION_DB_PASSWORD`` environment variables when they are set;
    otherwise the previous literal values are used so existing runs keep
    working.

    Args:
        customer_mapped: DataFrame whose ``.write`` is used for the append.

    Returns:
        None.
    """
    # NOTE(review): the literal fallbacks are transitional -- remove them and
    # rotate the password once the environment variables are provisioned.
    jdbc_user = os.environ.get("DESTINATION_DB_USER", "adminuser")
    jdbc_password = os.environ.get("DESTINATION_DB_PASSWORD", "Test1234")
    (
        customer_mapped.write
        .format("jdbc")
        # The save mode is set through .mode() only; "mode" is not a JDBC
        # data-source option, so the former .option("mode", "append") is gone.
        .mode("append")
        .option("url", "jdbc:sqlserver://destination-landing-db.database.windows.net:1433;database=destination")
        .option("dbtable", "data_model.customer")
        .option("user", jdbc_user)
        .option("password", jdbc_password)
        .save()
    )

# Runs the JDBC append as soon as this cell executes; because the save mode is
# "append", re-running the cell appends the same rows again.
write_customer(customer_mapped)

In [0]:
def invoice_metadata_table(df_read):
    """Return the distinct invoice metadata rows.

    Keeps ``InvoiceNumber``, ``InvoiceDate``, ``CustomerID`` and ``StockCode``,
    lower-cases and trims ``StockCode``, then drops rows that are identical
    across all four columns.

    Args:
        df_read: DataFrame providing at least the four columns above.

    Returns:
        DataFrame with the four columns and no fully duplicated rows.
    """
    wanted = ('InvoiceNumber', 'InvoiceDate', 'CustomerID', 'StockCode')
    normalised_code = trim(lower(col('StockCode')))
    return (
        df_read.select(*wanted)
        .withColumn('StockCode', normalised_code)
        .dropDuplicates()
    )
# Derive the invoice metadata frame from the curated source data.
df_invoice_metadata_columns_selected2 = invoice_metadata_table(df_read)


def invoice_metadata_table_mapping(df_invoice_metadata_columns_selected2):
    """Project the invoice metadata frame onto the target table's column names.

    The source-to-target mapping is currently an identity mapping, so all four
    columns keep their names; only the mapped columns are selected.

    Args:
        df_invoice_metadata_columns_selected2: DataFrame with
            ``InvoiceNumber``, ``InvoiceDate``, ``CustomerID`` and
            ``StockCode`` columns.

    Returns:
        DataFrame containing the mapped columns, in mapping order.
    """
    source_to_target = {
        "InvoiceNumber": "InvoiceNumber",
        "InvoiceDate": "InvoiceDate",
        "CustomerID": "CustomerID",
        "StockCode": "StockCode",
    }
    aliased = [col(source).alias(target) for source, target in source_to_target.items()]
    return df_invoice_metadata_columns_selected2.select(*aliased)

# Apply the column mapping; invoice_metadata_mapped is the frame handed to write_invoice_metadata.
invoice_metadata_mapped = invoice_metadata_table_mapping(df_invoice_metadata_columns_selected2)

def write_invoice_metadata(invoice_metadata_mapped):
    """Append the mapped invoice metadata rows to ``data_model.invoice_metadata``.

    The write goes over JDBC. Credentials are read from the
    ``DESTINATION_DB_USER`` and ``DESTINATION_DB_PASSWORD`` environment
    variables when they are set; otherwise the previous literal values are used
    so existing runs keep working.

    Args:
        invoice_metadata_mapped: DataFrame whose ``.write`` is used for the
            append.

    Returns:
        None.
    """
    # NOTE(review): the literal fallbacks are transitional -- remove them and
    # rotate the password once the environment variables are provisioned.
    jdbc_user = os.environ.get("DESTINATION_DB_USER", "adminuser")
    jdbc_password = os.environ.get("DESTINATION_DB_PASSWORD", "Test1234")
    (
        invoice_metadata_mapped.write
        .format("jdbc")
        # The save mode is set through .mode() only; "mode" is not a JDBC
        # data-source option, so the former .option("mode", "append") is gone.
        .mode("append")
        .option("url", "jdbc:sqlserver://destination-landing-db.database.windows.net:1433;database=destination")
        .option("dbtable", "data_model.invoice_metadata")
        .option("user", jdbc_user)
        .option("password", jdbc_password)
        .save()
    )

# Runs the JDBC append as soon as this cell executes; because the save mode is
# "append", re-running the cell appends the same rows again.
write_invoice_metadata(invoice_metadata_mapped)

In [0]:
def invoice_details_table(df_read):
    """Return the invoice detail columns of the source frame.

    Only a projection is applied: ``InvoiceNumber``, ``Cancellation``,
    ``Quantity``, ``UnitPrice`` and ``InvoiceDate`` are selected and no rows
    are removed.

    Args:
        df_read: DataFrame providing at least the five columns above.

    Returns:
        DataFrame restricted to those five columns, in that order.
    """
    detail_columns = ('InvoiceNumber', 'Cancellation', 'Quantity', 'UnitPrice', 'InvoiceDate')
    return df_read.select(*detail_columns)

# Derive the invoice details frame from the curated source data.
df_invoice_details_columns_selected = invoice_details_table(df_read)

def invoice_details_table_mapping(df_invoice_details_columns_selected):
    """Project the invoice details frame onto the target table's column names.

    The source-to-target mapping is currently an identity mapping, so all five
    columns keep their names; only the mapped columns are selected.

    Args:
        df_invoice_details_columns_selected: DataFrame with ``InvoiceNumber``,
            ``Cancellation``, ``Quantity``, ``UnitPrice`` and ``InvoiceDate``
            columns.

    Returns:
        DataFrame containing the mapped columns, in mapping order.
    """
    source_to_target = {
        "InvoiceNumber": "InvoiceNumber",
        "Cancellation": "Cancellation",
        "Quantity": "Quantity",
        "UnitPrice": "UnitPrice",
        "InvoiceDate": "InvoiceDate",
    }
    aliased = [col(source).alias(target) for source, target in source_to_target.items()]
    return df_invoice_details_columns_selected.select(*aliased)

# Apply the column mapping; invoice_details_mapped is the frame handed to write_invoice_details.
invoice_details_mapped = invoice_details_table_mapping(df_invoice_details_columns_selected)

def write_invoice_details(invoice_details_mapped):
    """Append the mapped invoice detail rows to ``data_model.invoice_details``.

    The write goes over JDBC. The target table is qualified with the
    ``data_model`` schema, matching the other writers in this notebook; it was
    previously the unqualified ``invoice_details``.

    Credentials are read from the ``DESTINATION_DB_USER`` and
    ``DESTINATION_DB_PASSWORD`` environment variables when they are set;
    otherwise the previous literal values are used so existing runs keep
    working.

    Args:
        invoice_details_mapped: DataFrame whose ``.write`` is used for the
            append.

    Returns:
        None.
    """
    # NOTE(review): the literal fallbacks are transitional -- remove them and
    # rotate the password once the environment variables are provisioned.
    jdbc_user = os.environ.get("DESTINATION_DB_USER", "adminuser")
    jdbc_password = os.environ.get("DESTINATION_DB_PASSWORD", "Test1234")
    (
        invoice_details_mapped.write
        .format("jdbc")
        # The save mode is set through .mode() only; "mode" is not a JDBC
        # data-source option, so the former .option("mode", "append") is gone.
        .mode("append")
        .option("url", "jdbc:sqlserver://destination-landing-db.database.windows.net:1433;database=destination")
        # NOTE(review): confirm no consumer still reads the unqualified table.
        .option("dbtable", "data_model.invoice_details")
        .option("user", jdbc_user)
        .option("password", jdbc_password)
        .save()
    )

# Runs the JDBC append as soon as this cell executes; because the save mode is
# "append", re-running the cell appends the same rows again.
write_invoice_details(invoice_details_mapped)