In [16]:
# -------------------------
# 1. Install Java and Spark + findspark
# -------------------------
!apt-get install openjdk-8-jdk -y
!pip install -q findspark azure-eventhub

import getpass
import json
import os
import shutil
import subprocess
import time

import pandas as pd
from azure.eventhub import EventHubProducerClient, EventData
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, from_json
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType

# Download Spark: resolve the newest Spark 3.x release listed on the Apache mirror.
spark_version = subprocess.run(
    "curl -s https://downloads.apache.org/spark/ | grep -o 'spark-3\\.[0-9]\\+\\.[0-9]\\+' | sort -V | tail -1",
    shell=True, capture_output=True, text=True
).stdout.strip()

# The pipeline's exit status is tail's, so a failed curl only shows up as empty output.
if not spark_version:
    raise RuntimeError(
        "Could not determine the latest Spark 3.x release from https://downloads.apache.org/spark/"
    )

spark_release = spark_version
hadoop_version = "hadoop3"
tgz_file = f"{spark_release}-bin-{hadoop_version}.tgz"
spark_dir = f"{spark_release}-bin-{hadoop_version}"
spark_url = f"https://downloads.apache.org/spark/{spark_release}/{tgz_file}"

# check=True makes download/extraction failures raise here instead of surfacing
# later as an obscure findspark/SparkSession error.
if not os.path.exists(tgz_file):
    # Download to a temporary name so an interrupted download is never mistaken
    # for a complete archive on the next run.
    partial_file = tgz_file + ".part"
    subprocess.run(["wget", "-q", "-O", partial_file, spark_url], check=True)
    os.replace(partial_file, tgz_file)
# Always re-extract into a clean directory.
if os.path.exists(spark_dir):
    shutil.rmtree(spark_dir)
subprocess.run(["tar", "xf", tgz_file], check=True)

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# tar extracts into the current working directory, so point SPARK_HOME there
# instead of assuming the working directory is /content.
os.environ["SPARK_HOME"] = os.path.abspath(spark_dir)

# NOTE(review): pyspark is imported at the top of this cell, before findspark.init(),
# so those imports resolve to whichever pyspark is installed in the environment,
# not SPARK_HOME/python — confirm its version matches the downloaded Spark.
import findspark
findspark.init()

# -------------------------
# 2. Create SparkSession
# -------------------------
# Spark connector artifacts must match the Spark runtime downloaded above;
# spark_version looks like "spark-3.5.1", so strip the "spark-" prefix.
# (Previously they were pinned to 3.4.1 regardless of the downloaded release.)
spark_pkg_version = spark_version.split("-", 1)[1]

# NOTE: the former "spark.sql.extensions" = "org.apache.spark.sql.avro.AvroSqlExtensions"
# setting was dropped: spark-avro ships no such class, so Spark only logged a
# warning. Avro functions (from_avro/to_avro) need no session extension.
# NOTE(review): hadoop-azure should match the Hadoop version bundled with the
# chosen Spark build — confirm 3.3.1 is compatible.
spark = (
    SparkSession.builder
    .appName("UberStreamingPipeline")
    .config("spark.jars.packages", ",".join([
        f"org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_pkg_version}",
        f"org.apache.spark:spark-avro_2.12:{spark_pkg_version}",
        "org.apache.hadoop:hadoop-azure:3.3.1",
        "com.microsoft.azure:azure-storage:8.6.6",
    ]))
    .getOrCreate()
)

# -------------------------
# 3. Configure Azure Event Hub
# -------------------------
namespace = "iesstsabbadbab-grp-01-05"
# Never hard-code the SAS key in the notebook: read it from the environment,
# falling back to an interactive prompt that does not echo the secret.
# NOTE(review): the SAS key that used to be hard-coded here was published with
# the notebook and should be regenerated in the Azure portal.
conn_str = os.environ.get("EVENTHUB_CONNECTION_STRING") or getpass.getpass(
    "Event Hub connection string: "
)
ride_topic = "ride-events-group4"
traffic_topic = "traffic-alerts-group4"

# Shared options for reading through the Event Hubs Kafka endpoint (port 9093, SASL_SSL/PLAIN).
# The literal "$ConnectionString" username plus the connection string as password
# is how this endpoint authenticates.
common_kafka_options = {
    "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{conn_str}";',
    "startingOffsets": "earliest"
}

# -------------------------
# 4. Load events from JSON files
# -------------------------
ride_path = "/content/ride_events.json"
traffic_path = "/content/traffic_surge_alerts.json"

# Explicit UTF-8: the data contains non-ASCII place names (e.g. "Malasaña"),
# so parsing must not depend on the platform's default encoding.
with open(ride_path, "r", encoding="utf-8") as f:
    ride_events = json.load(f)
with open(traffic_path, "r", encoding="utf-8") as f:
    traffic_alerts_raw = json.load(f)

