# Single table column transformation


In [0]:
dbutils.fs.ls('mnt/bronze/SalesLT/')

[FileInfo(path='dbfs:/mnt/bronze/SalesLT/Customer/', name='Customer/', size=0, modificationTime=1746543591000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/Product/', name='Product/', size=0, modificationTime=1746543591000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/SalesOrderDetail/', name='SalesOrderDetail/', size=0, modificationTime=1746543622000),
 FileInfo(path='dbfs:/mnt/bronze/SalesLT/SalesOrderHeader/', name='SalesOrderHeader/', size=0, modificationTime=1746722825000)]

In [0]:
dbutils.fs.ls('mnt/silver/')

[FileInfo(path='dbfs:/mnt/silver/SalesLT/', name='SalesLT/', size=0, modificationTime=1746612695000)]

In [0]:
df = spark.read.parquet("/mnt/bronze/SalesLT/Customer/")


In [0]:
df = display(df)

CustomerID,NameStyle,Title,FirstName,MiddleName,LastName,Suffix,CompanyName,SalesPerson,EmailAddress,Phone,PasswordHash,PasswordSalt,rowguid,ModifiedDate
1,False,Mr.,Orlando,N.,Gee,,A Bike Store,adventure-works\pamela0,orlando0@adventure-works.com,245-555-0173,L/Rlwxzp4w7RWmEgXX+/A7cXaePEPcp+KwQhl2fJL7w=,1KjXYs4=,3f5ae95e-b87d-4aed-95b4-c3797afcb74f,2005-08-01T00:00:00Z
2,False,Mr.,Keith,,Harris,,Progressive Sports,adventure-works\david8,keith0@adventure-works.com,170-555-0127,YPdtRdvqeAhj6wyxEsFdshBDNXxkCXn+CRgbvJItknw=,fs1ZGhY=,e552f657-a9af-4a7d-a645-c429d6e02491,2006-08-01T00:00:00Z
3,False,Ms.,Donna,F.,Carreras,,Advanced Bike Components,adventure-works\jillian0,donna0@adventure-works.com,279-555-0130,LNoK27abGQo48gGue3EBV/UrlYSToV0/s87dCRV7uJk=,YTNH5Rw=,130774b1-db21-4ef3-98c8-c104bcd6ed6d,2005-09-01T00:00:00Z
4,False,Ms.,Janet,M.,Gates,,Modular Cycle Systems,adventure-works\jillian0,janet1@adventure-works.com,710-555-0173,ElzTpSNbUW1Ut+L5cWlfR7MF6nBZia8WpmGaQPjLOJA=,nm7D5e4=,ff862851-1daa-4044-be7c-3e85583c054d,2006-07-01T00:00:00Z
5,False,Mr.,Lucy,,Harrington,,Metropolitan Sports Supply,adventure-works\shu0,lucy0@adventure-works.com,828-555-0186,KJqV15wsX3PG8TS5GSddp6LFFVdd3CoRftZM/tP0+R4=,cNFKU4w=,83905bdc-6f5e-4f71-b162-c98da069f38a,2006-09-01T00:00:00Z
6,False,Ms.,Rosmarie,J.,Carroll,,Aerobic Exercise Company,adventure-works\linda3,rosmarie0@adventure-works.com,244-555-0112,OKT0scizCdIzymHHOtyJKQiC/fCILSooSZ8dQ2Y34VM=,ihWf50M=,1a92df88-bfa2-467d-bd54-fcb9e647fdd7,2007-09-01T00:00:00Z
7,False,Mr.,Dominic,P.,Gash,,Associated Bikes,adventure-works\shu0,dominic0@adventure-works.com,192-555-0173,ZccoP/jZGQm+Xpzc7RKwDhS11YFNybwcPVRYTSNcnSg=,sPoUBSQ=,03e9273e-b193-448e-9823-fe0c44aeed78,2006-07-01T00:00:00Z
10,False,Ms.,Kathleen,M.,Garza,,Rural Cycle Emporium,adventure-works\josé1,kathleen0@adventure-works.com,150-555-0127,Qa3aMCxNbVLGrc0b99KsbQqiVgwYDfHcsK9GZSUxcTM=,Ls05W3g=,cdb6698d-2ff1-4fba-8f22-60ad1d11dabd,2006-09-01T00:00:00Z
11,False,Ms.,Katherine,,Harding,,Sharp Bikes,adventure-works\josé1,katherine0@adventure-works.com,926-555-0159,uRlorVzDGNJIX9I+ehTlRK+liT4UKRgWhApJgUMC2d4=,jpHKbqE=,750f3495-59c4-48a0-80e1-e37ec60e77d9,2005-08-01T00:00:00Z
12,False,Mr.,Johnny,A.,Caprio,Jr.,Bikes and Motorbikes,adventure-works\garrett1,johnny0@adventure-works.com,112-555-0191,jtF9jBoFYeJTaET7x+eJDkd7BzMz15Wo9odbGPBaIak=,wVLnvHo=,947bcaf1-1f32-44f3-b9c3-0011f95fbe54,2006-08-01T00:00:00Z


