Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Dec 7, 2022
1 parent c0572fc commit 1d80323
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 49 deletions.
4 changes: 1 addition & 3 deletions opteryx/connectors/base/base_blob_storage_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ def get_partitions(
*,
dataset: str,
partitioning: Iterable = None, # ("year_{yyyy}", "month_{mm}", "day_{dd}"),
start_date: Union[
datetime.datetime, datetime.date, str
] = None,
start_date: Union[datetime.datetime, datetime.date, str] = None,
end_date: Union[datetime.datetime, datetime.date, str] = None,
) -> List:
"""
Expand Down
2 changes: 2 additions & 0 deletions opteryx/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ class UnmetRequirementError(Exception):
class FeatureNotSupportedOnArchitectureError(Exception):
pass


class NotSupportedError(Exception):
pass


# PEP-0249


Expand Down
9 changes: 3 additions & 6 deletions opteryx/managers/expression/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,8 @@
LITERAL_TYPE: int = int("0100", 2)


PUSHABLE_OPERATORS = {
"Gt": ">",
"Lt": "<",
"Eg": "="
}
PUSHABLE_OPERATORS = {"Gt": ">", "Lt": "<", "Eg": "="}


def to_dnf(root):
"""
Expand All @@ -55,6 +52,7 @@ def to_dnf(root):
version 1 only does single predicate filters in the form
(identifier, operator, literal)
"""

def _predicate_to_dnf(root):
if root.token_type == NodeType.AND:
return [_predicate_to_dnf(root.left), _predicate_to_dnf(root.right)]
Expand Down Expand Up @@ -458,4 +456,3 @@ def evaluate_and_append(expressions, table: Table, seed: str = None):
table = columns.apply(table)

return columns, return_expressions, table

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ def selection_pushdown(plan, properties):
"""
# find the in-scope nodes
selection_nodes = plan.get_nodes_of_type(operators.SelectionNode)
reader_nodes = plan.get_nodes_of_type((operators.BlobReaderNode, operators.CollectionReaderNode, operators.FunctionDatasetNode, operators.InternalDatasetNode))
reader_nodes = plan.get_nodes_of_type(
(
operators.BlobReaderNode,
operators.CollectionReaderNode,
operators.FunctionDatasetNode,
operators.InternalDatasetNode,
)
)

# killer questions - if any aren't met, bail
if selection_nodes is None:
Expand Down
6 changes: 4 additions & 2 deletions opteryx/operators/blob_reader_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ def push_predicate(self, predicate):
if self._filter is None:
self._filter = predicate
return True
self._filter = ExpressionTreeNode(NodeType.AND, left=predicate, right=self._filter)
self._filter = ExpressionTreeNode(
NodeType.AND, left=predicate, right=self._filter
)
return True

@property
Expand Down Expand Up @@ -227,7 +229,7 @@ def execute(self) -> Iterable:
parser,
self._cache,
self._selection,
self._filter
self._filter,
)
)
for path, parser in partition["blob_list"]
Expand Down
2 changes: 1 addition & 1 deletion opteryx/operators/collection_reader_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def config(self): # pragma: no cover
@property
def name(self): # pragma: no cover
return "Collection Reader"

@property
def can_push_selection(self):
return False
Expand Down
1 change: 1 addition & 0 deletions opteryx/operators/selection_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from opteryx.models import QueryProperties
from opteryx.operators import BasePlanNode


class SelectionNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
Expand Down
2 changes: 1 addition & 1 deletion opteryx/third_party/distogram/tests/test_bounds.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def test_bounds():
h = distogram.Distogram()

for i in normal:
h = distogram.update(h, i)
distogram.update(h, i)

dmin, dmax = distogram.bounds(h)
assert dmin == min(normal)
Expand Down
6 changes: 3 additions & 3 deletions opteryx/third_party/distogram/tests/test_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ def test_count():
h = distogram.Distogram(bin_count=3)
assert distogram.count(h) == 0

