-
Notifications
You must be signed in to change notification settings - Fork 10
/
dataset_selectivity_benchmark.py
73 lines (67 loc) · 2.69 KB
/
dataset_selectivity_benchmark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import conbench.runner
import pyarrow.dataset as ds
from benchmarks import _benchmark
@conbench.runner.register_benchmark
class DatasetSelectivityBenchmark(_benchmark.Benchmark):
    """Read and filter a dataset at three selectivity levels.

    For every source, the dataset is materialized via ``to_table`` with a
    filter chosen to select roughly 1%, 10%, or 100% of the rows, so the
    benchmark captures how scan cost varies with filter selectivity.
    """

    name = "dataset-selectivity"
    arguments = ["source"]
    sources = [
        "nyctaxi_multi_parquet_s3",
        "nyctaxi_multi_ipc_s3",
        "chi_traffic_2020_Q1",
    ]
    sources_test = [
        "nyctaxi_multi_parquet_s3_sample",
        "nyctaxi_multi_ipc_s3_sample",
        "chi_traffic_sample",
    ]
    valid_cases = (["selectivity"], ["1%"], ["10%"], ["100%"])
    # Per-source filter expressions keyed by selectivity label; the trailing
    # comments record the approximate row count each predicate selects.
    # "100%" maps to None, i.e. an unfiltered scan.
    filters = {
        "nyctaxi_multi_parquet_s3": {
            "1%": ds.field("pickup_longitude") < -74.013451,  # 561384
            "10%": ds.field("pickup_longitude") < -74.002055,  # 5615432
            "100%": None,  # 56154689
        },
        "nyctaxi_multi_ipc_s3": {
            "1%": ds.field("pickup_longitude") < -74.014053,  # 596165
            "10%": ds.field("pickup_longitude") < -74.002708,  # 5962204
            "100%": None,  # 59616487
        },
        "chi_traffic_2020_Q1": {
            "1%": ds.field("END_LONGITUDE") < -87.807262,  # 124530
            "10%": ds.field("END_LONGITUDE") < -87.7624,  # 1307565
            "100%": None,  # 13038291
        },
        # The two taxi sample sources share identical cutoffs.
        "nyctaxi_multi_parquet_s3_sample": {
            "1%": ds.field("pickup_longitude") < -74.0124,  # 20
            "10%": ds.field("pickup_longitude") < -74.00172,  # 200
            "100%": None,  # 2000
        },
        "nyctaxi_multi_ipc_s3_sample": {
            "1%": ds.field("pickup_longitude") < -74.0124,  # 20
            "10%": ds.field("pickup_longitude") < -74.00172,  # 200
            "100%": None,  # 2000
        },
        "chi_traffic_sample": {
            "1%": ds.field("END_LONGITUDE") < -87.80726,  # 10
            "10%": ds.field("END_LONGITUDE") < -87.76148,  # 100
            "100%": None,  # 1000
        },
    }

    def run(self, source, case=None, **kwargs):
        """Yield one benchmark result per (source, selectivity) case."""
        cases = self.get_cases(case, kwargs)
        for src in self.get_sources(source):
            src.download_source_if_not_exists()
            tags = self.get_tags(kwargs, src)
            fmt = src.format_str
            # Resolve the schema once from the first file so every case
            # scans the multi-file dataset with the same fixed schema.
            schema = ds.dataset(src.source_paths[0], format=fmt).schema
            for bench_case in cases:
                (selectivity,) = bench_case
                dataset = ds.dataset(src.source_paths, schema=schema, format=fmt)
                func = self._get_benchmark_function(dataset, src.name, selectivity)
                yield self.benchmark(func, tags, kwargs, bench_case)

    def _get_benchmark_function(self, dataset, source, selectivity):
        """Return a zero-arg callable that scans *dataset* with the filter
        registered for (source, selectivity) and returns the row count."""

        def _count_rows():
            return dataset.to_table(
                filter=self.filters[source][selectivity]
            ).num_rows

        return _count_rows