In [None]:
#simulating traffic to Kafka
from datetime import datetime
import json
import random
import time
import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError
import pandas as pd

# Kafka producer that serialises every message value as UTF-8 encoded JSON.
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

print("sorting data...")
# Replay the CSV in chronological order, limited to the first 2000 pings.
pings = (
    pd.read_csv("Pet Cats United States.csv")
    .sort_values(by="timestamp")
    .head(2000)     # Remove this to test the full dataset
)

print("Simulating traffic...")
for _, row in pings.iterrows():
    # NOTE(review): json.dumps writes float NaN as the bare token `NaN`, which
    # is not strict JSON. Presumably empty CSV cells reach the topic that way;
    # confirm that every consumer of 'ingest' tolerates it.
    future = producer.send('ingest', row.to_dict())
    try:
        # Block until the broker acknowledges the record (or 10 s pass) so
        # messages are published one at a time, in timestamp order.
        future.get(timeout=10)
    except KafkaError:
        # Best effort: log the failure and continue with the next ping.
        logging.exception("Failed to send message")

# A record whose acknowledgement timed out above may still be buffered;
# flush so nothing is left pending when the cell finishes.
producer.flush()


sorting data...


  pings = pd.read_csv("Pet Cats United States.csv")


Simulating traffic...


In [None]:
# Spark initialisation: imports, Kafka connector configuration, session.
import os
from time import sleep

from IPython.display import display, clear_output

# Tell the PySpark launcher to pull in the Kafka source/sink for Structured
# Streaming. This is set before the session below is created.
KAFKA_CONNECTOR = 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' + KAFKA_CONNECTOR + ' pyspark-shell'

import pyspark 
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Get the existing session, or create one that runs Spark locally with two
# worker threads ('local[2]').
session_builder = SparkSession.builder.master('local[2]')
spark_session = session_builder.getOrCreate()
sc = spark_session.sparkContext


In [None]:
# Streaming source: subscribe to the 'ingest' topic on the Kafka broker and
# start reading from the earliest available offset.
kafka_reader = (
    spark_session.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "ingest")
    .option("startingOffsets", "earliest")
)

# NOTE: this name shadows Python's built-in input(); it is kept because later
# cells refer to the stream as `input`.
input = kafka_reader.load()

# Show whether the DataFrame is a streaming one.
display(input.isStreaming)


True

In [None]:
# Keep only the message payload (cast to a string) and the 'timestamp' column
# of the Kafka source.
payload_as_text = input["value"].cast("string").alias("value")
stream_decoded = input.select(payload_as_text, "timestamp")

In [5]:

# Stop the query left over from an earlier run of this cell, if there is one.
try:
    tq.stop() # type: ignore
except Exception:
    # Best effort: on a fresh kernel `tq` does not exist yet. Catching
    # Exception (not a bare except) lets KeyboardInterrupt propagate, so the
    # cell can still be interrupted.
    pass

tq = (
    # Create an output stream
    stream_decoded.writeStream
    # Only write new rows to the output
    .outputMode("append")
    # Write output stream to an in-memory Spark table (a DataFrame)
    .format("memory")
    # The name of the output table will be the same as the name of the query
    .queryName("test_query")
    # Submit the query to Spark and execute it
    .start()
)

try:
    # Give the query a few seconds to process messages.
    sleep(5)

    # When the status says "Waiting for data to arrive", that means the query
    # has finished its current iteration and is waiting for new messages from
    # Kafka.
    display(tq.status)

    memory_sink = spark_session.table("test_query")

    # Jupyter renders pandas tables natively, so convert the Spark DataFrame
    # before displaying it.
    display(memory_sink.toPandas())
finally:
    # Always stop the query, even when the preview above fails, so it does
    # not keep running in the background.
    tq.stop()

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': True}

Unnamed: 0,value,timestamp
0,"{""event-id"": 238900963, ""visible"": true, ""time...",2025-03-31 12:04:40.271
1,"{""event-id"": 238898622, ""visible"": true, ""time...",2025-03-31 12:04:40.291
2,"{""event-id"": 238898623, ""visible"": true, ""time...",2025-03-31 12:04:40.295
3,"{""event-id"": 238900964, ""visible"": true, ""time...",2025-03-31 12:04:40.297
4,"{""event-id"": 238900965, ""visible"": true, ""time...",2025-03-31 12:04:40.300
...,...,...
67180,"{""event-id"": 238901638, ""visible"": true, ""time...",2025-04-06 20:29:14.854
67181,"{""event-id"": 238899642, ""visible"": true, ""time...",2025-04-06 20:29:14.855
67182,"{""event-id"": 238901639, ""visible"": true, ""time...",2025-04-06 20:29:14.855
67183,"{""event-id"": 238899643, ""visible"": true, ""time...",2025-04-06 20:29:14.856