# traffic_alerts_raw maps each zone to a list of alerts; flatten into one list,
# preserving zone order and then alert order within each zone.
traffic_alerts = [
    alert
    for zone_alerts in traffic_alerts_raw.values()
    for alert in zone_alerts
]

# -------------------------
# 5. Define schemas and read from Kafka
# -------------------------
# Timestamps are declared as strings here and converted with to_timestamp
# after the JSON payload has been parsed.
_RIDE_FIELDS = [
    ("event_id", StringType()),
    ("ride_id", StringType()),
    ("event_type", StringType()),
    ("uber_type", StringType()),
    ("start_location", StringType()),
    ("end_location", StringType()),
    ("timestamp_event", StringType()),
    ("price", DoubleType()),
]
_TRAFFIC_FIELDS = [
    ("alert_id", StringType()),
    ("timestamp", StringType()),
    ("zone_id", StringType()),
    ("traffic_level", StringType()),
    ("event_type", StringType()),
    ("surge_multiplier", DoubleType()),
]

ride_schema = StructType([StructField(name, dtype) for name, dtype in _RIDE_FIELDS])
traffic_schema = StructType([StructField(name, dtype) for name, dtype in _TRAFFIC_FIELDS])


def _json_topic_stream(topic, schema, timestamp_col):
    """Return a streaming DataFrame of the JSON records published to `topic`.

    Each Kafka `value` is cast to a string, parsed with `schema`, flattened to
    top-level columns, and `timestamp_col` is converted with to_timestamp.
    """
    kafka_source = (
        spark.readStream
        .format("kafka")
        .option("subscribe", topic)
        .options(**common_kafka_options)
        .load()
    )
    flattened = (
        kafka_source
        .selectExpr("CAST(value AS STRING) as json_str")
        .select(from_json("json_str", schema).alias("data"))
        .select("data.*")
    )
    return flattened.withColumn(timestamp_col, expr(f"to_timestamp({timestamp_col})"))


ride_df = _json_topic_stream(ride_topic, ride_schema, "timestamp_event")
traffic_df = _json_topic_stream(traffic_topic, traffic_schema, "timestamp")

# -------------------------
# 6. Initiate streams in memory
# -------------------------
def _start_memory_preview(stream_df, table_name):
    """Start an append-mode query mirroring `stream_df` into in-memory table `table_name`."""
    return (
        stream_df.writeStream
        .format("memory")
        .queryName(table_name)
        .outputMode("append")
        .start()
    )


# Stop any streaming queries still active in this session (e.g. from a previous
# run of this cell) before starting new ones with the same query names.
for active_query in spark.streams.active:
    active_query.stop()

ride_preview_query = _start_memory_preview(ride_df, "ride_events_table")
traffic_preview_query = _start_memory_preview(traffic_df, "traffic_alerts_table")

# -------------------------
# 7. Send events to the Event Hub
# -------------------------
# NOTE(review): `send_events_to_eventhub` is not defined in this cell; it must be
# defined by an earlier cell before this one runs — confirm.
# NOTE(review): each round re-sends the same loaded events, so (unless the sender
# alters them) downstream sinks presumably receive every event_id once per round.

def send_events():
    """Publish all ride events, then all traffic alerts, to their Event Hub topics."""
    for payload, topic in ((ride_events, ride_topic), (traffic_alerts, traffic_topic)):
        send_events_to_eventhub(payload, topic)


SEND_ROUNDS = 5          # how many times the full event set is sent
SEND_PAUSE_SECONDS = 10  # pause after each round

for _round in range(SEND_ROUNDS):
    send_events()
    time.sleep(SEND_PAUSE_SECONDS)

# -------------------------
# 8. Wait and show the tables
# -------------------------
# Block until each preview query has consumed everything currently available in
# its topic, instead of guessing with a fixed sleep. A failed query raises here
# rather than silently producing an empty table.
ride_preview_query.processAllAvailable()
traffic_preview_query.processAllAvailable()

# The payloads are JSON parsed with from_json when the streams were defined,
# not Avro, so the labels say so.
print("\n\U0001F697 Preview: Ride Events (JSON deserialized from Kafka)")
spark.sql("SELECT * FROM ride_events_table").show(truncate=False)

print("\n\U0001F6A6 Preview: Traffic Alerts (JSON deserialized from Kafka)")
spark.sql("SELECT * FROM traffic_alerts_table").show(truncate=False)

# -------------------------
# 9. Store in Azure Blob Storage
# -------------------------
# NOTE(review): storage_account_name, storage_account_key and container_name are
# not defined in this cell — they must be set earlier (ideally from an env var or
# secret store, not a literal).
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)
wasbs_base_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"


def _start_parquet_sink(stream_df, name):
    """Append `stream_df` as Parquet under `wasbs_base_path + name`.

    The checkpoint lives on the local filesystem under /content/checkpoints/<name>.
    NOTE(review): if that local directory is lost (e.g. a new runtime), the query
    restarts from "earliest" and presumably re-writes rows already in the blob.
    """
    return (
        stream_df.writeStream
        .format("parquet")
        .option("path", wasbs_base_path + name)
        .option("checkpointLocation", "/content/checkpoints/" + name)
        .outputMode("append")
        .start()
    )


