In [1]:
import sys
import os
import yaml
import configparser
import json

import logging

# Add the src directory to the sys.path
sys.path.append(os.path.abspath(os.path.join('..', 'src')))

# 1. Environment Configuration

## 1.1 Import Dependencies

In [2]:
import boto3
import json
import os
from uuid import uuid4
from datetime import datetime
from datetime import timedelta
import time
import random
import uuid
import logging
# import numpy as np

import pyspark.sql.types as t
import pyspark.sql.functions as f

In [3]:
# from spark_session import create_spark_session
from schemas import *
from functions import *

## 1.2 Extract AWS credentials

In [4]:
def load_aws_credentials(profile_name="default"):

    # Load credentials from the .aws/credentials file (local development)
    try:
        credentials = configparser.ConfigParser()
        credentials.read(os.path.join('..', '.aws', 'credentials'))
        
        logging.info("Successfully loaded credentials variables from .aws file.")
    except Exception as e:
        logging.error(f"Error loading .aws file: {e}")
        sys.exit(1)

    aws_access_key_id = credentials[profile_name]["aws_access_key_id"]
    aws_secret_access_key = credentials[profile_name]["aws_secret_access_key"]

    if not aws_access_key_id or not aws_secret_access_key:
        logging.error("AWS credentials not found.")
        sys.exit(1)

    return aws_access_key_id, aws_secret_access_key

aws_access_key_id, aws_secret_access_key = load_aws_credentials()

## 1.3 Extract AWS Config parameters

In [5]:
def load_aws_config():
    """
    Loads AWS configuration settings from the .aws/config file.

    :param profile_name: The profile name in the AWS config file (default: "default").
    :return: The region_name as a string.
    """
    try:
        config = configparser.ConfigParser()
        config.read(os.path.join('..', '.aws', 'config'))
        logging.info("Successfully loaded config variables from .aws file.")

        return config
    except Exception as e:
        logging.error(f"Error loading .aws file: {e}")
        sys.exit(1)

config = load_aws_config()

In [6]:
BUCKET_NAME = "vproptimiserplatform"
ORDERS = "orders"
STREAM_NAME = "orders_stream_5"
REGION = config["default"]["REGION"]

BRONZE = "bronze"
SILVER = "silver"
GOLD = "gold"
DELTA = "delta"

PROCESSING_TRIGGER = "5 seconds"

## Append Checkpoints 
EVENTS_CHECKPOINT_LOCATION = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/checkpoints/events"
ORDERS_CHECKPOINT_LOCATION = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/checkpoints/orders"
ORDERS_ITEMS_CHECKPOINT_LOCATION = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/checkpoints/orders_items"

## Update Checkpoints
EVENTS_UPDATE_CHECKPOINT_LOCATION = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/checkpoints/events_update"
ORDERS_UPDATE_CHECKPOINT_LOCATION = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/checkpoints/orders_update"
ORDERS_ITEMS_UPDATE_CHECKPOINT_LOCATION = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/checkpoints/orders_items_update"
PRODUCTS_CHECKPOINT_LOCATION = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/checkpoints/inventory_update"

## paths
EVENTS = "events"
ORDERS = "orders"
ORDERS_ITEMS = "orders_items"
PRODUCTS = "products_table"

# TODO TBC medallion architecture??
EVENTS_PATH = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/{BRONZE}/{EVENTS}"
ORDERS_PATH = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/{BRONZE}/{ORDERS}"
ORDERS_ITEMS_PATH = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/{BRONZE}/{ORDERS_ITEMS}"
PRODUCTS_PATH = f"s3a://{BUCKET_NAME}/{ORDERS}/{GOLD}/{PRODUCTS}"

ORDERS_STREAM_PATH = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/{BRONZE}/{STREAM_NAME}"
ORDERS_STREAM_CHECKPOINT_LOCATION = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}/checkpoints/{STREAM_NAME}"

## Tables
EVENTS_TABLE = f"{EVENTS}_table"
ORDERS_TABLE = f"{ORDERS}_table"
ORDERS_ITEMS_TABLE = f"{ORDERS_ITEMS}_table"

## 1.4 Initialise Spar Session

In [7]:
# from delta import *
# from pyspark.sql import SparkSession
# from pyspark.conf import SparkConf

# def create_spark_session(aws_access_key_id, aws_secret_access_key, cores_number="2"):
#     """
#     Create and configure a Spark session with AWS credentials.
    
#     :param aws_access_key_id: AWS access key ID.
#     :param aws_secret_access_key: AWS secret access key.
#     :param cores_number: Number of cores to use for the Spark session (default is 2).
#     :return: SparkSession
#     """
#     try:
#         # Configure the Spark session
#         # conf = (
#         #     SparkConf()
#         #     .setAppName("VPR-data_suscriber")
#         #     .set("spark.hadoop.fs.s3a.endpoint", "s3.eu-south-2.amazonaws.com")
#         #     .set("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
#         #     .set("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
#         #     .set(
#         #         "spark.jars.packages", 
#         #         "io.delta:delta-core_2.12:2.4.0,org.apache.hadoop:hadoop-aws:3.4.0,org.apache.spark:spark-sql-kinesis_2.12:3.5.1") # ,org.apache.spark:spark-streaming-kinesis-asl_2.12:3.4.2") # ,org.apache.spark:spark-sql-kinesis_2.12:3.5.0 # https://awslabs-code-us-east-1.s3.amazonaws.com/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar") # /Users/miguelgranica/Documents/MBIT-DE/vpr-data_suscriber/jars/spark-streaming-kinesis-asl_2.12-3.5.3.jar") # 
#         #     # .set(
#         #     #     "spark.jars", "https://awslabs-code-us-east-1.s3.amazonaws.com/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar")
#         #     #     # "/Users/miguelgranica/Documents/MBIT-DE/vpr-data_suscriber/jars/spark-streaming-kinesis-asl_2.12-3.5.3.jar,/Users/miguelgranica/Documents/MBIT-DE/vpr-data_suscriber/jars/aws-java-sdk-1.12.500.jar")
#         #     .set("spark.sql.streaming.aws.kinesis.endpointUrl", "https://kinesis.eu-south-2.amazonaws.com")
#         #     .set("spark.kinesis.connection.endpoint", "https://kinesis.eu-south-2.amazonaws.com")
#         #     .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
#         #     .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
#         #     .setMaster(f"local[{cores_number}]")
#         # )
#         conf = (
#             SparkConf()
#             .setAppName("VPR-data_suscriber")
#             .set("spark.hadoop.fs.s3a.endpoint", "s3.eu-south-2.amazonaws.com")
#             .set("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
#             .set("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
#             .set(
#                 "spark.jars.packages", 
#                 "org.apache.spark:spark-sql-kinesis_2.12:3.5.1,io.delta:delta-core_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.1")
#             .set("spark.jars", "/Users/miguelgranica/Documents/MBIT-DE/vpr-data_suscriber/jars/spark-streaming-kinesis-asl_2.12-3.5.1.jar") # /path/to/spark-sql-kinesis-connector_2.12-3.4.2.jar,
#             .set("spark.sql.streaming.aws.kinesis.endpointUrl", "https://kinesis.eu-south-2.amazonaws.com")
#             .set("spark.kinesis.connection.endpoint", "https://kinesis.eu-south-2.amazonaws.com")
#             .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
#             .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
#             .set("spark.driver.memory", "4g")
#             .setMaster(f"local[{cores_number}]")
#         )
        
