# Connecting to the Storage Account and Mounting Locally

In [0]:
dbutils.secrets.listScopes()

[SecretScope(name='abcretail_secretScope')]

In [0]:
secret_scope = "abcretail_secretScope"

In [0]:
dbutils.secrets.list(secret_scope)

[SecretMetadata(key='azure-sql-server-password'),
 SecretMetadata(key='client-id'),
 SecretMetadata(key='client-secret'),
 SecretMetadata(key='databricks-token'),
 SecretMetadata(key='directory-id'),
 SecretMetadata(key='onprem-sqlserver-password')]

In [0]:
client_id = dbutils.secrets.get(secret_scope, "client-id")
client_secret = dbutils.secrets.get(secret_scope, "client-secret")
directory_id = dbutils.secrets.get(secret_scope, "directory-id")

In [0]:
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{directory_id}/oauth2/token"
}

In [0]:
storage_account = "dlsaabcretail"

container_source = "raw"
container_target = "curated"

source_link = f"abfss://{container_source}@{storage_account}.dfs.core.windows.net/"
mount_point_source = f"/mnt/{storage_account}/{container_source}"

target_link = f"abfss://{container_target}@{storage_account}.dfs.core.windows.net/"
mount_point_target = f"/mnt/{storage_account}/{container_target}"

## Mounting the Source Container: "raw"

In [0]:
# Checking if any existing mount points in dbutils.fs.mounts() match the desired mount point
if any(mount.mountPoint == mount_point_source for mount in dbutils.fs.mounts()):
  # If a mount point exists, unmount it
  dbutils.fs.unmount(mount_point_source)
  print(f"Unmount existing mount at {mount_point_source}")

# Try to mount the new source to the specified mount point
try:
  dbutils.fs.mount(
    source = source_link,
    mount_point = mount_point_source,
    extra_configs = configs
  )
  print(f"Mounted successfully at {mount_point_source}")

# If an error occurs during the mount process, handle the exception
except Exception as e:
  print(f"Error mounting: {mount_point_source}")

/mnt/dlsaabcretail/raw has been unmounted.
Unmount existing mount at /mnt/dlsaabcretail/raw
Mounted successfully at /mnt/dlsaabcretail/raw


## Mounting the Target Container: "curated"

In [0]:
if any(mount.mountPoint == mount_point_target for mount in dbutils.fs.mounts()):
  dbutils.fs.unmount(mount_point_target)
  print(f"Unmount existing mount at {mount_point_target}")

try:
  dbutils.fs.mount(
    source = target_link,
    mount_point = mount_point_target,
    extra_configs = configs
  )
  print(f"Mounted successfully at {mount_point_target}")
except Exception as e:
  print(f"Error mounting: {mount_point_target}")

/mnt/dlsaabcretail/curated has been unmounted.
Unmount existing mount at /mnt/dlsaabcretail/curated
Mounted successfully at /mnt/dlsaabcretail/curated


# Importing Python Packages, Libraries, Functions and Methods

In [0]:
from pyspark.sql.functions import current_timestamp, col, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StringType, DecimalType, BooleanType, DoubleType, BinaryType
from delta.tables import DeltaTable

# Creating File Lists

In [0]:
dbutils.fs.ls(f"{mount_point_source}/http/csv") # Checking the list of files in the "http" folder of the source container.

[FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/http/csv/accessories/', name='accessories/', size=0, modificationTime=1732824536000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/http/csv/clothing/', name='clothing/', size=0, modificationTime=1732824447000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/http/csv/footwear/', name='footwear/', size=0, modificationTime=1732824397000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/http/csv/home_decor/', name='home_decor/', size=0, modificationTime=1732824492000)]

In [0]:
http_Accessories_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/http/csv/accessories") if ".csv" in file.name]
http_Clothing_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/http/csv/clothing") if ".csv" in file.name]
http_Footwear_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/http/csv/footwear") if ".csv" in file.name]
http_HomeDecor_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/http/csv/home_decor") if ".csv" in file.name]

In [0]:
dbutils.fs.ls(f"{mount_point_source}/azsqldb")

[FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/azsqldb/Address.parquet', name='Address.parquet', size=35923, modificationTime=1732824383000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/azsqldb/Customer.parquet', name='Customer.parquet', size=100518, modificationTime=1732824372000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/azsqldb/CustomerAddress.parquet', name='CustomerAddress.parquet', size=21289, modificationTime=1732824372000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/azsqldb/Product.parquet', name='Product.parquet', size=196575, modificationTime=1732824380000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/azsqldb/ProductCategory.parquet', name='ProductCategory.parquet', size=3421, modificationTime=1732824371000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/azsqldb/ProductDescription.parquet', name='ProductDescription.parquet', size=77633, modificationTime=1732824378000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/azsqldb/ProductModel.parquet', name='ProductModel.parquet', size=

In [0]:
azsqldb_Address_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "Address" in file.name and "CustomerAddress" not in file.name] # Creating a list of the hard dataset Address files
azsqldb_Customer_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "Customer" in file.name and "CustomerAddress" not in file.name]
azsqldb_CustomerAddress_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "CustomerAddress" in file.name]
azsqldb_Product_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "Product" in file.name and "ProductCategory" not in file.name and "ProductDescription" not in file.name and "ProductModel" not in file.name]
azsqldb_ProductCategory_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "ProductCategory" in file.name]
azsqldb_ProductDescription_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "ProductDescription" in file.name and "ProductModel" not in file.name]
azsqldb_ProductModel_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "ProductModel" in file.name and "ProductDescription" not in file.name]
azsqldb_ProductModelProductDescription_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "ProductModelProductDescription" in file.name]
azsqldb_SalesOrderDetail_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "SalesOrderDetail" in file.name]
azsqldb_SalesOrderHeader_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/azsqldb") if "SalesOrderHeader" in file.name]

In [0]:
dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver")

[FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/onpremsqlserver/CountryRolling.parquet', name='CountryRolling.parquet', size=544, modificationTime=1732824376000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/onpremsqlserver/Customer.parquet', name='Customer.parquet', size=2280, modificationTime=1732824379000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/onpremsqlserver/CustomerProductReview.parquet', name='CustomerProductReview.parquet', size=1263, modificationTime=1732824488000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/onpremsqlserver/CustomerSellerReview.parquet', name='CustomerSellerReview.parquet', size=1161, modificationTime=1732824521000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/onpremsqlserver/Order.parquet', name='Order.parquet', size=869, modificationTime=1732824395000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/onpremsqlserver/Product.parquet', name='Product.parquet', size=1694, modificationTime=1732824371000),
 FileInfo(path='dbfs:/mnt/dlsaabcretail/raw/onpremsqlserver/

In [0]:
onprem_CountryRolling_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "CountryRolling" in file.name]
onprem_Customer_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "Customer" in file.name and "CustomerProductReview" not in file.name and "CustomerSellerReview" not in file.name]
onprem_CustomerProductReview_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "CustomerProductReview" in file.name]
onprem_CustomerSellerReview_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "CustomerSellerReview" in file.name]
onprem_Order_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "Order" in file.name]
onprem_Product_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "Product" in file.name and "CustomerProductReview" not in file.name and "ProductCategories" not in file.name and "ProductQuality" not in file.name and "SellerProductPromotion" not in file.name]
onprem_ProductCategories_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "ProductCategories" in file.name]
onprem_ProductQuality_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "ProductQuality" in file.name]
onprem_Promotion_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "Promotion" in file.name and "SellerProductPromotion" not in file.name]
onprem_Seller_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "Seller" in file.name and "CustomerSellerReview" not in file.name and "SellerProductPromotion" not in file.name]
onprem_SellerProductPromotion_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "SellerProductPromotion" in file.name]
onprem_StateProvinceRolling_files = [file.path for file in dbutils.fs.ls(f"{mount_point_source}/onpremsqlserver") if "StateProvinceRolling" in file.name]

# Reading and Cleaning Data

## Reading and Cleaning Softline Data From HTTP API Source

### http_accessories Dataset

In [0]:
df_http_accessories = spark.read.csv(*http_Accessories_files, header=True)
df_http_accessories.show(3)

+------+--------+-----------------+-----------+-----------------+--------------------+------+---------+--------------------+------------+-----------+
|source|filename|            Brand|   Category|           Colors|         Description| Price|ProductID|         ProductName|       Sizes|SubCategory|
+------+--------+-----------------+-----------+-----------------+--------------------+------+---------+--------------------+------------+-----------+
|  NULL|    NULL|    Timepiece Co.|Accessories|["Brown","Black"]|A timeless leathe...|149.99|        4|Luxury Leather Watch|["One Size"]|    Watches|
|  NULL|    NULL|      ShinyThings|Accessories|       ["Silver"]|A delicate silver...| 39.99|        5|Silver Pendant Ne...|["One Size"]|    Jewelry|
|  NULL|    NULL|Traveler's Choice|Accessories|["Brown","Black"]|A durable leather...| 89.99|        6|    Leather Backpack|  ["Medium"]|       Bags|
+------+--------+-----------------+-----------+-----------------+--------------------+------+-------

In [0]:
# Adding "ingestion_timestamp" column, dropping unnecessary columns, dropping any columns with NULL "ProductID" values, rearranging the columns and ordering by "ProductID"
df_http_accessories = df_http_accessories.withColumn("ingestion_timestamp", current_timestamp()) \
    .drop("source", "filename") \
    .dropna(subset = ["ProductID"]) \
    .select("ProductID", "Category", "SubCategory", "ProductName", "Brand", "Sizes", "Colors", "Price", "Description", "ingestion_timestamp") \
    .orderBy("ProductID")

# Defining a window partitioned by ProductID and ordered by ingestion_timestamp descending
window_spec = Window.partitionBy("ProductID").orderBy(col("ingestion_timestamp").desc())

# Adding a row number column to rank rows within each partition
df_http_accessories = df_http_accessories.withColumn("row_num", row_number().over(window_spec))

# Filtering to keep only the first row in each partition (latest entry per "ProductID")
df_http_accessories = df_http_accessories.filter(col("row_num") == 1).drop("row_num")

df_http_accessories.show(3)

+---------+-----------+-----------+--------------------+-----------------+------------+-----------------+------+--------------------+--------------------+
|ProductID|   Category|SubCategory|         ProductName|            Brand|       Sizes|           Colors| Price|         Description| ingestion_timestamp|
+---------+-----------+-----------+--------------------+-----------------+------------+-----------------+------+--------------------+--------------------+
|        4|Accessories|    Watches|Luxury Leather Watch|    Timepiece Co.|["One Size"]|["Brown","Black"]|149.99|A timeless leathe...|2024-11-28 22:07:...|
|        5|Accessories|    Jewelry|Silver Pendant Ne...|      ShinyThings|["One Size"]|       ["Silver"]| 39.99|A delicate silver...|2024-11-28 22:07:...|
|        6|Accessories|       Bags|    Leather Backpack|Traveler's Choice|  ["Medium"]|["Brown","Black"]| 89.99|A durable leather...|2024-11-28 22:07:...|
+---------+-----------+-----------+--------------------+--------------

In [0]:
print(f"Total http_accessories records: {df_http_accessories.count()}")

Total http_accessories records: 3


In [0]:
df_http_accessories.columns

['ProductID',
 'Category',
 'SubCategory',
 'ProductName',
 'Brand',
 'Sizes',
 'Colors',
 'Price',
 'Description',
 'ingestion_timestamp']

