## Local filesystem, without batching

In [None]:
from pyarrow import fs
import pyarrow.dataset as ds

def list_parquet_files(local_fs, path):
    """Return the paths of every ``.parquet`` file under ``path``, sorted.

    The directory tree is walked recursively with ``fs.FileSelector``;
    only regular files whose path ends in ``.parquet`` are kept.

    Parameters
    ----------
    local_fs : pyarrow.fs.FileSystem
        Filesystem used to list ``path`` (a ``fs.LocalFileSystem`` here).
    path : str
        Root directory to search.

    Returns
    -------
    list[str]
        Matching file paths in lexicographic order. Sorting removes any
        dependence on the order in which the filesystem enumerates
        entries, so the dataset built from this list is reproducible
        across runs and machines.
    """
    file_infos = local_fs.get_file_info(fs.FileSelector(path, recursive=True))
    return sorted(
        info.path
        for info in file_infos
        if info.type == fs.FileType.File and info.path.endswith(".parquet")
    )

local_fs = fs.LocalFileSystem()

# Keep output_path outside input_path: list_parquet_files searches
# recursively, so outputs written inside the input tree would be picked up
# as inputs on the next run.
input_path = "demo_data"
output_path = "output_demo_data"

files = list_parquet_files(local_fs, input_path)
print(f"Found {len(files)} input files")

# Treat all input files as one logical dataset so write_dataset can regroup
# their rows into the row-group / file sizes configured below.
dataset = ds.dataset(files, format="parquet", filesystem=local_fs)

min_rows_per_group = 500
max_rows_per_file = 1000000
max_rows_per_group = 10000

ds.write_dataset(
    dataset,
    output_path,
    format="parquet",
    # write_dataset replaces the `{i}` token with an incrementing integer.
    basename_template="compacted_{i}.parquet",
    min_rows_per_group=min_rows_per_group,
    max_rows_per_file=max_rows_per_file,
    max_rows_per_group=max_rows_per_group,
    # Overwrites files with matching names but leaves other files in
    # output_path alone, so leftovers from an earlier, larger run can remain.
    existing_data_behavior="overwrite_or_ignore",
    use_threads=True,
    # Reuse the same filesystem that was used to list and read the inputs.
    filesystem=local_fs,
    file_options=ds.ParquetFileFormat().make_write_options(compression="gzip"),
)

Found 696 input files


## Local filesystem, with batching

In [None]:
import pyarrow.dataset as ds
from pyarrow import fs
import os

def list_parquet_files(local_fs, path):
    """Return the paths of every ``.parquet`` file under ``path``, sorted.

    The directory tree is walked recursively with ``fs.FileSelector``;
    only regular files whose path ends in ``.parquet`` are kept.

    Parameters
    ----------
    local_fs : pyarrow.fs.FileSystem
        Filesystem used to list ``path`` (a ``fs.LocalFileSystem`` here).
    path : str
        Root directory to search.

    Returns
    -------
    list[str]
        Matching file paths in lexicographic order. Batches are formed by
        slicing this list, so a stable order is what makes batch membership
        (and therefore each ``compacted_batch_<n>`` output) reproducible
        across runs and machines.
    """
    file_infos = local_fs.get_file_info(fs.FileSelector(path, recursive=True))
    return sorted(
        info.path
        for info in file_infos
        if info.type == fs.FileType.File and info.path.endswith(".parquet")
    )


input_path = 'demo_data'
output_path = 'output_demo_data_batched'
files_per_batch = 100
min_rows_per_group = 500
max_rows_per_file = 1000000
max_rows_per_group = 10000

# A zero step makes range() raise; a negative one silently yields no batches
# and writes nothing, so reject both up front with a clear message.
assert files_per_batch > 0, "files_per_batch must be a positive integer"

local_fs = fs.LocalFileSystem()

all_files = list_parquet_files(local_fs, input_path)
print(f"Found {len(all_files)} input files")

os.makedirs(output_path, exist_ok=True)

# The Parquet write options are the same for every batch; build them once.
write_options = ds.ParquetFileFormat().make_write_options(compression="gzip")

for batch_idx in range(0, len(all_files), files_per_batch):
    batch_files = all_files[batch_idx: batch_idx + files_per_batch]
    batch_num = batch_idx // files_per_batch
    print(f"Processing files {batch_idx} to {batch_idx + len(batch_files)}")

    # Each batch gets its own write_dataset call. Rows are therefore only
    # regrouped within a batch, and every batch produces at least one output
    # file of its own. The per-batch prefix in basename_template keeps batches
    # from overwriting each other's files.
    dataset = ds.dataset(batch_files, format="parquet", filesystem=local_fs)

    ds.write_dataset(
        dataset,
        output_path,
        format="parquet",
        basename_template=f"compacted_batch_{batch_num}_{{i}}.parquet",
        min_rows_per_group=min_rows_per_group,
        max_rows_per_file=max_rows_per_file,
        max_rows_per_group=max_rows_per_group,
        # Must not delete existing data: earlier batches already wrote into
        # output_path. Files from a previous run with different settings
        # (e.g. more batches) are left in place and should be cleaned manually.
        existing_data_behavior="overwrite_or_ignore",
        use_threads=True,
        filesystem=local_fs,
        file_options=write_options,
    )

    # Release this batch's dataset reference before opening the next batch.
    del dataset


Found 696 input files
Processing files 0 to 100
Processing files 100 to 200
Processing files 200 to 300
Processing files 300 to 400
Processing files 400 to 500
Processing files 500 to 600
Processing files 600 to 696
