# Data exploration and ETL development using `PySpark`

## Data exploration

In [7]:
# Evaluate the session object as the cell's last expression so the notebook
# renders its summary.
# NOTE(review): `spark` is not created in any cell shown here — presumably the
# kernel/environment provides it; confirm before running on a fresh kernel.
spark

### PySpark

In [8]:
from pyspark.sql.functions import avg, col, desc

# Source tables, read from BigQuery via the 'bigquery' data source.
fact_flight = (
    spark.read
    .format('bigquery')
    .option('table','airline.fact_flight')
    .load()
)
dim_flight = (
    spark.read
    .format('bigquery')
    .option('table','airline.dim_flight')
    .load()
)
dim_airport = (
    spark.read
    .format('bigquery')
    .option('table','airline.dim_airport')
    .load()
)

# Join conditions: fact row -> flight dimension -> departure airport.
flight_match = col("f.flight_key") == col("df.flight_key")
departure_airport_match = col("df.departure_airport_key") == col("a.airport_key")

flights_with_airport = (
    fact_flight.alias("f")
    .join(dim_flight.alias("df"), flight_match, "inner")
    .join(dim_airport.alias("a"), departure_airport_match, "inner")
)

# Mean departure delay per airport name, largest average first.
delay_by_airport = (
    flights_with_airport
    .groupBy("a.airport_name")
    .agg(avg("f.departure_delay").alias("average_departure_delay"))
)
result = delay_by_airport.orderBy(desc("average_departure_delay"))

result.show()

+--------------------+-----------------------+
|        airport_name|average_departure_delay|
+--------------------+-----------------------+
|Los Angeles Inter...|                   30.0|
|    Heathrow Airport|                   15.0|
|John F. Kennedy I...|                    0.0|
+--------------------+-----------------------+



### Spark SQL

In [9]:
# Register each DataFrame as a temp view, using the same name as its variable,
# so the SQL cell below can query it.
for view_name, frame in (
    ("fact_flight", fact_flight),
    ("dim_flight", dim_flight),
    ("dim_airport", dim_airport),
):
    frame.createOrReplaceTempView(view_name)

In [10]:
%%sparksql
-- Average departure delay per departure airport, largest average first.
SELECT a.airport_name,
       AVG(f.departure_delay) AS average_departure_delay
FROM fact_flight AS f
INNER JOIN dim_flight AS df
        ON f.flight_key = df.flight_key
INNER JOIN dim_airport AS a
        ON df.departure_airport_key = a.airport_key
GROUP BY a.airport_name
ORDER BY average_departure_delay DESC;

0,1
airport_name,average_departure_delay
Los Angeles International Airport,30.0
Heathrow Airport,15.0
John F. Kennedy International Airport,0.0


## ETL Development

In [None]:
from pyspark.sql.functions import coalesce, col, when, avg, lit


def write_to_bigquery(df, table):
    """Overwrite the BigQuery table `table` with the contents of `df`.

    The write uses mode 'overwrite' and the connector option
    ``writeMethod=direct``.

    Args:
        df: Spark DataFrame to persist.
        table: Destination table identifier, e.g. 'airline.my_table'.
    """
    (df.write
       .mode('overwrite')
       .format('bigquery')
       .option("writeMethod", "direct")
       .option('table', table)
       .save())


# --- Step 1: Per-flight total delay and on-time flag ---
fact_flight = spark.read.format('bigquery').option('table','airline.fact_flight').load()

# Missing delays are treated as 0. The expression is built once so that the
# stored `total_delay` column and the on-time flag derived from it cannot
# drift apart.
total_delay_expr = (coalesce(col("departure_delay"), lit(0))
                    + coalesce(col("arrival_delay"), lit(0)))

# cache(): this frame feeds the step-1 write and, through steps 2 and 3, two
# more writes. Without it every save() re-executes the whole lineage,
# including the BigQuery read of fact_flight.
etl_step_1_delays = (fact_flight
                     .select("flight_key",
                             total_delay_expr.alias("total_delay"),
                             # 1 when the flight accumulated no net delay, else 0.
                             when(total_delay_expr <= 0, 1).otherwise(0).alias("on_time_performance"))
                     .cache())

write_to_bigquery(etl_step_1_delays, 'airline.etl_step_1_delays_spark')


# --- Step 2: Join with dim_flight and dim_airport ---
dim_flight = spark.read.format('bigquery').option('table','airline.dim_flight').load()
dim_airport = spark.read.format('bigquery').option('table','airline.dim_airport').load()

# Inner joins: flights without a matching dim_flight row or departure airport
# are dropped. Cached because step 3 aggregates this frame after it is written.
etl_step_2_flight_delays_with_airports = (etl_step_1_delays.alias("d")
                                         .join(dim_flight.alias("df"), col("d.flight_key") == col("df.flight_key"), "inner")
                                         .join(dim_airport.alias("a"), col("df.departure_airport_key") == col("a.airport_key"), "inner")
                                         .select("d.total_delay", "d.on_time_performance", "a.airport_name")
                                         .cache())

write_to_bigquery(etl_step_2_flight_delays_with_airports,
                  'airline.etl_step_2_flight_delays_with_airports_spark')


# --- Step 3: Aggregate by airport ---
# NOTE(review): `on_time_percentage` is the mean of a 0/1 flag, i.e. a fraction
# in [0, 1] rather than a 0-100 percentage — confirm what consumers expect.
etl_step_3_airport_performance = (etl_step_2_flight_delays_with_airports
                                  .groupBy("airport_name")
                                  .agg(avg("total_delay").alias("average_total_delay"),
                                       avg("on_time_performance").alias("on_time_percentage")))

write_to_bigquery(etl_step_3_airport_performance,
                  'airline.etl_step_3_airport_performance_spark')

# The step-1 and step-2 frames stay cached for follow-up cells; call
# .unpersist() on them once they are no longer needed.

In [None]:
# Print the per-airport aggregate built in the ETL cell as a quick visual check.
etl_step_3_airport_performance.show()