In [0]:
%run "/Workspace/Users/samuel.barroscatarino@educ.sasserno.fr/musicstreamapp/databricks/01_Initialize_Setting"

In [0]:
import os

from pyspark.sql.functions import col, from_json, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType, BooleanType

In [0]:
# Kafka connection parameters.
# Set the KAFKA_BOOTSTRAP_SERVERS environment variable to point at another
# broker without editing the notebook; otherwise the default host is used.
kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "172.212.37.12:9092")

# Unity Catalog target: each topic is written to <catalog_name>.<schema_name>.<topic>.
catalog_name = "music_streaming"
schema_name = "raw"

In [0]:
# Payload schemas for the JSON events carried on each Kafka topic.
# Every event shares the session/user/location columns of `base_schema`;
# each topic-specific schema appends its own fields after those.
# All fields are declared nullable (StructField(..., True)).
base_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("ts", LongType()),
        ("sessionId", StringType()),
        ("userId", StringType()),
        ("auth", StringType()),
        ("level", StringType()),
        ("itemInSession", LongType()),
        ("city", StringType()),
        ("zip", StringType()),
        ("state", StringType()),
        ("userAgent", StringType()),
        ("lon", DoubleType()),
        ("lat", DoubleType()),
        ("firstName", StringType()),
        ("lastName", StringType()),
        ("gender", StringType()),
        ("registration", LongType()),
    ]
])

page_view_schema = StructType([
    *base_schema.fields,
    StructField("page", StringType(), True),
    StructField("method", StringType(), True),
    StructField("status", LongType(), True),
])

listen_schema = StructType([
    *base_schema.fields,
    StructField("artist", StringType(), True),
    StructField("song", StringType(), True),
    # NOTE(review): if the producer emits fractional durations (e.g. seconds as
    # a float), LongType cannot represent them — confirm against the payload.
    # Changing the type would also require migrating the existing Delta table.
    StructField("duration", LongType(), True),
])

auth_schema = StructType([
    *base_schema.fields,
    StructField("success", BooleanType(), True),
    StructField("method", StringType(), True),
    StructField("status", LongType(), True),
])

status_change_schema = StructType([
    *base_schema.fields,
    StructField("prevLevel", StringType(), True),
    StructField("method", StringType(), True),
    StructField("status", LongType(), True),
    StructField("statusChangeType", StringType(), True),
])

In [0]:
# Topic name -> JSON payload schema. The start cell looks up each topic it
# ingests in this mapping, so every ingested topic needs an entry here.
schemas = dict(
    page_view_events=page_view_schema,
    listen_events=listen_schema,
    auth_events=auth_schema,
    status_change_events=status_change_schema,
)

In [0]:
def create_kafka_stream(topic, starting_offsets="earliest"):
    """Create a streaming DataFrame that reads one Kafka topic.

    Intended for batch-style (``availableNow``) jobs. A query with no committed
    offsets in its checkpoint starts reading from ``starting_offsets``. A query
    resuming from its checkpoint continues where it left off.

    Args:
        topic: Kafka topic to subscribe to.
        starting_offsets: Value for Kafka's ``startingOffsets`` option, used
            only when the query has no committed offsets yet. Defaults to
            ``"earliest"`` so a first run ingests all retained history.

    Returns:
        A streaming DataFrame with Spark's Kafka source columns (``value``,
        ``topic``, ``partition``, ``offset``, ``timestamp``, ...).
    """
    # Spark honours ``startingOffsets`` only when a query starts without
    # committed offsets; resumed queries always continue from the checkpoint.
    # The option is therefore passed unconditionally instead of probing the
    # checkpoint directory. A directory can exist without any committed
    # offsets (e.g. a first run that failed early). Omitting the option then
    # falls back to Spark's streaming default ("latest") and skips the backlog.
    kafka_options = {
        "kafka.bootstrap.servers": kafka_bootstrap_servers,
        "subscribe": topic,
        # Do not fail the query when offsets are no longer available
        # (e.g. data aged out of topic retention between runs).
        "failOnDataLoss": "false",
        "kafka.security.protocol": "PLAINTEXT",
        "startingOffsets": starting_offsets,
    }

    return (spark.readStream
            .format("kafka")
            .options(**kafka_options)
            .load()
    )

