In [7]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, concat, col, lit
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, DateType, IntegerType, FloatType, ArrayType, BooleanType
from time import sleep
from pyspark.sql.functions import from_json


# Assemble the Spark configuration as one fluent chain: the application runs
# against the standalone master and uses a 2g driver with one core each for
# the driver and the executors.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("SparkStreamFlightData")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)
# The SparkSession is the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()



In [11]:


# Schema of one flight record. Every field is nullable; nested objects are
# modelled as structs and lists as arrays of strings.
flight_schema = (
    StructType()
    .add("lastUpdatedAt", StringType(), True)
    .add("actualLandingTime", StringType(), True)
    .add("actualOffBlockTime", StringType(), True)
    .add("aircraftRegistration", StringType(), True)
    .add(
        "aircraftType",
        StructType()
        .add("iataMain", StringType(), True)
        .add("iataSub", StringType(), True),
        True,
    )
    .add(
        "baggageClaim",
        StructType()
        .add("belts", ArrayType(StringType()), True),
        True,
    )
    .add("checkinAllocations", StringType(), True)
    .add(
        "codeshares",
        StructType()
        .add("codeshares", ArrayType(StringType()), True),
        True,
    )
    .add("estimatedLandingTime", StringType(), True)
    .add("expectedTimeBoarding", StringType(), True)
    .add("expectedTimeGateClosing", StringType(), True)
    .add("expectedTimeGateOpen", StringType(), True)
    .add("expectedTimeOnBelt", StringType(), True)
    .add("expectedSecurityFilter", StringType(), True)
    .add("flightDirection", StringType(), True)
    .add("flightName", StringType(), True)
    .add("flightNumber", IntegerType(), True)
    .add("gate", StringType(), True)
    .add("pier", StringType(), True)
    .add("id", StringType(), True)
    .add("isOperationalFlight", BooleanType(), True)
    .add("mainFlight", StringType(), True)
    .add("prefixIATA", StringType(), True)
    .add("prefixICAO", StringType(), True)
    .add("airlineCode", IntegerType(), True)
    .add("publicEstimatedOffBlockTime", StringType(), True)
    .add(
        "publicFlightState",
        StructType()
        .add("flightStates", ArrayType(StringType()), True),
        True,
    )
    .add(
        "route",
        StructType()
        .add("destinations", ArrayType(StringType()), True)
        .add("eu", StringType(), True)
        .add("visa", BooleanType(), True),
        True,
    )
    .add("scheduleDateTime", StringType(), True)
    .add("scheduleDate", StringType(), True)
    .add("scheduleTime", StringType(), True)
    .add("serviceType", StringType(), True)
    .add("terminal", IntegerType(), True)
    .add("transferPositions", StringType(), True)
    .add("schemaVersion", StringType(), True)
)

# Subscribe to the `flights` Kafka topic as a *streaming* source (not a batch
# read). startingOffsets=latest makes a new query begin at the newest offsets,
# and failOnDataLoss=false keeps the query from failing when data may have
# been lost (e.g. offsets that are no longer available).
kafkaStream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("failOnDataLoss", "false")
    .option("subscribe", "flights")
    .option("startingOffsets", "latest")
    .load()
)

# Decode the Kafka payload: cast the record's `value` column to a string and
# parse the JSON into a struct column (also named `value`) using flight_schema.
value_df = kafkaStream.select(
    from_json(col("value").cast("string"), flight_schema).alias("value")
)

value_df.printSchema()

# Keep the identifying/timing columns and emit one row per entry of
# route.destinations. Each column is listed exactly once so that column names
# in exploded_df stay unambiguous for later by-name references.
# NOTE: explode() produces no row for a record whose destinations array is
# null or empty, so such flights are dropped here.
exploded_df = value_df.selectExpr(
    'value.estimatedLandingTime',
    'value.flightName',
    'value.actualLandingTime',
    'value.scheduleDateTime',
    'explode(value.route.destinations) as destinations',
)

exploded_df.printSchema()

# Publish each exploded destination to the `flights_results` Kafka topic.
# The Kafka sink reads the record payload from a column named `value`, hence
# the alias; progress is tracked in the checkpoint directory.
query = (
    exploded_df
    .select(col("destinations").alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("checkpointLocation", "/home/jovyan/checkpoint/flight_results")
    .option("topic", "flights_results")
    .start()
)

# Block the cell until the query ends. Interrupting the kernel raises
# KeyboardInterrupt here, which is used as the signal to shut down cleanly.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    # Stop the streaming query first, then the Spark context behind the session.
    query.stop()
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- value: struct (nullable = true)
 |    |-- lastUpdatedAt: string (nullable = true)
 |    |-- actualLandingTime: string (nullable = true)
 |    |-- actualOffBlockTime: string (nullable = true)
 |    |-- aircraftRegistration: string (nullable = true)
 |    |-- aircraftType: struct (nullable = true)
 |    |    |-- iataMain: string (nullable = true)
 |    |    |-- iataSub: string (nullable = true)
 |    |-- baggageClaim: struct (nullable = true)
 |    |    |-- belts: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- checkinAllocations: string (nullable = true)
 |    |-- codeshares: struct (nullable = true)
 |    |    |-- codeshares: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- estimatedLandingTime: string (nullable = true)
 |    |-- expectedTimeBoarding: string (nullable = true)
 |    |-- expectedTimeGateClosing: string (nullable = true)
 |    |-- expectedTimeGateOpen: string (nullable = true)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


Stoped the streaming query and the spark context
