In [1]:
!pip install pyspark



In [1]:
import pandas as pd
import os
import glob
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
from pyspark.sql import SparkSession

# Single SparkSession used by every cell below; getOrCreate() returns an existing session if one is already active.
spark = (
    SparkSession.builder
    .appName("DataEngineerAccelerator")
    .getOrCreate()
)


In [3]:
def read_csv_to_df(file_path):
    """Read CSV data with a header row (a file, directory or glob) into a Spark DataFrame."""
    return spark.read.csv(file_path, header="True")


# Raw SystemA extracts from the Sourced layer of the sample lake.
product_df = read_csv_to_df("C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Product/date=20220606/Product.csv")
productcategory_df = read_csv_to_df("C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/ProductCategory/date=20220606/ProductCategory.csv")
sales_df = read_csv_to_df("C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Sales/*")

read_csv_to_df("C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/ProductCategory/date=20220606/ProductCategory.csv").show()

+---+----------+---------------+
| ID|    Level1|         Level2|
+---+----------+---------------+
|  1|     bikes| mountain bikes|
|  2|     bikes|     road bikes|
|  3|     bikes|  touring bikes|
|  4|components|     handlebars|
|  5|components|bottom brackets|
|  6|components|         brakes|
|  7|components|         chains|
|  8|components|      cranksets|
|  9|components|    derailleurs|
| 10|components|          forks|
| 11|components|       headsets|
| 12|components|mountain frames|
| 13|components|         pedals|
| 14|components|    road frames|
| 15|components|        saddles|
| 16|components| touring frames|
| 17|components|         wheels|
| 18|  clothing|     bib-shorts|
| 19|  clothing|           caps|
| 20|  clothing|         gloves|
+---+----------+---------------+
only showing top 20 rows



# Writing files to Cleansed

In [4]:

# Capture the submission timestamp used to build the Cleansed partition path.
# All six parts are read from ONE query: Spark returns the same current_timestamp() for every
# call within a query, but each separate .first() is a new query, so reading the parts one by
# one could straddle a second/minute/hour boundary and produce an inconsistent path.
current_date_df = spark.range(1).select(current_timestamp().alias("current_date"))
current_date_df.show()

submission_row = current_date_df.select(
    year("current_date").alias("submission_year"),
    month("current_date").alias("submission_month"),
    day("current_date").alias("submission_day"),
    hour("current_date").alias("submission_hour"),
    minute("current_date").alias("submission_minute"),
    second("current_date").alias("submission_second"),
).first()

submission_year = submission_row["submission_year"]
submission_month = submission_row["submission_month"]
submission_day = submission_row["submission_day"]
submission_hour = submission_row["submission_hour"]
submission_minute = submission_row["submission_minute"]
submission_second = submission_row["submission_second"]

+--------------------+
|        current_date|
+--------------------+
|2023-10-18 10:22:...|
+--------------------+



In [5]:
def DataFeed(file_path):
    """Derive the data-feed name from the name of the CSV file(s) read from ``file_path``.

    The feed name is the source file's base name cut at the first "." and at the first "%",
    e.g. ``.../Sales.csv`` -> ``Sales``. If several distinct names are found, an arbitrary
    one is returned (``first()`` on a distinct set).

    Raises:
        ValueError: if no rows are read from ``file_path``, so no source file name exists.
    """
    # NOTE(review): the "%" cut presumably strips URL-escaped characters (e.g. "%20")
    # that input_file_name() can return in the path — confirm against real file names.
    file_name = substring_index(input_file_name(), "/", -1)
    feed_names_df = (
        spark.read.csv(file_path, header="True")
        .select(split(split(file_name, "\\.")[0], "%")[0].alias("sourcefile"))
        .distinct()
    )
    first_row = feed_names_df.first()
    if first_row is None:
        raise ValueError(f"No CSV rows found under {file_path!r}; cannot derive a data feed name.")
    return first_row["sourcefile"]

# Feed name for the 2022-06-07 Sales extract; used below to build the Cleansed path.
sales_feed_path = "C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Sales/date=20220607"
data_feed = DataFeed(sales_feed_path)
print(data_feed)

Sales


In [6]:
# Step-by-step version of the feed-name derivation for the Product extract: show the distinct
# file-name stems, then take the first one.
product_sourcefile_df = (
    spark.read.csv("C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Product", header="True")
    .withColumn("sourcefile", input_file_name())
    .withColumn("sourcefile", substring_index("sourcefile", "/", -1))
    .withColumn("sourcefile", split(col("sourcefile"), "\\.")[0])
    .select("sourcefile")
    .distinct()
    .withColumn("sourcefile", split(col("sourcefile"), "%")[0])
    .select("sourcefile")
    .distinct()
)

product_sourcefile_df.show()
df = product_sourcefile_df.first()["sourcefile"]
print(df)

+----------+
|sourcefile|
+----------+
|   Product|
+----------+

Product


In [7]:
# Relative Cleansed-layer partition path for the feed, stamped with the submission time parts.
cleansed_path = (
    f"Cleansed/DataFeed={data_feed}/schemaVersion=1"
    f"/SubmissionYear={submission_year}/SubmissionMonth={submission_month}/SubmissionDay={submission_day}"
    f"/SubmissionHour={submission_hour}/SubmissionMinute={submission_minute}/SubmissionSecond={submission_second}"
)
print(cleansed_path)

Cleansed/DataFeed=Sales/schemaVersion=1/SubmissionYear=2023/SubmissionMonth=10/SubmissionDay=18/SubmissionHour=10/SubmissionMinute=22/SubmissionSecond=26