#         # Build the Spark session
#         builder = SparkSession.builder.config(conf=conf)
#         my_packages = ["org.apache.hadoop:hadoop-aws:3.3.1"]
#         spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

#         return spark

#     except Exception as e:
#         print(f"An error occurred while creating the Spark session: {str(e)}")
#         raise  # Re-raise the exception after logging or handling

# spark = create_spark_session(aws_access_key_id, aws_secret_access_key)

In [8]:
from delta import *
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

def create_spark_session(aws_access_key_id, aws_secret_access_key, cores_number="2"):
    """
    Create and configure a Spark session with AWS credentials.
    
    :param aws_access_key_id: AWS access key ID.
    :param aws_secret_access_key: AWS secret access key.
    :param cores_number: Number of cores to use for the Spark session (default is 2).
    :return: SparkSession
    """
    try:
        # Configure the Spark session
        conf = (
            SparkConf()
            .setAppName("VPR-data_landing")
            .set("spark.hadoop.fs.s3a.endpoint", "s3.eu-south-2.amazonaws.com")
            .set("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0,org.apache.hadoop:hadoop-aws:3.3.2")
            .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .set("spark.hadoop.fs.s3a.access.key", aws_access_key_id)
            .set("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key)
            .setMaster(f"local[{cores_number}]")  # replace the * with your desired number of cores. * for use all.
        )
        
        # Build the Spark session
        builder = SparkSession.builder.config(conf=conf)
        my_packages = ["org.apache.hadoop:hadoop-aws:3.3.1"]
        spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

        return spark

    except Exception as e:
        print(f"An error occurred while creating the Spark session: {str(e)}")
        raise  # Re-raise the exception after logging or handling
spark = create_spark_session(aws_access_key_id, aws_secret_access_key)

24/10/02 20:39:10 WARN Utils: Your hostname, Miguels-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.16 instead (on interface en0)
24/10/02 20:39:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/miguelgranica/Documents/MBIT-DE/vpr-data_suscriber/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/miguelgranica/.ivy2/cache
The jars for the packages stored in: /Users/miguelgranica/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7cb2aa49-4c25-48e2-9e19-530705e05b75;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.1 in central
	found io.delta#delta-storage;3.2.1 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 131ms :: artifacts dl 5ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	io.delta#delta-spark_2.12;3.2.1 from central in [default]
	io.delta#delta-storage;3.2.1 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	

In [9]:
import boto3

kinesis_client = boto3.client('kinesis', region_name='eu-south-2', 
                              aws_access_key_id=aws_access_key_id, 
                              aws_secret_access_key=aws_secret_access_key)

response = kinesis_client.describe_stream(StreamName=STREAM_NAME)
print(response)

{'StreamDescription': {'StreamName': 'orders_stream_5', 'StreamARN': 'arn:aws:kinesis:eu-south-2:034780493640:stream/orders_stream_5', 'StreamStatus': 'ACTIVE', 'StreamModeDetails': {'StreamMode': 'PROVISIONED'}, 'Shards': [{'ShardId': 'shardId-000000000000', 'HashKeyRange': {'StartingHashKey': '0', 'EndingHashKey': '340282366920938463463374607431768211455'}, 'SequenceNumberRange': {'StartingSequenceNumber': '49656388594579061284162184228423832529993071248229269506'}}], 'HasMoreShards': False, 'RetentionPeriodHours': 24, 'StreamCreationTimestamp': datetime.datetime(2024, 10, 2, 19, 4, 54, tzinfo=tzlocal()), 'EnhancedMonitoring': [{'ShardLevelMetrics': []}], 'EncryptionType': 'NONE'}, 'ResponseMetadata': {'RequestId': 'c9252949-86c4-ef23-9652-bc8561cea89c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c9252949-86c4-ef23-9652-bc8561cea89c', 'x-amz-id-2': 'QTHR8l36qkYLLb8MtyT1e11yb6Bnm58VLXxbn+ftHPYgjTEyajOC4ykcvnIWwpeIY66Lu8bsRMAEuXcnYIzo9iXvpxKTQFwk', 'date': 'Wed, 02 Oct

In [10]:
# kinesis_order_stream = (
#     spark
#     .readStream
#     .format("aws-kinesis")
#     .option("streamName", STREAM_NAME)
#     # .option("kinesis.endpointUrl", "https://kinesis.eu-south-2.amazonaws.com")
#     # .option("streamName", STREAM_NAME)
#     .option("awsAccessKeyId", aws_access_key_id)
#     .option("awsSecretKey", aws_secret_access_key)
#     # .option("format", "json")
#     .option("region", REGION)
#     .option("startingPosition", "trim_horizon")
#     # .option("startingPosition", "latest")
#     .load()
# )

# kinesis_order_stream = (
#     spark
#     .readStream
#     .format("aws-kinesis")
#     .option("kinesis.streamName", STREAM_NAME)
#     .option("kinesis.endpointUrl", "https://kinesis.eu-south-2.amazonaws.com")
#     .option("streamName", STREAM_NAME)
#     .option("awsAccessKeyId", aws_access_key_id)
#     .option("awsSecretKey", aws_secret_access_key)
#     # .option("format", "json")
#     .option("region", REGION)
#     .option("startingPosition", "trim_horizon")
#     # .option("startingPosition", "latest")
#     .load()
# )

In [11]:
# kinesis_order_stream.printSchema()

In [12]:
# (
#     kinesis_order_stream
#     .withColumn("json_data", f.expr("CAST(unbase64(data) AS STRING)"))
#     .select("json_data")
#     .writeStream
#     .format("console")
#     .outputMode("append")
#     .start()
# )

In [13]:
# df_stream_test = ( 
#     kinesis_order_stream.writeStream
#     .outputMode("append")
#     .queryName("kinesis_orders")
#     .option("checkpointLocation", "checkpoints/kineisis_orders_tests_13")
#     .option("kinesis.endpointUrl", "https://kinesis.eu-south-2.amazonaws.com")
#     .format("memory")
#     .start()
# )

# spark.table("kinesis_orders").show()

In [14]:
# df_orders_stream = (
#     kinesis_order_stream
#     .withColumn("json_data", f.expr("CAST(unbase64(data) AS STRING)"))
#     .withColumn("orders", f.from_json("json_data", orders_schema))
#     .select("orders.*")
# )

In [15]:
# (
#     df_orders_stream
#     .writeStream
#     .format("console")
#     .outputMode("append")
#     .start()
# )

In [16]:
# df_orders_stream.printSchema()

In [17]:
# client = boto3.client(
#     'kinesis',
#     aws_access_key_id=aws_access_key_id,
#     aws_secret_access_key=aws_secret_access_key,
#     region_name=REGION
# )

# response = client.describe_stream(StreamName=STREAM_NAME)
# print(response)

In [18]:
# client = boto3.client(
#     'kinesis',
#     aws_access_key_id=aws_access_key_id,
#     aws_secret_access_key=aws_secret_access_key,
#     region_name=REGION
# )

# # Get the shard iterator for the stream
# response = client.describe_stream(StreamName=STREAM_NAME)
# shard_id = response['StreamDescription']['Shards'][0]['ShardId']

# shard_iterator_response = client.get_shard_iterator(
#     StreamName=STREAM_NAME,
#     ShardId=shard_id,
#     ShardIteratorType='TRIM_HORIZON'  # Use 'LATEST' for most recent records
# )
# shard_iterator = shard_iterator_response['ShardIterator']

# # Fetch records
# record_response = client.get_records(ShardIterator=shard_iterator, Limit=100)
# record_response

In [19]:
# response['ResponseMetadata']

# 2. Orders Stream Suscriber workarround / Connection to kinesis failed

In [20]:
STREAM_NAME

'orders_stream_5'

In [21]:
kinesis_client = boto3.client('kinesis', region_name='eu-south-2', 
                              aws_access_key_id=aws_access_key_id, 
                              aws_secret_access_key=aws_secret_access_key)
response = kinesis_client.describe_stream(StreamName=STREAM_NAME)
shard_id = response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId="shardId-000000000000",
    ShardIteratorType="TRIM_HORIZON"
)['ShardIterator']

response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=100)
shard_iterator = response["NextShardIterator"]
response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=100)
records = response['Records']

