In [1]:
import os
import posixpath
import sys

import yaml
from dotenv import find_dotenv, load_dotenv

# Make the project's ../src directory importable (spark_session, schemas, functions).
# Guarded so that re-running this cell does not append duplicate sys.path entries.
SRC_DIR = os.path.abspath(os.path.join('..', 'src'))
if SRC_DIR not in sys.path:
    sys.path.append(SRC_DIR)

Consideraciones:
* generar schema de validación de streams
* introducir credenciales a través del entorno de Docker - las variables ENV en .ipynb no funcionan
* optimizar filtrado de id: ¿incluir partición?
* estructura de microservicios: inventory consumer - producer
* diseño de sistemas: filtrar órdenes efectuadas para los siguientes envíos
* confirmar best practices para el manejo de streams: memory table vs readStream
* confirmar best practice para la partition key del stream
* Arquitectura Lambda
* billing de S3 por fichero registrado: definir criterios de partición
* emplear memory format para lectura de streams
* best practices para gestionar checkpoints
* implementar Firehose

# 1. Environment Configuration

## 1.1 Import dependencies

In [2]:
import boto3
import json
import os
from uuid import uuid4
from datetime import datetime
from datetime import timedelta
import time
import random
import uuid
import logging
import numpy as np

import pyspark.sql.types as t
import pyspark.sql.functions as f

In [3]:
from spark_session import create_spark_session
from schemas import *
from functions import *

## 1.2 Extract AWS credentials

In [4]:
# Locate the .env file (python-dotenv walks up parent directories looking for it),
# fail loudly if none exists, then load its variables into os.environ
# (by default, variables already set in the environment are not overridden).
env_path = find_dotenv(filename=".env", raise_error_if_not_found=True)
load_dotenv(dotenv_path=env_path)

# AWS credentials read from the environment; each is None if the variable is unset.
aws_access_key_id, aws_secret_access_key = (
    os.getenv(var_name) for var_name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
)

## 1.3 Constants

In [5]:
# Configure the root logger at INFO (basicConfig is a no-op if the root logger
# already has handlers) and keep a handle to it; produce_order logs through it.
logging.basicConfig(level=logging.INFO)
logger = logging.root

