Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Dec 7, 2022
1 parent 6a540b4 commit c0572fc
Show file tree
Hide file tree
Showing 16 changed files with 214 additions and 36 deletions.
4 changes: 2 additions & 2 deletions opteryx/connectors/base/base_blob_storage_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def get_partitions(
partitioning: Iterable = None, # ("year_{yyyy}", "month_{mm}", "day_{dd}"),
start_date: Union[
datetime.datetime, datetime.date, str
] = datetime.date.today(),
end_date: Union[datetime.datetime, datetime.date, str] = datetime.date.today(),
] = None,
end_date: Union[datetime.datetime, datetime.date, str] = None,
) -> List:
"""
Get partitions doesn't confirm the partitions exist, it just creates a list
Expand Down
2 changes: 2 additions & 0 deletions opteryx/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class UnmetRequirementError(Exception):
class FeatureNotSupportedOnArchitectureError(Exception):
    """Raised when a feature is not available on the current hardware architecture."""

class NotSupportedError(Exception):
    """Raised when an operation or expression form is not supported."""

# PEP-0249

Expand Down
37 changes: 37 additions & 0 deletions opteryx/managers/expression/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from cityhash import CityHash64
from pyarrow import Table

from opteryx.exceptions import NotSupportedError
from opteryx.functions import FUNCTIONS
from opteryx.functions.binary_operators import binary_operations
from opteryx.functions.unary_operations import UNARY_OPERATIONS
Expand All @@ -41,6 +42,41 @@
LITERAL_TYPE: int = int("0100", 2)


# operators which can be pushed into readers, mapped from the internal
# comparison token name to the symbol used in the DNF filter form
PUSHABLE_OPERATORS = {
    "Gt": ">",
    "Lt": "<",
    "Eq": "=",  # fixed: was "Eg" - the comparison token is "Eq", so equality predicates were never pushed
}


def to_dnf(root):
    """
    Convert a filter expression to the DNF-like form used by selection pushdown.

    Version 1 only handles single-predicate filters, and ANDed combinations of
    them, in the form (identifier, operator, literal).

    Parameters:
        root: the expression tree node for the filter
    Returns:
        A (identifier, operator, value) tuple, a nested list of such tuples
        for conjunctions, or None when the filter cannot be pushed.
    """

    def _predicate_to_dnf(root):
        # an AND becomes a list of its two (recursively converted) sides
        if root.token_type == NodeType.AND:
            return [_predicate_to_dnf(root.left), _predicate_to_dnf(root.right)]
        if root.token_type != NodeType.COMPARISON_OPERATOR:
            raise NotSupportedError()
        if root.value not in PUSHABLE_OPERATORS:
            # not all operators are universally supported
            raise NotSupportedError()
        if root.left.token_type != NodeType.IDENTIFIER:
            raise NotSupportedError()
        # fixed: the original re-tested root.left against literal types, which
        # was dead code after the IDENTIFIER check above - the intent is to
        # restrict the right-hand operand to pushable literal types
        if root.right.token_type not in (NodeType.LITERAL_NUMERIC, NodeType.LITERAL_VARCHAR):
            # not all operands are universally supported
            raise NotSupportedError()
        return (root.left.value, PUSHABLE_OPERATORS[root.value], root.right.value)

    try:
        dnf = _predicate_to_dnf(root)
    except NotSupportedError:
        # we can't push this filter - signal the caller with None
        return None
    return dnf


def format_expression(root):
if root is None:
return "null"
Expand Down Expand Up @@ -422,3 +458,4 @@ def evaluate_and_append(expressions, table: Table, seed: str = None):
table = columns.apply(table)

return columns, return_expressions, table

1 change: 1 addition & 0 deletions opteryx/managers/planner/optimizer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
actions.split_conjunctive_predicates, # run after eliminate_negations
actions.eliminate_fixed_function_evaluations, # run before constant evaluations
actions.eliminate_constant_evaluations,
actions.selection_pushdown,
actions.defragment_pages,
actions.use_heap_sort,
]
Expand Down
1 change: 1 addition & 0 deletions opteryx/managers/planner/optimizer/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .action_function_evaluations import eliminate_fixed_function_evaluations
from .action_defragment_pages import defragment_pages
from .action_eliminate_negations import eliminate_negations
from .action_selection_pushdown import selection_pushdown
from .action_split_conjunctive_predicates import split_conjunctive_predicates
from .action_use_heap_sort import use_heap_sort

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Optimization Rule - Selection Pushdown
Type: Heuristic
Goal: Reduce rows
"""
from opteryx import operators
from opteryx.managers.expression import NodeType


