In [81]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr, explode, lag
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, BooleanType, LongType, TimestampType, IntegerType
from pyspark.sql.window import Window

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'


In [2]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaConsumer") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/homebrew/Caskroom/miniforge/base/envs/de/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/julnow/.ivy2/cache
The jars for the packages stored in: /Users/julnow/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b9dbafaf-efbf-49c3-b744-245299e1bd7d;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-l

In [72]:
schema = StructType([
    StructField("data", StructType([
        StructField("stations", ArrayType(StructType([
            StructField("is_installed", BooleanType(), nullable=True),
            StructField("is_renting", BooleanType(), nullable=True),
            StructField("is_returning", BooleanType(), nullable=True),
            StructField("last_reported", LongType(), nullable=True),
            StructField("num_bikes_available", LongType(), nullable=True),
            StructField("num_docks_available", LongType(), nullable=True),
            StructField("station_id", StringType(), nullable=True)
        ])))
    ])),
    StructField("last_updated", TimestampType(), nullable=True),
    StructField("ttl", LongType(), nullable=True),
    StructField("version", StringType(), nullable=True)
])

In [73]:
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "station_status"

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .load()

In [74]:
# Parse value from binay to string
json_df = df.selectExpr("cast(value as string) as value")

# Apply Schema to JSON value column and expand the value
from pyspark.sql.functions import from_json

json_expanded_df = json_df.withColumn("value", from_json(json_df["value"], schema)).select("value.*") 

In [75]:
from pyspark.sql.functions import explode, col

exploded_df = json_expanded_df \
    .select("last_updated", "data") \
    .withColumn("station_data", explode("data.stations")) \
    .select("last_updated", 
            col("station_data.num_bikes_available").alias("num_bikes_available"), 
            col("station_data.station_id").alias("station_id"))


In [85]:
from pyspark.sql.functions import split, window, current_timestamp, udf

windowed_df = exploded_df \
    .withWatermark("last_updated", "1 minute") \
    .withColumn("window", window(col("last_updated"), "1 minute")) \
    .groupBy("window", "station_id") \
    .agg({"num_bikes_available": "max"}) \
    .withColumnRenamed("max(num_bikes_available)", "num_bikes_available")

# Define a window to order by station_id and last_updated
windowSpec = Window.partitionBy("station_id").orderBy("window.start")

# Define a UDF to calculate the difference between consecutive values
@udf(IntegerType())
def calculate_difference(current_val, prev_val):
    if prev_val is None:
        return None
    else:
        return current_val - prev_val

# Calculate the difference between consecutive updates within each window
diff_df = windowed_df.withColumn("prev_num_bikes_available", lag("num_bikes_available").over(windowSpec))
diff_df = diff_df.withColumn("bike_difference", calculate_difference(col("num_bikes_available"), col("prev_num_bikes_available")))

In [86]:
writing_df = diff_df.writeStream \
    .format("console") \
    .option("checkpointLocation","checkpoint_dir") \
    .start()
    
    
# Start the streaming application to run until the following happens
# 1. Exception in the running program
# 2. Manual Interruption
writing_df.awaitTermination()

