In [1]:
from pyspark.sql import SparkSession

# Point the builder at the remote Spark Connect endpoint and name the app.
connect_builder = (
    SparkSession.builder
    .remote("sc://spark-connect-server:15002")
    .appName("streaming-example")
)

# Add the Hive metastore URI and Hive support, then obtain the session.
hive_builder = connect_builder.config(
    "hive.metastore.uris", "thrift://hive-cluster-metastore:9083"
).enableHiveSupport()
spark = hive_builder.getOrCreate()

# Register the Spark Connect jar as an artifact of this session.
spark.addArtifacts("/stackable/spark/connect/spark-connect-4.0.1.jar")

See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'. SQLSTATE: 46110


Delete checkpoint (the `checkpointLocation` used by the streaming query below):
`kubectl -n bigdata exec -c namenode -it hdfs-cluster-namenode-default-0 -- /bin/bash -c "./bin/hdfs dfs -rm -r /tmp/nearest-weather-checkpoint"`

In [8]:
import requests
import json
from pyspark.sql import functions as F
# from_avro is needed further down to decode the Kafka message values.
# The import stays best-effort, but a failure is now reported instead of being
# swallowed, so a later NameError on from_avro can be traced back to here.
# Nothing is printed if from_avro is already defined in the notebook namespace.
try:
    from pyspark.sql.avro.functions import from_avro
except ImportError as avro_import_error:
    if "from_avro" not in globals():
        print(f"Warning: could not import from_avro: {avro_import_error}")

# Stop the streaming query left over from an earlier run of this cell, if any
if 'query' in globals():
    try:
        query.stop()
    except NameError:
        pass
    except Exception as stop_error:
        # Best effort: report the problem and carry on with the new query.
        print(f"Note: {stop_error}")
    else:
        print("Stopped previous query")

# Configuration
# Base URL of the Schema Registry; get_latest_schema() builds its request URL from it.
SCHEMA_REGISTRY_URL = "http://schema-registry.bigdata.svc.cluster.local:8081"
# Kafka bootstrap servers, used by both source streams and by the Kafka sink in this cell.
KAFKA_BOOTSTRAP = "kafka-broker.bigdata.svc.cluster.local:9092"

def get_latest_schema(subject):
    """Fetch the latest schema string for a subject from the Schema Registry.

    Args:
        subject: Schema Registry subject name, e.g. "bike-data-value". The
            name is percent-encoded before it is placed in the URL path, so
            subjects containing characters such as "/" or spaces still
            address the intended resource.

    Returns:
        The value of the "schema" field in the registry's JSON response.

    Raises:
        Exception: any failure (request error, non-success HTTP status,
            missing "schema" field) is printed and then re-raised unchanged.
    """
    # Local import keeps this definition self-contained.
    from urllib.parse import quote

    try:
        # safe="" also encodes "/", which would otherwise split the path.
        encoded_subject = quote(subject, safe="")
        url = f"{SCHEMA_REGISTRY_URL}/subjects/{encoded_subject}/versions/latest"
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()["schema"]
    except Exception as e:
        print(f"Error fetching schema for {subject}: {e}")
        raise

# 1. Fetch Schemas
# Both Avro schemas are required before the streams can be defined;
# report and abort if either lookup fails.
try:
    bike_schema_json = get_latest_schema("bike-data-value")
    weather_schema_json = get_latest_schema("weather-data-value")
except Exception as schema_error:
    print("Failed to fetch schemas.")
    raise schema_error

def _decode_avro_topic(topic, schema_json, struct_name):
    """Stream a Kafka topic and decode every message value with from_avro.

    The first five bytes of each value are skipped before decoding
    (presumably the Confluent wire-format header of magic byte + schema id;
    confirm against the producers). The decoded record is returned as a
    single struct column named ``struct_name``.
    """
    kafka_source = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .load()
    )
    avro_body = F.expr("substring(value, 6, length(value)-5)")
    return kafka_source.select(
        from_avro(avro_body, schema_json).alias(struct_name)
    )


# 2. Read Bike Data
# Project the needed fields explicitly and convert the start time to a timestamp.
bike_fields = _decode_avro_topic("bike-data", bike_schema_json, "bike").select(
    F.col("bike.tripduration"),
    F.col("bike.start_station_id"),
    F.col("bike.end_station_id"),
    F.to_timestamp(F.col("bike.starttime")).alias("starttime"),
)
bikes = (
    bike_fields
    .withColumn("join_key", F.lit(1))  # constant key used by the join below
    .alias("bikes")
    .withWatermark("starttime", "10 minutes")
)

# 3. Read Weather Data
# Same treatment: explicit projection plus timestamp conversion.
weather_fields = _decode_avro_topic("weather-data", weather_schema_json, "weather").select(
    F.col("weather.dry_bulb_temperature_celsius"),
    F.to_timestamp(F.col("weather.observation_date")).alias("observation_date"),
)
weather = (
    weather_fields
    .withColumn("join_key", F.lit(1))  # constant key used by the join below
    .alias("weather")
    .withWatermark("observation_date", "10 minutes")
)