h = distogram.update(h, 16, count=4)
distogram.update(h, 16, count=4)
assert distogram.count(h) == 4
h = distogram.update(h, 23, count=3)
distogram.update(h, 23, count=3)
assert distogram.count(h) == 7
h = distogram.update(h, 28, count=5)
distogram.update(h, 28, count=5)
assert distogram.count(h) == 12
20 changes: 10 additions & 10 deletions opteryx/third_party/distogram/tests/test_count_at.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ def test_count_at():
print(h)

# fill histogram
h = distogram.update(h, 16, count=4)
h = distogram.update(h, 23, count=3)
h = distogram.update(h, 28, count=5)
distogram.update(h, 16, count=4)
distogram.update(h, 23, count=3)
distogram.update(h, 28, count=5)
print(h)

actual_result = distogram.count_at(h, 25)
Expand All @@ -28,17 +28,17 @@ def test_count_at_normal():
h = distogram.Distogram()

for i in normal:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.count_at(h, 0) == approx(points / 2, rel=0.05)


def test_count_at_not_enough_elements():
h = distogram.Distogram()

h = distogram.update(h, 1)
h = distogram.update(h, 2)
h = distogram.update(h, 3)
distogram.update(h, 1)
distogram.update(h, 2)
distogram.update(h, 3)

assert distogram.count_at(h, 2.5) == 2

Expand All @@ -47,7 +47,7 @@ def test_count_at_left():
h = distogram.Distogram(bin_count=6)

for i in [1, 2, 3, 4, 5, 6, 0.7, 1.1]:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.count_at(h, 0.77) == approx(0.14)

Expand All @@ -56,7 +56,7 @@ def test_count_at_right():
h = distogram.Distogram(bin_count=6)

for i in [1, 2, 3, 4, 5, 6, 6.7, 6.1]:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.count_at(h, 6.5) == approx(7.307692307692308)

Expand All @@ -71,7 +71,7 @@ def test_count_at_out_of_bouns():
h = distogram.Distogram()

for i in [1, 2, 3, 4, 5, 6, 6.7, 6.1]:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.count_at(h, 0.2) is None
assert distogram.count_at(h, 10) is None
12 changes: 6 additions & 6 deletions opteryx/third_party/distogram/tests/test_histogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ def test_histogram():
h = distogram.Distogram(bin_count=64)

for i in normal:
h = distogram.update(h, i)
distogram.update(h, i)

np_values, np_edges = np.histogram(normal, 10)
d_values, d_edges = distogram.histogram(h, 10)

