# Sales order data exploration
Use this notebook to explore sales order data.

In [3]:
# Load the 2019 orders file as raw CSV. With header=false the first line of
# the file is read as a data row rather than as column names.
orders_2019_path = "Files/orders/2019.csv"
raw_csv_reader = spark.read.format("csv").option("header", "false")
df = raw_csv_reader.load(orders_2019_path)

display(df)

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d7f3a745-3574-4add-aa13-60896f737e88)

In [4]:
from pyspark.sql.types import *

# Explicit schema for the order files: column names and types are declared
# here instead of being taken from the file. Fields are added in file order.
orderSchema = (
    StructType()
    .add("SalesOrderNumber", StringType())
    .add("SalesOrderLineNumber", IntegerType())
    .add("OrderDate", DateType())
    .add("CustomerName", StringType())
    .add("Email", StringType())
    .add("Item", StringType())
    .add("Quantity", IntegerType())
    .add("UnitPrice", FloatType())
    .add("Tax", FloatType())
)

# Read the 2019 orders file using the declared schema.
df = spark.read.csv("Files/orders/2019.csv", schema=orderSchema)

display(df)

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d576cf46-c0fb-49d8-9664-68ec5e2ad6b6)

In [5]:
from pyspark.sql.types import *

# (column name, Spark type) pairs describing one order line, in file order.
order_fields = [
    ("SalesOrderNumber", StringType()),
    ("SalesOrderLineNumber", IntegerType()),
    ("OrderDate", DateType()),
    ("CustomerName", StringType()),
    ("Email", StringType()),
    ("Item", StringType()),
    ("Quantity", IntegerType()),
    ("UnitPrice", FloatType()),
    ("Tax", FloatType()),
]
orderSchema = StructType(
    [StructField(field_name, field_type) for field_name, field_type in order_fields]
)

# The wildcard path loads every CSV file in the orders folder into one DataFrame.
df = spark.read.schema(orderSchema).format("csv").load("Files/orders/*.csv")

display(df)

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 329eda59-9f82-47a5-93da-df7a7f1b9210)

In [7]:
# Contact details of customers who ordered the "Road-250 Red, 52" item:
# keep only the matching order lines, then project the two contact columns.
ordered_target_item = df["Item"] == "Road-250 Red, 52"
customers = df.where(ordered_target_item).select("CustomerName", "Email")
distinct_customers = customers.distinct()

# Matching order lines vs. unique (CustomerName, Email) pairs.
print(customers.count())
print(distinct_customers.count())

display(distinct_customers)

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 9, Finished, Available, Finished)

133
133


SynapseWidget(Synapse.DataFrame, f3d763e8-968e-4194-8262-01fe6c075a28)

In [9]:
# Total quantity per item; the aggregated column is named "sum(Quantity)".
orders_by_item = df.groupBy("Item")
productSales = orders_by_item.sum("Quantity")

display(productSales)

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2cc9195c-072a-49c0-a5bb-f15fa47e0339)

In [10]:
from pyspark.sql.functions import col

# Sum the ordered quantity for each item.
item_quantity_totals = df.groupBy("Item").sum("Quantity")

# Give the aggregate a readable name and list the best-selling items first.
productSales = (
    item_quantity_totals
    .withColumnRenamed("sum(Quantity)", "TotalQuantity")
    .sort(col("TotalQuantity").desc())
)

# Show the ranked totals.
display(productSales)


StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 38657802-7418-4f89-afdf-47851ae184c4)

In [11]:
from pyspark.sql.functions import *

# Number of order lines per calendar year of OrderDate, in ascending year order.
order_year = year(col("OrderDate")).alias("Year")
yearlySales = df.select(order_year).groupBy("Year").count().sort("Year")

display(yearlySales)

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3ab2e757-af17-4132-b9aa-d473275aa682)

In [13]:
from pyspark.sql.functions import *

# Expressions reused below: the order date and the customer name split on spaces.
order_date = col("OrderDate")
name_parts = split(col("CustomerName"), " ")

# Output columns in their final order; CustomerName is replaced by FirstName/LastName.
output_columns = [
    "SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month",
    "FirstName", "LastName", "Email", "Item", "Quantity", "UnitPrice", "Tax",
]

# Add Year/Month from the order date and FirstName/LastName from the first and
# second name tokens, then keep only the output columns.
# NOTE(review): LastName is the second space-separated token only, so any
# further tokens in CustomerName are not carried over - confirm this is intended.
transformed_df = (
    df.withColumn("Year", year(order_date))
    .withColumn("Month", month(order_date))
    .withColumn("FirstName", name_parts.getItem(0))
    .withColumn("LastName", name_parts.getItem(1))
    .select(*output_columns)
)

# Preview the first five transformed orders.
display(transformed_df.limit(5))

# Write the result as Parquet, replacing any existing output at this path.
transformed_df.write.mode("overwrite").parquet('Files/transformed_data/orders')

print("Transformed data saved!")

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cf1dc786-82e1-4f71-9a4a-169807284870)

Transformed data saved!


### Save data in partitioned files
When dealing with large volumes of data, partitioning can significantly improve performance and make it easier to filter data.

In [15]:
# Re-read the transformed orders and write them back out partitioned by
# Year and then Month, replacing any existing output at the target path.
transformed_orders_path = "Files/transformed_data/orders"
partitioned_orders_path = "Files/partitioned_data"

orders_df = spark.read.parquet(transformed_orders_path)

partitioned_writer = orders_df.write.mode("overwrite").partitionBy("Year", "Month")
partitioned_writer.parquet(partitioned_orders_path)

print("Transformed data saved!")

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 17, Finished, Available, Finished)

Transformed data saved!


In [16]:
# Load only the 2021 partitions of the partitioned orders dataset.
#
# Pointing the reader straight at a partition sub-directory makes Spark start
# partition discovery at that directory, so the Year and Month values encoded
# in the path are not added back as columns. Setting basePath to the dataset
# root keeps discovery anchored there: only the 2021 files are read, and the
# Year and Month columns are retained in the result.
orders_2021_df = (
    spark.read.format("parquet")
    .option("basePath", "Files/partitioned_data")
    .load("Files/partitioned_data/Year=2021/Month=*")
)

display(orders_2021_df)

StatementMeta(, a0a9d306-f608-449f-a4f9-8a79734018d9, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 66e2da75-e082-478e-9ca5-b105d5fcb9db)