# Transform data from Bronze 'Raw data' to Silver 'Cleansed and conformed data' layer and perform data transformation

![Medallion Architecture](https://fabricddib.blob.core.windows.net/notebookimage/MedallionArchitecture.png)

In [None]:
# Preview up to 1000 rows of the Silver campaign dimension.
# That table is written by the Dimension-Campaign cell of this notebook,
# so it has to exist already when this cell is run.
preview_sql = "SELECT * FROM #LAKEHOUSE_SILVER#.dimension_campaign LIMIT 1000"
df = spark.sql(preview_sql)
display(df)

## Importing the necessary libraries and spark configurations.

In [1]:
# Session-level write settings for the Delta tables produced by this notebook.
# V-Order: the key has to be spelled "spark.sql..." - a misspelled key is
# accepted by spark.conf.set without error and simply has no effect.
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
# Optimize Write, with a bin size of 1073741824 (1024**3; 1 GiB if the
# setting is expressed in bytes - TODO confirm against the Fabric docs).
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
from pyspark.sql.types import *
from pyspark.sql.functions import col, unix_timestamp, to_date,col,year,quarter,month
from pyspark.sql.types import DateType
from pyspark.sql.functions import col, unix_timestamp, to_date,col,year,quarter,month
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import DateType
from pyspark.sql import functions as F

StatementMeta(, , , Waiting, )

## Set input parameters

In [29]:
# Bronze lakehouse locations (ABFSS URIs) that the cells below read from.
# The #...# tokens are kept exactly as written - presumably deployment
# placeholders that get substituted before the notebook runs; TODO confirm.
_bronze_files_root = 'abfss://#SALES_WORKSPACE_NAME#@onelake.dfs.fabric.microsoft.com/#LAKEHOUSE_BRONZE#.Lakehouse/Files'

Path_Dim = _bronze_files_root + '/DimensionData/'
Path_Fact = _bronze_files_root + '/FactData/'
Path_LitwareData = _bronze_files_root + '/sales-transaction-litware'


StatementMeta(, , , Waiting, )

## Dimension - Brand
We created a shortcut to the raw data that we landed earlier in the Bronze lakehouse. We then do the necessary cleanup and transformation on the data and write the dimension table to the Silver lakehouse in the open-standard Delta Parquet format. The table appears under the Tables pane as soon as we execute this cell. We follow a similar approach for the rest of the tables.

In [3]:
# Load the brand dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_brand'

# (column name, Spark type) pairs; every column is declared nullable.
brand_columns = [
    ('BrandId', IntegerType()),
    ('BrandName', StringType()),
    ('EntityCode', StringType()),
]
# Named after the table it describes (it previously carried an unrelated
# campaign-style name).
dimension_brand_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in brand_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_brand_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 5, Finished, Available)

## Dimension-Campaign

In [4]:
# Load the campaign dimension CSV from the Bronze files area, rename its key
# column, and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_campaign'

# (column name, Spark type) pairs; every column is declared nullable.
campaign_columns = [
    ('Campaigns_ID', IntegerType()),
    ('CampaignName', StringType()),
    ('SubCampaignId', StringType()),
]
dimension_campaign_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in campaign_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_campaign_schema)
    .load(source_path)
    # The Silver table exposes the key as CampaignId rather than Campaigns_ID.
    .withColumnRenamed("Campaigns_ID", "CampaignId")
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 6, Finished, Available)

## Dimension-Customer

In [5]:
# Load the customer dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_customer'

# (column name, Spark type) pairs; every column is declared nullable.
# DateOfBirth is kept as a plain string here - no date parsing is done.
customer_columns = [
    ('Id', IntegerType()),
    ('Age', IntegerType()),
    ('Gender', StringType()),
    ('Pincode', StringType()),
    ('FirstName', StringType()),
    ('LastName', StringType()),
    ('FullName', StringType()),
    ('DateOfBirth', StringType()),
    ('Address', StringType()),
    ('Email', StringType()),
    ('Mobile', StringType()),
    ('UserName', StringType()),
]
dimension_customer_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in customer_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_customer_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 7, Finished, Available)

## Dimension-Date

In [6]:
# Load the date dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_date'

