In [0]:
pip install azure-eventhub requests

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%restart_python

##Batch Analysis

###Upload data

In [0]:
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/naima.rejeb@polytechnicien.tn/states_2017_06_05_00.csv")

In [0]:
df.printSchema()
df.show()

root
 |-- time: string (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- velocity: string (nullable = true)
 |-- heading: string (nullable = true)
 |-- vertrate: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- onground: string (nullable = true)
 |-- alert: string (nullable = true)
 |-- spi: string (nullable = true)
 |-- squawk: string (nullable = true)
 |-- baroaltitude: string (nullable = true)
 |-- geoaltitude: string (nullable = true)
 |-- lastposupdate: string (nullable = true)
 |-- lastcontact: string (nullable = true)

+----------+------+-------------+--------------+-------------+-------------+---------+--------+--------+-----+-----+------+------------+-----------+-------------+------------+
|      time|icao24|          lat|           lon|     velocity|      heading| vertrate|callsign|onground|alert|  spi|squawk|baroaltitude|geoaltitude|lastposupdate| lastcontact|
+----------+--

####Create temporary view

In [0]:
df.createOrReplaceTempView("raw_data")

###Data Analysis

In [0]:
spark.sql("SELECT * FROM raw_data LIMIT 10").show()


+----------+------+-------------+--------------+-------------+-------------+--------+--------+--------+-----+-----+------+------------+-----------+-------------+------------+
|      time|icao24|          lat|           lon|     velocity|      heading|vertrate|callsign|onground|alert|  spi|squawk|baroaltitude|geoaltitude|lastposupdate| lastcontact|
+----------+------+-------------+--------------+-------------+-------------+--------+--------+--------+-----+-----+------+------------+-----------+-------------+------------+
|1496620800|4bccb9| 52.084915638| 11.4453935623|175.320556641|305.321044922|    NULL|SXS2WY  |   False|False|False|  1000|     7459.98|     7581.9| 1496620800.0|1496620800.0|
|1496620800|502cb2|49.6373748779| 2.87184062757| 232.93635757|305.199349779|-0.32512|MON55BR |   False|False|False|  2770|    10988.04|   11033.76|1496620797.81|1496620800.0|
|1496620800|4bccaf|49.8504638672| 12.5227832794|196.684570312|288.764648438|    NULL|SXS7R   |   False|False|False|  3215|   

In [0]:
#Inspecting data
spark.sql("SELECT COUNT(*) FROM raw_data").show()


+--------+
|count(1)|
+--------+
| 1526689|
+--------+



In [0]:
spark.sql("SELECT DISTINCT icao24 FROM raw_data").show()


+------+
|icao24|
+------+
|45aa84|
|c04081|
|502cb5|
|4bcde4|
|400bdb|
|400a30|
|34310d|
|89629b|
|502cb2|
|44d1cc|
|406953|
|4bccb9|
|34440c|
|406696|
|780de5|
|4bccaf|
|4bccab|
|4008e1|
|4070e3|
|471f8c|
+------+
only showing top 20 rows


In [0]:
spark.sql("""
    SELECT AVG(velocity) AS avg_velocity, AVG(lat) AS avg_lat, AVG(lon) AS avg_lon
    FROM raw_data
""").show()


+------------------+----------------+------------------+
|      avg_velocity|         avg_lat|           avg_lon|
+------------------+----------------+------------------+
|183.06975094904107|31.9657066062951|-48.93514326517676|
+------------------+----------------+------------------+



In [0]:
spark.sql("""
    SELECT * FROM raw_data
    WHERE lat BETWEEN 50 AND 55
""").show()


+----------+------+-------------+---------------+-------------+-------------+---------+--------+--------+-----+-----+------+------------+-----------+-------------+------------+
|      time|icao24|          lat|            lon|     velocity|      heading| vertrate|callsign|onground|alert|  spi|squawk|baroaltitude|geoaltitude|lastposupdate| lastcontact|
+----------+------+-------------+---------------+-------------+-------------+---------+--------+--------+-----+-----+------+------------+-----------+-------------+------------+
|1496620800|4bccb9| 52.084915638|  11.4453935623|175.320556641|305.321044922|     NULL|SXS2WY  |   False|False|False|  1000|     7459.98|     7581.9| 1496620800.0|1496620800.0|
|1496620800|4008e1|50.7808470726|    9.262611866|214.205322266|292.747192383|     NULL|TCX229  |   False|False|False|  3462|     10363.2|       NULL| 1496620800.0|1496620800.0|
|1496620800|4070e3|50.7229924606|  3.82369995117|221.822621523|297.634765076|  0.32512|EXS14D  |   False|False|Fals

In [0]:
spark.sql("""
    SELECT MAX(velocity) AS max_velocity, MIN(velocity) AS min_velocity, 
           MAX(heading) AS max_heading, MIN(heading) AS min_heading
    FROM raw_data
""").show()


+------------+------------+-------------+-----------+
|max_velocity|min_velocity|  max_heading|min_heading|
+------------+------------+-------------+-----------+
|99.994204739|         0.0|99.9993781232|       -0.0|
+------------+------------+-------------+-----------+



In [0]:
data_cleaned = df.na.drop()

In [0]:
from pyspark.sql.functions import col

# Cast columns to appropriate data types
data_cleaned = df.select(
    col("time").cast("int"),
    col("icao24").cast("string"),
    col("lat").cast("double"),
    col("lon").cast("double"),
    col("velocity").cast("double"),
    col("heading").cast("double"),
    col("vertrate").cast("double"),
    col("callsign").cast("string"),
    col("onground").cast("boolean"),
    col("alert").cast("boolean"),
    col("spi").cast("boolean"),
    col("squawk").cast("int"),
    col("baroaltitude").cast("double"),
    col("geoaltitude").cast("double"),
    col("lastposupdate").cast("double"),
    col("lastcontact").cast("double")
)

# Now, data_cleaned should have the correct data types
data_cleaned.printSchema()


root
 |-- time: integer (nullable = true)
 |-- icao24: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- velocity: double (nullable = true)
 |-- heading: double (nullable = true)
 |-- vertrate: double (nullable = true)
 |-- callsign: string (nullable = true)
 |-- onground: boolean (nullable = true)
 |-- alert: boolean (nullable = true)
 |-- spi: boolean (nullable = true)
 |-- squawk: integer (nullable = true)
 |-- baroaltitude: double (nullable = true)
 |-- geoaltitude: double (nullable = true)
 |-- lastposupdate: double (nullable = true)
 |-- lastcontact: double (nullable = true)



In [0]:
spark.conf.set("spark.cassandra.connection.host", "cassandraaccount.cassandra.cosmos.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")

data_cleaned.limit(10).write.format("org.apache.spark.sql.cassandra") \
    .mode('append') \
    .option("table", "flightdata") \
    .option("keyspace", "flightdb") \
    .option("spark.cassandra.connection.ssl.enabled", "true") \
    .option("spark.cassandra.auth.username", "cassandraaccount") \
    .option("spark.cassandra.auth.password", "##") \
    .save()

In [0]:
spark.conf.set("spark.cassandra.connection.host", "cassandraaccount.cassandra.cosmos.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")

data_cleaned.write.format("org.apache.spark.sql.cassandra") \
    .mode('append') \
    .option("table", "flightdata") \
    .option("keyspace", "flightdb") \
    .option("spark.cassandra.connection.ssl.enabled", "true") \
    .option("spark.cassandra.auth.username", "cassandraaccount") \
    .option("spark.cassandra.auth.password", "##") \
    .save()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:717)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