In [0]:
from pyspark.sql.functions import from_utc_timestamp, date_format, concat_ws, regexp_replace, col
from pyspark.sql.types import TimestampType
df = spark.read.parquet("/mnt/bronze/SalesLT/Customer/")
# Columns to be dropped
columns_to_drop = [
    "Suffix", 
    "PasswordSalt", 
    "RowGuid", 
    "PasswordHash", 
    "NameStyle"
]

# Convert Modified column to 'yyyy-MM-dd' format
df = df.withColumn("ModifiedDate", 
                   date_format(from_utc_timestamp(col('ModifiedDate').cast(TimestampType()), "UTC"), "yyyy-MM-dd"))

# Combining FirstName, MiddleName, and LastName to a single column
df = df.withColumn("FullName", concat_ws(" ", col("FirstName"), col("MiddleName"), col("LastName")))

# dropping unnecessary columns
df = df.drop(*columns_to_drop, "FirstName", "MiddleName", "LastName")

# Keeping sales person name only
df = df.withColumn("SalesPerson", regexp_replace(col("SalesPerson"), r"(?i).*[\\-]([a-zA-Z]+)[0-9]*", r"$1"))

# Remove duplicate rows based on 'CustomerID'
df = df.dropDuplicates(["CustomerID"])

# Reorder columns: Title, FullName, CustomerID, and then the rest
desired_order = ["Title", "FullName", "CustomerID"]
remaining_cols = [col_name for col_name in df.columns if col_name not in desired_order]
df = df.select(*desired_order, *remaining_cols)

# Order the DataFrame by 'FullName' in ascending order
df = df.orderBy("FullName")

# Save the cleaned DataFrame in parquet format
df.write.format('delta').mode('overwrite').option("mergeSchema", "true").save('/mnt/silver/SalesLT/Customer/')



In [0]:
df = display(df)

Title,FullName,CustomerID,CompanyName,SalesPerson,EmailAddress,Phone,ModifiedDate
,A. Francesca Leonetti,202,Two-Seater Bikes,jillian,a0@adventure-works.com,645-555-0193,2007-09-01
,A. Francesca Leonetti,29943,Two-Seater Bikes,jillian,a0@adventure-works.com,645-555-0193,2007-09-01
Ms.,Abigail J. Gonzalez,345,Genial Bike Associates,jillian,abigail0@adventure-works.com,121-555-0139,2006-08-01
Ms.,Abigail J. Gonzalez,29792,Genial Bike Associates,jillian,abigail0@adventure-works.com,121-555-0139,2006-08-01
Mr.,Abraham L. Swearengin,511,Wheel Gallery,shu,abraham0@adventure-works.com,926-555-0136,2005-07-01
Mr.,Abraham L. Swearengin,30052,Wheel Gallery,shu,abraham0@adventure-works.com,926-555-0136,2005-07-01
Ms.,Aidan Delaney,75,Paint Supply,jillian,aidan0@adventure-works.com,358-555-0188,2005-09-01
Ms.,Aidan Delaney,29702,Paint Supply,jillian,aidan0@adventure-works.com,358-555-0188,2005-09-01
Mr.,Ajay Manchepalli,29978,Shipping Specialists,jae,ajay0@adventure-works.com,1 (11) 500 555-0174,2008-02-01
Mr.,Ajay Manchepalli,659,Shipping Specialists,jae,ajay0@adventure-works.com,1 (11) 500 555-0174,2008-02-01


In [0]:
df = spark.read.parquet("/mnt/bronze/SalesLT/Product/")