24/04/14 18:19:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;
Window [lag(num_bikes_available#1554L, -1, null) windowspecdefinition(station_id#1304, _w0#1560 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS prev_num_bikes_available#1558L], [station_id#1304], [_w0#1560 ASC NULLS FIRST]
+- Project [window#1539-T60000ms, station_id#1304, num_bikes_available#1554L, window#1539-T60000ms.start AS _w0#1560]
   +- Project [window#1539-T60000ms, station_id#1304, max(num_bikes_available)#1549L AS num_bikes_available#1554L]
      +- Aggregate [window#1539-T60000ms, station_id#1304], [window#1539-T60000ms, station_id#1304, max(num_bikes_available#1303L) AS max(num_bikes_available)#1549L]
         +- Project [last_updated#1288-T60000ms, num_bikes_available#1303L, station_id#1304, window#1540-T60000ms AS window#1539-T60000ms]
            +- Project [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(last_updated#1288-T60000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(last_updated#1288-T60000ms, TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(last_updated#1288-T60000ms, TimestampType, LongType) - 0) % 60000000) + 60000000) ELSE ((precisetimestampconversion(last_updated#1288-T60000ms, TimestampType, LongType) - 0) % 60000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(last_updated#1288-T60000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(last_updated#1288-T60000ms, TimestampType, LongType) - 0) % 60000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(last_updated#1288-T60000ms, TimestampType, LongType) - 0) % 60000000) + 60000000) ELSE ((precisetimestampconversion(last_updated#1288-T60000ms, TimestampType, LongType) - 0) % 60000000) END) - 0) + 60000000), LongType, TimestampType))) AS window#1540-T60000ms, last_updated#1288-T60000ms, num_bikes_available#1303L, station_id#1304]
               +- Filter isnotnull(last_updated#1288-T60000ms)
                  +- EventTimeWatermark last_updated#1288: timestamp, 1 minutes
                     +- Project [last_updated#1288, station_data#1299.num_bikes_available AS num_bikes_available#1303L, station_data#1299.station_id AS station_id#1304]
                        +- Project [last_updated#1288, data#1287, station_data#1299]
                           +- Generate explode(data#1287.stations), false, [station_data#1299]
                              +- Project [last_updated#1288, data#1287]
                                 +- Project [value#1285.data AS data#1287, value#1285.last_updated AS last_updated#1288, value#1285.ttl AS ttl#1289L, value#1285.version AS version#1290]
                                    +- Project [from_json(StructField(data,StructType(StructField(stations,ArrayType(StructType(StructField(is_installed,BooleanType,true),StructField(is_renting,BooleanType,true),StructField(is_returning,BooleanType,true),StructField(last_reported,LongType,true),StructField(num_bikes_available,LongType,true),StructField(num_docks_available,LongType,true),StructField(station_id,StringType,true)),true),true)),true), StructField(last_updated,TimestampType,true), StructField(ttl,LongType,true), StructField(version,StringType,true), value#1283, Some(Europe/Warsaw)) AS value#1285]
                                       +- Project [cast(value#1270 as string) AS value#1283]
                                          +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@4aca2f6b, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@78a95c17, [kafka.bootstrap.servers=localhost:9092, subscribe=station_status], [key#1269, value#1270, topic#1271, partition#1272, offset#1273L, timestamp#1274, timestampType#1275], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6e2c5d73,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> station_status),None), kafka, [key#1262, value#1263, topic#1264, partition#1265, offset#1266L, timestamp#1267, timestampType#1268]


                                                                                

-------------------------------------------
Batch: 45
-------------------------------------------
+------------+-------------------+----------+
|last_updated|num_bikes_available|station_id|
+------------+-------------------+----------+
|  1713111597|                  2|    448565|
|  1713111597|                  0|   2585259|
|  1713111597|                  1|   2585263|
|  1713111597|                  0|   2585265|
|  1713111597|                  0|   2585267|
|  1713111597|                  0|   2585269|
|  1713111597|                  1|   2585271|
|  1713111597|                  0|   2585273|
|  1713111597|                  0|   2585275|
|  1713111597|                  6|   2585278|
|  1713111597|                  4|   2585280|
|  1713111597|                  0|   2585281|
|  1713111597|                  1|   2585283|
|  1713111597|                  1|   2585284|
|  1713111597|                  7|   2585285|
|  1713111597|                  2|   2585286|
|  1713111597|              

In [93]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode, lag, sum as spark_sum, min as spark_min, max as spark_max, when, abs
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, BooleanType, LongType, TimestampType
from datetime import datetime, timedelta

import os

# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaConsumer") \
    .getOrCreate()

schema = StructType([
    StructField("data", StructType([
        StructField("stations", ArrayType(StructType([
            StructField("is_installed", BooleanType(), nullable=True),
            StructField("is_renting", BooleanType(), nullable=True),
            StructField("is_returning", BooleanType(), nullable=True),
            StructField("last_reported", LongType(), nullable=True),
            StructField("num_bikes_available", LongType(), nullable=True),
            StructField("num_docks_available", LongType(), nullable=True),
            StructField("station_id", StringType(), nullable=True)
        ])))
    ])),
    StructField("last_updated", LongType(), nullable=True),
    StructField("ttl", LongType(), nullable=True),
    StructField("version", StringType(), nullable=True)
])

# Read the data in batch mode
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "station_status") \
    .load()

# Parse value from binary to string
json_df = df.selectExpr("cast(value as string) as value")

# Apply Schema to JSON value column and expand the value
json_expanded_df = json_df.withColumn("value", from_json(json_df["value"], schema)).select("value.*") 

# Explode the data
exploded_df = json_expanded_df \
    .select("last_updated", "data") \
    .withColumn("station_data", explode("data.stations")) \
    .select("last_updated", 
            col("station_data.num_bikes_available").alias("num_bikes_available"), 
            col("station_data.station_id").alias("station_id"))

# Filter data only from the last hour
one_hour_ago = int((datetime.now() - timedelta(hours=1)).timestamp())
filtered_df = exploded_df.filter(col("last_updated") >= one_hour_ago)

# Define a UDF to calculate the difference between consecutive values
@udf(IntegerType())
def calculate_difference(current_val, prev_val):
    if prev_val is None:
        return None
    else:
        return current_val - prev_val

# Calculate the bike difference
windowSpec = Window.partitionBy("station_id").orderBy("last_updated")
diff_df = filtered_df.withColumn("prev_num_bikes_available", lag("num_bikes_available").over(windowSpec))
diff_df = diff_df.withColumn("bike_difference", calculate_difference(col("num_bikes_available"), col("prev_num_bikes_available")))

# Calculate num_rentals (absolute value of negative bike differences) and num_returns
rentals_returns_df = diff_df.groupBy("station_id").agg(
    spark_min("last_updated").alias("first_timestamp"),
    spark_max("last_updated").alias("last_timestamp"),
    spark_sum(when(col("bike_difference") < 0, abs(col("bike_difference")))).alias("num_rentals"),
    spark_sum(when(col("bike_difference") > 0, col("bike_difference"))).alias("num_returns")
)

# Show the result
rentals_returns_df.show(truncate=False)

# Stop the SparkSession
spark.stop()



24/04/14 18:32:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/14 18:32:42 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/04/14 18:32:42 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/04/14 18:32:42 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/04/14 18:32:42 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
24/04/14 18:32:42 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
                                                                                

+----------+---------------+--------------+-----------+-----------+
|station_id|first_timestamp|last_timestamp|num_rentals|num_returns|
+----------+---------------+--------------+-----------+-----------+
|2585367   |1713108790     |1713112331    |8          |5          |
|2585774   |1713108790     |1713112331    |1          |1          |
|2585353   |1713108790     |1713112331    |10         |6          |
|2585286   |1713108790     |1713112331    |4          |3          |
|2585978   |1713108790     |1713112331    |6          |4          |
|2586024   |1713108790     |1713112331    |7          |4          |
|2585387   |1713108790     |1713112331    |8          |7          |
|2585811   |1713108790     |1713112331    |6          |5          |
|2585288   |1713108790     |1713112331    |6          |8          |
|2585335   |1713108790     |1713112331    |4          |3          |
|2585995   |1713108790     |1713112331    |2          |4          |
|276784510 |1713108790     |1713112331    |1    