# Polars & DuckDB: Scaling DataFrames Without Spark
--------------------------

__[1. Introduction](#first-bullet)__

__[2. Getting Set Up On AWS with Docker](#second-bullet)__

__[3. Intro To Polars DataFrames](#third-bullet)__

__[4. DuckDB To The Rescue For SQL](#fourth-bullet)__

__[5. Conclusions](#fifth-bullet)__


## Introduction <a class="anchor" id="first-bullet"></a>
------

There are a plethora of DataFrame alternatives to [Pandas](https://pandas.pydata.org/) due to its [limitations](https://insightsndata.com/what-are-the-limitations-of-pandas-35d462990c43); even its original author, Wes McKinney, wrote a blog post about [10 Things I Hate About Pandas](https://wesmckinney.com/blog/apache-arrow-pandas-internals/). 

My biggest complaints about Pandas are:

1. Memory usage
2. Limited multi-core algorithms
3. No ability to execute SQL statements (like [SparkSQL & DataFrame](https://spark.apache.org/sql/))
4. No query planning/lazy-execution
5. [NULL values only exist for floats, not ints](https://pandas.pydata.org/docs/user_guide/integer_na.html) (this changed in Pandas 1.0+)
6. Using [strings is inefficient](https://pandas.pydata.org/docs/user_guide/text.html) (this too changed in Pandas 1.0+)
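Items 5 and 6 refer to pandas' opt-in extension dtypes. A minimal sketch, assuming pandas 1.0 or later and using made-up toy values: the nullable `Int64` dtype keeps integers integral while holding a missing value, and the `string` dtype gives text its own type instead of `object`.

```python
import pandas as pd

# Toy values (hypothetical): a missing entry in an integer column
ints = pd.Series([1, None, 3], dtype="Int64")  # capital "I": nullable integer dtype
strs = pd.Series(["BRONX", None], dtype="string")  # dedicated string dtype, not object

print(ints.dtype, ints.isna().tolist())  # the gap is pd.NA and the column stays Int64
print(strs.dtype, strs.isna().tolist())
print(ints.sum())  # missing values are skipped by default

# Without an explicit dtype, the same list is upcast to float64 with NaN
print(pd.Series([1, None, 3]).dtype)
```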
    
Many of these have been addressed by the [Pandas 2.0 release](https://pandas.pydata.org/docs/dev/whatsnew/v2.0.0.html). Over the years there have been many replacements for Pandas that, in my opinion, have failed to gain traction. And while there has been a steady march towards replacing the [NumPy](https://numpy.org/) backend with [Apache Arrow](https://arrow.apache.org/), I still feel the lack of SQL support and the overall API design are major weaknesses.

For context, I have been using [Apache Spark](https://spark.apache.org/) since 2017 and love it, not just from a performance point of view, but for how well the API is designed. The syntax makes sense from a SQL user's perspective: if I group by a column and count, I get what I expect in both SQL and the Spark DataFrame API. In Pandas, I get the number of non-null values in every column:

In [14]:
import pandas as pd
pd_df = pd.read_csv("https://data.cityofnewyork.us/resource/h9gi-nx95.csv")
pd_df.groupby("borough").count()

borough,crash_date,crash_time,zip_code,latitude,longitude,location,on_street_name,off_street_name,cross_street_name,number_of_persons_injured,...,contributing_factor_vehicle_2,contributing_factor_vehicle_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,collision_id,vehicle_type_code1,vehicle_type_code2,vehicle_type_code_3,vehicle_type_code_4,vehicle_type_code_5
BRONX,107,107,107,107,107,107,59,59,48,107,...,81,5,0,0,107,106,65,4,0,0
BROOKLYN,247,247,247,245,245,245,155,155,92,247,...,192,24,7,2,247,242,157,22,7,2
MANHATTAN,98,98,98,96,96,96,52,52,46,98,...,65,6,1,1,98,96,57,5,1,0
QUEENS,154,154,153,150,150,150,98,98,56,154,...,120,9,2,0,154,154,97,7,2,0
STATEN ISLAND,27,27,27,26,26,26,18,18,9,27,...,21,2,2,1,27,27,19,2,2,1


To get what I want I have to use the syntax:

In [15]:
pd_df.groupby("borough").size() # or pd_df["borough"].value_counts()

borough
BRONX            107
BROOKLYN         247
MANHATTAN         98
QUEENS           154
STATEN ISLAND     27
dtype: int64

But this returns a [Pandas Series](https://pandas.pydata.org/docs/reference/api/pandas.Series.html). It seems like a trivial difference, but counting duplicates in a column is easy in Spark because we can use method chaining; to do the equivalent in Pandas I have to convert back to a DataFrame:

In [19]:
pd_df.groupby("borough").size().to_frame("counts").reset_index().query("counts > 1")

,borough,counts
0,BRONX,107
1,BROOKLYN,247
2,MANHATTAN,98
3,QUEENS,154
4,STATEN ISLAND,27
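The chain can be a little shorter because `Series.reset_index` accepts a `name` for the values column, which skips the `to_frame` step. A minimal sketch on a hypothetical toy frame (not the NYC data), keeping only keys that appear more than once:

```python
import pandas as pd

# Hypothetical toy data standing in for the crashes extract
crashes = pd.DataFrame(
    {"borough": ["BRONX", "QUEENS", "BRONX", "BROOKLYN", "QUEENS", "BRONX"]}
)

dupes = (
    crashes.groupby("borough")
    .size()                       # Series: borough -> row count
    .reset_index(name="counts")   # back to a DataFrame with a named count column
    .query("counts > 1")          # keep only repeated keys
)
print(dupes)
```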


For years I have been using Spark for large datasets and sticking with Pandas for smaller ones. Recently, though, I heard lots of hype about [Polars](https://www.pola.rs/) and [DuckDB](https://duckdb.org/), decided to try them myself, and was immediately impressed. 

In this blog post I go over my first interactions with both libraries and call out the things I like and don't like, but first let's get set up to run this notebook on an AWS EC2 instance using [Docker](https://www.docker.com/).

## Getting Set Up On AWS with Docker <a class="anchor" id="second-bullet"></a>

## Intro To Polars DataFrames <a class="anchor" id="third-bullet"></a>

In [5]:
import polars as pl

In [4]:
df = pl.read_csv("https://data.cityofnewyork.us/resource/h9gi-nx95.csv")

In [4]:
df.write_parquet("crashes.parquet")

In [3]:
df.head(2)

crash_date,crash_time,borough,zip_code,latitude,longitude,location,on_street_name,off_street_name,cross_street_name,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,number_of_motorist_injured,number_of_motorist_killed,contributing_factor_vehicle_1,contributing_factor_vehicle_2,contributing_factor_vehicle_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,collision_id,vehicle_type_code1,vehicle_type_code2,vehicle_type_code_3,vehicle_type_code_4,vehicle_type_code_5
str,str,str,i64,f64,f64,str,str,str,str,i64,i64,i64,i64,i64,i64,i64,i64,str,str,str,str,str,i64,str,str,str,str,str
"""2021-09-11T00:…","""2:39""",,,,,,"""WHITESTONE EXP…","""20 AVENUE""",,2,0,0,0,0,0,2,0,"""Aggressive Dri…","""Unspecified""",,,,4455765,"""Sedan""","""Sedan""",,,
"""2022-03-26T00:…","""11:45""",,,,,,"""QUEENSBORO BRI…",,,1,0,0,0,0,0,1,0,"""Pavement Slipp…",,,,,4513547,"""Sedan""",,,,


In [63]:
df.schema

{'crash_date': Utf8,
 'crash_time': Utf8,
 'borough': Utf8,
 'zip_code': Int64,
 'latitude': Float64,
 'longitude': Float64,
 'location': Utf8,
 'on_street_name': Utf8,
 'off_street_name': Utf8,
 'cross_street_name': Utf8,
 'number_of_persons_injured': Int64,
 'number_of_persons_killed': Int64,
 'number_of_pedestrians_injured': Int64,
 'number_of_pedestrians_killed': Int64,
 'number_of_cyclist_injured': Int64,
 'number_of_cyclist_killed': Int64,
 'number_of_motorist_injured': Int64,
 'number_of_motorist_killed': Int64,
 'contributing_factor_vehicle_1': Utf8,
 'contributing_factor_vehicle_2': Utf8,
 'contributing_factor_vehicle_3': Utf8,
 'contributing_factor_vehicle_4': Utf8,
 'contributing_factor_vehicle_5': Utf8,
 'collision_id': Int64,
 'vehicle_type_code1': Utf8,
 'vehicle_type_code2': Utf8,
 'vehicle_type_code_3': Utf8,
 'vehicle_type_code_4': Utf8,
 'vehicle_type_code_5': Utf8}

In [64]:
(df.groupby("collision_id")
   .count()
   .filter(pl.col("count") > 1))

collision_id,count
i64,u32


In [65]:
df.crash_date.is_null()

AttributeError: 'DataFrame' object has no attribute 'crash_date'

In [66]:
df["crash_date"].is_null().any()

False

In [67]:
df['crash_date'][0]

'2021-09-11T00:00:00.000'

In [72]:
df = (
    df.with_columns(
        pl.col("crash_date").str.slice(0, length=10).alias("crash_date_str")
    ).with_columns(
        pl.col("crash_date_str").str.strptime(pl.Datetime, "%Y-%m-%d", strict=False).alias("crash_date")
    )
)

df.head()

crash_date,crash_time,borough,zip_code,latitude,longitude,location,on_street_name,off_street_name,cross_street_name,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,number_of_motorist_injured,number_of_motorist_killed,contributing_factor_vehicle_1,contributing_factor_vehicle_2,contributing_factor_vehicle_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,collision_id,vehicle_type_code1,vehicle_type_code2,vehicle_type_code_3,vehicle_type_code_4,vehicle_type_code_5,crash_date_str
datetime[μs],str,str,i64,f64,f64,str,str,str,str,i64,i64,i64,i64,i64,i64,i64,i64,str,str,str,str,str,i64,str,str,str,str,str,str
2021-09-11 00:00:00,"""2:39""",,,,,,"""WHITESTONE EXP…","""20 AVENUE""",,2,0,0,0,0,0,2,0,"""Aggressive Dri…","""Unspecified""",,,,4455765,"""Sedan""","""Sedan""",,,,"""2021-09-11"""
2022-03-26 00:00:00,"""11:45""",,,,,,"""QUEENSBORO BRI…",,,1,0,0,0,0,0,1,0,"""Pavement Slipp…",,,,,4513547,"""Sedan""",,,,,"""2022-03-26"""
2022-06-29 00:00:00,"""6:55""",,,,,,"""THROGS NECK BR…",,,0,0,0,0,0,0,0,0,"""Following Too …","""Unspecified""",,,,4541903,"""Sedan""","""Pick-up Truck""",,,,"""2022-06-29"""
2021-09-11 00:00:00,"""9:35""","""BROOKLYN""",11208,40.667202,-73.8665,""" , (40.66720…",,,"""1211 LORI…",0,0,0,0,0,0,0,0,"""Unspecified""",,,,,4456314,"""Sedan""",,,,,"""2021-09-11"""
2021-12-14 00:00:00,"""8:13""","""BROOKLYN""",11233,40.683304,-73.917274,""" , (40.68330…","""SARATOGA AVENU…","""DECATUR STREET…",,0,0,0,0,0,0,0,0,,,,,,4486609,,,,,,"""2021-12-14"""


In [73]:
df.groupby("borough").count()

borough,count
str,u32
"""QUEENS""",154
"""BRONX""",107
,367
"""BROOKLYN""",247
"""MANHATTAN""",98
"""STATEN ISLAND""",27


In [74]:
nn_df = df.filter(pl.col("borough").is_not_null())

In [75]:
df.filter(pl.col("borough").is_not_null()).select("borough").unique()

borough
str
"""BROOKLYN"""
"""MANHATTAN"""
"""QUEENS"""
"""BRONX"""
"""STATEN ISLAND"""


In [76]:
borough_df = pl.DataFrame({
                "borough": ["BROOKLYN", "BRONX", "MANHATTAN", "STATEN ISLAND", "QUEENS"],
                "population": [2590516, 1379946, 1596273, 2278029, 378977],
                "area":[179.7, 109.2, 58.68, 281.6, 149.0]
})

In [77]:
(df.filter(pl.col("borough").is_not_null())
   .select(["borough", "number_of_persons_injured"])
   .groupby("borough")
   .sum()
   .join(borough_df, on=["borough"])
   .select([
       "borough", 
       (pl.col("number_of_persons_injured") / pl.col("population")).alias("injuries_per_population")
   ])
)

borough,injuries_per_population
str,f64
"""BROOKLYN""",4.5e-05
"""BRONX""",3.3e-05
"""MANHATTAN""",2.5e-05
"""STATEN ISLAND""",7e-06
"""QUEENS""",0.000193


In [120]:
ctx = pl.SQLContext(crashes=df)

In [121]:
new_df = ctx.execute("""
    SELECT
        borough,
        crash_date AS day,
        SUM(number_of_persons_injured)
    FROM 
        crashes
    WHERE 
        borough IS NOT NULL
    GROUP BY 
        borough, crash_date
""", eager=False)

In [125]:
ctx = ctx.register("daily_crashes", new_df)

In [127]:
ctx.tables()

['crashes', 'daily_crashes']

In [164]:
daily_df = ctx.execute("select * from daily_crashes")

In [183]:
print(daily_df)

naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

 SELECT [col("borough"), col("day"), col("number_of_persons_injured")] FROM
   SELECT [col("borough"), col("crash_date").alias("day"), col("number_of_persons_injured")] FROM
    AGGREGATE
    	[col("number_of_persons_injured").sum()] BY [col("borough"), col("crash_date")] FROM
    	FILTER col("borough").is_not_null() FROMDF ["crash_date", "crash_time", "borough", "zip_code"]; PROJECT */30 COLUMNS; SELECTION: "None"


In [193]:
ctx.execute("""
    SELECT
        borough,
        day,
        number_of_persons_injured,
        LAG(number_of_persons_injured, 1) OVER (PARTITION BY borough ORDER BY day) as prior_day_injured
FROM
    daily_crashes
ORDER BY 
    borough,
    day DESC
""", eager=True)

InvalidOperationError: unsupported SQL function: lag

In [179]:
non_lazy_daily_df = daily_df.collect()

## DuckDB To The Rescue For SQL <a class="anchor" id="fourth-bullet"></a>

DuckDB can execute SQL directly on lazy Polars DataFrames!

In [180]:
import duckdb

In [181]:
query = duckdb.sql("""
    SELECT
        borough,
        day,
        number_of_persons_injured,
        LAG(1,number_of_persons_injured) OVER (PARTITION BY borough ORDER BY day) as prior_day_injured
FROM
    daily_df
ORDER BY 
    borough,
    day DESC
""")

In [182]:
query

┌───────────────┬─────────────────────┬───────────────────────────┬───────────────────┐
│    borough    │         day         │ number_of_persons_injured │ prior_day_injured │
│    varchar    │      timestamp      │           int64           │       int32       │
├───────────────┼─────────────────────┼───────────────────────────┼───────────────────┤
│ BRONX         │ 2022-04-24 00:00:00 │                         0 │                 1 │
│ BRONX         │ 2022-03-26 00:00:00 │                         7 │                 1 │
│ BRONX         │ 2022-03-25 00:00:00 │                         1 │                 1 │
│ BRONX         │ 2022-03-24 00:00:00 │                         1 │                 1 │
│ BRONX         │ 2022-03-22 00:00:00 │                         1 │                 1 │
│ BRONX         │ 2021-12-14 00:00:00 │                         2 │                 1 │
│ BRONX         │ 2021-12-11 00:00:00 │                         1 │                 1 │
│ BRONX         │ 2021-12-10 00:

In [170]:
query.pl()

borough,day,number_of_persons_injured,prior_day_injured
str,datetime[μs],i64,i32
"""BRONX""",2022-04-24 00:00:00,0,1
"""BRONX""",2022-03-26 00:00:00,7,1
"""BRONX""",2022-03-25 00:00:00,1,1
"""BRONX""",2022-03-24 00:00:00,1,1
"""BRONX""",2022-03-22 00:00:00,1,1
"""BRONX""",2021-12-14 00:00:00,2,1
"""BRONX""",2021-12-11 00:00:00,1,1
"""BRONX""",2021-12-10 00:00:00,1,1
"""BRONX""",2021-09-11 00:00:00,6,1
"""BRONX""",2021-09-10 00:00:00,1,1


Cumulative Sum!

In [188]:
ctx.execute("""
    SELECT
        borough,
        day,
        number_of_persons_injured,
        SUM(number_of_persons_injured) OVER (
                            PARTITION BY borough 
                            ORDER BY day ASC) AS cumulative_injuried
    FROM 
        daily_crashes
    ORDER BY
        borough,
        day DESC
""", eager=True).head(5)

borough,day,number_of_persons_injured,cumulative_injuried
str,datetime[μs],i64,i64
"""BRONX""",2022-04-24 00:00:00,0,45
"""BRONX""",2022-03-26 00:00:00,7,45
"""BRONX""",2022-03-25 00:00:00,1,45
"""BRONX""",2022-03-24 00:00:00,1,45
"""BRONX""",2022-03-22 00:00:00,1,45


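WRONG! The `ORDER BY` inside the window appears to be ignored: every BRONX row gets the borough's total of 45 instead of a running sum.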

In [191]:
query = duckdb.sql("""
    SELECT
        borough,
        day,
        number_of_persons_injured,
        SUM(number_of_persons_injured) OVER (
                            PARTITION BY borough 
                            ORDER BY day ASC) AS cumulative_injuried
    FROM 
        daily_df
    ORDER BY
        borough,
        day ASC
""")

In [192]:
query

┌───────────────┬─────────────────────┬───────────────────────────┬─────────────────────┐
│    borough    │         day         │ number_of_persons_injured │ cumulative_injuried │
│    varchar    │      timestamp      │           int64           │       int128        │
├───────────────┼─────────────────────┼───────────────────────────┼─────────────────────┤
│ BRONX         │ 2021-02-26 00:00:00 │                         0 │                   0 │
│ BRONX         │ 2021-04-06 00:00:00 │                         0 │                   0 │
│ BRONX         │ 2021-04-08 00:00:00 │                         0 │                   0 │
│ BRONX         │ 2021-04-10 00:00:00 │                         4 │                   4 │
│ BRONX         │ 2021-04-11 00:00:00 │                         0 │                   4 │
│ BRONX         │ 2021-04-12 00:00:00 │                         0 │                   4 │
│ BRONX         │ 2021-04-13 00:00:00 │                         3 │                   7 │
│ BRONX   

In [None]:
new_df = df.join(borough_df, on=["borough"], how="left")

In [10]:
# new_df.write_parquet("s3://harmonskis/nyc_accidents.parquet")

## Conclusions <a class="anchor" id="fifth-bullet"></a>