In [0]:
df= display(df)

ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ThumbNailPhoto,ThumbnailPhotoFileName,rowguid,ModifiedDate
680,"HL Road Frame - Black, 58",FR-R92B-58,Black,1059.31,1431.5,58,1016.04,18,6,2002-06-01T00:00:00Z,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,43dd68d6-14a4-461f-9069-55309d90ea7e,2008-03-11T10:01:36.827Z
706,"HL Road Frame - Red, 58",FR-R92R-58,Red,1059.31,1431.5,58,1016.04,18,6,2002-06-01T00:00:00Z,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,9540ff17-2712-4c90-a3d1-8ce5568b2462,2008-03-11T10:01:36.827Z
707,"Sport-100 Helmet, Red",HL-U509-R,Red,13.0863,34.99,,,35,33,2005-07-01T00:00:00Z,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,2e1ef41a-c08a-4ff6-8ada-bde58b64a712,2008-03-11T10:01:36.827Z
708,"Sport-100 Helmet, Black",HL-U509,Black,13.0863,34.99,,,35,33,2005-07-01T00:00:00Z,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,a25a44fb-c2de-4268-958f-110b8d7621e2,2008-03-11T10:01:36.827Z
709,"Mountain Bike Socks, M",SO-B909-M,White,3.3963,9.5,M,,27,18,2005-07-01T00:00:00Z,2006-06-30T00:00:00Z,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,18f95f47-1540-4e02-8f1f-cc1bcb6828d0,2008-03-11T10:01:36.827Z
710,"Mountain Bike Socks, L",SO-B909-L,White,3.3963,9.5,L,,27,18,2005-07-01T00:00:00Z,2006-06-30T00:00:00Z,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,161c035e-21b3-4e14-8e44-af508f35d80a,2008-03-11T10:01:36.827Z
711,"Sport-100 Helmet, Blue",HL-U509-B,Blue,13.0863,34.99,,,35,33,2005-07-01T00:00:00Z,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,fd7c0858-4179-48c2-865b-abd5dfc7bc1d,2008-03-11T10:01:36.827Z
712,AWC Logo Cap,CA-1098,Multi,6.9223,8.99,,,23,2,2005-07-01T00:00:00Z,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,b9ede243-a6f4-4629-b1d4-ffe1aedc6de7,2008-03-11T10:01:36.827Z
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,Multi,38.4923,49.99,S,,25,11,2005-07-01T00:00:00Z,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,fd449c82-a259-4fae-8584-6ca0255faf68,2008-03-11T10:01:36.827Z
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,Multi,38.4923,49.99,M,,25,11,2005-07-01T00:00:00Z,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,6a290063-a0cf-432a-8110-2ea0fda14308,2008-03-11T10:01:36.827Z


In [0]:
from pyspark.sql.functions import from_utc_timestamp, date_format, col
from pyspark.sql.types import DecimalType, TimestampType

# Load data from the Bronze layer
df = spark.read.parquet("/mnt/bronze/SalesLT/Product/")

# Format 'ModifiedDate' as 'yyyy-MM-dd'
df = df.withColumn(
    "ModifiedDate",
    date_format(from_utc_timestamp(col("ModifiedDate").cast(TimestampType()), "UTC"), "yyyy-MM-dd")
)

# Identify and format all timestamp columns
timestamp_cols = [c for c, t in df.dtypes if t == "timestamp"]
for c in timestamp_cols:
    df = df.withColumn(
        c,
        date_format(from_utc_timestamp(col(c), "UTC"), "yyyy-MM-dd")
    )

# Drop unnecessary columns if they exist
columns_to_drop = ["DiscontinuedDate", "ThumbNailPhoto", "ThumbnailPhotoFileName", "rowguid"]
df = df.drop(*[c for c in columns_to_drop if c in df.columns])

# Rename 'Name' to 'ProductName'
df = df.withColumnRenamed("Name", "ProductName")

# Reorder columns: ProductName, ProductID, ProductCategoryID, ProductModelID, then the rest
reorder = ["ProductName", "ProductID", "ProductCategoryID", "ProductModelID"]
remaining_cols = [c for c in df.columns if c not in reorder]
df = df.select(*reorder, *remaining_cols)

# Cast 'StandardCost' and 'ListPrice' to DecimalType(19, 2)
df = df.withColumn("StandardCost", col("StandardCost").cast(DecimalType(19, 2))) \
       .withColumn("ListPrice", col("ListPrice").cast(DecimalType(19, 2)))

df.write.format("delta") \
  .mode("overwrite") \
  .option("mergeSchema", "true")\
  .save("/mnt/silver/SalesLT/Product/")



In [0]:
df =display(df)

