# Transform data to silver layer 

# Creating the schema and loading the tables from the bronze folder

In [15]:
from pyspark.sql.types import * 

# Explicit schema applied to the bronze order files. Every field keeps the
# default nullable=True.
# NOTE(review): IsFlagged, CreatedTS and ModifiedTS are declared here but are
# overwritten by the later withColumn cell — confirm whether the CSV files
# actually contain these three columns.
orderschema = (
    StructType()
    .add("SalesOrderNumber", StringType())
    .add("SalesOrderLineNumber", IntegerType())
    .add("OrderDate", DateType())
    .add("CustomerName", StringType())
    .add("Email", StringType())
    .add("Item", StringType())
    .add("Quantity", IntegerType())
    .add("UnitPrice", FloatType())
    .add("Tax", FloatType())
    .add("IsFlagged", BooleanType())
    .add("CreatedTS", DateType())
    .add("ModifiedTS", DateType())
)

# The header option is disabled, so the first line of every file is read as data.
# NOTE(review): the folder is spelled "broze" — confirm it matches the lakehouse folder name.
bronze_reader = spark.read.schema(orderschema).option("header", "false").format("csv")
df = bronze_reader.load("Files/broze/*.csv")

StatementMeta(, 363b84ef-aa38-4f16-9586-55e1667c74b1, 17, Finished, Available, Finished)

In [16]:
# Preview the first 20 rows of the loaded DataFrame.
order_preview = df.limit(20)
display(order_preview)

StatementMeta(, 363b84ef-aa38-4f16-9586-55e1667c74b1, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4a6d6c9a-68dd-4812-bc76-d2c2b03deb75)

# Adding additional columns with the withColumn function for data validation and auditing

In [17]:
from pyspark.sql.functions import *
# Add the source file name, the IsFlagged marker and the created/modified
# timestamps, then replace missing customer names with "Unknown".
is_before_cutoff = col("OrderDate") < '2019-08-01'
has_no_customer = col("CustomerName").isNull() | (col("CustomerName") == "")

df = (
    df.withColumn("FileName", input_file_name())
    # True only for orders dated before 2019-08-01; everything else
    # (including a null OrderDate) falls through to False.
    .withColumn("IsFlagged", when(is_before_cutoff, True).otherwise(False))
    .withColumn("CreatedTS", current_timestamp())
    .withColumn("ModifiedTS", current_timestamp())
    # A null or empty customer name becomes "Unknown"; other values are kept.
    .withColumn(
        "CustomerName",
        when(has_no_customer, lit("Unknown")).otherwise(col("CustomerName")),
    )
)

StatementMeta(, 363b84ef-aa38-4f16-9586-55e1667c74b1, 19, Finished, Available, Finished)

# Defining the silver sales table schema and creating the table

In [18]:
from pyspark.sql.types import *
from delta.tables import *

# Create the silver sales Delta table if it does not exist yet.
#
# CreatedTS and ModifiedTS are declared as TimestampType because they are
# populated from current_timestamp(); declaring them as DateType would drop
# the time-of-day part when rows are written.
# NOTE: createIfNotExists leaves an already existing table untouched, so a
# table that was previously created with DateType columns keeps those types
# until it is migrated or recreated.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("dbo.Sales_Silver")
    .addColumn("SalesOrderNumber", StringType())
    .addColumn("SalesOrderLineNumber", IntegerType())
    .addColumn("OrderDate", DateType())
    .addColumn("CustomerName", StringType())
    .addColumn("Email", StringType())
    .addColumn("Item", StringType())
    .addColumn("Quantity", IntegerType())
    .addColumn("UnitPrice", FloatType())
    .addColumn("Tax", FloatType())
    .addColumn("FileName", StringType())
    .addColumn("IsFlagged", BooleanType())
    .addColumn("CreatedTS", TimestampType())
    .addColumn("ModifiedTS", TimestampType())
    .execute()
)

StatementMeta(, 363b84ef-aa38-4f16-9586-55e1667c74b1, 20, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x771393694190>

# Upsert operation based on condition


In [19]:
# Upsert the transformed bronze data (df) into the silver Delta table.
#
# Rows are matched on SalesOrderNumber, OrderDate, CustomerName and Item:
#   * matched rows   -> non-key columns and ModifiedTS are refreshed from the
#                       incoming data; CreatedTS is left as originally written
#   * unmatched rows -> inserted with every column taken from the incoming data
#
# The matched-update map was previously empty, which made the update branch a
# no-op: existing rows were never changed even though this step is an upsert.
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, 'Tables/dbo/sales_silver')

dfUpdates = df

# Columns that identify a row; they appear in the merge condition and are
# therefore not rewritten on a match.
merge_condition = (
    'silver.SalesOrderNumber = updates.SalesOrderNumber'
    ' and silver.OrderDate = updates.OrderDate'
    ' and silver.CustomerName = updates.CustomerName'
    ' and silver.Item = updates.Item'
)

# Columns refreshed when a matching row already exists in the silver table.
columns_to_refresh = [
    "SalesOrderLineNumber",
    "Email",
    "Quantity",
    "UnitPrice",
    "Tax",
    "FileName",
    "IsFlagged",
    "ModifiedTS",
]

# Columns written when the row is new to the silver table.
columns_to_insert = [
    "SalesOrderNumber",
    "SalesOrderLineNumber",
    "OrderDate",
    "CustomerName",
    "Email",
    "Item",
    "Quantity",
    "UnitPrice",
    "Tax",
    "FileName",
    "IsFlagged",
    "CreatedTS",
    "ModifiedTS",
]

# Each target column is assigned from the same-named column of the source alias.
update_values = {name: f"updates.{name}" for name in columns_to_refresh}
insert_values = {name: f"updates.{name}" for name in columns_to_insert}

(
    deltaTable.alias('silver')
    .merge(dfUpdates.alias('updates'), merge_condition)
    .whenMatchedUpdate(set=update_values)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)

StatementMeta(, 363b84ef-aa38-4f16-9586-55e1667c74b1, 21, Finished, Available, Finished)

In [20]:
# Show the DataFrame that was used as the merge source (dfUpdates); this is the
# incoming data, not the contents of the silver table after the merge.
display(dfUpdates)

StatementMeta(, 363b84ef-aa38-4f16-9586-55e1667c74b1, 22, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fa1e954c-ce4d-4927-87ea-9f25964c04ba)