In [6]:
# The CSV payload carries its own 'timestamp' field, so discard the default
# Kafka 'timestamp' column from the source stream.
input_without_timestamp = input.drop("timestamp")

In [25]:
# Parse the JSON payload into typed columns.
# (field name, Spark type) for every key expected in the JSON message; all
# fields are declared nullable.
json_fields = [
    ("event-id", StringType()),
    ("visible", BooleanType()),
    ("timestamp", TimestampType()),
    ("location-long", DoubleType()),
    ("location-lat", DoubleType()),
    ("algorithm-marked-outlier", BooleanType()),  # Assuming the field can be null or empty
    ("ground-speed", DoubleType()),
    ("heading", DoubleType()),
    ("height-above-ellipsoid", DoubleType()),
    ("manually-marked-outlier", BooleanType()),  # Assuming the field can be null or empty
    ("sensor-type", StringType()),
    ("individual-taxon-canonical-name", StringType()),
    ("tag-local-identifier", StringType()),
    ("individual-local-identifier", StringType()),
    ("study-name", StringType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in json_fields]
)

# Add a struct column 'parsed_value' holding the decoded message.
raw_json = col("value").cast("string")
decoded_json_stream = input_without_timestamp.withColumn(
    "parsed_value", from_json(raw_json, schema)
)


In [None]:
# Promote every field of the 'parsed_value' struct to a top-level column;
# the remaining Kafka columns are not selected.
parsed_fields = col("parsed_value.*")
flattened_stream = decoded_json_stream.select(parsed_fields)

In [27]:

# Stop the query left over from an earlier run, if there is one.
try:
    tq.stop() # type: ignore
except Exception:
    # Best effort: on a fresh kernel `tq` does not exist yet. Catching
    # Exception (not a bare except) lets KeyboardInterrupt propagate, so the
    # cell can still be interrupted.
    pass

tq = (
    # Create an output stream
    flattened_stream.writeStream
    # Only write new rows to the output
    .outputMode("append")
    # Write output stream to an in-memory Spark table (a DataFrame)
    .format("memory")
    # The name of the output table will be the same as the name of the query
    .queryName("test_query")
    # Submit the query to Spark and execute it
    .start()
)

try:
    # Give the query a few seconds to process messages.
    sleep(5)

    # When the status says "Waiting for data to arrive", that means the query
    # has finished its current iteration and is waiting for new messages from
    # Kafka.
    display(tq.status)

    memory_sink = spark_session.table("test_query")

    # Jupyter renders pandas tables natively, so convert the Spark DataFrame
    # before displaying it; also show the column types of the sink table.
    display(memory_sink.toPandas())
    display(memory_sink.dtypes)
finally:
    # Always stop the query, even when the preview above fails, so it does
    # not keep running in the background.
    tq.stop()

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

Unnamed: 0,event-id,visible,timestamp,location-long,location-lat,algorithm-marked-outlier,ground-speed,heading,height-above-ellipsoid,manually-marked-outlier,sensor-type,individual-taxon-canonical-name,tag-local-identifier,individual-local-identifier,study-name
0,238900963,True,2013-08-14 15:52:57,-78.665771,35.792908,,252.0,0.0,26.825448,,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
1,238898622,True,2013-08-14 16:05:13,-78.647598,35.790443,,0.0,164.0,6.089904,,gps,Felis catus,motorcattag,Motorcat,Pet Cats United States
2,238898623,True,2013-08-14 16:08:31,-78.647354,35.790867,,0.0,266.0,16.858488,,gps,Felis catus,motorcattag,Motorcat,Pet Cats United States
3,238900964,True,2013-08-14 16:08:53,-78.665771,35.792908,,252.0,0.0,26.825448,,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
4,238900965,True,2013-08-14 16:12:37,-78.666000,35.792862,,756.0,355.0,26.727912,,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
67180,238901638,True,2013-08-18 19:54:09,-78.665733,35.793030,,2916.0,323.0,37.109400,,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
67181,238899642,True,2013-08-18 19:55:38,-78.645630,35.790726,,360.0,327.0,52.739544,,gps,Felis catus,motorcattag,Motorcat,Pet Cats United States
67182,238901639,True,2013-08-18 19:57:18,-78.665848,35.792927,,6660.0,124.0,36.889944,,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
67183,238899643,True,2013-08-18 19:58:42,-78.645645,35.790592,,432.0,127.0,45.137832,,gps,Felis catus,motorcattag,Motorcat,Pet Cats United States


