In [7]:
%streaming
%iam_role arn:aws:iam::484183516222:role/LabRole
%region us-east-1
%number_of_workers 2
%idle_timeout 60
%%configure 
{
  "--conf": "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
  "--datalake-formats": "delta"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
It looks like there is a newer version of the kernel available. The latest version is 1.0.6 and you have 1.0.4 installed.
Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel
Previous session type: etl
Setting new session type to Streaming
Current iam_role is None
iam_role has been set to arn:aws:iam::484183516222:role/LabRole.
Previous region: None
Setting new region to: us-east-1
Region is set to: us-east-1
Previous number of workers: None
Setting new number of workers to: 2
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
The following configurations have been updated: {'--conf': 'spark.sql.extensions=io.delta.sql.Delt

In [1]:
# Evaluating `spark` makes the Glue kernel create the interactive session
# configured by the magics in the first cell.
spark

Trying to create a Glue session for the kernel.
Session Type: streaming
Worker Type: G.1X
Number of Workers: 2
Session ID: f6bef49e-c8c2-47a3-beae-95d0eabd1c90
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
--datalake-formats delta
Waiting for session f6bef49e-c8c2-47a3-beae-95d0eabd1c90 to get into ready status...
Session f6bef49e-c8c2-47a3-beae-95d0eabd1c90 has been created.
<pyspark.sql.session.SparkSession object at 0x7fc39490fb80>


# 1. Environment Configuration

## 1.1 Import dependencies

In [2]:
import boto3
import json
import os
from uuid import uuid4
from datetime import datetime
from datetime import timedelta
import time

import pyspark.sql.types as t
import pyspark.sql.functions as f




## 1.2 Constants

In [3]:
# Configuration for the orders streaming pipeline: S3 layout, stream, trigger and table names.
BUCKET_NAME = "vrpoptimiserplatform"
STREAM_NAME = "orders_stream"  # Kinesis stream the order events are read from

# Medallion layers and storage format (only BRONZE is used by the paths below).
BRONZE = "bronze"
SILVER = "silver"
GOLD = "gold"
DELTA = "delta"

# Micro-batch interval shared by every streaming write in this notebook.
PROCESSING_TRIGGER = "45 seconds"

## Dataset names (ORDERS is also the top-level S3 prefix of the pipeline)
EVENTS = "events"
ORDERS = "orders"
ORDERS_ITEMS = "orders_items"

# Common root for all Delta data and checkpoints of this pipeline.
DELTA_ROOT = f"s3a://{BUCKET_NAME}/{ORDERS}/{DELTA}"

## Checkpoints (one per streaming query)
EVENTS_CHECKPOINT_LOCATION = f"{DELTA_ROOT}/checkpoints/{EVENTS}"
ORDERS_CHECKPOINT_LOCATION = f"{DELTA_ROOT}/checkpoints/{ORDERS}"
ORDERS_ITEMS_CHECKPOINT_LOCATION = f"{DELTA_ROOT}/checkpoints/{ORDERS_ITEMS}"

## Paths
# TODO TBC medallion architecture??
EVENTS_PATH = f"{DELTA_ROOT}/{BRONZE}/{EVENTS}"
ORDERS_PATH = f"{DELTA_ROOT}/{BRONZE}/{ORDERS}"
ORDERS_ITEMS_PATH = f"{DELTA_ROOT}/{BRONZE}/{ORDERS_ITEMS}"

## Tables (catalog names registered by DataStreamWriter.toTable)
EVENTS_TABLE = f"{EVENTS}_table"
ORDERS_TABLE = f"{ORDERS}_table"
ORDERS_ITEMS_TABLE = f"{ORDERS_ITEMS}_table"





# 2. Stream Interaction with Kinesis

## 2.1 Read Stream

In [4]:
# Spark schema of the JSON document carried in each Kinesis record.
# Fields are appended in payload order with StructType.add; its default
# nullable=True keeps every field nullable, as in the explicit declaration.
schema = (
    t.StructType()
    .add("event_id", t.StringType())
    .add("event_type", t.StringType())
    .add("event_timestamp", t.StringType())  # kept as text, not parsed to a timestamp
    .add("order_id", t.StringType())
    .add(
        "order_details",
        t.StructType()
        .add("customer_id", t.StringType())
        .add("order_date", t.StringType())
        .add(
            "items",
            t.ArrayType(
                t.StructType()
                .add("product_id", t.StringType())
                .add("product_name", t.StringType())
                .add("price", t.DoubleType())
                .add("weight", t.DoubleType())
                .add("quantity", t.IntegerType())
            ),
        )
        .add("total_amount", t.DoubleType())
        .add("total_weight", t.DoubleType())
        .add("status", t.StringType())
        .add(
            "destination_address",
            t.StructType()
            .add("address_id", t.StringType())
            .add("neighborhood", t.StringType())
            .add("coordinates", t.ArrayType(t.DoubleType()))
            .add("road", t.StringType())
            .add("house_number", t.StringType())
            .add("suburb", t.StringType())
            .add("city_district", t.StringType())
            .add("state", t.StringType())
            .add("postcode", t.StringType())
            .add("country", t.StringType())
            .add("lat", t.DoubleType())
            .add("lon", t.DoubleType()),
        )
        .add(
            "payment_details",
            t.StructType()
            .add("payment_method", t.StringType())
            .add("payment_status", t.StringType())
            .add("transaction_id", t.StringType()),
        ),
    )
)




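> `startingPosition` indica el punto de inicio de lectura del stream cuando la consulta no tiene checkpoint previo (si existe un checkpoint, la lectura se reanuda desde él). Sus opciones son:
>  * `trim_horizon`: empieza por el registro más antiguo retenido en el stream.
>  * `latest`: ignora los registros previos y solo lee los que lleguen a partir del último registro.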

In [5]:
# Streaming source over the Kinesis orders stream. Each row holds the raw
# record bytes in `data` plus Kinesis metadata columns.
# startingPosition "trim_horizon" starts from the oldest record in the stream;
# use "latest" instead to only consume records that arrive after the query starts.
kinesis_order_stream = (
    spark.readStream
    .format("kinesis")
    .options(streamName=STREAM_NAME, startingPosition="trim_horizon")
    .load()
)




In [6]:
# Inspect the raw Kinesis source columns; the JSON payload arrives as binary in `data`.
kinesis_order_stream.printSchema()

root
 |-- data: binary (nullable = true)
 |-- streamName: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- sequenceNumber: string (nullable = true)
 |-- approximateArrivalTimestamp: timestamp (nullable = true)


In [7]:
# Decode each record's `data` bytes to a string, parse it as JSON with `schema`,
# and promote the parsed fields to top-level columns.
df_order_stream = (
    kinesis_order_stream
    .select(f.from_json(f.col("data").cast("string"), schema).alias("orders"))
    .select("orders.*")
)




In [8]:
# Confirm the parsed stream exposes the columns declared in `schema`.
df_order_stream.printSchema()

root
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_timestamp: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_details: struct (nullable = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- order_date: string (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- product_id: string (nullable = true)
 |    |    |    |-- product_name: string (nullable = true)
 |    |    |    |-- price: double (nullable = true)
 |    |    |    |-- weight: double (nullable = true)
 |    |    |    |-- quantity: integer (nullable = true)
 |    |-- total_amount: double (nullable = true)
 |    |-- total_weight: double (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- destination_address: struct (nullable = true)
 |    |    |-- address_id: string (nullable = true)
 |    |    |-- neighborhood: string (nullable = true)
 |    |    |

# 3. Write Streams

## 3.1 Events Stream

In [9]:
# Bronze "events" table: one row per Kinesis event with its identifying fields,
# appended as a Delta table every PROCESSING_TRIGGER.
# For a one-off catch-up run, `.trigger(availableNow=True)` can replace the
# processing-time trigger.
# NOTE(review): each writeStream in section 3 starts its own streaming query on
# df_order_stream, so the Kinesis source is presumably read once per query — confirm
# whether a single query with foreachBatch would be preferable.
events_stream = (
    df_order_stream
    .select("event_id", "event_type", "event_timestamp", "order_id")
    .writeStream
    .format("delta")
    .outputMode("append")
    .trigger(processingTime=PROCESSING_TRIGGER)
    .options(path=EVENTS_PATH, checkpointLocation=EVENTS_CHECKPOINT_LOCATION)
    .toTable(EVENTS_TABLE)
)




In [10]:
# Check that the events streaming query is still running.
events_stream.isActive

True


In [11]:
# Current state of the events query (e.g. writing offsets, waiting for the next trigger).
events_stream.status

{'message': 'Writing offsets to log', 'isDataAvailable': False, 'isTriggerActive': True}


In [12]:
# Metrics of the most recent completed micro-batch; appears to be empty until a batch finishes.
events_stream.lastProgress




In [14]:
# Batch read of the events table to check what the stream has written so far.
spark.table(EVENTS_TABLE).show()

+--------------------+-------------+-------------------+--------------------+
|            event_id|   event_type|    event_timestamp|            order_id|
+--------------------+-------------+-------------------+--------------------+
|ev-308a173e-c3ac-...|ORDER_CREATED|2024-06-23 16:19:39|ord-b37a714b-aaa6...|
|ev-337be020-46bf-...|ORDER_CREATED|2024-06-23 16:19:46|ord-5edb868e-5481...|
|ev-57068625-1eb9-...|ORDER_CREATED|2024-06-23 16:19:52|ord-7dc2e7c5-13ad...|
|ev-86ff429a-64d3-...|ORDER_CREATED|2024-06-23 16:19:59|ord-75be955b-7659...|
|ev-a06c8ac7-165b-...|ORDER_CREATED|2024-06-23 16:20:07|ord-18b3d774-a37c...|
|ev-ae138c37-20a7-...|ORDER_CREATED|2024-06-23 16:20:13|ord-75fb0ae6-8be9...|
|ev-358a3607-db80-...|ORDER_CREATED|2024-06-23 16:20:20|ord-aa3cee40-9a3c...|
|ev-4eac65b4-fe26-...|ORDER_CREATED|2024-06-23 16:20:28|ord-75ea8db0-a2f3...|
|ev-87dd6390-10ee-...|ORDER_CREATED|2024-06-23 16:20:34|ord-eb097cd7-6d50...|
|ev-ffa9d7f9-712a-...|ORDER_CREATED|2024-06-23 16:20:41|ord-fa84

## 3.2 Orders Stream

In [17]:
# Bronze "orders" table: one row per order, with header fields flattened out of
# `order_details`. Note that total_amount is stored under the name `total_price`
# and only lat/lon are kept from the destination address.
orders_stream = (
    df_order_stream
    .select(
        "order_id",
        *[
            f.col(f"order_details.{source}").alias(target)
            for source, target in [
                ("customer_id", "customer_id"),
                ("total_weight", "total_weight"),
                ("total_amount", "total_price"),
                ("order_date", "order_date"),
                ("status", "status"),
                ("destination_address.lat", "lat"),
                ("destination_address.lon", "lon"),
            ]
        ],
    )
    .writeStream
    .format("delta")
    .outputMode("append")
    .trigger(processingTime=PROCESSING_TRIGGER)
    .options(path=ORDERS_PATH, checkpointLocation=ORDERS_CHECKPOINT_LOCATION)
    .toTable(ORDERS_TABLE)
)




In [18]:
# Check that the orders streaming query is still running.
orders_stream.isActive

True


In [19]:
# Current state of the orders query (waiting for trigger / processing data).
orders_stream.status

{'message': 'Waiting for next trigger', 'isDataAvailable': True, 'isTriggerActive': False}


In [20]:
# Metrics of the last orders micro-batch: input rows, durations, Kinesis offsets and Delta sink.
orders_stream.lastProgress

{'id': '49e514e7-8524-476f-a3de-e1cca40bcb9f', 'runId': '5dde80a7-07e7-4835-8377-2926164db0d3', 'name': None, 'timestamp': '2024-06-24T11:40:06.217Z', 'batchId': 0, 'numInputRows': 1, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.15503875968992248, 'durationMs': {'addBatch': 5249, 'getBatch': 3, 'getOffset': 38, 'queryPlanning': 9, 'triggerExecution': 6450, 'walCommit': 514}, 'stateOperators': [], 'sources': [{'description': 'KinesisSource[orders_stream]', 'startOffset': None, 'endOffset': {'metadata': {'streamName': 'orders_stream', 'batchId': '0'}, 'shardId-000000000000': {'iteratorType': 'TRIM_HORIZON', 'iteratorPosition': ''}}, 'latestOffset': {'metadata': {'streamName': 'orders_stream', 'batchId': '0'}, 'shardId-000000000000': {'iteratorType': 'TRIM_HORIZON', 'iteratorPosition': ''}}, 'numInputRows': 1, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 0.15503875968992248}], 'sink': {'description': 'DeltaSink[s3a://vrpoptimiserplatform/orders/delta/bronze/orders]', 'nu

In [26]:
# Batch read of the orders table to check what the stream has written so far.
spark.table(ORDERS_TABLE).show()

+--------------------+--------------------+-------------------+------------------+-------------------+-------+------------------+-------------------+
|            order_id|         customer_id|       total_weight|       total_price|         order_date| status|               lat|                lon|
+--------------------+--------------------+-------------------+------------------+-------------------+-------+------------------+-------------------+
|ord-36638d46-b028...|cus-031007c2-3d8f...| 1.6200000643730164|             87.48|2024-06-23 16:18:04|PENDING| 40.34373474121094| -3.717392921447754|
|ord-5e4c69af-c3c4...|cus-af703718-b168...|0.10999999940395355|41.269999999999996|2024-06-23 16:18:11|PENDING|  40.3503532409668|-3.7048850059509277|
|ord-6f214b51-21cd...|cus-3d4bb4a4-414d...| 1.0800000429153442|             39.98|2024-06-23 16:18:17|PENDING|40.397552490234375|-3.6726293563842773|
|ord-c1902c32-c8eb...|cus-f06cb607-e927...| 1.6000000331550837|             32.29|2024-06-23 16:18:2

## 3.3 Order Items Stream

In [21]:
# Bronze "orders_items" table: one row per line item of each order.
# explode() emits one row per element of order_details.items, so an order with a
# null or empty items array contributes no rows here.
orders_items_stream = (
    df_order_stream
    .select("order_id", f.explode("order_details.items").alias("item"))
    .select(
        "order_id",
        "item.product_id",
        "item.product_name",
        "item.quantity",
        "item.price",
        "item.weight",
    )
    .writeStream
    .format("delta")
    .outputMode("append")
    .trigger(processingTime=PROCESSING_TRIGGER)
    .options(path=ORDERS_ITEMS_PATH, checkpointLocation=ORDERS_ITEMS_CHECKPOINT_LOCATION)
    .toTable(ORDERS_ITEMS_TABLE)
)




In [22]:
# Check that the order-items streaming query is still running.
orders_items_stream.isActive

True


In [23]:
# Current state of the order-items query.
orders_items_stream.status

{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}


In [24]:
# Metrics of the most recent completed order-items micro-batch (empty output here, presumably None before the first batch).
orders_items_stream.lastProgress




In [25]:
# Batch read of the order-items table to check what the stream has written so far.
spark.table(ORDERS_ITEMS_TABLE).show()

+--------------------+--------------------+--------------------+--------+------+--------------------+
|            order_id|          product_id|        product_name|quantity| price|              weight|
+--------------------+--------------------+--------------------+--------+------+--------------------+
|ord-36638d46-b028...|pro-a074a114158f5...|Lulu Jr. Illustor...|       3| 29.16|  0.5400000214576721|
|ord-5e4c69af-c3c4...|pro-b3eb1e8f2f7ec...|MightySkins Skin ...|       2| 14.99|0.029999999329447746|
|ord-5e4c69af-c3c4...|pro-a92a8f8ad237d...|Armored Black Knight|       1| 11.29| 0.05000000074505806|
|ord-6f214b51-21cd...|pro-7f13d11a5f3e7...|Baby Einstein Mus...|       2| 19.99|  0.5400000214576721|
|ord-c1902c32-c8eb...|pro-2a0fcbd064f9d...|Raymond Geddes & ...|       1|  24.3|   1.590000033378601|
|ord-c1902c32-c8eb...|pro-acee8a6c2fd41...|Pudgy Pedro's Pla...|       1|  7.99|0.009999999776482582|
|ord-09986af9-0257...|pro-e6d8ff80f012c...|KidKraft Bookcase...|       1| 68.99|  

In [None]:
# Row count of the bronze orders Delta table.
# Reads ORDERS_PATH (the bronze orders location written by the orders stream);
# the previously referenced BRONZE_ORDERS_PATH is not defined in this notebook.
(
    spark.read
    .format("delta")
    .load(ORDERS_PATH)
    .count()
)

In [None]:
# Batch DataFrame over the bronze orders Delta table written by the orders stream.
# Uses ORDERS_PATH; the previously referenced BRONZE_ORDERS_PATH is not defined
# in this notebook and raised a NameError.
df_orders_bronze = (
    spark.read
    .format("delta")
    .load(ORDERS_PATH)
)

In [None]:
# Preview rows of the bronze orders DataFrame.
df_orders_bronze.show()

In [None]:
# Flatten each order's destination address into columns for inspection.
# NOTE(review): these `order_details.*` paths match the raw Kinesis payload schema,
# but the bronze orders table written in section 3.2 only stores flat columns
# (order_id, customer_id, total_weight, total_price, order_date, status, lat, lon)
# and none of the address fields. This cell will fail to resolve the columns unless
# df_orders_bronze holds raw payloads — confirm the intended source table.
(
    df_orders_bronze
    .select(
        f.col("order_id"),
        f.col("order_details.total_weight").alias("total_weight"),
        f.col("order_details.destination_address.address_id").alias("address_id"),
        f.col("order_details.destination_address.neighborhood").alias("neighborhood"),
        f.col("order_details.destination_address.coordinates").alias("coordinates"),
        f.col("order_details.destination_address.road").alias("road"),
        f.col("order_details.destination_address.house_number").alias("house_number"),
        f.col("order_details.destination_address.suburb").alias("suburb"),
        f.col("order_details.destination_address.city_district").alias("city_district"),
        f.col("order_details.destination_address.state").alias("state"),
        f.col("order_details.destination_address.postcode").alias("postcode"),
        f.col("order_details.destination_address.country").alias("country"),
        f.col("order_details.destination_address.lat").alias("lat"),
        f.col("order_details.destination_address.lon").alias("lon"),
    )
    .show()
)

In [None]:
# Batch version of the order-items projection: one row per element of order_details.items.
# NOTE(review): the bronze orders table has no `order_details` column (section 3.2
# writes flat columns only), so this cell presumably expects raw payloads. The
# same projection is already persisted in ORDERS_ITEMS_TABLE — confirm which source is intended.
(
    df_orders_bronze
    .withColumn("exploded_order", f.explode(f.col("order_details.items")))
    .select(
        f.col("order_id"),
        f.col("exploded_order.product_id").alias("product_id"),
        f.col("exploded_order.product_name").alias("product_name"),
        f.col("exploded_order.quantity").alias("quantity"),
        f.col("exploded_order.price").alias("price"),
        f.col("exploded_order.weight").alias("weight"),
    )
    .show()
)

In [None]:
# Weight threshold: total weight of all orders stored in the bronze orders table.
# Reads the registered orders table directly. Its `total_weight` column is the
# flattened order_details.total_weight written by the orders stream; the nested
# `order_details` column does not exist there.
# Evaluates to None when the table is empty (SUM over no rows is null).
(
    spark.table(ORDERS_TABLE)
    .agg(f.sum("total_weight").alias("total_weight"))
    .first()["total_weight"]
)

In [None]:
# Inspect the gold products table.
# NOTE(review): this table is not created anywhere in this notebook; presumably another job writes it — confirm.
spark.table("gold_products_table").show()

## OLD TESTS 

In [None]:
# Stop the events streaming query. The variable is `events_stream`; the previous
# `event_stream` was never defined and raised a NameError.
# The orders and order-items queries keep running; to stop every active query use:
#   for query in spark.streams.active: query.stop()
events_stream.stop()