# Analysing a 300M+ row dataset on a laptop in 30 seconds

This post summarises my quest to find the edge of `Polars` performance on my laptop. I generate a large dataset and run some predefined complex queries on it. We will see how to write larger-than-RAM datasets, scan the files, run analytics and collect the results.

The `Polars` package makes it easier than ever to work with larger-than-RAM datasets.

## Out of core 

Out of core refers to the ability of a program or system to process data that does not fit in a computer's memory (RAM). Memory is fast, temporary storage that the central processing unit (CPU) uses to access data and instructions quickly. However, main memory is limited in size and often cannot hold all of the data that a program needs to process.

Many systems automatically `swap` memory: they write the data that doesn't fit in memory to disk. This decreases performance significantly. Heavy swapping can also [shorten the lifetime of SSDs](https://www.enterprisestorageforum.com/hardware/ssd-lifespan-how-long-will-your-ssd-work/), because every swap writes data to the drive and SSD cells only endure a limited number of write cycles.


![Example of memory swap](./static/swap.png)


The out of core capability of `Polars` is very useful for big data problems, where datasets would otherwise be too large to process on a single computer. It enables you to process 20, 50 or 100GB+ datasets on your laptop, without spinning up a cluster on a cloud platform or learning a new syntax.
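To make the idea concrete, here is a minimal pure-Python sketch of out-of-core aggregation. It only illustrates the principle and is not how `Polars` is implemented: the input is consumed in fixed-size chunks and only running state (a sum and a count) is kept, so memory use stays bounded no matter how large the file is. The helper names and the tiny `|`-separated sample are hypothetical.

```python
import io
from typing import Iterable, Iterator


def read_in_chunks(lines: Iterable[str], chunk_size: int) -> Iterator[list[str]]:
    """Yield lists of at most `chunk_size` lines, so only one chunk is in memory at a time."""
    chunk: list[str] = []
    for line in lines:
        chunk.append(line)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:  # emit the last, possibly shorter, chunk
        yield chunk


def out_of_core_mean(lines: Iterable[str], column: int, chunk_size: int = 2) -> float:
    """Mean of one '|'-separated column, keeping only a running sum and count."""
    total, count = 0.0, 0
    for chunk in read_in_chunks(lines, chunk_size):
        for line in chunk:
            total += float(line.split("|")[column])
            count += 1
    return total / count


# A file object yields lines lazily; io.StringIO stands in for a large file on disk
data = io.StringIO("1|17|\n1|36|\n2|38|\n3|45|\n")
print(out_of_core_mean(data, column=1))  # 34.0
```

With a real file you would pass `open(path)` instead of the `StringIO` object; the peak memory is then one chunk plus two numbers, independent of the file size.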

## Generating the dataset

It is not easy to find very large datasets on the internet, especially when we want to find the edge of what `Polars` can process on a laptop. Luckily, other people do a lot of benchmarking, which is how I found the TPC-H benchmark (TPCH). `Polars` uses it to benchmark against other packages. [You can find the repository here](https://github.com/pola-rs/tpch).

You can clone the repo and run the code. In the [Makefile](https://github.com/pola-rs/tpch/blob/main/Makefile) you can change the scale factor of the generated dataset, or you can run the same commands in your terminal. Replace XX with the scale factor; in our case we use a scale factor of 50.

```makefile
tables_scale_XX: .venv
	$(MAKE) -C tpch-dbgen all
	cd tpch-dbgen && ./dbgen -vf -s XX && cd ..
	mkdir -p "tables_scale_XX"
	mv tpch-dbgen/*.tbl tables_scale_XX/
	.venv/bin/python prepare_files.py XX
```

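Scale factor 50 creates several files. The largest is `lineitem.tbl`, at 39.53GB. The `prepare_files.py` script converts them to `parquet` files, which are smaller. However, we need to adjust the script, because `lineitem.tbl` is likely too large to read into memory. Therefore we replace `read_csv()` with `scan_csv()`.

```python
import polars as pl

# Inside the loop of prepare_files.py: `name` is the table name (e.g. "lineitem"),
# `scale_fac` the scale factor, and `h_<name>` a list of column names defined in the script.
df = pl.scan_csv(
    f"tables_scale_{scale_fac}/{name}.tbl",
    has_header=False,
    sep="|",  # renamed to `separator` in newer Polars versions
    parse_dates=True,  # renamed to `try_parse_dates` in newer Polars versions
    with_column_names=lambda _: eval(f"h_{name}"),
)

# Cast all Date columns to Datetime
df = df.with_columns([pl.col(pl.Date).cast(pl.Datetime)])
# Stream the lazy query to a Parquet file without loading the whole table into memory
df.sink_parquet(f"tables_scale_{scale_fac}/{name}.parquet")
```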

We also see one of the newest features in action: `sink_parquet()`. It runs the lazy query with the streaming engine and writes the result straight to disk, so we can write files that are larger than memory.

## Scanning and analysing the dataset

> I am processing and analysing the data on a 2020 MacBook Pro with an M1 chip and 16GB of RAM. Performance may differ on your machine depending on the OS and specs.

### Required imports

```python
import polars as pl

# to time queries
import time

# to handle datetimes
from datetime import datetime
```

### Scanning the datasets

Due to the size of the datasets, we cannot simply read the parquet files into memory. Instead, we use the Lazy API of `Polars`. Fortunately for us, it is not so different from the Eager API.

The `TPCH` generator creates several tables, but we will only `scan` a selection to keep this notebook concise.

```python
lineitem = pl.scan_parquet("../../../tpch/tables_scale_50/lineitem.parquet")
orders = pl.scan_parquet("../../../tpch/tables_scale_50/orders.parquet")
```

### Investigating the dataset

```python
lineitem.columns
```

```text
['l_orderkey',
 'l_partkey',
 'l_suppkey',
 'l_linenumber',
 'l_quantity',
 'l_extendedprice',
 'l_discount',
 'l_tax',
 'l_returnflag',
 'l_linestatus',
 'l_shipdate',
 'l_commitdate',
 'l_receiptdate',
 'l_shipinstruct',
 'l_shipmode',
 'comments',
 '']
```

```python
%%time

lineitem.select([
    pl.col('l_linenumber').count(),
    pl.col('l_shipdate').min().alias('earliest shipdate'),
    pl.col('l_shipdate').max().alias('most recent shipdate')
]).collect()
```

```text
CPU times: user 8.87 s, sys: 2.42 s, total: 11.3 s
Wall time: 2.08 s
```

| l_linenumber | earliest shipdate | most recent shipdate |
|---|---|---|
| u32 | datetime[μs] | datetime[μs] |
| 300005811 | 1992-01-02 00:00:00 | 1998-12-01 00:00:00 |


Our `lineitem` table covers almost seven years of transactions (January 1992 to December 1998) and contains a little over 300M rows. That is a pretty large dataset.

```python
lineitem.limit(2).collect()
```

| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | comments |  |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| i64 | i64 | i64 | i64 | i64 | f64 | f64 | f64 | str | str | datetime[μs] | datetime[μs] | datetime[μs] | str | str | str | str |
| 1 | 7759468 | 384484 | 1 | 17 | 25960.36 | 0.04 | 0.02 | "N" | "O" | 1996-03-13 00:00:00 | 1996-02-12 00:00:00 | 1996-03-22 00:00:00 | "DELIVER IN PER... | "TRUCK" | "egular courts ... |  |
| 1 | 3365454 | 365455 | 2 | 36 | 54694.44 | 0.09 | 0.06 | "N" | "O" | 1996-04-12 00:00:00 | 1996-02-28 00:00:00 | 1996-04-20 00:00:00 | "TAKE BACK RETU... | "MAIL" | "ly final depen... |  |


```python
orders.columns
```

```text
['o_orderkey',
 'o_custkey',
 'o_orderstatus',
 'o_totalprice',
 'o_orderdate',
 'o_orderpriority',
 'o_clerk',
 'o_shippriority',
 'o_comment',
 '']
```

```python
%%time

orders.select([
    pl.col('o_orderkey').count(),
    pl.col('o_orderdate').min().alias('earliest order date'),
    pl.col('o_orderdate').max().alias('most recent order date')
]).collect()
```

```text
CPU times: user 2.41 s, sys: 480 ms, total: 2.89 s
Wall time: 504 ms
```

| o_orderkey | earliest order date | most recent order date |
|---|---|---|
| u32 | datetime[μs] | datetime[μs] |
| 75000000 | 1992-01-01 00:00:00 | 1998-08-02 00:00:00 |


Our `orders` file contains 75M rows.
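These row counts follow from the table cardinalities in the TPC-H specification, which scale linearly with the scale factor (except `nation`, which is fixed at 25 rows). A small sketch of that arithmetic; the `lineitem` figure is only approximate, because each order gets between one and seven line items.

```python
# Rows per table at scale factor 1, as given in the TPC-H specification
BASE_ROWS = {
    "orders": 1_500_000,
    "customer": 150_000,
    "supplier": 10_000,
    "nation": 25,  # fixed size, does not grow with the scale factor
}
# lineitem is not an exact multiple: 1 to 7 line items per order, ~6M rows per scale factor
APPROX_LINEITEM_PER_SF = 6_000_000


def expected_rows(table: str, scale_factor: int) -> int:
    """Expected row count for a TPC-H table at the given scale factor."""
    if table == "nation":
        return BASE_ROWS["nation"]
    return BASE_ROWS[table] * scale_factor


sf = 50
for table in BASE_ROWS:
    print(f"{table:>8}: {expected_rows(table, sf):>12,}")
print(f"lineitem: ~{APPROX_LINEITEM_PER_SF * sf:,}")
```

At scale factor 50 this gives exactly 75,000,000 orders, and roughly 300M line items, in line with the 300,005,811 rows we counted.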

```python
orders.limit(2).collect()
```

| o_orderkey | o_custkey | o_orderstatus | o_totalprice | o_orderdate | o_orderpriority | o_clerk | o_shippriority | o_comment |  |
|---|---|---|---|---|---|---|---|---|---|
| i64 | i64 | str | f64 | datetime[μs] | str | str | i64 | str | str |
| 1 | 1845001 | "O" | 218611.01 | 1996-01-02 00:00:00 | "5-LOW" | "Clerk#00004752... | 0 | "nstructions sl... |  |
| 2 | 3900082 | "O" | 59659.27 | 1996-12-01 00:00:00 | "1-URGENT" | "Clerk#00004395... | 0 | " foxes. pendin... |  |


Next, we run TPC-H query 1 (the pricing summary report) on `lineitem`, using the streaming engine.

```python
%%time

VAR1 = datetime(1998, 9, 2)


(lineitem.filter(pl.col("l_shipdate") <= VAR1)
.groupby(["l_returnflag", "l_linestatus"])
.agg(
    [
        pl.sum("l_quantity").alias("sum_qty"),
        pl.sum("l_extendedprice").alias("sum_base_price"),
        (pl.col("l_extendedprice") * (1 - pl.col("l_discount")))
        .sum()
        .alias("sum_disc_price"),
        (
            pl.col("l_extendedprice")
            * (1.0 - pl.col("l_discount"))
            * (1.0 + pl.col("l_tax"))
        )
        .sum()
        .alias("sum_charge"),
        pl.mean("l_quantity").alias("avg_qty"),
        pl.mean("l_extendedprice").alias("avg_price"),
        pl.mean("l_discount").alias("avg_disc"),
        pl.count().alias("count_order"),
    ],
)
.sort(["l_returnflag", "l_linestatus"])).collect(streaming=True)
```

```text
CPU times: user 1min 10s, sys: 6.98 s, total: 1min 17s
Wall time: 13.7 s
```

| l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order |
|---|---|---|---|---|---|---|---|---|---|
| str | str | i64 | f64 | f64 | f64 | f64 | f64 | f64 | u32 |
| "A" | "F" | 1887655913 | 2830600000000.0 | 2689000000000.0 | 2796600000000.0 | 25.499323 | 38236.556664 | 0.050001 | 74027688 |
| "N" | "F" | 49261643 | 73892000000.0 | 70198000000.0 | 73006000000.0 | 25.510566 | 38265.510418 | 0.049979 | 1931029 |
| "N" | "O" | 3717663006 | 5574500000000.0 | 5295800000000.0 | 5507600000000.0 | 25.498582 | 38234.266455 | 0.049998 | 145798813 |
| "R" | "F" | 1887847853 | 2830700000000.0 | 2689200000000.0 | 2796700000000.0 | 25.502359 | 38239.130191 | 0.049999 | 74026400 |


How many line items pass the filter in the query above? A little over 295M.

```python
%%time

(lineitem.filter(
    (pl.col('l_shipdate') <= datetime(1998, 9, 2))
).select([
    pl.col('l_shipdate').count(),
    pl.col('l_linenumber').count()
]).collect())
```

```text
CPU times: user 10.5 s, sys: 1.93 s, total: 12.4 s
Wall time: 1.77 s
```

| l_shipdate | l_linenumber |
|---|---|
| u32 | u32 |
| 295783930 | 295783930 |
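The numbers above can be cross-checked with a few lines of plain Python. This is a minimal sketch that assumes `VAR1` follows the TPC-H query 1 definition, which (to my understanding) subtracts a DELTA of 90 days from 1998-12-01, the most recent ship date we saw earlier; all counts are copied from the outputs in this post.

```python
from datetime import datetime, timedelta

# Query 1 cut-off: 1998-12-01 minus DELTA days (assumed DELTA = 90)
DELTA_DAYS = 90
VAR1 = datetime(1998, 12, 1) - timedelta(days=DELTA_DAYS)
print(VAR1)  # 1998-09-02 00:00:00

# Row counts copied from the outputs above
count_order = [74_027_688, 1_931_029, 145_798_813, 74_026_400]  # one per (l_returnflag, l_linestatus) group
filtered_rows = 295_783_930  # rows with l_shipdate <= VAR1
total_rows = 300_005_811  # all rows in lineitem

# The four groups partition the filtered rows, so their counts must add up
assert sum(count_order) == filtered_rows

excluded = total_rows - filtered_rows
share = filtered_rows / total_rows
print(f"excluded: {excluded:,}, kept: {share:.1%}")  # excluded: 4,221,881, kept: 98.6%
```

In other words, the filter of query 1 drops only about 1.4% of the rows, so the aggregation still runs over almost the entire table.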


### A more complex example

In the `TPCH` benchmarks of `Polars` at [pola.rs/benchmarks](https://www.pola.rs/benchmarks.html), we can see that query 7 is the most demanding query for most DataFrame packages. These benchmarks were run with scale factor 10. Let's see how it performs on our dataset with scale factor 50.

```python
nation = pl.scan_parquet("../../../tpch/tables_scale_50/nation.parquet")
customer = pl.scan_parquet("../../../tpch/tables_scale_50/customer.parquet")
supplier = pl.scan_parquet("../../../tpch/tables_scale_50/supplier.parquet")
```

In this example I removed the original date filter on `l_shipdate`, because I wanted to see the results for all years, not just 1995 and 1996.

```python
%%time

n1 = nation.filter(pl.col("n_name") == "FRANCE")
n2 = nation.filter(pl.col("n_name") == "GERMANY")

df1 = (
    customer.join(n1, left_on="c_nationkey", right_on="n_nationkey")
    .join(orders, left_on="c_custkey", right_on="o_custkey")
    .rename({"n_name": "cust_nation"})
    .join(lineitem, left_on="o_orderkey", right_on="l_orderkey")
    .join(supplier, left_on="l_suppkey", right_on="s_suppkey")
    .join(n2, left_on="s_nationkey", right_on="n_nationkey")
    .rename({"n_name": "supp_nation"})
)

df2 = (
    customer.join(n2, left_on="c_nationkey", right_on="n_nationkey")
    .join(orders, left_on="c_custkey", right_on="o_custkey")
    .rename({"n_name": "cust_nation"})
    .join(lineitem, left_on="o_orderkey", right_on="l_orderkey")
    .join(supplier, left_on="l_suppkey", right_on="s_suppkey")
    .join(n1, left_on="s_nationkey", right_on="n_nationkey")
    .rename({"n_name": "supp_nation"})
)

(
    pl.concat([df1, df2])
    .with_column(
        (pl.col("l_extendedprice") * (1 - pl.col("l_discount"))).alias("volume")
    )
    .with_column(pl.col("l_shipdate").dt.year().alias("l_year"))
    .groupby(["supp_nation", "cust_nation", "l_year"])
    .agg([pl.sum("volume").alias("revenue")])
    .sort(by=["supp_nation", "cust_nation", "l_year"])
).collect(streaming=True)
```

```text
CPU times: user 2min 15s, sys: 13.5 s, total: 2min 28s
Wall time: 28.6 s
```

| supp_nation | cust_nation | l_year | revenue |
|---|---|---|---|
| str | str | i32 | f64 |
| "FRANCE" | "GERMANY" | 1992 | 2219200000.0 |
| "FRANCE" | "GERMANY" | 1993 | 2665300000.0 |
| "FRANCE" | "GERMANY" | 1994 | 2631600000.0 |
| "FRANCE" | "GERMANY" | 1995 | 2668300000.0 |
| "FRANCE" | "GERMANY" | 1996 | 2641100000.0 |
| "FRANCE" | "GERMANY" | 1997 | 2652600000.0 |
| "FRANCE" | "GERMANY" | 1998 | 2021000000.0 |
| "GERMANY" | "FRANCE" | 1992 | 2231900000.0 |
| "GERMANY" | "FRANCE" | 1993 | 2670500000.0 |
| "GERMANY" | "FRANCE" | 1994 | 2670700000.0 |


Running the above query without `streaming=True` leads to large parts of the data being swapped to the SSD on my laptop.

### Conclusion

The out of core functionality of `Polars` is great. It opens up a whole new set of use cases on our laptops. Personally, I prefer this, as I don't have to create a cluster, spin it up and rewrite my queries in `pyspark` or another syntax. I can also do most of the work on my laptop with the tools I prefer. If it is not fast enough, it is straightforward to move to a larger machine with more cores to improve performance.

If you are curious, there are other queries in the `TPCH` benchmark to experiment with, and the repository makes it easy to generate even larger datasets.

Learn more about Polars here: https://github.com/pola-rs/polars