[('event-id', 'string'),
 ('visible', 'boolean'),
 ('timestamp', 'timestamp'),
 ('location-long', 'double'),
 ('location-lat', 'double'),
 ('algorithm-marked-outlier', 'boolean'),
 ('ground-speed', 'double'),
 ('heading', 'double'),
 ('height-above-ellipsoid', 'double'),
 ('manually-marked-outlier', 'boolean'),
 ('sensor-type', 'string'),
 ('individual-taxon-canonical-name', 'string'),
 ('tag-local-identifier', 'string'),
 ('individual-local-identifier', 'string'),
 ('study-name', 'string')]

In [None]:
# Columns that flag a ping as an outlier; they are dropped once the filter
# has been applied.
outlier_flags = ["algorithm-marked-outlier", "manually-marked-outlier"]

# Columns that must be non-null for a row to be kept.
required_columns = [
    "event-id", "visible", "location-long", "location-lat", "ground-speed",
    "heading", "height-above-ellipsoid", "sensor-type",
    "individual-taxon-canonical-name", "tag-local-identifier",
    "individual-local-identifier", "study-name",
]

# Keep only rows where neither outlier flag is set.
# NOTE(review): this keeps rows whose flags are NULL only, so a row explicitly
# flagged False would be removed as well; confirm that is intended.
not_flagged_by_algorithm = col("algorithm-marked-outlier").isNull()
not_flagged_manually = col("manually-marked-outlier").isNull()

cleaned_stream = (
    flattened_stream
    .where(not_flagged_by_algorithm & not_flagged_manually)  # remove outliers
    .drop(*outlier_flags)  # the flag columns are no longer needed
    .dropna(subset=required_columns)  # remove rows with null values
)

In [38]:
# Stop the query left over from an earlier run, if there is one.
try:
    tq.stop()
except Exception:
    # Best effort: on a fresh kernel `tq` does not exist yet. Catching
    # Exception (not a bare except) lets KeyboardInterrupt propagate, so the
    # cell can still be interrupted.
    pass

tq = (
    # Create an output stream
    cleaned_stream.writeStream
    # Only write new rows to the output
    .outputMode("append")
    # Write output stream to an in-memory Spark table (a DataFrame)
    .format("memory")
    # The name of the output table will be the same as the name of the query
    .queryName("test_query")
    # Submit the query to Spark and execute it
    .start()
)

try:
    # Give the query some time to process messages.
    sleep(10)

    # When the status says "Waiting for data to arrive", that means the query
    # has finished its current iteration and is waiting for new messages from
    # Kafka.
    display(tq.status)
    memory_sink = spark_session.table("test_query")
    # Jupyter renders pandas tables natively, so convert the Spark DataFrame
    # before displaying it; also show the column types of the sink table.
    display(memory_sink.toPandas())
    display(memory_sink.dtypes)
finally:
    # Always stop the query, even when the preview above fails, so it does
    # not keep running in the background.
    tq.stop()

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

Unnamed: 0,event-id,visible,timestamp,location-long,location-lat,ground-speed,heading,height-above-ellipsoid,sensor-type,individual-taxon-canonical-name,tag-local-identifier,individual-local-identifier,study-name
0,238900963,True,2013-08-14 15:52:57,-78.665771,35.792908,252.0,0.0,26.825448,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
1,238898622,True,2013-08-14 16:05:13,-78.647598,35.790443,0.0,164.0,6.089904,gps,Felis catus,motorcattag,Motorcat,Pet Cats United States
2,238898623,True,2013-08-14 16:08:31,-78.647354,35.790867,0.0,266.0,16.858488,gps,Felis catus,motorcattag,Motorcat,Pet Cats United States
3,238900964,True,2013-08-14 16:08:53,-78.665771,35.792908,252.0,0.0,26.825448,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
4,238900965,True,2013-08-14 16:12:37,-78.666000,35.792862,756.0,355.0,26.727912,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
...,...,...,...,...,...,...,...,...,...,...,...,...,...
38645,238901638,True,2013-08-18 19:54:09,-78.665733,35.793030,2916.0,323.0,37.109400,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
38646,238899642,True,2013-08-18 19:55:38,-78.645630,35.790726,360.0,327.0,52.739544,gps,Felis catus,motorcattag,Motorcat,Pet Cats United States
38647,238901639,True,2013-08-18 19:57:18,-78.665848,35.792927,6660.0,124.0,36.889944,gps,Felis catus,sweetpeatag,Sweetpea,Pet Cats United States
38648,238899643,True,2013-08-18 19:58:42,-78.645645,35.790592,432.0,127.0,45.137832,gps,Felis catus,motorcattag,Motorcat,Pet Cats United States


