## CSV files 4: streaming larger-than-memory datasets
By the end of this lecture you will be able to:
- process larger-than-memory datasets from CSVs with streaming

**These concepts also apply to streaming Parquet files**.

With streaming, Polars can process a full query on a larger-than-memory dataset by:
- reading each CSV file in batches
- adapting its standard operations to work on batches instead of the full dataset at once

In [None]:
import polars as pl

It is not practical to ship very large datasets with this course. Instead we will run streaming queries on a small dataset, and you can then apply the same approach to your own larger datasets.

In [None]:
csv_file = "../data/nyc_trip_data_1k.csv"

We start with a simple non-streaming query

In [None]:
(
    pl.scan_csv(csv_file, try_parse_dates=True)
    .group_by("passenger_count")
    .agg(
        pl.col(pl.Float64).mean()
    )
    .collect()
)

We make this streaming by passing `streaming = True` to `collect`

In [None]:
(
    pl.scan_csv(csv_file, try_parse_dates=True)
    .group_by("passenger_count")
    .agg(
        pl.col(pl.Float64).mean()
    )    
    .collect(streaming=True)
)

### What happens in streaming mode?
In streaming mode, Polars reads a CSV with the batched CSV reader that we saw in a previous lecture. It then goes further and executes the remaining parts of the lazy query in batches as well.
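
To make the idea of "working in batches" concrete, here is a minimal pure-Python sketch of a grouped mean computed one batch at a time. It is a conceptual illustration only, not how Polars implements streaming; the toy CSV text, the `read_batches` helper and the batch size of 2 are all assumptions made for this example.

```python
import csv
import io
from collections import defaultdict
from itertools import islice

# Hypothetical toy CSV standing in for a file too large to load at once
csv_text = """passenger_count,trip_distance
1,1.0
2,4.0
1,3.0
2,6.0
1,5.0
"""


def read_batches(file_obj, batch_size):
    """Yield lists of row dicts, at most batch_size rows at a time."""
    reader = csv.DictReader(file_obj)
    while True:
        batch = list(islice(reader, batch_size))
        if not batch:
            return
        yield batch


# Running state: per-group sum and count, updated one batch at a time
sums = defaultdict(float)
counts = defaultdict(int)
for batch in read_batches(io.StringIO(csv_text), batch_size=2):
    for row in batch:
        key = int(row["passenger_count"])
        sums[key] += float(row["trip_distance"])
        counts[key] += 1

# The final mean only needs the small running state, never the full dataset
group_means = {key: sums[key] / counts[key] for key in sorted(sums)}
print(group_means)
```

The key point is that only one batch and the small per-group state are held in memory at any time, which is why a grouped mean suits batch-wise execution.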

Not all operations support streaming. For those that do not, Polars uses a non-streaming approach.

You can also use the `explain` method to see if a query will use the streaming engine by passing the `streaming=True` argument

In [None]:
print(
    pl.scan_csv(csv_file, try_parse_dates=True)
    .group_by("passenger_count")
    .agg(
        pl.col(pl.Float64).mean()
    )
    .explain(streaming=True)
)

If the query plan contains a section marked `STREAMING`, then that part of the query will be executed in streaming mode

## Controlling batch size
Polars uses a simple rule to determine the default size of the batches in streaming mode (if you are interested, see the `determine_chunk_size` function in https://github.com/pola-rs/polars/blob/main/crates/polars-pipe/src/pipeline/mod.rs).

> The following applies to any streaming query, not just CSVs

The rule requires the number of threads Polars can run on. Typically this is set equal to the number of CPU cores on your machine.

You can see the number of threads with the `pl.thread_pool_size` function

In [None]:
n_threads = pl.thread_pool_size()
n_threads

Then Polars sets a variable called `thread_factor`

In [None]:
# Integer division, to match the integer arithmetic in the Rust source
thread_factor = max(12 // n_threads, 1)
thread_factor

Finally the batch size is set as:

In [None]:
# Number of cols in the DataFrame/CSV/Parquet
n_cols = 10
max((50_000 // n_cols) * thread_factor, 1000)
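
The rule above can be wrapped in a small helper to see how the default batch size responds to the column count and thread count. This is a sketch of the arithmetic described in this section, not a Polars API: the `estimate_chunk_size` name is hypothetical, and the use of integer division and the guard for zero columns are assumptions about the integer-based Rust implementation.

```python
def estimate_chunk_size(n_cols: int, n_threads: int) -> int:
    """Reproduce the default batch-size rule described above (illustrative helper)."""
    # Assumption: integer division, as the Rust source works with integers
    thread_factor = max(12 // n_threads, 1)
    # Assumption: treat a frame with no columns as having one, to avoid dividing by zero
    n_cols = max(n_cols, 1)
    return max((50_000 // n_cols) * thread_factor, 1000)


# Few threads give larger batches, and wide tables hit the 1000-row floor
for n_cols, n_threads in [(10, 4), (10, 12), (100, 16)]:
    print(n_cols, n_threads, estimate_chunk_size(n_cols, n_threads))
```

Wider tables get fewer rows per batch so that the amount of data in each batch stays roughly constant, with a floor of 1000 rows.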

If you want to modify this batch size for your query you can do this with a config setting

In [None]:
pl.Config.set_streaming_chunk_size(50000)

You can set this parameter at any time and it will affect subsequent queries

By default the number of threads in the thread pool equals the number of CPUs on your machine. You can change this by setting the `POLARS_MAX_THREADS` environment variable **before** you import Polars. If Polars is already imported, you need to restart the notebook kernel to see the change in the output of this cell

In [None]:
import os
os.environ["POLARS_MAX_THREADS"] = "20"
import polars as pl
pl.thread_pool_size()

## Streaming joins
We can join data from different CSVs with streaming. In this example we join the CITES trade records with the ISO country data on the `Importer` column.
> See the lectures on joining `DataFrames` if you have not encountered these datasets yet

In [None]:
cites_csv_file = "../data/cites_extract.csv"
iso_csv_file = "../data/countries_extract.csv"
(
    pl.scan_csv(cites_csv_file)
    .join(
        pl.scan_csv(iso_csv_file),
        left_on="Importer",
        right_on="alpha-2", 
        how="inner"
    )
    .collect(streaming=True)
)
        

## Output to a file
With standard streaming (i.e. ending a query with `.collect(streaming=True)`), Polars has to return a `DataFrame`, and so the output of the query must fit in memory.

However, Polars can write (or `sink`) output directly to a [Parquet](https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.LazyFrame.sink_parquet.html#polars.LazyFrame.sink_parquet) or [CSV](https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.LazyFrame.sink_csv.html) file in streaming mode - see the Single Parquet lecture for an example. 

## Profiling
We can profile a query when we use streaming. 

> If you have not encountered `profile` see the lecture on Lazy Groupby in the section on Statistics, Counts and Grouping for an introduction

In [None]:
group_df, profile_df = (
    pl.scan_csv(csv_file)
    .group_by("passenger_count")
    .agg(
        pl.col("trip_distance").mean()
    )
    .sort("passenger_count")
    .profile(streaming=True, show_plot=True)
)

The `STREAMING` part of this profile captures the parts of the query carried out in the streaming engine. In this case we see that almost all of the query time was spent in the streaming pipeline, while the Polars query optimiser took a very small proportion of the overall time.

## Streaming and common subplan elimination
The query above produced the following notification from Polars:

```
Cannot combine 'streaming' with 'common_subplan_elimination'. CSE will be turned off.
```
Common subplan elimination is one of the ways that the query optimiser can optimise queries. It arises in queries where the same action is applied to the same `LazyFrame` in different parts of a query. You generally don't need to worry about seeing this warning.

## When does streaming not work?
Streaming works for many common operations such as `group_by`, `filter` and `join`. However, there are other common operations where it does not work.

One simple example where streaming does not work is the `shift` expression. With `shift(1)` each value is replaced by the value from the previous row. If we look at the query plan below we see that the `shift` is not inside the `STREAMING` part of the query plan

In [None]:
print(
    pl.scan_csv(csv_file)
    .with_columns(
        pl.col("tip_amount").shift(1)
    )
    .explain(streaming=True)
)

The reason for this is that streaming works in batches of rows. For the `shift` expression a row at the edge of a batch needs a value from a neighbouring batch. This inter-batch communication is not supported and so streaming does not work for `shift`.
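
The batch-boundary problem can be seen with a small pure-Python sketch. It is a conceptual illustration rather than Polars code: the `shift_one` helper and the batch size of 3 are assumptions for this example.

```python
def shift_one(values):
    """Shift values down by one position, filling the gap with None."""
    return [None] + values[:-1] if values else []


values = [10, 20, 30, 40, 50, 60]

# Shifting the full column at once: every row sees its predecessor
full_result = shift_one(values)

# Shifting two batches independently: the first row of the second batch
# cannot see the last value (30) of the first batch
batches = [values[:3], values[3:]]
batched_result = [v for batch in batches for v in shift_one(batch)]

print(full_result)
print(batched_result)
```

The two results differ at the first row of the second batch, which is the value that would have to be passed between batches.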

## Exercises
There are no exercises here as streaming works in a similar way to operations we have met before.

Try streaming on your own data and check whether streaming is being used by calling `explain(streaming=True)`.