In [None]:
!pip install polar
!pip install pyspark

In [None]:
import pandas as pd
import polars as pl  # the library is `polars`; `import polar` fails
from pyspark.sql import SparkSession

# Input shared by every engine below.
flight_file = "flight-delay-dataset-20182022/Combined_Flights_2022.parquet"

In [None]:
df = pd.read_parquet(flight_file )

In [None]:
df.info()

# Pandas: baseline

In [None]:
%%timeit
import pandas as pd
flight_file = "flight-delay-dataset-20182022/Combined_Flights_2022.parquet"
df = pd.read_parquet(flight_file)
df_agg = df.groupby(['Airline', 'Year'])[["DepDelayMinutes", "ArrDelayMinutes"]].agg(
    ["mean","sum","max"]
)
df_agg = df_agg.reset_index()
df_agg.to_parquet("temp_pandas.parquet")

In [None]:
# Display the aggregate written by the timed cell (its variables do not survive %%timeit).
pd.read_parquet("temp_pandas.parquet")

In [None]:
!ls -GFlash temp_pandas.parquet

# Polars

In [None]:
%%timeit
import polars as pl
flight_file = "flight-delay-dataset-20182022/Combined_Flights_2022.parquet"
df_polars = (
    pl.scan_parquet(flight_file)
    .groupby(["Airline", "Year"])
    .agg(
        [
            pl.col("DepDelayMinutes").mean().alias('avg_dep_delay'),
            pl.col("DepDelayMinutes").sum().alias('sum_dep_delay'),
            pl.col("DepDelayMinutes").max().alias('max_dep_delay'),
            pl.col("ArrDelayMinutes").mean().alias('avg_arr_delay'),
            pl.col("ArrDelayMinutes").sum().alias('sum_arr_delay'),
            pl.col("ArrDelayMinutes").max().alias('max_arr_delay'),
        ]
        )           
).collect()

In [None]:
df_polar.write_parquet("temp_polar.parquet")

In [None]:
!ls -GFlash temp_polar.parquet

# PySpark

In [None]:
# Import SparkSession
from pyspark.sql import SparkSession
# NOTE: this import shadows Python's built-in `max` and `sum` in the notebook
# namespace; prefer `from pyspark.sql import functions as F` in new code.
from pyspark.sql.functions import avg, max, sum

# Create (or reuse) a local SparkSession with a single worker thread.
# The attribute is `builder`; `builders` raises AttributeError.
spark = SparkSession.builder.master("local[1]").appName("airline-example").getOrCreate()
flight_file = "flight-delay-dataset-20182022/Combined_Flights_2022.parquet"

In [None]:
df_spark = spark.read.parquet(flight_file)
df_spark_agg = df_spark.groupby("Airline", "Year").agg(
            avg("DepDelayMinutes").alias('avg_dep_delay'),
            sum("DepDelayMinutes").alias('sum_dep_delay'),
            max("DepDelayMinutes").alias('max_dep_delay'),
            avg("ArrDelayMinutes").alias('avg_arr_delay'),
            sum("ArrDelayMinutes").alias('sum_arr_delay'),
            max("ArrDelayMinutes").alias('max_arr_delay'),
)
df_spark_agg.write.mode('overwrite').parquet('temp_spark.parquet')

# Spark SQL

In [None]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create (or reuse, via getOrCreate) the SparkSession
spark = SparkSession.builder.master("local[1]").appName("airline-example").getOrCreate()

flight_file = "flight-delay-dataset-20182022/Combined_Flights_2022.parquet"

# Register the parquet file as the `flights` temp view queried below.
# createOrReplaceTempView keeps the cell re-runnable (a plain CREATE TEMPORARY
# VIEW fails once the view exists) and avoids building SQL from a string.
spark.read.parquet(flight_file).createOrReplaceTempView("flights")

In [None]:
%%timeit
query = """
SELECT airline,
       avg(ArrDelayMinutes) AS avg_arr_delay,
       max(ArrDelayMinutes) AS max_arr_delay,
       min(ArrDelayMinutes) AS min_arr_delay,
       avg(DepDelayMinutes) AS avg_dep_delay,
       max(DepDelayMinutes) AS max_dep_delay,
       min(DepDelayMinutes) AS min_dep_delay
FROM flights
GROUP BY airline
"""
spark.sql(query).write.mode('overwrite').parquet('temp_spark_sql.parquet')

In [None]:
!ls -GFlash temp_spark_sql.parquet

# Read Results

In [None]:
import pandas as pd

# Load each engine's persisted aggregate back with pandas so all four
# results can be inspected through the same API.
agg_pandas = pd.read_parquet(path='temp_pandas.parquet')
agg_polars = pd.read_parquet(path='temp_polars.parquet')
agg_spark = pd.read_parquet(path='temp_spark.parquet')
agg_sparksql = pd.read_parquet(path='temp_spark_sql.parquet')

In [None]:
# (rows, columns) of every engine's result, in the order they were loaded.
agg_pandas.shape, agg_polars.shape, agg_spark.shape, agg_sparksql.shape

In [None]:
# First rows of the pandas result in (Airline, Year) order.
agg_pandas.sort_values(by=['Airline', 'Year']).head(5)

In [None]:
# First rows of the polars result in (Airline, Year) order.
agg_polars.sort_values(by=['Airline', 'Year']).head(5)

In [None]:
# First rows of the Spark SQL result in (airline, year) order.
agg_sparksql.sort_values(by=['airline', 'year']).head(5)