## Sales Data Set Ingestion
We are going to load the Sales Data Set from a private Azure Storage Account.

<font size="2" color="red" face="sans-serif" bold> 

<b> <i> <u>No changes are required to this cell except the SAS token, which you get from the instructor. The cell already contains every other detail needed to ingest the data from the storage account.</u> </i> </b>
</font>
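The SAS token from the instructor encodes its own expiry time (`se`) and permissions (`sp`) as query-string fields, and Azure rejects requests made with an expired token. The helper below is a hypothetical sketch (`describe_sas_token` is an illustrative name, not part of any Azure library) that decodes those two fields with the Python standard library. It does not verify the signature, so it catches expired or under-privileged tokens, not forged or mistyped ones.

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs


def describe_sas_token(sas_token: str) -> dict:
    """Report the expiry and read/list permissions of an Azure SAS token.

    Hypothetical helper: it only decodes query-string fields and does not check the signature.
    """
    # parse_qs expects "key=value&..." (no leading "?") and URL-decodes the values
    fields = {key: values[0] for key, values in parse_qs(sas_token.strip().lstrip("?")).items()}
    # "se" is the expiry time in UTC (ISO 8601, e.g. 2024-10-17T10:23:00Z)
    expiry = datetime.fromisoformat(fields["se"].replace("Z", "+00:00"))
    # A date-only value carries no time zone; treat it as UTC
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)
    # "sp" lists the granted permissions as letters, e.g. "rl" = read + list
    permissions = fields.get("sp", "")
    return {
        "expires": expiry,
        "expired": expiry <= datetime.now(timezone.utc),
        "can_read": "r" in permissions,
        "can_list": "l" in permissions,
    }
```

For example, `describe_sas_token(sas_token)` on the sample token in the next cell reports it as expired, because its `se` value is 2024-10-17.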

In [5]:
# Details of the Azure Storage account and container that hold the sales files
# Reading from the container also requires a SAS (shared access signature) token, defined below
storage_account = "fabricdatafactorylab"
container = "labdata"

# The container is private, so Spark needs a SAS token to read from it.
# Ask the instructor for the SAS token. It should look similar to this example:
# ?sv=2021-10-04&ss=btqf&srt=sco&spr=https%2Chttp&st=2023-10-16T10%3A23%3A26Z&se=2024-10-17T10%3A23%3A00Z&sp=rl&sig=9PVMpC31bgaIbN2rsdYd0huYi2RutRa0c7czd13KEDU%3D
sas_token = "sas key from instructor" 

# Register the SAS token in the Spark configuration so Spark can authenticate against this private container over wasbs://
spark.conf.set(f"fs.azure.sas.{container}.{storage_account}.blob.core.windows.net", sas_token)

# Paths of the two sales CSV files in the csv/Facts folder. The files are read separately because they have different columns.
wasbs_path1 = f"wasbs://{container}@{storage_account}.blob.core.windows.net/csv/Facts/Sales_File1.csv"
wasbs_path2 = f"wasbs://{container}@{storage_account}.blob.core.windows.net/csv/Facts/Sales_File2.csv"

# Read each file as CSV: fields are separated by ';', the first row holds the column names (header) and Spark infers the column types (inferSchema)
sales_df1 = spark.read.format("csv").option("delimiter", ";").option("header","true").option("inferSchema", "true").load(wasbs_path1)
sales_df2 = spark.read.format("csv").option("delimiter", ";").option("header","true").option("inferSchema", "true").load(wasbs_path2)

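The Spark configuration key and the `wasbs://` paths in the cell above follow fixed patterns built from the container and storage account names. A small hypothetical helper (`wasbs_settings` is an illustrative name, not a library function) keeps both patterns in one place, which can help if the notebook is pointed at another account or file:

```python
from typing import Tuple


def wasbs_settings(container: str, storage_account: str, blob_path: str) -> Tuple[str, str]:
    """Return (Spark config key for the container's SAS token, wasbs:// URI of one blob)."""
    host = f"{storage_account}.blob.core.windows.net"
    # Same key pattern the cell above passes to spark.conf.set
    conf_key = f"fs.azure.sas.{container}.{host}"
    # wasbs:// is the TLS (HTTPS) variant of the wasb:// scheme
    uri = f"wasbs://{container}@{host}/{blob_path.lstrip('/')}"
    return conf_key, uri
```

In the cell above this would read `conf_key, wasbs_path1 = wasbs_settings(container, storage_account, "csv/Facts/Sales_File1.csv")` followed by `spark.conf.set(conf_key, sas_token)`.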

In [6]:
# Display the first 20 rows of each DataFrame (the default for show())
sales_df2.show()
sales_df1.show()


+----------------+--------------------+-------------+-----------+--------------+------------+----------------+--------------+---------+--------+---------+
|SalesOrderNumber|SalesOrderLineNumber|SalesQuantity|SalesAmount|ReturnQuantity|ReturnAmount|DiscountQuantity|DiscountAmount|TotalCost|UnitCost|UnitPrice|
+----------------+--------------------+-------------+-----------+--------------+------------+----------------+--------------+---------+--------+---------+
|  200808248CS632|                  11|            1|    3199,99|             0|        0,00|               0|          0,00|  1060,22| 1060,22|  3199,99|
|  200809128CS634|                 445|            1|    3199,99|             0|        0,00|               0|          0,00|  1060,22| 1060,22|  3199,99|
|  200808248CS632|                  12|            1|    3199,99|             0|        0,00|               0|          0,00|  1060,22| 1060,22|  3199,99|
|  200808058CS628|                 181|            1|    3199,99|     

In [10]:
# Print the schema to inspect the column data types, then count the rows

sales_df1.printSchema()
sales_df1.count()



root
 |-- OnlineSalesKey: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- DeliveryDate: timestamp (nullable = true)
 |-- StoreKey: integer (nullable = true)
 |-- ProductKey: integer (nullable = true)
 |-- PromotionKey: integer (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesOrderLineNumber: integer (nullable = true)



1000000

In [11]:
# Print the schema and row count of the second DataFrame.
# The two DataFrames have different columns and share only SalesOrderNumber and SalesOrderLineNumber,
# so they must be combined with a join on those two key columns.

sales_df2.printSchema()
sales_df2.count()


root
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesOrderLineNumber: integer (nullable = true)
 |-- SalesQuantity: integer (nullable = true)
 |-- SalesAmount: string (nullable = true)
 |-- ReturnQuantity: integer (nullable = true)
 |-- ReturnAmount: string (nullable = true)
 |-- DiscountQuantity: integer (nullable = true)
 |-- DiscountAmount: string (nullable = true)
 |-- TotalCost: string (nullable = true)
 |-- UnitCost: string (nullable = true)
 |-- UnitPrice: string (nullable = true)



1000000

In [7]:
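# Sort by the join keys SalesOrderNumber and SalesOrderLineNumber so the preview lines up row by row with the other DataFrame.
# The sort is not needed for the join itself: for a sort-merge join Spark shuffles and sorts both inputs on the join keys on its own,
# so ordering the DataFrames beforehand does not make the join faster.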
from pyspark.sql.functions import col

sorted_sales_df1 = sales_df1.orderBy(col("SalesOrderNumber"),col("SalesOrderLineNumber"))

sorted_sales_df1.show()


+--------------+-------------------+-------------------+--------+----------+------------+-----------+----------------+--------------------+
|OnlineSalesKey|          OrderDate|       DeliveryDate|StoreKey|ProductKey|PromotionKey|CustomerKey|SalesOrderNumber|SalesOrderLineNumber|
+--------------+-------------------+-------------------+--------+----------+------------+-----------+----------------+--------------------+
|      31721514|2007-01-01 00:00:00|2007-01-03 00:00:00|     307|         8|           5|         29|  20070101111028|                   4|
|      32048750|2007-01-01 00:00:00|2007-01-06 00:00:00|     307|      1052|           5|         29|  20070101111028|                   6|
|      32048751|2007-01-01 00:00:00|2007-01-07 00:00:00|     307|      1293|           5|         29|  20070101111028|                   7|
|      31868524|2007-01-01 00:00:00|2007-01-09 00:00:00|     306|       904|          10|        333|  20070101111332|                   4|
|      31721520|2007

In [8]:
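# Sort by the join keys SalesOrderNumber and SalesOrderLineNumber so the preview lines up row by row with the other DataFrame.
# The sort is not needed for the join itself: for a sort-merge join Spark shuffles and sorts both inputs on the join keys on its own,
# so ordering the DataFrames beforehand does not make the join faster.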
from pyspark.sql.functions import col

sorted_sales_df2 = sales_df2.orderBy(col("SalesOrderNumber"),col("SalesOrderLineNumber"))
sorted_sales_df2.show()


+----------------+--------------------+-------------+-----------+--------------+------------+----------------+--------------+---------+--------+---------+
|SalesOrderNumber|SalesOrderLineNumber|SalesQuantity|SalesAmount|ReturnQuantity|ReturnAmount|DiscountQuantity|DiscountAmount|TotalCost|UnitCost|UnitPrice|
+----------------+--------------------+-------------+-----------+--------------+------------+----------------+--------------+---------+--------+---------+
|  20070101111028|                   4|            1|    50,9915|             0|        0,00|               1|        8,9985|    30,58|   30,58|    59,99|
|  20070101111028|                   6|            1|     532,95|             0|        0,00|               1|         94,05|   207,74|  207,74|   627,00|
|  20070101111028|                   7|            1|     505,75|             0|        0,00|               1|         89,25|   197,14|  197,14|   595,00|
|  20070101111332|                   4|            1|     60,792|     

In [9]:
# Inner join on SalesOrderNumber and SalesOrderLineNumber: only rows whose key pair appears in both DataFrames are kept

merged_df = sorted_sales_df1.join(
    sorted_sales_df2,
    on=["SalesOrderNumber", "SalesOrderLineNumber"],
    how="inner"
)

# Print the schema of the joined DataFrame
merged_df.printSchema()


root
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesOrderLineNumber: integer (nullable = true)
 |-- OnlineSalesKey: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- DeliveryDate: timestamp (nullable = true)
 |-- StoreKey: integer (nullable = true)
 |-- ProductKey: integer (nullable = true)
 |-- PromotionKey: integer (nullable = true)
 |-- CustomerKey: integer (nullable = true)
 |-- SalesQuantity: integer (nullable = true)
 |-- SalesAmount: string (nullable = true)
 |-- ReturnQuantity: integer (nullable = true)
 |-- ReturnAmount: string (nullable = true)
 |-- DiscountQuantity: integer (nullable = true)
 |-- DiscountAmount: string (nullable = true)
 |-- TotalCost: string (nullable = true)
 |-- UnitCost: string (nullable = true)
 |-- UnitPrice: string (nullable = true)



##### Before writing the table, we drop any existing table named Sales so the new table does not clash with an old one.

In [12]:
%%sql
DROP TABLE IF EXISTS Sales;



<Spark SQL result set with 0 rows and 0 fields>

## Sales Data Set Ingestion into a Managed Lakehouse Table



In [13]:
# Write the joined DataFrame in Delta format to the lakehouse Tables/ folder (the managed area), overwriting any existing data
table_name = 'Sales'

merged_df \
    .write \
    .mode("overwrite") \
    .format("delta") \
    .save("Tables/" + table_name)


In [15]:
# Now that the table exists, read it back by name into a new DataFrame and count its rows
table_name = "Sales"

# Read the table into a DataFrame
df = spark.read.table(table_name)

# Calculate the number of rows
row_count = df.count()

# Print or use the row count as needed
print("Number of rows:", row_count)


Number of rows: 1000000


In [16]:
# Render the DataFrame as an interactive table with the notebook's display() function
display(df)

