benchmarks: add dataset_serialize_benchmark.py #124

Merged Dec 15, 2022 (24 commits; changes shown from 19 commits)
68e9d19  benchmarks: add dataset_serialize_benchmark.py (jgehrcke, Dec 9, 2022)
f15947b  _source.py: add logging around HTTP requests (jgehrcke, Dec 12, 2022)
6992f18  serialize bm: introduce Scanner for filtering (jgehrcke, Dec 12, 2022)
277807b  fix isort (jgehrcke, Dec 12, 2022)
53968ad  isort (jgehrcke, Dec 12, 2022)
14b7e80  isort (jgehrcke, Dec 12, 2022)
8a1bed3  ci: isort: show diff (jgehrcke, Dec 12, 2022)
ff4f5e1  isort for conbench (jgehrcke, Dec 12, 2022)
35f2227  remove debug print (jgehrcke, Dec 12, 2022)
a292a81  serialize-benchmark: commit current state, cleanup follows (jgehrcke, Dec 14, 2022)
9cc975e  serialize-benchmark: clean up (jgehrcke, Dec 14, 2022)
3b313a4  add test_dataset_serialize_benchmark.py (jgehrcke, Dec 14, 2022)
c6ae972  dataset-serialize: add comment abt return value (jgehrcke, Dec 14, 2022)
c484871  _sources.py: add a bit of debug output (jgehrcke, Dec 14, 2022)
6aac940  dataset-serialize: polish comments, import (jgehrcke, Dec 14, 2022)
c0b3eb6  benchmarks.json: add dataset-serialize (jgehrcke, Dec 14, 2022)
15bbce8  dataset-serialize: friendly err on darwin (jgehrcke, Dec 14, 2022)
fd3868e  dataset-serialize: report /dev/shm usage on the fly (jgehrcke, Dec 14, 2022)
1218548  isort (jgehrcke, Dec 14, 2022)
cb7e90f  dataset-serialize: unlock 100pc case (jgehrcke, Dec 14, 2022)
fcfe9c4  dataset-serialize: simpler size cases (jgehrcke, Dec 15, 2022)
71781d8  dataset-serialize: BENCHMARK_OUTPUT_DIR env var (jgehrcke, Dec 15, 2022)
f0806f0  _sources: remove log line again (jgehrcke, Dec 15, 2022)
4ce89d5  tests: fix, adjust to dataset-serialize changes (jgehrcke, Dec 15, 2022)
3 changes: 3 additions & 0 deletions .github/workflows/actions.yml
@@ -34,7 +34,10 @@ jobs:
run: |
flake8
- name: Lint (isort)
# Note(JP): if --check fails then it's not always obvious why.
# Show diff beforehand.
run: |
isort --diff .
Contributor:

Thank you! I've had a good amount of issues with isort; I wonder if we should pin a version to hopefully get more consistency

Contributor Author:

In this case the difference between environments was after all how I chose to integrate conbench (not from PyPI, but from my local checkout).

isort --check .
- name: Install libcurl (for R arrow)
run: |
6 changes: 6 additions & 0 deletions benchmarks.json
@@ -67,6 +67,12 @@
"language": "Python"
}
},
{
"command": "dataset-serialize ALL --iterations=3 --all=true --drop-caches=true",
"flags": {
"language": "Python"
}
},
{
"command": "file-read ALL --iterations=3 --all=true --drop-caches=true",
"flags": {
19 changes: 15 additions & 4 deletions benchmarks/_sources.py
@@ -1,4 +1,5 @@
import functools
import logging
import os
import pathlib
from enum import Enum
@@ -15,6 +16,9 @@
temp_dir = os.path.join(data_dir, "temp")


log = logging.getLogger(__name__)


def _local(name):
"""Sources for unit testing, committed to benchmarks/data."""
return os.path.join(local_data_dir, name)
@@ -435,8 +439,10 @@ def table(self):
return self._table

def _get_object_url(self, idx=0):
log.info("_get_object_url for idx %s", idx)
if self.paths:
s3_url = pathlib.Path(self.paths[idx])
log.info("s3_url: %s", s3_url)
return (
"https://"
+ s3_url.parts[0]
Expand All @@ -452,11 +458,16 @@ def download_source_if_not_exists(self):
for idx, p in enumerate(self.source_paths):
path = pathlib.Path(p)
if not path.exists():
log.info("path does not exist: %s", path)
path.parent.mkdir(parents=True, exist_ok=True)
source = self.store.get("source")
if not source:
source = self._get_object_url(idx)
r = requests.get(source)

url = self.store.get("source")
if not url:
url = self._get_object_url(idx)

log.info("HTTP GET %s", url)
r = requests.get(url)
log.info("write response to disk")
open(path, "wb").write(r.content)

def _csv_write(self, table, path, compression):
272 changes: 272 additions & 0 deletions benchmarks/dataset_serialize_benchmark.py
@@ -0,0 +1,272 @@
import itertools
import logging
import os
import shutil
import subprocess
import sys
import time
import uuid

import conbench.runner
import pyarrow
import pyarrow.dataset as ds

from benchmarks import _benchmark

log = logging.getLogger(__name__)


# All benchmark scenarios will write below OUTPUT_DIR_PREFIX (a unique
# directory under /dev/shm). That directory tree is removed upon completion
# (though not necessarily in case of error).
OUTPUT_DIR_PREFIX = os.path.join("/dev/shm/", "bench-" + str(uuid.uuid4())[:8])


@conbench.runner.register_benchmark
class DatasetSerializeBenchmark(_benchmark.Benchmark):
"""
This benchmark is supposed to measure the time it takes to write data from
memory (from a pyarrow Table) to a tmpfs file system, given a specific
serialization format (parquet, arrow, ...).

To make this benchmark agnostic to disk read performance on the input side
of things, the data is read fully into memory before starting (and timing)
the benchmark function. That is (believed to be) achieved with:

data = source_dataset.to_table(filter=somefilter, ...)

Comment:

If we are performing this outside of the measurements, then the filter is also not being measured, right? I'm all for keeping the code, though, because I think it would make sense to eventually expand this benchmark into measuring an end to end read - filter - write workload.

Contributor Author:

If we are performing this outside of the measurements, then the filter is also not being measured, right?

Right!

The following benchmark times read-from-disk-backed-fs-and-then-filter:

return lambda: dataset.to_table(

And this one does, too:

return lambda: dataset.to_table(filter=(vendor == "DDS") & (count > 3))

Although I would think that in both cases the filtering does not dominate the time, but instead it's the network-attached disk (see below).

For focusing a benchmark on the filter performance, the data should be in memory already, using e.g. tmpfs (/dev/shm), like I am proposing here.

I think it would make sense to eventually expand this benchmark into measuring an end to end read - filter - write workload.

Yeah, interesting thought. Maybe we want this kind of test/benchmark in the future. As described in #123, I in fact started with exactly that here, and then wondered what the use would be. I called this "RDFSW":

reading -> deserializing -> filtering -> serializing -> writing

and I found that the signal on the final two stages was weak or non-existent, because of the read-from-network-attached-disk on the left-hand side of the flow. This part is both slow and volatile compared to the other steps.

If we do this, we should read from tmpfs (which limits dataset size, of course).

That's a great discussion for subsequent work!
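
For reference, a minimal sketch of what such a future end-to-end ("RDFSW") benchmark could look like, assuming the source data has already been copied onto tmpfs; this is not part of this PR, and the paths and filter threshold are placeholders borrowed from the tables below:

```python
import time

import pyarrow.dataset as ds

# Hedged sketch, not part of this PR: time the full
# read -> deserialize -> filter -> serialize -> write chain, with the source
# living on tmpfs so the slow, volatile network-attached disk is out of the
# picture. "/dev/shm/source-copy" and the filter threshold are placeholders.
src = ds.dataset("/dev/shm/source-copy", format="parquet")

t0 = time.monotonic()
table = src.to_table(filter=ds.field("pickup_longitude") < -74.002055)
ds.write_dataset(data=table, base_dir="/dev/shm/rdfsw-out", format="parquet")
print(f"RDFSW round trip took {time.monotonic() - t0:.3f} s")
```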


After data of interest has been read into memory, the following call is
used for both serialization and writing-to-filesystem in one go:

pyarrow.dataset.write_dataset(format=someformat, ...)

That operation is timed (and the duration is the major output of this
benchmark).

The data is written to `/dev/shm` (available on all Linux systems). This is
a file system backed by RAM (tmpfs). The assumption is that writing to
tmpfs is fast (so fast that benchmark duration is significantly affected by
the serialization itself), and stable (so that its performance is ~constant
across runs on the same machine).

This benchmark does not resolve how much time goes into the CPU work for
serialization vs. the system calls for writing to tmpfs (that would be a
different question to answer, an interesting one, that is maybe more of a
task for profiling).

There are two dimensions that are varied:

- serialization format
- amount of the data being written, as set by a filter on the input

Comment:

Ah I see, the filter is the means by which we are varying the data size

Contributor Author:

Right. It's a little indirect, and we don't get to see how big the data actually was. I think this can/should be improved in the future.

Contributor Author:

update: with the recent commits I changed the approach


A note about /dev/shm: it's of great value because

- unprivileged users can write to it
- the `base_dir` arg to pyarrow.dataset.write_dataset() requires a path to
a directory. That is, one cannot inject a memory-backed Python file
object (a strategy that's elsewhere often used to simulate writing to an
actual file)
- it is not available on macOS, which is why this benchmark is skipped there

"""

name = "dataset-serialize"

arguments = ["source"]

sources = [
"nyctaxi_multi_parquet_s3",
"nyctaxi_multi_ipc_s3",
# "chi_traffic_2020_Q1",
Contributor Author (@jgehrcke, Dec 14, 2022):

With the chi_... benchmark I do get:

[221214-16:26:00.151] [370389] [root] ERROR: {"timestamp": "2022-12-14T15:26:00.151675+00:00", "tags": {"dataset": "chi_traffic_2020_Q1", "cpu_count": null, "selectivity": "10pc", "format": "csv", "name": "dataset-serialize"}, "info": {"arrow_version": "10.0.1", "arrow_compiler_id": "GNU", "arrow_compiler_version": "10.2.1", "benchmark_language_version": "Python 3.10.8"}, "context": {"arrow_compiler_flags": " -fdiagnostics-color=always -O2 -DNDEBUG -ftree-vectorize", "benchmark_language": "Python"}, "error": "Unsupported Type:struct<sec: double, min: int32, hour: int32, mday: int32, mon: int32, year: int32, wday: int32, yday: int32, isdst: int32, zone: string, gmtoff: int32>"}
Traceback (most recent call last):
  File "/home/jp/dev/voltrondata-labs-benchmarks/benchmarks/_benchmark.py", line 86, in benchmark
    benchmark, output = self.conbench.benchmark(
  File "/home/jp/dev/voltrondata-labs-benchmarks/conbench/conbench/runner.py", line 188, in benchmark
    raise e
  File "/home/jp/dev/voltrondata-labs-benchmarks/conbench/conbench/runner.py", line 160, in benchmark
    data, output = self._get_timing(f, iterations, timing_options)
  File "/home/jp/dev/voltrondata-labs-benchmarks/conbench/conbench/runner.py", line 363, in _get_timing
    output = f()
  File "/home/jp/dev/voltrondata-labs-benchmarks/benchmarks/dataset_serialize_benchmark.py", line 242, in benchfunc
  File "/home/jp/.pyenv/versions/3108-vd-benchmarks/lib/python3.10/site-packages/pyarrow/dataset.py", line 988, in write_dataset
    _filesystemdataset_write(
  File "pyarrow/_dataset.pyx", line 2859, in pyarrow._dataset._filesystemdataset_write
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Unsupported Type:struct<sec: double, min: int32, hour: int32, mday: int32, mon: int32, year: int32, wday: int32, yday: int32, isdst: int32, zone: string, gmtoff: int32>

for the csv case.

Contributor Author:

@jonkeane should we submit this as an issue somewhere?

Contributor:

Looking at that error, there's an issue converting a struct to a text representation for the CSV output. There might be an issue in apache/arrow for that already (though I suspect that faithful writing of structs to CSVs is likely a feature that apache/arrow will punt on, or possibly already did punt on).
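
For illustration, one possible workaround (a sketch only, not used in this PR) is to flatten struct columns into top-level columns before handing the table to the CSV writer, via pyarrow.Table.flatten():

```python
import pyarrow as pa

# Sketch of a possible workaround, not used in this PR: Table.flatten()
# turns each struct field into its own top-level column (names like
# "ts.hour"), which CSV can represent. Loop because flatten() removes only
# one level of nesting at a time.
def flatten_structs(table: pa.Table) -> pa.Table:
    while any(pa.types.is_struct(field.type) for field in table.schema):
        table = table.flatten()
    return table
```

Whether the resulting dotted column names are acceptable for a benchmark dataset is a separate question; this only sidesteps the ArrowInvalid error.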

]

sources_test = [
"nyctaxi_multi_parquet_s3_sample",
"nyctaxi_multi_ipc_s3_sample",
"chi_traffic_sample",
]

_params = {
Contributor:

There are a number of other parameters here that are likely worth exploring here.

  1. A big issue to explore here is partitioning, encompassing which categoricals are used for hive directory partitioning, how big files are allowed to be within those directories, and how big row groups are within files. I'm not sure to what extent it's worth torturing the system with very many files (or too few), but at least trending toward either end of the plausible spectrum is useful for representing more real-world workloads.
  2. Compression is always important for file formats. The file writing benchmarks cover a lot, but we probably want to make sure we're covering some common variants here like snappy for parquet (a default, I think?) and gzip for csv (not a default, I don't think).

Contributor Author:

Great input!

I think writing-with-compression should be its own rather laser-focused benchmark, very similar to what's in here so far (certainly with writing to RAM!). It's a great candidate for extending this very benchmark here later by one more dimension.

Contributor Author:

About partitioning: maybe this makes most sense when writing many files of considerable size. I don't know yet how compatible these two ideas are: i) write to a potentially small tmpfs and ii) exercise quite a bit of partitioning machinery. They probably are compatible. But yeah, let's think about that in an independent effort.
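
To make the two proposed dimensions concrete, here is a hedged sketch (not part of this PR) of how write_dataset() can be given per-format compression options and hive-style directory partitioning; the tiny table, output path, and partition column are made up:

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Hedged sketch, not part of this PR. The table contents, output path and
# partition column are placeholders.
table = pa.table({"vendor_id": ["A", "A", "B"], "count": [1, 2, 3]})

ds.write_dataset(
    data=table,
    base_dir="/dev/shm/partition-compression-demo",
    format="parquet",
    # snappy is the common parquet default; gzip for csv would be another variant.
    file_options=ds.ParquetFileFormat().make_write_options(compression="snappy"),
    # Hive-style directory partitioning on a low-cardinality column.
    partitioning=ds.partitioning(
        pa.schema([("vendor_id", pa.string())]), flavor="hive"
    ),
)
```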

"selectivity": ("1pc", "10pc"),

Comment:

In the end we'd also like to measure 100%, is that not included to keep the runtime in check?

Contributor Author (@jgehrcke, Dec 14, 2022):

I am conservative here to keep /dev/shm usage in check.

On my machine I have a 16 GB /dev/shm and during dev I managed to fill this and almost crashed my desktop environment.

I will play a little more with that and maybe update the PR again to add a variant that writes more data, maybe not 100 %, but more than 10 % :).

Contributor Author (@jgehrcke, Dec 14, 2022):

Added commits. For nyctaxi_multi_ipc_s3 and the 10 % / csv scenario the output is already ~ 5 GB:

[221214-17:11:21.205] [375153] [benchmarks.dataset_serialize_benchmark] INFO: case ('10pc', 'csv'): create directory
[221214-17:11:21.205] [375153] [benchmarks.dataset_serialize_benchmark] INFO: directory created, path: /dev/shm/bench-02a3a0e3/10pc-csv-de09c188-69d7-41b4-9c0b-fd00654f06ad
[221214-17:11:21.877] [375153] [benchmarks.dataset_serialize_benchmark] INFO: read source dataset into memory in 0.6720 s
[221214-17:11:50.099] [375153] [benchmarks.dataset_serialize_benchmark] INFO: stdout of ['du', '-sh', '/dev/shm/bench-02a3a0e3/10pc-csv-de09c188-69d7-41b4-9c0b-fd00654f06ad']: 4.6G

I think we should not grow beyond that.

Contributor Author:

Update: I have adopted the ‘overwrite_or_ignore’ technique and enabled the 100 % case. The maximum usage of /dev/shm now is 7.6 GB, and I think that's OK (if the CI runners also provide that much space).
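
Given that concern, a small guard could be checked before running; a sketch (not in this PR), with the ~8 GiB threshold being an assumption:

```python
import shutil
import sys

# Sketch, not in this PR: refuse to run when /dev/shm looks too small for the
# observed peak usage (~7.6 GB above). The 8 GiB threshold is an assumption.
REQUIRED_FREE_BYTES = 8 * 1024**3

free = shutil.disk_usage("/dev/shm").free
if free < REQUIRED_FREE_BYTES:
    sys.exit(
        f"/dev/shm has only {free / 1024**3:.1f} GiB free, "
        f"need about {REQUIRED_FREE_BYTES / 1024**3:.0f} GiB; aborting."
    )
```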

Contributor:

Percent (I'm assuming pc == %?) seems like an awkward way to do this for lots of sources; wouldn't it be simpler to do a number of rows? Although what you sample needs to be deterministic as well, one way or another, so I guess there's no magic way to generalize.

"format": (
"parquet",
"arrow",
"ipc",
"feather",
Contributor:

arrow == feather, at least for feather v2, and I don't think we care much about v1 anymore. There's been some effort now to just start calling everything "arrow files" so we can probably just drop feather here.

Contributor:

Wait is ipc also the same thing? I don't know that much about it but it may be...

Contributor Author:

Great input.

arrow == feather, at least for feather v2,

Interesting! Hmm, the timings that I saw suggest that this equality isn't quite right. It appears that feather and ipc are highly comparable, e.g.:

dataset: nyctaxi_multi_parquet_s3, format: arrow, selectivity: 100pc
    --> min: 1.38 s     mean±SE: (1.48 ± 0.05) s
dataset: nyctaxi_multi_parquet_s3, format: ipc, selectivity: 100pc
    --> min: 1.39 s     mean±SE: (1.55 ± 0.09) s
dataset: nyctaxi_multi_parquet_s3, format: feather, selectivity: 100pc
    --> min: 1.44 s     mean±SE: (1.55 ± 0.06) s
dataset: nyctaxi_multi_ipc_s3, format: arrow, selectivity: 100pc
    --> min: 1.64 s     mean±SE: (1.77 ± 0.08) s
dataset: nyctaxi_multi_ipc_s3, format: ipc, selectivity: 100pc
    --> min: 1.75 s     mean±SE: (2.4 ± 0.2) s
dataset: nyctaxi_multi_ipc_s3, format: feather, selectivity: 100pc
    --> min: 2.34 s     mean±SE: (2.7 ± 0.1) s

https://arrow.apache.org/docs/python/feather.html

Version 2 (V2), the default version, which is exactly represented as the Arrow IPC file format on disk. V2 files support storing all Arrow data types as well as compression with LZ4 or ZSTD. V2 was first made available in Apache Arrow 0.17.0.

So, I will remove either feather or ipc.

The docs for write_dataset() suggest that all three are the same/comparable:

The format in which to write the dataset. Currently supported: “parquet”, “ipc”/”arrow”/”feather”, and “csv”.

However, there probably is a difference (as indicated by measurement results). I think it might have to do with compression. For feather, this is documented:

LZ4 is used by default if it is available (which it should be if you obtained pyarrow through a normal package manager):

Maybe feather is lz4(arrow).

I think in this case it makes sense to leave in both, kind of covering the 'default' settings.
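
One way to check the compression hypothesis is to write the same table once per format string and compare on-disk sizes. A sketch (not part of this PR; the synthetic table and temp directories are made up):

```python
import os
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

# Sketch, not part of this PR: write the same synthetic table with each of
# the three IPC-flavored format strings and compare on-disk sizes. If
# "feather" applies LZ4 by default while "arrow"/"ipc" do not, its output
# should be noticeably smaller.
table = pa.table(
    {"x": list(range(500_000)), "y": ["some repetitive text"] * 500_000}
)

def dir_size_bytes(path: str) -> int:
    return sum(
        os.path.getsize(os.path.join(root, name))
        for root, _dirs, files in os.walk(path)
        for name in files
    )

for fmt in ("arrow", "ipc", "feather"):
    outdir = tempfile.mkdtemp(prefix=f"fmt-{fmt}-")
    ds.write_dataset(data=table, base_dir=outdir, format=fmt)
    print(f"{fmt:8s} {dir_size_bytes(outdir) / 1024**2:.1f} MiB")
```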

"csv",
),
}

valid_cases = [tuple(_params.keys())] + list(
itertools.product(*[v for v in _params.values()])
)

filters = {
"nyctaxi_multi_parquet_s3": {
"1pc": ds.field("pickup_longitude") < -74.013451, # 561384
"10pc": ds.field("pickup_longitude") < -74.002055, # 5615432
"100pc": None, # 56154689
},
Contributor:

If we need to recalculate any of these for some reason (say we decide we're interested in 20% or whatever at some point), how are these values calculated? Can we store that somewhere?

Contributor Author (@jgehrcke, Dec 15, 2022):

I started building this module by copying everything from dataset_selectivity_benchmark.py, and then changing those things that I cared about most. That's where this approach came from.

If we need to recalculate any of these for some reason (say we decide we're interested in 20% or whatever at some point), how are these values calculated?

I do not know, but I suppose the original author applied some manual approximation, tweaking the threshold until reaching roundabout the desired number of rows.

.... wouldn't it be simpler to do a number of rows? Although what you sample needs to be deterministic as well, one way or another, so I guess there's no magic way to generalize.

I thought about simply taking a number of rows either from head or tail or mid part, but then I realized that the filter gives some set of rows that might be a more interesting data selection, obtained deterministically (as you say, i.e. just doing a random subset of fixed size isn't going to be similar).

I fully agree that the percent -> number of rows translation is very indirect. In the process I also learned that filtering shouldn't even be part of this benchmark, so this indirection is by now just kind of misleading.

I will see if it's super easy to change this into using either head or tail with desired row counts, w/o affecting the timing of the benchmark much.
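
As one possible answer to "how are these values calculated", here is a sketch (not in this PR; the helper name is hypothetical) that derives the threshold from a column quantile instead of hard-coding it:

```python
import pyarrow.compute as pc
import pyarrow.dataset as ds

# Sketch, not part of this PR; the helper name is hypothetical. The filters
# above use "<", so the lower `fraction`-quantile of the column is the
# cut-off below which roughly that share of rows falls.
def threshold_for_fraction(dataset: ds.Dataset, column: str, fraction: float) -> float:
    values = dataset.to_table(columns=[column])[column]
    return pc.quantile(values, q=fraction)[0].as_py()

# For example, threshold_for_fraction(source_ds, "pickup_longitude", 0.10)
# should land near the hard-coded -74.002055 for nyctaxi_multi_parquet_s3.
```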

"nyctaxi_multi_ipc_s3": {
"1pc": ds.field("pickup_longitude") < -74.014053, # 596165
"10pc": ds.field("pickup_longitude") < -74.002708, # 5962204
"100pc": None, # 59616487
},
"chi_traffic_2020_Q1": {
"1pc": ds.field("END_LONGITUDE") < -87.807262, # 124530
"10pc": ds.field("END_LONGITUDE") < -87.7624, # 1307565
"100pc": None, # 13038291
},
**dict.fromkeys(
["nyctaxi_multi_parquet_s3_sample", "nyctaxi_multi_ipc_s3_sample"],
{
"1pc": ds.field("pickup_longitude") < -74.0124, # 20
"10pc": ds.field("pickup_longitude") < -74.00172, # 200
"100pc": None, # 2000
},
),
"chi_traffic_sample": {
"1pc": ds.field("END_LONGITUDE") < -87.80726, # 10
"10pc": ds.field("END_LONGITUDE") < -87.76148, # 100
"100pc": None, # 1000
},
}

_case_tmpdir_mapping = {}

def _create_tmpdir_in_ramdisk(self, case: tuple):
# Build simple prefix string for specific test case to facilitate
# correlating directory names to test cases.
pfx = "-".join(c.lower()[:9] for c in case)
dirpath = os.path.join(OUTPUT_DIR_PREFIX, pfx + "-" + str(uuid.uuid4()))

self._case_tmpdir_mapping[tuple(case)] = dirpath

os.makedirs(dirpath, exist_ok=False)
return dirpath

def _get_dataset_for_source(self, source) -> ds.Dataset:
"""Helper to construct a Dataset object."""

return pyarrow.dataset.dataset(
source.source_paths,
schema=pyarrow.dataset.dataset(
source.source_paths[0], format=source.format_str
).schema,
format=source.format_str,
)

def _report_dirsize_and_wipe(self, dirpath: str):
"""
This module already has a dependency on Linux so we can just as well
spawn `du` for correct recursive directory size reporting"""

ducmd = ["du", "-sh", dirpath]
p = subprocess.run(ducmd, capture_output=True)
log.info("stdout of %s: %s", ducmd, p.stdout.decode("utf-8").split()[0])
if p.returncode != 0:
log.info("stderr of %s: %s", ducmd, p.stderr)
log.info("removing directory: %s", dirpath)
shutil.rmtree(dirpath)

def run(self, source, case=None, **kwargs):

if not os.path.exists("/dev/shm"):
sys.exit("/dev/shm not found but required (not available on Darwin). Exit.")

cases = self.get_cases(case, kwargs)

for source in self.get_sources(source):

log.info("source %s: download, if required", source.name)
source.download_source_if_not_exists()
tags = self.get_tags(kwargs, source)

t0 = time.monotonic()
source_ds = self._get_dataset_for_source(source)
log.info(
"constructed Dataset object for source in %.4f s", time.monotonic() - t0
)

for case in cases:

log.info("case %s: create directory", case)
dirpath = self._create_tmpdir_in_ramdisk(case)
log.info("directory created, path: %s", dirpath)

yield self.benchmark(
f=self._get_benchmark_function(
case, source.name, source_ds, dirpath
),
extra_tags=tags,
options=kwargs,
case=case,
)

# Free up memory in the RAM disk (tmpfs), assuming that we're
# otherwise getting close to filling it (depending on the
# machine this is executed on, a single test might easily
# occupy 10 % or more of this tmpfs). Note that what
# accumulated in `dirpath` is the result of potentially
# multiple iterations.
self._report_dirsize_and_wipe(dirpath)

# Finally, remove the outermost directory. It should have no contents by now, but
# if an individual benchmark iteration was Ctrl+C'd then this here
# might still do useful cleanup.
self._report_dirsize_and_wipe(OUTPUT_DIR_PREFIX)

def _get_benchmark_function(
self, case, source_name: str, source_ds: ds.Dataset, dirpath: str
):

(selectivity, serialization_format) = case

# Option A: read-from-disk -> deserialize -> filter -> into memory
# before timing serialize -> write-to-tmpfs
t0 = time.monotonic()
data = source_ds.to_table(filter=self.filters[source_name][selectivity])
log.info("read source dataset into memory in %.4f s", time.monotonic() - t0)

# Option B (thrown away, but kept for posterity): use a Scanner()
# object to transparently filter the source dataset upon consumption,
# in which case what's timed is read-from-disk -> deserialize -> filter
# -> serialize -> write-to-tmpfs
#
# Note(JP): I have confirmed that for the data used in this benchmark
# this option is dominated by read-from-disk to the extent that no
# useful signal is generated for the write phase, at least for some
# serialization formats.
#
# data = pyarrow.dataset.Scanner.from_dataset(source_ds,
# filter=self.filters[source_name][selectivity])
Contributor Author:

I kept this because it's somewhat non-obvious from pyarrow docs that this Scanner()-based technique really works (that the Scanner instance can be provided as data argument to write_dataset()). I tried it out and it looks good, it's certainly a lazy evaluation. It's a super cool technique that maybe we should blog about (well, maybe my enthusiasm is too big because I am new to this ecosystem :)).

I think I'd like to add a benchmark that does the complete round trip in a child process and then the metric we would care about is the maximum memory usage of that child process, confirming that this really is operating in a stream-like fashion: that at any given time the process only holds a fraction of the complete data set in memory.
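
A sketch of that idea (not part of this PR; the source path is a placeholder): run the Scanner-based round trip in a child process and read the child's peak RSS afterwards:

```python
import resource
import subprocess
import sys
import textwrap

# Sketch, not part of this PR. Run a full read -> (optionally filter) -> write
# round trip through a Scanner in a child process; a low peak RSS would
# support the stream-like behavior. The source path is a placeholder.
child_code = textwrap.dedent(
    """
    import pyarrow.dataset as ds
    src = ds.dataset("/path/to/source", format="parquet")
    scanner = ds.Scanner.from_dataset(src)
    ds.write_dataset(data=scanner, base_dir="/dev/shm/rss-demo", format="parquet")
    """
)

subprocess.run([sys.executable, "-c", child_code], check=True)

# On Linux, ru_maxrss is in kilobytes; RUSAGE_CHILDREN covers waited-for
# children of this process (here: just the one started above).
peak_kib = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
print(f"child peak RSS: {peak_kib / 1024:.0f} MiB")
```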


def benchfunc():
# This is a hack to make each iteration work in a separate
# directory (otherwise some write operations would error out saying
# that the target directory is not empty). With `benchrun` it will

Comment:

If you don't like this, there is an existing_data_behavior parameter in write_dataset you could use

Contributor Author (@jgehrcke, Dec 14, 2022):

Oh, cool. Thanks for pointing that out! Documented here:

https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

existing_data_behavior
‘error’ | ‘overwrite_or_ignore’ | ‘delete_matching’

The current approach creates a fresh directory for each iteration and should therefore be 'fastest'. Deletion might take valuable time (which would be timed by the benchmark wrapper), and is therefore not a good option.

It might be that ‘overwrite_or_ignore’ is actually the smartest if the second+ write consumes as much time as the first one. This would be the best option to limit space consumption across iterations.
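
For completeness, a sketch of the ‘overwrite_or_ignore’ variant discussed here (not what the PR does as-is; the wrapper function is hypothetical):

```python
import pyarrow.dataset as ds

# Sketch, not what this PR does: keep one base_dir per case and let every
# iteration write into it. With the default basename_template
# ("part-{i}." plus the format's extension), iteration N overwrites the part
# files of iteration N-1, which caps tmpfs usage at roughly one iteration's
# output.
def write_overwriting(data, serialization_format: str, dirpath: str) -> None:
    ds.write_dataset(
        data=data,
        base_dir=dirpath,
        format=serialization_format,
        existing_data_behavior="overwrite_or_ignore",
    )
```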

# be easier to cleanly hook into doing resource management before
# and after an iteration w/o affecting the timing measurement.
# Assume that creating a directory does not significantly add to
# the duration of the actual payload function.
dp = os.path.join(dirpath, str(uuid.uuid4())[:8])
os.makedirs(dp)

# When dimensioning of benchmark parameters and execution
# environment are not adjusted to each other, tmpfs quickly gets
# full. In that case writing might fail with
#
# File "pyarrow/_dataset.pyx", line 2859, in
# pyarrow._dataset._filesystemdataset_write File
# "pyarrow/error.pxi", line 113, in pyarrow.lib.check_status
# OSError: [Errno 28] Error writing bytes to file. Detail: [errno
# 28] No space left on device

pyarrow.dataset.write_dataset(
data=data, format=serialization_format, base_dir=dp
)

# The benchmark function returns `None` for now. If we need
# deeper inspection into the result maybe iterate on that.

return benchfunc