# Column mapping

In [2]:
# Column mappings for the Cleansed layer.
# Each entry is: source CSV column -> (target column name, Spark type name passed to .cast()).

productcategory_mapping = {
    "ID" : ("ProductCategoryID", "string"),
    "Level1" : ("ProductCategoryName", "string"),
    "Level2" : ("ProductSubCategory", "string")
}

# A dict literal keeps only one value per key, so each source column appears exactly once
# (a repeated "ProductID" entry was redundant and has been removed).
product_mapping = {
    "ProductID" : ("SourceProductID", "string"),
    "ModelName" : ("ProductModelName", "string"),
    "ProductName" : ("ProductName", "string"),
    "ProductColor" : ("ProductColor", "string"),
    "Size" : ("ProductSize", "string"),
    "ProductWeight" : ("ProductWeightKilograms", "double"),
    "UniqueProductNumber" : ("ProductUID", "string"),
    "ProductCategoryID" : ("ProductCategoryID", "string"),
    "StandardCost" : ("ProductCost", "double"),
    "ListPrice" : ("ProductListPrice", "double"),
    "ProductLine" : ("ProductLine", "string"),
    "Class" : ("ProductClass", "string"),
    "Style" : ("ProductStyle", "string"),
    "ProductMakeFlag" : ("ProductMakeFlag", "string"),
    "ProductFinishedGoodsFlag" : ("IsFinishedGood", "boolean"),
    "SellStartDate" : ("ProductSellingStartDate", "string"),
    "SellEndDate" : ("ProductSellingEndDate", "string"),
    "DiscontinuedDate" : ("ProductDiscontinuedDate", "string"),
    "ModifiedDate" : ("ModifiedDate", "string")
}

sales_mapping = {
    "SalesOrderID" : ("SalesOrderID", "string"),
    # NOTE(review): SalesOrderDetailID becomes SalesOrderLineID while the source
    # SalesOrderLineID becomes SalesOrderNumber — confirm this renaming is intended.
    "SalesOrderDetailID" : ("SalesOrderLineID", "string"),
    "SalesOrderLineID" : ("SalesOrderNumber", "string"),
    "OrderDate" : ("SalesOrderDate", "string"),
    "ProductID" : ("ProductID", "string"),
    "SpecialOfferID" : ("SpecialOfferID", "string"),
    "SpecialOfferName" : ("SpecialOfferName", "string"),
    "SpecialOfferDiscountPct" : ("SpecialOfferDiscountPct", "string"),
    "SpecialOfferType" : ("SpecialOfferType", "string"),
    "SpecialOfferCategory" : ("SpecialOfferCategory", "string"),
    "SpecialOfferStartDate" : ("SpecialOfferStartDate", "string"),
    "SpecialOfferEndDate" : ("SpecialOfferEndDate", "string"),
    "OrderQty" : ("TotalUnits", "int"),
    "UnitPrice" : ("UnitPrice", "double"),
    "UnitPriceDiscount" : ("UnitPriceDiscountPercentage", "double"),
    "LineTotal" : ("TotalLineValue", "double")
}

# Product Category

In [9]:
# Rename/cast ProductCategory columns per the mapping, then initcap + trim every string column.
category_projections = [
    col(source_name).alias(target_name).cast(target_type)
    for source_name, (target_name, target_type) in productcategory_mapping.items()
]
cleansed_productcategory_df = productcategory_df.select(*category_projections)

category_string_columns = [
    target_name
    for target_name, target_type in productcategory_mapping.values()
    if target_type == "string"
]
for string_column in category_string_columns:
    cleansed_productcategory_df = cleansed_productcategory_df.withColumn(
        string_column, trim(initcap(col(string_column)))
    )

cleansed_productcategory_df.show()


+-----------------+-------------------+------------------+
|ProductCategoryID|ProductCategoryName|ProductSubCategory|
+-----------------+-------------------+------------------+
|                1|              Bikes|    Mountain Bikes|
|                2|              Bikes|        Road Bikes|
|                3|              Bikes|     Touring Bikes|
|                4|         Components|        Handlebars|
|                5|         Components|   Bottom Brackets|
|                6|         Components|            Brakes|
|                7|         Components|            Chains|
|                8|         Components|         Cranksets|
|                9|         Components|       Derailleurs|
|               10|         Components|             Forks|
|               11|         Components|          Headsets|
|               12|         Components|   Mountain Frames|
|               13|         Components|            Pedals|
|               14|         Components|       Road Frame

# Product

In [10]:
# Cleanse the Product extract:
# - rename/cast per product_mapping;
# - tidy strings and round doubles;
# - derive weight in grams, profit and margin.
cleansed_product_df = product_df.select(
    *[
        col(source_col).alias(target_col).cast(target_data_type)
        for source_col, (target_col, target_data_type) in product_mapping.items()
    ]
)

for source_col, (target_col, target_data_type) in product_mapping.items():
    if target_data_type == "string":
        cleansed_product_df = cleansed_product_df.withColumn(target_col, trim(initcap(col(target_col))))
    elif target_data_type == "double":
        cleansed_product_df = cleansed_product_df.withColumn(target_col, round(col(target_col), 2))