def selection_pushdown(plan, properties):
    """
    Initial implementation of Selection Pushdown: put selections (filters) as
    close to the read as possible, including offloading them to external
    services. This means less data is passed around the system and, where the
    filter can be offloaded (e.g. FireStore), that data never crosses the
    network.

    We do this by removing the selection node from the query plan and adding
    its predicate to the filter held by the reader node.

    Limitations of this initial implementation:
        - only a single reader can exist in the plan
        - only some readers support pushdown
        - no GROUP BY / Aggregate can exist in the plan

    Parameters:
        plan: the query ExecutionTree to optimize
        properties: the query properties (unused by this rule)
    Returns:
        the (possibly modified) plan
    """
    # find the in-scope nodes
    selection_nodes = plan.get_nodes_of_type(operators.SelectionNode)
    reader_nodes = plan.get_nodes_of_type(
        (
            operators.BlobReaderNode,
            operators.CollectionReaderNode,
            operators.FunctionDatasetNode,
            operators.InternalDatasetNode,
        )
    )

    # killer questions - if any aren't met, bail
    if not selection_nodes:
        # fixed: get_nodes_of_type returns a list, never None, so the original
        # `is None` test could never fire - bail when there are no selections
        return plan
    if len(plan.get_nodes_of_type(operators.AggregateNode)) > 0:
        # don't try to work out if it's a HAVING or GROUP BY
        # a HAVING without a GROUP BY should be okay
        return plan
    if len(reader_nodes) != 1:
        # don't try to work out which reader to push to
        return plan
    reader_node = plan.get_operator(reader_nodes[0])
    if not reader_node.can_push_selection:
        # not all readers support pushdown
        return plan

    # WHERE are selection nodes
    for nid in selection_nodes:
        # get the node from the node_id
        operator = plan.get_operator(nid)
        # only push simple predicates (conjunctions are presumably already
        # split by the split_conjunctive_predicates rule - see optimizer list)
        if operator.filter.token_type == NodeType.COMPARISON_OPERATOR:
            # the reader may decline the predicate; only remove the selection
            # node from the plan when the reader accepted it
            if reader_node.push_predicate(operator.filter):
                plan.remove_operator(nid)

    return plan
2 changes: 1 addition & 1 deletion opteryx/models/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def fuzzy_search(self, column_name):

def filter(self, _filter):
"""
accept a filter and return matching columnsd
accept a filter and return matching columns
"""
from opteryx.third_party import pyarrow_ops

Expand Down
4 changes: 2 additions & 2 deletions opteryx/models/execution_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ def get_nodes_of_type(self, operator):

def _inner(operator):
    # isinstance() requires a class or a *tuple* of classes. The original
    # only wrapped non-iterable arguments, so passing a list or set left it
    # unconverted and isinstance() raised TypeError - normalize both ways.
    if isinstance(operator, (list, tuple, set)):
        operator = tuple(operator)
    else:
        operator = (operator,)
    # snapshot the items so callers may mutate the plan while consuming
    for nid, item in list(self._nodes.items()):
        if isinstance(item, operator):
            yield nid

return list(_inner(operator))
Expand Down
24 changes: 22 additions & 2 deletions opteryx/operators/blob_reader_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