# (column name, Spark type) pairs; every column is declared nullable.
# DateValue and WeekEnding are the only columns read as timestamps.
date_columns = [
    ('DateKey', IntegerType()),
    ('DateValue', TimestampType()),
    ('DayOfMonth', IntegerType()),
    ('DayOfYear', IntegerType()),
    ('Year', IntegerType()),
    ('MonthOfYear', IntegerType()),
    ('MonthName', StringType()),
    ('QuarterOfYear', IntegerType()),
    ('QuarterName', StringType()),
    ('WeekEnding', TimestampType()),
]
dimension_date_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in date_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_date_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 8, Finished, Available)

## Dimension-Country

In [7]:
# Load the country dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_country'

# (column name, Spark type) pairs; every column is declared nullable.
country_columns = [
    ('ID', IntegerType()),
    ('Country', StringType()),
    ('Region', StringType()),
]
dimension_country_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in country_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_country_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 9, Finished, Available)

## Dimension-City

In [8]:
# Load the city dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_city'

# (column name, Spark type) pairs; every column is declared nullable.
# Only the key/id/lineage columns are integers; everything else, including
# LatestRecordedPopulation and the ValidFrom/ValidTo columns, stays a string.
city_columns = [
    ('CityKey', IntegerType()),
    ('CityID', IntegerType()),
    ('City', StringType()),
    ('StateProvince', StringType()),
    ('Country', StringType()),
    ('Continent', StringType()),
    ('SalesTerritory', StringType()),
    ('Region', StringType()),
    ('SubRegion', StringType()),
    ('Location', StringType()),
    ('LatestRecordedPopulation', StringType()),
    ('ValidFrom', StringType()),
    ('ValidTo', StringType()),
    ('LineageKey', IntegerType()),
]
dimension_city_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in city_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_city_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 10, Finished, Available)

## Dimension-Employee

In [9]:
# Load the employee dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_employee'