# Fill missing strings with the literal "None" once, after all columns are tidied.
# fillna with a string value only touches string columns, and it is loop-invariant.
cleansed_product_df = cleansed_product_df.fillna("None")
# Turn a 0 weight into a real null. Using the string "None" here made Spark coerce the
# whole column to string; null keeps it a double (as product_transformations does).
cleansed_product_df = cleansed_product_df.withColumn(
    "ProductWeightKilograms",
    when(col("ProductWeightKilograms") == 0, None).otherwise(col("ProductWeightKilograms")),
)
cleansed_product_df = cleansed_product_df.withColumn("ProductWeightGrams", (col("ProductWeightKilograms")*1000).cast("int"))
cleansed_product_df = cleansed_product_df.withColumn("ProductProfitAtListPrice", (col("ProductListPrice") - col("ProductCost")).cast("double"))
cleansed_product_df = cleansed_product_df.withColumn("ProductMarginAtListPrice", (col("ProductProfitAtListPrice") / col("ProductCost")).cast("double"))
cleansed_product_df = cleansed_product_df.withColumn("ProductProfitAtListPrice", round(col("ProductProfitAtListPrice"), 2))
cleansed_product_df = cleansed_product_df.withColumn("ProductMarginAtListPrice", round(col("ProductMarginAtListPrice"), 2))

cleansed_product_df.select('ProductCost').show()
    

+-----------+
|ProductCost|
+-----------+
|    1059.31|
|    1059.31|
|      13.09|
|      13.09|
|        3.4|
|        3.4|
|      13.09|
|       6.92|
|      38.49|
|      38.49|
|      38.49|
|      38.49|
|     868.63|
|     868.63|
|     868.63|
|     868.63|
|     868.63|
|     204.63|
|     204.63|
|     204.63|
+-----------+
only showing top 20 rows



# Sales

In [11]:
# Cleanse the Sales extract:
# - rename/cast per sales_mapping;
# - tidy strings and round doubles;
# - derive the per-unit discount value and the discounted unit price.
cleansed_sales_df = sales_df.select(
    *[
        col(source_col).alias(target_col).cast(target_data_type)
        for source_col, (target_col, target_data_type) in sales_mapping.items()
    ]
)

for source_col, (target_col, target_data_type) in sales_mapping.items():
    if target_data_type == "string":
        cleansed_sales_df = cleansed_sales_df.withColumn(target_col, trim(initcap(col(target_col))))
    elif target_data_type == "double":
        cleansed_sales_df = cleansed_sales_df.withColumn(target_col, round(col(target_col), 2))

# Default a missing discount to 0 BEFORE deriving values from it. Filling it afterwards
# left UnitDiscountValue and UnitPriceAfterDiscount null for rows with no discount.
cleansed_sales_df = cleansed_sales_df.fillna(0, subset=["UnitPriceDiscountPercentage"])
cleansed_sales_df = cleansed_sales_df.withColumn("UnitDiscountValue", (col("UnitPrice") * col("UnitPriceDiscountPercentage")).cast("double"))
cleansed_sales_df = cleansed_sales_df.withColumn("UnitPriceAfterDiscount", (col("UnitPrice") - col("UnitDiscountValue")).cast("double"))
cleansed_sales_df = cleansed_sales_df.withColumn("UnitDiscountValue", round(col("UnitDiscountValue"), 2))
cleansed_sales_df = cleansed_sales_df.withColumn("UnitPriceAfterDiscount", round(col("UnitPriceAfterDiscount"), 2))

cleansed_sales_df.show()

+------------+----------------+----------------+----------------+---------+--------------+----------------+-----------------------+----------------+--------------------+---------------------+-------------------+----------+---------+---------------------------+--------------+-----------------+----------------------+
|SalesOrderID|SalesOrderLineID|SalesOrderNumber|  SalesOrderDate|ProductID|SpecialOfferID|SpecialOfferName|SpecialOfferDiscountPct|SpecialOfferType|SpecialOfferCategory|SpecialOfferStartDate|SpecialOfferEndDate|TotalUnits|UnitPrice|UnitPriceDiscountPercentage|TotalLineValue|UnitDiscountValue|UnitPriceAfterDiscount|
+------------+----------------+----------------+----------------+---------+--------------+----------------+-----------------------+----------------+--------------------+---------------------+-------------------+----------+---------+---------------------------+--------------+-----------------+----------------------+
|       45266|            5717|            5717|0

# Source

In [3]:
def read_csv_to_df(file_path):
    """Load CSV data at ``file_path`` (first row is the header) as a Spark DataFrame."""
    reader = spark.read
    return reader.csv(file_path, header="True")

def productcategory_transformations(file_path):
    """Read the ProductCategory CSV at ``file_path`` and return it cleansed.

    Columns are renamed and cast per ``productcategory_mapping``. Every string-typed
    target column is then initcapped and trimmed.
    """
    raw_df = read_csv_to_df(file_path)
    projections = [
        col(source_name).alias(target_name).cast(target_type)
        for source_name, (target_name, target_type) in productcategory_mapping.items()
    ]
    cleansed_df = raw_df.select(*projections)

    string_targets = [
        target_name
        for target_name, target_type in productcategory_mapping.values()
        if target_type == "string"
    ]
    for target_name in string_targets:
        cleansed_df = cleansed_df.withColumn(target_name, trim(initcap(col(target_name))))
    return cleansed_df