# 4. Interval Join & Nearest Neighbor Logic
ride_start = F.col("bikes.starttime")
observed_at = F.col("weather.observation_date")
one_hour = F.expr("interval 1 hour")

# Pair each ride with every observation taken within one hour either side of
# its start; the constant join_key supplies the equality predicate.
within_one_hour = (
    (F.col("bikes.join_key") == F.col("weather.join_key"))
    & (observed_at >= ride_start - one_hour)
    & (observed_at <= ride_start + one_hour)
)

# time_diff is the absolute gap between ride start and observation, taken
# after casting both timestamps to long.
seconds_apart = F.abs(ride_start.cast("long") - observed_at.cast("long"))

joined = bikes.join(weather, within_one_hour, "inner").withColumn(
    "time_diff", seconds_apart
)

# Aggregate
# One output row per ride: group on the ride's own fields and keep the
# observation with the smallest time_diff.
ride_keys = [
    "bikes.tripduration",
    "bikes.starttime",
    "bikes.start_station_id",
    "bikes.end_station_id",
]
closest_observation = F.min_by(
    F.struct("weather.observation_date", "weather.dry_bulb_temperature_celsius"),
    F.col("time_diff"),
).alias("nearest_weather")

result = (
    joined.groupBy(*ride_keys)
    .agg(closest_observation)
    .select(
        "starttime",
        "start_station_id",
        "end_station_id",
        F.col("nearest_weather.dry_bulb_temperature_celsius").alias("temp_c"),
        F.col("nearest_weather.observation_date").alias("weather_ts"),
        "tripduration",
    )
)

# 5. Write Output
# Shape the result into the key/value columns the Kafka sink expects.
# The sink only accepts string or binary keys, and start_station_id comes
# straight from the Avro record, so it is cast to string for the key (a no-op
# if it already is one). The value is the whole result row serialised as JSON,
# where the station id keeps its original type.
payload = result.select(
    F.col("start_station_id").cast("string").alias("key"),
    F.to_json(F.struct("*")).alias("value")
)

# Configure the Kafka sink: target brokers, output topic and checkpoint path.
kafka_sink = (
    payload.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
    .option("topic", "nearest-weather-output")
    .option("checkpointLocation", "/tmp/nearest-weather-checkpoint")
)

# Emit in append mode, triggering a micro-batch every second.
query = (
    kafka_sink.outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

print("Nearest weather per ride streaming query started (Type safe version)")

Stopped previous query
Nearest weather per ride streaming query started (Type safe version)


In [None]:
# Commands to view word count results:
#
# 1. Create the output topic (if it doesn't exist):
#    kubectl exec -n bigdata -it deployment/broker -- /opt/kafka/bin/kafka-topics.sh --create --topic wordcount-output --bootstrap-server broker:29092 --partitions 1 --replication-factor 1
#
# 2. View word counts from the output topic (from beginning):
#    kubectl exec -n bigdata -it deployment/broker -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:29092 --topic wordcount-output --from-beginning
#
# 3. View only new messages (real-time):
#    kubectl exec -n bigdata -it deployment/broker -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:29092 --topic wordcount-output
#
# 4. List all topics:
#    kubectl exec -n bigdata -it deployment/broker -- /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server broker:29092


In [None]:
# Keep the query running
# Press Ctrl+C or interrupt the kernel to stop
# awaitTermination() blocks this cell until the streaming query stops or
# fails, so cells below it will not run while the query is active.
query.awaitTermination()


In [None]:
# Installs the kafka-python client that the producer helper cell imports as `kafka`.
# NOTE(review): consider `%pip install` with a pinned version so the package
# lands in the kernel's own environment and re-runs are reproducible.
!pip install kafka-python

In [None]:
# Helper: Send test messages with text for word counting
import kafka
import json
import time

# Create producer
# String values are UTF-8 encoded by the serializer; anything else is passed
# through unchanged.
producer = kafka.KafkaProducer(
    bootstrap_servers=['kafka-broker.bigdata.svc.cluster.local:9092'],
    value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else v
)

# Sample sentences for word count testing
test_messages = [
    "The quick brown fox jumps over the lazy dog",
    "Spark streaming is awesome for real-time processing",
    "Kafka and Spark work great together",
    "Word count is a classic example of stream processing",
    "The fox jumps and the dog runs",
    "Real-time analytics with Spark and Kafka",
    "Streaming data processing made easy",
    "Count words in real-time with Spark Streaming"
]

print("Sending test messages for word counting...")
try:
    for message in test_messages:
        producer.send('sparktest', value=message)
        print(f"Sent: {message}")
        time.sleep(0.5)  # Wait 0.5 seconds between messages (faster for demo)

    # send() is asynchronous; flush() waits until the buffered records are out.
    producer.flush()
finally:
    # Always release the producer's connections, even if sending failed, so
    # re-running this cell does not accumulate open producers.
    producer.close()

print("\nAll test messages sent! Check the streaming output above for word counts.")