In [6]:
# Load the YAML configuration file.
# safe_load only constructs plain Python objects (no arbitrary object construction).
# The encoding is explicit so parsing does not depend on the platform default codec.
with open('../config/config.yml', 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

In [7]:
# Bucket root and folder names of each storage layer, read from config.yml
BUCKET_NAME = config["paths"]["BUCKET_NAME"]
RAW = config["paths"]["RAW"]
ORDERS = config["paths"]["ORDERS"]

BRONZE = config["paths"]["BRONZE"]
SILVER = config["paths"]["SILVER"]
GOLD = config["paths"]["GOLD"]

# Raw input file names
ADDRESS_DATA = config["raw_data"]["ADDRESS_DATA"]
CLIENTS_DATA = config["raw_data"]["CLIENTS_DATA"]
PRODUCTS_DATA = config["raw_data"]["PRODUCTS_DATA"]

# Table (folder) names
ADDRESS_TABLE = config["table_names"]["ADDRESS_TABLE"]
CLIENTS_TABLE = config["table_names"]["CLIENTS_TABLE"]
CLIENTS_ADDRESS_TABLE = config["table_names"]["CLIENTS_ADDRESS_TABLE"]
PRODUCTS_TABLE = config["table_names"]["PRODUCTS_TABLE"]
PACKAGE_TABLE = config["table_names"]["PACKAGE_TABLE"]

# These paths are presumably object-store URIs (the Spark session pulls in hadoop-aws),
# which always use "/" separators. posixpath.join keeps them valid on any OS, whereas
# os.path.join would insert "\" on Windows.
RAW_ADDRESS_PATH = posixpath.join(BUCKET_NAME, RAW, ADDRESS_DATA)
RAW_CLIENTS_PATH = posixpath.join(BUCKET_NAME, RAW, CLIENTS_DATA)
RAW_CIENTS_PATH = RAW_CLIENTS_PATH  # backward-compatible alias for the original misspelled name
RAW_PRODUCTS_PATH = posixpath.join(BUCKET_NAME, RAW, PRODUCTS_DATA)

BRONZE_ADDRESS_PATH = posixpath.join(BUCKET_NAME, ORDERS, BRONZE, ADDRESS_TABLE)
BRONZE_CLIENTS_PATH = posixpath.join(BUCKET_NAME, ORDERS, BRONZE, CLIENTS_TABLE)
BRONZE_PRODUCTS_PATH = posixpath.join(BUCKET_NAME, ORDERS, BRONZE, PRODUCTS_TABLE)

SILVER_ADDRESS_PATH = posixpath.join(BUCKET_NAME, ORDERS, SILVER, ADDRESS_TABLE)
SILVER_CLIENTS_PATH = posixpath.join(BUCKET_NAME, ORDERS, SILVER, CLIENTS_TABLE)
SILVER_PRODUCTS_PATH = posixpath.join(BUCKET_NAME, ORDERS, SILVER, PRODUCTS_TABLE)

GOLD_CLIENTS_ADDRESS_PATH = posixpath.join(BUCKET_NAME, ORDERS, GOLD, CLIENTS_ADDRESS_TABLE)
GOLD_PRODUCTS_PATH = posixpath.join(BUCKET_NAME, ORDERS, GOLD, PRODUCTS_TABLE)
GOLD_PACKAGE_PATH = posixpath.join(BUCKET_NAME, ORDERS, GOLD, PACKAGE_TABLE)

In [8]:
# Kinesis stream name, exposed as an UPPER_SNAKE_CASE constant like the other config values.
# The lowercase name is kept because produce_order reads `stream_name`.
STREAM_NAME = config["STREAM_NAME"]
stream_name = STREAM_NAME

## 1.4 Initialize Spark Session

In [9]:
# Create the Spark session via the project helper (src/spark_session.py), passing the AWS
# credentials read from the environment; presumably used to configure S3 access — confirm there.
spark = create_spark_session(aws_access_key_id, aws_secret_access_key)

24/09/08 23:01:58 WARN Utils: Your hostname, Miguels-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.16 instead (on interface en0)
24/09/08 23:01:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/miguelgranica/Documents/MBIT-DE/vpr-data_publisher/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/miguelgranica/.ivy2/cache
The jars for the packages stored in: /Users/miguelgranica/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2efdf52d-28ed-479a-9f90-eb6f5df870da;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 134ms :: artifacts dl 8ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	

# 2. Data generation

In [10]:
def generate_order_payload(order_details):
    """
    Wrap order details in an ORDER_CREATED event envelope.

    Fresh random identifiers are generated for both the event ("ev-<uuid4>") and the
    order ("ord-<uuid4>"); the event timestamp is the local time formatted as
    'YYYY-MM-DD HH:MM:SS'.

    :param order_details: Dictionary containing order details (embedded as-is).
    :return: Dictionary containing the payload for the order event.
    """
    event_id = "ev-" + str(uuid.uuid4())
    emitted_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    order_id = "ord-" + str(uuid.uuid4())

    envelope = dict(
        event_id=event_id,
        event_type="ORDER_CREATED",
        event_timestamp=emitted_at,
        order_id=order_id,
        order_details=order_details,
    )
    return envelope

In [21]:
def generate_order_details(df_clients, df_products, df_packages):
    """
    Generate order details for a random client and a random basket of items.

    :param df_clients: DataFrame with client information, including the address columns
                       kept by generate_destination_address_dict.
    :param df_products: DataFrame containing product information.
    :param df_packages: DataFrame containing package information.
    :return: Dictionary containing order details. total_amount is sum(quantity * price) and
             total_volume is the package-level volume aggregate of the items.
    """
    item_list = generate_item_list(df_products, df_packages)
    client_details = select_client_order_details(df_clients)

    # Read the clock once so order_timestamp and order_date always agree
    # (two separate now() calls could straddle midnight).
    now = datetime.now()

    return {
        "customer_id": client_details["client_id"],
        "order_timestamp": now.strftime('%Y-%m-%d %H:%M:%S'),
        "order_date": now.strftime('%Y-%m-%d'),
        "items": item_list,
        "total_amount": generate_item_agg(item_list, "price"),
        "total_volume": generate_item_measure_agg(item_list, "volume"),
        "status": "RECEIVED",
        "destination_address": generate_destination_address_dict(client_details),
        # Payment is not simulated yet; fields are placeholders.
        "payment_details": {
            "payment_method": "",
            "payment_status": "",
            "transaction_id": ""
        }
    }

def select_client_order_details(df, primary_key_col="address_id"):
    """
    Select a random row from a DataFrame based on a unique primary key column.

    All key values are collected to the driver, one is drawn with random.choice,
    and the matching row is returned.

    :param df: DataFrame to select from.
    :param primary_key_col: Name of the primary key column. default value: address_id
    :return: Dictionary of the selected row's columns, or None if df has no rows.
    """
    # Get a list of all primary key values
    primary_keys = df.select(primary_key_col).rdd.flatMap(lambda x: x).collect()
    if not primary_keys:
        # random.choice would raise IndexError on an empty sequence
        return None

    # Randomly select one primary key value
    random_primary_key = random.choice(primary_keys)

    # Fetch the row for that key. The emptiness check has to be on the Row returned by
    # first() (None when nothing matches); the truthiness of the DataFrame object itself
    # says nothing about whether it contains rows.
    row = df.filter(f.col(primary_key_col) == random_primary_key).first()
    return row.asDict() if row is not None else None

def select_product_order_details(df_product, df_package, quantity=3, primary_key_col="product_id"):
    """
    Pick one random product and return one row per package that composes it.

    :param df_product: Products DataFrame. `product_components` is expected to be an array of
                       arrays of {package_id, package_quantity} structs (as shown by
                       df_products.show()).
    :param df_package: Packages DataFrame, left-joined on `package_id` to add its measures.
    :param quantity: Maximum quantity of the product (default: 3); the actual quantity is
                     drawn once with weighted_random_choice.
    :param primary_key_col: Name of the primary key column. default value: product_id
    :return: List of dictionaries, one per package of the selected product, with keys
             product_id, product_quantity, package_id, product_name, price,
             package_quantity, width, height, length, volume.
    """
    # Get a list of all primary key values
    primary_keys = df_product.select(primary_key_col).rdd.flatMap(lambda x: x).collect()

    # Randomly select one primary key value
    random_primary_key = random.choice(primary_keys)

    # Draw the product quantity once on the driver. Every package row of this product
    # must share the same quantity, and this avoids shipping a Python UDF to executors.
    product_quantity = weighted_random_choice(quantity)

    random_product_df = (
        df_product
        .filter(f.col(primary_key_col) == random_primary_key)
        .withColumn("product_quantity", f.lit(product_quantity))
        # Outer array -> one row per component group
        .withColumn("product_components_explode", f.explode(f.col("product_components")))
        # Inner array -> one row per package struct. Exploding the struct (rather than
        # exploding package_id and package_quantity separately, which cross-multiplies
        # them) keeps each package_id paired with its own package_quantity.
        .withColumn("package", f.explode(f.col("product_components_explode")))
        .withColumn("package_id", f.col("package.package_id"))
        .withColumn("package_quantity", f.col("package.package_quantity"))
        # Avoid a clash with the package table's own `name` column
        .withColumnRenamed("name", "product_name")
        # Join package measures
        .join(df_package, on="package_id", how="left")
        .select(
            f.col("product_id"),
            f.col("product_quantity"),
            f.col("package_id"),
            f.col("product_name"),
            f.col("price"),
            f.col("package_quantity"),
            f.col("width"),
            f.col("height"),
            f.col("length"),
            f.col("volume")
        )
    )

    # Convert each DataFrame row to a dictionary
    return [row.asDict() for row in random_product_df.collect()]

def generate_destination_address_dict(clients_dict):
    """
    Keep only the address-related fields of a client record.

    :param clients_dict: Dictionary with client and address columns (e.g. a Row.asDict()).
    :return: New dictionary holding only the address fields present in clients_dict,
             in the order they appear there.
    """
    wanted = {
        'address_id', 'neighborhood', 'coordinates', 'road', 'house_number',
        'suburb', 'city_district', 'state', 'postcode', 'country', 'lat', 'lon',
    }
    destination = {}
    for key, value in clients_dict.items():
        if key in wanted:
            destination[key] = value
    return destination

def generate_item_list(df_products, df_packages, items=5, quantity=3):
    """
    Generate a random list of order items, each with its package breakdown.

    The number of product draws is weighted_random_choice(items), so small baskets are
    more likely. Each product's quantity is drawn inside select_product_order_details with
    upper bound `quantity`. The same product may be drawn more than once.

    Parameters:
    - df_products: DataFrame containing product information.
    - df_packages: DataFrame containing package information.
    - items: Maximum number of product draws (default: 5).
    - quantity: Maximum quantity of each item (default: 3).

    Returns:
    - List of dictionaries with keys product_id, product_name, price, quantity and
      packages (a list of {package_id, quantity, volume} dictionaries).
    """
    item_list = []
    for _ in range(weighted_random_choice(items)):
        packages = select_product_order_details(df_products, df_packages, quantity=quantity)
        # A product whose components explode to no rows has no package data; skip it
        # rather than failing with IndexError on packages[0].
        if not packages:
            continue
        # Product-level fields are repeated on every package row; read them from the first.
        product = packages[0]
        item_list.append({
            "product_id": product["product_id"],
            "product_name": product["product_name"],
            "price": product["price"],
            "quantity": product["product_quantity"],
            "packages": [
                {
                    "package_id": package["package_id"],
                    "quantity": package["package_quantity"],
                    "volume": package["volume"]
                }
                for package in packages
            ],
        })
    return item_list

def weighted_random_choice(numbers_len):
    """
    Draw an integer from 1..numbers_len, favouring small values.

    Each candidate k is chosen with probability proportional to 1/k, so 1 is the most
    likely outcome and numbers_len the least likely. Uses numpy's global RNG.

    Parameters:
    - numbers_len: Inclusive upper bound of the candidate range, which starts at 1.

    Returns:
    - The drawn value as a plain Python int.
    """
    candidates = np.arange(1, numbers_len + 1)
    inverse = 1 / candidates
    probabilities = inverse / inverse.sum()
    return int(np.random.choice(candidates, p=probabilities))

def generate_item_agg(items, property_name):
    """
    Sum quantity * <property_name> over a list of items.

    :param items: List of dictionaries, each with a 'quantity' key and the property key.
    :param property_name: Key of the per-unit value weighted by quantity (e.g. "price").
    :return: The weighted total; 0 for an empty list.
    """
    return sum(entry['quantity'] * entry[property_name] for entry in items)

def generate_item_measure_agg(items, property_name="volume", quantity="quantity", missing_value=1):
    """
    Aggregate a package-level measure over a list of items.

    For every package of every item this adds
    item[quantity] * package[quantity] * package[property_name].

    :param items: List of dictionaries, each describing an item and its 'packages' list.
    :param property_name: Key of the per-package measure to aggregate (default "volume").
    :param quantity: Key holding the quantity on both items and packages (default "quantity").
    :param missing_value: Value substituted when the measure is None. Packages can have NULL
                          measures (see the packages table). The default of 1 keeps the
                          historical behaviour; pass 0 to leave such packages out of the total.
    :return: The aggregated value; 0 for an empty list.
    """
    total = 0
    for item in items:
        for package in item['packages']:
            measure = package[property_name]
            if measure is None:
                measure = missing_value
            total += item[quantity] * package[quantity] * measure
    return total

In [12]:
# Gold-layer inputs for order generation: clients+addresses through the project's read_file
# helper (called with "parquet" and gold_clients_address_schema); products and packages as Delta.
df_clients_address = read_file(spark, GOLD_CLIENTS_ADDRESS_PATH, "parquet", gold_clients_address_schema)
df_products = spark.read.load(GOLD_PRODUCTS_PATH, format="delta")
df_packages = spark.read.load(GOLD_PACKAGE_PATH, format="delta")

24/09/08 23:02:30 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [14]:
df_products.show(n=20, truncate=True)

                                                                                

+--------------------+--------------+--------------------+--------------------+------+--------+--------------------+
|          product_id|          name|            category|                 url| price|currency|  product_components|
+--------------------+--------------+--------------------+--------------------+------+--------+--------------------+
|prod-d3fb4bc4-4ac...|BILLY / OXBERG|Librería con puer...|https://www.ikea....|139.99|       €|[[{69281776, 1}],...|
|prod-1bcfaca6-632...|         BILLY|Combinación libre...|https://www.ikea....|139.97|       €|[[{99395936, 1}],...|
|prod-dcb3fa73-a0b...|         BILLY|            Librería|https://www.ikea....| 59.99|       €|   [[{00263850, 1}]]|
+--------------------+--------------+--------------------+--------------------+------+--------+--------------------+



In [13]:
df_packages.show(n=20, truncate=True)

24/09/08 23:02:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+----------+--------------+-----+------+------+------+--------------+
|package_id|          name|width|height|length|volume|stock_quantity|
+----------+--------------+-----+------+------+------+--------------+
|  00263850|         BILLY|   29|    13|   206| 77662|           100|
|  40104109|         BILLY|   20|     1|    22|   440|           100|
|  50263838|         BILLY|   39|     7|   207| 43470|           100|
|  50275558|        OXBERG|   40|     3|   211| 25320|           100|
|  69281776|BILLY / OXBERG| NULL|  NULL|  NULL|  NULL|           100|
|  99395936|         BILLY| NULL|  NULL|  NULL|  NULL|           100|
+----------+--------------+-----+------+------+------+--------------+



In [None]:
# # Register the function as a UDF
# weighted_random_choice_udf = f.udf(weighted_random_choice, t.IntegerType())
# (
#     df_products
#     # Set random product quantity
#     .withColumn("product_quantity", weighted_random_choice_udf(f.lit(3)))
#     # Explode package annidations
#     .withColumn("product_components_explode", f.explode(f.col("product_components")))
#     # Extract package information
#     .withColumn("package_id", f.explode(f.col("product_components_explode.package_id")))
#     .withColumn("package_quantity", f.explode(f.col("product_components_explode.package_quantity")))
#     .withColumnRenamed("name", "product_name")
#     # .join(
#     #     df_packages, on="package_id", how="left"
#     # )
#     # # Select Columns
#     #     .select(
#     #         f.col("product_id"),
#     #         f.col("product_quantity"),
#     #         f.col("package_id"),
#     #         f.col("name"),
#     #         f.col("product_name"),
#     #         f.col("price"),
#     #         f.col("package_quantity"),
#     #         f.col("width"),
#     #         f.col("height"),
#     #         f.col("length"),
#     #         f.col("volume")
#     #     )
# ).show()

In [None]:
# [        
#     {
#         "product_id": packages[0]["product_id"], 
#         "product_name": packages[0]["product_name"], 
#         "price": packages[0]["price"],
#         "quantity": packages[0]["product_quantity"]
#         "packages": [
#             {
#                 "package_id": package["package_id"],
#                 "quantity": package["package_quantity"],
#                 "volume": package["volume"]
#             } for package in packages 
#         ],
#     }
#     for packages in [select_product_order_details(df_products, df_packages) for num in range(weighted_random_choice(5))]
# ]

In [19]:
# Draw a sample basket to sanity-check generate_item_list
item_list = generate_item_list(df_products=df_products, df_packages=df_packages)
item_list

[{'product_id': 'prod-1bcfaca6-6325-46f3-b8d0-ca705a5eeebe',
  'product_name': 'BILLY',
  'price': 139.97000122070312,
  'quantity': 2,
  'packages': [{'package_id': '99395936', 'quantity': 1, 'volume': None},
   {'package_id': '50263838', 'quantity': 3, 'volume': 43470},
   {'package_id': '40104109', 'quantity': 1, 'volume': 440}]},
 {'product_id': 'prod-1bcfaca6-6325-46f3-b8d0-ca705a5eeebe',
  'product_name': 'BILLY',
  'price': 139.97000122070312,
  'quantity': 1,
  'packages': [{'package_id': '99395936', 'quantity': 1, 'volume': None},
   {'package_id': '50263838', 'quantity': 3, 'volume': 43470},
   {'package_id': '40104109', 'quantity': 1, 'volume': 440}]},
 {'product_id': 'prod-dcb3fa73-a0b3-4586-87f2-f05bc9a955a9',
  'product_name': 'BILLY',
  'price': 59.9900016784668,
  'quantity': 1,
  'packages': [{'package_id': '00263850', 'quantity': 1, 'volume': 77662}]},
 {'product_id': 'prod-d3fb4bc4-4ac1-48af-8ced-e0c4aba4c932',
  'product_name': 'BILLY / OXBERG',
  'price': 139.99000

In [20]:
generate_item_measure_agg(item_list, property_name="volume")

804483

In [22]:
# Build one order for a random client and pretty-print it. ensure_ascii=False shows accented
# address text (e.g. "Sánchez") as-is instead of \u escapes.
order_details = generate_order_details(df_clients_address, df_products, df_packages)
print(json.dumps(order_details, indent=4, ensure_ascii=False))

{
    "customer_id": "cus-3cd6a9d3-d0a6-4905-877e-6eac34089035",
    "order_timestamp": "2024-09-08 23:08:36",
    "order_date": "2024-09-08",
    "items": [
        {
            "product_id": "prod-dcb3fa73-a0b3-4586-87f2-f05bc9a955a9",
            "product_name": "BILLY",
            "price": 59.9900016784668,
            "quantity": 3,
            "packages": [
                {
                    "package_id": "00263850",
                    "quantity": 1,
                    "volume": 77662
                }
            ]
        }
    ],
    "total_amount": 179.9700050354004,
    "total_volume": 232986,
    "status": "RECEIVED",
    "destination_address": {
        "address_id": "cus-ce40acf5-2f59-4b4e-b668-d196ccf913c5",
        "neighborhood": "Retiro",
        "coordinates": [
            40.40616562579491,
            -3.673306729561871
        ],
        "road": "Calle de S\u00e1nchez Barc\u00e1iztegui",
        "house_number": "37",
        "suburb": "Retiro",
        "ci

In [23]:
# Wrap the order in its event envelope and pretty-print it. ensure_ascii=False keeps
# accented address text readable instead of \u escapes.
order_stream = generate_order_payload(order_details)
print(json.dumps(order_stream, indent=4, ensure_ascii=False))

{
    "event_id": "ev-b2b0b5b9-dc6e-43bf-b2a5-00864c9ffe0b",
    "event_type": "ORDER_CREATED",
    "event_timestamp": "2024-09-08 23:09:23",
    "order_id": "ord-91f684fb-03ad-4489-86af-8113f2725519",
    "order_details": {
        "customer_id": "cus-3cd6a9d3-d0a6-4905-877e-6eac34089035",
        "order_timestamp": "2024-09-08 23:08:36",
        "order_date": "2024-09-08",
        "items": [
            {
                "product_id": "prod-dcb3fa73-a0b3-4586-87f2-f05bc9a955a9",
                "product_name": "BILLY",
                "price": 59.9900016784668,
                "quantity": 3,
                "packages": [
                    {
                        "package_id": "00263850",
                        "quantity": 1,
                        "volume": 77662
                    }
                ]
            }
        ],
        "total_amount": 179.9700050354004,
        "total_volume": 232986,
        "status": "RECEIVED",
        "destination_address": {
            "ad

24/09/10 08:04:34 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 950181 ms exceeds timeout 120000 ms
24/09/10 08:04:34 WARN SparkContext: Killing executors is not supported by current scheduler.
24/09/10 08:19:58 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [None]:
order_stream['order_id']  # nested fields use snake_case keys, e.g. order_stream['order_details']['destination_address']['neighborhood']

# 3. Stream producer

In [None]:
# Kinesis client for the order stream. Credentials come from the environment loaded from .env.
# AWS_SESSION_TOKEN is only needed for temporary (STS) credentials; os.getenv returns None when
# it is unset, which is the same as not passing a session token to boto3.
aws_session_token = os.getenv("AWS_SESSION_TOKEN")

kinesis_client = boto3.client(
    'kinesis',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    aws_session_token=aws_session_token,
    region_name='us-east-1'
)

In [None]:
def produce_order(payload, partition_key_field="order_id"):
    """
    Publish one order event to the Kinesis stream `stream_name`.

    The record body is the JSON-encoded payload followed by a newline (presumably so records
    stay line-delimited when concatenated downstream, e.g. by Firehose — confirm).

    :param payload: Event dictionary, e.g. as built by generate_order_payload. Must contain
                    'event_type' as a string and `partition_key_field` as a non-empty string.
    :param partition_key_field: Payload key whose value becomes the Kinesis partition key.
                                Defaults to "order_id". A high-cardinality key spreads records
                                across shards, whereas the constant event_type would hash every
                                record to the same shard.
    :return: The put_record response, or None if validation or the put failed. Failures are
             logged rather than raised, so a streaming loop keeps running.
    """
    try:
        if 'event_type' not in payload or not isinstance(payload['event_type'], str):
            raise ValueError("Payload must include 'event_type' as a string")

        partition_key = payload.get(partition_key_field)
        if not isinstance(partition_key, str) or not partition_key:
            raise ValueError(f"Payload must include '{partition_key_field}' as a non-empty string")

        data = json.dumps(payload)
        put_response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=f"{data}\n",
            PartitionKey=partition_key
        )

        logger.info("Put record response: %s", put_response)
        return put_response
    except Exception as e:
        logger.error("Failed to put record to stream: %s", e, exc_info=True)
        return None

In [None]:
# Build one order event end-to-end and preview it.
# generate_order_details requires df_packages as its third argument.
order_payload = generate_order_payload(generate_order_details(df_clients_address, df_products, df_packages))
print(json.dumps(order_payload, indent=4, ensure_ascii=False))

In [None]:
# Build a fresh order event and publish it to Kinesis.
# generate_order_details requires df_packages as its third argument.
order_payload = generate_order_payload(generate_order_details(df_clients_address, df_products, df_packages))
produce_order(order_payload)

In [None]:
# Produce orders at regular intervals (here: one order every 20 seconds).
# Uncomment to run; interrupt the kernel to stop the loop.
# while True:
#     order_payload = generate_order_payload(generate_order_details(df_clients_address, df_products, df_packages))
#     produce_order(order_payload)
#     time.sleep(20)