def product_transformations(file_path):
    """Read the Product CSV at ``file_path`` and return the cleansed Product DataFrame.

    Steps:
    - rename/cast columns per ``product_mapping``;
    - trim + initcap string columns and round double columns to 2 dp;
    - fill null strings with the literal "None" and turn a 0 weight into null;
    - add ProductWeightGrams, ProductProfitAtListPrice (list price - cost) and
      ProductMarginAtListPrice (profit / cost); profit and margin are rounded to 2 dp.
    """
    df = read_csv_to_df(file_path)
    df = df.select(
        *[
            col(source_col).alias(target_col).cast(target_data_type)
            for source_col, (target_col, target_data_type) in product_mapping.items()
        ]
    )

    for source_col, (target_col, target_data_type) in product_mapping.items():
        if target_data_type == "string":
            df = df.withColumn(target_col, trim(initcap(col(target_col))))
        elif target_data_type == "double":
            df = df.withColumn(target_col, round(col(target_col), 2))
    # Loop-invariant: fill null strings once, after every column has been tidied.
    df = df.fillna("None")
    df = df.withColumn("ProductWeightKilograms", when(col("ProductWeightKilograms") == 0, None).otherwise(col("ProductWeightKilograms")))
    df = df.withColumn("ProductWeightGrams", (col("ProductWeightKilograms")*1000).cast("int"))
    df = df.withColumn("ProductProfitAtListPrice", (col("ProductListPrice") - col("ProductCost")).cast("double"))
    df = df.withColumn("ProductMarginAtListPrice", (col("ProductProfitAtListPrice") / col("ProductCost")).cast("double"))
    df = df.withColumn("ProductProfitAtListPrice", round(col("ProductProfitAtListPrice"), 2))
    df = df.withColumn("ProductMarginAtListPrice", round(col("ProductMarginAtListPrice"), 2))
    return df


def sales_transformations(file_path):
    """Read the Sales CSV(s) at ``file_path`` and return the cleansed Sales DataFrame.

    Steps:
    - rename/cast columns per ``sales_mapping``;
    - trim + initcap string columns and round double columns to 2 dp;
    - default a missing UnitPriceDiscountPercentage to 0;
    - add UnitDiscountValue (unit price * discount) and UnitPriceAfterDiscount, both
      rounded to 2 dp.
    """
    df = read_csv_to_df(file_path)
    df = df.select(
        *[
            col(source_col).alias(target_col).cast(target_data_type)
            for source_col, (target_col, target_data_type) in sales_mapping.items()
        ]
    )

    for source_col, (target_col, target_data_type) in sales_mapping.items():
        if target_data_type == "string":
            df = df.withColumn(target_col, trim(initcap(col(target_col))))
        elif target_data_type == "double":
            df = df.withColumn(target_col, round(col(target_col), 2))
    # Fill the discount BEFORE deriving from it. Otherwise a null discount yields null
    # UnitDiscountValue / UnitPriceAfterDiscount instead of 0 / UnitPrice.
    df = df.fillna(0, subset=["UnitPriceDiscountPercentage"])
    df = df.withColumn("UnitDiscountValue", (col("UnitPrice") * col("UnitPriceDiscountPercentage")).cast("double"))
    df = df.withColumn("UnitPriceAfterDiscount", (col("UnitPrice") - col("UnitDiscountValue")).cast("double"))
    df = df.withColumn("UnitDiscountValue", round(col("UnitDiscountValue"), 2))
    df = df.withColumn("UnitPriceAfterDiscount", round(col("UnitPriceAfterDiscount"), 2))
    return df

def transformer(file_path):
    """Dispatch ``file_path`` to the matching Cleansed-layer transformation.

    "ProductCategory" is tested before "Product" because the former contains the latter.

    Raises:
        ValueError: if the path mentions none of ProductCategory, Product or Sales.
            Returning None (as before) only deferred the failure to an opaque
            AttributeError in the caller.
    """
    if "ProductCategory" in file_path:
        return productcategory_transformations(file_path)
    if "Product" in file_path:
        return product_transformations(file_path)
    if "Sales" in file_path:
        return sales_transformations(file_path)
    raise ValueError(f"Unknown CSV file: {file_path}")
    

def write_to_sink(file_path):
    """Cleanse the source CSV(s) at ``file_path`` and write them to the Cleansed layer as Parquet.

    Output goes to
    ``.../Cleansed/DataFeed=<feed>/schemaVersion=1/SubmissionYear=.../SubmissionSecond=...``.
    <feed> is the source file's base name cut at the first "." and first "%". The
    submission parts come from Spark's current timestamp. Existing data at that path is
    overwritten.

    Raises:
        ValueError: if the transformed data has no rows, so no feed name can be derived
            (or, via ``transformer``, if the path is not a known feed).
    """
    # Read all timestamp parts in ONE query: current_timestamp() is fixed within a query
    # but not across queries, so six separate .first() calls could disagree at a boundary.
    submission = (
        spark.range(1)
        .select(current_timestamp().alias("current_date"))
        .select(
            year("current_date").alias("submission_year"),
            month("current_date").alias("submission_month"),
            day("current_date").alias("submission_day"),
            hour("current_date").alias("submission_hour"),
            minute("current_date").alias("submission_minute"),
            second("current_date").alias("submission_second"),
        )
        .first()
    )
    submission_year = submission["submission_year"]
    submission_month = submission["submission_month"]
    submission_day = submission["submission_day"]
    submission_hour = submission["submission_hour"]
    submission_minute = submission["submission_minute"]
    submission_second = submission["submission_second"]

    df = transformer(file_path)

    file_name = substring_index(input_file_name(), "/", -1)
    feed_row = (
        df.select(split(split(file_name, "\\.")[0], "%")[0].alias("sourcefile"))
        .distinct()
        .first()
    )
    if feed_row is None:
        raise ValueError(f"No rows read from {file_path!r}; cannot derive a data feed name.")
    data_feed = feed_row["sourcefile"]

    cleansed_path = f"C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Cleansed/DataFeed={data_feed}/schemaVersion=1/SubmissionYear={submission_year}/SubmissionMonth={submission_month}/SubmissionDay={submission_day}/SubmissionHour={submission_hour}/SubmissionMinute={submission_minute}/SubmissionSecond={submission_second}"

    df.write.mode("overwrite").parquet(cleansed_path)
    
    