In [0]:
def process_kafka_stream(topic, schema):
    """Parse one Kafka topic's JSON events and append them to its Delta table.

    Reads the topic via ``create_kafka_stream`` and decodes each message
    ``value`` with ``schema``. It keeps the Kafka metadata columns (renamed
    with a ``kafka_`` prefix) and adds an ``ingestion_timestamp``. It then
    writes to ``<catalog_name>.<schema_name>.<topic>`` with an
    ``availableNow`` trigger.

    Args:
        topic: Kafka topic name; also used as the target table name and in the
            checkpoint path and query name.
        schema: StructType describing the JSON payload of ``topic``.

    Returns:
        The started StreamingQuery.
    """
    source_df = create_kafka_stream(topic)

    # Kafka source columns carried through, each renamed with a "kafka_" prefix.
    kafka_meta = ("timestamp", "topic", "partition", "offset")
    renamed_meta = [f"kafka_{name}" for name in kafka_meta]

    decoded_df = source_df.select(
        *(col(name).alias(alias) for name, alias in zip(kafka_meta, renamed_meta)),
        from_json(col("value").cast("string"), schema).alias("data"),
    )

    # Flatten the payload struct so every event field becomes a top-level column.
    events_df = decoded_df.select(*renamed_meta, "data.*")
    stamped_df = events_df.withColumn("ingestion_timestamp", current_timestamp())

    target_table = f"{catalog_name}.{schema_name}.{topic}"
    writer = (
        stamped_df.writeStream
        .format("delta")
        .outputMode("append")
        .queryName(f"Streaming_Kafka_to_Raw_{topic}")
        .trigger(availableNow=True)
        .option("checkpointLocation", f"{checkpoint_path}/raw/{topic}/")
        .option("mergeSchema", "true")
    )
    return writer.toTable(target_table)

In [0]:
# Start one ingestion query per topic. `queries` stays index-aligned with
# `list_tables`; the cells below pair them by position.
queries = []

# Verify every topic has a schema before starting anything. An unknown topic
# then fails fast, instead of raising mid-loop after earlier topics' streams
# are already running.
missing_topics = [t for t in list_tables if t not in schemas]
if missing_topics:
    raise KeyError(f"No schema defined in `schemas` for topic(s): {missing_topics}")

for topic in list_tables:
    print(f"Starting stream processing for {topic}...")
    schema = schemas[topic]
    query = process_kafka_stream(topic, schema)
    queries.append(query)
    print(f"Stream processing started for {topic}")

In [0]:
# Monitor each query and wait for it to finish before later cells read the tables.
# With trigger(availableNow=True), a query processes the data available when it
# started and then stops on its own. awaitTermination() returns once that
# backlog is written, and re-raises the query's exception if it failed.
# Without this wait, the sample/count cells (and the stop cell) race the
# in-flight batches when the notebook is run top to bottom.
for i, topic in enumerate(list_tables):
    query = queries[i]
    print(f"\nStatus for {topic}:")
    print(query.status)
    query.awaitTermination()
    print(f"Finished {topic}; last progress:")
    print(query.lastProgress)

In [0]:
# Show a few ingested rows per table (no ordering, so which rows appear is arbitrary).
# NOTE(review): rows include firstName/lastName/city/lat/lon columns — clear the
# outputs before sharing unless the event data is known to be synthetic.
for topic in list_tables:
    table_fqn = f"{catalog_name}.{schema_name}.{topic}"
    print(f"\nSample data from {topic}:")
    display(spark.table(table_fqn).limit(5))

In [0]:
# Row count per ingested table.
for topic in list_tables:
    print(f"\nRow count for {topic}:")
    display(spark.sql(f"SELECT COUNT(1) AS row_count FROM {catalog_name}.{schema_name}.{topic}"))

In [0]:
# Stop every query started above, pairing each topic with its query by position.
for position, topic in enumerate(list_tables):
    print(f"Stopping stream for {topic}...")
    query_to_stop = queries[position]
    query_to_stop.stop()
    print(f"Stream stopped for {topic}")