In [0]:
# Stage the shared workspace files into the directory the streaming readers watch.
# Each entry is (source path, destination path); dbutils.fs.cp copies one file per call.
copy_jobs = [
    ("file:/Workspace/Shared/sales_data.csv", "dbfs:/FileStore/streaming/input/sales_data.csv"),
    ("file:/Workspace/Shared/customer_data.json", "dbfs:/FileStore/streaming/input/customer_data.json"),
]
for src_path, dst_path in copy_jobs:
    dbutils.fs.cp(src_path, dst_path)

True

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize (or reuse) the SparkSession; getOrCreate returns an existing session if one is active.
spark = (
    SparkSession.builder
    .appName("StructuredStreamingExample")
    .getOrCreate()
)

# Both input files are staged in the same directory, so each stream selects only its
# own file type with a glob. Without the glob, the CSV stream would also parse
# customer_data.json (and the JSON stream sales_data.csv), producing null/garbage rows.
STREAM_INPUT_DIR = "dbfs:/FileStore/streaming/input"

# Schema for the sales CSV data (file stream sources require an explicit schema by default).
sales_schema = "OrderID INT, OrderDate STRING, CustomerID STRING, Product STRING, Quantity INT, Price DOUBLE"

# Read streaming data from the CSV files only.
df_sales_stream = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(sales_schema)
    .load(f"{STREAM_INPUT_DIR}/*.csv")
)
df_sales_stream.printSchema()

# Schema for the customer JSON data.
# NOTE(review): the json source expects one JSON object per line by default; if
# customer_data.json is a single array / pretty-printed document, add
# .option("multiLine", "true") — TODO confirm the file format.
customer_schema = "CustomerID STRING, CustomerName STRING, Region STRING, SignupDate STRING"

# Read streaming data from the JSON files only.
df_customers_stream = (
    spark.readStream
    .format("json")
    .schema(customer_schema)
    .load(f"{STREAM_INPUT_DIR}/*.json")
)
df_customers_stream.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)

root
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- SignupDate: string (nullable = true)



In [0]:
from pyspark.sql.functions import current_date, datediff, to_date, to_timestamp, window

# Transform the sales data: parse the order timestamp and add a TotalAmount column.
df_sales_transformed = df_sales_stream.select(
    col("OrderID"),
    to_timestamp(col("OrderDate"), "yyyy-MM-dd HH:mm:ss").alias("OrderDate"),
    col("Product"),
    col("Quantity"),
    col("Price"),
    (col("Quantity") * col("Price")).alias("TotalAmount"),
)
print("Applied transformations on sales data...")

# Aggregate total sales per product within 1-day event-time windows.
# Why the window: Spark only uses a watermark to drop late rows and evict aggregation
# state when the event-time column (or a window over it) is part of the grouping key.
# Grouping by Product alone would ignore the watermark and keep state forever.
# With the window, a window's state is finalized once the watermark
# (max OrderDate seen minus 1 day) passes the window's end.
df_sales_aggregated = (
    df_sales_transformed
    .withWatermark("OrderDate", "1 day")
    .groupBy(window(col("OrderDate"), "1 day"), col("Product"))
    .agg({"TotalAmount": "sum"})
)
print("Aggregated sales data by product...")

# Transform the customer data: add the (fractional) number of years since signup.
# datediff already returns an int day count; dividing by 365 yields a double and
# ignores leap days (months_between(...) / 12 would be more precise if that matters).
df_customers_transformed = df_customers_stream.withColumn(
    "YearsSinceSignup",
    datediff(current_date(), to_date(col("SignupDate"), "yyyy-MM-dd")) / 365,
)
print("Applied transformations on customer data...")

Applied transformations on sales data...
Aggregated sales data by product...
Applied transformations on customer data...


In [0]:
def _start_console_sink(stream_df, output_mode):
    """Start a streaming query that prints each micro-batch of ``stream_df`` to the console.

    Args:
        stream_df: a streaming DataFrame.
        output_mode: Structured Streaming output mode ("append", "update", ...).

    Returns:
        The started StreamingQuery handle. It keeps running until ``.stop()`` is called.
    """
    return (
        stream_df.writeStream
        .outputMode(output_mode)
        .format("console")
        .start()
    )


# Debug sink for the aggregated sales: "update" emits only result rows changed in a trigger.
sales_query = _start_console_sink(df_sales_aggregated, "update")
print("Started streaming query to write aggregated sales data to console...")

# Debug sink for the per-row customer stream (no aggregation, so "append").
customers_query = _start_console_sink(df_customers_transformed, "append")
print("Started streaming query to write transformed customer data to console...")

Started streaming query to write aggregated sales data to console...
Started streaming query to write transformed customer data to console...


In [0]:
import pandas as pd

# Create sample sales data.
# CustomerIDs use one consistent casing ("C001".."C004"): string keys compare
# case-sensitively, so mixing "c001" and "C003" would break lookups/joins on CustomerID.
data = {
    "OrderID": [1, 2, 3, 4],
    "OrderDate": ["2024-01-01 10:00:00", "2024-01-02 11:00:00", "2024-01-03 12:00:00", "2024-01-04 13:00:00"],
    "CustomerID": ["C001", "C002", "C003", "C004"],
    "Product": ["ProductA", "ProductB", "ProductC", "ProductD"],
    "Quantity": [10, 20, 15, 5],
    "Price": [100.0, 200.0, 150.0, 50.0],
}

# Convert to a pandas DataFrame.
df = pd.DataFrame(data)

# Save to CSV via the local /dbfs path; the next cell reads the same file with Spark
# as /FileStore/sales_data.csv.
csv_path = "/dbfs/FileStore/sales_data.csv"
df.to_csv(csv_path, index=False)
print(f"Sample data saved to {csv_path}")

Sample data saved to /dbfs/FileStore/sales_data.csv


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Reuse the active SparkSession (getOrCreate returns it if one already exists).
spark = (
    SparkSession.builder
    .appName("StructuredStreamingExample")
    .getOrCreate()
)

# Explicit, typed schema for the sample CSV. Without it, the CSV reader loads every column
# as a string, and OrderID / Quantity / Price would be persisted to Delta as strings.
batch_sales_schema = "OrderID INT, OrderDate STRING, CustomerID STRING, Product STRING, Quantity INT, Price DOUBLE"

# Load data from CSV.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(batch_sales_schema)
    .load("/FileStore/sales_data.csv")
)
print("Data Loaded Successfully")

# Transform the data: add a TotalAmount column (Quantity and Price are already numeric).
df_transformed = df.withColumn("TotalAmount", col("Quantity") * col("Price"))
print("Data Transformed Successfully")

# Overwrite the Delta table, replacing its schema as well as its data.
# mergeSchema cannot change the type of an existing column (e.g. a string-typed table
# from an earlier run becoming INT), whereas overwriteSchema lets an overwrite install
# the new schema.
(
    df_transformed.write.format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .save("/delta/sales_data")
)
print("Transformed data written to Delta table successfully")

Data Loaded Successfully
Data Transformed Successfully
Transformed data written to Delta table successfully