ProductName,ProductID,ProductCategoryID,ProductModelID,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,SellStartDate,SellEndDate,ModifiedDate
"HL Road Frame - Black, 58",680,18,6,FR-R92B-58,Black,1059.31,1431.5,58,1016.04,2002-06-01,,2008-03-11
"HL Road Frame - Red, 58",706,18,6,FR-R92R-58,Red,1059.31,1431.5,58,1016.04,2002-06-01,,2008-03-11
"Sport-100 Helmet, Red",707,35,33,HL-U509-R,Red,13.09,34.99,,,2005-07-01,,2008-03-11
"Sport-100 Helmet, Black",708,35,33,HL-U509,Black,13.09,34.99,,,2005-07-01,,2008-03-11
"Mountain Bike Socks, M",709,27,18,SO-B909-M,White,3.4,9.5,M,,2005-07-01,2006-06-30,2008-03-11
"Mountain Bike Socks, L",710,27,18,SO-B909-L,White,3.4,9.5,L,,2005-07-01,2006-06-30,2008-03-11
"Sport-100 Helmet, Blue",711,35,33,HL-U509-B,Blue,13.09,34.99,,,2005-07-01,,2008-03-11
AWC Logo Cap,712,23,2,CA-1098,Multi,6.92,8.99,,,2005-07-01,,2008-03-11
"Long-Sleeve Logo Jersey, S",713,25,11,LJ-0192-S,Multi,38.49,49.99,S,,2005-07-01,,2008-03-11
"Long-Sleeve Logo Jersey, M",714,25,11,LJ-0192-M,Multi,38.49,49.99,M,,2005-07-01,,2008-03-11


In [0]:
df = spark.read.parquet("/mnt/bronze/SalesLT/SalesOrderDetail/")

In [0]:
df = display(df)

SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
71774,110562,1,836,356.898,0.0,356.898,e3a1994c-7a68-4ce8-96a3-77fdd3bbd730,2008-06-01T00:00:00Z
71774,110563,1,822,356.898,0.0,356.898,5c77f557-fdb6-43ba-90b9-9a7aec55ca32,2008-06-01T00:00:00Z
71776,110567,1,907,63.9,0.0,63.9,6dbfe398-d15d-425e-aa58-88178fe360e5,2008-06-01T00:00:00Z
71780,110616,4,905,218.454,0.0,873.816,377246c9-4483-48ed-a5b9-e56f005364e0,2008-06-01T00:00:00Z
71780,110617,2,983,461.694,0.0,923.388,43a54bcd-536d-4a1b-8e69-24d083507a14,2008-06-01T00:00:00Z
71780,110618,6,988,112.998,0.4,406.7928,12706fab-f3a2-48c6-b7c7-1ccde4081f18,2008-06-01T00:00:00Z
71780,110619,2,748,818.7,0.0,1637.4,b12f0d3b-5b4e-4f1f-b2f0-f7cde99dd826,2008-06-01T00:00:00Z
71780,110620,1,990,323.994,0.0,323.994,f117a449-039d-44b8-a4b2-b12001dacc01,2008-06-01T00:00:00Z
71780,110621,1,926,149.874,0.0,149.874,92e5052b-72d0-4c91-9a8c-42591803667e,2008-06-01T00:00:00Z
71780,110622,1,743,809.76,0.0,809.76,8bd33bed-c4f6-4d44-84fb-a7d04afcd794,2008-06-01T00:00:00Z


In [0]:
from pyspark.sql.functions import from_utc_timestamp, date_format, col, format_number

# Read the original DataFrame
df = spark.read.format('parquet').load('/mnt/bronze/SalesLT/SalesOrderDetail/')

# List of columns we want to modify directly by name
decimal_cols = ['UnitPrice', 'UnitPriceDiscount', 'LineTotal']
timestamp_cols = ['ModifiedDate']

# Drop unnecessary columns if they exist
columns_to_drop = [ "rowguid"]
df = df.drop(*[c for c in columns_to_drop if c in df.columns])
# Round decimal columns to 2 decimal places
for col_name in decimal_cols:
    df = df.withColumn(col_name, format_number(col(col_name), 2))

# Format 'ModifiedDate' timestamp to 'yyyy-MM-dd'
for col_name in timestamp_cols:
    df = df.withColumn(
        col_name,
        date_format(from_utc_timestamp(col(col_name), "UTC"), "yyyy-MM-dd")
    )