In [0]:
# Casting data types
df_http_accessories = df_http_accessories \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("Category", col("Category").cast(StringType())) \
    .withColumn("SubCategory", col("SubCategory").cast(StringType())) \
    .withColumn("ProductName", col("ProductName").cast(StringType())) \
    .withColumn("Brand", col("Brand").cast(StringType())) \
    .withColumn("Sizes", col("Sizes").cast(StringType())) \
    .withColumn("Colors", col("Colors").cast(StringType())) \
    .withColumn("Price", col("Price").cast(DecimalType(10,2))) \
    .withColumn("Description", col("Description").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_http_accessories.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Sizes: string (nullable = true)
 |-- Colors: string (nullable = true)
 |-- Price: decimal(10,2) (nullable = true)
 |-- Description: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_http_accessories.show(3)

+---------+-----------+-----------+--------------------+-----------------+------------+-----------------+------+--------------------+--------------------+
|ProductID|   Category|SubCategory|         ProductName|            Brand|       Sizes|           Colors| Price|         Description| ingestion_timestamp|
+---------+-----------+-----------+--------------------+-----------------+------------+-----------------+------+--------------------+--------------------+
|        4|Accessories|    Watches|Luxury Leather Watch|    Timepiece Co.|["One Size"]|["Brown","Black"]|149.99|A timeless leathe...|2024-11-28 22:07:...|
|        5|Accessories|    Jewelry|Silver Pendant Ne...|      ShinyThings|["One Size"]|       ["Silver"]| 39.99|A delicate silver...|2024-11-28 22:07:...|
|        6|Accessories|       Bags|    Leather Backpack|Traveler's Choice|  ["Medium"]|["Brown","Black"]| 89.99|A durable leather...|2024-11-28 22:07:...|
+---------+-----------+-----------+--------------------+--------------

### http_clothing Dataset

In [0]:
df_http_clothing = spark.read.csv(*http_Clothing_files, header=True)
df_http_clothing.show(3)

+------+--------+------------+--------+--------------------+--------------------+-----+---------+--------------------+------------------+-----------+
|source|filename|       Brand|Category|              Colors|         Description|Price|ProductID|         ProductName|             Sizes|SubCategory|
+------+--------+------------+--------+--------------------+--------------------+-----+---------+--------------------+------------------+-----------+
|  NULL|    NULL| FashionWear|Clothing|["Red","Blue","Bl...|A stylish slim-fi...|29.99|        1|    Slim Fit T-Shirt|["S","M","L","XL"]|   Menswear|
|  NULL|    NULL|ElegantStyle|Clothing|["White","Yellow"...|A light and breez...|49.99|        2| Floral Summer Dress|     ["S","M","L"]| Womenswear|
|  NULL|    NULL|   UrbanEdge|Clothing|    ["Blue","Black"]|A classic denim j...|69.99|        3|Classic Denim Jacket|    ["M","L","XL"]|  Outerwear|
+------+--------+------------+--------+--------------------+--------------------+-----+---------+---

In [0]:
df_http_clothing = df_http_clothing.withColumn("ingestion_timestamp", current_timestamp()) \
    .drop("source", "filename") \
    .dropna(subset = ["ProductID"]) \
    .select("ProductID", "Category", "SubCategory", "ProductName", "Brand", "Sizes", "Colors", "Price", "Description", "ingestion_timestamp") \
    .orderBy("ProductID")

window_spec = Window.partitionBy("ProductID").orderBy(col("ingestion_timestamp").desc())
df_http_clothing = df_http_clothing.withColumn("row_num", row_number().over(window_spec))
df_http_clothing = df_http_clothing.filter(col("row_num") == 1).drop("row_num")

df_http_clothing.show(3)

+---------+--------+-----------+--------------------+------------+------------------+--------------------+-----+--------------------+--------------------+
|ProductID|Category|SubCategory|         ProductName|       Brand|             Sizes|              Colors|Price|         Description| ingestion_timestamp|
+---------+--------+-----------+--------------------+------------+------------------+--------------------+-----+--------------------+--------------------+
|        1|Clothing|   Menswear|    Slim Fit T-Shirt| FashionWear|["S","M","L","XL"]|["Red","Blue","Bl...|29.99|A stylish slim-fi...|2024-11-28 22:07:...|
|        2|Clothing| Womenswear| Floral Summer Dress|ElegantStyle|     ["S","M","L"]|["White","Yellow"...|49.99|A light and breez...|2024-11-28 22:07:...|
|        3|Clothing|  Outerwear|Classic Denim Jacket|   UrbanEdge|    ["M","L","XL"]|    ["Blue","Black"]|69.99|A classic denim j...|2024-11-28 22:07:...|
+---------+--------+-----------+--------------------+------------+----

In [0]:
print(f"Total http_clothing records: {df_http_clothing.count()}")

Total http_clothing records: 3


In [0]:
df_http_clothing.columns

['ProductID',
 'Category',
 'SubCategory',
 'ProductName',
 'Brand',
 'Sizes',
 'Colors',
 'Price',
 'Description',
 'ingestion_timestamp']

In [0]:
df_http_clothing = df_http_clothing \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("Category", col("Category").cast(StringType())) \
    .withColumn("SubCategory", col("SubCategory").cast(StringType())) \
    .withColumn("ProductName", col("ProductName").cast(StringType())) \
    .withColumn("Brand", col("Brand").cast(StringType())) \
    .withColumn("Sizes", col("Sizes").cast(StringType())) \
    .withColumn("Colors", col("Colors").cast(StringType())) \
    .withColumn("Price", col("Price").cast(DecimalType(10,2))) \
    .withColumn("Description", col("Description").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_http_clothing.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Sizes: string (nullable = true)
 |-- Colors: string (nullable = true)
 |-- Price: decimal(10,2) (nullable = true)
 |-- Description: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_http_accessories.show(3)

+---------+-----------+-----------+--------------------+-----------------+------------+-----------------+------+--------------------+--------------------+
|ProductID|   Category|SubCategory|         ProductName|            Brand|       Sizes|           Colors| Price|         Description| ingestion_timestamp|
+---------+-----------+-----------+--------------------+-----------------+------------+-----------------+------+--------------------+--------------------+
|        4|Accessories|    Watches|Luxury Leather Watch|    Timepiece Co.|["One Size"]|["Brown","Black"]|149.99|A timeless leathe...|2024-11-28 22:07:...|
|        5|Accessories|    Jewelry|Silver Pendant Ne...|      ShinyThings|["One Size"]|       ["Silver"]| 39.99|A delicate silver...|2024-11-28 22:07:...|
|        6|Accessories|       Bags|    Leather Backpack|Traveler's Choice|  ["Medium"]|["Brown","Black"]| 89.99|A durable leather...|2024-11-28 22:07:...|
+---------+-----------+-----------+--------------------+--------------

### http_footwear Dataset

In [0]:
df_http_footwear = spark.read.csv(*http_Footwear_files, header=True)
df_http_footwear.show(3)

+------+--------+-----------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+-----------+
|source|filename|      Brand|Category|              Colors|         Description|Price|ProductID|      ProductName|               Sizes|SubCategory|
+------+--------+-----------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+-----------+
|  NULL|    NULL| SpeedTrack|Footwear|["White","Blue","...|Lightweight runni...|79.99|        7|    Running Shoes|["8","9","10","11...|   Menswear|
|  NULL|    NULL|  StyleStep|Footwear|["Black","Red","G...|Elegant high-heel...|59.99|        8|High Heel Sandals|["5","6","7","8",...| Womenswear|
|  NULL|    NULL|ComfortWalk|Footwear|["Gray","Black","...|Comfortable sneak...|49.99|        9|  Casual Sneakers|["6","7","8","9",...|     Unisex|
+------+--------+-----------+--------+--------------------+--------------------+-----+---------+----------------

In [0]:
df_http_footwear = df_http_footwear.withColumn("ingestion_timestamp", current_timestamp()) \
    .drop("source", "filename") \
    .dropna(subset = ["ProductID"]) \
    .select("ProductID", "Category", "SubCategory", "ProductName", "Brand", "Sizes", "Colors", "Price", "Description", "ingestion_timestamp") \
    .orderBy("ProductID")

window_spec = Window.partitionBy("ProductID").orderBy(col("ingestion_timestamp").desc())
df_http_footwear = df_http_footwear.withColumn("row_num", row_number().over(window_spec))
df_http_footwear = df_http_footwear.filter(col("row_num") == 1).drop("row_num")

df_http_footwear.show(3)

+---------+--------+-----------+-----------------+-----------+--------------------+--------------------+-----+--------------------+--------------------+
|ProductID|Category|SubCategory|      ProductName|      Brand|               Sizes|              Colors|Price|         Description| ingestion_timestamp|
+---------+--------+-----------+-----------------+-----------+--------------------+--------------------+-----+--------------------+--------------------+
|        7|Footwear|   Menswear|    Running Shoes| SpeedTrack|["8","9","10","11...|["White","Blue","...|79.99|Lightweight runni...|2024-11-28 22:07:...|
|        8|Footwear| Womenswear|High Heel Sandals|  StyleStep|["5","6","7","8",...|["Black","Red","G...|59.99|Elegant high-heel...|2024-11-28 22:07:...|
|        9|Footwear|     Unisex|  Casual Sneakers|ComfortWalk|["6","7","8","9",...|["Gray","Black","...|49.99|Comfortable sneak...|2024-11-28 22:07:...|
+---------+--------+-----------+-----------------+-----------+--------------------

In [0]:
print(f"Total http_footwear records: {df_http_footwear.count()}")

Total http_footwear records: 3


In [0]:
df_http_footwear.columns

['ProductID',
 'Category',
 'SubCategory',
 'ProductName',
 'Brand',
 'Sizes',
 'Colors',
 'Price',
 'Description',
 'ingestion_timestamp']

In [0]:
df_http_footwear = df_http_footwear \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("Category", col("Category").cast(StringType())) \
    .withColumn("SubCategory", col("SubCategory").cast(StringType())) \
    .withColumn("ProductName", col("ProductName").cast(StringType())) \
    .withColumn("Brand", col("Brand").cast(StringType())) \
    .withColumn("Sizes", col("Sizes").cast(StringType())) \
    .withColumn("Colors", col("Colors").cast(StringType())) \
    .withColumn("Price", col("Price").cast(DecimalType(10,2))) \
    .withColumn("Description", col("Description").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_http_footwear.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Sizes: string (nullable = true)
 |-- Colors: string (nullable = true)
 |-- Price: decimal(10,2) (nullable = true)
 |-- Description: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_http_footwear.show(3)

+---------+--------+-----------+-----------------+-----------+--------------------+--------------------+-----+--------------------+--------------------+
|ProductID|Category|SubCategory|      ProductName|      Brand|               Sizes|              Colors|Price|         Description| ingestion_timestamp|
+---------+--------+-----------+-----------------+-----------+--------------------+--------------------+-----+--------------------+--------------------+
|        7|Footwear|   Menswear|    Running Shoes| SpeedTrack|["8","9","10","11...|["White","Blue","...|79.99|Lightweight runni...|2024-11-28 22:07:...|
|        8|Footwear| Womenswear|High Heel Sandals|  StyleStep|["5","6","7","8",...|["Black","Red","G...|59.99|Elegant high-heel...|2024-11-28 22:07:...|
|        9|Footwear|     Unisex|  Casual Sneakers|ComfortWalk|["6","7","8","9",...|["Gray","Black","...|49.99|Comfortable sneak...|2024-11-28 22:07:...|
+---------+--------+-----------+-----------------+-----------+--------------------

### http_home_decor Dataset

In [0]:
df_http_home_decor = spark.read.csv(*http_HomeDecor_files, header=True)
df_http_home_decor.show(3)

+------+--------+--------------+----------+-----------------+--------------------+------+---------+--------------------+------------------+-----------+
|source|filename|         Brand|  Category|           Colors|         Description| Price|ProductID|         ProductName|             Sizes|SubCategory|
+------+--------+--------------+----------+-----------------+--------------------+------+---------+--------------------+------------------+-----------+
|  NULL|    NULL|   BrightHomes|Home Decor|["White","Black"]|A sleek and moder...| 39.99|       10|   Modern Table Lamp|      ["One Size"]|   Lighting|
|  NULL|    NULL|Artistic Vibes|Home Decor|   ["Multicolor"]|A vibrant abstrac...| 99.99|       11|Abstract Canvas P...|["Medium","Large"]|   Wall Art|
|  NULL|    NULL|  Rustic Charm|Home Decor|["Brown","Black"]|A stylish wooden ...|129.99|       12| Wooden Coffee Table|      ["One Size"]|  Furniture|
+------+--------+--------------+----------+-----------------+--------------------+------

In [0]:
df_http_home_decor = df_http_home_decor.withColumn("ingestion_timestamp", current_timestamp()) \
    .drop("source", "filename") \
    .dropna(subset = ["ProductID"]) \
    .select("ProductID", "Category", "SubCategory", "ProductName", "Brand", "Sizes", "Colors", "Price", "Description", "ingestion_timestamp") \
    .orderBy("ProductID")

window_spec = Window.partitionBy("ProductID").orderBy(col("ingestion_timestamp").desc())
df_http_home_decor = df_http_home_decor.withColumn("row_num", row_number().over(window_spec))
df_http_home_decor = df_http_home_decor.filter(col("row_num") == 1).drop("row_num")

df_http_home_decor.show(3)

+---------+----------+-----------+--------------------+--------------+------------------+-----------------+------+--------------------+--------------------+
|ProductID|  Category|SubCategory|         ProductName|         Brand|             Sizes|           Colors| Price|         Description| ingestion_timestamp|
+---------+----------+-----------+--------------------+--------------+------------------+-----------------+------+--------------------+--------------------+
|       10|Home Decor|   Lighting|   Modern Table Lamp|   BrightHomes|      ["One Size"]|["White","Black"]| 39.99|A sleek and moder...|2024-11-28 22:07:...|
|       11|Home Decor|   Wall Art|Abstract Canvas P...|Artistic Vibes|["Medium","Large"]|   ["Multicolor"]| 99.99|A vibrant abstrac...|2024-11-28 22:07:...|
|       12|Home Decor|  Furniture| Wooden Coffee Table|  Rustic Charm|      ["One Size"]|["Brown","Black"]|129.99|A stylish wooden ...|2024-11-28 22:07:...|
+---------+----------+-----------+--------------------+---

In [0]:
print(f"Total http_home_decor records: {df_http_home_decor.count()}")

Total http_home_decor records: 3


In [0]:
df_http_home_decor.columns

['ProductID',
 'Category',
 'SubCategory',
 'ProductName',
 'Brand',
 'Sizes',
 'Colors',
 'Price',
 'Description',
 'ingestion_timestamp']

In [0]:
df_http_home_decor = df_http_home_decor \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("Category", col("Category").cast(StringType())) \
    .withColumn("SubCategory", col("SubCategory").cast(StringType())) \
    .withColumn("ProductName", col("ProductName").cast(StringType())) \
    .withColumn("Brand", col("Brand").cast(StringType())) \
    .withColumn("Sizes", col("Sizes").cast(StringType())) \
    .withColumn("Colors", col("Colors").cast(StringType())) \
    .withColumn("Price", col("Price").cast(DecimalType(10,2))) \
    .withColumn("Description", col("Description").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_http_home_decor.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Sizes: string (nullable = true)
 |-- Colors: string (nullable = true)
 |-- Price: decimal(10,2) (nullable = true)
 |-- Description: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_http_home_decor.show(3)

+---------+----------+-----------+--------------------+--------------+------------------+-----------------+------+--------------------+--------------------+
|ProductID|  Category|SubCategory|         ProductName|         Brand|             Sizes|           Colors| Price|         Description| ingestion_timestamp|
+---------+----------+-----------+--------------------+--------------+------------------+-----------------+------+--------------------+--------------------+
|       10|Home Decor|   Lighting|   Modern Table Lamp|   BrightHomes|      ["One Size"]|["White","Black"]| 39.99|A sleek and moder...|2024-11-28 22:07:...|
|       11|Home Decor|   Wall Art|Abstract Canvas P...|Artistic Vibes|["Medium","Large"]|   ["Multicolor"]| 99.99|A vibrant abstrac...|2024-11-28 22:07:...|
|       12|Home Decor|  Furniture| Wooden Coffee Table|  Rustic Charm|      ["One Size"]|["Brown","Black"]|129.99|A stylish wooden ...|2024-11-28 22:07:...|
+---------+----------+-----------+--------------------+---

## Reading and Cleaning Data From the Azure SQL Database Source

### azsqldb_address Dataset

In [0]:
df_azsqldb_address = spark.read.parquet(*azsqldb_Address_files, header=True) 
df_azsqldb_address.show(3)

+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+
|AddressID|       AddressLine1|AddressLine2|   City|StateProvince|CountryRegion|PostalCode|             rowguid|       ModifiedDate|
+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+
|        9|  8713 Yosemite Ct.|        NULL|Bothell|   Washington|United States|     98011|268af621-76d7-4c7...|2006-07-01 00:00:00|
|       11|1318 Lasalle Street|        NULL|Bothell|   Washington|United States|     98011|981b3303-aca2-49c...|2007-04-01 00:00:00|
|       25|   9178 Jumping St.|        NULL| Dallas|        Texas|United States|     75201|c8df3bd9-48f0-465...|2006-09-01 00:00:00|
+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+
only showing top 3 rows



In [0]:
df_azsqldb_address = df_azsqldb_address.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["AddressID"]) \
    .select("AddressID", "AddressLine1", "AddressLine2", "City", "StateProvince", "CountryRegion", "PostalCode", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("AddressID")

window_spec = Window.partitionBy("AddressID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_address = df_azsqldb_address.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_address = df_azsqldb_address.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_address.show(3)

+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+--------------------+
|AddressID|       AddressLine1|AddressLine2|   City|StateProvince|CountryRegion|PostalCode|             rowguid|       ModifiedDate| ingestion_timestamp|
+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+--------------------+
|        9|  8713 Yosemite Ct.|        NULL|Bothell|   Washington|United States|     98011|268af621-76d7-4c7...|2006-07-01 00:00:00|2024-11-28 22:07:...|
|       11|1318 Lasalle Street|        NULL|Bothell|   Washington|United States|     98011|981b3303-aca2-49c...|2007-04-01 00:00:00|2024-11-28 22:07:...|
|       25|   9178 Jumping St.|        NULL| Dallas|        Texas|United States|     75201|c8df3bd9-48f0-465...|2006-09-01 00:00:00|2024-11-28 22:07:...|
+---------+-------------------+------------+-------+-------------+----------

In [0]:
print(f"Total azsqldb_address records: {df_azsqldb_address.count()}")

Total azsqldb_address records: 450


In [0]:
df_azsqldb_address.columns

['AddressID',
 'AddressLine1',
 'AddressLine2',
 'City',
 'StateProvince',
 'CountryRegion',
 'PostalCode',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_address = df_azsqldb_address \
    .withColumn("AddressID", col("AddressID").cast(IntegerType())) \
    .withColumn("AddressLine1", col("AddressLine1").cast(StringType())) \
    .withColumn("AddressLine2", col("AddressLine2").cast(StringType())) \
    .withColumn("City", col("City").cast(StringType())) \
    .withColumn("StateProvince", col("StateProvince").cast(StringType())) \
    .withColumn("CountryRegion", col("CountryRegion").cast(StringType())) \
    .withColumn("PostalCode", col("PostalCode").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_address.printSchema()

root
 |-- AddressID: integer (nullable = true)
 |-- AddressLine1: string (nullable = true)
 |-- AddressLine2: string (nullable = true)
 |-- City: string (nullable = true)
 |-- StateProvince: string (nullable = true)
 |-- CountryRegion: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_address.show(3)

+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+--------------------+
|AddressID|       AddressLine1|AddressLine2|   City|StateProvince|CountryRegion|PostalCode|             rowguid|       ModifiedDate| ingestion_timestamp|
+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+--------------------+
|        9|  8713 Yosemite Ct.|        NULL|Bothell|   Washington|United States|     98011|268af621-76d7-4c7...|2006-07-01 00:00:00|2024-11-28 22:07:...|
|       11|1318 Lasalle Street|        NULL|Bothell|   Washington|United States|     98011|981b3303-aca2-49c...|2007-04-01 00:00:00|2024-11-28 22:07:...|
|       25|   9178 Jumping St.|        NULL| Dallas|        Texas|United States|     75201|c8df3bd9-48f0-465...|2006-09-01 00:00:00|2024-11-28 22:07:...|
+---------+-------------------+------------+-------+-------------+----------

### azsqldb_customer Dataset

In [0]:
df_azsqldb_customer = spark.read.parquet(*azsqldb_Customer_files, header=True) 
df_azsqldb_customer.show(3)

+----------+---------+-----+---------+----------+--------+------+--------------------+--------------------+--------------------+------------+--------------------+------------+--------------------+-------------------+
|CustomerID|NameStyle|Title|FirstName|MiddleName|LastName|Suffix|         CompanyName|         SalesPerson|        EmailAddress|       Phone|        PasswordHash|PasswordSalt|             rowguid|       ModifiedDate|
+----------+---------+-----+---------+----------+--------+------+--------------------+--------------------+--------------------+------------+--------------------+------------+--------------------+-------------------+
|         1|    false|  Mr.|  Orlando|        N.|     Gee|  NULL|        A Bike Store|adventure-works\p...|orlando0@adventur...|245-555-0173|L/Rlwxzp4w7RWmEgX...|    1KjXYs4=|3f5ae95e-b87d-4ae...|2005-08-01 00:00:00|
|         2|    false|  Mr.|    Keith|      NULL|  Harris|  NULL|  Progressive Sports|adventure-works\d...|keith0@adventure-...|170-

In [0]:
df_azsqldb_customer = df_azsqldb_customer.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["CustomerID"]) \
    .select("CustomerID", "NameStyle", "Title", "FirstName", "MiddleName", "LastName", "Suffix", "CompanyName", "SalesPerson", "EmailAddress", "Phone", "PasswordHash", "PasswordSalt", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("CustomerID")

window_spec = Window.partitionBy("CustomerID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_customer = df_azsqldb_customer.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_customer = df_azsqldb_customer.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_customer.show(3)

+----------+---------+-----+---------+----------+--------+------+--------------------+--------------------+--------------------+------------+--------------------+------------+--------------------+-------------------+--------------------+
|CustomerID|NameStyle|Title|FirstName|MiddleName|LastName|Suffix|         CompanyName|         SalesPerson|        EmailAddress|       Phone|        PasswordHash|PasswordSalt|             rowguid|       ModifiedDate| ingestion_timestamp|
+----------+---------+-----+---------+----------+--------+------+--------------------+--------------------+--------------------+------------+--------------------+------------+--------------------+-------------------+--------------------+
|         1|    false|  Mr.|  Orlando|        N.|     Gee|  NULL|        A Bike Store|adventure-works\p...|orlando0@adventur...|245-555-0173|L/Rlwxzp4w7RWmEgX...|    1KjXYs4=|3f5ae95e-b87d-4ae...|2005-08-01 00:00:00|2024-11-28 22:07:...|
|         2|    false|  Mr.|    Keith|      NULL

In [0]:
print(f"Total azsqldb_customer records: {df_azsqldb_customer.count()}")

Total azsqldb_customer records: 847


In [0]:
df_azsqldb_customer.columns

['CustomerID',
 'NameStyle',
 'Title',
 'FirstName',
 'MiddleName',
 'LastName',
 'Suffix',
 'CompanyName',
 'SalesPerson',
 'EmailAddress',
 'Phone',
 'PasswordHash',
 'PasswordSalt',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_customer = df_azsqldb_customer \
    .withColumn("CustomerID", col("CustomerID").cast(IntegerType())) \
    .withColumn("NameStyle", col("NameStyle").cast(BooleanType())) \
    .withColumn("Title", col("Title").cast(StringType())) \
    .withColumn("FirstName", col("FirstName").cast(StringType())) \
    .withColumn("MiddleName", col("MiddleName").cast(StringType())) \
    .withColumn("LastName", col("LastName").cast(StringType())) \
    .withColumn("Suffix", col("Suffix").cast(StringType())) \
    .withColumn("CompanyName", col("CompanyName").cast(StringType())) \
    .withColumn("SalesPerson", col("SalesPerson").cast(StringType())) \
    .withColumn("EmailAddress", col("EmailAddress").cast(StringType())) \
    .withColumn("Phone", col("Phone").cast(StringType())) \
    .withColumn("PasswordHash", col("PasswordHash").cast(StringType())) \
    .withColumn("PasswordSalt", col("PasswordSalt").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_customer.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- NameStyle: boolean (nullable = true)
 |-- Title: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Suffix: string (nullable = true)
 |-- CompanyName: string (nullable = true)
 |-- SalesPerson: string (nullable = true)
 |-- EmailAddress: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- PasswordHash: string (nullable = true)
 |-- PasswordSalt: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_customer.show(3)

+----------+---------+-----+---------+----------+--------+------+--------------------+--------------------+--------------------+------------+--------------------+------------+--------------------+-------------------+--------------------+
|CustomerID|NameStyle|Title|FirstName|MiddleName|LastName|Suffix|         CompanyName|         SalesPerson|        EmailAddress|       Phone|        PasswordHash|PasswordSalt|             rowguid|       ModifiedDate| ingestion_timestamp|
+----------+---------+-----+---------+----------+--------+------+--------------------+--------------------+--------------------+------------+--------------------+------------+--------------------+-------------------+--------------------+
|         1|    false|  Mr.|  Orlando|        N.|     Gee|  NULL|        A Bike Store|adventure-works\p...|orlando0@adventur...|245-555-0173|L/Rlwxzp4w7RWmEgX...|    1KjXYs4=|3f5ae95e-b87d-4ae...|2005-08-01 00:00:00|2024-11-28 22:07:...|
|         2|    false|  Mr.|    Keith|      NULL

### azsqldb_customer_address Dataset

In [0]:
df_azsqldb_customer_address = spark.read.parquet(*azsqldb_CustomerAddress_files, header=True) 
df_azsqldb_customer_address.show(3)

+----------+---------+-----------+--------------------+-------------------+
|CustomerID|AddressID|AddressType|             rowguid|       ModifiedDate|
+----------+---------+-----------+--------------------+-------------------+
|     29485|     1086|Main Office|16765338-dbe4-442...|2007-09-01 00:00:00|
|     29486|      621|Main Office|22b3e910-14af-4ed...|2005-09-01 00:00:00|
|     29489|     1069|Main Office|a095c88b-d7e6-417...|2005-07-01 00:00:00|
+----------+---------+-----------+--------------------+-------------------+
only showing top 3 rows



In [0]:
df_azsqldb_customer_address = df_azsqldb_customer_address.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["CustomerID", "AddressID"]) \
    .select("CustomerID", "AddressID", "AddressType", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("CustomerID", "AddressID")

window_spec = Window.partitionBy("CustomerID", "AddressID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_customer_address = df_azsqldb_customer_address.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_customer_address = df_azsqldb_customer_address.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_customer_address.show(3)

+----------+---------+-----------+--------------------+-------------------+--------------------+
|CustomerID|AddressID|AddressType|             rowguid|       ModifiedDate| ingestion_timestamp|
+----------+---------+-----------+--------------------+-------------------+--------------------+
|     29485|     1086|Main Office|16765338-dbe4-442...|2007-09-01 00:00:00|2024-11-28 22:07:...|
|     29486|      621|Main Office|22b3e910-14af-4ed...|2005-09-01 00:00:00|2024-11-28 22:07:...|
|     29489|     1069|Main Office|a095c88b-d7e6-417...|2005-07-01 00:00:00|2024-11-28 22:07:...|
+----------+---------+-----------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total azsqldb_customer_address records: {df_azsqldb_customer_address.count()}")

Total azsqldb_customer_address records: 417


In [0]:
df_azsqldb_customer_address.columns

['CustomerID',
 'AddressID',
 'AddressType',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_customer_address = df_azsqldb_customer_address \
    .withColumn("CustomerID", col("CustomerID").cast(IntegerType())) \
    .withColumn("AddressID", col("AddressID").cast(IntegerType())) \
    .withColumn("AddressType", col("AddressType").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_customer_address.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- AddressID: integer (nullable = true)
 |-- AddressType: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_customer_address.show(3)

+----------+---------+-----------+--------------------+-------------------+--------------------+
|CustomerID|AddressID|AddressType|             rowguid|       ModifiedDate| ingestion_timestamp|
+----------+---------+-----------+--------------------+-------------------+--------------------+
|     29485|     1086|Main Office|16765338-dbe4-442...|2007-09-01 00:00:00|2024-11-28 22:07:...|
|     29486|      621|Main Office|22b3e910-14af-4ed...|2005-09-01 00:00:00|2024-11-28 22:07:...|
|     29489|     1069|Main Office|a095c88b-d7e6-417...|2005-07-01 00:00:00|2024-11-28 22:07:...|
+----------+---------+-----------+--------------------+-------------------+--------------------+
only showing top 3 rows



### azsqldb_product Dataset

In [0]:
df_azsqldb_product = spark.read.parquet(*azsqldb_Product_files, header=True) 
df_azsqldb_product.show(3)

+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size| Weight|ProductCategoryID|ProductModelID|      SellStartDate|SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|        ModifiedDate|
+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+
|      680|HL Road Frame - B...|   FR-R92B-58|Black|   1059.3100|1431.5000|  58|1016.04|               18|             6|2002-06-01 00:00:00|       NULL|            NULL|[47 49 46 38 39 6...|  no_image_availabl...|43dd68d6-14a4-4

In [0]:
df_azsqldb_product = df_azsqldb_product.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ProductID"]) \
    .select("ProductID", "Name", "ProductNumber", "Color", "StandardCost", "ListPrice", "Size", "Weight", "ProductCategoryID", "ProductModelID", "SellStartDate", "SellEndDate", "DiscontinuedDate", "ThumbNailPhoto", "ThumbnailPhotoFileName", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("ProductID")

window_spec = Window.partitionBy("ProductID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_product = df_azsqldb_product.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_product = df_azsqldb_product.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_product.show(3)

+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size| Weight|ProductCategoryID|ProductModelID|      SellStartDate|SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|        ModifiedDate| ingestion_timestamp|
+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+--------------------+
|      680|HL Road Frame - B...|   FR-R92B-58|Black|   1059.3100|1431.5000|  58|1016.04|               18|             6|2002-06-01 00:00:00|       NULL|            N

In [0]:
print(f"Total azsqldb_product records: {df_azsqldb_product.count()}")

Total azsqldb_product records: 295


In [0]:
df_azsqldb_product.columns

['ProductID',
 'Name',
 'ProductNumber',
 'Color',
 'StandardCost',
 'ListPrice',
 'Size',
 'Weight',
 'ProductCategoryID',
 'ProductModelID',
 'SellStartDate',
 'SellEndDate',
 'DiscontinuedDate',
 'ThumbNailPhoto',
 'ThumbnailPhotoFileName',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_product = df_azsqldb_product \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("Name", col("Name").cast(StringType())) \
    .withColumn("ProductNumber", col("ProductNumber").cast(StringType())) \
    .withColumn("Color", col("Color").cast(StringType())) \
    .withColumn("StandardCost", col("StandardCost").cast(DoubleType())) \
    .withColumn("ListPrice", col("ListPrice").cast(DoubleType())) \
    .withColumn("Size", col("Size").cast(StringType())) \
    .withColumn("Weight", col("Weight").cast(DoubleType())) \
    .withColumn("ProductCategoryID", col("ProductCategoryID").cast(IntegerType())) \
    .withColumn("ProductModelID", col("ProductModelID").cast(IntegerType())) \
    .withColumn("SellStartDate", col("SellStartDate").cast("timestamp")) \
    .withColumn("SellEndDate", col("SellEndDate").cast("timestamp")) \
    .withColumn("DiscontinuedDate", col("DiscontinuedDate").cast("timestamp")) \
    .withColumn("ThumbNailPhoto", col("ThumbNailPhoto").cast(BinaryType())) \
    .withColumn("ThumbnailPhotoFileName", col("ThumbnailPhotoFileName").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_product.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- StandardCost: double (nullable = true)
 |-- ListPrice: double (nullable = true)
 |-- Size: string (nullable = true)
 |-- Weight: double (nullable = true)
 |-- ProductCategoryID: integer (nullable = true)
 |-- ProductModelID: integer (nullable = true)
 |-- SellStartDate: timestamp (nullable = true)
 |-- SellEndDate: timestamp (nullable = true)
 |-- DiscontinuedDate: timestamp (nullable = true)
 |-- ThumbNailPhoto: binary (nullable = true)
 |-- ThumbnailPhotoFileName: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_product.show(3)

+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size| Weight|ProductCategoryID|ProductModelID|      SellStartDate|SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|        ModifiedDate| ingestion_timestamp|
+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+--------------------+
|      680|HL Road Frame - B...|   FR-R92B-58|Black|     1059.31|   1431.5|  58|1016.04|               18|             6|2002-06-01 00:00:00|       NULL|            N

### azsqldb_product_category Dataset

In [0]:
df_azsqldb_product_category = spark.read.parquet(*azsqldb_ProductCategory_files, header=True) 
df_azsqldb_product_category.show(3)

+-----------------+-----------------------+----------+--------------------+-------------------+
|ProductCategoryID|ParentProductCategoryID|      Name|             rowguid|       ModifiedDate|
+-----------------+-----------------------+----------+--------------------+-------------------+
|                1|                   NULL|     Bikes|cfbda25c-df71-47a...|2002-06-01 00:00:00|
|                2|                   NULL|Components|c657828d-d808-4ab...|2002-06-01 00:00:00|
|                3|                   NULL|  Clothing|10a7c342-ca82-48d...|2002-06-01 00:00:00|
+-----------------+-----------------------+----------+--------------------+-------------------+
only showing top 3 rows



In [0]:
df_azsqldb_product_category = df_azsqldb_product_category.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ProductCategoryID"]) \
    .select("ProductCategoryID", "ParentProductCategoryID", "Name", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("ProductCategoryID")

window_spec = Window.partitionBy("ProductCategoryID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_product_category = df_azsqldb_product_category.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_product_category = df_azsqldb_product_category.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_product_category.show(3)

+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
|ProductCategoryID|ParentProductCategoryID|      Name|             rowguid|       ModifiedDate| ingestion_timestamp|
+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
|                1|                   NULL|     Bikes|cfbda25c-df71-47a...|2002-06-01 00:00:00|2024-11-28 22:21:...|
|                2|                   NULL|Components|c657828d-d808-4ab...|2002-06-01 00:00:00|2024-11-28 22:21:...|
|                3|                   NULL|  Clothing|10a7c342-ca82-48d...|2002-06-01 00:00:00|2024-11-28 22:21:...|
+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total azsqldb_product_category records: {df_azsqldb_product_category.count()}")

Total azsqldb_product_category records: 41


In [0]:
df_azsqldb_product_category.columns

['ProductCategoryID',
 'ParentProductCategoryID',
 'Name',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_product_category = df_azsqldb_product_category \
    .withColumn("ProductCategoryID", col("ProductCategoryID").cast(IntegerType())) \
    .withColumn("ParentProductCategoryID", col("ParentProductCategoryID").cast(IntegerType())) \
    .withColumn("Name", col("Name").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_product_category.printSchema()

root
 |-- ProductCategoryID: integer (nullable = true)
 |-- ParentProductCategoryID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_product_category.show(3)

+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
|ProductCategoryID|ParentProductCategoryID|      Name|             rowguid|       ModifiedDate| ingestion_timestamp|
+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
|                1|                   NULL|     Bikes|cfbda25c-df71-47a...|2002-06-01 00:00:00|2024-11-28 22:23:...|
|                2|                   NULL|Components|c657828d-d808-4ab...|2002-06-01 00:00:00|2024-11-28 22:23:...|
|                3|                   NULL|  Clothing|10a7c342-ca82-48d...|2002-06-01 00:00:00|2024-11-28 22:23:...|
+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
only showing top 3 rows



### azsqldb_product_description Dataset

In [0]:
df_azsqldb_product_description = spark.read.parquet(*azsqldb_ProductDescription_files, header=True) 
df_azsqldb_product_description.show(3)

+--------------------+--------------------+--------------------+-------------------+
|ProductDescriptionID|         Description|             rowguid|       ModifiedDate|
+--------------------+--------------------+--------------------+-------------------+
|                   3|     Chromoly steel.|301eed3a-1a82-485...|2007-06-01 00:00:00|
|                   4|Aluminum alloy cu...|dfeba528-da11-465...|2007-06-01 00:00:00|
|                   5|Aluminum alloy cu...|f7178da7-1a7e-499...|2007-06-01 00:00:00|
+--------------------+--------------------+--------------------+-------------------+
only showing top 3 rows



In [0]:
df_azsqldb_product_description = df_azsqldb_product_description.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ProductDescriptionID"]) \
    .select("ProductDescriptionID", "Description", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("ProductDescriptionID")

window_spec = Window.partitionBy("ProductDescriptionID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_product_description = df_azsqldb_product_description.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_product_description = df_azsqldb_product_description.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_product_description.show(3)

+--------------------+--------------------+--------------------+-------------------+--------------------+
|ProductDescriptionID|         Description|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------------+--------------------+--------------------+-------------------+--------------------+
|                   3|     Chromoly steel.|301eed3a-1a82-485...|2007-06-01 00:00:00|2024-11-28 22:27:...|
|                   4|Aluminum alloy cu...|dfeba528-da11-465...|2007-06-01 00:00:00|2024-11-28 22:27:...|
|                   5|Aluminum alloy cu...|f7178da7-1a7e-499...|2007-06-01 00:00:00|2024-11-28 22:27:...|
+--------------------+--------------------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total azsqldb_product_description records: {df_azsqldb_product_description.count()}")

Total azsqldb_product_description records: 762


In [0]:
df_azsqldb_product_description.columns

['ProductDescriptionID',
 'Description',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_product_description = df_azsqldb_product_description \
    .withColumn("ProductDescriptionID", col("ProductDescriptionID").cast(IntegerType())) \
    .withColumn("Description", col("Description").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_product_description.printSchema()

root
 |-- ProductDescriptionID: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_product_description.show(3)

+--------------------+--------------------+--------------------+-------------------+--------------------+
|ProductDescriptionID|         Description|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------------+--------------------+--------------------+-------------------+--------------------+
|                   3|     Chromoly steel.|301eed3a-1a82-485...|2007-06-01 00:00:00|2024-11-28 22:33:...|
|                   4|Aluminum alloy cu...|dfeba528-da11-465...|2007-06-01 00:00:00|2024-11-28 22:33:...|
|                   5|Aluminum alloy cu...|f7178da7-1a7e-499...|2007-06-01 00:00:00|2024-11-28 22:33:...|
+--------------------+--------------------+--------------------+-------------------+--------------------+
only showing top 3 rows



### azsqldb_product_model Dataset

In [0]:
df_azsqldb_product_model = spark.read.parquet(*azsqldb_ProductModel_files, header=True) 
df_azsqldb_product_model.show(3)

+--------------+------------------+------------------+--------------------+-------------------+
|ProductModelID|              Name|CatalogDescription|             rowguid|       ModifiedDate|
+--------------+------------------+------------------+--------------------+-------------------+
|             1|      Classic Vest|              NULL|29321d47-1e4c-4aa...|2007-06-01 00:00:00|
|             2|       Cycling Cap|              NULL|474fb654-3c96-4cb...|2005-06-01 00:00:00|
|             3|Full-Finger Gloves|              NULL|a75483fe-3c47-4aa...|2006-06-01 00:00:00|
+--------------+------------------+------------------+--------------------+-------------------+
only showing top 3 rows



In [0]:
df_azsqldb_product_model = df_azsqldb_product_model.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ProductModelID"]) \
    .select("ProductModelID", "Name", "CatalogDescription", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("ProductModelID")

window_spec = Window.partitionBy("ProductModelID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_product_model = df_azsqldb_product_model.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_product_model = df_azsqldb_product_model.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_product_model.show(3)

+--------------+------------------+------------------+--------------------+-------------------+--------------------+
|ProductModelID|              Name|CatalogDescription|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------+------------------+------------------+--------------------+-------------------+--------------------+
|             1|      Classic Vest|              NULL|29321d47-1e4c-4aa...|2007-06-01 00:00:00|2024-11-28 22:36:...|
|             2|       Cycling Cap|              NULL|474fb654-3c96-4cb...|2005-06-01 00:00:00|2024-11-28 22:36:...|
|             3|Full-Finger Gloves|              NULL|a75483fe-3c47-4aa...|2006-06-01 00:00:00|2024-11-28 22:36:...|
+--------------+------------------+------------------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total azsqldb_product_model records: {df_azsqldb_product_model.count()}")

Total azsqldb_product_model records: 128


In [0]:
df_azsqldb_product_model.columns

['ProductModelID',
 'Name',
 'CatalogDescription',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_product_model = df_azsqldb_product_model \
    .withColumn("ProductModelID", col("ProductModelID").cast(IntegerType())) \
    .withColumn("Name", col("Name").cast(StringType())) \
    .withColumn("CatalogDescription", col("CatalogDescription").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_product_model.printSchema()

root
 |-- ProductModelID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- CatalogDescription: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_product_model.show(3)

+--------------+------------------+------------------+--------------------+-------------------+--------------------+
|ProductModelID|              Name|CatalogDescription|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------+------------------+------------------+--------------------+-------------------+--------------------+
|             1|      Classic Vest|              NULL|29321d47-1e4c-4aa...|2007-06-01 00:00:00|2024-11-28 22:38:...|
|             2|       Cycling Cap|              NULL|474fb654-3c96-4cb...|2005-06-01 00:00:00|2024-11-28 22:38:...|
|             3|Full-Finger Gloves|              NULL|a75483fe-3c47-4aa...|2006-06-01 00:00:00|2024-11-28 22:38:...|
+--------------+------------------+------------------+--------------------+-------------------+--------------------+
only showing top 3 rows



### azsqldb_productmodel_productdescription Dataset

In [0]:
df_azsqldb_productmodel_productdescription = spark.read.parquet(*azsqldb_ProductModelProductDescription_files, header=True) 
df_azsqldb_productmodel_productdescription.show(3)

+--------------+--------------------+-------+--------------------+-------------------+
|ProductModelID|ProductDescriptionID|Culture|             rowguid|       ModifiedDate|
+--------------+--------------------+-------+--------------------+-------------------+
|             1|                1199| en    |4d00b649-027a-4f9...|2007-06-01 00:00:00|
|             1|                1467| ar    |7de7204e-4efc-40f...|2007-06-01 00:00:00|
|             1|                1589| fr    |20bffcf4-bfa6-400...|2007-06-01 00:00:00|
+--------------+--------------------+-------+--------------------+-------------------+
only showing top 3 rows



In [0]:
df_azsqldb_productmodel_productdescription = df_azsqldb_productmodel_productdescription.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ProductModelID", "ProductDescriptionID"]) \
    .select("ProductModelID", "ProductDescriptionID", "Culture", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("ProductModelID", "ProductDescriptionID")

window_spec = Window.partitionBy("ProductModelID", "ProductDescriptionID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_productmodel_productdescription = df_azsqldb_productmodel_productdescription.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_productmodel_productdescription = df_azsqldb_productmodel_productdescription.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_productmodel_productdescription.show(3)

+--------------+--------------------+-------+--------------------+-------------------+--------------------+
|ProductModelID|ProductDescriptionID|Culture|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------+--------------------+-------+--------------------+-------------------+--------------------+
|             1|                1199| en    |4d00b649-027a-4f9...|2007-06-01 00:00:00|2024-11-28 22:43:...|
|             1|                1467| ar    |7de7204e-4efc-40f...|2007-06-01 00:00:00|2024-11-28 22:43:...|
|             1|                1589| fr    |20bffcf4-bfa6-400...|2007-06-01 00:00:00|2024-11-28 22:43:...|
+--------------+--------------------+-------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total azsqldb_productmodel_productdescription records: {df_azsqldb_productmodel_productdescription.count()}")

Total azsqldb_productmodel_productdescription records: 762


In [0]:
df_azsqldb_productmodel_productdescription.columns

['ProductModelID',
 'ProductDescriptionID',
 'Culture',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_productmodel_productdescription = df_azsqldb_productmodel_productdescription \
    .withColumn("ProductModelID", col("ProductModelID").cast(IntegerType())) \
    .withColumn("ProductDescriptionID", col("ProductDescriptionID").cast(IntegerType())) \
    .withColumn("Culture", col("Culture").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_productmodel_productdescription.printSchema()

root
 |-- ProductModelID: integer (nullable = true)
 |-- ProductDescriptionID: integer (nullable = true)
 |-- Culture: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_productmodel_productdescription.show(3)

+--------------+--------------------+-------+--------------------+-------------------+--------------------+
|ProductModelID|ProductDescriptionID|Culture|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------+--------------------+-------+--------------------+-------------------+--------------------+
|             1|                1199| en    |4d00b649-027a-4f9...|2007-06-01 00:00:00|2024-11-28 22:45:...|
|             1|                1467| ar    |7de7204e-4efc-40f...|2007-06-01 00:00:00|2024-11-28 22:45:...|
|             1|                1589| fr    |20bffcf4-bfa6-400...|2007-06-01 00:00:00|2024-11-28 22:45:...|
+--------------+--------------------+-------+--------------------+-------------------+--------------------+
only showing top 3 rows



### azsqldb_salesorder_detail Dataset

In [0]:
df_azsqldb_salesorder_detail = spark.read.parquet(*azsqldb_SalesOrderDetail_files, header=True) 
df_azsqldb_salesorder_detail.show(3)

+------------+------------------+--------+---------+---------+-----------------+----------+--------------------+-------------------+
|SalesOrderID|SalesOrderDetailID|OrderQty|ProductID|UnitPrice|UnitPriceDiscount| LineTotal|             rowguid|       ModifiedDate|
+------------+------------------+--------+---------+---------+-----------------+----------+--------------------+-------------------+
|       71774|            110562|       1|      836| 356.8980|           0.0000|356.898000|e3a1994c-7a68-4ce...|2008-06-01 00:00:00|
|       71774|            110563|       1|      822| 356.8980|           0.0000|356.898000|5c77f557-fdb6-43b...|2008-06-01 00:00:00|
|       71776|            110567|       1|      907|  63.9000|           0.0000| 63.900000|6dbfe398-d15d-425...|2008-06-01 00:00:00|
+------------+------------------+--------+---------+---------+-----------------+----------+--------------------+-------------------+
only showing top 3 rows



In [0]:
df_azsqldb_salesorder_detail = df_azsqldb_salesorder_detail.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["SalesOrderID", "SalesOrderDetailID", "ProductID"]) \
    .select("SalesOrderID", "SalesOrderDetailID", "OrderQty", "ProductID", "UnitPrice", "UnitPriceDiscount", "LineTotal", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("SalesOrderID", "SalesOrderDetailID", "ProductID")

window_spec = Window.partitionBy("SalesOrderID", "SalesOrderDetailID", "ProductID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_salesorder_detail = df_azsqldb_salesorder_detail.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_salesorder_detail = df_azsqldb_salesorder_detail.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_salesorder_detail.show(3)

+------------+------------------+--------+---------+---------+-----------------+----------+--------------------+-------------------+--------------------+
|SalesOrderID|SalesOrderDetailID|OrderQty|ProductID|UnitPrice|UnitPriceDiscount| LineTotal|             rowguid|       ModifiedDate| ingestion_timestamp|
+------------+------------------+--------+---------+---------+-----------------+----------+--------------------+-------------------+--------------------+
|       71774|            110562|       1|      836| 356.8980|           0.0000|356.898000|e3a1994c-7a68-4ce...|2008-06-01 00:00:00|2024-11-28 22:53:...|
|       71774|            110563|       1|      822| 356.8980|           0.0000|356.898000|5c77f557-fdb6-43b...|2008-06-01 00:00:00|2024-11-28 22:53:...|
|       71776|            110567|       1|      907|  63.9000|           0.0000| 63.900000|6dbfe398-d15d-425...|2008-06-01 00:00:00|2024-11-28 22:53:...|
+------------+------------------+--------+---------+---------+--------------

In [0]:
print(f"Total azsqldb_salesorder_detail records: {df_azsqldb_salesorder_detail.count()}")

Total azsqldb_salesorder_detail records: 542


In [0]:
df_azsqldb_salesorder_detail.columns

['SalesOrderID',
 'SalesOrderDetailID',
 'OrderQty',
 'ProductID',
 'UnitPrice',
 'UnitPriceDiscount',
 'LineTotal',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_salesorder_detail = df_azsqldb_salesorder_detail \
    .withColumn("SalesOrderID", col("SalesOrderID").cast(IntegerType())) \
    .withColumn("SalesOrderDetailID", col("SalesOrderDetailID").cast(IntegerType())) \
    .withColumn("OrderQty", col("OrderQty").cast(IntegerType())) \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("UnitPrice", col("UnitPrice").cast(DoubleType())) \
    .withColumn("UnitPriceDiscount", col("UnitPriceDiscount").cast(DoubleType())) \
    .withColumn("LineTotal", col("LineTotal").cast(DoubleType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_salesorder_detail.printSchema()

root
 |-- SalesOrderID: integer (nullable = true)
 |-- SalesOrderDetailID: integer (nullable = true)
 |-- OrderQty: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitPriceDiscount: double (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_azsqldb_salesorder_detail.show(3)

+------------+------------------+--------+---------+---------+-----------------+---------+--------------------+-------------------+--------------------+
|SalesOrderID|SalesOrderDetailID|OrderQty|ProductID|UnitPrice|UnitPriceDiscount|LineTotal|             rowguid|       ModifiedDate| ingestion_timestamp|
+------------+------------------+--------+---------+---------+-----------------+---------+--------------------+-------------------+--------------------+
|       71774|            110562|       1|      836|  356.898|              0.0|  356.898|e3a1994c-7a68-4ce...|2008-06-01 00:00:00|2024-11-28 22:56:...|
|       71774|            110563|       1|      822|  356.898|              0.0|  356.898|5c77f557-fdb6-43b...|2008-06-01 00:00:00|2024-11-28 22:56:...|
|       71776|            110567|       1|      907|     63.9|              0.0|     63.9|6dbfe398-d15d-425...|2008-06-01 00:00:00|2024-11-28 22:56:...|
+------------+------------------+--------+---------+---------+-----------------+--

### azsqldb_salesorder_header Dataset

In [0]:
df_azsqldb_salesorder_header = spark.read.parquet(*azsqldb_SalesOrderHeader_files, header=True) 
df_azsqldb_salesorder_header.show(3)

+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+----------+---------+--------+----------+-------+--------------------+-------------------+
|SalesOrderID|RevisionNumber|          OrderDate|            DueDate|           ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|ShipToAddressID|BillToAddressID|       ShipMethod|CreditCardApprovalCode|  SubTotal|   TaxAmt| Freight|  TotalDue|Comment|             rowguid|       ModifiedDate|
+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+----------+---------+--------+----------+-------+--------------------+----

In [0]:
df_azsqldb_salesorder_header = df_azsqldb_salesorder_header.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["SalesOrderID"]) \
    .select("SalesOrderID", "RevisionNumber", "OrderDate", "DueDate", "ShipDate", "Status", "OnlineOrderFlag", "SalesOrderNumber", "PurchaseOrderNumber", "AccountNumber", "CustomerID", "ShipToAddressID", "BillToAddressID", "ShipMethod", "CreditCardApprovalCode", "SubTotal", "TaxAmt", "Freight", "TotalDue", "Comment", "rowguid", "ModifiedDate", "ingestion_timestamp") \
    .orderBy("SalesOrderID")

window_spec = Window.partitionBy("SalesOrderID").orderBy(col("ingestion_timestamp").desc())
df_azsqldb_salesorder_header = df_azsqldb_salesorder_header.withColumn("row_num", row_number().over(window_spec))
df_azsqldb_salesorder_header = df_azsqldb_salesorder_header.filter(col("row_num") == 1).drop("row_num")

df_azsqldb_salesorder_header.show(3)

+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+----------+---------+--------+----------+-------+--------------------+-------------------+--------------------+
|SalesOrderID|RevisionNumber|          OrderDate|            DueDate|           ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|ShipToAddressID|BillToAddressID|       ShipMethod|CreditCardApprovalCode|  SubTotal|   TaxAmt| Freight|  TotalDue|Comment|             rowguid|       ModifiedDate| ingestion_timestamp|
+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+----------+---------+--------+--

In [0]:
print(f"Total azsqldb_salesorder_header records: {df_azsqldb_salesorder_header.count()}")

Total azsqldb_salesorder_header records: 32


In [0]:
df_azsqldb_salesorder_header.columns

['SalesOrderID',
 'RevisionNumber',
 'OrderDate',
 'DueDate',
 'ShipDate',
 'Status',
 'OnlineOrderFlag',
 'SalesOrderNumber',
 'PurchaseOrderNumber',
 'AccountNumber',
 'CustomerID',
 'ShipToAddressID',
 'BillToAddressID',
 'ShipMethod',
 'CreditCardApprovalCode',
 'SubTotal',
 'TaxAmt',
 'Freight',
 'TotalDue',
 'Comment',
 'rowguid',
 'ModifiedDate',
 'ingestion_timestamp']

In [0]:
df_azsqldb_salesorder_header = df_azsqldb_salesorder_header \
    .withColumn("SalesOrderID", col("SalesOrderID").cast(IntegerType())) \
    .withColumn("RevisionNumber", col("RevisionNumber").cast(IntegerType())) \
    .withColumn("OrderDate", col("OrderDate").cast("timestamp")) \
    .withColumn("DueDate", col("DueDate").cast("timestamp")) \
    .withColumn("ShipDate", col("ShipDate").cast("timestamp")) \
    .withColumn("Status", col("Status").cast(IntegerType())) \
    .withColumn("OnlineOrderFlag", col("OnlineOrderFlag").cast(BooleanType())) \
    .withColumn("SalesOrderNumber", col("SalesOrderNumber").cast(StringType())) \
    .withColumn("PurchaseOrderNumber", col("PurchaseOrderNumber").cast(StringType())) \
    .withColumn("AccountNumber", col("AccountNumber").cast(StringType())) \
    .withColumn("CustomerID", col("CustomerID").cast(IntegerType())) \
    .withColumn("ShipToAddressID", col("ShipToAddressID").cast(IntegerType())) \
    .withColumn("BillToAddressID", col("BillToAddressID").cast(IntegerType())) \
    .withColumn("ShipMethod", col("ShipMethod").cast(StringType())) \
    .withColumn("CreditCardApprovalCode", col("CreditCardApprovalCode").cast(StringType())) \
    .withColumn("SubTotal", col("SubTotal").cast(DoubleType())) \
    .withColumn("TaxAmt", col("TaxAmt").cast(DoubleType())) \
    .withColumn("Freight", col("Freight").cast(DoubleType())) \
    .withColumn("TotalDue", col("TotalDue").cast(DoubleType())) \
    .withColumn("Comment", col("Comment").cast(StringType())) \
    .withColumn("rowguid", col("rowguid").cast(StringType())) \
    .withColumn("ModifiedDate", col("ModifiedDate").cast("timestamp")) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_azsqldb_salesorder_header.printSchema()

root
 |-- SalesOrderID: integer (nullable = true)
 |-- RevisionNumber: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- DueDate: timestamp (nullable = true)
 |-- ShipDate: timestamp (nullable = true)
 |-- Status: integer (nullable = true)
 |-- OnlineOrderFlag: boolean (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- ShipToAddressID: integer (nullable = true)
 |-- BillToAddressID: integer (nullable = true)
 |-- ShipMethod: string (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- SubTotal: double (nullable = true)
 |-- TaxAmt: double (nullable = true)
 |-- Freight: double (nullable = true)
 |-- TotalDue: double (nullable = true)
 |-- Comment: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)
 |-- ingestion_timestamp: 

In [0]:
df_azsqldb_salesorder_header.show(3)

+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+----------+---------+--------+----------+-------+--------------------+-------------------+--------------------+
|SalesOrderID|RevisionNumber|          OrderDate|            DueDate|           ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|ShipToAddressID|BillToAddressID|       ShipMethod|CreditCardApprovalCode|  SubTotal|   TaxAmt| Freight|  TotalDue|Comment|             rowguid|       ModifiedDate| ingestion_timestamp|
+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+----------+---------+--------+--

## Reading and Cleaning Data From On-Premise Microsoft SQL Server

### onprem_country_rolling Dataset

In [0]:
df_onprem_country_rolling = spark.read.parquet(*onprem_CountryRolling_files, header=True) 
df_onprem_country_rolling.show(3)

+---------+-------------+
|CountryID|  CountryName|
+---------+-------------+
|        1|       Canada|
|        2|United States|
+---------+-------------+



In [0]:
df_onprem_country_rolling = df_onprem_country_rolling.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["CountryID"]) \
    .select("CountryID", "CountryName", "ingestion_timestamp") \
    .orderBy("CountryID")

window_spec = Window.partitionBy("CountryID").orderBy(col("ingestion_timestamp").desc())
df_onprem_country_rolling = df_onprem_country_rolling.withColumn("row_num", row_number().over(window_spec))
df_onprem_country_rolling = df_onprem_country_rolling.filter(col("row_num") == 1).drop("row_num")

df_onprem_country_rolling.show(3)

+---------+-------------+--------------------+
|CountryID|  CountryName| ingestion_timestamp|
+---------+-------------+--------------------+
|        1|       Canada|2024-11-28 23:25:...|
|        2|United States|2024-11-28 23:25:...|
+---------+-------------+--------------------+



In [0]:
print(f"Total onprem_country_rolling records: {df_onprem_country_rolling.count()}")

Total onprem_country_rolling records: 2


In [0]:
df_onprem_country_rolling.columns

['CountryID', 'CountryName', 'ingestion_timestamp']

In [0]:
df_onprem_country_rolling = df_onprem_country_rolling \
    .withColumn("CountryID", col("CountryID").cast(IntegerType())) \
    .withColumn("CountryName", col("CountryName").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_country_rolling.printSchema()

root
 |-- CountryID: integer (nullable = true)
 |-- CountryName: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_country_rolling.show(3)

+---------+-------------+--------------------+
|CountryID|  CountryName| ingestion_timestamp|
+---------+-------------+--------------------+
|        1|       Canada|2024-11-28 23:27:...|
|        2|United States|2024-11-28 23:27:...|
+---------+-------------+--------------------+



### onprem_customer Dataset

In [0]:
df_onprem_customer = spark.read.parquet(*onprem_Customer_files, header=True) 
df_onprem_customer.show(3)

+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+
|CustomerID|FirstName|LastName|               Email|     Phone|AddressLine1|AddressLine2|       City|StateID|PostalCode|CountryID|
+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+
|         1|     John|     Doe|john.doe@example.com|1234567890| 123 Main St|            |    Toronto|      1|   M5H 2N2|        1|
|         2|     Jane|   Smith|jane.smith@exampl...|9876543210|  456 Elm St|            |Los Angeles|      4|     90001|        2|
+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+



In [0]:
df_onprem_customer = df_onprem_customer.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["CustomerID"]) \
    .select("CustomerID", "FirstName", "LastName", "Email", "Phone", "AddressLine1", "AddressLine2", "City", "StateID", "PostalCode", "CountryID", "ingestion_timestamp") \
    .orderBy("CustomerID")

window_spec = Window.partitionBy("CustomerID").orderBy(col("ingestion_timestamp").desc())
df_onprem_customer = df_onprem_customer.withColumn("row_num", row_number().over(window_spec))
df_onprem_customer = df_onprem_customer.filter(col("row_num") == 1).drop("row_num")

df_onprem_customer.show(3)

+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+
|CustomerID|FirstName|LastName|               Email|     Phone|AddressLine1|AddressLine2|       City|StateID|PostalCode|CountryID| ingestion_timestamp|
+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+
|         1|     John|     Doe|john.doe@example.com|1234567890| 123 Main St|            |    Toronto|      1|   M5H 2N2|        1|2024-11-28 23:32:...|
|         2|     Jane|   Smith|jane.smith@exampl...|9876543210|  456 Elm St|            |Los Angeles|      4|     90001|        2|2024-11-28 23:32:...|
+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+



In [0]:
print(f"Total onprem_customer records: {df_onprem_customer.count()}")

Total onprem_customer records: 2


In [0]:
df_onprem_customer.columns

['CustomerID',
 'FirstName',
 'LastName',
 'Email',
 'Phone',
 'AddressLine1',
 'AddressLine2',
 'City',
 'StateID',
 'PostalCode',
 'CountryID',
 'ingestion_timestamp']

In [0]:
df_onprem_customer = df_onprem_customer \
    .withColumn("CustomerID", col("CustomerID").cast(IntegerType())) \
    .withColumn("FirstName", col("FirstName").cast(StringType())) \
    .withColumn("LastName", col("LastName").cast(StringType())) \
    .withColumn("Email", col("Email").cast(StringType())) \
    .withColumn("Phone", col("Phone").cast(StringType())) \
    .withColumn("AddressLine1", col("AddressLine1").cast(StringType())) \
    .withColumn("AddressLine2", col("AddressLine2").cast(StringType())) \
    .withColumn("City", col("City").cast(StringType())) \
    .withColumn("StateID", col("StateID").cast(IntegerType())) \
    .withColumn("PostalCode", col("PostalCode").cast(StringType())) \
    .withColumn("CountryID", col("CountryID").cast(IntegerType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_customer.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- AddressLine1: string (nullable = true)
 |-- AddressLine2: string (nullable = true)
 |-- City: string (nullable = true)
 |-- StateID: integer (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- CountryID: integer (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_customer.show(3)

+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+
|CustomerID|FirstName|LastName|               Email|     Phone|AddressLine1|AddressLine2|       City|StateID|PostalCode|CountryID| ingestion_timestamp|
+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+
|         1|     John|     Doe|john.doe@example.com|1234567890| 123 Main St|            |    Toronto|      1|   M5H 2N2|        1|2024-11-28 23:36:...|
|         2|     Jane|   Smith|jane.smith@exampl...|9876543210|  456 Elm St|            |Los Angeles|      4|     90001|        2|2024-11-28 23:36:...|
+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+



### onprem_customer_productreview Dataset

In [0]:
df_onprem_customer_productreview = spark.read.parquet(*onprem_CustomerProductReview_files, header=True) 
df_onprem_customer_productreview.show(3)

+--------+---------+----------+------+--------------------+
|ReviewID|ProductID|CustomerID|Rating|              Review|
+--------+---------+----------+------+--------------------+
|       1|        1|         1|     5|The smartphone is...|
|       2|        2|         2|     4|Laptop performanc...|
+--------+---------+----------+------+--------------------+



In [0]:
df_onprem_customer_productreview = df_onprem_customer_productreview.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ReviewID", "ProductID", "CustomerID"]) \
    .select("ReviewID", "ProductID", "CustomerID", "Rating", "Review", "ingestion_timestamp") \
    .orderBy("ReviewID", "ProductID", "CustomerID")

window_spec = Window.partitionBy("ReviewID", "ProductID", "CustomerID").orderBy(col("ingestion_timestamp").desc())
df_onprem_customer_productreview = df_onprem_customer_productreview.withColumn("row_num", row_number().over(window_spec))
df_onprem_customer_productreview = df_onprem_customer_productreview.filter(col("row_num") == 1).drop("row_num")

df_onprem_customer_productreview.show(3)

+--------+---------+----------+------+--------------------+--------------------+
|ReviewID|ProductID|CustomerID|Rating|              Review| ingestion_timestamp|
+--------+---------+----------+------+--------------------+--------------------+
|       1|        1|         1|     5|The smartphone is...|2024-11-28 23:42:...|
|       2|        2|         2|     4|Laptop performanc...|2024-11-28 23:42:...|
+--------+---------+----------+------+--------------------+--------------------+



In [0]:
print(f"Total onprem_customer_productreview records: {df_onprem_customer_productreview.count()}")

Total onprem_customer_productreview records: 2


In [0]:
df_onprem_customer_productreview.columns

['ReviewID',
 'ProductID',
 'CustomerID',
 'Rating',
 'Review',
 'ingestion_timestamp']

In [0]:
df_onprem_customer_productreview = df_onprem_customer_productreview \
    .withColumn("ReviewID", col("ReviewID").cast(IntegerType())) \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("CustomerID", col("CustomerID").cast(IntegerType())) \
    .withColumn("Rating", col("Rating").cast(IntegerType())) \
    .withColumn("Review", col("Review").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_customer_productreview.printSchema()

root
 |-- ReviewID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Review: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_customer_productreview.show(3)

+--------+---------+----------+------+--------------------+--------------------+
|ReviewID|ProductID|CustomerID|Rating|              Review| ingestion_timestamp|
+--------+---------+----------+------+--------------------+--------------------+
|       1|        1|         1|     5|The smartphone is...|2024-11-28 23:45:...|
|       2|        2|         2|     4|Laptop performanc...|2024-11-28 23:45:...|
+--------+---------+----------+------+--------------------+--------------------+



### onprem_customer_sellerreview Dataset

In [0]:
df_onprem_customer_sellerreview = spark.read.parquet(*onprem_CustomerSellerReview_files, header=True) 
df_onprem_customer_sellerreview.show(3)

+--------+--------+----------+------+--------------------+
|ReviewID|SellerID|CustomerID|Rating|              Review|
+--------+--------+----------+------+--------------------+
|       1|       1|         1|     5|Great service and...|
|       2|       2|         2|     4|Good quality prod...|
+--------+--------+----------+------+--------------------+



In [0]:
df_onprem_customer_sellerreview = df_onprem_customer_sellerreview.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ReviewID", "SellerID", "CustomerID"]) \
    .select("ReviewID", "SellerID", "CustomerID", "Rating", "Review", "ingestion_timestamp") \
    .orderBy("ReviewID", "SellerID", "CustomerID")

window_spec = Window.partitionBy("ReviewID", "SellerID", "CustomerID").orderBy(col("ingestion_timestamp").desc())
df_onprem_customer_sellerreview = df_onprem_customer_sellerreview.withColumn("row_num", row_number().over(window_spec))
df_onprem_customer_sellerreview = df_onprem_customer_sellerreview.filter(col("row_num") == 1).drop("row_num")

df_onprem_customer_sellerreview.show(3)

+--------+--------+----------+------+--------------------+--------------------+
|ReviewID|SellerID|CustomerID|Rating|              Review| ingestion_timestamp|
+--------+--------+----------+------+--------------------+--------------------+
|       1|       1|         1|     5|Great service and...|2024-11-28 23:49:...|
|       2|       2|         2|     4|Good quality prod...|2024-11-28 23:49:...|
+--------+--------+----------+------+--------------------+--------------------+



In [0]:
print(f"Total onprem_customer_sellerreview records: {df_onprem_customer_sellerreview.count()}")

Total onprem_customer_sellerreview records: 2


In [0]:
df_onprem_customer_sellerreview.columns

['ReviewID',
 'SellerID',
 'CustomerID',
 'Rating',
 'Review',
 'ingestion_timestamp']

In [0]:
df_onprem_customer_sellerreview = df_onprem_customer_sellerreview \
    .withColumn("ReviewID", col("ReviewID").cast(IntegerType())) \
    .withColumn("SellerID", col("SellerID").cast(IntegerType())) \
    .withColumn("CustomerID", col("CustomerID").cast(IntegerType())) \
    .withColumn("Rating", col("Rating").cast(IntegerType())) \
    .withColumn("Review", col("Review").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_customer_sellerreview.printSchema()

root
 |-- ReviewID: integer (nullable = true)
 |-- SellerID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Review: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_customer_sellerreview.show(3)

+--------+--------+----------+------+--------------------+--------------------+
|ReviewID|SellerID|CustomerID|Rating|              Review| ingestion_timestamp|
+--------+--------+----------+------+--------------------+--------------------+
|       1|       1|         1|     5|Great service and...|2024-11-28 23:52:...|
|       2|       2|         2|     4|Good quality prod...|2024-11-28 23:52:...|
+--------+--------+----------+------+--------------------+--------------------+



### onprem_order Dataset

In [0]:
df_onprem_order = spark.read.parquet(*onprem_Order_files, header=True) 
df_onprem_order.show(3)

+-------+------------------+----------+--------+
|OrderID|PromotionProductID|CustomerID|Quantity|
+-------+------------------+----------+--------+
|      1|                 1|         1|       2|
|      2|                 2|         2|       3|
+-------+------------------+----------+--------+



In [0]:
df_onprem_order = df_onprem_order.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["OrderID", "CustomerID"]) \
    .select("OrderID", "PromotionProductID", "CustomerID", "Quantity", "ingestion_timestamp") \
    .orderBy("OrderID")

window_spec = Window.partitionBy("OrderID").orderBy(col("ingestion_timestamp").desc())
df_onprem_order = df_onprem_order.withColumn("row_num", row_number().over(window_spec))
df_onprem_order = df_onprem_order.filter(col("row_num") == 1).drop("row_num")

df_onprem_order.show(3)

+-------+------------------+----------+--------+--------------------+
|OrderID|PromotionProductID|CustomerID|Quantity| ingestion_timestamp|
+-------+------------------+----------+--------+--------------------+
|      1|                 1|         1|       2|2024-11-28 23:57:...|
|      2|                 2|         2|       3|2024-11-28 23:57:...|
+-------+------------------+----------+--------+--------------------+



In [0]:
print(f"Total onprem_order records: {df_onprem_order.count()}")

Total onprem_order records: 2


In [0]:
df_onprem_order.columns

['OrderID',
 'PromotionProductID',
 'CustomerID',
 'Quantity',
 'ingestion_timestamp']

In [0]:
df_onprem_order = df_onprem_order \
    .withColumn("OrderID", col("OrderID").cast(IntegerType())) \
    .withColumn("PromotionProductID", col("PromotionProductID").cast(IntegerType())) \
    .withColumn("CustomerID", col("CustomerID").cast(IntegerType())) \
    .withColumn("Quantity", col("Quantity").cast(IntegerType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_order.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- PromotionProductID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_order.show(3)

+-------+------------------+----------+--------+--------------------+
|OrderID|PromotionProductID|CustomerID|Quantity| ingestion_timestamp|
+-------+------------------+----------+--------+--------------------+
|      1|                 1|         1|       2|2024-11-28 23:59:...|
|      2|                 2|         2|       3|2024-11-28 23:59:...|
+-------+------------------+----------+--------+--------------------+



### onprem_product Dataset

In [0]:
df_onprem_product = spark.read.parquet(*onprem_Product_files, header=True) 
df_onprem_product.show(3)

+---------+----------+-----------+-----------+--------------------+
|ProductID|CategoryID|ProductName|      Brand|      Specifications|
+---------+----------+-----------+-----------+--------------------+
|        1|         2| Smartphone|  TechBrand|{"screen_size": "...|
|        2|         2|     Laptop|  CompBrand|{"processor": "In...|
|        3|         3|    T-Shirt|FashionWear|{"sizes": ["S", "...|
+---------+----------+-----------+-----------+--------------------+
only showing top 3 rows



In [0]:
df_onprem_product = df_onprem_product.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ProductID"]) \
    .select("ProductID", "CategoryID", "ProductName", "Brand", "Specifications", "ingestion_timestamp") \
    .orderBy("ProductID")

window_spec = Window.partitionBy("ProductID").orderBy(col("ingestion_timestamp").desc())
df_onprem_product = df_onprem_product.withColumn("row_num", row_number().over(window_spec))
df_onprem_product = df_onprem_product.filter(col("row_num") == 1).drop("row_num")

df_onprem_product.show(3)

+---------+----------+-----------+-----------+--------------------+--------------------+
|ProductID|CategoryID|ProductName|      Brand|      Specifications| ingestion_timestamp|
+---------+----------+-----------+-----------+--------------------+--------------------+
|        1|         2| Smartphone|  TechBrand|{"screen_size": "...|2024-11-29 00:02:...|
|        2|         2|     Laptop|  CompBrand|{"processor": "In...|2024-11-29 00:02:...|
|        3|         3|    T-Shirt|FashionWear|{"sizes": ["S", "...|2024-11-29 00:02:...|
+---------+----------+-----------+-----------+--------------------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total onprem_product records: {df_onprem_product.count()}")

Total onprem_product records: 6


In [0]:
df_onprem_product.columns

['ProductID',
 'CategoryID',
 'ProductName',
 'Brand',
 'Specifications',
 'ingestion_timestamp']

In [0]:
df_onprem_product = df_onprem_product \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("CategoryID", col("CategoryID").cast(IntegerType())) \
    .withColumn("ProductName", col("ProductName").cast(StringType())) \
    .withColumn("Brand", col("Brand").cast(StringType())) \
    .withColumn("Specifications", col("Specifications").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_product.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Specifications: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_product.show(3)

+---------+----------+-----------+-----------+--------------------+--------------------+
|ProductID|CategoryID|ProductName|      Brand|      Specifications| ingestion_timestamp|
+---------+----------+-----------+-----------+--------------------+--------------------+
|        1|         2| Smartphone|  TechBrand|{"screen_size": "...|2024-11-29 00:08:...|
|        2|         2|     Laptop|  CompBrand|{"processor": "In...|2024-11-29 00:08:...|
|        3|         3|    T-Shirt|FashionWear|{"sizes": ["S", "...|2024-11-29 00:08:...|
+---------+----------+-----------+-----------+--------------------+--------------------+
only showing top 3 rows



### onprem_product_categories Dataset

In [0]:
df_onprem_product_categories = spark.read.parquet(*onprem_ProductCategories_files, header=True) 
df_onprem_product_categories.show(3)

+----------+------------+
|CategoryID|CategoryName|
+----------+------------+
|         1|       Books|
|         2| Electronics|
|         3|    Clothing|
+----------+------------+
only showing top 3 rows



In [0]:
df_onprem_product_categories = df_onprem_product_categories.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["CategoryID"]) \
    .select("CategoryID", "CategoryName", "ingestion_timestamp") \
    .orderBy("CategoryID")

window_spec = Window.partitionBy("CategoryID").orderBy(col("ingestion_timestamp").desc())
df_onprem_product_categories = df_onprem_product_categories.withColumn("row_num", row_number().over(window_spec))
df_onprem_product_categories = df_onprem_product_categories.filter(col("row_num") == 1).drop("row_num")

df_onprem_product_categories.show(3)

+----------+------------+--------------------+
|CategoryID|CategoryName| ingestion_timestamp|
+----------+------------+--------------------+
|         1|       Books|2024-11-29 00:09:...|
|         2| Electronics|2024-11-29 00:09:...|
|         3|    Clothing|2024-11-29 00:09:...|
+----------+------------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total onprem_product_categories records: {df_onprem_product_categories.count()}")

Total onprem_product_categories records: 6


In [0]:
df_onprem_product_categories.columns

['CategoryID', 'CategoryName', 'ingestion_timestamp']

In [0]:
df_onprem_product_categories = df_onprem_product_categories \
    .withColumn("CategoryID", col("CategoryID").cast(IntegerType())) \
    .withColumn("CategoryName", col("CategoryName").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_product_categories.printSchema()

root
 |-- CategoryID: integer (nullable = true)
 |-- CategoryName: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_product_categories.show(3)

+----------+------------+--------------------+
|CategoryID|CategoryName| ingestion_timestamp|
+----------+------------+--------------------+
|         1|       Books|2024-11-29 00:11:...|
|         2| Electronics|2024-11-29 00:11:...|
|         3|    Clothing|2024-11-29 00:11:...|
+----------+------------+--------------------+
only showing top 3 rows



### onprem_product_quality Datasets

In [0]:
df_onprem_product_quality = spark.read.parquet(*onprem_ProductQuality_files, header=True) 
df_onprem_product_quality.show(3)

+----------------+-----------+
|ProductQualityID|QualityType|
+----------------+-----------+
|               1|        New|
|               2|       Used|
|               3|   Like New|
+----------------+-----------+
only showing top 3 rows



In [0]:
df_onprem_product_quality = df_onprem_product_quality.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["ProductQualityID"]) \
    .select("ProductQualityID", "QualityType", "ingestion_timestamp") \
    .orderBy("ProductQualityID")

window_spec = Window.partitionBy("ProductQualityID").orderBy(col("ingestion_timestamp").desc())
df_onprem_product_quality = df_onprem_product_quality.withColumn("row_num", row_number().over(window_spec))
df_onprem_product_quality = df_onprem_product_quality.filter(col("row_num") == 1).drop("row_num")

df_onprem_product_quality.show(3)

+----------------+-----------+--------------------+
|ProductQualityID|QualityType| ingestion_timestamp|
+----------------+-----------+--------------------+
|               1|        New|2024-11-29 00:15:...|
|               2|       Used|2024-11-29 00:15:...|
|               3|   Like New|2024-11-29 00:15:...|
+----------------+-----------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total onprem_product_quality records: {df_onprem_product_quality.count()}")

Total onprem_product_quality records: 5


In [0]:
df_onprem_product_quality.columns

['ProductQualityID', 'QualityType', 'ingestion_timestamp']

In [0]:
df_onprem_product_quality = df_onprem_product_quality \
    .withColumn("ProductQualityID", col("ProductQualityID").cast(IntegerType())) \
    .withColumn("QualityType", col("QualityType").cast(StringType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_product_quality.printSchema()

root
 |-- ProductQualityID: integer (nullable = true)
 |-- QualityType: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_product_quality.show(3)

+----------------+-----------+--------------------+
|ProductQualityID|QualityType| ingestion_timestamp|
+----------------+-----------+--------------------+
|               1|        New|2024-11-29 00:17:...|
|               2|       Used|2024-11-29 00:17:...|
|               3|   Like New|2024-11-29 00:17:...|
+----------------+-----------+--------------------+
only showing top 3 rows



### onprem_promotion Dataset

In [0]:
df_onprem_promotion = spark.read.parquet(*onprem_Promotion_files, header=True) 
df_onprem_promotion.show(3)

+-----------+--------------------+------------------+
|PromotionID|PromotionDescription|DiscountPercentage|
+-----------+--------------------+------------------+
|          1|   Black Friday Sale|             20.00|
|          2|    Holiday Discount|             15.00|
+-----------+--------------------+------------------+



In [0]:
df_onprem_promotion = df_onprem_promotion.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["PromotionID"]) \
    .select("PromotionID", "PromotionDescription", "DiscountPercentage", "ingestion_timestamp") \
    .orderBy("PromotionID")

window_spec = Window.partitionBy("PromotionID").orderBy(col("ingestion_timestamp").desc())
df_onprem_promotion = df_onprem_promotion.withColumn("row_num", row_number().over(window_spec))
df_onprem_promotion = df_onprem_promotion.filter(col("row_num") == 1).drop("row_num")

df_onprem_promotion.show(3)

+-----------+--------------------+------------------+--------------------+
|PromotionID|PromotionDescription|DiscountPercentage| ingestion_timestamp|
+-----------+--------------------+------------------+--------------------+
|          1|   Black Friday Sale|             20.00|2024-11-29 00:20:...|
|          2|    Holiday Discount|             15.00|2024-11-29 00:20:...|
+-----------+--------------------+------------------+--------------------+



In [0]:
print(f"Total onprem_promotion records: {df_onprem_promotion.count()}")

Total onprem_promotion records: 2


In [0]:
df_onprem_promotion.columns

['PromotionID',
 'PromotionDescription',
 'DiscountPercentage',
 'ingestion_timestamp']

In [0]:
df_onprem_promotion = df_onprem_promotion \
    .withColumn("PromotionID", col("PromotionID").cast(IntegerType())) \
    .withColumn("PromotionDescription", col("PromotionDescription").cast(StringType())) \
    .withColumn("DiscountPercentage", col("DiscountPercentage").cast(DoubleType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_promotion.printSchema()

root
 |-- PromotionID: integer (nullable = true)
 |-- PromotionDescription: string (nullable = true)
 |-- DiscountPercentage: double (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_promotion.show(3)

+-----------+--------------------+------------------+--------------------+
|PromotionID|PromotionDescription|DiscountPercentage| ingestion_timestamp|
+-----------+--------------------+------------------+--------------------+
|          1|   Black Friday Sale|              20.0|2024-11-29 00:22:...|
|          2|    Holiday Discount|              15.0|2024-11-29 00:22:...|
+-----------+--------------------+------------------+--------------------+



### onprem_seller Dataset

In [0]:
df_onprem_seller = spark.read.parquet(*onprem_Seller_files, header=True) 
df_onprem_seller.show(3)

+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+
|SellerID|  SellerName|               Email|     Phone|     AddressLine1|AddressLine2|     City|StateID|PostalCode|CountryID|
+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+
|       1|ElectroWorld|contact@electrowo...|1234567890|    789 Tech Park|            | New York|      3|     10001|        2|
|       2|  FashionHub|support@fashionhu...|0987654321|321 Market Street|            |Vancouver|      2|   V6B 1A1|        1|
+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+



In [0]:
df_onprem_seller = df_onprem_seller.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["SellerID"]) \
    .select("SellerID", "SellerName", "Email", "Phone", "AddressLine1", "AddressLine2", "City", "StateID", "PostalCode", "CountryID", "ingestion_timestamp") \
    .orderBy("SellerID")

window_spec = Window.partitionBy("SellerID").orderBy(col("ingestion_timestamp").desc())
df_onprem_seller = df_onprem_seller.withColumn("row_num", row_number().over(window_spec))
df_onprem_seller = df_onprem_seller.filter(col("row_num") == 1).drop("row_num")

df_onprem_seller.show(3)

+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+
|SellerID|  SellerName|               Email|     Phone|     AddressLine1|AddressLine2|     City|StateID|PostalCode|CountryID| ingestion_timestamp|
+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+
|       1|ElectroWorld|contact@electrowo...|1234567890|    789 Tech Park|            | New York|      3|     10001|        2|2024-11-29 00:24:...|
|       2|  FashionHub|support@fashionhu...|0987654321|321 Market Street|            |Vancouver|      2|   V6B 1A1|        1|2024-11-29 00:24:...|
+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+



In [0]:
print(f"Total onprem_seller records: {df_onprem_seller.count()}")

Total onprem_seller records: 2


In [0]:
df_onprem_seller.columns

['SellerID',
 'SellerName',
 'Email',
 'Phone',
 'AddressLine1',
 'AddressLine2',
 'City',
 'StateID',
 'PostalCode',
 'CountryID',
 'ingestion_timestamp']

In [0]:
df_onprem_seller = df_onprem_seller \
    .withColumn("SellerID", col("SellerID").cast(IntegerType())) \
    .withColumn("SellerName", col("SellerName").cast(StringType())) \
    .withColumn("Email", col("Email").cast(StringType())) \
    .withColumn("Phone", col("Phone").cast(StringType())) \
    .withColumn("AddressLine1", col("AddressLine1").cast(StringType())) \
    .withColumn("AddressLine2", col("AddressLine2").cast(StringType())) \
    .withColumn("City", col("City").cast(StringType())) \
    .withColumn("StateID", col("StateID").cast(IntegerType())) \
    .withColumn("PostalCode", col("PostalCode").cast(StringType())) \
    .withColumn("CountryID", col("CountryID").cast(IntegerType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_seller.printSchema()

root
 |-- SellerID: integer (nullable = true)
 |-- SellerName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- AddressLine1: string (nullable = true)
 |-- AddressLine2: string (nullable = true)
 |-- City: string (nullable = true)
 |-- StateID: integer (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- CountryID: integer (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_seller.show(3)

+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+
|SellerID|  SellerName|               Email|     Phone|     AddressLine1|AddressLine2|     City|StateID|PostalCode|CountryID| ingestion_timestamp|
+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+
|       1|ElectroWorld|contact@electrowo...|1234567890|    789 Tech Park|            | New York|      3|     10001|        2|2024-11-29 00:28:...|
|       2|  FashionHub|support@fashionhu...|0987654321|321 Market Street|            |Vancouver|      2|   V6B 1A1|        1|2024-11-29 00:28:...|
+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+



### onprem_seller_product_promotion Dataset

In [0]:
df_onprem_seller_product_promotion = spark.read.parquet(*onprem_SellerProductPromotion_files, header=True) 
df_onprem_seller_product_promotion.show(3)

+------------------+---------+--------+----------------+------+-----------+
|PromotionProductID|ProductID|SellerID|ProductQualityID| Price|PromotionID|
+------------------+---------+--------+----------------+------+-----------+
|                 1|        1|       1|               1|699.99|          1|
|                 2|        3|       2|               1| 19.99|          2|
+------------------+---------+--------+----------------+------+-----------+



In [0]:
df_onprem_seller_product_promotion = df_onprem_seller_product_promotion.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["PromotionProductID"]) \
    .select("PromotionProductID", "ProductID", "SellerID", "ProductQualityID", "Price", "PromotionID", "ingestion_timestamp") \
    .orderBy("PromotionProductID")

window_spec = Window.partitionBy("PromotionProductID").orderBy(col("ingestion_timestamp").desc())
df_onprem_seller_product_promotion = df_onprem_seller_product_promotion.withColumn("row_num", row_number().over(window_spec))
df_onprem_seller_product_promotion = df_onprem_seller_product_promotion.filter(col("row_num") == 1).drop("row_num")

df_onprem_seller_product_promotion.show(3)

+------------------+---------+--------+----------------+------+-----------+--------------------+
|PromotionProductID|ProductID|SellerID|ProductQualityID| Price|PromotionID| ingestion_timestamp|
+------------------+---------+--------+----------------+------+-----------+--------------------+
|                 1|        1|       1|               1|699.99|          1|2024-11-29 00:32:...|
|                 2|        3|       2|               1| 19.99|          2|2024-11-29 00:32:...|
+------------------+---------+--------+----------------+------+-----------+--------------------+



In [0]:
print(f"Total onprem_seller_product_promotion records: {df_onprem_seller_product_promotion.count()}")

Total onprem_seller_product_promotion records: 2


In [0]:
df_onprem_seller_product_promotion.columns

['PromotionProductID',
 'ProductID',
 'SellerID',
 'ProductQualityID',
 'Price',
 'PromotionID',
 'ingestion_timestamp']

In [0]:
df_onprem_seller_product_promotion = df_onprem_seller_product_promotion \
    .withColumn("PromotionProductID", col("PromotionProductID").cast(IntegerType())) \
    .withColumn("ProductID", col("ProductID").cast(IntegerType())) \
    .withColumn("SellerID", col("SellerID").cast(IntegerType())) \
    .withColumn("ProductQualityID", col("ProductQualityID").cast(IntegerType())) \
    .withColumn("Price", col("Price").cast(DoubleType())) \
    .withColumn("PromotionID", col("PromotionID").cast(IntegerType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_seller_product_promotion.printSchema()

root
 |-- PromotionProductID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- SellerID: integer (nullable = true)
 |-- ProductQualityID: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- PromotionID: integer (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_seller_product_promotion.show(3)

+------------------+---------+--------+----------------+------+-----------+--------------------+
|PromotionProductID|ProductID|SellerID|ProductQualityID| Price|PromotionID| ingestion_timestamp|
+------------------+---------+--------+----------------+------+-----------+--------------------+
|                 1|        1|       1|               1|699.99|          1|2024-11-29 00:35:...|
|                 2|        3|       2|               1| 19.99|          2|2024-11-29 00:35:...|
+------------------+---------+--------+----------------+------+-----------+--------------------+



### onprem_state_province_rolling Dataset

In [0]:
df_onprem_state_province_rolling = spark.read.parquet(*onprem_StateProvinceRolling_files, header=True) 
df_onprem_state_province_rolling.show(3)

+-------+----------------+---------+
|StateID|       StateName|CountryID|
+-------+----------------+---------+
|      1|         Ontario|        1|
|      2|British Columbia|        1|
|      3|        New York|        2|
+-------+----------------+---------+
only showing top 3 rows



In [0]:
df_onprem_state_province_rolling = df_onprem_state_province_rolling.withColumn("ingestion_timestamp", current_timestamp()) \
    .dropna(subset = ["StateID"]) \
    .select("StateID", "StateName", "CountryID", "ingestion_timestamp") \
    .orderBy("StateID")

window_spec = Window.partitionBy("StateID").orderBy(col("ingestion_timestamp").desc())
df_onprem_state_province_rolling = df_onprem_state_province_rolling.withColumn("row_num", row_number().over(window_spec))
df_onprem_state_province_rolling = df_onprem_state_province_rolling.filter(col("row_num") == 1).drop("row_num")

df_onprem_state_province_rolling.show(3)

+-------+----------------+---------+--------------------+
|StateID|       StateName|CountryID| ingestion_timestamp|
+-------+----------------+---------+--------------------+
|      1|         Ontario|        1|2024-11-29 00:38:...|
|      2|British Columbia|        1|2024-11-29 00:38:...|
|      3|        New York|        2|2024-11-29 00:38:...|
+-------+----------------+---------+--------------------+
only showing top 3 rows



In [0]:
print(f"Total onprem_state_province_rolling records: {df_onprem_state_province_rolling.count()}")

Total onprem_state_province_rolling records: 4


In [0]:
df_onprem_state_province_rolling.columns

['StateID', 'StateName', 'CountryID', 'ingestion_timestamp']

In [0]:
df_onprem_state_province_rolling = df_onprem_state_province_rolling \
    .withColumn("StateID", col("StateID").cast(IntegerType())) \
    .withColumn("StateName", col("StateName").cast(StringType())) \
    .withColumn("CountryID", col("CountryID").cast(IntegerType())) \
    .withColumn("ingestion_timestamp", col("ingestion_timestamp").cast("timestamp"))

df_onprem_state_province_rolling.printSchema()

root
 |-- StateID: integer (nullable = true)
 |-- StateName: string (nullable = true)
 |-- CountryID: integer (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = false)



In [0]:
df_onprem_state_province_rolling.show(3)

+-------+----------------+---------+--------------------+
|StateID|       StateName|CountryID| ingestion_timestamp|
+-------+----------------+---------+--------------------+
|      1|         Ontario|        1|2024-11-29 00:39:...|
|      2|British Columbia|        1|2024-11-29 00:39:...|
|      3|        New York|        2|2024-11-29 00:39:...|
+-------+----------------+---------+--------------------+
only showing top 3 rows



# Loading the Cleaned Datasets in Delta Tables on the Target Container

In [0]:
delta_http_folder = "delta/http"
delta_azsqldb_folder = "delta/azsqldb"
delta_onprem_folder = "delta/onprem"

delta_http_path = f"{mount_point_target}/{delta_http_folder}"
delta_azsqldb_path = f"{mount_point_target}/{delta_azsqldb_folder}"
delta_onprem_path = f"{mount_point_target}/{delta_onprem_folder}"

## Datasets From the HTTP API Source

In [0]:
# Checking if the Delta table already exists
if DeltaTable.isDeltaTable(spark, f"{delta_http_path}/accessories"):
    # If the table exists, perform merge (upsert)
    existing_data = DeltaTable.forPath(spark, f"{delta_http_path}/accessories")
    
    (existing_data.alias("existing") \
        .merge(df_http_accessories.alias("new"), "existing.ProductID = new.ProductID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    # If the table does not exist, write data
    df_http_accessories.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_http_path}/accessories")

# Reading the Delta table
spark.read.format("delta").load(f"{delta_http_path}/accessories").show(3)

+---------+-----------+-----------+--------------------+-----------------+------------+-----------------+------+--------------------+--------------------+
|ProductID|   Category|SubCategory|         ProductName|            Brand|       Sizes|           Colors| Price|         Description| ingestion_timestamp|
+---------+-----------+-----------+--------------------+-----------------+------------+-----------------+------+--------------------+--------------------+
|        4|Accessories|    Watches|Luxury Leather Watch|    Timepiece Co.|["One Size"]|["Brown","Black"]|149.99|A timeless leathe...|2024-11-29 00:53:...|
|        5|Accessories|    Jewelry|Silver Pendant Ne...|      ShinyThings|["One Size"]|       ["Silver"]| 39.99|A delicate silver...|2024-11-29 00:53:...|
|        6|Accessories|       Bags|    Leather Backpack|Traveler's Choice|  ["Medium"]|["Brown","Black"]| 89.99|A durable leather...|2024-11-29 00:53:...|
+---------+-----------+-----------+--------------------+--------------

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_http_path}/clothing"):
    existing_data = DeltaTable.forPath(spark, f"{delta_http_path}/clothing")
    
    (existing_data.alias("existing") \
        .merge(df_http_clothing.alias("new"), "existing.ProductID = new.ProductID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_http_clothing.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_http_path}/clothing")

spark.read.format("delta").load(f"{delta_http_path}/clothing").show(3)

+---------+--------+-----------+--------------------+------------+------------------+--------------------+-----+--------------------+--------------------+
|ProductID|Category|SubCategory|         ProductName|       Brand|             Sizes|              Colors|Price|         Description| ingestion_timestamp|
+---------+--------+-----------+--------------------+------------+------------------+--------------------+-----+--------------------+--------------------+
|        1|Clothing|   Menswear|    Slim Fit T-Shirt| FashionWear|["S","M","L","XL"]|["Red","Blue","Bl...|29.99|A stylish slim-fi...|2024-11-29 00:55:...|
|        2|Clothing| Womenswear| Floral Summer Dress|ElegantStyle|     ["S","M","L"]|["White","Yellow"...|49.99|A light and breez...|2024-11-29 00:55:...|
|        3|Clothing|  Outerwear|Classic Denim Jacket|   UrbanEdge|    ["M","L","XL"]|    ["Blue","Black"]|69.99|A classic denim j...|2024-11-29 00:55:...|
+---------+--------+-----------+--------------------+------------+----

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_http_path}/footwear"):
    existing_data = DeltaTable.forPath(spark, f"{delta_http_path}/footwear")
    
    (existing_data.alias("existing") \
        .merge(df_http_footwear.alias("new"), "existing.ProductID = new.ProductID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_http_footwear.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_http_path}/footwear")

spark.read.format("delta").load(f"{delta_http_path}/footwear").show(3)

+---------+--------+-----------+-----------------+-----------+--------------------+--------------------+-----+--------------------+--------------------+
|ProductID|Category|SubCategory|      ProductName|      Brand|               Sizes|              Colors|Price|         Description| ingestion_timestamp|
+---------+--------+-----------+-----------------+-----------+--------------------+--------------------+-----+--------------------+--------------------+
|        7|Footwear|   Menswear|    Running Shoes| SpeedTrack|["8","9","10","11...|["White","Blue","...|79.99|Lightweight runni...|2024-11-29 00:57:...|
|        8|Footwear| Womenswear|High Heel Sandals|  StyleStep|["5","6","7","8",...|["Black","Red","G...|59.99|Elegant high-heel...|2024-11-29 00:57:...|
|        9|Footwear|     Unisex|  Casual Sneakers|ComfortWalk|["6","7","8","9",...|["Gray","Black","...|49.99|Comfortable sneak...|2024-11-29 00:57:...|
+---------+--------+-----------+-----------------+-----------+--------------------

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_http_path}/home_decor"):
    existing_data = DeltaTable.forPath(spark, f"{delta_http_path}/home_decor")
    
    (existing_data.alias("existing") \
        .merge(df_http_home_decor.alias("new"), "existing.ProductID = new.ProductID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_http_home_decor.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_http_path}/home_decor")

spark.read.format("delta").load(f"{delta_http_path}/home_decor").show(3)

+---------+----------+-----------+--------------------+--------------+------------------+-----------------+------+--------------------+--------------------+
|ProductID|  Category|SubCategory|         ProductName|         Brand|             Sizes|           Colors| Price|         Description| ingestion_timestamp|
+---------+----------+-----------+--------------------+--------------+------------------+-----------------+------+--------------------+--------------------+
|       10|Home Decor|   Lighting|   Modern Table Lamp|   BrightHomes|      ["One Size"]|["White","Black"]| 39.99|A sleek and moder...|2024-11-29 00:58:...|
|       11|Home Decor|   Wall Art|Abstract Canvas P...|Artistic Vibes|["Medium","Large"]|   ["Multicolor"]| 99.99|A vibrant abstrac...|2024-11-29 00:58:...|
|       12|Home Decor|  Furniture| Wooden Coffee Table|  Rustic Charm|      ["One Size"]|["Brown","Black"]|129.99|A stylish wooden ...|2024-11-29 00:58:...|
+---------+----------+-----------+--------------------+---

## Datasets from Azure SQL Database Source

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/address"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/address")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_address.alias("new"), "existing.AddressID = new.AddressID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_address.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/address")

spark.read.format("delta").load(f"{delta_azsqldb_path}/address").show(3)

+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+--------------------+
|AddressID|       AddressLine1|AddressLine2|   City|StateProvince|CountryRegion|PostalCode|             rowguid|       ModifiedDate| ingestion_timestamp|
+---------+-------------------+------------+-------+-------------+-------------+----------+--------------------+-------------------+--------------------+
|        9|  8713 Yosemite Ct.|        NULL|Bothell|   Washington|United States|     98011|268af621-76d7-4c7...|2006-07-01 00:00:00|2024-11-29 01:01:...|
|       11|1318 Lasalle Street|        NULL|Bothell|   Washington|United States|     98011|981b3303-aca2-49c...|2007-04-01 00:00:00|2024-11-29 01:01:...|
|       25|   9178 Jumping St.|        NULL| Dallas|        Texas|United States|     75201|c8df3bd9-48f0-465...|2006-09-01 00:00:00|2024-11-29 01:01:...|
+---------+-------------------+------------+-------+-------------+----------

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/customer"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/customer")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_customer.alias("new"), "existing.CustomerID = new.CustomerID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_customer.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/customer")

spark.read.format("delta").load(f"{delta_azsqldb_path}/customer").show(3)

+----------+---------+-----+---------+----------+--------+------+--------------------+--------------------+--------------------+------------+--------------------+------------+--------------------+-------------------+--------------------+
|CustomerID|NameStyle|Title|FirstName|MiddleName|LastName|Suffix|         CompanyName|         SalesPerson|        EmailAddress|       Phone|        PasswordHash|PasswordSalt|             rowguid|       ModifiedDate| ingestion_timestamp|
+----------+---------+-----+---------+----------+--------+------+--------------------+--------------------+--------------------+------------+--------------------+------------+--------------------+-------------------+--------------------+
|         1|    false|  Mr.|  Orlando|        N.|     Gee|  NULL|        A Bike Store|adventure-works\p...|orlando0@adventur...|245-555-0173|L/Rlwxzp4w7RWmEgX...|    1KjXYs4=|3f5ae95e-b87d-4ae...|2005-08-01 00:00:00|2024-11-29 01:03:...|
|         2|    false|  Mr.|    Keith|      NULL

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/customer_address"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/customer_address")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_customer_address.alias("new"), "existing.CustomerID = new.CustomerID AND existing.AddressID = new.AddressID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_customer_address.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/customer_address")

spark.read.format("delta").load(f"{delta_azsqldb_path}/customer_address").show(3)

+----------+---------+-----------+--------------------+-------------------+--------------------+
|CustomerID|AddressID|AddressType|             rowguid|       ModifiedDate| ingestion_timestamp|
+----------+---------+-----------+--------------------+-------------------+--------------------+
|     29485|     1086|Main Office|16765338-dbe4-442...|2007-09-01 00:00:00|2024-11-29 01:08:...|
|     29486|      621|Main Office|22b3e910-14af-4ed...|2005-09-01 00:00:00|2024-11-29 01:08:...|
|     29489|     1069|Main Office|a095c88b-d7e6-417...|2005-07-01 00:00:00|2024-11-29 01:08:...|
+----------+---------+-----------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/product"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/product")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_product.alias("new"), "existing.ProductID = new.ProductID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_product.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/product")

spark.read.format("delta").load(f"{delta_azsqldb_path}/product").show(3)

+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size| Weight|ProductCategoryID|ProductModelID|      SellStartDate|SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|        ModifiedDate| ingestion_timestamp|
+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+--------------------+
|      680|HL Road Frame - B...|   FR-R92B-58|Black|     1059.31|   1431.5|  58|1016.04|               18|             6|2002-06-01 00:00:00|       NULL|            N

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/product_category"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/product_category")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_product_category.alias("new"), "existing.ProductCategoryID = new.ProductCategoryID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_product_category.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/product_category")

spark.read.format("delta").load(f"{delta_azsqldb_path}/product_category").show(3)

+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
|ProductCategoryID|ParentProductCategoryID|      Name|             rowguid|       ModifiedDate| ingestion_timestamp|
+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
|                1|                   NULL|     Bikes|cfbda25c-df71-47a...|2002-06-01 00:00:00|2024-11-29 01:11:...|
|                2|                   NULL|Components|c657828d-d808-4ab...|2002-06-01 00:00:00|2024-11-29 01:11:...|
|                3|                   NULL|  Clothing|10a7c342-ca82-48d...|2002-06-01 00:00:00|2024-11-29 01:11:...|
+-----------------+-----------------------+----------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/product_description"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/product_description")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_product_description.alias("new"), "existing.ProductDescriptionID = new.ProductDescriptionID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_product_description.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/product_description")

spark.read.format("delta").load(f"{delta_azsqldb_path}/product_description").show(3)

+--------------------+--------------------+--------------------+-------------------+--------------------+
|ProductDescriptionID|         Description|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------------+--------------------+--------------------+-------------------+--------------------+
|                   3|     Chromoly steel.|301eed3a-1a82-485...|2007-06-01 00:00:00|2024-11-29 01:12:...|
|                   4|Aluminum alloy cu...|dfeba528-da11-465...|2007-06-01 00:00:00|2024-11-29 01:12:...|
|                   5|Aluminum alloy cu...|f7178da7-1a7e-499...|2007-06-01 00:00:00|2024-11-29 01:12:...|
+--------------------+--------------------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/product_model"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/product_model")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_product_model.alias("new"), "existing.ProductModelID = new.ProductModelID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_product_model.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/product_model")

spark.read.format("delta").load(f"{delta_azsqldb_path}/product_model").show(3)

+--------------+------------------+------------------+--------------------+-------------------+--------------------+
|ProductModelID|              Name|CatalogDescription|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------+------------------+------------------+--------------------+-------------------+--------------------+
|             1|      Classic Vest|              NULL|29321d47-1e4c-4aa...|2007-06-01 00:00:00|2024-11-29 01:14:...|
|             2|       Cycling Cap|              NULL|474fb654-3c96-4cb...|2005-06-01 00:00:00|2024-11-29 01:14:...|
|             3|Full-Finger Gloves|              NULL|a75483fe-3c47-4aa...|2006-06-01 00:00:00|2024-11-29 01:14:...|
+--------------+------------------+------------------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/productmodel_productdescription"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/productmodel_productdescription")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_productmodel_productdescription.alias("new"), "existing.ProductModelID = new.ProductModelID AND existing.ProductDescriptionID = new.ProductDescriptionID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_productmodel_productdescription.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/productmodel_productdescription")

spark.read.format("delta").load(f"{delta_azsqldb_path}/productmodel_productdescription").show(3)

+--------------+--------------------+-------+--------------------+-------------------+--------------------+
|ProductModelID|ProductDescriptionID|Culture|             rowguid|       ModifiedDate| ingestion_timestamp|
+--------------+--------------------+-------+--------------------+-------------------+--------------------+
|             1|                1199| en    |4d00b649-027a-4f9...|2007-06-01 00:00:00|2024-11-29 01:16:...|
|             1|                1467| ar    |7de7204e-4efc-40f...|2007-06-01 00:00:00|2024-11-29 01:16:...|
|             1|                1589| fr    |20bffcf4-bfa6-400...|2007-06-01 00:00:00|2024-11-29 01:16:...|
+--------------+--------------------+-------+--------------------+-------------------+--------------------+
only showing top 3 rows



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/salesorder_detail"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/salesorder_detail")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_salesorder_detail.alias("new"), "existing.SalesOrderID = new.SalesOrderID AND existing.SalesOrderDetailID = new.SalesOrderDetailID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_salesorder_detail.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/salesorder_detail")

spark.read.format("delta").load(f"{delta_azsqldb_path}/salesorder_detail").show(3)

+------------+------------------+--------+---------+---------+-----------------+---------+--------------------+-------------------+--------------------+
|SalesOrderID|SalesOrderDetailID|OrderQty|ProductID|UnitPrice|UnitPriceDiscount|LineTotal|             rowguid|       ModifiedDate| ingestion_timestamp|
+------------+------------------+--------+---------+---------+-----------------+---------+--------------------+-------------------+--------------------+
|       71774|            110562|       1|      836|  356.898|              0.0|  356.898|e3a1994c-7a68-4ce...|2008-06-01 00:00:00|2024-11-29 01:19:...|
|       71774|            110563|       1|      822|  356.898|              0.0|  356.898|5c77f557-fdb6-43b...|2008-06-01 00:00:00|2024-11-29 01:19:...|
|       71776|            110567|       1|      907|     63.9|              0.0|     63.9|6dbfe398-d15d-425...|2008-06-01 00:00:00|2024-11-29 01:19:...|
+------------+------------------+--------+---------+---------+-----------------+--

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_azsqldb_path}/salesorder_header"):
    existing_data = DeltaTable.forPath(spark, f"{delta_azsqldb_path}/salesorder_header")
    
    (existing_data.alias("existing") \
        .merge(df_azsqldb_salesorder_header.alias("new"), "existing.SalesOrderID = new.SalesOrderID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_azsqldb_salesorder_header.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_azsqldb_path}/salesorder_header")

spark.read.format("delta").load(f"{delta_azsqldb_path}/salesorder_header").show(3)

+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+----------+---------+--------+----------+-------+--------------------+-------------------+--------------------+
|SalesOrderID|RevisionNumber|          OrderDate|            DueDate|           ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|ShipToAddressID|BillToAddressID|       ShipMethod|CreditCardApprovalCode|  SubTotal|   TaxAmt| Freight|  TotalDue|Comment|             rowguid|       ModifiedDate| ingestion_timestamp|
+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+---------------+---------------+-----------------+----------------------+----------+---------+--------+--

## Datasets from On-Premise Microsoft SQL Server Source

In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/country_rolling"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/country_rolling")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_country_rolling.alias("new"), "existing.CountryID = new.CountryID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_country_rolling.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/country_rolling")

spark.read.format("delta").load(f"{delta_onprem_path}/country_rolling").show(3)

+---------+-------------+--------------------+
|CountryID|  CountryName| ingestion_timestamp|
+---------+-------------+--------------------+
|        1|       Canada|2024-11-29 01:22:...|
|        2|United States|2024-11-29 01:22:...|
+---------+-------------+--------------------+



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/customer"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/customer")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_customer.alias("new"), "existing.CustomerID = new.CustomerID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_customer.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/customer")

spark.read.format("delta").load(f"{delta_onprem_path}/customer").show(3)

+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+
|CustomerID|FirstName|LastName|               Email|     Phone|AddressLine1|AddressLine2|       City|StateID|PostalCode|CountryID| ingestion_timestamp|
+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+
|         1|     John|     Doe|john.doe@example.com|1234567890| 123 Main St|            |    Toronto|      1|   M5H 2N2|        1|2024-11-29 01:23:...|
|         2|     Jane|   Smith|jane.smith@exampl...|9876543210|  456 Elm St|            |Los Angeles|      4|     90001|        2|2024-11-29 01:23:...|
+----------+---------+--------+--------------------+----------+------------+------------+-----------+-------+----------+---------+--------------------+



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/customer_productreview"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/customer_productreview")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_customer_productreview.alias("new"), "existing.ReviewID = new.ReviewID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_customer_productreview.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/customer_productreview")

spark.read.format("delta").load(f"{delta_onprem_path}/customer_productreview").show(3)

+--------+---------+----------+------+--------------------+--------------------+
|ReviewID|ProductID|CustomerID|Rating|              Review| ingestion_timestamp|
+--------+---------+----------+------+--------------------+--------------------+
|       1|        1|         1|     5|The smartphone is...|2024-11-29 01:25:...|
|       2|        2|         2|     4|Laptop performanc...|2024-11-29 01:25:...|
+--------+---------+----------+------+--------------------+--------------------+



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/customer_sellerreview"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/customer_sellerreview")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_customer_sellerreview.alias("new"), "existing.ReviewID = new.ReviewID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_customer_sellerreview.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/customer_sellerreview")

spark.read.format("delta").load(f"{delta_onprem_path}/customer_sellerreview").show(3)

+--------+--------+----------+------+--------------------+--------------------+
|ReviewID|SellerID|CustomerID|Rating|              Review| ingestion_timestamp|
+--------+--------+----------+------+--------------------+--------------------+
|       1|       1|         1|     5|Great service and...|2024-11-29 01:26:...|
|       2|       2|         2|     4|Good quality prod...|2024-11-29 01:26:...|
+--------+--------+----------+------+--------------------+--------------------+



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/order"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/order")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_order.alias("new"), "existing.OrderID = new.OrderID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_order.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/order")

spark.read.format("delta").load(f"{delta_onprem_path}/order").show(3)

+-------+------------------+----------+--------+--------------------+
|OrderID|PromotionProductID|CustomerID|Quantity| ingestion_timestamp|
+-------+------------------+----------+--------+--------------------+
|      1|                 1|         1|       2|2024-11-29 01:27:...|
|      2|                 2|         2|       3|2024-11-29 01:27:...|
+-------+------------------+----------+--------+--------------------+



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/product"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/product")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_product.alias("new"), "existing.ProductID = new.ProductID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_product.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/product")

spark.read.format("delta").load(f"{delta_onprem_path}/product").show(3)

+---------+----------+-----------+-----------+--------------------+--------------------+
|ProductID|CategoryID|ProductName|      Brand|      Specifications| ingestion_timestamp|
+---------+----------+-----------+-----------+--------------------+--------------------+
|        1|         2| Smartphone|  TechBrand|{"screen_size": "...|2024-11-29 01:28:...|
|        2|         2|     Laptop|  CompBrand|{"processor": "In...|2024-11-29 01:28:...|
|        3|         3|    T-Shirt|FashionWear|{"sizes": ["S", "...|2024-11-29 01:28:...|
+---------+----------+-----------+-----------+--------------------+--------------------+
only showing top 3 rows



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/product_categories"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/product_categories")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_product_categories.alias("new"), "existing.CategoryID = new.CategoryID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_product_categories.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/product_categories")

spark.read.format("delta").load(f"{delta_onprem_path}/product_categories").show(3)

+----------+------------+--------------------+
|CategoryID|CategoryName| ingestion_timestamp|
+----------+------------+--------------------+
|         1|       Books|2024-11-29 01:30:...|
|         2| Electronics|2024-11-29 01:30:...|
|         3|    Clothing|2024-11-29 01:30:...|
+----------+------------+--------------------+
only showing top 3 rows



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/product_quality"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/product_quality")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_product_quality.alias("new"), "existing.ProductQualityID = new.ProductQualityID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_product_quality.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/product_quality")

spark.read.format("delta").load(f"{delta_onprem_path}/product_quality").show(3)

+----------------+-----------+--------------------+
|ProductQualityID|QualityType| ingestion_timestamp|
+----------------+-----------+--------------------+
|               1|        New|2024-11-29 01:31:...|
|               2|       Used|2024-11-29 01:31:...|
|               3|   Like New|2024-11-29 01:31:...|
+----------------+-----------+--------------------+
only showing top 3 rows



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/promotion"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/promotion")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_promotion.alias("new"), "existing.PromotionID = new.PromotionID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_promotion.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/promotion")

spark.read.format("delta").load(f"{delta_onprem_path}/promotion").show(3)

+-----------+--------------------+------------------+--------------------+
|PromotionID|PromotionDescription|DiscountPercentage| ingestion_timestamp|
+-----------+--------------------+------------------+--------------------+
|          1|   Black Friday Sale|              20.0|2024-11-29 01:32:...|
|          2|    Holiday Discount|              15.0|2024-11-29 01:32:...|
+-----------+--------------------+------------------+--------------------+



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/seller"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/seller")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_seller.alias("new"), "existing.SellerID = new.SellerID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_seller.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/seller")

spark.read.format("delta").load(f"{delta_onprem_path}/seller").show(3)

+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+
|SellerID|  SellerName|               Email|     Phone|     AddressLine1|AddressLine2|     City|StateID|PostalCode|CountryID| ingestion_timestamp|
+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+
|       1|ElectroWorld|contact@electrowo...|1234567890|    789 Tech Park|            | New York|      3|     10001|        2|2024-11-29 01:32:...|
|       2|  FashionHub|support@fashionhu...|0987654321|321 Market Street|            |Vancouver|      2|   V6B 1A1|        1|2024-11-29 01:32:...|
+--------+------------+--------------------+----------+-----------------+------------+---------+-------+----------+---------+--------------------+



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/seller_product_promotion"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/seller_product_promotion")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_seller_product_promotion.alias("new"), "existing.PromotionProductID = new.PromotionProductID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_seller_product_promotion.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/seller_product_promotion")

spark.read.format("delta").load(f"{delta_onprem_path}/seller_product_promotion").show(3)

+------------------+---------+--------+----------------+------+-----------+--------------------+
|PromotionProductID|ProductID|SellerID|ProductQualityID| Price|PromotionID| ingestion_timestamp|
+------------------+---------+--------+----------------+------+-----------+--------------------+
|                 1|        1|       1|               1|699.99|          1|2024-11-29 01:34:...|
|                 2|        3|       2|               1| 19.99|          2|2024-11-29 01:34:...|
+------------------+---------+--------+----------------+------+-----------+--------------------+



In [0]:
if DeltaTable.isDeltaTable(spark, f"{delta_onprem_path}/state_province_rolling"):
    existing_data = DeltaTable.forPath(spark, f"{delta_onprem_path}/state_province_rolling")
    
    (existing_data.alias("existing") \
        .merge(df_onprem_state_province_rolling.alias("new"), "existing.StateID = new.StateID") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute())
else:
    df_onprem_state_province_rolling.coalesce(1).write.format("delta").mode("overwrite").save(f"{delta_onprem_path}/state_province_rolling")

spark.read.format("delta").load(f"{delta_onprem_path}/state_province_rolling").show(3)

+-------+----------------+---------+--------------------+
|StateID|       StateName|CountryID| ingestion_timestamp|
+-------+----------------+---------+--------------------+
|      1|         Ontario|        1|2024-11-29 01:36:...|
|      2|British Columbia|        1|2024-11-29 01:36:...|
|      3|        New York|        2|2024-11-29 01:36:...|
+-------+----------------+---------+--------------------+
only showing top 3 rows