#write_to_sink("C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Product")


In [5]:
# Inspect the distinct selling/discontinued date strings to choose a parse format.
product = product_transformations(
    "C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Product/date=20220606/Product.csv"
)
selling_date_columns = ["ProductSellingStartDate", "ProductSellingEndDate", "ProductDiscontinuedDate"]
product.select(*selling_date_columns).distinct().show()


+-----------------------+---------------------+-----------------------+
|ProductSellingStartDate|ProductSellingEndDate|ProductDiscontinuedDate|
+-----------------------+---------------------+-----------------------+
|    31-05-2011 00:00:00|                 Null|                   Null|
|    31-05-2011 00:00:00|  29-05-2013 00:00:00|                   Null|
|    31-05-2011 00:00:00|  29-05-2012 00:00:00|                   Null|
|    30-05-2012 00:00:00|  29-05-2013 00:00:00|                   Null|
|    30-05-2013 00:00:00|                 Null|                   Null|
|    30-04-2008 00:00:00|                 Null|                   Null|
|    30-05-2012 00:00:00|                 Null|                   Null|
+-----------------------+---------------------+-----------------------+



# Target

In [18]:
# Rebuild the cleansed DataFrames with the Source-section functions, then pull Product
# and Sales into pandas for a quick look.
product_df = product_transformations(
    "C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Product/date=20220606/Product.csv"
)
productcategory_df = productcategory_transformations(
    "C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/ProductCategory/date=20220606/ProductCategory.csv"
)
sales_df = sales_transformations(
    "C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Sales"
)

pandas_product, pandas_sales = product_df.toPandas(), sales_df.toPandas()
pandas_sales.head()

Unnamed: 0,SalesOrderID,SalesOrderLineID,SalesOrderNumber,SalesOrderDate,ProductID,SpecialOfferID,SpecialOfferName,SpecialOfferDiscountPct,SpecialOfferType,SpecialOfferCategory,SpecialOfferStartDate,SpecialOfferEndDate,TotalUnits,UnitPrice,UnitPriceDiscountPercentage,TotalLineValue,UnitDiscountValue,UnitPriceAfterDiscount
0,45266,5717,5717,01/01/2012 00:00,777,1,No Discount,0,No Discount,No Discount,01/05/2011 00:00,30/11/2014 00:00,2,2024.99,0.0,4049.99,0.0,2024.99
1,45266,5718,5718,01/01/2012 00:00,774,1,No Discount,0,No Discount,No Discount,01/05/2011 00:00,30/11/2014 00:00,3,2039.99,0.0,6119.98,0.0,2039.99
2,45266,5719,5719,01/01/2012 00:00,775,1,No Discount,0,No Discount,No Discount,01/05/2011 00:00,30/11/2014 00:00,2,2024.99,0.0,4049.99,0.0,2024.99
3,45266,5720,5720,01/01/2012 00:00,743,1,No Discount,0,No Discount,No Discount,01/05/2011 00:00,30/11/2014 00:00,1,714.7,0.0,714.7,0.0,714.7
4,45266,5721,5721,01/01/2012 00:00,778,1,No Discount,0,No Discount,No Discount,01/05/2011 00:00,30/11/2014 00:00,2,2024.99,0.0,4049.99,0.0,2024.99


In [28]:
# Build the Product dimension from the cleansed Product and ProductCategory DataFrames.
product = product_df
# Source dates look like "31-05-2011 00:00:00"; store them as yyyyMMdd integers.
date_columns = ["ProductSellingStartDate", "ProductSellingEndDate", "ProductDiscontinuedDate"]
for column in date_columns:
    product = product.withColumn(column, date_format(to_date(col(column), "dd-MM-yyyy HH:mm:ss"), "yyyyMMdd").cast("int"))

product = product.withColumn("CurrencyKey", lit("GBP"))
# NOTE(review): "Null" matches the literal string seen in the extract, not SQL NULL — confirm intended.
product = product.withColumn("ProductLineName", when(col("ProductLine") == "M", "Mountain Bikes")
                                                .when(col("ProductLine") == "Null", "Bike Parts")
                                                .when(col("ProductLine") == "R", "Road Bikes")
                                                .when(col("ProductLine") == "S", "Accessories and Attire")
                                                .when(col("ProductLine") == "T", "Touring Bikes")
                                                .otherwise("Unknown"))
product = product.withColumn("ProductKey", xxhash64(concat_ws("|", col("SourceProductID"), lit("Product"), lit("ProductCategory"))).cast("long"))
product = product.join(productcategory_df, on="ProductCategoryID", how="left")
product = product.withColumnRenamed("ProductSubCategory", "ProductSubCategoryName")
# schema[...].dataType is an instance such as DoubleType(); comparing it to the class with
# == is always False, so isinstance is needed for the rounding to actually happen.
for field in product.schema.fields:
    if isinstance(field.dataType, DoubleType):
        product = product.withColumn(field.name, round(col(field.name), 2))
product = product.drop("SourceProductID", "ProductUID", "ProductCategoryID", "ProductMakeFlag", "ModifiedDate", "ProductWeightGrams")

product.show()