# Save the DataFrame back to Delta
df.write.format('delta').mode('overwrite').save('/mnt/silver/SalesLT/SalesOrderDetail/')


In [0]:
df= display(df)

SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDate
71774,110562,1,836,356.9,0.0,356.9,2008-06-01
71774,110563,1,822,356.9,0.0,356.9,2008-06-01
71776,110567,1,907,63.9,0.0,63.9,2008-06-01
71780,110616,4,905,218.45,0.0,873.82,2008-06-01
71780,110617,2,983,461.69,0.0,923.39,2008-06-01
71780,110618,6,988,113.0,0.4,406.79,2008-06-01
71780,110619,2,748,818.7,0.0,1637.4,2008-06-01
71780,110620,1,990,323.99,0.0,323.99,2008-06-01
71780,110621,1,926,149.87,0.0,149.87,2008-06-01
71780,110622,1,743,809.76,0.0,809.76,2008-06-01


In [0]:
df = spark.read.parquet("/mnt/bronze/SalesLT/SalesOrderHeader/")

In [0]:
df =display(df)


SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ShipToAddressID,BillToAddressID,ShipMethod,CreditCardApprovalCode,SubTotal,TaxAmt,Freight,TotalDue,Comment,rowguid,ModifiedDate
71774,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71774,PO348186287,10-4020-000609,29847,1092,1092,CARGO TRANSPORT 5,,880.3484,70.4279,22.0087,972.785,,89e42cdc-8506-48a2-b89b-eb3e64e3554e,2008-06-08T00:00:00Z
71776,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71776,PO19952192051,10-4020-000106,30072,640,640,CARGO TRANSPORT 5,,78.81,6.3048,1.9703,87.0851,,8a3448c5-e677-4158-a29b-dd33069be0b0,2008-06-08T00:00:00Z
71780,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71780,PO19604173239,10-4020-000340,30113,653,653,CARGO TRANSPORT 5,,38418.6895,3073.4952,960.4672,42452.6519,,a47665d2-7ac9-4cf3-8a8b-2a3883554284,2008-06-08T00:00:00Z
71782,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71782,PO19372114749,10-4020-000582,29485,1086,1086,CARGO TRANSPORT 5,,39785.3304,3182.8264,994.6333,43962.7901,,f1be45a5-5c57-4a50-93c6-5f8be44cb7cb,2008-06-08T00:00:00Z
71783,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71783,PO19343113609,10-4020-000024,29957,992,992,CARGO TRANSPORT 5,,83858.4261,6708.6741,2096.4607,92663.5609,,7db2329e-6446-42a8-8915-9c8370b68ed8,2008-06-08T00:00:00Z
71784,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71784,PO19285135919,10-4020-000448,29736,659,659,CARGO TRANSPORT 5,,108561.8317,8684.9465,2714.0458,119960.824,,ca31f324-2c32-4f8d-95eb-596e7f343027,2008-06-08T00:00:00Z
71796,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71796,PO17052159664,10-4020-000420,29660,1058,1058,CARGO TRANSPORT 5,,57634.6342,4610.7707,1440.8659,63686.2708,,917ef5ba-f32d-4563-8588-66db0bcdc846,2008-06-08T00:00:00Z
71797,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71797,PO16501134889,10-4020-000142,29796,642,642,CARGO TRANSPORT 5,,78029.6898,6242.3752,1950.7422,86222.8072,,bb3fee84-c8bf-4dd2-bcca-675ab6a11c38,2008-06-08T00:00:00Z
71815,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71815,PO13021155785,10-4020-000276,30089,1034,1034,CARGO TRANSPORT 5,,1141.5782,91.3263,28.5395,1261.444,,2aa5f39b-1096-4a4b-b17b-f10504a397ce,2008-06-08T00:00:00Z
71816,2,2008-06-01T00:00:00Z,2008-06-13T00:00:00Z,2008-06-08T00:00:00Z,5,False,SO71816,PO12992180445,10-4020-000295,30027,1038,1038,CARGO TRANSPORT 5,,3398.1659,271.8533,84.9541,3754.9733,,e3c189e7-98de-4c40-b6c2-0d1d13f9bb33,2008-06-08T00:00:00Z


In [0]:
from pyspark.sql.functions import from_utc_timestamp, date_format, col
from pyspark.sql.types import DecimalType, TimestampType

# Load data from the Bronze layer
df = spark.read.parquet("/mnt/bronze/SalesLT/SalesOrderHeader/")

