# Stream processing

In [2]:
from datetime import datetime

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

from dw.config import DataDir

## PyArrow: Stream reading & writing parquet files (in a directory)

In [3]:
# Column layout presumably matching the source parquet files (note the extra
# `to_be_ignored` column) — TODO confirm against the data. The write cells below
# pass their own explicit output schemas instead of this one.
schema = pa.schema(
    [
        pa.field("id", pa.int64()),
        pa.field("text", pa.string()),
        pa.field("to_be_ignored", pa.int64()),
    ]
)

# Dataset over the parquet files under DataDir.SOURCE; row data is only read when scanned.
ds1 = ds.dataset(DataDir.SOURCE, format="parquet")

In [3]:
%%time
with ds1.scanner(columns=["id", "text"]).to_reader() as reader:
    print("Writing dataset with a reader")
    ds.write_dataset(reader, DataDir.OUT_1, format="parquet", existing_data_behavior="overwrite_or_ignore")

Writing dataset with a reader
CPU times: user 1min 49s, sys: 2min 1s, total: 3min 51s
Wall time: 3min 35s


In [6]:
%%time
# max_rows_per_file 1 million --> 2 m 20 s
# max_rows_per_file 2 million --> 1 m 50 s
# max_rows_per_file 10 million --> 3 m 30 s
from datetime import datetime
count = 0

def print_prog(batch):
    global count
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}][{count}] New batch, size: {batch.num_rows}")
    count += 1
    return batch

with ds1.scanner(columns=["id", "text"], batch_size=10_000).to_reader() as reader:
    print("Intercept iteration and print batch progress")
    ds.write_dataset(
        iter(print_prog(batch) for batch in reader),
        DataDir.OUT_1,
        format="parquet",
        schema=pa.schema([("id", pa.int64()), ("text", pa.string())]),
        max_rows_per_file=1024 * 1024 * 2,
        existing_data_behavior="overwrite_or_ignore"
    )

Intercept iteration and print batch progress
[2023-04-07 12:47:26][0] New batch, size: 10000
[2023-04-07 12:47:26][1] New batch, size: 10000
[2023-04-07 12:47:26][2] New batch, size: 10000
[2023-04-07 12:47:26][3] New batch, size: 10000
[2023-04-07 12:47:26][4] New batch, size: 10000
[2023-04-07 12:47:26][5] New batch, size: 10000
[2023-04-07 12:47:26][6] New batch, size: 10000
[2023-04-07 12:47:26][7] New batch, size: 10000
[2023-04-07 12:47:26][8] New batch, size: 10000
[2023-04-07 12:47:26][9] New batch, size: 10000
[2023-04-07 12:47:26][10] New batch, size: 10000
[2023-04-07 12:47:26][11] New batch, size: 10000
[2023-04-07 12:47:26][12] New batch, size: 10000
[2023-04-07 12:47:26][13] New batch, size: 10000
[2023-04-07 12:47:26][14] New batch, size: 10000
[2023-04-07 12:47:26][15] New batch, size: 10000
[2023-04-07 12:47:26][16] New batch, size: 10000
[2023-04-07 12:47:26][17] New batch, size: 10000
[2023-04-07 12:47:26][18] New batch, size: 10000
[2023-04-07 12:47:26][19] New batc

## PyArrow: Streaming + add a computed column per batch (no join involved)

In [11]:
%%time
ds1 = ds.dataset(DataDir.SOURCE, format="parquet")

with ds1.scanner(columns=["id", "to_be_ignored"]).to_reader() as reader:
    ds.write_dataset(
        iter(
            pa.RecordBatch.from_pandas(
                batch.to_pandas()
                .assign(product_id=lambda df_: df_.to_be_ignored * 10)
                .drop(columns=["to_be_ignored"])
            )
            for batch in reader
        ),
        DataDir.OUT_1,
        format="parquet",
        schema=pa.schema([("id", pa.int64()), ("product_id", pa.int64())]),
        existing_data_behavior="overwrite_or_ignore"
    )

CPU times: user 1.69 s, sys: 229 ms, total: 1.92 s
Wall time: 1.2 s