from opteryx import config
from opteryx.exceptions import DatasetNotFoundError
from opteryx.managers.expression import ExpressionTreeNode
from opteryx.managers.expression import NodeType
from opteryx.managers.expression import to_dnf
from opteryx.managers.schemes import MabelPartitionScheme
from opteryx.managers.schemes import DefaultPartitionScheme
from opteryx.models import Columns, QueryProperties, ExecutionTree
Expand Down Expand Up @@ -116,6 +119,8 @@ def __init__(self, properties: QueryProperties, **config):
self._start_date = config.get("start_date", today)
self._end_date = config.get("end_date", today)

self._filter = None

if isinstance(self._dataset, (list, ExecutionTree, dict)):
return

Expand Down Expand Up @@ -156,6 +161,20 @@ def __init__(self, properties: QueryProperties, **config):
# row count estimate
self._row_count_estimate: int = None

@property
def can_push_selection(self):
    """Selection pushdown is only supported when the dataset is a plain (string) name."""
    dataset = self._dataset
    return isinstance(dataset, str)

def push_predicate(self, predicate):
    """
    Offer a predicate to this reader for pushdown.

    Returns True when the predicate was accepted and recorded on the
    reader's filter, False when it cannot be pushed.
    """
    if to_dnf(predicate) is None:
        # we can't push all predicates everywhere
        return False
    if self._filter is not None:
        # AND the new predicate onto the filter we already hold
        self._filter = ExpressionTreeNode(
            NodeType.AND, left=predicate, right=self._filter
        )
    else:
        # first accepted predicate becomes the filter
        self._filter = predicate
    return True

@property
def config(self): # pragma: no cover
use_cache = ""
Expand Down Expand Up @@ -208,6 +227,7 @@ def execute(self) -> Iterable:
parser,
self._cache,
self._selection,
self._filter
)
)
for path, parser in partition["blob_list"]
Expand Down Expand Up @@ -278,7 +298,7 @@ def execute(self) -> Iterable:
yield pyarrow_blob

def _read_and_parse(self, config):
path, reader, parser, cache, projection = config
path, reader, parser, cache, projection, selection = config
start_read = time.time_ns()

# hash the blob name for the look up
Expand Down Expand Up @@ -319,7 +339,7 @@ def _read_and_parse(self, config):
else:
self.statistics.cache_hits += 1

table = parser(blob_bytes, projection)
table = parser(blob_bytes, projection, selection)

time_to_read = time.time_ns() - start_read
return time_to_read, blob_bytes.getbuffer().nbytes, table, path
Expand Down
6 changes: 5 additions & 1 deletion opteryx/operators/collection_reader_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""
import time

from typing import Iterable, Optional
from typing import Iterable

import pyarrow

Expand Down Expand Up @@ -56,6 +56,10 @@ def config(self): # pragma: no cover
@property
def name(self): # pragma: no cover
return "Collection Reader"

@property
def can_push_selection(self):
    """Collection readers do not support selection pushdown."""
    return False

def execute(self) -> Iterable:

Expand Down
4 changes: 4 additions & 0 deletions opteryx/operators/function_dataset_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ def config(self): # pragma: no cover
def name(self): # pragma: no cover
return "Dataset Constructor"

@property
def can_push_selection(self):
    """Function-built datasets do not support selection pushdown."""
    return False

def execute(self) -> Iterable:

try:
Expand Down
4 changes: 4 additions & 0 deletions opteryx/operators/internal_dataset_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def config(self): # pragma: no cover
def name(self): # pragma: no cover
return "Sample Dataset Reader"

@property
def can_push_selection(self):
    """Internal sample datasets do not support selection pushdown."""
    return False

def execute(self) -> Iterable:

pyarrow_page = _get_sample_dataset(self._dataset, self._alias, self._end_date)
Expand Down
5 changes: 2 additions & 3 deletions opteryx/operators/selection_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@

from pyarrow import Table