ride_query = _start_parquet_sink(ride_df, "ride_stream")
traffic_query = _start_parquet_sink(traffic_df, "traffic_stream")

# -------------------------
# 10. Monitor Streams - Debugging Output
# -------------------------
def _print_stream_status(label, query):
    """Print the current status and the most recent progress report of `query`."""
    print(f">>>{label} STREAM STATUS:")
    print(query.status)
    # lastProgress (not recentProgress) prints one report per round instead of
    # re-dumping the whole progress history every 10 seconds.
    print("Progress:", query.lastProgress)


try:
    print("\nMonitoring stream progress for 1 minute...\n")
    for _ in range(6):  # every 10 seconds, total 1 minute
        _print_stream_status("RIDE", ride_query)
        print()
        _print_stream_status("TRAFFIC", traffic_query)

        time.sleep(10)

    print("\nMonitor finished: End of debug cycle.")

except Exception as e:
    print(f"\nError in monitoring: {e}")
finally:
    # Stop the Parquet sinks and also the in-memory preview queries: memory sinks
    # collect every row in driver memory and would otherwise keep running.
    for query in (ride_query, traffic_query, ride_preview_query, traffic_preview_query):
        query.stop()
    print("\nStreams interrupted correctly.")


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-8-jdk is already the newest version (8u442-b06~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.
400 events sent to 'ride-events-group4'
100 events sent to 'traffic-alerts-group4'
400 events sent to 'ride-events-group4'
100 events sent to 'traffic-alerts-group4'
400 events sent to 'ride-events-group4'
100 events sent to 'traffic-alerts-group4'
400 events sent to 'ride-events-group4'
100 events sent to 'traffic-alerts-group4'
400 events sent to 'ride-events-group4'
100 events sent to 'traffic-alerts-group4'

Preview: Ride Events from AVRO deserialized
+------------------------------------+------------------------------------+----------------+------------+---------------------------+-------------------+--------------------------+-----+
|event_id                            |ride_id                             |event_type      |uber_type   |start_locat

In [17]:
# Read back the Parquet files written by the ride stream sink and show a sample.
df_from_blob = spark.read.format("parquet").load(wasbs_base_path + "ride_stream")
df_from_blob.show(n=5)

+--------------------+--------------------+----------------+------------+--------------------+-------------------+--------------------+-----+
|            event_id|             ride_id|      event_type|   uber_type|      start_location|       end_location|     timestamp_event|price|
+--------------------+--------------------+----------------+------------+--------------------+-------------------+--------------------+-----+
|6f1a1234-6785-450...|f08afdcf-ed3c-45b...|         Request|regular_uber| Plaza Mayor, Madrid|   Gran Via, Madrid|2025-03-14 16:20:...| NULL|
|4b2d5dc8-3d98-441...|f08afdcf-ed3c-45b...|Driver available|regular_uber| Plaza Mayor, Madrid|   Gran Via, Madrid|2025-03-14 16:20:...| NULL|
|8b779c1c-167f-4d6...|f08afdcf-ed3c-45b...|  Start car ride|regular_uber| Plaza Mayor, Madrid|   Gran Via, Madrid|2025-03-14 16:20:...| 3.94|
|058fd86a-16e9-48d...|f08afdcf-ed3c-45b...|   Ride finished|regular_uber| Plaza Mayor, Madrid|   Gran Via, Madrid|2025-03-14 16:20:...| 3.94|
|a9625

In [18]:
# Read back the Parquet files written by the traffic stream sink and show a sample.
df_from_blob = spark.read.format("parquet").load(wasbs_base_path + "traffic_stream")
df_from_blob.show(n=5)

+--------------------+--------------------+----------------+-------------+---------------+----------------+
|            alert_id|           timestamp|         zone_id|traffic_level|     event_type|surge_multiplier|
+--------------------+--------------------+----------------+-------------+---------------+----------------+
|fb64b065-f0a7-4e8...|2025-02-20 00:39:...|Malasaña, Madrid|     moderate|       roadwork|             1.2|
|56ec0c76-fe23-404...|2025-02-03 11:02:...|Malasaña, Madrid|     moderate|road congestion|             1.2|
|86c9b201-8745-43e...|2025-01-21 06:20:...|Malasaña, Madrid|       severe|road congestion|             2.0|
|c851ca6c-8779-4d0...|2025-03-05 01:02:...|Malasaña, Madrid|       severe|road congestion|             2.0|
|da7e278f-1470-407...|2025-01-04 06:43:...|Malasaña, Madrid|     moderate|road congestion|             1.2|
+--------------------+--------------------+----------------+-------------+---------------+----------------+
only showing top 5 rows