h = distogram.Distogram(bin_count=3)
h = distogram.update(h, 23)
h = distogram.update(h, 28)
h = distogram.update(h, 16)
distogram.update(h, 23)
distogram.update(h, 28)
distogram.update(h, 16)
assert distogram.histogram(h, bin_count=3) == (
approx([1.0714285714285714, 0.6285714285714286, 1.3]),
[16.0, 20.0, 24.0, 28],
Expand All @@ -34,7 +34,7 @@ def test_histogram_on_too_small_distribution():
h = distogram.Distogram(bin_count=64)

for i in range(5):
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.histogram(h, 10) == None

Expand All @@ -44,7 +44,7 @@ def test_format_histogram():
h = distogram.Distogram(bin_count=bin_count)

for i in range(4):
h = distogram.update(h, i)
distogram.update(h, i)

hist = distogram.histogram(h, bin_count=bin_count)
assert len(hist[1]) == len(hist[0]) + 1
16 changes: 8 additions & 8 deletions opteryx/third_party/distogram/tests/test_quantile.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

def test_quantile():
h = distogram.Distogram(bin_count=3)
h = distogram.update(h, 16, count=4)
h = distogram.update(h, 23, count=3)
h = distogram.update(h, 28, count=5)
distogram.update(h, 16, count=4)
distogram.update(h, 23, count=3)
distogram.update(h, 28, count=5)

assert distogram.quantile(h, 0.5) == approx(23.625)

Expand All @@ -23,7 +23,7 @@ def test_quantile_not_enough_elemnts():
h = distogram.Distogram(bin_count=10)

for i in [12.3, 5.4, 8.2, 100.53, 23.5, 13.98]:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.quantile(h, 0.5) == approx(13.14)

Expand All @@ -33,7 +33,7 @@ def test_quantile_on_left():

data = [12.3, 5.2, 5.4, 4.9, 5.5, 5.6, 8.2, 30.53, 23.5, 13.98]
for i in data:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.quantile(h, 0.01) == approx(np.quantile(data, 0.01), rel=0.01)
assert distogram.quantile(h, 0.05) == approx(np.quantile(data, 0.05), rel=0.05)
Expand All @@ -45,7 +45,7 @@ def test_quantile_on_right():

data = [12.3, 8.2, 100.53, 23.5, 13.98, 200, 200.2, 200.8, 200.4, 200.1]
for i in data:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.quantile(h, 0.99) == approx(np.quantile(data, 0.99), rel=0.01)
assert distogram.quantile(h, 0.85) == approx(np.quantile(data, 0.85), rel=0.01)
Expand All @@ -57,7 +57,7 @@ def test_normal():
h = distogram.Distogram(bin_count=64)

for i in normal:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.quantile(h, 0.5) == approx(np.quantile(normal, 0.5), abs=0.2)
assert distogram.quantile(h, 0.95) == approx(np.quantile(normal, 0.95), abs=0.2)
Expand All @@ -73,7 +73,7 @@ def test_quantile_out_of_bouns():
h = distogram.Distogram()

for i in [1, 2, 3, 4, 5, 6, 6.7, 6.1]:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.quantile(h, -0.2) is None
assert distogram.quantile(h, 10) is None
2 changes: 1 addition & 1 deletion opteryx/third_party/distogram/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def test_stats():
h = distogram.Distogram()

for i in normal:
h = distogram.update(h, i)
distogram.update(h, i)

assert distogram.mean(h) == approx(np.mean(normal), abs=0.1)
assert distogram.variance(h) == approx(np.var(normal), abs=0.1)
Expand Down
12 changes: 6 additions & 6 deletions opteryx/third_party/distogram/tests/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ def test_update():
h = distogram.Distogram(bin_count=3)

# fill histogram
h = distogram.update(h, 23)
distogram.update(h, 23)
assert h.bins == [(23, 1)]
h = distogram.update(h, 28)
distogram.update(h, 28)
assert h.bins == [(23, 1), (28, 1)]
h = distogram.update(h, 16)
distogram.update(h, 16)
assert h.bins == [(16, 1), (23, 1), (28, 1)]

# update count on existing value
h = distogram.update(h, 23)
distogram.update(h, 23)
assert h.bins == [(16, 1), (23, 2), (28, 1)]
h = distogram.update(h, 28)
distogram.update(h, 28)
assert h.bins == [(16, 1), (23, 2), (28, 2)]
h = distogram.update(h, 16)
distogram.update(h, 16)
assert h.bins == [(16, 2), (23, 2), (28, 2)]

# merge values
Expand Down
7 changes: 6 additions & 1 deletion opteryx/utils/file_decoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import numpy
import pyarrow


def _filter(filter, table):
# notes:
# at this point we've not renamed any columns
from opteryx.managers.expression import evaluate

mask = evaluate(filter, table, False)
return table.take(pyarrow.array(mask))


def zstd_decoder(stream, projection: List = None, selection=None):
"""
Read zstandard compressed JSONL files
Expand Down Expand Up @@ -70,7 +73,9 @@ def parquet_decoder(stream, projection: List = None, selection=None):
if len(selected_columns) == 0:
selected_columns = None
# don't prebuffer - we're already buffered as an IO Stream
return parquet.read_table(stream, columns=selected_columns, pre_buffer=False, filters=_select)
return parquet.read_table(
stream, columns=selected_columns, pre_buffer=False, filters=_select
)


def orc_decoder(stream, projection: List = None, selection=None):
Expand Down

0 comments on commit 1d80323

Please sign in to comment.