+--------------------+--------------------+------------+-----------+----------------------+-----------+----------------+-----------+------------+------------+--------------+-----------------------+---------------------+-----------------------+------------------------+------------------------+-----------+--------------------+--------------------+-------------------+----------------------+
|    ProductModelName|         ProductName|ProductColor|ProductSize|ProductWeightKilograms|ProductCost|ProductListPrice|ProductLine|ProductClass|ProductStyle|IsFinishedGood|ProductSellingStartDate|ProductSellingEndDate|ProductDiscontinuedDate|ProductProfitAtListPrice|ProductMarginAtListPrice|CurrencyKey|     ProductLineName|          ProductKey|ProductCategoryName|ProductSubCategoryName|
+--------------------+--------------------+------------+-----------+----------------------+-----------+----------------+-----------+------------+------------+--------------+-----------------------+---------------------

In [77]:
# Build the Sales fact from the cleansed Sales DataFrame.
date_columns = ["SalesOrderDate"]
sales = sales_df
for date_col in date_columns:
    # Order dates look like "01/01/2012 00:00"; store them as yyyyMMdd integers.
    sales = sales.withColumn(date_col, date_format(to_date(col(date_col), "dd/MM/yyyy HH:mm"), "yyyyMMdd").cast("int"))

FactHashID_columns = ["SalesOrderNumber", "SalesOrderDate", "TotalUnits", "UnitPrice", "UnitPriceDiscountPercentage", "UnitPriceAfterDiscount", "TotalLineValue"]
sales = (
    sales
    .withColumn("CurrencyKey", lit("GBP"))
    .withColumn("ProductKey", xxhash64(concat_ws("|", col("ProductID"), lit("Product"), lit("ProductCategory"))).cast("long"))
    .withColumn("FactHashID", xxhash64(concat_ws("|", *map(col, FactHashID_columns))).cast("long"))
    .withColumn("SpecialOfferKey", xxhash64(concat_ws("|", col("SpecialOfferID"), lit("Sales"))).cast("long"))
    .drop("SalesOrderID", "SalesOrderLineID", "ProductID", "SpecialOfferID", "SpecialOfferName", "SpecialOfferDiscountPct", "SpecialOfferType", "SpecialOfferCategory", "SpecialOfferStartDate", "SpecialOfferEndDate", "UnitDiscountValue")
    .withColumnRenamed("TotalUnits", "UnitVolume")
    .withColumnRenamed("TotalLineValue", "TotalSalesLineValue")
)
# TODO: partition the output by month and year of SalesOrderDate.

sales.printSchema()

root
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesOrderDate: integer (nullable = true)
 |-- UnitVolume: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitPriceDiscountPercentage: double (nullable = false)
 |-- TotalSalesLineValue: double (nullable = true)
 |-- UnitPriceAfterDiscount: double (nullable = true)
 |-- CurrencyKey: string (nullable = false)
 |-- ProductKey: long (nullable = false)
 |-- FactHashID: long (nullable = false)
 |-- SpecialOfferKey: long (nullable = false)



In [87]:
# Submission timestamp for the Integrated path, read in ONE query so all six parts agree
# (current_timestamp() is fixed within a query, not across separate .first() calls).
current_date_df = spark.range(1).select(current_timestamp().alias("current_date"))
submission_row = current_date_df.select(
    year("current_date").alias("submission_year"),
    month("current_date").alias("submission_month"),
    day("current_date").alias("submission_day"),
    hour("current_date").alias("submission_hour"),
    minute("current_date").alias("submission_minute"),
    second("current_date").alias("submission_second"),
).first()

submission_year = submission_row["submission_year"]
submission_month = submission_row["submission_month"]
submission_day = submission_row["submission_day"]
submission_hour = submission_row["submission_hour"]
submission_minute = submission_row["submission_minute"]
submission_second = submission_row["submission_second"]

# Entity = source file base name cut at the first "." and "%"; the domain is looked up from it.
df = spark.read.csv("C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Sourced/SystemA/Product", header="True")
entity = df.withColumn("sourcefile",input_file_name())
entity = entity.withColumn("sourcefile", substring_index("sourcefile","/", -1))
entity = entity.withColumn("sourcefile", split(col("sourcefile"), "\\.")[0]).select("sourcefile").distinct()
entity = entity.withColumn("sourcefile", split(col("sourcefile"), "%")[0]).select("sourcefile").distinct()
entity = entity.first()["sourcefile"]
if entity == "Product":
    domain = "Master"
elif entity == "Sales":
    domain = "Commercial"
else:
    # Without this, an unexpected entity left `domain` undefined and failed with a NameError below.
    raise ValueError(f"No domain is defined for entity {entity!r}")

integrated_path = f"C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Integrated/Domain={domain}/Entity={entity}/schemaVersion=1/SubmissionYear={submission_year}/SubmissionMonth={submission_month}/SubmissionDay={submission_day}/SubmissionHour={submission_hour}/SubmissionMinute={submission_minute}/SubmissionSecond={submission_second}"
print(integrated_path)

C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Integrated/Domain=Master/Entity=Product/schemaVersion=1/SubmissionYear=2023/SubmissionMonth=10/SubmissionDay=18/SubmissionHour=16/SubmissionMinute=34/SubmissionSecond=20


In [None]:
def read_parquet_to_df(file_path):
    """Read the Parquet dataset at ``file_path`` into a Spark DataFrame.

    Parquet files carry their own schema, so no CSV-style ``header`` option is passed;
    it has no meaning for the Parquet reader.
    """
    return spark.read.parquet(file_path)
    