# Convert the Kinesis records into a list of JSON strings
records

[{'SequenceNumber': '49656388594579061284162187226314453232891665998355103746',
  'ApproximateArrivalTimestamp': datetime.datetime(2024, 10, 2, 19, 7, 34, 746000, tzinfo=tzlocal()),
  'Data': b'{"event_id": "ev-129e4bee-4ee7-4e5b-bd65-cbbfc68c6b6a", "event_type": "ORDER_CREATED", "event_timestamp": "2024-10-02 19:07:34", "order_id": "ord-c66b4563-157f-4181-b7b7-1394b1daa6d9", "order_details": {"customer_id": "cus-3323e045-f7a9-4bed-811a-35e4182c97d1", "order_timestamp": "2024-10-02 19:07:34", "order_date": "2024-10-02", "items": [{"product_id": "prod-6d585742-4e78-41af-b042-42ff1b035ecd", "product_name": "H\\u00d6GADAL", "price": 60.0, "quantity": 1, "packages": [{"package_id": "80533283", "subpackage_id": 1, "quantity": 1, "weight": 4.619999885559082, "volume": 25707.0}]}, {"product_id": "prod-d5ce1c62-4057-46fc-a4ac-eb5bb8458fa1", "product_name": "BILLY", "price": 44.9900016784668, "quantity": 3, "packages": [{"package_id": "30263844", "subpackage_id": 1, "quantity": 1, "weight": 19.

In [22]:
def get_kinesis_client(aws_access_key_id, aws_secret_access_key, region_name):
    """Initialize the Kinesis Boto3 client with error handling."""
    try:
        kinesis_client = boto3.client(
            'kinesis',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name
        )
        print("Kinesis client initialized successfully.")
        return kinesis_client
    except Exception as e:
        print(f"Failed to initialize Kinesis client: {e}")
        return None

def fetch_kinesis_data(kinesis_client, stream_name, orders_schema, shard_iterator_type = "TRIM_HORIZON"):
    """
    Fetch records from a Kinesis stream and convert them into a Spark DataFrame.

    Parameters:
    - kinesis_client: Boto3 Kinesis client instance.
    - stream_name: Name of the Kinesis stream to read from.
    - orders_schema: Spark DataFrame schema to apply to the incoming records.
    - shard_iterator_type: Use 'LATEST' for most recent records

    Returns:
    - Spark DataFrame containing the fetched records.
    """
    try:
        # Get the shard iterator for the stream
        response = kinesis_client.describe_stream(StreamName=stream_name)
        shard_id = response['StreamDescription']['Shards'][0]['ShardId']

        shard_iterator = kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType=shard_iterator_type
        )['ShardIterator']

        response = kinesis_client.get_records(ShardIterator=shard_iterator)
        shard_iterator = response["NextShardIterator"]
        response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=100)
        records = response['Records']

        # Convert the Kinesis records into a list of JSON strings
        records_data = [json.loads(record['Data']) for record in records]

        # Create a Spark DataFrame from the fetched records
        df = spark.createDataFrame(records_data, schema=orders_schema)

        return df

    except kinesis_client.exceptions.ResourceNotFoundException:
        print(f"Stream {stream_name} not found.")
    except kinesis_client.exceptions.ProvisionedThroughputExceededException:
        print("Throughput limit exceeded, please try again later.")
    except kinesis_client.exceptions.InvalidArgumentException as e:
        print(f"Invalid argument: {e}")
    except Exception as e:
        print(f"An error occurred while fetching data from Kinesis: {e}")
        raise

def save_df_as_delta(df, table_path):
    """
    Saves a PySpark DataFrame as a Delta table.

    Parameters:
    - df: The DataFrame to be saved.
    - table_path: The path for the Delta table.

    Raises:
    - Exception: If the saving process fails.
    """
    try:
        # Save the DataFrame to a Delta table at the specified path
        (
            df
            .write
            .format("delta")
            .mode("append")
            .save(table_path)
        )
        print(f"DataFrame successfully saved as Delta table at: {table_path}")
    except Exception as e:
        raise Exception(f"Failed to save DataFrame as Delta table: {str(e)}")

def read_delta_table_as_stream(delta_table_path):
    """
    Reads a Delta table as a streaming DataFrame.

    Parameters:
    - delta_table_path: The path to the Delta table.

    Returns:
    - A streaming DataFrame representing the Delta table.

    Raises:
    - Exception: If the streaming read process fails.
    """
    try:
        # Read the Delta table as a streaming DataFrame
        streaming_df = (
            spark.readStream
            .format("delta")
            .load(delta_table_path)
        )
        
        return streaming_df  # Returning the streaming DataFrame
    except Exception as e:
        raise Exception(f"Failed to read Delta table as stream: {str(e)}")

# def get_max_timestamp(delta_table_path):
#     """
#     Extracts the maximum timestamp from the Delta table.

#     Parameters:
#     - delta_table_path: The path to the Delta table.

#     Returns:
#     - The maximum timestamp found in the table or None if no records are present.

#     Raises:
#     - Exception: If the maximum timestamp extraction fails.
#     """
#     try:
#         # Read the Delta table to find the maximum timestamp
#         max_timestamp_df = (
#             spark.read
#             .format("delta")
#             .load(delta_table_path)
#             .select(f.max("timestamp").alias("max_timestamp"))
#         )
        
#         # Collect the maximum timestamp value
#         max_timestamp_row = max_timestamp_df.collect()
#         max_timestamp = max_timestamp_row[0]['max_timestamp'] if max_timestamp_row else None

#         return max_timestamp
    
#     except Exception as e:
#         raise Exception(f"Failed to get max timestamp: {str(e)}")

# def read_delta_table_as_stream(delta_table_path, max_date):
#     """
#     Reads a Delta table as a streaming DataFrame and filters by the maximum date.

#     Parameters:
#     - delta_table_path: The path to the Delta table.
#     - max_date: The maximum date to filter the timestamp column.

