In [None]:
!pip install pandas

In [3]:
import pandas as pd
import polars as pl
from pyspark.sql import SparkSession

In [None]:
try:
    %load_ext autotime
except:
    !pip install ipython-autotime
    %load_ext autotime
%unload_ext autotime

In [5]:
# Path of the parquet dataset read by every engine (pandas, polars, Spark) below.
flights_file = "flights.parquet"


## use pandas

In [7]:
# Eagerly load the parquet file (all columns) into an in-memory pandas DataFrame.
df = pd.read_parquet(flights_file)

In [8]:
# Mean, total and maximum departure/arrival delay for every (carrier, year) pair.
# The result has a two-level row index and two-level columns (delay column x statistic).
df_agg = (
    df
    .groupby(['carrier', 'year'])[['dep_delay', 'arr_delay']]
    .agg(["mean", "sum", "max"])
)
df_agg

Unnamed: 0_level_0,Unnamed: 1_level_0,dep_delay,dep_delay,dep_delay,arr_delay,arr_delay,arr_delay
Unnamed: 0_level_1,Unnamed: 1_level_1,mean,sum,max,mean,sum,max
carrier,year,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2
9E,2013,16.725769,291296.0,747.0,7.379669,127624.0,744.0
AA,2013,8.586016,275551.0,1014.0,0.364291,11638.0,1007.0
AS,2013,5.804775,4133.0,225.0,-9.930889,-7041.0,198.0
B6,2013,13.022522,705417.0,502.0,9.457973,511194.0,497.0
DL,2013,9.264505,442482.0,960.0,1.644341,78366.0,931.0
EV,2013,19.95539,1024829.0,548.0,15.796431,807324.0,577.0
F9,2013,20.215543,13787.0,853.0,21.920705,14928.0,834.0
FL,2013,18.726075,59680.0,602.0,20.115906,63868.0,572.0
HA,2013,4.900585,1676.0,1301.0,-6.915205,-2365.0,1272.0
MQ,2013,10.552041,265521.0,1137.0,10.774733,269767.0,1127.0


## use polars

In [9]:
# Lazy polars pipeline: scan the file, aggregate per (carrier, year), then
# materialise the result with .collect().
# `group_by` is the current spelling (polars >= 0.19); the older `groupby`
# alias was deprecated and later removed, so it fails on recent releases.
dfl = (
    pl.scan_parquet(flights_file)
    .group_by(['carrier', 'year'])
    .agg(
        [
            pl.col('dep_delay').mean().alias('avg_dep_delay'),
            pl.col('dep_delay').sum().alias('sum_dep_delay'),
            pl.col('dep_delay').max().alias('max_dep_delay'),
            pl.col('arr_delay').mean().alias('avg_arr_delay'),
            pl.col('arr_delay').sum().alias('sum_arr_delay'),
            pl.col('arr_delay').max().alias('max_arr_delay'),
        ]
    )
).collect()

In [10]:
# Display the collected polars aggregation.
dfl

carrier,year,avg_dep_delay,sum_dep_delay,max_dep_delay,avg_arr_delay,sum_arr_delay,max_arr_delay
str,i16,f64,i64,i16,f64,i64,i16
"""US""",2013,3.782418,75168,500,2.129595,42232,492
"""VX""",2013,12.869421,66033,653,1.764464,9027,676
"""YV""",2013,18.99633,10353,387,15.556985,8463,381
"""DL""",2013,9.264505,442482,960,1.644341,78366,931
"""FL""",2013,18.726075,59680,602,20.115906,63868,572
"""MQ""",2013,10.552041,265521,1137,10.774733,269767,1127
"""WN""",2013,17.711744,214011,471,9.64912,116214,453
"""F9""",2013,20.215543,13787,853,21.920705,14928,834
"""UA""",2013,12.106073,701898,483,3.558011,205589,455
"""AA""",2013,8.586016,275551,1014,0.364291,11638,1007


## use PySpark

In [11]:
from pyspark.sql import SparkSession
# NOTE: importing `max` and `sum` by name shadows the Python builtins of the
# same name in every cell executed after this one.
from pyspark.sql.functions import avg, max, sum

# Local Spark session restricted to one worker thread ("local[1]"), named "airline".
# getOrCreate() reuses an already-running session if there is one.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("airline")
    .getOrCreate()
)


23/04/08 19:39:20 WARN Utils: Your hostname, gauss resolves to a loopback address: 127.0.0.1; using 192.168.31.190 instead (on interface wlp0s20f3)
23/04/08 19:39:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/08 19:39:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [13]:
%timeit
df_spark = spark.read.parquet(flights_file)
df_spark_agg = df_spark.groupby('carrier', 'year').agg(
    avg('dep_delay').alias('avg_dep_delay'),
    sum('dep_delay').alias('sum_dep_delay'),
    max('dep_delay').alias('max_dep_delay'),
    avg('arr_delay').alias('avg_arr_delay'),
    sum('arr_delay').alias('sum_arr_delay'),
    max('arr_delay').alias('max_arr_delay'),
)
df_spark_agg.write.mode('overwrite').parquet('temp.parquet')

In [14]:
# Expose the parquet file to Spark SQL as the temp view `flights`.
# OR REPLACE keeps the cell re-runnable: a plain CREATE TEMPORARY VIEW fails
# once the view already exists in the session.
spark.sql(f'CREATE OR REPLACE TEMPORARY VIEW flights USING parquet OPTIONS (path "{flights_file}")')

DataFrame[]

In [15]:
# The same aggregation expressed in Spark SQL against the `flights` temp view.
# Grouped by carrier AND year so the result is comparable with the pandas,
# polars and Spark DataFrame versions, which all group on both columns.
query = """
select carrier,
       year,
       avg(dep_delay) as avg_dep_delay,
       sum(dep_delay) as sum_dep_delay,
       max(dep_delay) as max_dep_delay,
       avg(arr_delay) as avg_arr_delay,
       sum(arr_delay) as sum_arr_delay,
       max(arr_delay) as max_arr_delay
  from flights
 group by carrier, year
"""

# Writing the result is the action that triggers execution of the query.
spark.sql(query).write.mode('overwrite').parquet('temp_spark_sql.parquet')