def read_latest_parquet(file_path):
    """Read the most recent Parquet snapshot found under ``file_path``.

    Walks ``file_path`` recursively. Among directories that directly contain at least one
    file, it picks the one with the greatest ``os.path.getctime`` (ties keep the first
    directory walked) and reads it with ``spark.read.parquet``.

    Raises:
        FileNotFoundError: if no directory under ``file_path`` contains any file.
            Previously None was returned, which failed later in the caller.
    """
    latest_dir = None
    latest_timestamp = float("-inf")

    for root, _dirs, files in os.walk(file_path):
        if not files:
            continue
        # The timestamp belongs to the directory, so compute it once rather than per file.
        dir_timestamp = os.path.getctime(root)
        if dir_timestamp > latest_timestamp:
            latest_timestamp = dir_timestamp
            latest_dir = root

    if latest_dir is None:
        raise FileNotFoundError(f"No data files found under {file_path!r}")
    return spark.read.parquet(latest_dir)


def product_integrated(
    file_path,
    productcategory_path="C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Cleansed/DataFeed=ProductCategory",
):
    """Build the integrated Product dimension from the cleansed Product Parquet at ``file_path``.

    Steps:
    - convert the selling/discontinued date strings ("dd-MM-yyyy HH:mm:ss") to yyyyMMdd
      integers;
    - add CurrencyKey ("GBP"), ProductLineName and a hashed ProductKey;
    - left-join the ProductCategory snapshot chosen by ``read_latest_parquet`` on
      ProductCategoryID;
    - round every double column to 2 dp and drop source/technical columns.

    Args:
        file_path: Cleansed Product dataset (Parquet) to read.
        productcategory_path: Root of the cleansed ProductCategory feed.
    """
    DimProduct_df = read_parquet_to_df(file_path)
    ProductCategory_df = read_latest_parquet(productcategory_path)

    date_columns = ["ProductSellingStartDate", "ProductSellingEndDate", "ProductDiscontinuedDate"]
    for column in date_columns:
        DimProduct_df = DimProduct_df.withColumn(column, date_format(to_date(col(column), "dd-MM-yyyy HH:mm:ss"), "yyyyMMdd").cast("int"))

    DimProduct_df = DimProduct_df.withColumn("CurrencyKey", lit("GBP"))
    # NOTE(review): "Null" matches the literal string seen in the extract, not SQL NULL — confirm intended.
    DimProduct_df = DimProduct_df.withColumn("ProductLineName", when(col("ProductLine") == "M", "Mountain Bikes")
                                                                .when(col("ProductLine") == "Null", "Bike Parts")
                                                                .when(col("ProductLine") == "R", "Road Bikes")
                                                                .when(col("ProductLine") == "S", "Accessories and Attire")
                                                                .when(col("ProductLine") == "T", "Touring Bikes")
                                                                .otherwise("Unknown"))
    DimProduct_df = DimProduct_df.withColumn("ProductKey", xxhash64(concat_ws("|", col("SourceProductID"), lit("Product"), lit("ProductCategory"))).cast("long"))
    DimProduct_df = DimProduct_df.join(ProductCategory_df, on="ProductCategoryID", how="left")
    DimProduct_df = DimProduct_df.withColumnRenamed("ProductSubCategory", "ProductSubCategoryName")
    # dataType is an instance such as DoubleType(); `== DoubleType` (the class) is always
    # False, so isinstance is required for the rounding to take effect.
    for field in DimProduct_df.schema.fields:
        if isinstance(field.dataType, DoubleType):
            DimProduct_df = DimProduct_df.withColumn(field.name, round(col(field.name), 2))
    DimProduct_df = DimProduct_df.drop("SourceProductID", "ProductUID", "ProductCategoryID", "ProductMakeFlag", "ModifiedDate", "ProductWeightGrams")
    return DimProduct_df


def sales_integrated(file_path):
    """Build the integrated Sales fact from the cleansed Sales Parquet at ``file_path``.

    Steps:
    - convert SalesOrderDate ("dd/MM/yyyy HH:mm") to a yyyyMMdd integer;
    - add CurrencyKey ("GBP") and hashed ProductKey, FactHashID and SpecialOfferKey columns;
    - drop source/offer detail columns;
    - rename TotalUnits -> UnitVolume and TotalLineValue -> TotalSalesLineValue.
    """
    # Read the cleansed Parquet directly. Do NOT call transformer() here: the module-level
    # transformer is the integrated-layer dispatcher, which routes Sales paths straight
    # back to this function (infinite recursion).
    FactSales_df = read_parquet_to_df(file_path)

    date_columns = ["SalesOrderDate"]
    for column in date_columns:
        FactSales_df = FactSales_df.withColumn(column, date_format(to_date(col(column), "dd/MM/yyyy HH:mm"), "yyyyMMdd").cast("int"))

    FactSales_df = FactSales_df.withColumn("CurrencyKey", lit("GBP"))
    FactSales_df = FactSales_df.withColumn("ProductKey", xxhash64(concat_ws("|", col("ProductID"), lit("Product"), lit("ProductCategory"))).cast("long"))
    FactHashID_columns = ["SalesOrderNumber", "SalesOrderDate", "TotalUnits", "UnitPrice", "UnitPriceDiscountPercentage", "UnitPriceAfterDiscount", "TotalLineValue"]
    FactSales_df = FactSales_df.withColumn("FactHashID", xxhash64(concat_ws("|", *[col(column) for column in FactHashID_columns])).cast("long"))
    FactSales_df = FactSales_df.withColumn("SpecialOfferKey", xxhash64(concat_ws("|", col("SpecialOfferID"), lit("Sales"))).cast("long"))
    FactSales_df = FactSales_df.drop("SalesOrderID", "SalesOrderLineID", "ProductID", "SpecialOfferID", "SpecialOfferName", "SpecialOfferDiscountPct", "SpecialOfferType", "SpecialOfferCategory", "SpecialOfferStartDate", "SpecialOfferEndDate", "UnitDiscountValue")
    FactSales_df = FactSales_df.withColumnRenamed("TotalUnits", "UnitVolume").withColumnRenamed("TotalLineValue", "TotalSalesLineValue")
    # TODO: partition the output by month and year of SalesOrderDate.
    return FactSales_df

