In [9]:
from logging import exception

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import sys,argparse
from pyspark.sql.functions import col, to_date, date_format, dayofweek, lit,explode, struct, to_timestamp, count

In [10]:
spark = SparkSession.builder \
        .appName("KLM Popular Destinations") \
        .config("spark.sql.shuffle.partitions", "200") \
        .getOrCreate()

In [11]:
bookings_df = spark.read.option("mode", "PERMISSIVE") \
            .option("columnNameOfCorruptRecord", "_corrupt_record") \
            .json("../../data" + "/bookings/booking.json")
airports_df = spark.read.csv("../../data"+"/airports/airports.dat")

In [12]:
airports_df_cols = ["airport_id","name","city","country","iata","icao","latitude","longitude","altitude","timezone","dst","tz_database_time_zone","type","source"]
airports_df = airports_df.toDF(*airports_df_cols)

In [13]:
bookings_exploded = bookings_df.withColumn("passenger",
                                               F.explode(col("event.DataElement.travelrecord.passengersList"))) \
        .withColumn("product", F.explode(col("event.DataElement.travelrecord.productsList")))

bookings_flattened = bookings_exploded.select(
    col("timestamp"),
    # Travel record fields
    col("event.DataElement.travelrecord.attributeType").alias("attributeType"),
    col("event.DataElement.travelrecord.creationDate").alias("creationDate"),
    col("event.DataElement.travelrecord.envelopNumber").alias("envelopNumber"),
    # Passenger fields
    col("passenger.age").alias("passenger_age"),
    col("passenger.category").alias("passenger_category"),
    col("passenger.crid").alias("passenger_crid"),
    col("passenger.passengerType").alias("passenger_type"),
    col("passenger.tattoo").alias("passenger_tattoo"),
    col("passenger.uci").alias("passenger_uci"),
    col("passenger.weight").alias("passenger_weight"),
    # Product fields
    col("product.aircraftType").alias("product_aircraft_type"),
    col("product.bookingClass").alias("product_booking_class"),
    col("product.bookingStatus").alias("product_booking_status"),
    col("product.flight.arrivalDate").alias("product_arrival_date"),
    col("product.flight.departureDate").alias("product_departure_date"),
    col("product.flight.originAirport").alias("product_origin_airport"),
    col("product.flight.destinationAirport").alias("product_destination_airport"),
    col("product.flight.marketingAirline").alias("product_marketing_airline"),
    col("product.flight.operatingAirline").alias("product_operating_airline"),
    col("product.transportClass").alias("product_transport_class"),
    col("product.type").alias("product_type"))

In [14]:
bookings_flattened = bookings_flattened.withColumn("timestamp",
                                                       to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) \
        .withColumn("creationDate", to_timestamp(col("creationDate"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
        .withColumn("product_arrival_date", to_timestamp(col("product_arrival_date"), "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
        .withColumn("product_departure_date", to_timestamp(col("product_departure_date"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))


In [15]:
start_date="2019-03-01"
end_date="2019-03-31"

In [16]:
filtered_bookings_df = bookings_flattened.filter(
        (col("timestamp") >= start_date) &
        (col("timestamp") <= end_date) &
        (col("product_booking_status") == "CONFIRMED") &
        (col("product_operating_airline").startswith("KL"))
)

In [17]:
bookings_df.count()

9135

In [30]:
filtered_bookings_df.filter(col("passenger_uci") == "20062C080003242E").show()

+--------------------+-------------+-------------------+-------------+-------------+------------------+--------------+--------------+----------------+----------------+----------------+---------------------+---------------------+----------------------+--------------------+----------------------+----------------------+---------------------------+-------------------------+-------------------------+-----------------------+------------+
|           timestamp|attributeType|       creationDate|envelopNumber|passenger_age|passenger_category|passenger_crid|passenger_type|passenger_tattoo|   passenger_uci|passenger_weight|product_aircraft_type|product_booking_class|product_booking_status|product_arrival_date|product_departure_date|product_origin_airport|product_destination_airport|product_marketing_airline|product_operating_airline|product_transport_class|product_type|
+--------------------+-------------+-------------------+-------------+-------------+------------------+--------------+----------

In [26]:
x=filtered_bookings_df.dropDuplicates()

In [61]:
joined_df = filtered_bookings_df.join(
    airports_df,
    filtered_bookings_df.product_origin_airport == airports_df.iata,
    'left'
).select(
    col("timestamp"),
    col("product_origin_airport"),
    col("country"),
    col("passenger_uci"),
    col("passenger_age"),
    col("passenger_type"),
    col("product_departure_date"),
    col("passenger_category"))

In [63]:
def determine_season(date):
    """
    Determine season based on date
    """
    month = date.month
    if month in [12, 1, 2]:
        return "Winter"
    elif month in [3, 4, 5]:
        return "Spring"
    elif month in [6, 7, 8]:
        return "Summer"
    else:
        return "Autumn"

In [64]:

# Add season and day of week
season_dow_bookings = joined_df.withColumn(
    "season", F.udf(determine_season)(F.col("product_departure_date"))
).withColumn(
    "day_of_week", F.date_format(F.col("product_departure_date"), "EEEE")
)

In [71]:
season_dow_bookings.select("passenger_type").distinct().show()

+--------------+
|passenger_type|
+--------------+
|           CHD|
|           ADT|
+--------------+



In [73]:
 destination_analysis = season_dow_bookings.groupBy(
        F.col("country"), F.col("season"), F.col("day_of_week")
    ).agg(
        F.countDistinct("passenger_uci").alias("passenger_count"),
        F.avg("passenger_age").alias("avg_age"),
        F.sum(F.when(F.col("passenger_type") == ("ADT"), 1).otherwise(0)).alias("adult_count"),
        F.sum(F.when(F.col("passenger_type") == ("CHD"), 1).otherwise(0)).alias("child_count")
    ).orderBy("passenger_count", ascending=False)

In [74]:
destination_analysis.show()

+--------------+------+-----------+---------------+------------------+-----------+-----------+
|       country|season|day_of_week|passenger_count|           avg_age|adult_count|child_count|
+--------------+------+-----------+---------------+------------------+-----------+-----------+
|   Netherlands|Spring|     Monday|           1010| 38.64224507283633|       2719|         24|
|   Netherlands|Spring|     Sunday|            724|   42.723044397463|       1393|         12|
|   Netherlands|Spring|     Friday|            425| 47.12249443207127|        638|         21|
|   Netherlands|Spring|  Wednesday|            408| 45.16780045351474|        736|          7|
|   Netherlands|Spring|    Tuesday|            408|39.630630630630634|        645|         11|
|   Netherlands|Spring|   Thursday|            366| 41.72727272727273|        471|          4|
|   Netherlands|Spring|   Saturday|            301| 44.66896551724138|        453|          7|
|   Netherlands|Summer|   Saturday|            126