# (column name, Spark type) pairs; every column is declared nullable.
employee_columns = [
    ('EmployeeKey', IntegerType()),
    ('EmployeeID', IntegerType()),
    ('EmployeeName', StringType()),
    ('PreferredName', StringType()),
    ('IsSalesPerson', StringType()),
    ('ValidFrom', StringType()),
    ('ValidTo', StringType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
dimension_employee_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in employee_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_employee_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 11, Finished, Available)

## Dimension-Paymentmethod

In [10]:
# Load the payment-method dimension CSV from the Bronze files area with an
# explicit schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_paymentmethod'

# (column name, Spark type) pairs; every column is declared nullable.
paymentmethod_columns = [
    ('PaymentMethodKey', IntegerType()),
    ('PaymentMethodID', IntegerType()),
    ('PaymentMethod', StringType()),
    ('ValidFrom', StringType()),
    ('ValidTo', StringType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
dimension_paymentmethod_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in paymentmethod_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_paymentmethod_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 12, Finished, Available)

## Dimension-Supplier

In [11]:
# Load the supplier dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_supplier'

# (column name, Spark type) pairs; every column is declared nullable.
# NOTE(review): PostalCode is read as an integer, which cannot keep leading
# zeros or non-numeric codes - confirm that suits the source data.
supplier_columns = [
    ('SupplierKey', IntegerType()),
    ('SupplierID', IntegerType()),
    ('Supplier', StringType()),
    ('Category', StringType()),
    ('PrimaryContact', StringType()),
    ('SupplierReference', StringType()),
    ('PaymentDays', IntegerType()),
    ('PostalCode', IntegerType()),
    ('ValidFrom', StringType()),
    ('ValidTO', StringType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
dimension_supplier_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in supplier_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_supplier_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 13, Finished, Available)

## Dimension-Product

In [12]:
# Load the product dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_product'

# (column name, Spark type) pairs; every column is declared nullable.
# Products_ID is the only integer column; ProductID is read as a string.
product_columns = [
    ('Products_ID', IntegerType()),
    ('ProductID', StringType()),
    ('Name', StringType()),
    ('Department', StringType()),
    ('Category', StringType()),
    ('SubCampaigns', StringType()),
    ('TargetGender', StringType()),
    ('TargetClassification', StringType()),
    ('TargetGeneration', StringType()),
]
dimension_product_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in product_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_product_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 14, Finished, Available)

## Dimension-Date (dim_date)

In [13]:
# Load the dim_date CSV (calendar-year and fiscal-year labels per date) from
# the Bronze files area with an explicit schema and overwrite the matching
# Delta table in the Silver lakehouse.
table_name = 'dim_date'

# (column name, Spark type) pairs; every column is declared nullable.
# The Date column itself is kept as a string - no date parsing is done here.
dim_date_columns = [
    ('Date', StringType()),
    ('DayNumber', IntegerType()),
    ('Day', IntegerType()),
    ('MonthName', StringType()),
    ('ShortMonthName', StringType()),
    ('CYMonthNumber', IntegerType()),
    ('CYMonthLabel', StringType()),
    ('CYYear', IntegerType()),
    ('CYYearLabel', StringType()),
    ('FYMonthNumber', IntegerType()),
    ('FYMonthLabel', StringType()),
    ('FYYear', IntegerType()),
    ('FYYearLabel', StringType()),
    ('WeekNumber', IntegerType()),
]
# Named after this table so it overwrites neither `dimension_city_schema`
# nor the `dimension_date_schema` used for the dimension_date table.
dim_date_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in dim_date_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dim_date_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 15, Finished, Available)

## Dimension-Stockitem

In [14]:
# Load the stock-item dimension CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_stockitem'

# (column name, Spark type) pairs; every column is declared nullable.
# NOTE(review): DecimalType() with no arguments means precision 10, scale 0,
# and TaxRate/UnitPrice are read as integers - confirm the source values carry
# no fractional digits, otherwise they will not survive this load. The types
# are left as they were so the Silver table schema is unchanged.
stockitem_columns = [
    ('StockItemKey', IntegerType()),
    ('StockItemID', IntegerType()),
    ('StockItem', StringType()),
    ('Color', StringType()),
    ('SellingPackage', StringType()),
    ('BuyingPackage', StringType()),
    ('Brand', StringType()),
    ('Size', StringType()),
    ('LeadTimeDay', IntegerType()),
    ('QuantityPerOuter', IntegerType()),
    ('IsChillerStock', StringType()),
    ('Barcode', StringType()),
    ('TaxRate', IntegerType()),
    ('UnitPrice', IntegerType()),
    ('RecommendedRetailPrice', DecimalType()),
    ('WeightPerUnit', DecimalType()),
    ('ValidFrom', StringType()),
    ('ValidTO', StringType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
dimension_stockitem_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in stockitem_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_stockitem_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 16, Finished, Available)

## Dimension-TransactionType

In [15]:
# Load the transaction-type dimension CSV from the Bronze files area with an
# explicit schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'dimension_transactiontype'

# (column name, Spark type) pairs; every column is declared nullable.
transactiontype_columns = [
    ('TransactionTypeKey', IntegerType()),
    ('TransactionTypeID', IntegerType()),
    ('TransactionType', StringType()),
    ('ValidFrom', StringType()),
    ('ValidTO', StringType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
dimension_transactiontype_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in transactiontype_columns]
)

source_path = Path_Dim + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(dimension_transactiontype_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 17, Finished, Available)

## Fact-Movement

In [16]:
# Load the stock-movement fact CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'fact_movement'

# (column name, Spark type) pairs; every column is declared nullable.
# DateKey is read as a string; all other columns are integers.
movement_columns = [
    ('MovementKey', IntegerType()),
    ('DateKey', StringType()),
    ('StockItemKey', IntegerType()),
    ('CustomerKey', IntegerType()),
    ('SupplierKey', IntegerType()),
    ('TransactionTypeKey', IntegerType()),
    ('StockItemTransactionID', IntegerType()),
    ('InvoiceID', IntegerType()),
    ('PurchaseOrderID', IntegerType()),
    ('Quantity', IntegerType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
fact_movement_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in movement_columns]
)

source_path = Path_Fact + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(fact_movement_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 18, Finished, Available)

## Fact-Order

In [17]:
# Load the order fact CSV from the Bronze files area with an explicit schema
# and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'fact_order'

# (column name, Spark type) pairs; every column is declared nullable.
# NOTE(review): DecimalType() with no arguments means precision 10, scale 0,
# and UnitPrice/TaxRate/TotalExcludingTax are read as integers - confirm the
# source values carry no fractional digits, otherwise they will not survive
# this load. The types are left as they were so the Silver table schema is
# unchanged.
order_columns = [
    ('OrderKey', IntegerType()),
    ('CityKey', IntegerType()),
    ('CustomerKey', IntegerType()),
    ('StockItemKey', IntegerType()),
    ('OrderDateKey', StringType()),
    ('PickedDatekey', StringType()),
    ('SalesPersonKey', IntegerType()),
    ('PickerKey', IntegerType()),
    ('OrderID', IntegerType()),
    ('BackOrderID', IntegerType()),
    ('Description', StringType()),
    ('Package', StringType()),
    ('Quantity', IntegerType()),
    ('UnitPrice', IntegerType()),
    ('TaxRate', IntegerType()),
    ('TotalExcludingTax', IntegerType()),
    ('TaxAmount', DecimalType()),
    ('TotalIncludingTax', DecimalType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
fact_order_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in order_columns]
)

source_path = Path_Fact + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(fact_order_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 19, Finished, Available)

## Fact-Purchase

In [18]:
# Load the purchase fact CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'fact_purchase'

# (column name, Spark type) pairs; every column is declared nullable.
purchase_columns = [
    ('PurchaseKey', IntegerType()),
    ('DateKey', StringType()),
    ('SupplierKey', IntegerType()),
    ('StockItemKey', IntegerType()),
    ('PurchaseOrderID', IntegerType()),
    ('OrderedOuters', IntegerType()),
    ('OrderedQuantity', IntegerType()),
    ('ReceivedOuters', IntegerType()),
    ('Package', StringType()),
    ('IsOrderFinalized', StringType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
fact_purchase_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in purchase_columns]
)

source_path = Path_Fact + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(fact_purchase_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 20, Finished, Available)

## Fact-Transaction

In [19]:
# Load the transaction fact CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'fact_transaction'

# (column name, Spark type) pairs; every column is declared nullable.
# NOTE(review): DecimalType() with no arguments means precision 10, scale 0,
# and the other amount columns (TotalExcludingTax, TaxAmount,
# OutstandingBalance) are read as integers - confirm the source values carry
# no fractional digits, otherwise they will not survive this load. The types
# are left as they were so the Silver table schema is unchanged.
transaction_columns = [
    ('TransactionKey', IntegerType()),
    ('DateKey', StringType()),
    ('CustomerKey', IntegerType()),
    ('BillToCustomerKey', IntegerType()),
    ('SupplierKey', IntegerType()),
    ('TransactionTypekey', IntegerType()),
    ('PaymentMethodKey', IntegerType()),
    ('CustomerTransactionID', IntegerType()),
    ('SupplierTransactionID', IntegerType()),
    ('InvoiceID', IntegerType()),
    ('PurchaseOrderID', IntegerType()),
    ('SupplierInvoiceNumber', IntegerType()),
    ('TotalExcludingTax', IntegerType()),
    ('TaxAmount', IntegerType()),
    ('TotalIncludingTax', DecimalType()),
    ('OutstandingBalance', IntegerType()),
    ('IsFinalized', StringType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
fact_transaction_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in transaction_columns]
)

source_path = Path_Fact + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(fact_transaction_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 21, Finished, Available)

## Fact-Stockholding

In [20]:
# Load the stock-holding fact CSV from the Bronze files area with an explicit
# schema and overwrite the matching Delta table in the Silver lakehouse.
table_name = 'fact_stockholding'

# (column name, Spark type) pairs; every column is declared nullable.
# NOTE(review): LastCostPrice is read as an integer - confirm the source
# values carry no fractional digits.
stockholding_columns = [
    ('StockHoldingKey', IntegerType()),
    ('StockItemKey', IntegerType()),
    ('QuantityOnHand', IntegerType()),
    ('BinLocation', StringType()),
    ('LastStocktakeQuantity', IntegerType()),
    ('LastCostPrice', IntegerType()),
    ('ReorderLevel', IntegerType()),
    ('TargetStockLevel', IntegerType()),
    ('LineageKey', IntegerType()),
]
# Named after this table so it does not overwrite `dimension_city_schema`,
# which the Dimension-City cell defines for the city table.
fact_stockholding_schema = StructType(
    [StructField(name, dtype, True) for name, dtype in stockholding_columns]
)

source_path = Path_Fact + table_name
target_table = "#LAKEHOUSE_SILVER#." + table_name

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(fact_stockholding_schema)
    .load(source_path)
)
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 22, Finished, Available)