def transformer(file_path):
    """Dispatch a cleansed-layer ``file_path`` to the matching Integrated-layer builder.

    NOTE: this redefinition replaces the Cleansed-layer ``transformer`` defined in an
    earlier cell for every call made after this cell runs.

    Raises:
        ValueError: if the path mentions neither Product nor Sales. Returning None (as
            before) only deferred the failure to an opaque AttributeError in the caller.
    """
    if "Product" in file_path:
        return product_integrated(file_path)
    if "Sales" in file_path:
        return sales_integrated(file_path)
    raise ValueError(f"Unknown parquet file: {file_path}")

def write_to_sink(file_path):
    """Integrate the cleansed dataset at ``file_path`` and write it to the Integrated layer.

    Entity and domain are inferred from the path:
    - "Product" -> (Product, Master);
    - "Sales" -> (Sales, Commercial).

    The result is written as Parquet (overwrite mode) under
    ``.../Integrated/Domain=<domain>/Entity=<entity>/schemaVersion=1/SubmissionYear=.../SubmissionSecond=...``.

    Raises:
        ValueError: if the path mentions neither Product nor Sales.
    """
    # Resolve entity/domain first so an unsupported path fails before any Spark work.
    if "Product" in file_path:
        entity, domain = "Product", "Master"
    elif "Sales" in file_path:
        entity, domain = "Sales", "Commercial"
    else:
        raise ValueError(f"Unknown parquet file: {file_path}")

    # Read all timestamp parts in ONE query: current_timestamp() is fixed within a query
    # but not across queries, so six separate .first() calls could disagree at a boundary.
    submission = (
        spark.range(1)
        .select(current_timestamp().alias("current_date"))
        .select(
            year("current_date").alias("submission_year"),
            month("current_date").alias("submission_month"),
            day("current_date").alias("submission_day"),
            hour("current_date").alias("submission_hour"),
            minute("current_date").alias("submission_minute"),
            second("current_date").alias("submission_second"),
        )
        .first()
    )
    submission_year = submission["submission_year"]
    submission_month = submission["submission_month"]
    submission_day = submission["submission_day"]
    submission_hour = submission["submission_hour"]
    submission_minute = submission["submission_minute"]
    submission_second = submission["submission_second"]

    df = transformer(file_path)

    integrated_path = f"C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Integrated/Domain={domain}/Entity={entity}/schemaVersion=1/SubmissionYear={submission_year}/SubmissionMonth={submission_month}/SubmissionDay={submission_day}/SubmissionHour={submission_hour}/SubmissionMinute={submission_minute}/SubmissionSecond={submission_second}"

    df.write.mode("overwrite").parquet(integrated_path)
    
# Integrate one specific cleansed Sales submission.
cleansed_sales_submission = "C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Cleansed/DataFeed=Sales/schemaVersion=1/SubmissionYear=2023/SubmissionMonth=10/SubmissionDay=17/SubmissionHour=16/SubmissionMinute=52/SubmissionSecond=13"
write_to_sink(cleansed_sales_submission)

In [12]:
import os
import glob
import pandas as pd

def read_latest_parquet(file_path):
    """Read the most recent Parquet snapshot found under ``file_path``.

    Walks ``file_path`` recursively. Among directories that directly contain at least one
    file, it picks the one with the greatest ``os.path.getctime`` (ties keep the first
    directory walked) and reads it with ``spark.read.parquet``.

    Raises:
        FileNotFoundError: if no directory under ``file_path`` contains any file.
            Previously None was returned, which failed later in the caller.
    """
    latest_dir = None
    latest_timestamp = float("-inf")

    for root, _dirs, files in os.walk(file_path):
        if not files:
            continue
        # The timestamp belongs to the directory, so compute it once rather than per file.
        dir_timestamp = os.path.getctime(root)
        if dir_timestamp > latest_timestamp:
            latest_timestamp = dir_timestamp
            latest_dir = root

    if latest_dir is None:
        raise FileNotFoundError(f"No data files found under {file_path!r}")
    return spark.read.parquet(latest_dir)

# Preview the most recent cleansed ProductCategory snapshot.
latest_productcategory_df = read_latest_parquet('C:/Users/TobyBarker/Documents/Data_Engineering_Accelerator/src/databricks/sample_lake/Cleansed/DataFeed=ProductCategory/')
latest_productcategory_df.show()


+-----------------+-------------------+------------------+
|ProductCategoryID|ProductCategoryName|ProductSubCategory|
+-----------------+-------------------+------------------+
|                1|              Bikes|    Mountain Bikes|
|                2|              Bikes|        Road Bikes|
|                3|              Bikes|     Touring Bikes|
|                4|         Components|        Handlebars|
|                5|         Components|   Bottom Brackets|
|                6|         Components|            Brakes|
|                7|         Components|            Chains|
|                8|         Components|         Cranksets|
|                9|         Components|       Derailleurs|
|               10|         Components|             Forks|
|               11|         Components|          Headsets|
|               12|         Components|   Mountain Frames|
|               13|         Components|            Pedals|
|               14|         Components|       Road Frame