### Resetting Delta Tables

I use this block of code to reset my scenario when I want to start the test again from scratch. This is necessary only when I need to clear out existing data and begin with a fresh state for all my Delta tables.

The code loops through a list of Delta table paths and removes each one recursively, effectively dropping all data stored in these tables. By doing this, I ensure that any previously persisted data is wiped out, and I can proceed with a clean setup for testing.


In [0]:
from pyspark.sql import SparkSession

# Initialize SparkSession if not already initialized
spark = SparkSession.builder.getOrCreate()

# List of delta table paths
delta_table_paths = [
    "/FileStore/tables/delta/orders_delta",
    "/FileStore/tables/delta/order_lines_delta",
    "/FileStore/tables/delta/charges_delta",
    "/FileStore/tables/delta/inventory_delta",
    "/FileStore/tables/delta/processed_orders_delta"
]

# Loop through each delta table path and delete the directories
for table_path in delta_table_paths:
    dbutils.fs.rm(table_path, True)  # True to remove recursively

print("All Delta tables have been dropped.")


All Delta tables have been dropped.


### Resetting Inventory for Testing

I use this code block when I want to reset the inventory amounts and start the test again from a clean state. This is only necessary when I need to ensure that the inventory values are restored to their original starting point before running new tests.

The process involves reloading the initial inventory data from a JSON file and overwriting the current Delta table to reset all inventory amounts. By doing this, I make sure that my tests operate with the correct baseline inventory values, avoiding any impact from previous test runs.
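
For reference, the sketch below writes a small file in a shape the reset cell could read. The three field names come from the `select` in that cell and the two sample rows come from the inventory listing shown later in this notebook. The array-of-objects layout and the local output path are assumptions, since the actual `initial_inventory.json` is not shown.

```python
import json
import os
import tempfile

# Two sample records; the field names match the columns selected in the reset cell.
sample_inventory = [
    {"productName": "Apple AirPods Pro (2nd Gen)", "sku": "AirPodsPro_2ndGen", "initialInventory": 180},
    {"productName": "Apple iPad Pro 12.9-inch", "sku": "iPadPro_12_9_128GB", "initialInventory": 75},
]

# Hypothetical local path; on Databricks the file would live under /FileStore/tables/.
sample_path = os.path.join(tempfile.gettempdir(), "initial_inventory_sample.json")

# indent=2 spreads each record over several lines, which is the case the
# multiline read option is meant to handle.
with open(sample_path, "w", encoding="utf-8") as fh:
    json.dump(sample_inventory, fh, indent=2)

# Read the file back to confirm it round-trips unchanged.
with open(sample_path, encoding="utf-8") as fh:
    reloaded = json.load(fh)

print(sorted(reloaded[0].keys()))
```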


In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Step 1: Load the initial inventory from JSON file
inventory_json_path = "/FileStore/tables/initial_inventory.json"

# Load the JSON file
initial_inventory_df = spark.read.option("multiline", "true").json(inventory_json_path)

# Step 2: Define the inventory Delta table path
inventory_delta_path = "/FileStore/tables/delta/inventory_delta"

# Step 3: Overwrite the inventory table so every amount returns to its initial value
initial_inventory_df.select("productName", "sku", "initialInventory").write \
    .format("delta").mode("overwrite").save(inventory_delta_path)

print("Initial inventory has been successfully written to the Delta table.")


Initial inventory has been successfully written to the Delta table.


### Importing Orders and Updating Inventory

I use this code to import orders into Delta tables and adjust the inventory based on the quantity sold in each order. This process ensures that all order data is up-to-date and that the inventory reflects the correct amounts after each transaction.

The key steps include:
1. Loading the raw order data from JSON files.
2. Processing the order details, order lines, and associated charges.
3. Updating the inventory by decreasing the stock levels based on the quantities sold.
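
To make step 2 concrete, here is a plain-Python sketch of the flattening that the PySpark code below performs with two `explode` calls. The nested field names are taken from that code; the sample document itself is hypothetical and only reuses one order line from the results shown later. It also assumes `amount` arrives as a string, which is why the pipeline casts it to an integer.

```python
def flatten_order_lines(raw):
    """Return one flat dict per order line, mirroring the two explode steps."""
    rows = []
    for order in raw["list"]["elements"]["order"]:        # first explode: one row per order
        for line in order["orderLines"]["orderLine"]:     # second explode: one row per line
            rows.append({
                "purchaseOrderId": order["purchaseOrderId"],
                "lineNumber": line["lineNumber"],
                "productName": line["item"]["productName"],
                "sku": line["item"]["sku"],
                "amount": int(line["orderLineQuantity"]["amount"]),
                "shipMethod": line["fulfillment"]["shipMethod"],
            })
    return rows


# Hypothetical input document with a single order and a single order line.
sample_raw = {
    "list": {"elements": {"order": [{
        "purchaseOrderId": "4792982839409",
        "orderLines": {"orderLine": [{
            "lineNumber": "3",
            "item": {"productName": "Samsung Galaxy S21 Ultra", "sku": "SGS21U_128GB"},
            "orderLineQuantity": {"amount": "1"},
            "fulfillment": {"shipMethod": "EXPEDITED"},
        }]},
    }]}}
}

flat_rows = flatten_order_lines(sample_raw)
print(flat_rows)
```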

The process involves:
- **Upserting** (merge operation) the order data into the Delta tables for orders, order lines, and charges.
- **Updating the inventory**: After processing the order lines, I aggregate the quantities sold for each product and decrease the corresponding amounts in the inventory Delta table.

This workflow ensures that my system remains synchronized, with accurate information on both orders and available inventory.
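
The upsert behaviour can be modelled in a few lines of plain Python. This is only a sketch of the matched/not-matched logic, not the Delta Lake implementation: the `upsert` helper and the dictionary standing in for the table are hypothetical. A real Delta merge also rejects a batch in which several source rows match the same target row, whereas this model simply lets the last one win.

```python
def upsert(target, source_rows, key_fields):
    """Model of whenMatchedUpdateAll + whenNotMatchedInsertAll on a dict keyed by the merge key."""
    for row in source_rows:
        key = tuple(row[field] for field in key_fields)
        # Matched key: every column is replaced. Unmatched key: the row is inserted.
        target[key] = dict(row)
    return target


merge_key = ("purchaseOrderId", "lineNumber")
order_lines_table = {}

first_batch = [{"purchaseOrderId": "4792982839409", "lineNumber": "3", "amount": 1}]
upsert(order_lines_table, first_batch, merge_key)
upsert(order_lines_table, first_batch, merge_key)   # replaying the batch adds no duplicate

# Hypothetical correction for the same key: it updates the existing row in place.
corrected_batch = [{"purchaseOrderId": "4792982839409", "lineNumber": "3", "amount": 2}]
upsert(order_lines_table, corrected_batch, merge_key)

print(len(order_lines_table))
print(order_lines_table[("4792982839409", "3")]["amount"])
```

Note that only the order tables are upserted this way. The inventory update subtracts quantities, so replaying the same file would decrement stock again; moving processed files out of the input folder is what prevents that.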


In [0]:
import logging
from pyspark.sql import functions as F
from delta.tables import DeltaTable
from pyspark.sql.utils import AnalysisException

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Function to perform Delta upsert (merge) with error handling
def upsert_to_delta(source_df, delta_table_path, merge_condition):
    """
    Upserts (merges) data from source DataFrame into a Delta table.
    
    Args:
        source_df (DataFrame): DataFrame with source data.
        delta_table_path (str): Path to the Delta table.
        merge_condition (str): Condition for merging data.
    """
    try:
        if DeltaTable.isDeltaTable(spark, delta_table_path):
            delta_table = DeltaTable.forPath(spark, delta_table_path)
            # Perform the merge operation
            delta_table.alias("target").merge(
                source_df.alias("source"),
                merge_condition
            ).whenMatchedUpdateAll() \
             .whenNotMatchedInsertAll() \
             .execute()
            logger.info(f"Upsert successful for {delta_table_path}")
        else:
            # First time save (initial load)
            source_df.write.format("delta").mode("overwrite").save(delta_table_path)
            logger.info(f"Table {delta_table_path} created and data saved")
    except AnalysisException as e:
        # Re-raise so a failed merge is not silently ignored by the caller
        logger.error(f"AnalysisException encountered for {delta_table_path}: {e}")
        raise
    except Exception as e:
        logger.error(f"Failed to upsert data to {delta_table_path}: {e}")
        raise
    finally:
        logger.info(f"Upsert process completed for {delta_table_path}")

# Load JSON data with validation and error handling, checks if there are files to process
def load_json_data(json_directory_path):
    """
    Loads JSON data from the specified directory, returns empty DataFrame if no files.
    
    Args:
        json_directory_path (str): Path to the JSON files directory.
    
    Returns:
        DataFrame: Loaded JSON data or an empty DataFrame if no files are found.
    """
    try:
        # List files using dbutils.fs.ls
        files = dbutils.fs.ls(json_directory_path)
        if not files:
            logger.info(f"No files to process in {json_directory_path}")
            # Spark cannot infer a schema from an empty list, so pass an explicit empty schema
            from pyspark.sql.types import StructType
            return spark.createDataFrame([], schema=StructType([]))

        df = spark.read.option("multiLine", "true").json(json_directory_path)
        logger.info(f"Loaded JSON data from {json_directory_path}")
        return df
    except Exception as e:
        logger.error(f"Failed to load JSON data from {json_directory_path}: {e}")
        raise

# Function to move processed files
def move_processed_files(source_directory, target_directory):
    """
    Moves files from source directory to target directory.
    
    Args:
        source_directory (str): Source directory path.
        target_directory (str): Target directory path.
    """
    try:
        # Check if the target directory exists, if not create it
        try:
            dbutils.fs.ls(target_directory)  # Check if directory exists
        except Exception:
            logger.info(f"Directory {target_directory} does not exist, creating it.")
            dbutils.fs.mkdirs(target_directory)
        
        # Move files from source to target
        files = dbutils.fs.ls(source_directory)
        for file_info in files:
            source_file = file_info.path
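            # Join with exactly one slash so the path is valid with or without a trailing "/"
            target_file = target_directory.rstrip("/") + "/" + file_info.name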
            dbutils.fs.mv(source_file, target_file)
            logger.info(f"Moved {file_info.name} to {target_directory}")
    except Exception as e:
        logger.error(f"Error moving files from {source_directory} to {target_directory}: {e}")
        raise



# Flatten and extract order-related data
def extract_order_data(orders_df):
    """
    Extracts and flattens order-related data from the orders DataFrame.
    
    Args:
        orders_df (DataFrame): DataFrame containing raw order data.
    
    Returns:
        DataFrame: Flattened order data.
    """
    try:
        orders_df = orders_df.select(F.explode("list.elements.order").alias("order"))
        logger.info("Orders data successfully extracted and flattened")
        return orders_df
    except Exception as e:
        logger.error(f"Failed to extract order data: {e}")
        raise

# Process orders data with error handling
def process_order_data(orders_df):
    """
    Processes and upserts order data into the Delta table.
    
    Args:
        orders_df (DataFrame): DataFrame containing order data.
    """
    order_delta_path = "/FileStore/tables/delta/orders_delta"
    try:
        data = orders_df.select(
            F.col("order.purchaseOrderId").alias("purchaseOrderId"),
            F.col("order.customerOrderId").alias("customerOrderId"),
            F.col("order.customerEmailId").alias("customerEmailId"),
            F.col("order.orderType").alias("orderType"),
            F.col("order.orderDate").alias("orderDate")
        )
        logger.info("Processing orders data")
        upsert_to_delta(data, order_delta_path, "target.purchaseOrderId = source.purchaseOrderId")
    except Exception as e:
        logger.error(f"Error processing order data: {e}")
        raise

# Process order lines with error handling
def process_order_lines(orders_df):
    """
    Processes and upserts order line data into the Delta table and updates inventory.
    
    Args:
        orders_df (DataFrame): DataFrame containing order line data.
    """
    order_lines_delta_path = "/FileStore/tables/delta/order_lines_delta"
    try:
        order_lines_data = orders_df.select(
            "order.purchaseOrderId",
            F.explode("order.orderLines.orderLine").alias("orderLine")
        ).select(
            "purchaseOrderId",
            "orderLine.lineNumber",
            "orderLine.item.productName",
            "orderLine.item.sku",
            F.col("orderLine.orderLineQuantity.amount").cast("int").alias("amount"),
            "orderLine.fulfillment.shipMethod"
        )
        logger.info("Processing order lines data")
        upsert_to_delta(order_lines_data, order_lines_delta_path,
                        "target.purchaseOrderId = source.purchaseOrderId AND target.lineNumber = source.lineNumber")
        update_inventory(order_lines_data)
    except Exception as e:
        logger.error(f"Error processing order lines: {e}")
        raise

# Update inventory with error handling
def update_inventory(order_lines_data):
    """
    Updates the inventory Delta table based on quantities sold.
    
    Args:
        order_lines_data (DataFrame): DataFrame containing order line quantities sold.
    """
    inventory_delta_path = "/FileStore/tables/delta/inventory_delta"
    try:
        if DeltaTable.isDeltaTable(spark, inventory_delta_path):
            inventory_delta = DeltaTable.forPath(spark, inventory_delta_path)

            sold_products_df = order_lines_data.groupBy("productName", "sku").agg(F.sum("amount").alias("quantitySold"))
            sold_products_df.show(truncate=False)

            inventory_delta.alias("inventory").merge(
                sold_products_df.alias("sales"),
                "inventory.productName = sales.productName AND inventory.sku = sales.sku"
            ).whenMatchedUpdate(
                set={
                    "inventory.initialInventory": F.col("inventory.initialInventory") - F.col("sales.quantitySold")
                }
            ).execute()
            logger.info("Inventory updated successfully")
        else:
            logger.warning(f"Inventory Delta Table does not exist at {inventory_delta_path}")
    except Exception as e:
        logger.error(f"Error updating inventory: {e}")
        raise

# Process charges data with error handling
def process_charges_data(orders_df):
    """
    Processes and upserts charges data into the Delta table.
    
    Args:
        orders_df (DataFrame): DataFrame containing order charge data.
    """
    charges_delta_path = "/FileStore/tables/delta/charges_delta"
    try:
        charges_df = orders_df.select(
            "order.purchaseOrderId",
            F.explode("order.orderLines.orderLine").alias("orderLine")
        ).select(
            "purchaseOrderId",
            "orderLine.lineNumber",
            F.explode("orderLine.charges.charge").alias("charge")
        ).select(
            "purchaseOrderId",
            "lineNumber",
            "charge.chargeType",
            "charge.chargeName",
            # Nested fields of a null struct already resolve to null, so no explicit null check is needed
            F.col("charge.chargeAmount.amount").alias("amount"),
            F.col("charge.chargeAmount.currency").alias("currency"),
            F.col("charge.tax.taxAmount.amount").alias("taxAmount"),
            F.col("charge.tax.taxAmount.currency").alias("taxCurrency"),
            F.col("charge.tax.taxName").alias("taxName")
        )
        logger.info("Processing charges data")
        upsert_to_delta(charges_df, charges_delta_path,
                        "target.purchaseOrderId = source.purchaseOrderId AND target.lineNumber = source.lineNumber AND target.chargeType = source.chargeType")
    except Exception as e:
        logger.error(f"Error processing charges data: {e}")
        raise

# Main process that combines all steps with error handling
def main():
    """
    Main process to load, process, and upsert orders, order lines, and charges.
    """
    try:
        json_directory_path = "/FileStore/tables/orders/"
        processed_directory_path = "/FileStore/tables/processed/"

        files = dbutils.fs.ls(json_directory_path)
        if not files:
            logger.warning(f"No files to process in {json_directory_path}")
        else:
            orders_raw_df = load_json_data(json_directory_path)
            if orders_raw_df.rdd.isEmpty():
                return  # Exit early if no data to process
            orders_df = extract_order_data(orders_raw_df)
            
            process_order_data(orders_df)
            process_order_lines(orders_df)
            process_charges_data(orders_df)

            move_processed_files(json_directory_path, processed_directory_path)    
    except Exception as e:
        logger.error(f"Error in main process: {e}")
        raise

### Creating Delta Tables for Orders, Order Lines, Charges, and Inventory

I use this SQL to register Delta tables for orders, order lines, charges, and inventory. Each statement points a table name at the storage location that the PySpark code reads and writes, so the same data can be queried by name from SQL cells. Defining these tables keeps the data organized and easy to reach for queries and further processing.

#### Why?
Creating these Delta tables allows me to:
- **Persist data** in a reliable format (Delta Lake), which supports ACID transactions.
- **Access and query** data efficiently using SQL or PySpark.
- **Enable updates** and merges (upserts) to keep the data up-to-date as new transactions occur (e.g., new orders, inventory updates).
- **Maintain consistency** of the data, especially when working with large datasets across multiple stages like orders, order lines, and inventory management.

Each table corresponds to a specific aspect of the order management workflow:
- **orders_delta**: Stores order details.
- **order_lines_delta**: Contains details about the items in each order.
- **charges_delta**: Stores the charges and taxes associated with each order line.
- **inventory_delta**: Tracks inventory levels and updates when sales are processed.


In [0]:
%sql
-- Create SQL table for orders data
CREATE TABLE IF NOT EXISTS orders_delta
USING delta
LOCATION '/FileStore/tables/delta/orders_delta';

-- Create SQL table for order lines data
CREATE TABLE IF NOT EXISTS order_lines_delta
USING delta
LOCATION '/FileStore/tables/delta/order_lines_delta';

-- Create SQL table for charges data
CREATE TABLE IF NOT EXISTS charges_delta
USING delta
LOCATION '/FileStore/tables/delta/charges_delta';

-- Create SQL table for inventory data
CREATE TABLE IF NOT EXISTS inventory_delta
USING delta
LOCATION '/FileStore/tables/delta/inventory_delta';

Let's check the inventory before running the main code.

In [0]:
%sql

-- Query to see the inventory data
SELECT * FROM inventory_delta order by 1;

productName,sku,initialInventory
Apple AirPods Pro (2nd Gen),AirPodsPro_2ndGen,180
Apple iPad Pro 12.9-inch,iPadPro_12_9_128GB,75
Bose QuietComfort 45 Headphones,BoseQC45,120
Dyson V15 Detect Cordless Vacuum,DysonV15_Cordless,90
Google Pixel 6 Pro,Pixel6Pro_128GB,60
Nintendo Switch OLED,NintendoSwitch_OLED,80
Open Box Apple Watch Series 8 GPS 41mm Silver Aluminum Case with White Sport Band - S/M,skutotestsparkjobsmartwatch4,50
Samsung Galaxy S21 Ultra,SGS21U_128GB,100
Sony PlayStation 5 Console,PS5_CONSOLE,200
Sony WH-1000XM5 Wireless Headphones,SONY_WHR_1000XM5,150


### First Execution

In [0]:
main()

INFO:__main__:Loaded JSON data from /FileStore/tables/orders/
INFO:__main__:Orders data successfully extracted and flattened
INFO:__main__:Processing orders data
INFO:__main__:Table /FileStore/tables/delta/orders_delta created and data saved
INFO:__main__:Upsert process completed for /FileStore/tables/delta/orders_delta
INFO:__main__:Processing order lines data
INFO:__main__:Table /FileStore/tables/delta/order_lines_delta created and data saved
INFO:__main__:Upsert process completed for /FileStore/tables/delta/order_lines_delta


+---------------------------------------------------------------------------------------+----------------------------+------------+
|productName                                                                            |sku                         |quantitySold|
+---------------------------------------------------------------------------------------+----------------------------+------------+
|Google Pixel 6 Pro                                                                     |Pixel6Pro_128GB             |1           |
|Apple AirPods Pro (2nd Gen)                                                            |AirPodsPro_2ndGen           |1           |
|Nintendo Switch OLED                                                                   |NintendoSwitch_OLED         |1           |
|Apple iPad Pro 12.9-inch                                                               |iPadPro_12_9_128GB          |1           |
|Sony PlayStation 5 Console                                                 

INFO:__main__:Inventory updated successfully
INFO:__main__:Processing charges data
INFO:__main__:Table /FileStore/tables/delta/charges_delta created and data saved
INFO:__main__:Upsert process completed for /FileStore/tables/delta/charges_delta
INFO:__main__:Moved orders.json to /FileStore/tables/processed/


Now let's check the remaining inventory.

In [0]:
%sql

-- Query to see the inventory data
SELECT * FROM inventory_delta order by 1;

productName,sku,initialInventory
Apple AirPods Pro (2nd Gen),AirPodsPro_2ndGen,179
Apple iPad Pro 12.9-inch,iPadPro_12_9_128GB,74
Bose QuietComfort 45 Headphones,BoseQC45,119
Dyson V15 Detect Cordless Vacuum,DysonV15_Cordless,89
Google Pixel 6 Pro,Pixel6Pro_128GB,59
Nintendo Switch OLED,NintendoSwitch_OLED,79
Open Box Apple Watch Series 8 GPS 41mm Silver Aluminum Case with White Sport Band - S/M,skutotestsparkjobsmartwatch4,49
Samsung Galaxy S21 Ultra,SGS21U_128GB,99
Sony PlayStation 5 Console,PS5_CONSOLE,199
Sony WH-1000XM5 Wireless Headphones,SONY_WHR_1000XM5,148


### Second Execution
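Because the first run moved `orders.json` to the processed folder, this run should find the input folder empty and log the message "No files to process".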

In [0]:
main()



### Third Execution
Let's copy a new order file into the input folder and run the pipeline again.

In [0]:
dbutils.fs.cp("/FileStore/tables/bkp/order1.json", "/FileStore/tables/orders/order1.json")
main()

INFO:__main__:Loaded JSON data from /FileStore/tables/orders/
INFO:__main__:Orders data successfully extracted and flattened
INFO:__main__:Processing orders data
INFO:__main__:Upsert successful for /FileStore/tables/delta/orders_delta
INFO:__main__:Upsert process completed for /FileStore/tables/delta/orders_delta
INFO:__main__:Processing order lines data
INFO:__main__:Upsert successful for /FileStore/tables/delta/order_lines_delta
INFO:__main__:Upsert process completed for /FileStore/tables/delta/order_lines_delta


+---------------------------+-----------------+------------+
|productName                |sku              |quantitySold|
+---------------------------+-----------------+------------+
|Apple AirPods Pro (2nd Gen)|AirPodsPro_2ndGen|1           |
|Samsung Galaxy S21 Ultra   |SGS21U_128GB     |2           |
+---------------------------+-----------------+------------+



INFO:__main__:Inventory updated successfully
INFO:__main__:Processing charges data
INFO:__main__:Upsert successful for /FileStore/tables/delta/charges_delta
INFO:__main__:Upsert process completed for /FileStore/tables/delta/charges_delta
INFO:__main__:Moved order1.json to /FileStore/tables/processed/


### Results

In [0]:
%sql

-- Query to see the inventory data
SELECT * FROM inventory_delta order by 1;

productName,sku,initialInventory
Apple AirPods Pro (2nd Gen),AirPodsPro_2ndGen,178
Apple iPad Pro 12.9-inch,iPadPro_12_9_128GB,74
Bose QuietComfort 45 Headphones,BoseQC45,119
Dyson V15 Detect Cordless Vacuum,DysonV15_Cordless,89
Google Pixel 6 Pro,Pixel6Pro_128GB,59
Nintendo Switch OLED,NintendoSwitch_OLED,79
Open Box Apple Watch Series 8 GPS 41mm Silver Aluminum Case with White Sport Band - S/M,skutotestsparkjobsmartwatch4,49
Samsung Galaxy S21 Ultra,SGS21U_128GB,97
Sony PlayStation 5 Console,PS5_CONSOLE,199
Sony WH-1000XM5 Wireless Headphones,SONY_WHR_1000XM5,148
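
As a quick sanity check, the arithmetic behind the third run can be reproduced in plain Python. The starting amounts are the values listed after the first execution and the sold quantities are the ones printed during the third execution. Only three SKUs are included to keep the sketch short, and the dictionaries are stand-ins for the Delta table, not part of the pipeline.

```python
# Inventory after the first execution (subset of SKUs).
inventory_before = {
    "AirPodsPro_2ndGen": 179,
    "SGS21U_128GB": 99,
    "PS5_CONSOLE": 199,
}

# Aggregated quantities printed by the third execution.
quantity_sold = {
    "AirPodsPro_2ndGen": 1,
    "SGS21U_128GB": 2,
}

# SKUs with no sales in the batch keep their amount, just as unmatched
# inventory rows are left untouched by the merge.
inventory_after = {
    sku: amount - quantity_sold.get(sku, 0)
    for sku, amount in inventory_before.items()
}

print(inventory_after)
# {'AirPodsPro_2ndGen': 178, 'SGS21U_128GB': 97, 'PS5_CONSOLE': 199}
```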


In [0]:
%sql
-- Query to see all orders
SELECT * FROM orders_delta;


purchaseOrderId,customerOrderId,customerEmailId,orderType,orderDate
4792982839409,5681962097195,9336DEB95193429EA032F07770C3E48A@relay.walmart.com,REPLACEMENT,1571903550000
2792982839545,5681963507621,608603150B4049338B773566484C0591@relay.walmart.com,REGULAR,1571903539000
2792982839414,5681962895313,BC058103AF6C400E99FDEC95795F16AF@relay.walmart.com,REPLACEMENT,1571903538000
4792982839305,5681962393943,B100013EF5B6415E97F13A1E2F76D33E@relay.walmart.com,REPLACEMENT,1571903537000
4792982839157,5681962094947,F26C0523EB674390ABD87D8148A9CF9A@relay.walmart.com,REPLACEMENT,1571903536000
4792982839565,5681963200599,B442C0D25766479DA1D98D0BEE18AA4E@relay.walmart.com,REPLACEMENT,1571903536000
4792982839536,5681963403079,C087D231D7F846C3BBE04B1AC869F3FB@relay.walmart.com,REPLACEMENT,1571903535000
3796673088300,5681962299170,A78E13850B234D4599FD6B42DB0EFB19@relay.walmart.com,REGULAR,1571903535000
1796673088779,5681963402652,B7F3DF65F7DB47CD9F4108B5C2BA89BF@relay.walmart.com,REGULAR,1571903535000
4792982839704,5681962096403,B100013EF5B6415E97F13A1E2F76D33E@relay.walmart.com,REGULAR,1571903535000
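
The `orderDate` values look like Unix timestamps in milliseconds. Assuming that is what they are, a value can be converted to a readable UTC timestamp as in this small sketch; the helper name is hypothetical and not part of the pipeline.

```python
from datetime import datetime, timezone


def epoch_millis_to_utc(millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


# First orderDate value from the result above.
order_date = epoch_millis_to_utc(1571903550000)
print(order_date.isoformat())
# 2019-10-24T07:52:30+00:00
```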


In [0]:
%sql

-- Query to see all order lines
SELECT * FROM order_lines_delta;


purchaseOrderId,lineNumber,productName,sku,amount,shipMethod
4792982839409,3,Samsung Galaxy S21 Ultra,SGS21U_128GB,1,EXPEDITED
2792982839545,11,Sony WH-1000XM5 Wireless Headphones,SONY_WHR_1000XM5,1,EXPEDITED
2792982839414,4,Apple iPad Pro 12.9-inch,iPadPro_12_9_128GB,1,EXPEDITED
4792982839305,4,Bose QuietComfort 45 Headphones,BoseQC45,1,EXPEDITED
4792982839157,3,Sony PlayStation 5 Console,PS5_CONSOLE,1,EXPEDITED
4792982839565,3,Nintendo Switch OLED,NintendoSwitch_OLED,1,EXPEDITED
4792982839536,4,Apple AirPods Pro (2nd Gen),AirPodsPro_2ndGen,1,EXPEDITED
3796673088300,3,Dyson V15 Detect Cordless Vacuum,DysonV15_Cordless,1,EXPEDITED
1796673088779,3,Google Pixel 6 Pro,Pixel6Pro_128GB,1,EXPEDITED
4792982839704,1,Sony WH-1000XM5 Wireless Headphones,SONY_WHR_1000XM5,1,EXPEDITED


In [0]:
%sql

-- Query to see all charges
SELECT * FROM charges_delta;

purchaseOrderId,lineNumber,chargeType,chargeName,amount,currency,taxAmount,taxCurrency,taxName
4792982839409,3,PRODUCT,ItemPrice,1200,USD,7.92,USD,Tax1
4792982839409,3,SHIPPING,Shipping,33,USD,,,
2792982839545,11,PRODUCT,ItemPrice,350,USD,,,
2792982839545,11,SHIPPING,Shipping,60,USD,,,
2792982839414,4,PRODUCT,ItemPrice,1000,USD,8.89,USD,Tax1
2792982839414,4,SHIPPING,Shipping,27,USD,,,
4792982839305,4,PRODUCT,ItemPrice,300,USD,8.9,USD,Tax1
4792982839305,4,SHIPPING,Shipping,19,USD,,,
4792982839157,3,PRODUCT,ItemPrice,500,USD,,,
4792982839157,3,SHIPPING,Shipping,60,USD,,,