spark.conf.set("spark.cassandra.connection.host", "cassandraaccount.cassandra.cosmos.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")


keyspaces_df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="system_schema.keyspaces", keyspace="system_schema") \
    .load()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-7981590205821744>, line 8[0m
[1;32m      1[0m spark[38;5;241m.[39mconf[38;5;241m.[39mset([38;5;124m"[39m[38;5;124mspark.cassandra.connection.host[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mcassandraaccount.cassandra.cosmos.azure.com[39m[38;5;124m"[39m)
[1;32m      2[0m spark[38;5;241m.[39mconf[38;5;241m.[39mset([38;5;124m"[39m[38;5;124mspark.cassandra.connection.port[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m10350[39m[38;5;124m"[39m)
[1;32m      5[0m keyspaces_df [38;5;241m=[39m spark[38;5;241m.[39mread \
[1;32m      6[0m     [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124morg.apache.spark.sql.cassandra[39m[38;5;124m"[39m) \
[1;32m      7[0m     [38;5;241m.[39moptions(table[38;5;241m=[39m[38;5;124m"[39m[38;5;124msystem_schema.key

In [0]:
df = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="flight_data", keyspace="flightdb") \
    .load()


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-7981590205821745>, line 3[0m
[1;32m      1[0m df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124morg.apache.spark.sql.cassandra[39m[38;5;124m"[39m) \
[1;32m      2[0m     [38;5;241m.[39moptions(table[38;5;241m=[39m[38;5;124m"[39m[38;5;124mflight_data[39m[38;5;124m"[39m, keyspace[38;5;241m=[39m[38;5;124m"[39m[38;5;124mflightdb[39m[38;5;124m"[39m) \
[0;32m----> 3[0m     [38;5;241m.[39mload()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;241m=[39m func([38;5;241m

In [0]:
from graphframes import GraphFrame
from pyspark.sql.functions import col

# Define the time range for the subset (replace with actual Unix timestamp values)
start_time = 1496620800  # Example: June 5, 2017, 00:00:00 (Unix timestamp)
end_time = start_time + 3 * 3600  # 3 hours later

# Filter the data to create a subset
subset_data = data_cleaned.filter(col("time").between(start_time, end_time))

# Display the filtered subset to verify data
print("Subset Data:")
subset_data.show(10)

# Prepare Vertices and Edges from the subset
vertices = subset_data.select("icao24").distinct().withColumnRenamed("icao24", "id")

edges = subset_data.alias("df1") \
    .join(subset_data.alias("df2"), 
          (col("df1.time") - col("df2.time")).between(-300, 300) & 
          (col("df1.icao24") != col("df2.icao24")), "inner") \
    .select(
        col("df1.icao24").alias("src"),
        col("df2.icao24").alias("dst"),
        (col("df1.time") - col("df2.time")).alias("time_diff")
    )

# Create GraphFrame
graph = GraphFrame(vertices, edges)

# Display the graph structure
print("Vertices:")
graph.vertices.show()

print("Edges:")
graph.edges.show()

Subset Data:
+----------+------+-------------+--------------+-------------+-------------+--------+--------+--------+-----+-----+------+------------+-----------+---------------+-----------+
|      time|icao24|          lat|           lon|     velocity|      heading|vertrate|callsign|onground|alert|  spi|squawk|baroaltitude|geoaltitude|  lastposupdate|lastcontact|
+----------+------+-------------+--------------+-------------+-------------+--------+--------+--------+-----+-----+------+------------+-----------+---------------+-----------+
|1496620800|502cb2|49.6373748779| 2.87184062757| 232.93635757|305.199349779|-0.32512|MON55BR |   false|false|false|  2770|    10988.04|   11033.76|1.49662079781E9|1.4966208E9|
|1496620800|4070e3|50.7229924606| 3.82369995117|221.822621523|297.634765076| 0.32512|EXS14D  |   false|false|false|  7775|     10972.8|   11033.76|1.49662079784E9|1.4966208E9|
|1496620800|c04081|51.4994001389| 7.58133888245|222.683105469|310.127563477| -293.37|TFL230  |   false|fals

In [0]:
from graphframes import GraphFrame
import networkx as nx
import matplotlib.pyplot as plt

# Limit the data size to avoid memory issues
vertices_pd = graph.vertices.limit(100).toPandas()
edges_pd = graph.edges.limit(200).toPandas()

# Create NetworkX graph
G = nx.DiGraph()

# Add nodes and edges
G.add_nodes_from(vertices_pd['id'])
G.add_edges_from(zip(edges_pd['src'], edges_pd['dst']))

# Plot the graph
plt.figure(figsize=(12, 8))
nx.draw(G, with_labels=True, node_size=700, node_color="lightblue", font_size=10)
plt.title("Flight Graph Visualization")
plt.show()


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:717)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

##Stream Analysis

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import StructType, StructField, StringType, FloatType, BooleanType, LongType
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array  # Import this for vector conversion
import time

# Initialize Spark session
spark = SparkSession.builder \
    .appName("FlightAnomalyDetection") \
    .getOrCreate()

# Event Hubs connection string
connection_str = "Endpoint=sb://flightproject.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=##;EntityPath=flightStream2"
encrypted_connection_str = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_str)

# Event Hubs configuration
ehConf = {
    'eventhubs.connectionString': encrypted_connection_str,
    'eventhubs.eventHubName': 'flightStream2',
}

# Define schema for incoming JSON data
schema = StructType([
    StructField("icao24", StringType(), True),
    StructField("callsign", StringType(), True),
    StructField("origin_country", StringType(), True),
    StructField("time_position", LongType(), True),
    StructField("longitude", FloatType(), True),
    StructField("latitude", FloatType(), True),
    StructField("baro_altitude", FloatType(), True),
    StructField("velocity", FloatType(), True),
    StructField("vertical_rate", FloatType(), True),
    StructField("geo_altitude", FloatType(), True),
    StructField("on_ground", BooleanType(), True),
])

# Read streaming data from Event Hubs
raw_stream = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Decode and parse the JSON data
parsed_stream = raw_stream.select(from_json(col("body").cast("string"), schema).alias("data"))
final_stream = parsed_stream.select(
    "data.icao24", "data.callsign", "data.origin_country", "data.time_position",
    "data.longitude", "data.latitude", "data.baro_altitude", "data.velocity", 
    "data.vertical_rate", "data.geo_altitude", "data.on_ground"
).na.drop()

# Function to prepare features for GMM
def prepare_features(df):
    assembler = VectorAssembler(
        inputCols=["longitude", "latitude", "baro_altitude", "velocity", "vertical_rate", "geo_altitude"],
        outputCol="features"
    )
    return assembler.transform(df)

# Function to train GMM and detect anomalies
def train_gmm(batch_df, epoch_id):
    if batch_df.count() > 0:  # Train only if there is data
        # Prepare features for GMM
        feature_df = prepare_features(batch_df)
        
        # Train Gaussian Mixture Model
        gmm = GaussianMixture(featuresCol="features", k=2, maxIter=10)
        model = gmm.fit(feature_df)
        
        # Predict cluster for each point and calculate probabilities
        predictions = model.transform(feature_df)

        # Convert `probability` vector to an array and extract the second element (anomaly score)
        predictions = predictions.withColumn("probability_array", vector_to_array(col("probability")))
        predictions = predictions.withColumn("anomaly_score", col("probability_array")[1])

        # Detect anomalies based on threshold
        threshold = 0.1
        anomalies = predictions.filter(col("anomaly_score") < threshold)
        anomalies.show(truncate=False)

# Apply transformations to the streaming data
feature_stream = final_stream.select(
    col("longitude"), col("latitude"), col("baro_altitude"), col("velocity"), col("vertical_rate"), col("geo_altitude")
)

# Write stream with a timer
start_time = time.time()  # Get the start time

query = feature_stream.writeStream.foreachBatch(train_gmm).start()

# Run the stream for only 3 minutes
while time.time() - start_time < 3 * 60:  # Run for 180 seconds
    time.sleep(1)  # Check every second

# Stop the stream after 3 minutes
query.stop()
print("Stream stopped after 3 minutes.")


Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

+---------+--------+-------------+--------+-------------+------------+---------------------------------------------------------------------------------------------------------------+------------------------------------------+----------+-------------------------------------------+---------------------+
|longitude|latitude|baro_altitude|velocity|vertical_rate|geo_altitude|features                                                                                                       |probability                               |prediction|probability_array                          |anomaly_score        |
+---------+--------+-------------+--------+-------------+------------+---------------------------------------------------------------------------------------------------------------+------------------------------------------+----------+-------------------------------------------+---------------------+
|6.4201   |50.0547 |9144.0       |201.7   |0.0          |8976.36     |[6.420100212097168,50

Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

+---------+--------+-------------+--------+-------------+------------+------------------------------------------------------------------------------------------------------------------+------------------------------------------+----------+-------------------------------------------+---------------------+
|longitude|latitude|baro_altitude|velocity|vertical_rate|geo_altitude|features                                                                                                          |probability                               |prediction|probability_array                          |anomaly_score        |
+---------+--------+-------------+--------+-------------+------------+------------------------------------------------------------------------------------------------------------------+------------------------------------------+----------+-------------------------------------------+---------------------+
|8.4349   |47.6803 |3352.8       |166.42  |-5.53        |3329.94     |[8.434900283

Downloading artifacts:   0%|          | 0/15 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

+---------+--------+-------------+--------+-------------+------------+-----------------------------------------------------------------------------------------------------------------+------------------------------------------+----------+-------------------------------------------+---------------------+
|longitude|latitude|baro_altitude|velocity|vertical_rate|geo_altitude|features                                                                                                         |probability                               |prediction|probability_array                          |anomaly_score        |
+---------+--------+-------------+--------+-------------+------------+-----------------------------------------------------------------------------------------------------------------+------------------------------------------+----------+-------------------------------------------+---------------------+
|-1.9601  |42.6757 |11887.2      |209.27  |0.0          |11940.54    |[-1.96010005474

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, udf, unix_timestamp, when, lit
from pyspark.sql.types import StructType, StructField, StringType, FloatType, BooleanType, LongType, IntegerType
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F
import time

# Initialize Spark session
spark = SparkSession.builder \
    .appName("FlightAnomalyDetection") \
    .getOrCreate()

# Event Hubs connection string
connection_str = "Endpoint=sb://flightproject.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=##;EntityPath=flightStream2"
encrypted_connection_str = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_str)

# Event Hubs configuration
ehConf = {
    'eventhubs.connectionString': encrypted_connection_str,
    'eventhubs.eventHubName': 'flightStream2',
}

# Define schema for incoming JSON data
schema = StructType([
    StructField("icao24", StringType(), True),
    StructField("callsign", StringType(), True),
    StructField("origin_country", StringType(), True),
    StructField("time_position", LongType(), True),
    StructField("longitude", FloatType(), True),
    StructField("latitude", FloatType(), True),
    StructField("baro_altitude", FloatType(), True),
    StructField("velocity", FloatType(), True),
    StructField("vertical_rate", FloatType(), True),
    StructField("geo_altitude", FloatType(), True),
    StructField("on_ground", BooleanType(), True),
])

# Read streaming data from Event Hubs
raw_stream = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Decode and parse the JSON data
parsed_stream = raw_stream.select(from_json(col("body").cast("string"), schema).alias("data"))
processed_stream = parsed_stream.select(
    col("data.icao24").alias("icao24"),
    col("data.callsign").alias("callsign"),
    col("data.time_position").alias("time").cast("int"),  # Rename and cast type
    col("data.latitude").alias("lat").cast("double"),
    col("data.longitude").alias("lon").cast("double"),
    col("data.velocity").alias("velocity").cast("double"),
    col("data.vertical_rate").alias("vertrate").cast("double"),
    col("data.baro_altitude").alias("baroaltitude").cast("double"),
    col("data.geo_altitude").alias("geoaltitude").cast("double"),
    col("data.on_ground").alias("onground").cast("boolean")
).na.drop()  # Drop rows with null values in selected columns

# Add missing columns with default values
processed_stream = processed_stream.withColumn("alert", lit(None).cast("boolean")) \
                                   .withColumn("spi", lit(None).cast("boolean")) \
                                   .withColumn("squawk", lit(None).cast("int")) \
                                   .withColumn("lastposupdate", lit(None).cast("double")) \
                                   .withColumn("lastcontact", lit(None).cast("double"))
start_time = time.time()

# Write the transformed stream to Cassandra
query= processed_stream.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="flightdata", keyspace="flightdb") \
    .outputMode("append") \
    .option("spark.cassandra.connection.ssl.enabled", "true") \
    .option("spark.cassandra.auth.username", "cassandraaccount") \
    .option("spark.cassandra.auth.password", "##") \
    .option("checkpointLocation", "/tmp/flightdata") \
    .start()

while time.time() - start_time < 2 * 60:  # Run for 120 seconds
    time.sleep(1)  # Check every second

# Stop the stream after 2 minutes
query.stop()

In [0]:
spark.conf.set("spark.cassandra.connection.host", "cassandraaccount.cassandra.cosmos.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")
