benchmarks: add dataset_serialize_benchmark.py #124
Conversation
This is work in progress:
- to give CLI users better progress information around time-consuming HTTP downloads
- and refactor things a bit
Error from CI not helpful: https://github.com/voltrondata-labs/benchmarks/actions/runs/3675023431/jobs/6213938529#step:8:19
this seemingly depends on how conbench is installed in the project
Pull Request Test Coverage Report for Build 3705415491
💛 - Coveralls
Nice, thanks for this! In the future it would be nice to extend this benchmark to also include the read side, and to add parameters for input and output parallelism. Question: is it possible to capture maximum memory usage as a secondary benchmark result?
of things, the data is read fully into memory before starting (and timing)
the benchmark function. That is (believed to be) achieved with:

data = source_dataset.to_table(filter=somefilter, ...)
If we are performing this outside of the measurements, then the filter is also not being measured, right? I'm all for keeping the code, though, because I think it would make sense to eventually expand this benchmark into measuring an end to end read - filter - write workload.
If we are performing this outside of the measurements, then the filter is also not being measured, right?
Right!
This following benchmark times read-from-disk-backed-fs-and-then-filter:
return lambda: dataset.to_table(
And this one does, too:
return lambda: dataset.to_table(filter=(vendor == "DDS") & (count > 3))
Although I would think that in both cases the filtering does not dominate the time, but instead it's the network-attached disk (see below).
For focusing a benchmark on the filter performance, the data should be in memory already, using e.g. tmpfs (/dev/shm), like I am proposing here.
I think it would make sense to eventually expand this benchmark into measuring an end to end read - filter - write workload.
Yeah, interesting thought. Maybe we want this kind of test/benchmark in the future. As described in #123 in fact here I started with that. And then I wondered what the use is. I called this "RDFSW":
reading -> deserializing -> filtering -> serializing -> writing
and I found that the signal on the final two stages was weak / non-existent, because of the read-from-network-attached-disk on the left-hand side of the flow. This part is both slow and volatile compared to the other steps.
If we do this, we should read from tmpfs (which limits dataset size, of course).
That's a great discussion for subsequent work!
There are two dimensions that are varied:

- serialization format
- amount of the data being written, as set by a filter on the input
Ah I see, the filter is the means by which we are varying the data size
Right. It's a little indirect, and we don't get to see how big the data actually was. I think this can/should be improved in the future.
update: with the recent commits I changed the approach
]

_params = {
    "selectivity": ("1pc", "10pc"),
In the end we'd also like to measure 100%, is that not included to keep the runtime in check?
I am conservative here to keep /dev/shm usage in check.
On my machine I have a 16 GB /dev/shm and during dev I managed to fill this and almost crashed my desktop environment.
I will play a little more with that and maybe update the PR again to add a variant that writes more data, maybe not 100 %, but more than 10 % :).
Added commits. For nyctaxi_multi_ipc_s3 and the 10 % / csv scenario the output is already ~5 GB:
[221214-17:11:21.205] [375153] [benchmarks.dataset_serialize_benchmark] INFO: case ('10pc', 'csv'): create directory
[221214-17:11:21.205] [375153] [benchmarks.dataset_serialize_benchmark] INFO: directory created, path: /dev/shm/bench-02a3a0e3/10pc-csv-de09c188-69d7-41b4-9c0b-fd00654f06ad
[221214-17:11:21.877] [375153] [benchmarks.dataset_serialize_benchmark] INFO: read source dataset into memory in 0.6720 s
[221214-17:11:50.099] [375153] [benchmarks.dataset_serialize_benchmark] INFO: stdout of ['du', '-sh', '/dev/shm/bench-02a3a0e3/10pc-csv-de09c188-69d7-41b4-9c0b-fd00654f06ad']: 4.6G
I think we should not grow beyond that.
Update: I have adopted the ‘overwrite_or_ignore’ technique and enabled the 100 % case. The maximum usage of /dev/shm is now 7.6 GB, and I think that's OK (if the CI runners also provide that much space).
Percent (I'm assuming pc == %?) seems like an awkward way to do this for lots of sources; wouldn't it be simpler to do a number of rows? Although what you sample needs to be deterministic as well, one way or another, so I guess there's no magic way to generalize.
# Build simple prefix string to facilitate correlating directory names
# to test cases.
pfx = "-".join(c.lower()[:9] for c in case)
dirpath = os.path.join("/dev/shm", pfx + "-" + str(uuid.uuid4()))
Can we rely on /dev/shm existing?
On Linux we can, yes.
Maybe this is interesting background: https://unix.stackexchange.com/a/151987/13256
On MacOS, sadly not:
Support for POSIX shared memory is somewhat limited. Though the functionality is implemented, there is no file system interface like /dev/shm to manage the shared memory segments.
Maybe we should mark this test to be skipped on MacOS.
It's not a test, exactly. The other situation like this I know we have is file cache clearing; we try a command that only works on linux, but don't fail if it fails (as a lot of us develop on macos). It'd be nice to at least be able to run this locally for dev purposes if possible, even if this is how we want it to run in CI.
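One way to make that concrete: prefer /dev/shm on Linux, and degrade to the regular temp directory elsewhere so local macOS runs still work (just not RAM-backed). A sketch under those assumptions; the helper name is made up:

```python
import os
import sys
import tempfile
import uuid

def make_bench_dir(case=("10pc", "csv")) -> str:
    """Create a per-case scratch directory, RAM-backed where possible.

    /dev/shm is a Linux tmpfs convention; macOS has no such file system
    interface, so fall back to the regular temp dir there. (Hypothetical
    helper, not the PR's actual code.)
    """
    if sys.platform == "linux" and os.path.isdir("/dev/shm"):
        base = "/dev/shm"
    else:
        base = tempfile.gettempdir()
    # Same prefix-plus-UUID scheme as in the diff above.
    pfx = "-".join(c.lower()[:9] for c in case)
    path = os.path.join(base, pfx + "-" + str(uuid.uuid4()))
    os.makedirs(path)
    return path
```

This mirrors the "try the Linux-only thing, but don't fail hard" approach used for file cache clearing.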
def benchfunc():
    # This is a hack to make each iteration work in a separate
    # directory (otherwise some write operations would error out saying
    # that the target directory is not empty). With `benchrun` it will
If you don't like this, there is an existing_data_behavior parameter in write_dataset you could use.
Oh, cool. Thanks for pointing that out! Documented here:
https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

existing_data_behavior: ‘error’ | ‘overwrite_or_ignore’ | ‘delete_matching’
The current approach creates a fresh directory for each iteration and should therefore be 'fastest'. Deletion might take valuable time (which would be timed by the benchmark wrapper), and is therefore not a good option.
It might be that ‘overwrite_or_ignore’ is actually the smartest if the second+ write consumes as much time as the first one. This would be the best option to limit space consumption across iterations.
I have looked into that. Not currently (easily), because these benchmarks are not executed in tidy child processes, but instead as part of the one process running all of them. The 'maximum memory usage' of that process has, I think, what can fairly be called an unknown relationship to the individual benchmark scenarios. In a controlled environment where we understand that a certain process' memory usage is useful to measure, we could then use the … Others recommend looking at … (https://man7.org/linux/man-pages/man5/proc.5.html). Measuring memory consumption is a lovely topic and needs careful goal-setting and verification.
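For reference, two of the usual signals one could read here, assuming Linux for the /proc path (these are generic helpers, not code from the PR):

```python
import resource
import sys
from typing import Optional

def peak_rss_kib() -> int:
    """High-water-mark RSS of the current process via getrusage(2).

    Note: ru_maxrss is reported in KiB on Linux but in bytes on macOS,
    and it is a process-lifetime maximum -- it cannot be attributed to a
    single benchmark case unless each case runs in its own child process.
    """
    maxrss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return maxrss // 1024 if sys.platform == "darwin" else maxrss

def vmhwm_kib() -> Optional[int]:
    """Read VmHWM (peak resident set size) from /proc/self/status, as
    documented in proc(5). Returns None off Linux."""
    try:
        with open("/proc/self/status") as f:
            for line in f:
                if line.startswith("VmHWM:"):
                    return int(line.split()[1])
    except OSError:
        pass
    return None
```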
sources = [
    "nyctaxi_multi_parquet_s3",
    "nyctaxi_multi_ipc_s3",
    # "chi_traffic_2020_Q1",
With the chi_... benchmark I do get:
[221214-16:26:00.151] [370389] [root] ERROR: {"timestamp": "2022-12-14T15:26:00.151675+00:00", "tags": {"dataset": "chi_traffic_2020_Q1", "cpu_count": null, "selectivity": "10pc", "format": "csv", "name": "dataset-serialize"}, "info": {"arrow_version": "10.0.1", "arrow_compiler_id": "GNU", "arrow_compiler_version": "10.2.1", "benchmark_language_version": "Python 3.10.8"}, "context": {"arrow_compiler_flags": " -fdiagnostics-color=always -O2 -DNDEBUG -ftree-vectorize", "benchmark_language": "Python"}, "error": "Unsupported Type:struct<sec: double, min: int32, hour: int32, mday: int32, mon: int32, year: int32, wday: int32, yday: int32, isdst: int32, zone: string, gmtoff: int32>"}
Traceback (most recent call last):
File "/home/jp/dev/voltrondata-labs-benchmarks/benchmarks/_benchmark.py", line 86, in benchmark
benchmark, output = self.conbench.benchmark(
File "/home/jp/dev/voltrondata-labs-benchmarks/conbench/conbench/runner.py", line 188, in benchmark
raise e
File "/home/jp/dev/voltrondata-labs-benchmarks/conbench/conbench/runner.py", line 160, in benchmark
data, output = self._get_timing(f, iterations, timing_options)
File "/home/jp/dev/voltrondata-labs-benchmarks/conbench/conbench/runner.py", line 363, in _get_timing
output = f()
File "/home/jp/dev/voltrondata-labs-benchmarks/benchmarks/dataset_serialize_benchmark.py", line 242, in benchfunc
File "/home/jp/.pyenv/versions/3108-vd-benchmarks/lib/python3.10/site-packages/pyarrow/dataset.py", line 988, in write_dataset
_filesystemdataset_write(
File "pyarrow/_dataset.pyx", line 2859, in pyarrow._dataset._filesystemdataset_write
File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Unsupported Type:struct<sec: double, min: int32, hour: int32, mday: int32, mon: int32, year: int32, wday: int32, yday: int32, isdst: int32, zone: string, gmtoff: int32>
for the csv case.
@jonkeane should we submit this as an issue somewhere?
Looking at that error, there's an issue converting a struct to a text representation for the csv representation. There might be an issue in apache/arrow for that already (though I suspect that faithful writing of structs to csvs is likely a feature that apache/arrow will punt on or possibly already did punt on)
I have just added some commits responding to review comments. Also added a pragmatic usage report for …
# serialization formats.
#
# data = pyarrow.dataset.Scanner.from_dataset(source_ds,
#     filter=self.filters[source_name][selectivity])
I kept this because it's somewhat non-obvious from the pyarrow docs that this Scanner()-based technique really works (that the Scanner instance can be provided as the data argument to write_dataset()). I tried it out and it looks good; it's certainly a lazy evaluation. It's a super cool technique that maybe we should blog about (well, maybe my enthusiasm is too big because I am new to this ecosystem :)).
I think I'd like to add a benchmark that does the complete round trip in a child process and then the metric we would care about is the maximum memory usage of that child process, confirming that this really is operating in a stream-like fashion: that at any given time the process only holds a fraction of the complete data set in memory.
With the 'overwrite_or_ignore' strategy the maximum /dev/shm usage is confined by what a single iteration writes at most. In my case this is 7.6G written by this scenario: dataset: nyctaxi_multi_ipc_s3 format: csv selectivity: 100pc
Added a commit that uses the … Output from running this locally:
The differences between e.g. these two are impressive:
(and I understand that the difference may become smaller with more fine-tuned CSV writing, as suggested by @joosthooz via "add parameters for input and output parallelism")
Some thoughts! What's here looks good; mostly my comments are things we could do to represent what real-world uses of writing datasets will look like.
run: |
  isort --diff .
Thank you! I've had a good amount of issues with isort; I wonder if we should pin a version to hopefully get more consistency
In this case the difference between environments was after all how I chose to integrate conbench (not from PyPI, but from my local checkout).
sources = [
    "nyctaxi_multi_parquet_s3",
    # "nyctaxi_multi_ipc_s3",
    # "chi_traffic_2020_Q1",
]
It would be useful to enable more sources because type diversity can vary a lot. (At least one of the versions of taxi—not sure if it's this one—is all floats, I believe.) We've got a lot more single-file than multi-file datasets, predictably, but some are quite useful, like the type_* ones. They're not big enough to stress this much, but they could be resampled up to a larger size if necessary. Torturing the partitioning and grouping may also provide more interesting results from smaller datasets (though we do obviously want to track defaults, too).
Great input! The diff you commented on makes me wonder which version you reviewed, because in the last state of yesterday I had also enabled nyctaxi_multi_ipc_s3. chi_traffic_2020_Q1 is still disabled because of a pyarrow.lib.ArrowInvalid: Unsupported Type: error for the case of writing to CSV. Can look into understanding and potentially fixing that later.
"nyctaxi_multi_parquet_s3": {
    "1pc": ds.field("pickup_longitude") < -74.013451,  # 561384
    "10pc": ds.field("pickup_longitude") < -74.002055,  # 5615432
    "100pc": None,  # 56154689
},
If we need to recalculate any of these for some reason (say we decide we're interested in 20% or whatever at some point), how are these values calculated? Can we store that somewhere?
I started building this module by copying everything from dataset_selectivity_benchmark.py, and then changing those things that I cared about most. That's where this approach came from.
If we need to recalculate any of these for some reason (say we decide we're interested in 20% or whatever at some point), how are these values calculated?
I do not know, but I suppose the original author applied some manual approximation, tweaking the threshold until reaching roughly the desired number of rows.
.... wouldn't it be simpler to do a number of rows? Although what you sample needs to be deterministic as well, one way or another, so I guess there's no magic way to generalize.
I thought about simply taking a number of rows either from head or tail or mid part, but then I realized that the filter gives some set of rows that might be a more interesting data selection, obtained deterministically (as you say, i.e. just doing a random subset of fixed size isn't going to be similar).
I fully agree that the percent -> number of rows translation is very indirect. In the process I also learned that filtering shouldn't even be part of this benchmark, so this indirection is by now just kind of misleading.
I will see if it's super easy to change this into using either head or tail with desired row counts, w/o affecting the timing of the benchmark much.
"arrow",
"ipc",
"feather",
arrow == feather, at least for feather v2, and I don't think we care much about v1 anymore. There's been some effort now to just start calling everything "arrow files" so we can probably just drop feather here.
Wait is ipc also the same thing? I don't know that much about it but it may be...
Great input.
arrow == feather, at least for feather v2,
Interesting! Hmm, the timings that I saw suggest that this equality isn't quite right. It appears like feather and ipc are highly comparable, e.g.:
dataset: nyctaxi_multi_parquet_s3, format: arrow, selectivity: 100pc
--> min: 1.38 s mean±SE: (1.48 ± 0.05) s
dataset: nyctaxi_multi_parquet_s3, format: ipc, selectivity: 100pc
--> min: 1.39 s mean±SE: (1.55 ± 0.09) s
dataset: nyctaxi_multi_parquet_s3, format: feather, selectivity: 100pc
--> min: 1.44 s mean±SE: (1.55 ± 0.06) s
dataset: nyctaxi_multi_ipc_s3, format: arrow, selectivity: 100pc
--> min: 1.64 s mean±SE: (1.77 ± 0.08) s
dataset: nyctaxi_multi_ipc_s3, format: ipc, selectivity: 100pc
--> min: 1.75 s mean±SE: (2.4 ± 0.2) s
dataset: nyctaxi_multi_ipc_s3, format: feather, selectivity: 100pc
--> min: 2.34 s mean±SE: (2.7 ± 0.1) s
https://arrow.apache.org/docs/python/feather.html
Version 2 (V2), the default version, which is exactly represented as the Arrow IPC file format on disk. V2 files support storing all Arrow data types as well as compression with LZ4 or ZSTD. V2 was first made available in Apache Arrow 0.17.0.
So, I will remove either feather or ipc.
The docs for write_dataset() suggest that all three are the same/comparable:
The format in which to write the dataset. Currently supported: “parquet”, “ipc”/”arrow”/”feather”, and “csv”.
However, there probably is a difference (as indicated by measurement results). I think it might have to do with compression. For feather
, this is documented:
LZ4 is used by default if it is available (which it should be if you obtained pyarrow through a normal package manager):
Maybe feather is lz4(arrow).
I think in this case it makes sense to leave in both, kind of covering the 'default' settings.
"chi_traffic_sample",
]

_params = {
There are a number of other parameters here that are likely worth exploring here.
- A huge issue to explore here is partitioning, encompassing which categoricals are used for hive directory partitioning, how big files are allowed to be within those, and how big row groups are within files. I'm not sure to what extent it's worth it to torture the system with many, many files or not enough, but at least trending towards either end of the plausible spectrum is useful for representing more real-world workloads.
- Compression is always important for file formats. The file writing benchmarks cover a lot, but we probably want to make sure we're covering some common variants here like snappy for parquet (a default, I think?) and gzip for csv (not a default, I don't think).
Great input!
I think writing-with-compression should be its own rather laser-focused benchmark, very similar to what's in here so far (certainly with writing to RAM!). It's a great candidate for extending this very benchmark here later by one more dimension.
About partitioning: maybe this makes most sense when writing many files of considerable size, I don't know yet how compatible these ideas are: i) write to potentially small tmpfs and ii) exercise quite a bit of partitioning stuff. They probably are compatible. But yeah, let's think about that in an independent effort.
# Build simple prefix string to facilitate correlating directory names
# to test cases.
pfx = "-".join(c.lower()[:9] for c in case)
dirpath = os.path.join("/dev/shm", pfx + "-" + str(uuid.uuid4()))
It's not a test, exactly. The other situation like this I know we have is file cache clearing; we try a command that only works on linux, but don't fail if it fails (as a lot of us develop on macos). It'd be nice to at least be able to run this locally for dev purposes if possible, even if this is how we want it to run in CI.
Good point! Will consider this. Note to self: I am also reading about the ORC file format, and will quickly investigate if it's worth adding before we merge this.
Instead of the current selectivity/filter method that uses dataset-specific floaty filtering, following @alistaire47's feedback I have looked into finding a method that uses … So, I was looking into pyarrow filter expressions and took questions to Slack and Zulip. Found that there is no modulo operator yet, and there's no easy way to filter by a magic 'row number'. These are just interesting side findings I wanted to quickly document.
This benchmark only needs various input sizes created in a pragmatic way. It's more pragmatic to do head(desired_row_count) than doing some fancy filter technique (that might confuse code readers). The filter technique was copied from a different benchmark.
Ed made a good point that MacOS devs should still be able to run this. Of course.
Writing ORC: No, not yet. apache/arrow#14968. |
Added commits after review feedback, to:
The skip condition was bad. Whoops.
OK, landing this now to learn what this does in the various pipelines that this affects. Hope it doesn't disturb anything but fails very locally if it fails. 🚢
This now ran as part of arrow-benchmarks-ci, also see voltrondata-labs/arrow-benchmarks-ci#92 (comment).
This is work in progress for #123.