In [1]:
import json

from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, from_json, explode
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType

In [22]:
# Kafka connection settings used by the batch read below.
KAFKA_BOOTSTRAP_SERVERS = "kafka:29092"
KAFKA_TOPIC = "planes"

spark = (
    SparkSession.builder
    .appName("ExtractEventsJob")
    .getOrCreate()
)

# One-shot (batch, non-streaming) read of the topic, from the earliest retained
# offset up to the latest available offset.
raw_events = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [None]:
# Mark the raw Kafka read for caching so later actions on DataFrames derived
# from it can reuse the cached data instead of reading the topic again.
# cache() is lazy: nothing is stored until an action runs.
# NOTE(review): this cell is `In [None]`, i.e. it was not executed in the saved
# session, so the recorded outputs below were produced without this cache.
raw_events.cache()

In [23]:
# The Kafka source exposes `value` as binary; decode it into a string column
# (still called "value") holding the raw JSON payload of each message.
events = raw_events.select(raw_events["value"].cast(StringType()).alias("value"))

In [4]:
# Name and list position of every field kept from a raw state-vector list, in
# the order the fields are emitted (after the snapshot-level "time").
# Positions 12 ("sensors") and 14 ("squawk") are deliberately not extracted.
STATE_VECTOR_FIELDS = (
    ("icao24", 0),
    ("callsign", 1),
    ("origin_country", 2),
    ("time_position", 3),
    ("last_contact", 4),
    ("longitude", 5),
    ("latitude", 6),
    ("baro_altitude", 7),
    ("on_ground", 8),
    ("velocity", 9),
    ("true_track", 10),
    ("vertical_rate", 11),
    ("geo_altitude", 13),
    ("spi", 15),
    ("position_source", 16),
)


def extract_info_level1(row):
    """Flatten one Kafka message into one ``Row`` per aircraft state vector.

    The payload (``row.value``) is a JSON object with a snapshot ``"time"`` and
    a ``"states"`` list. Each state is a positional list of aircraft fields
    (the layout appears to follow the OpenSky Network state vector; TODO
    confirm against the producer).

    Args:
        row: Object whose ``value`` attribute is the JSON payload string.

    Returns:
        list of ``Row``: one per state. Each row holds the snapshot ``time``
        followed by the fields listed in ``STATE_VECTOR_FIELDS``, in that
        order. The list is empty when ``"states"`` is null or an empty list.

    Raises:
        json.JSONDecodeError: if ``row.value`` is not valid JSON.
        KeyError: if the payload has no ``"states"`` key, or has states but
            no ``"time"`` key.
        IndexError: if a state list has fewer than 17 entries.
    """
    data = json.loads(row.value)
    # "states" may be JSON null (presumably when no aircraft are reported).
    # Treat that as an empty snapshot instead of failing the whole Spark task.
    states = data["states"] or []

    result_list = []
    for flight_info in states:
        flight_info_row = {"time": data["time"]}
        flight_info_row.update(
            (name, flight_info[index]) for name, index in STATE_VECTOR_FIELDS
        )
        # Keyword insertion order is preserved, so the column order is unchanged.
        result_list.append(Row(**flight_info_row))
    return result_list

In [30]:
# Sanity check: the RDD-based parse below calls `events.rdd`, so `events`
# must be a DataFrame. The class is still displayed as the cell output.
assert isinstance(events, DataFrame), type(events)
type(events)

pyspark.sql.dataframe.DataFrame

In [28]:
# DataFrame-native alternative to the RDD parse: one output row per state vector.
# explode() only accepts an array or map column, so the JSON string in `value`
# must be parsed with from_json first; exploding the raw string cannot work.
# Assumes each message is {"time": ..., "states": [[...], ...]}, the same
# layout extract_info_level1 reads.
# State vectors mix strings, numbers, booleans and nulls, so their elements are
# declared as strings; non-string values are expected to come back as their JSON
# text (verify on the Spark version in use) and can be cast per position later.
snapshot_schema = StructType([
    StructField("time", LongType()),
    StructField("states", ArrayType(ArrayType(StringType()))),
])
parsed_snapshot = from_json(events.value, snapshot_schema)

state_vectors = events.select(
    parsed_snapshot.getField("time").alias("time"),
    explode(parsed_snapshot.getField("states")).alias("state"),
)
# `words` is the name this cell originally bound; kept as an alias.
words = state_vectors