## Fact-Campaigndata

In [24]:
# Publish the campaign fact data: read the Bronze CSV without an explicit
# schema, keep the listed columns, cast the typed ones, and overwrite the
# matching Delta table in the Silver lakehouse.
table_name = 'fact_campaigndata'

# Columns kept from the raw file, in the order they are selected.
selected_columns = [
    "Region", "Country", "ProductCategory", "Campaign_ID", "Campaign_Name",
    "Qualification", "Qualification_Number", "Response_Status", "Responses",
    "Cost", "Revenue", "ROI", "Lead_Generation", "Revenue_Target",
    "Customer_Segment", "Profit", "Marketing_Cost", "CampaignID",
]

# (column, target type) casts, applied one at a time in this order. Columns
# not listed here keep whatever type the CSV reader gave them.
column_casts = [
    ("ProductCategory", "string"),
    ("CampaignID", "integer"),
    ("Campaign_ID", "integer"),
    ("ROI", "integer"),
    ("Revenue_Target", "Double"),
    ("Cost", "Double"),
    ("Responses", "Double"),
    ("Revenue", "Double"),
    ("Profit", "Double"),
    ("Marketing_Cost", "Double"),
]

df = spark.read.format("csv").option("header", "true").load(Path_Fact + table_name)
df = df.select(*[col(name) for name in selected_columns])
for column_name, target_type in column_casts:
    df = df.withColumn(column_name, col(column_name).cast(target_type))

