Skip to content

Commit

Permalink
Merge pull request #1016 from mabel-dev/#1015
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Apr 28, 2023
2 parents f357ba1 + 0ef67da commit 61b554d
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 10 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,6 @@ Opteryx is in beta. Beta means different things to different people, to us, bein
## Related Projects

- **[HadroDB](https://github.com/mabel-dev/hadrodb)** Data storage engine
- **[mabel](https://github.com/mabel-dev/mabel)** platform for authoring data processing systems
- **[mabel](https://github.com/mabel-dev/mabel)** Streaming data APIs
- **[mesos](https://github.com/mabel-dev/mesos)** MySQL connector for Opteryx
- **[orso](https://github.com/mabel-dev/orso)** DataFrame library
- **[orso](https://github.com/mabel-dev/orso)** DataFrame library
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def predicate_pushdown(plan, properties):
(
operators.BlobReaderNode,
operators.CollectionReaderNode,
operators.FileReaderNode,
operators.SqlReaderNode,
)
)
Expand Down
23 changes: 17 additions & 6 deletions opteryx/functions/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ def get_functions():
if function_name[0] != "_":
# python CamelCase to SQL SNAKE_CASE
function_name = CAMEL_TO_SNAKE.sub("_", function_name).upper()
functions[function_name] = function_implementation
if function_name.startswith("FUNCTION_"):
function_name = function_name[9:]
functions[function_name] = function_implementation
return functions


Expand Down Expand Up @@ -80,7 +82,7 @@ def __str__(self):
return f"{self.__class__.__name__.upper()} (<params>) → <type>\n{self.__doc__.strip()}"


class CurrentTime(_BaseFunction):
class FunctionCurrentTime(_BaseFunction):
"""Return the current system time."""

style = _FunctionStyle.CONSTANT
Expand All @@ -90,7 +92,7 @@ def _func(self) -> datetime.time:
return datetime.datetime.utcnow().time()


class E(_BaseFunction):
class FunctionE(_BaseFunction):
"""Return Euler's number."""

style = _FunctionStyle.CONSTANT
Expand All @@ -100,7 +102,16 @@ def _func(self) -> float:
return 2.71828182845904523536028747135266249775724709369995


class Phi(_BaseFunction):
class FunctionLen(_BaseFunction):
"""return the length of a VARCHAR or ARRAY"""

style = _FunctionStyle.ELEMENTWISE

def _func(self, item: typing.Union[list, str]) -> int:
return len(item)


class FunctionPhi(_BaseFunction):
"""Return the golden ratio."""

style = _FunctionStyle.CONSTANT
Expand All @@ -110,7 +121,7 @@ def _func(self) -> float:
return 1.61803398874989484820458683436563811772030917980576


class Pi(_BaseFunction):
class FunctionPi(_BaseFunction):
"""Return Pi."""

style = _FunctionStyle.CONSTANT
Expand All @@ -120,7 +131,7 @@ def _func(self) -> float:
return 3.14159265358979323846264338327950288419716939937510


class Version(_BaseFunction):
class FunctionVersion(_BaseFunction):
"""Return the version of the query engine."""

style = _FunctionStyle.CONSTANT
Expand Down
3 changes: 3 additions & 0 deletions opteryx/operators/blob_reader_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ def push_predicate(self, predicate):
# these to DNF at read time, for everything else, we run the selection nodes
from opteryx.connectors.capabilities import predicate_pushable

# There appears to be a bug in how pyarrow handles identifier-to-identifier filters, so don't push these
if predicate.left.token_type == predicate.right.token_type == NodeType.IDENTIFIER:
return False
dnf = predicate_pushable.to_dnf(predicate)
if dnf is None:
# we can't push all predicates everywhere
Expand Down
3 changes: 3 additions & 0 deletions opteryx/operators/file_reader_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def push_predicate(self, predicate):
# these to DNF at read time, for everything else, we run the selection nodes
from opteryx.connectors.capabilities import predicate_pushable

# There appears to be a bug in how pyarrow handles identifier-to-identifier filters, so don't push these
if predicate.left.token_type == predicate.right.token_type == NodeType.IDENTIFIER:
return False
if predicate_pushable.to_dnf(predicate) is None:
# we can't push all predicates everywhere
return False
Expand Down
4 changes: 4 additions & 0 deletions opteryx/operators/sql_reader_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import pyarrow

from opteryx.managers.expression import NodeType
from opteryx.models import QueryProperties
from opteryx.models.columns import Columns
from opteryx.operators import BasePlanNode
Expand Down Expand Up @@ -59,6 +60,9 @@ def can_push_selection(self):
return not self._disable_selections

def push_predicate(self, predicate):
# SQL reader currently assumes the right parameter is a literal
if predicate.left.token_type == predicate.right.token_type == NodeType.IDENTIFIER:
return False
if self.can_push_selection:
return self._reader.push_predicate(predicate)
return False
Expand Down
4 changes: 2 additions & 2 deletions opteryx/third_party/query_builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ def _lines_keyword(self, keyword, things):
if thing.keyword:
yield thing.keyword + "\n"

format = self.formats[bool(thing.alias)][keyword]
format_ = self.formats[bool(thing.alias)][keyword]
value = thing.value
if thing.is_subquery:
value = f"(\n{self._indent(value)}\n)"
yield self._indent(format.format(value=value, alias=thing.alias))
yield self._indent(format_.format(value=value, alias=thing.alias))

if not last and not thing.keyword:
try:
Expand Down
Binary file added testdata/flat/planets/parquet/planets.parquet
Binary file not shown.
36 changes: 36 additions & 0 deletions tests/query_planning/test_predicate_pushdown_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,42 @@ def test_predicate_pushdowns_blobs_parquet():
assert cur.stats["rows_read"] == 711, cur.stats
config.ONLY_PUSH_EQUALS_PREDICATES = False

# identifier = identifier isn't pushed - file reader
config.ONLY_PUSH_EQUALS_PREDICATES = True
cur = conn.cursor()
cur.execute(
"SELECT * FROM 'testdata/flat/planets/parquet/planets.parquet' WITH(NO_PARTITION) WHERE rotationPeriod = lengthOfDay;"
)
assert cur.rowcount == 3, cur.rowcount
assert cur.stats["rows_read"] == 9, cur.stats
config.ONLY_PUSH_EQUALS_PREDICATES = False

# push the >, not the eq
cur = conn.cursor()
cur.execute(
"SELECT * FROM 'testdata/flat/planets/parquet/planets.parquet' WITH(NO_PARTITION) WHERE rotationPeriod = lengthOfDay AND id > 5;"
)
assert cur.rowcount == 2, cur.rowcount
assert cur.stats["rows_read"] == 4, cur.stats

# identifier = identifier isn't pushed - blob reader
config.ONLY_PUSH_EQUALS_PREDICATES = True
cur = conn.cursor()
cur.execute(
"SELECT * FROM 'testdata.flat.planets.parquet' WITH(NO_PARTITION) WHERE rotationPeriod = lengthOfDay;"
)
assert cur.rowcount == 3, cur.rowcount
assert cur.stats["rows_read"] == 9, cur.stats
config.ONLY_PUSH_EQUALS_PREDICATES = False

# push the >, not the eq - blob reader
cur = conn.cursor()
cur.execute(
"SELECT * FROM 'testdata.flat.planets.parquet' WITH(NO_PARTITION) WHERE rotationPeriod = lengthOfDay AND id > 5;"
)
assert cur.rowcount == 2, cur.rowcount
assert cur.stats["rows_read"] == 4, cur.stats

conn.close()


Expand Down
6 changes: 6 additions & 0 deletions tests/query_planning/test_predicate_pushdown_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def test_predicate_pushdowns_postgres_eq():
assert cur.rowcount == 1, cur.rowcount
assert cur.stats["rows_read"] == 2, cur.stats

# identifier = identifier isn't pushed to SQL engines
cur = conn.cursor()
cur.execute("SELECT * FROM pg.planets WHERE rotation_period = length_of_day AND id > 5;")
assert cur.rowcount == 2, cur.rowcount
assert cur.stats["rows_read"] == 4, cur.stats

conn.close()


Expand Down

0 comments on commit 61b554d

Please sign in to comment.