In [5]:
# Flatten every Kafka message into one Row per aircraft state vector, then build
# a DataFrame from those Rows (schema inferred by Spark) and preview it.
# NOTE(review): flight_status_info feeds several writes below and is not cached,
# so each action may recompute it from Kafka; consider .cache() if that is costly.
flight_status_rows = events.rdd.flatMap(extract_info_level1)
flight_status_info = flight_status_rows.toDF()
flight_status_info.show(5)

+-------------+--------+------------+------+------------+--------+---------+---------+--------------+---------------+-----+----------+-------------+----------+--------+-------------+
|baro_altitude|callsign|geo_altitude|icao24|last_contact|latitude|longitude|on_ground|origin_country|position_source|  spi|      time|time_position|true_track|velocity|vertical_rate|
+-------------+--------+------------+------+------------+--------+---------+---------+--------------+---------------+-----+----------+-------------+----------+--------+-------------+
|      10972.8|SKW3768 |    11536.68|a2e5ec|  1627273100| 43.4066|-122.0168|    false| United States|              0|false|1627273100|   1627273099|     350.4|  246.79|         0.33|
|      2072.64|JST963  |     2110.74|7c6b2f|  1627273099| -34.675| 138.3818|    false|     Australia|              0|false|1627273100|   1627273099|    192.37|  184.86|        -10.4|
|         null|        |        null|ade18c|  1627273001| 32.7337| -117.202|     true

In [13]:
# Append the complete flattened table as Parquet. The path is resolved against
# Spark's default filesystem (Hadoop/HDFS in this setup, per the original
# note; confirm the cluster config).
flight_status_info.write.mode('append').parquet("/tmp/flight_status_info_table")

In [None]:
# # Requires a Kafka topic named planes1 to exist before running.
# # Sends the result to Kafka while also saving it to Hadoop.

# query = flight_status_info \
#   .selectExpr("CAST(icao24 AS STRING) AS key", "to_json(struct(*)) AS value") \
#   .write\
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", "kafka:29092") \
#   .option("topic", "planes1") \
#   .save()

In [8]:
# Flights reported on the ground, appended to their own Parquet dataset.
# Rows whose on_ground is null fail the comparison and are excluded.
on_ground_condition = flight_status_info["on_ground"] == True
flight_info_on_ground = flight_status_info.where(on_ground_condition)
flight_info_on_ground.write.mode('append').parquet("/tmp/flight_info_on_ground")

In [9]:
# Flights not reported on the ground, appended to their own Parquet dataset.
# A null on_ground makes `!= True` evaluate to null, so such rows are excluded
# here as well.
off_ground_condition = flight_status_info["on_ground"] != True
flight_info_off_ground = flight_status_info.where(off_ground_condition)
flight_info_off_ground.write.mode('append').parquet("/tmp/flight_info_off_ground")

In [10]:
# Split by origin_country. Rows from DOMESTIC_COUNTRY are "domestic"; every
# other non-null country is "international". Rows with a null origin_country
# satisfy neither comparison and are written to neither dataset.
DOMESTIC_COUNTRY = 'United States'

flight_info_domestic = flight_status_info.filter(flight_status_info.origin_country == DOMESTIC_COUNTRY)
flight_info_domestic.write.parquet("/tmp/flight_info_domestic", mode='append')

flight_info_international = flight_status_info.filter(flight_status_info.origin_country != DOMESTIC_COUNTRY)
# Write the international subset (not the domestic one) to the international path.
flight_info_international.write.parquet("/tmp/flight_info_international", mode='append')


In [12]:
# Expose the flattened DataFrame to SQL as `flight_status_events`.
# createOrReplaceTempView is the non-deprecated equivalent of registerTempTable
# (deprecated since Spark 2.0).
flight_status_info.createOrReplaceTempView("flight_status_events")

# Materialise the view as an external, Parquet-backed table at a fixed location.
# NOTE(review): CREATE TABLE ... AS SELECT is expected to fail once
# `flight_status` already exists, so this cell is not re-runnable as-is.
# Presumably drop the table first, or use INSERT INTO, if re-runs are needed;
# confirm.
spark.sql("""
        create external table flight_status
        stored as parquet
        location '/tmp/flight_status_info_hive'
        as
        select * from flight_status_events
    """)

DataFrame[]