In [None]:
!pip install fireducks

In [None]:
!pip install polars

In [None]:
!pip install duckdb

In [None]:
# Download the dataset once; later runs (e.g. after a kernel restart) reuse the local copy.

import os
import urllib.request

s3_path = "https://modin-datasets.s3.amazonaws.com/testing/yellow_tripdata_2015-01.csv"
csv_path = "taxi.csv"

if not os.path.exists(csv_path):
    # Download to a temporary name first, so an interrupted download never leaves a
    # truncated taxi.csv behind that would be silently reused on the next run.
    partial_path = csv_path + ".part"
    urllib.request.urlretrieve(s3_path, partial_path)
    os.replace(partial_path, csv_path)

# FireDucks

In [None]:
import fireducks.pandas as pd
import time

In [None]:
# Time the full FireDucks workload: load, replicate 20x, drop + sort, and eight group-by means.
# perf_counter() is monotonic and high-resolution, which suits elapsed-time measurement.
start = time.perf_counter()

df = pd.read_csv("taxi.csv")

big_df = pd.concat([df for _ in range(20)])

df = df.drop(columns = ["congestion_surcharge", "airport_fee"])

new_df = df.sort_values(by = "fare_amount").reset_index(drop=True)

t1 = big_df.groupby("PULocationID").mta_tax.mean().reset_index()
t2 = big_df.groupby("PULocationID").tip_amount.mean().reset_index()
t3 = big_df.groupby("PULocationID").tolls_amount.mean().reset_index()
t4 = big_df.groupby("PULocationID").trip_distance.mean().reset_index()


t5 = big_df.groupby("DOLocationID").mta_tax.mean().reset_index()
t6 = big_df.groupby("DOLocationID").tip_amount.mean().reset_index()
t7 = big_df.groupby("DOLocationID").tolls_amount.mean().reset_index()
t8 = big_df.groupby("DOLocationID").trip_distance.mean().reset_index()

# FireDucks is lazy: only explicitly evaluated expressions are executed. new_df must be
# evaluated too; otherwise the drop + sort work never runs and is silently left out of
# the timing (the pandas and DuckDB sections do execute the sort).
a = (t1._evaluate(), t2._evaluate(), t3._evaluate(), t4._evaluate(),
     t5._evaluate(), t6._evaluate(), t7._evaluate(), t8._evaluate(),
     new_df._evaluate())

print(f"FireDucks run-time = {round(time.perf_counter()-start, 4)} seconds")

# Pandas

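Restart the session (kernel), then run the pandas code below. The downloaded `taxi.csv` stays on disk, so it does not need to be fetched again.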

In [None]:
import pandas as pd
import time

In [None]:
# Time the full pandas workload: load, replicate 20x, drop + sort, and eight group-by means.
# pandas is eager, so every statement below is executed as it is reached.
# perf_counter() is monotonic and high-resolution, which suits elapsed-time measurement.
start = time.perf_counter()

df = pd.read_csv("taxi.csv")

big_df = pd.concat([df for _ in range(20)])

df = df.drop(columns = ["congestion_surcharge", "airport_fee"])

new_df = df.sort_values(by = "fare_amount").reset_index(drop=True)

t1 = big_df.groupby("PULocationID").mta_tax.mean().reset_index()
t2 = big_df.groupby("PULocationID").tip_amount.mean().reset_index()
t3 = big_df.groupby("PULocationID").tolls_amount.mean().reset_index()
t4 = big_df.groupby("PULocationID").trip_distance.mean().reset_index()


t5 = big_df.groupby("DOLocationID").mta_tax.mean().reset_index()
t6 = big_df.groupby("DOLocationID").tip_amount.mean().reset_index()
t7 = big_df.groupby("DOLocationID").tolls_amount.mean().reset_index()
t8 = big_df.groupby("DOLocationID").trip_distance.mean().reset_index()

a = (t1, t2, t3, t4,
     t5, t6, t7, t8)

print(f"Pandas run-time = {round(time.perf_counter()-start, 4)} seconds")

# Polars

In [None]:
import polars as pl
import time

In [None]:
# Time the full Polars (lazy) workload: scan, replicate 20x, drop + sort, and eight group-by means.
# perf_counter() is monotonic and high-resolution, which suits elapsed-time measurement.
start = time.perf_counter()

df = pl.scan_csv("taxi.csv")

big_df = pl.concat([df for _ in range(20)])

df = df.drop(["congestion_surcharge", "airport_fee"])

new_df = df.sort(by = "fare_amount")

t1 = big_df.group_by("PULocationID").agg(pl.mean("mta_tax"))
t2 = big_df.group_by("PULocationID").agg(pl.mean("tip_amount"))
t3 = big_df.group_by("PULocationID").agg(pl.mean("tolls_amount"))
t4 = big_df.group_by("PULocationID").agg(pl.mean("trip_distance"))


t5 = big_df.group_by("DOLocationID").agg(pl.mean("mta_tax"))
t6 = big_df.group_by("DOLocationID").agg(pl.mean("tip_amount"))
t7 = big_df.group_by("DOLocationID").agg(pl.mean("tolls_amount"))
t8 = big_df.group_by("DOLocationID").agg(pl.mean("trip_distance"))

# Lazy frames only execute on collect(), so new_df must be collected as well; otherwise
# the drop + sort work never runs and is silently left out of the timing.
a = (t1.collect(), t2.collect(), t3.collect(), t4.collect(),
     t5.collect(), t6.collect(), t7.collect(), t8.collect(),
     new_df.collect())

print(f"Polars run-time = {round(time.perf_counter()-start, 4)} seconds")

Polars run-time = 29.2099 seconds


# DuckDB

In [None]:
import duckdb
import time

In [None]:
# Time the full DuckDB workload: load, replicate 20x, drop + sort, and eight group-by averages.
# perf_counter() is monotonic and high-resolution, which suits elapsed-time measurement.
start = time.perf_counter()

con = duckdb.connect(database=':memory:')
con.execute("CREATE TABLE taxi AS SELECT * FROM read_csv_auto('taxi.csv')")

# 20 stacked copies of the table, mirroring the 20x concat in the other sections.
con.execute("CREATE TABLE big_taxi AS " + " UNION ALL ".join(["SELECT * FROM taxi"] * 20))

con.execute("CREATE TABLE taxi_reduced AS SELECT * EXCLUDE (congestion_surcharge, airport_fee) FROM taxi")

sorted_df = con.execute("SELECT * FROM taxi_reduced ORDER BY fare_amount").fetch_df()

# DuckDB's connection.execute() returns the connection object itself, which holds only the
# most recent result. Each result must therefore be fetched right away; fetching t1..t8
# afterwards would only ever return the last query's result.
t1 = con.execute("SELECT PULocationID, AVG(mta_tax) FROM big_taxi GROUP BY PULocationID").fetch_df()
t2 = con.execute("SELECT PULocationID, AVG(tip_amount) FROM big_taxi GROUP BY PULocationID").fetch_df()
t3 = con.execute("SELECT PULocationID, AVG(tolls_amount) FROM big_taxi GROUP BY PULocationID").fetch_df()
t4 = con.execute("SELECT PULocationID, AVG(trip_distance) FROM big_taxi GROUP BY PULocationID").fetch_df()

t5 = con.execute("SELECT DOLocationID, AVG(mta_tax) FROM big_taxi GROUP BY DOLocationID").fetch_df()
t6 = con.execute("SELECT DOLocationID, AVG(tip_amount) FROM big_taxi GROUP BY DOLocationID").fetch_df()
t7 = con.execute("SELECT DOLocationID, AVG(tolls_amount) FROM big_taxi GROUP BY DOLocationID").fetch_df()
t8 = con.execute("SELECT DOLocationID, AVG(trip_distance) FROM big_taxi GROUP BY DOLocationID").fetch_df()

a = (t1, t2, t3, t4,
     t5, t6, t7, t8)

print(f"DuckDB run-time = {round(time.perf_counter()-start, 4)} seconds")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

DuckDB run-time = 8.0222 seconds


## Let's understand the changes

In [None]:
# Load the FireDucks IPython extension. After this, `import pandas` is expected to bind
# FireDucks: the cells below call the non-pandas `_evaluate()` on results built from `pd`.
%load_ext fireducks.pandas
import pandas as pd
# Display the name of the module bound to `pd`, to confirm which implementation is active.
pd.__name__

In [None]:
%%fireducks.profile
t1 = time.time()
df = pd.read_csv("taxi.csv").groupby("PULocationID").tip_amount.mean().reset_index() # nothing is executed, because there is no explicit evaluation
t2 = time.time()
print(f"execution time: {t2 - t1} sec")

In [None]:
%%fireducks.profile # shows execution time for each kernel-methods used in this cell expression (if executed)
t1 = time.time()
df = pd.read_csv("taxi.csv").groupby("PULocationID").tip_amount.mean().reset_index()._evaluate() # similar to polars collect(), but adding this will make it different than pandas code
t2 = time.time()
print(f"execution time: {t2 - t1} sec")

In [None]:
%%fireducks.profile # shows execution time for each kernel-methods used in this cell expression (if executed)
t1 = time.time()
df = pd.read_csv("taxi.csv").groupby("PULocationID").tip_amount.mean().reset_index()
print(df.shape) # "shape" is a method that will trigger the evaluation, so you may use it if you don't want to use _evaluate() that is a non-pandas method
t2 = time.time()
print(f"execution time: {t2 - t1} sec")

## For Polars


`read_csv()` in Polars is an eager method, so it prevents Polars from optimizing the query.
Calling `read_csv().lazy()` also prevents optimizations related to loading the data.
It is therefore better to use `scan_csv()`, which makes the comparison with FireDucks more like-for-like. In addition, in Polars' lazy mode you must call `collect()` explicitly to trigger execution. Hence the code is modified as follows:

In [None]:
t1 = time.perf_counter()
# scan_csv() keeps the whole query lazy; collect() is what triggers execution.
df = pl.scan_csv("taxi.csv").group_by("PULocationID").agg(pl.mean("tip_amount")).collect()
t2 = time.perf_counter()
print(f"execution time: {t2 - t1} sec")

In [None]:
# To check Polars' per-node (kernel-wise) execution time:
t1 = time.perf_counter()
# profile() returns a (result, timings) tuple of DataFrames; the timings are in microseconds.
df = pl.scan_csv("taxi.csv").group_by("PULocationID").agg(pl.mean("tip_amount")).profile()
t2 = time.perf_counter()
print(f"execution time: {t2 - t1} sec")
# Convert the microsecond start/end columns to milliseconds (divide by 1e3, not 1e2).
df[1].with_columns(((pl.col("end") - pl.col("start")) / 1e3).alias("duration(msec)"))

## Now let's see an interesting difference

With FireDucks, an expression that has already been evaluated can be reused,

whereas Polars appears to re-execute the entire lazy query on every `collect()`, which makes it slower here.

In [None]:
%%fireducks.profile
# You can see read_csv() is called only once, whereas groupby, reset_index() are called two times.
# read_csv() is called at the time of computing "r1", # and it has been automatically cached by the
# FireDucks compiler to be reused in the computation for "r2"
t1 = time.time()
df = pd.read_csv("taxi.csv")
r1 = df.groupby("PULocationID").tip_amount.mean().reset_index()._evaluate()
r2 = df.groupby("PULocationID").tolls_amount.mean().reset_index()._evaluate()
t2 = time.time()
print(f"execution time: {t2 - t1} sec")

In [None]:
# To check Polars' per-node (kernel-wise) execution time for two queries over the same scan:
t1 = time.perf_counter()
df = pl.scan_csv("taxi.csv")
# Each profile() call executes its own lazy query and returns a (result, timings) tuple.
r1 = df.group_by("PULocationID").agg(pl.mean("tip_amount")).profile()
r2 = df.group_by("PULocationID").agg(pl.mean("tolls_amount")).profile()
t2 = time.perf_counter()
print(f"execution time: {t2 - t1} sec")

In [None]:
# For r1, everything is executed (read-csv, groupby-projection). Timings are in microseconds,
# so divide by 1e3 to get milliseconds.
r1[1].with_columns(((pl.col("end") - pl.col("start")) / 1e3).alias("duration(msec)"))

In [None]:
# For r2, everything is executed again (read-csv, groupby-projection). Timings are in
# microseconds, so divide by 1e3 to get milliseconds.
r2[1].with_columns(((pl.col("end") - pl.col("start")) / 1e3).alias("duration(msec)"))