#     Returns:
#     - A streaming DataFrame representing the Delta table, filtered by the max date.

#     Raises:
#     - Exception: If the streaming read process fails.
#     """
#     try:
#         # Read the Delta table as a streaming DataFrame
#         streaming_df = (
#             spark.readStream
#             .format("delta")
#             .load(delta_table_path)
#         )
        
#         # Filter by the maximum date for the timestamp column
#         filtered_streaming_df = streaming_df.filter(f.col("timestamp") <= f.lit(max_date))
        
#         return filtered_streaming_df  # Returning the filtered streaming DataFrame
#     except Exception as e:
#         raise Exception(f"Failed to read Delta table as stream: {str(e)}")

## 2.1 Kinesis Client

In [23]:
kinesis_client = get_kinesis_client(aws_access_key_id, aws_secret_access_key, REGION)

Kinesis client initialized successfully.


In [24]:
kinesis_client.list_streams()

{'StreamNames': ['orders_stream_5'],
 'HasMoreStreams': False,
 'StreamSummaries': [{'StreamName': 'orders_stream_5',
   'StreamARN': 'arn:aws:kinesis:eu-south-2:034780493640:stream/orders_stream_5',
   'StreamStatus': 'ACTIVE',
   'StreamModeDetails': {'StreamMode': 'PROVISIONED'},
   'StreamCreationTimestamp': datetime.datetime(2024, 10, 2, 19, 4, 54, tzinfo=tzlocal())}],
 'ResponseMetadata': {'RequestId': 'cc708780-8c14-35c9-9307-1255cb134351',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cc708780-8c14-35c9-9307-1255cb134351',
   'x-amz-id-2': 'K1bIJHZrXmCD4VHVHWBOlKShpMaW5stXzMCo5oDDDvxLdUmwtKAX5jaq8n+2BqfNPom7/RS1DxvpwQpG1MyHnnpKgduZTxpY',
   'date': 'Wed, 02 Oct 2024 18:40:21 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '301',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

## 2.2 Fetch Kinesis Data

In [73]:
orders_schema

StructType([StructField('event_id', StringType(), True), StructField('event_type', StringType(), True), StructField('event_timestamp', StringType(), True), StructField('order_id', StringType(), True), StructField('order_details', StructType([StructField('customer_id', StringType(), True), StructField('order_timestamp', StringType(), True), StructField('order_date', StringType(), True), StructField('items', ArrayType(StructType([StructField('product_id', StringType(), True), StructField('product_name', StringType(), True), StructField('price', DoubleType(), True), StructField('quantity', IntegerType(), True), StructField('packages', ArrayType(StructType([StructField('package_id', StringType(), True), StructField('subpackage_id', IntegerType(), True), StructField('quantity', IntegerType(), True), StructField('weight', DoubleType(), True), StructField('volume', DoubleType(), True)]), True), True)]), True), True), StructField('total_amount', DoubleType(), True), StructField('total_volume',

In [25]:
df_orders = fetch_kinesis_data(kinesis_client, STREAM_NAME, orders_schema)

In [26]:
(
    df_orders
    .orderBy("event_timestamp", ascending=False)
).show(15)

                                                                                

+--------------------+-------------+-------------------+--------------------+--------------------+
|            event_id|   event_type|    event_timestamp|            order_id|       order_details|
+--------------------+-------------+-------------------+--------------------+--------------------+
|ev-9578099b-581b-...|ORDER_CREATED|2024-10-02 19:09:43|ord-eaf26ecd-135c...|{cus-72ad7c45-602...|
|ev-38404fe2-556c-...|ORDER_CREATED|2024-10-02 19:09:27|ord-72065503-f3e8...|{cus-863f389f-9ac...|
|ev-76a38d9d-07da-...|ORDER_CREATED|2024-10-02 19:09:10|ord-bc01e71b-539d...|{cus-79bec36a-216...|
|ev-4b0a4657-1f61-...|ORDER_CREATED|2024-10-02 19:08:54|ord-16fe4cf5-6238...|{cus-2f6ff38a-54c...|
|ev-8aba5d1a-e27a-...|ORDER_CREATED|2024-10-02 19:08:42|ord-bc303b78-fa7a...|{cus-6e9a20d7-241...|
|ev-df2651dc-7e9f-...|ORDER_CREATED|2024-10-02 19:08:29|ord-fa9fd3ea-b4e4...|{cus-19a9a914-7f3...|
|ev-b4675ca8-76f3-...|ORDER_CREATED|2024-10-02 19:08:12|ord-b8c7c434-16d9...|{cus-3d4bb4a4-414...|
|ev-6a2793

In [27]:
# df_orders.printSchema()

In [28]:
# df_orders = (
#     spark.read.format("delta").load(ORDERS_STREAM_PATH)
# )
# df_orders.show()

In [29]:
## Orders

# (
#     df_orders
#     .select(
#         f.col("order_id"),
#         f.col("order_details.customer_id").alias("customer_id"),
#         f.col("order_details.total_weight").alias("total_weight"),
#         f.col("order_details.total_volume").alias("total_volume"),
#         f.col("order_details.total_amount").alias("total_price"),
#         f.col("order_details.order_timestamp").alias("order_timestamp"),
#         f.col("order_details.status").alias("status"),
#         f.col("order_details.destination_address.lat").alias("lat"),
#         f.col("order_details.destination_address.lon").alias("lon"),
#     )
# ).show()

In [30]:
## Orders items

# (
#     df_orders
#     .withColumn("order_exploded", f.explode(f.col("order_details.items")))
#     .withColumn("package_exploded", f.explode(f.col("order_exploded.packages")))
#     .withColumn("inventory_id", f.concat(f.lit("inv-"), f.expr("uuid()")))
#     .withColumn("items_quantity", f.col("order_exploded.quantity") * f.col("package_exploded.quantity"))
#     .withColumn("items_weight", f.col("items_quantity") * f.col("package_exploded.weight"))
#     .withColumn("items_volume", f.col("items_quantity") * f.col("package_exploded.volume"))
#     .withColumn("status", f.lit("PENDING"))
#     .select(
#         f.col("inventory_id"),
#         f.col("order_id"),
#         f.col("order_details.total_weight").alias("order_total_weight"),
#         f.col("order_exploded.product_id").alias("product_id"),
#         f.col("order_exploded.product_name").alias("product_name"),
#         f.col("order_exploded.price").alias("order_price"),
#         f.col("package_exploded.package_id").alias("package_id"),
#         f.col("package_exploded.subpackage_id").alias("subpackage_id"),
#         # f.col("package_exploded.weight").alias("package_weight"),
#         # f.col("package_exploded.volume").alias("package_volume"),
#         # f.col("order_exploded.quantity").alias("order_quantity"),
#         # f.col("package_exploded.quantity").alias("package_quantity"),
#         f.col("items_quantity"),
#         f.col("items_weight"),
#         f.col("items_volume"),
#         # f.col("order_details.order_timestamp").alias("order_timestamp"),
#         # f.col("status")
        
#     )
# ).show()

In [31]:
## Query review
# (
#     df_orders
#     .withColumn("order_exploded", f.explode(f.col("order_details.items")))
#     .withColumn("package_exploded", f.explode(f.col("order_exploded.packages")))
#     .where(f.col("package_exploded.package_id") == f.lit("90401932"))
#     .where(f.col("order_id")==f.lit("ord-63697999-e28f-42bc-966d-ea008fdc5ffa"))
#     .select(
#         # f.col("order_exploded"),
#         f.col("order_id"),
#         f.col("order_details.total_weight").alias("order_total_weight"),
#         f.col("order_exploded.product_id").alias("product_id"),
#         f.col("order_exploded.product_name").alias("product_name"),
#         f.col("order_exploded.price").alias("order_price"),
#         f.col("package_exploded.package_id").alias("package_id"),
#         f.col("package_exploded.subpackage_id").alias("subpackage_id"),
#         f.col("package_exploded.weight").alias("package_weight"),
#         f.col("package_exploded.volume").alias("package_volume"),
#         f.col("order_exploded.quantity").alias("order_quantity"),
#         f.col("package_exploded.quantity").alias("package_quantity"),
#         f.col("order_details.order_timestamp").alias("order_timestamp"),
#         f.col("order_details.status").alias("status"),
        
#     )
# ).show(truncate=False)

In [32]:
# (
#     df_orders
#     .withColumn("order_details_explode", f.explode(f.col("order_details.items")))
#     .select(
#         f.col("order_id"),
#     )
# ).show()

## 2.3 Save & Read Orders Structured Stream

In [33]:
save_df_as_delta(df_orders, ORDERS_STREAM_PATH)

24/10/02 20:41:23 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

DataFrame successfully saved as Delta table at: s3a://vproptimiserplatform/orders/delta/bronze/orders_stream_5


In [34]:
df_orders_stream = read_delta_table_as_stream(ORDERS_STREAM_PATH)

In [74]:
# df_orders_stream.printSchema()


's3a://vproptimiserplatform/orders/delta/bronze/orders_stream_5'

# 3. Write Orders Stream

In [36]:
def process_events_stream(df_order_stream, events_path, checkpoint_location):
    """
    Processes the order stream by selecting event-specific columns and writing them 
    as a stream into a Delta table.

    Parameters:
    - df_order_stream: The input PySpark DataFrame containing order stream data.
    - events_path: The path where the Delta table for events should be written.
    - checkpoint_location: The location where checkpoint data will be stored for fault-tolerance.

    Raises:
    - Exception: If the streaming process fails or encounters an error.
    """
    try:
        # Define the stream transformation and writing process
        events_stream = (
            df_order_stream
            .select(
                f.col("event_id"),
                f.col("event_type"),
                f.col("event_timestamp"),
                f.col("order_id")
            )
            .writeStream
            .format("delta")
            .outputMode("append")
            .trigger(once=True)  # Using trigger(once=True) as per requirement
            .option("path", events_path)
            .option("checkpointLocation", checkpoint_location)
            .start()
        )
        
        # Log the status of the streaming process
        logging.info(f"Streaming process status: {events_stream.status}")
        
        print(f"Events stream successfully written to {events_path}")

        # return stream
        return events_stream
        
    except Exception as e:
        # Log the error and raise an exception
        logging.error(f"Failed to process events stream: {str(e)}")
        raise Exception(f"Failed to process events stream: {str(e)}")

def process_orders_stream(df_order_stream, orders_path, checkpoint_location):
    """
    Processes the order stream by selecting specific columns related to orders and writing them 
    as a stream into a Delta table.

    Parameters:
    - df_order_stream: The input PySpark DataFrame containing order stream data.
    - orders_path: The path where the Delta table for orders should be written.
    - checkpoint_location: The location where checkpoint data will be stored for fault-tolerance.

    Raises:
    - Exception: If the streaming process fails or encounters an error.
    """    
    try:
        # Define the stream transformation and writing process
        orders_stream = (
            df_order_stream
            .select(
                f.col("order_id"),
                f.col("order_details.customer_id").alias("customer_id"),
                f.col("order_details.total_weight").alias("total_weight"),
                f.col("order_details.total_volume").alias("total_volume"),
                f.col("order_details.total_amount").alias("total_price"),
                f.col("order_details.order_timestamp").alias("order_timestamp"),
                f.col("order_details.status").alias("status"),
                f.col("order_details.destination_address.lat").alias("lat"),
                f.col("order_details.destination_address.lon").alias("lon")
            )
            .writeStream
            .format("delta")
            .outputMode("append")
            .trigger(once=True)  # Using trigger(once=True) as per requirement
            .option("path", orders_path)
            .option("checkpointLocation", checkpoint_location)
            .start()
        )
        
        # Log the status of the streaming process
        logging.info(f"Streaming process status: {orders_stream.status}")
        
        print(f"Orders stream successfully written to {orders_path}")

        # Return Stream
        return orders_stream
        
    except Exception as e:
        # Log the error and raise an exception
        logging.error(f"Failed to process orders stream: {str(e)}")
        raise Exception(f"Failed to process orders stream: {str(e)}")

def process_orders_items_stream(df_order_stream: DataFrame) -> DataFrame:
    """
    Transforms the incoming order stream DataFrame by exploding order items,
    generating inventory IDs, and adding a status column.

    Parameters:
    - df_order_stream (DataFrame): Input DataFrame representing the order stream.

    Returns:
    - DataFrame: Transformed streaming DataFrame with the required columns.

    Raises:
    - ValueError: If the input DataFrame is empty or has unexpected schema.
    """    
    try:
        # Validate input streaming DataFrame
        # if df_order_stream is None or len(df_order_stream.columns) == 0:
        #     raise ValueError("Input DataFrame is empty or not provided")

        # Apply the transformation logic for the streaming DataFrame
        df_orders_items_stream = (
            df_order_stream            
            .withColumn("order_exploded", f.explode(f.col("order_details.items")))
            .withColumn("package_exploded", f.explode(f.col("order_exploded.packages")))
            .withColumn("inventory_id", f.concat(f.lit("inv-"), f.expr("uuid()")))
            .withColumn("items_quantity", f.col("order_exploded.quantity") * f.col("package_exploded.quantity"))
            .withColumn("items_weight", f.col("items_quantity") * f.col("package_exploded.weight"))
            .withColumn("items_volume", f.col("items_quantity") * f.col("package_exploded.volume"))
            .withColumn("status", f.lit("PENDING"))
            .select(
                f.col("inventory_id"),
                f.col("order_id"),
                f.col("order_exploded.product_id").alias("product_id"),
                f.col("order_exploded.product_name").alias("product_name"),
                f.col("order_exploded.price").alias("order_price"),
                f.col("package_exploded.package_id").alias("package_id"),
                f.col("package_exploded.subpackage_id").alias("subpackage_id"),
                f.col("items_quantity"),
                f.col("items_weight"),
                f.col("items_volume"),
                f.col("order_details.order_timestamp").alias("order_timestamp"),
                f.col("status")
            )
        )
        # Log success message
        logging.info("Transformation applied successfully to the streaming DataFrame.")
        return df_orders_items_stream

    except Exception as e:
        logging.error(f"Error occurred during transformation: {str(e)}")
        raise  # Re-raise exception for further handling


def write_orders_items_stream(df_orders_items_stream: DataFrame, path: str, checkpoint_location: str):
    """
    Writes the transformed order items stream DataFrame to Delta format.

    Parameters:
    - df_orders_items_stream (DataFrame): Input streaming DataFrame representing the transformed order items stream.
    - path (str): The destination path for the Delta table.
    - checkpoint_location (str): The location for checkpointing the stream.

    Returns:
    - StreamingQuery: The StreamingQuery object representing the started stream.

    Raises:
    - ValueError: If the input DataFrame is empty or has unexpected schema.
    - Exception: For any other errors during stream initialization.
    """
    try:
        # Validate input streaming DataFrame
        # if df_orders_items_stream is None or len(df_orders_items_stream.columns) == 0:
        #     raise ValueError("Input DataFrame is empty or not provided")

        # Configure and start the streaming write
        orders_items_stream = (
            df_orders_items_stream
            .writeStream
            .format("delta")
            .outputMode("append")
            .trigger(once=True)  # Use `once=True` for single run
            .option("path", path)
            .option("checkpointLocation", checkpoint_location)
            .start()
        )

        # Log success message
        logging.info("Stream started successfully and data is being written to the Delta table.")
        return orders_items_stream

    except Exception as e:
        logging.error(f"Error occurred during stream write operation: {str(e)}")
        raise  # Re-raise exception for further handling

## 3.1 Process Events Stream

In [37]:
events_stream = process_events_stream(df_orders_stream, EVENTS_PATH, EVENTS_CHECKPOINT_LOCATION)

24/10/02 20:41:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Events stream successfully written to s3a://vproptimiserplatform/orders/delta/bronze/events


In [75]:
EVENTS_PATH, EVENTS_CHECKPOINT_LOCATION

('s3a://vproptimiserplatform/orders/delta/bronze/events',
 's3a://vproptimiserplatform/orders/delta/checkpoints/events')

In [38]:
events_stream.lastProgress

In [39]:
events_stream.status

{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [41]:
(
    spark
    .read
    .format("delta")
    .load(EVENTS_PATH)
    .orderBy("event_timestamp", ascending=False)
).show()

                                                                                

+--------------------+-------------+-------------------+--------------------+
|            event_id|   event_type|    event_timestamp|            order_id|
+--------------------+-------------+-------------------+--------------------+
|ev-9578099b-581b-...|ORDER_CREATED|2024-10-02 19:09:43|ord-eaf26ecd-135c...|
|ev-38404fe2-556c-...|ORDER_CREATED|2024-10-02 19:09:27|ord-72065503-f3e8...|
|ev-76a38d9d-07da-...|ORDER_CREATED|2024-10-02 19:09:10|ord-bc01e71b-539d...|
|ev-4b0a4657-1f61-...|ORDER_CREATED|2024-10-02 19:08:54|ord-16fe4cf5-6238...|
|ev-8aba5d1a-e27a-...|ORDER_CREATED|2024-10-02 19:08:42|ord-bc303b78-fa7a...|
|ev-df2651dc-7e9f-...|ORDER_CREATED|2024-10-02 19:08:29|ord-fa9fd3ea-b4e4...|
|ev-b4675ca8-76f3-...|ORDER_CREATED|2024-10-02 19:08:12|ord-b8c7c434-16d9...|
|ev-6a279317-7e6c-...|ORDER_CREATED|2024-10-02 19:07:59|ord-b6e3cca0-5f42...|
|ev-5b601392-0022-...|ORDER_CREATED|2024-10-02 19:07:46|ord-d2de3e01-b5f2...|
|ev-129e4bee-4ee7-...|ORDER_CREATED|2024-10-02 19:07:34|ord-c66b

## 3.2 Process Orders Stream

In [42]:
orders_stream = process_orders_stream(df_orders_stream, ORDERS_PATH, ORDERS_CHECKPOINT_LOCATION)

24/10/02 20:42:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Orders stream successfully written to s3a://vproptimiserplatform/orders/delta/bronze/orders


24/10/02 20:42:12 WARN MicroBatchExecution: The read limit MaxFiles: 1000 for DeltaSource[s3a://vproptimiserplatform/orders/delta/bronze/orders_stream_5] is ignored when Trigger.Once is used.
                                                                                

In [43]:
orders_stream.lastProgress

In [77]:
orders_stream.status

's3a://vproptimiserplatform/orders/delta/checkpoints/orders'

In [72]:
(
    spark
    .read
    .format("delta")
    .load(ORDERS_PATH)
).show(30,truncate=False)

+----------------------------------------+----------------------------------------+------------------+------------+------------------+-------------------+--------+------------------+-------------------+
|order_id                                |customer_id                             |total_weight      |total_volume|total_price       |order_timestamp    |status  |lat               |lon                |
+----------------------------------------+----------------------------------------+------------------+------------+------------------+-------------------+--------+------------------+-------------------+
|ord-c66b4563-157f-4181-b7b7-1394b1daa6d9|cus-3323e045-f7a9-4bed-811a-35e4182c97d1|108.41999816894531|381936.0    |359.95000076293945|2024-10-02 19:07:34|RECEIVED|40.39339065551758 |-3.7561213970184326|
|ord-d2de3e01-b5f2-4166-b0b5-78e1bb7f2952|cus-bb6b538c-0b66-49aa-ac02-550ae393a760|20.419999659061432|57064.0     |144.0             |2024-10-02 19:07:46|RECEIVED|40.519107818603516|-3.788

24/10/02 22:38:31 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 948604 ms exceeds timeout 120000 ms
24/10/02 22:38:31 WARN SparkContext: Killing executors is not supported by current scheduler.
24/10/02 22:38:35 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

## 3.3 Process Orders Iventory Stream

In [46]:
df_orders_items_stream = process_orders_items_stream(df_orders_stream)
write_orders_items_stream(df_orders_items_stream, ORDERS_ITEMS_PATH, ORDERS_ITEMS_CHECKPOINT_LOCATION)

24/10/02 20:42:37 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x108cbbbe0>

24/10/02 20:42:38 WARN MicroBatchExecution: The read limit MaxFiles: 1000 for DeltaSource[s3a://vproptimiserplatform/orders/delta/bronze/orders_stream_5] is ignored when Trigger.Once is used.


In [51]:
(
    spark
    .read
    .format("delta")
    .load(ORDERS_ITEMS_PATH)
).show(40)

+--------------------+--------------------+--------------------+------------+------------------+----------+-------------+--------------+-------------------+------------+-------------------+-------+
|        inventory_id|            order_id|          product_id|product_name|       order_price|package_id|subpackage_id|items_quantity|       items_weight|items_volume|    order_timestamp| status|
+--------------------+--------------------+--------------------+------------+------------------+----------+-------------+--------------+-------------------+------------+-------------------+-------+
|inv-f8baec1d-8769...|ord-bc303b78-fa7a...|prod-924f611b-c45...|     SKRUVBY|248.99000549316406|  20503547|            1|             3|  91.64999771118164|    174096.0|2024-10-02 19:08:42|PENDING|
|inv-94d66dd2-25cb...|ord-bc303b78-fa7a...|prod-924f611b-c45...|     SKRUVBY|248.99000549316406|  60568725|            1|             3|  74.28000068664551|    124488.0|2024-10-02 19:08:42|PENDING|
|inv-2b2fa

In [78]:
ORDERS_ITEMS_PATH, ORDERS_ITEMS_CHECKPOINT_LOCATION

('s3a://vproptimiserplatform/orders/delta/bronze/orders_items',
 's3a://vproptimiserplatform/orders/delta/checkpoints/orders_items')

# 4. Write Inventory Streams

## 4.1 Upsert Packages Stream

In [52]:
df_packages = (
    spark
    .read
    .format("delta")
    .load(f"s3a://{BUCKET_NAME}/{ORDERS}/{GOLD}/package_table")
)
df_packages.show()

+----------+-------------+----------+-----+------+------+------+-------+--------------+
|package_id|subpackage_id|      name|width|height|length|weight| volume|stock_quantity|
+----------+-------------+----------+-----+------+------+------+-------+--------------+
|  00245842|            1|     BESTÅ| 41.0|   8.0| 196.0| 18.01|64288.0|          1000|
|  00263850|            1|     BILLY| 29.0|  13.0| 206.0|  37.9|77662.0|          1000|
|  00275848|            1|    KALLAX| 41.0|  12.0| 155.0|  15.2|76260.0|          1000|
|  00278578|            1|    HYLLIS| 29.0|   4.0| 140.0|  5.76|16240.0|          1000|
|  00286677|            1|     HEJNE|  7.0|   3.0| 171.0|  1.77| 3591.0|          1000|
|  00295554|            1|     BESTÅ| 36.0|   2.0|  59.0|  2.47| 4248.0|          1000|
|  00324518|            1|    KALLAX| 41.0|  16.0| 150.0| 21.14|98400.0|          1000|
|  00340047|            1|      EKET|  9.0|   3.0|  33.0|  0.32|  891.0|          1000|
|  00415603|            1|     B

In [53]:
def upsert_to_package(microBatchDF, batchId):
    """
    Upserts the incoming micro-batch DataFrame into the Delta table for products.

    Parameters:
    - microBatchDF: The micro-batch DataFrame from the streaming source.
    - batchId: The unique identifier for the micro-batch.
    """
    deltaTableProducts = DeltaTable.forPath(spark, f"s3a://{BUCKET_NAME}/{ORDERS}/{GOLD}/package_table") #PRODUCTS_PATH)

    # WorkArround Preprocess the micro-batch DataFrame to removemicrobath duplicates integrity lo
    deduplicatedBatchDF = (
        microBatchDF
        .filter(f.col("status") == f.lit("PENDING"))
        .groupBy("package_id", "subpackage_id")
        .agg(
            f.sum("items_quantity").alias("items_quantity"),
        )
    )
    (
        deltaTableProducts.alias("t")
        .merge(
            deduplicatedBatchDF.alias("s"),
            "s.package_id = t.package_id AND s.subpackage_id = t.subpackage_id"  
        )
        .whenMatchedUpdate(
            # Quantity availability to fulfill order
            condition=f.col("t.stock_quantity") >= f.col("s.items_quantity"),
            set={
                "stock_quantity": f.col("t.stock_quantity") - f.col("s.items_quantity"),
                # "updated_at": f.current_timestamp()
            },
        )
        .execute()
    )

def update_packages_stream(
    df_orders_items_stream: DataFrame, 
    packages_path: str, 
    checkpoint_location: str, 
    upsert_function
):
    """
    Initializes and starts a streaming query to update the products table using the upsert function.

    Parameters:
    - df_orders_items_stream (DataFrame): The input streaming DataFrame containing orders items data.
    - packages_path (str): The path to the Delta table where the packages data will be stored.
    - checkpoint_location (str): The checkpoint location for the stream query.
    - upsert_function (function): The function to perform upsert operations on the Delta table.

    Returns:
    - StreamingQuery: The StreamingQuery object representing the started stream.
    """
    try:
        # Start the streaming write process with upsert logic using foreachBatch
        update_products_stream = (
            df_orders_items_stream
            # .filter(f.col("status") == f.lit("PENDING"))
            .select(
                f.col("package_id"),
                f.col("subpackage_id"),
                f.col("items_quantity"),
                f.col("status")
            )
            .writeStream
            .format("delta")
            .outputMode("update")
            .foreachBatch(upsert_to_package)  # Use the provided upsert function
            .option("path", packages_path)
            .option("checkpointLocation", checkpoint_location)
            .trigger(once=True)  # Single-trigger mode for processing the micro-batch once
            .start()
        )

        # Log successful stream start
        logging.info("Streaming query for packages update started successfully.")
        return update_products_stream

    except Exception as e:
        logging.error(f"Error starting the streaming query for packagess update: {str(e)}")
        raise  # Re-raise exception for further handling


In [62]:
df_update_packages_stream = update_packages_stream(
    df_orders_items_stream, 
    f"s3a://{BUCKET_NAME}/{ORDERS}/{GOLD}/package_table", 
    ORDERS_ITEMS_UPDATE_CHECKPOINT_LOCATION, 
    upsert_to_package
)

24/10/02 21:00:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/10/02 21:00:01 WARN MicroBatchExecution: The read limit MaxFiles: 1000 for DeltaSource[s3a://vproptimiserplatform/orders/delta/bronze/orders_stream_5] is ignored when Trigger.Once is used.


In [80]:
(
    spark
    .read
    .format("delta")
    .load(f"s3a://{BUCKET_NAME}/{ORDERS}/{GOLD}/package_table")
    .orderBy(["stock_quantity"])
).show()

ConnectionRefusedError: [Errno 61] Connection refused

## 4.2 Upsert Orders Items Stream

In [79]:
ORDERS_ITEMS_UPDATE_CHECKPOINT_LOCATION

's3a://vproptimiserplatform/orders/delta/checkpoints/orders_items_update'

In [60]:
def upsert_to_orders_items(microBatchDF, batchId):
    deltaTableOrdersItems = DeltaTable.forPath(spark, ORDERS_ITEMS_PATH)
    (
        deltaTableOrdersItems.alias("t")
        .merge(
            microBatchDF.alias("s"),
            "s.order_id = t.order_id AND s.inventory_id = t.inventory_id"
        )
        .whenMatchedUpdate(
            set={
                "status": f.lit("PROCESSING"),
                "order_timestamp": f.current_timestamp()
            },
        )
        .execute()
    )

def update_orders_items_stream(df_orders_items_stream, ORDERS_ITEMS_PATH, ORDERS_ITEMS_CHECKPOINT_LOCATION, upsert_to_orders_items):
    """
    Function to process the orders items stream, filtering by PENDING status and upserting into the Delta Lake table.
    """
    # Apply transformations and define the streaming write operation
    processing_orders_items_stream = (
        df_orders_items_stream
        .filter(f.col("status") == f.lit("PENDING"))  # Filter by status
        .select(
            f.col("inventory_id"),
            f.col("order_id")
        )
        .writeStream
        .format("delta")  # Write to Delta format
        .outputMode("update")  # Use 'update' mode for micro-batch processing
        .foreachBatch(upsert_to_orders_items)  # Use the provided upsert function
        .trigger(once=True)  # Trigger the stream to process once
        .option("path", ORDERS_ITEMS_PATH)
        .option("checkpointLocation", f"{ORDERS_ITEMS_CHECKPOINT_LOCATION}_processing")
        .start()
    )
    
    return processing_orders_items_stream

In [57]:
df_update_orders_items_stream = update_orders_items_stream(
    df_orders_items_stream, 
    ORDERS_ITEMS_PATH, 
    ORDERS_ITEMS_UPDATE_CHECKPOINT_LOCATION, 
    upsert_to_orders_items
)

24/10/02 20:46:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/10/02 20:46:26 WARN MicroBatchExecution: The read limit MaxFiles: 1000 for DeltaSource[s3a://vproptimiserplatform/orders/delta/bronze/orders_stream_5] is ignored when Trigger.Once is used.


In [59]:
(
    spark
    .read
    .format("delta")
    .load(ORDERS_ITEMS_PATH)
).show(30)

+--------------------+--------------------+--------------------+------------+------------------+----------+-------------+--------------+-------------------+------------+-------------------+-------+
|        inventory_id|            order_id|          product_id|product_name|       order_price|package_id|subpackage_id|items_quantity|       items_weight|items_volume|    order_timestamp| status|
+--------------------+--------------------+--------------------+------------+------------------+----------+-------------+--------------+-------------------+------------+-------------------+-------+
|inv-f8baec1d-8769...|ord-bc303b78-fa7a...|prod-924f611b-c45...|     SKRUVBY|248.99000549316406|  20503547|            1|             3|  91.64999771118164|    174096.0|2024-10-02 19:08:42|PENDING|
|inv-94d66dd2-25cb...|ord-bc303b78-fa7a...|prod-924f611b-c45...|     SKRUVBY|248.99000549316406|  60568725|            1|             3|  74.28000068664551|    124488.0|2024-10-02 19:08:42|PENDING|
|inv-2b2fa

## 4.3 Upsert Orders Stream

In [69]:
def upsert_to_orders(microBatchDF, batchId):
    """
    Function to upsert records into Delta Lake table for orders.
    Removes duplicate order_id records, updates status to PROCESSING, and sets the current order timestamp for matched records.
    """
    # Load the existing Delta table
    deltaTableOrders = DeltaTable.forPath(spark, ORDERS_PATH)
    
    # Remove duplicate records based on order_id
    # microBatchDF = microBatchDF.dropDuplicates(["order_id"])
    
    # Perform the merge (upsert) operation
    (
        deltaTableOrders.alias("t")
        .merge(
            microBatchDF.alias("s"),
            "s.order_id = t.order_id"
        )
        .whenMatchedUpdate(
            set={
                "status": f.lit("PROCESSING"),
                "order_timestamp": f.current_timestamp()
            }
        )
        .execute()
    )

def update_orders_stream(df_orders_items_stream, ORDERS_PATH, ORDERS_CHECKPOINT_LOCATION, upsert_to_orders):
    """
    Function to process the orders stream, filtering by RECEIVED status and upserting into the Delta Lake table.
    """
    # Apply transformations and define the streaming write operation
    processing_orders_stream = (
        df_orders_items_stream
        .filter(f.col("status") == f.lit("RECEIVED"))  # Filter by status
        .select(
            f.col("order_id")  # Select order_id for upsert
        )
        .writeStream
        .format("delta")  # Write to Delta format
        .outputMode("update")  # Use 'update' mode for micro-batch processing
        .foreachBatch(upsert_to_orders)
        .trigger(once=True)  # Trigger the stream to process once
        .option("path", ORDERS_PATH)
        .option("checkpointLocation", f"{ORDERS_CHECKPOINT_LOCATION}_processing")
        .start()
    )
    
    return processing_orders_stream

In [70]:
df_update_orders_stream = update_orders_stream(
    df_orders_items_stream, 
    ORDERS_PATH, 
    ORDERS_UPDATE_CHECKPOINT_LOCATION, 
    upsert_to_orders
)

24/10/02 21:38:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/10/02 21:38:05 WARN MicroBatchExecution: The read limit MaxFiles: 1000 for DeltaSource[s3a://vproptimiserplatform/orders/delta/bronze/orders_stream_5] is ignored when Trigger.Once is used.


In [71]:
(
    spark
    .read
    .format("delta")
    .load(ORDERS_PATH)
).show(30,truncate=False)

+----------------------------------------+----------------------------------------+------------------+------------+------------------+-------------------+--------+------------------+-------------------+
|order_id                                |customer_id                             |total_weight      |total_volume|total_price       |order_timestamp    |status  |lat               |lon                |
+----------------------------------------+----------------------------------------+------------------+------------+------------------+-------------------+--------+------------------+-------------------+
|ord-c66b4563-157f-4181-b7b7-1394b1daa6d9|cus-3323e045-f7a9-4bed-811a-35e4182c97d1|108.41999816894531|381936.0    |359.95000076293945|2024-10-02 19:07:34|RECEIVED|40.39339065551758 |-3.7561213970184326|
|ord-d2de3e01-b5f2-4166-b0b5-78e1bb7f2952|cus-bb6b538c-0b66-49aa-ac02-550ae393a760|20.419999659061432|57064.0     |144.0             |2024-10-02 19:07:46|RECEIVED|40.519107818603516|-3.788

## 4.4 Append Events

In [None]:
def process_events_stream(spark, ORDERS_PATH, EVENTS_PATH, EVENTS_CHECKPOINT_LOCATION):
    """
    Function to process the events stream by reading from the Delta table,
    filtering for records with status 'PROCESSING', generating event metadata, 
    and writing the stream to another Delta table.
    
    Parameters:
    - spark: SparkSession object
    - ORDERS_PATH: Path to the Delta table storing order data
    - EVENTS_PATH: Path to save the processed events stream
    - EVENTS_CHECKPOINT_LOCATION: Path for checkpointing the streaming query
    """
    # Define the streaming read and transformations
    processing_events_stream = (
        spark
        .readStream
        .format("delta")  # Reading from Delta format
        .load(ORDERS_PATH)  # Load from the specified Delta table path
        .filter(f.col("status") == f.lit("PROCESSING"))  # Filter for processing orders
        .withColumn(
            "event_id", f.concat(f.lit("ev-"), f.expr("uuid()"))  # Generate unique event ID
        )
        .withColumn(
            "event_type", f.lit("INVENTORY_UPDATED")  # Set the event type
        )
        .withColumn(
            "event_timestamp", f.current_timestamp()  # Add the current timestamp
        )
        .select(
            f.col("event_id"),
            f.col("event_type"),
            f.col("event_timestamp"),
            f.col("order_id")  # Select relevant columns
        )
        .writeStream
        .format("delta")  # Write the output in Delta format
        .outputMode("append")  # Append new data as it arrives
        .trigger(once=True)  # Trigger the stream to process once
        .option("path", EVENTS_PATH)  # Specify the path to write the output
        .option("checkpointLocation", f"{EVENTS_CHECKPOINT_LOCATION}_processing")  # Set checkpoint location
        .start()  # Start the streaming query
    )
    
    return processing_events_stream

In [76]:
ORDERS_PATH

's3a://vproptimiserplatform/orders/delta/bronze/orders'