[('event-id', 'string'),
 ('visible', 'boolean'),
 ('timestamp', 'timestamp'),
 ('location-long', 'double'),
 ('location-lat', 'double'),
 ('ground-speed', 'double'),
 ('heading', 'double'),
 ('height-above-ellipsoid', 'double'),
 ('sensor-type', 'string'),
 ('individual-taxon-canonical-name', 'string'),
 ('tag-local-identifier', 'string'),
 ('individual-local-identifier', 'string'),
 ('study-name', 'string')]

In [39]:
# Pack all columns of a cleaned row into one struct and encode it as a JSON
# string in a single column named 'value'.
row_as_struct = struct("*")
output_stream = cleaned_stream.select(to_json(row_as_struct).alias("value"))

In [40]:
# Stop the query left over from an earlier run, if there is one.
try:
    tq.stop()
except Exception:
    # Best effort: on a fresh kernel `tq` does not exist yet. Catching
    # Exception (not a bare except) lets KeyboardInterrupt propagate, so the
    # cell can still be interrupted.
    pass

tq = (
    # Create an output stream
    output_stream.writeStream
    # Only write new rows to the output
    .outputMode("append")
    # Write output stream to an in-memory Spark table (a DataFrame)
    .format("memory")
    # The name of the output table will be the same as the name of the query
    .queryName("test_query")
    # Submit the query to Spark and execute it
    .start()
)

try:
    # Give the query some time to process messages.
    sleep(10)

    # When the status says "Waiting for data to arrive", that means the query
    # has finished its current iteration and is waiting for new messages from
    # Kafka.
    display(tq.status)

    memory_sink = spark_session.table("test_query")

    # Jupyter renders pandas tables natively, so convert the Spark DataFrame
    # before displaying it; also show the column types of the sink table.
    display(memory_sink.toPandas())
    display(memory_sink.dtypes)
finally:
    # Always stop the query, even when the preview above fails, so it does
    # not keep running in the background.
    tq.stop()

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

Unnamed: 0,value
0,"{""event-id"":""238900963"",""visible"":true,""timest..."
1,"{""event-id"":""238898622"",""visible"":true,""timest..."
2,"{""event-id"":""238898623"",""visible"":true,""timest..."
3,"{""event-id"":""238900964"",""visible"":true,""timest..."
4,"{""event-id"":""238900965"",""visible"":true,""timest..."
...,...
38645,"{""event-id"":""238901638"",""visible"":true,""timest..."
38646,"{""event-id"":""238899642"",""visible"":true,""timest..."
38647,"{""event-id"":""238901639"",""visible"":true,""timest..."
38648,"{""event-id"":""238899643"",""visible"":true,""timest..."


[('value', 'string')]

In [43]:
import shutil

# Stop the query left over from an earlier run, if there is one.
try:
    tq.stop()
except Exception:
    # Best effort: on a fresh kernel `tq` does not exist yet. Catching
    # Exception (not a bare except) lets KeyboardInterrupt propagate.
    pass

# Remove the old checkpoint dir. os.rmdir only deletes *empty* directories,
# so it fails on a checkpoint directory that already holds files; rmtree
# removes the whole tree. This runs outside the try block above so the cleanup
# still happens when there was no previous query to stop. A missing directory
# is ignored.
shutil.rmtree("checkpoints-cleanup", ignore_errors=True)

# Write the JSON-encoded 'value' column to the 'ingest-cleaned' Kafka topic.
tq = (
    output_stream
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "ingest-cleaned")
    .option("checkpointLocation", "checkpoints-cleanup")
    .start()
)

# Give the query some time to start before reporting its status. The query
# is left running on purpose: this cell does not stop it.
sleep(10)
display(tq.status)


{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [42]:
# Look up the in-memory table "test_query" and show it as a pandas table.
memory_sink = spark_session.table("test_query")
sink_as_pandas = memory_sink.toPandas()
display(sink_as_pandas)

Unnamed: 0,value
0,"{""event-id"":""238900963"",""visible"":true,""timest..."
1,"{""event-id"":""238898622"",""visible"":true,""timest..."
2,"{""event-id"":""238898623"",""visible"":true,""timest..."
3,"{""event-id"":""238900964"",""visible"":true,""timest..."
4,"{""event-id"":""238900965"",""visible"":true,""timest..."
...,...
38645,"{""event-id"":""238901638"",""visible"":true,""timest..."
38646,"{""event-id"":""238899642"",""visible"":true,""timest..."
38647,"{""event-id"":""238901639"",""visible"":true,""timest..."
38648,"{""event-id"":""238899643"",""visible"":true,""timest..."