target_table = "#LAKEHOUSE_SILVER#." + table_name
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, , , Waiting, )

## Fact-StoreSalesData

In [None]:

from pyspark.sql.functions import col

# Publish the store sales fact data: read the Bronze CSV without an explicit
# schema, cast three columns to integers, and overwrite the matching Delta
# table in the Silver lakehouse.
table_name = 'fact_store_sales_data'
# Re-declared here with the same value as in the input-parameters cell.
Path_Fact = 'abfss://#SALES_WORKSPACE_NAME#@onelake.dfs.fabric.microsoft.com/#LAKEHOUSE_BRONZE#.Lakehouse/Files/FactData/'

source_path = Path_Fact + table_name
df = spark.read.format("csv").option("header", "true").load(source_path)

# Cast store, item and sales to integers, in that order; any other columns
# in the file are left untouched.
for integer_column in ("store", "item", "sales"):
    df = df.withColumn(integer_column, col(integer_column).cast("integer"))

target_table = "#LAKEHOUSE_SILVER#." + table_name
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

## Fact-Sales

In [None]:
from pyspark.sql.functions import col, to_date, unix_timestamp

# Publish the sales fact data: read every matching .txt file under the Litware
# transaction folder as headered CSV, keep the listed columns, cast them, and
# overwrite the matching Delta table in the Silver lakehouse.
table_name = 'fact_sales'
# Re-declared here with the same value as in the input-parameters cell.
Path_LitwareData = 'abfss://#SALES_WORKSPACE_NAME#@onelake.dfs.fabric.microsoft.com/#LAKEHOUSE_BRONZE#.Lakehouse/Files/sales-transaction-litware'

# The glob matches .txt files exactly two folder levels below the root.
source_glob = Path_LitwareData + '/*/*/*.txt'
df = spark.read.format("csv").option("header", "true").load(source_glob)

# Columns kept from the raw files, in the order they are selected.
selected_columns = [
    "CustomerId", "ProductId", "Quantity", "Price",
    "TotalAmount", "TransactionDate", "ProfitAmount", "StoreId",
]
df = df.select(*[col(name) for name in selected_columns])

# (column, target type) casts, applied one at a time in this order.
# NOTE(review): Price is cast to integer - confirm prices are whole numbers.
column_casts = [
    ("ProductId", "integer"),
    ("CustomerId", "integer"),
    ("Quantity", "integer"),
    ("Price", "integer"),
    ("TotalAmount", "double"),
    ("ProfitAmount", "double"),
    ("StoreId", "integer"),
]
for column_name, target_type in column_casts:
    df = df.withColumn(column_name, col(column_name).cast(target_type))

# TransactionDate arrives as text parsed with the 'MM-dd-yyyy' pattern:
# text -> epoch seconds -> timestamp -> date.
parsed_seconds = unix_timestamp(col('TransactionDate'), 'MM-dd-yyyy')
df = df.withColumn("TransactionDate", to_date(parsed_seconds.cast("timestamp")))

target_table = "#LAKEHOUSE_SILVER#." + table_name
df.write.format("delta").mode("overwrite").saveAsTable(target_table)

StatementMeta(, 4259ac00-d937-4ed0-b39b-3a46ab7c1f3b, 37, Finished, Available)