# Format all timestamp columns as 'yyyy-MM-dd'
timestamp_cols = [c for c, t in df.dtypes if t == "timestamp"]
for c in timestamp_cols:
    df = df.withColumn(
        c,
        date_format(from_utc_timestamp(col(c), "UTC"), "yyyy-MM-dd")
    )

# Drop unnecessary columns (case-sensitive check)
columns_to_drop = ["Comment", "CreditCardApprovalCode", "rowguid"]
df = df.drop(*[c for c in columns_to_drop if c in df.columns])

# Cast all decimal (float/double) columns to Decimal(19, 2)
decimal_cols = [c for c, t in df.dtypes if t in ("double", "float")]
for c in decimal_cols:
    df = df.withColumn(c, col(c).cast(DecimalType(19, 2)))

# Define desired column order
priority_cols = [
    "CustomerID",            # integer
    "AccountNumber",         # string
    "SalesOrderID",          # integer
    "SalesOrderNumber",      # string
    "PurchaseOrderNumber",   # string
    "OrderDate",             # timestamp
    "ShipDate",              # timestamp
    "Status"                 # integer
]

# Append remaining columns that aren't already reordered
remaining_cols = [c for c in df.columns if c not in priority_cols]
df = df.select(*priority_cols, *remaining_cols)

# Save to the Silver layer in Delta format with schema overwrite
df.write.format("delta") \
    .mode("overwrite") \
    .save("/mnt/silver/SalesLT/SalesOrderHeader/")


In [0]:
df =display(df)

CustomerID,AccountNumber,SalesOrderID,SalesOrderNumber,PurchaseOrderNumber,OrderDate,ShipDate,Status,RevisionNumber,DueDate,OnlineOrderFlag,ShipToAddressID,BillToAddressID,ShipMethod,SubTotal,TaxAmt,Freight,TotalDue,ModifiedDate
29847,10-4020-000609,71774,SO71774,PO348186287,2008-06-01,2008-06-08,5,2,2008-06-13,False,1092,1092,CARGO TRANSPORT 5,880.3484,70.4279,22.0087,972.785,2008-06-08
30072,10-4020-000106,71776,SO71776,PO19952192051,2008-06-01,2008-06-08,5,2,2008-06-13,False,640,640,CARGO TRANSPORT 5,78.81,6.3048,1.9703,87.0851,2008-06-08
30113,10-4020-000340,71780,SO71780,PO19604173239,2008-06-01,2008-06-08,5,2,2008-06-13,False,653,653,CARGO TRANSPORT 5,38418.6895,3073.4952,960.4672,42452.6519,2008-06-08
29485,10-4020-000582,71782,SO71782,PO19372114749,2008-06-01,2008-06-08,5,2,2008-06-13,False,1086,1086,CARGO TRANSPORT 5,39785.3304,3182.8264,994.6333,43962.7901,2008-06-08
29957,10-4020-000024,71783,SO71783,PO19343113609,2008-06-01,2008-06-08,5,2,2008-06-13,False,992,992,CARGO TRANSPORT 5,83858.4261,6708.6741,2096.4607,92663.5609,2008-06-08
29736,10-4020-000448,71784,SO71784,PO19285135919,2008-06-01,2008-06-08,5,2,2008-06-13,False,659,659,CARGO TRANSPORT 5,108561.8317,8684.9465,2714.0458,119960.824,2008-06-08
29660,10-4020-000420,71796,SO71796,PO17052159664,2008-06-01,2008-06-08,5,2,2008-06-13,False,1058,1058,CARGO TRANSPORT 5,57634.6342,4610.7707,1440.8659,63686.2708,2008-06-08
29796,10-4020-000142,71797,SO71797,PO16501134889,2008-06-01,2008-06-08,5,2,2008-06-13,False,642,642,CARGO TRANSPORT 5,78029.6898,6242.3752,1950.7422,86222.8072,2008-06-08
30089,10-4020-000276,71815,SO71815,PO13021155785,2008-06-01,2008-06-08,5,2,2008-06-13,False,1034,1034,CARGO TRANSPORT 5,1141.5782,91.3263,28.5395,1261.444,2008-06-08
30027,10-4020-000295,71816,SO71816,PO12992180445,2008-06-01,2008-06-08,5,2,2008-06-13,False,1038,1038,CARGO TRANSPORT 5,3398.1659,271.8533,84.9541,3754.9733,2008-06-08