from opteryx.attribute_types import TOKEN_TYPES
from opteryx.exceptions import SqlError
from opteryx.managers.expression import evaluate, format_expression
from opteryx.managers.expression import evaluate
from opteryx.managers.expression import format_expression
from opteryx.models import QueryProperties
from opteryx.operators import BasePlanNode


class SelectionNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **config):
super().__init__(properties=properties)
Expand Down
19 changes: 7 additions & 12 deletions opteryx/operators/show_columns_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ def _extended_collector(pages):
hll = profile.get("hyperloglog")
if hll is None:
hll = hyperloglog.HyperLogLogPlusPlus(p=16)
[hll.update(value) for value in column_data]
for value in column_data:
hll.update(value)
profile["hyperloglog"] = hll

if _type in (
Expand All @@ -273,23 +274,17 @@ def _extended_collector(pages):
if counter is None:
counter = {}
if len(counter) < MAX_COLLECTOR:
[
increment(counter, value)
for value in column_data
if len(counter) < MAX_COLLECTOR
]
for value in column_data:
if len(counter) < MAX_COLLECTOR:
increment(counter, value)
profile["counter"] = counter

if _type in (OPTERYX_TYPES.NUMERIC, OPTERYX_TYPES.TIMESTAMP):
# populate the distogram, this is used for distribution statistics
dgram = profile.get("distogram")
if dgram is None:
dgram = distogram.Distogram() # type:ignore
values, counts = numpy.unique(column_data, return_counts=True)
for index, value in enumerate(values):
dgram = distogram.update( # type:ignore
dgram, value=value, count=counts[index]
)
dgram.bulkload(column_data)
profile["distogram"] = dgram

profile_collector[column] = profile
Expand All @@ -307,7 +302,7 @@ def _extended_collector(pages):
profile["min"], profile["max"] = distogram.bounds(dgram) # type:ignore
profile["mean"] = distogram.mean(dgram) # type:ignore

histogram = distogram.histogram(dgram, bin_count=10) # type:ignore
histogram = distogram.histogram(dgram, bin_count=50) # type:ignore
if histogram:
profile["histogram"] = histogram[0]

Expand Down
29 changes: 25 additions & 4 deletions opteryx/third_party/distogram/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,30 @@ def __add__(self, operand): # pragma: no cover
return dgram

def bulkload(self, values):
    """
    Bulk-load a collection of values into the distogram.

    To speed up bulk loads we use numpy to build a histogram at a higher
    resolution than the distogram and fold that into the distogram.
    Histogram gives us n+1 edges, so we use the midpoint of each bin.
    This ends up being an approximation of an approximation but 1000x faster.
    The accuracy of this approach is poor on datasets with very low record
    counts, but even if a bad decision is made on a table with 500 rows, the
    consequence is minimal; if a bad decision is made on a table with 5m
    rows, it starts to matter.
    """
    if len(values) == 0:
        # numpy.histogram and min()/max() both fail on empty input
        return

    # oversample at 5x the bin count, then merge down via update()
    counts, bin_values = numpy.histogram(values, self._bin_count * 5, density=False)
    for index, count in enumerate(counts):
        if count > 0:
            update(
                self,
                # midpoint of the bin bounded by edges index and index+1
                value=(bin_values[index] + bin_values[index + 1]) / 2,
                count=count,
            )

    # we've approximated the dataset, so overwrite the range with true bounds
    low, high = min(values), max(values)
    if self.min is None:
        self.min = low
        self.max = high
    else:
        self.min = min(self.min, low)
        self.max = max(self.max, high)


# added for opteryx
Expand Down Expand Up @@ -259,7 +280,7 @@ def update(h: Distogram, value: float, count: int = 1) -> Distogram: # pragma:
if (h.max is None) or (h.max < value):
h.max = value

return _trim(h)
h = _trim(h)


def merge(h1: Distogram, h2: Distogram) -> Distogram: # pragma: no cover
Expand Down

0 comments on commit c0572fc

Please sign in to comment.