In [0]:
from pyspark.sql.functions import col, to_timestamp, initcap, trim, year, month, dayofmonth, date_format, expr, round, when
from pyspark.sql.types import DoubleType, IntegerType, TimestampType
from etl_lib.transformations import *

In [0]:
client_id = "..."
client_secret = "..."
tenant_id = "..."
storage_account = "stgetlprj01"

# Set up OAuth connection to ADLS Gen2
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
               f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

In [0]:
bronze_path = f"abfss://bronze@{storage_account}.dfs.core.windows.net/retail/year=*/data.csv"
df_all_years = spark.read.option("header", "true").csv(bronze_path)
df_all_years.show(5)

### 1. Create copy of dataset

In [0]:
# Create a working copy so original df_raw is preserved
df_silver = df_all_years


### 2. Assign data type

In [0]:
df_silver = df_silver \
    .withColumn("Quantity", col("Quantity").cast(IntegerType())) \
    .withColumn("Price", col("Price").cast(DoubleType())) \
    .withColumn("InvoiceDate", expr("substring(InvoiceDate, 1, 26)")) \
    .withColumn("InvoiceDate", to_timestamp("InvoiceDate", "yyyy-MM-dd HH:mm:ss.SSSSSS"))


### 3. Remove Duplicates

In [0]:
print("Number of rows before cleaning: ",df_silver.count())
df_silver = df_silver.dropna()
df_silver = df_silver.dropDuplicates()
print("Number of rows after cleaning: ",df_silver.count())


### 4. Data cleaning and formating

In [0]:
# Filter Invalid Quantity and Price
df_silver = remove_invalid_columns(df_silver, "Price")
df_silver = remove_invalid_columns(df_silver, "Quantity")

# Clean Text Fields
df_silver = cap_every_first_letter(df_silver, "Country")
df_silver = capitalize_first_letter(df_silver, "Description")  



### 5. Add Features like Total Price, IsReturn and split date column

In [0]:
# Calculate Total Price and IsReturn
df_silver = calc_total_price(df_silver)
df_silver = isReturn(df_silver)
df_silver = isUKCustomer(df_silver)

In [0]:
# Extract Date Component
df_silver = extract_date(df_silver) 


### 6. Rename Columns

In [0]:
# Rename columns
df_silver = df_silver \
    .withColumnRenamed("Customer ID", "CustomerID")


In [0]:
display(df_silver)

### 7. Saving in parquet format

In [0]:
silver_path = f"abfss://silver@{storage_account}.dfs.core.windows.net/retail/"
df_silver.write.mode("overwrite").parquet(silver_path)