From c0572fc87396fe695c20e2a7212a7c31d5c4067a Mon Sep 17 00:00:00 2001
From: Justin Joyce
Date: Wed, 7 Dec 2022 20:39:11 +0000
Subject: [PATCH] #204

---
 .../base/base_blob_storage_adapter.py       |  4 +-
 opteryx/exceptions.py                       |  2 +
 opteryx/managers/expression/__init__.py     | 37 ++++++++++
 .../managers/planner/optimizer/__init__.py  |  1 +
 .../planner/optimizer/actions/__init__.py   |  1 +
 .../actions/action_selection_pushdown.py    | 74 +++++++++++++++++++
 opteryx/models/columns.py                   |  2 +-
 opteryx/models/execution_tree.py            |  4 +-
 opteryx/operators/blob_reader_node.py       | 24 ++++++-
 opteryx/operators/collection_reader_node.py |  6 +-
 opteryx/operators/function_dataset_node.py  |  4 ++
 opteryx/operators/internal_dataset_node.py  |  4 ++
 opteryx/operators/selection_node.py         |  5 +-
 opteryx/operators/show_columns_node.py      | 19 ++---
 opteryx/third_party/distogram/__init__.py   | 30 +++++---
 opteryx/utils/file_decoders.py              | 40 ++++++----
 16 files changed, 220 insertions(+), 37 deletions(-)
 create mode 100644 opteryx/managers/planner/optimizer/actions/action_selection_pushdown.py

diff --git a/opteryx/connectors/base/base_blob_storage_adapter.py b/opteryx/connectors/base/base_blob_storage_adapter.py
index 471f9354..10fc0cee 100644
--- a/opteryx/connectors/base/base_blob_storage_adapter.py
+++ b/opteryx/connectors/base/base_blob_storage_adapter.py
@@ -31,8 +31,8 @@ def get_partitions(
         partitioning: Iterable = None,  # ("year_{yyyy}", "month_{mm}", "day_{dd}"),
         start_date: Union[
             datetime.datetime, datetime.date, str
-        ] = datetime.date.today(),
-        end_date: Union[datetime.datetime, datetime.date, str] = datetime.date.today(),
+        ] = None,
+        end_date: Union[datetime.datetime, datetime.date, str] = None,
     ) -> List:
         """
         Get partitions doesn't confirm the partitions exist, it just creates a list
diff --git a/opteryx/exceptions.py b/opteryx/exceptions.py
index 275b3569..077d5fe0 100644
--- a/opteryx/exceptions.py
+++ b/opteryx/exceptions.py
@@ -30,6 +30,8 @@ class UnmetRequirementError(Exception):
 class FeatureNotSupportedOnArchitectureError(Exception):
     pass
 
+class NotSupportedError(Exception):
+    pass
 
 
 # PEP-0249
diff --git a/opteryx/managers/expression/__init__.py b/opteryx/managers/expression/__init__.py
index 76c3f65a..6740ada5 100644
--- a/opteryx/managers/expression/__init__.py
+++ b/opteryx/managers/expression/__init__.py
@@ -27,6 +27,7 @@
 from cityhash import CityHash64
 from pyarrow import Table
 
+from opteryx.exceptions import NotSupportedError
 from opteryx.functions import FUNCTIONS
 from opteryx.functions.binary_operators import binary_operations
 from opteryx.functions.unary_operations import UNARY_OPERATIONS
@@ -41,6 +42,41 @@
 LITERAL_TYPE: int = int("0100", 2)
 
 
+PUSHABLE_OPERATORS = {
+    "Gt": ">",
+    "Lt": "<",
+    "Eq": "="
+}
+
+def to_dnf(root):
+    """
+    Convert a filter to the form used by the selection pushdown.
+
+    Version 1 only handles conjunctions of simple predicates, expressed as a
+    flat list of (identifier, operator, literal) triples.
+    """
+    def _predicate_to_dnf(root):
+        if root.token_type == NodeType.AND:
+            return _predicate_to_dnf(root.left) + _predicate_to_dnf(root.right)
+        if root.token_type != NodeType.COMPARISON_OPERATOR:
+            raise NotSupportedError()
+        if root.value not in PUSHABLE_OPERATORS:
+            # not all operators are universally supported
+            raise NotSupportedError()
+        if root.left.token_type != NodeType.IDENTIFIER:
+            raise NotSupportedError()
+        if root.right.token_type not in (NodeType.LITERAL_NUMERIC, NodeType.LITERAL_VARCHAR):
+            # not all operands are universally supported
+            raise NotSupportedError()
+        return [(root.left.value, PUSHABLE_OPERATORS[root.value], root.right.value)]
+
+    try:
+        dnf = _predicate_to_dnf(root)
+    except NotSupportedError:
+        return None
+    return dnf
+
+
 def format_expression(root):
     if root is None:
         return "null"
@@ -422,3 +458,4 @@ def evaluate_and_append(expressions, table: Table, seed: str = None):
         table = columns.apply(table)
 
     return columns, return_expressions, table
+
diff --git a/opteryx/managers/planner/optimizer/__init__.py b/opteryx/managers/planner/optimizer/__init__.py
index 76cdde00..9b84862b 100644
--- a/opteryx/managers/planner/optimizer/__init__.py
+++ b/opteryx/managers/planner/optimizer/__init__.py
@@ -20,6 +20,7 @@
     actions.split_conjunctive_predicates,  # run after eliminate_negations
     actions.eliminate_fixed_function_evaluations,  # run before constant evaluations
     actions.eliminate_constant_evaluations,
+    actions.selection_pushdown,
     actions.defragment_pages,
     actions.use_heap_sort,
 ]
diff --git a/opteryx/managers/planner/optimizer/actions/__init__.py b/opteryx/managers/planner/optimizer/actions/__init__.py
index 7ce0b1b2..a7d13bcd 100644
--- a/opteryx/managers/planner/optimizer/actions/__init__.py
+++ b/opteryx/managers/planner/optimizer/actions/__init__.py
@@ -16,6 +16,7 @@
 from .action_function_evaluations import eliminate_fixed_function_evaluations
 from .action_defragment_pages import defragment_pages
 from .action_eliminate_negations import eliminate_negations
+from .action_selection_pushdown import selection_pushdown
 from .action_split_conjunctive_predicates import split_conjunctive_predicates
 from .action_use_heap_sort import use_heap_sort
diff --git a/opteryx/managers/planner/optimizer/actions/action_selection_pushdown.py b/opteryx/managers/planner/optimizer/actions/action_selection_pushdown.py
new file mode 100644
index 00000000..33cdc424
--- /dev/null
+++ b/opteryx/managers/planner/optimizer/actions/action_selection_pushdown.py
@@ -0,0 +1,74 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Optimization Rule - Selection Pushdown
+
+Type: Heuristic
+Goal: Reduce rows
+"""
+from opteryx import operators
+from opteryx.managers.expression import NodeType
+
+
+def selection_pushdown(plan, properties):
+    """
+    Initial implementation of Selection Pushdown: this puts selections (filters)
+    as close to the read as possible, including offloading them to external
+    services.
+
+    This has the benefit that we have less data to pass around the system, and
+    where we can offload to other systems (e.g. Firestore) we don't need to
+    transport that data over the network.
+
+    We do this by removing the selection node from the query plan and adding
+    it to the filter in the reader node.
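+
+    For example, a predicate like `user_name = 'opteryx'` becomes the DNF
+    triple ("user_name", "=", "opteryx"); the parquet decoder evaluates it
+    natively, while the other decoders apply it immediately after decoding.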
+
+    This initial implementation has a lot of limitations:
+
+    - only a single reader can exist in the plan
+    - only some readers support pushdown
+    - no GROUP BY / Aggregate can exist in the plan
+
+    """
+    # find the in-scope nodes
+    selection_nodes = plan.get_nodes_of_type(operators.SelectionNode)
+    reader_nodes = plan.get_nodes_of_type((operators.BlobReaderNode, operators.CollectionReaderNode, operators.FunctionDatasetNode, operators.InternalDatasetNode))
+
+    # killer questions - if any aren't met, bail
+    if not selection_nodes:
+        return plan
+    if len(plan.get_nodes_of_type(operators.AggregateNode)) > 0:
+        # don't try to work out if it's a HAVING or GROUP BY
+        # a HAVING without a GROUP BY should be okay
+        return plan
+    if len(reader_nodes) != 1:
+        # don't try to work out which reader to push to
+        return plan
+    reader_node = plan.get_operator(reader_nodes[0])
+    if not reader_node.can_push_selection:
+        # not all readers support pushdown
+        return plan
+
+    # WHERE clauses become selection nodes
+    for nid in selection_nodes:
+        # get the node from the node_id
+        operator = plan.get_operator(nid)
+        # only push simple predicates, the reader ANDs them together
+        if operator.filter.token_type == NodeType.COMPARISON_OPERATOR:
+            if reader_node.push_predicate(operator.filter):
+                plan.remove_operator(nid)
+
+    return plan
diff --git a/opteryx/models/columns.py b/opteryx/models/columns.py
index 1c954c2f..510977b3 100644
--- a/opteryx/models/columns.py
+++ b/opteryx/models/columns.py
@@ -195,7 +195,7 @@ def fuzzy_search(self, column_name):
 
     def filter(self, _filter):
         """
-        accept a filter and return matching columnsd
+        accept a filter and return matching columns
         """
         from opteryx.third_party import pyarrow_ops
diff --git a/opteryx/models/execution_tree.py b/opteryx/models/execution_tree.py
index 42272d25..11cff2f9 100644
--- a/opteryx/models/execution_tree.py
+++ b/opteryx/models/execution_tree.py
@@ -52,9 +52,9 @@ def get_nodes_of_type(self, operator):
 
         def _inner(operator):
             if not isinstance(operator, (list, tuple, set)):
-                operator = [operator]
+                operator = (operator,)
             for nid, item in list(self._nodes.items()):
-                if isinstance(item, *operator):
+                if isinstance(item, tuple(operator)):
                     yield nid
 
         return list(_inner(operator))
diff --git a/opteryx/operators/blob_reader_node.py b/opteryx/operators/blob_reader_node.py
index faea4281..b9411256 100644
--- a/opteryx/operators/blob_reader_node.py
+++ b/opteryx/operators/blob_reader_node.py
@@ -30,6 +30,9 @@
 from opteryx import config
 from opteryx.exceptions import DatasetNotFoundError
+from opteryx.managers.expression import ExpressionTreeNode
+from opteryx.managers.expression import NodeType
+from opteryx.managers.expression import to_dnf
 from opteryx.managers.schemes import MabelPartitionScheme
 from opteryx.managers.schemes import DefaultPartitionScheme
 from opteryx.models import Columns, QueryProperties, ExecutionTree
@@ -116,6 +119,8 @@ def __init__(self, properties: QueryProperties, **config):
         self._start_date = config.get("start_date", today)
         self._end_date = config.get("end_date", today)
 
+        self._filter = None
+
         if isinstance(self._dataset, (list, ExecutionTree, dict)):
             return
 
@@ -156,6 +161,20 @@ def __init__(self, properties: QueryProperties, **config):
         # row count estimate
         self._row_count_estimate: int = None
 
+    @property
+    def can_push_selection(self):
+        return isinstance(self._dataset, str)
+
+    def push_predicate(self, predicate):
+        if to_dnf(predicate) is None:
+            # we can't push all predicates everywhere
+            return False
+        if self._filter is None:
+            self._filter = predicate
+            return True
+        self._filter = ExpressionTreeNode(NodeType.AND, left=predicate, right=self._filter)
+        return True
+
     @property
     def config(self):  # pragma: no cover
         use_cache = ""
@@ -208,6 +227,7 @@ def execute(self) -> Iterable:
                             parser,
                             self._cache,
                             self._selection,
+                            self._filter
                         )
                     )
                     for path, parser in partition["blob_list"]
@@ -278,7 +298,7 @@
             yield pyarrow_blob
 
     def _read_and_parse(self, config):
-        path, reader, parser, cache, projection = config
+        path, reader, parser, cache, projection, selection = config
         start_read = time.time_ns()
 
         # hash the blob name for the look up
@@ -319,7 +339,7 @@
             else:
                 self.statistics.cache_hits += 1
 
-        table = parser(blob_bytes, projection)
+        table = parser(blob_bytes, projection, selection)
 
         time_to_read = time.time_ns() - start_read
         return time_to_read, blob_bytes.getbuffer().nbytes, table, path
diff --git a/opteryx/operators/collection_reader_node.py b/opteryx/operators/collection_reader_node.py
index 3d30b693..a8e8c6a2 100644
--- a/opteryx/operators/collection_reader_node.py
+++ b/opteryx/operators/collection_reader_node.py
@@ -19,7 +19,7 @@
 """
 import time
 
-from typing import Iterable, Optional
+from typing import Iterable
 
 import pyarrow
 
@@ -56,6 +56,10 @@ def config(self):  # pragma: no cover
     @property
     def name(self):  # pragma: no cover
         return "Collection Reader"
+
+    @property
+    def can_push_selection(self):
+        return False
 
     def execute(self) -> Iterable:
diff --git a/opteryx/operators/function_dataset_node.py b/opteryx/operators/function_dataset_node.py
index c6f8cad7..b89be9f5 100644
--- a/opteryx/operators/function_dataset_node.py
+++ b/opteryx/operators/function_dataset_node.py
@@ -88,6 +88,10 @@ def config(self):  # pragma: no cover
     def name(self):  # pragma: no cover
         return "Dataset Constructor"
 
+    @property
+    def can_push_selection(self):
+        return False
+
     def execute(self) -> Iterable:
 
         try:
diff --git a/opteryx/operators/internal_dataset_node.py b/opteryx/operators/internal_dataset_node.py
index a18ca26c..91326f8b 100644
--- a/opteryx/operators/internal_dataset_node.py
+++ b/opteryx/operators/internal_dataset_node.py
@@ -95,6 +95,10 @@ def config(self):  # pragma: no cover
     def name(self):  # pragma: no cover
         return "Sample Dataset Reader"
 
+    @property
+    def can_push_selection(self):
+        return False
+
     def execute(self) -> Iterable:
 
         pyarrow_page = _get_sample_dataset(self._dataset, self._alias, self._end_date)
diff --git a/opteryx/operators/selection_node.py b/opteryx/operators/selection_node.py
index 6b6b6085..fd313e1a 100644
--- a/opteryx/operators/selection_node.py
+++ b/opteryx/operators/selection_node.py
@@ -24,13 +24,12 @@
 
 from pyarrow import Table
 
-from opteryx.attribute_types import TOKEN_TYPES
 from opteryx.exceptions import SqlError
-from opteryx.managers.expression import evaluate, format_expression
+from opteryx.managers.expression import evaluate
+from opteryx.managers.expression import format_expression
 from opteryx.models import QueryProperties
 from opteryx.operators import BasePlanNode
-
 class SelectionNode(BasePlanNode):
     def __init__(self, properties: QueryProperties, **config):
         super().__init__(properties=properties)
diff --git a/opteryx/operators/show_columns_node.py b/opteryx/operators/show_columns_node.py
index dbe4b295..fbc1ce1e 100644
--- a/opteryx/operators/show_columns_node.py
+++ b/opteryx/operators/show_columns_node.py
@@ -259,7 +259,8 @@ def _extended_collector(pages):
             hll = profile.get("hyperloglog")
             if hll is None:
                 hll = hyperloglog.HyperLogLogPlusPlus(p=16)
-            [hll.update(value) for value in column_data]
+            for value in column_data:
+                hll.update(value)
             profile["hyperloglog"] = hll
 
             if _type in (
@@ -273,11 +274,9 @@
                 if counter is None:
                     counter = {}
                 if len(counter) < MAX_COLLECTOR:
-                    [
-                        increment(counter, value)
-                        for value in column_data
-                        if len(counter) < MAX_COLLECTOR
-                    ]
+                    for value in column_data:
+                        if len(counter) < MAX_COLLECTOR:
+                            increment(counter, value)
                 profile["counter"] = counter
 
             if _type in (OPTERYX_TYPES.NUMERIC, OPTERYX_TYPES.TIMESTAMP):
@@ -285,11 +284,7 @@
                 dgram = profile.get("distogram")
                 if dgram is None:
                     dgram = distogram.Distogram()  # type:ignore
-                values, counts = numpy.unique(column_data, return_counts=True)
-                for index, value in enumerate(values):
-                    dgram = distogram.update(  # type:ignore
-                        dgram, value=value, count=counts[index]
-                    )
+                dgram.bulkload(column_data)
                 profile["distogram"] = dgram
 
             profile_collector[column] = profile
@@ -307,7 +302,7 @@
             profile["min"], profile["max"] = distogram.bounds(dgram)  # type:ignore
             profile["mean"] = distogram.mean(dgram)  # type:ignore
 
-            histogram = distogram.histogram(dgram, bin_count=10)  # type:ignore
+            histogram = distogram.histogram(dgram, bin_count=50)  # type:ignore
             if histogram:
                 profile["histogram"] = histogram[0]
diff --git a/opteryx/third_party/distogram/__init__.py b/opteryx/third_party/distogram/__init__.py
index 7ee2e8d6..da7501b5 100644
--- a/opteryx/third_party/distogram/__init__.py
+++ b/opteryx/third_party/distogram/__init__.py
@@ -82,9 +82,30 @@ def __add__(self, operand):  # pragma: no cover
         return dgram
 
     def bulkload(self, values):
-        unique_values, counts = numpy.unique(values, return_counts=True)
-        for index, value in enumerate(unique_values):
-            update(self, value=value, count=counts[index])
+        # To speed up bulk loads we use numpy to get a histogram at a higher
+        # resolution and add this to the distogram.
+        # Histogram gives us n+1 values, so we average consecutive values.
+        # This ends up being an approximation of an approximation, but 1000x faster.
+        # The accuracy of this approach is poor on datasets with very low record
+        # counts; even if a bad decision is made on a table with 500 rows the
+        # consequence is minimal, but if a bad decision is made on a table with
+        # 5m rows it starts to matter.
+        counts, bin_values = numpy.histogram(values, self._bin_count * 5, density=False)
+        for index, count in enumerate(counts):
+            if count > 0:
+                update(
+                    self,
+                    value=(bin_values[index] + bin_values[index + 1]) / 2,
+                    count=count,
+                )
+
+        # we need to overwrite any range values as we've approximated the dataset
+        if self.min is None:
+            self.min = min(values)
+            self.max = max(values)
+        else:
+            self.min = min(self.min, min(values))
+            self.max = max(self.max, max(values))
 
 
 # added for opteryx
@@ -259,7 +280,8 @@ def update(h: Distogram, value: float, count: int = 1) -> Distogram:  # pragma: no cover
     if (h.max is None) or (h.max < value):
         h.max = value
 
-    return _trim(h)
+    h = _trim(h)
+    return h
 
 
 def merge(h1: Distogram, h2: Distogram) -> Distogram:  # pragma: no cover
diff --git a/opteryx/utils/file_decoders.py b/opteryx/utils/file_decoders.py
index 60e23256..f970f786 100644
--- a/opteryx/utils/file_decoders.py
+++ b/opteryx/utils/file_decoders.py
@@ -18,8 +18,14 @@
 import numpy
 import pyarrow
 
-
-def zstd_decoder(stream, projection: List = None):
+def _filter(filter, table):
+    # notes:
+    #   at this point we've not renamed any columns
+    from opteryx.managers.expression import evaluate
+    mask = evaluate(filter, table, False)
+    return table.take(pyarrow.array(mask))
+
+def zstd_decoder(stream, projection: List = None, selection=None):
     """
     Read zstandard compressed JSONL files
     """
@@ -29,11 +35,17 @@
-    return jsonl_decoder(file, projection)
+    return jsonl_decoder(file, projection, selection)
 
 
-def parquet_decoder(stream, projection: List = None):
+def parquet_decoder(stream, projection: List = None, selection=None):
     """
     Read parquet formatted files
     """
     from pyarrow import parquet
+    from opteryx.managers.expression import to_dnf
+
+    # parquet uses DNF filters
+    _select = None
+    if selection is not None:
+        _select = to_dnf(selection)
 
     selected_columns = None
     if isinstance(projection, (list, set)) and "*" not in projection:
@@ -58,10 +70,10 @@
         if len(selected_columns) == 0:
             selected_columns = None
 
     # don't prebuffer - we're already buffered as an IO Stream
-    return parquet.read_table(stream, columns=selected_columns, pre_buffer=False)
+    return parquet.read_table(stream, columns=selected_columns, pre_buffer=False, filters=_select)
 
 
-def orc_decoder(stream, projection: List = None):
+def orc_decoder(stream, projection: List = None, selection=None):
     """
     Read orc formatted files
     """
@@ -77,11 +89,13 @@
     if len(selected_columns) == 0:
         selected_columns = None
 
-    table = orc_file.read()
+    table = orc_file.read(columns=selected_columns)
+    if selection is not None:
+        table = _filter(selection, table)
     return table
 
 
-def jsonl_decoder(stream, projection: List = None):
+def jsonl_decoder(stream, projection: List = None, selection=None):
 
     import pyarrow.json
@@ -94,10 +108,12 @@
     if len(selected_columns) > 0:
         table = table.select(selected_columns)
 
+    if selection is not None:
+        table = _filter(selection, table)
     return table
 
 
-def csv_decoder(stream, projection: List = None):
+def csv_decoder(stream, projection: List = None, selection=None):
 
     import pyarrow.csv
@@ -110,10 +126,12 @@
     if len(selected_columns) > 0:
         table = table.select(selected_columns)
 
+    if selection is not None:
+        table = _filter(selection, table)
     return table
 
 
-def arrow_decoder(stream, projection: List = None):
+def arrow_decoder(stream, projection: List = None, selection=None):
 
     import pyarrow.feather as pf
@@ -126,4 +144,6 @@
     if len(selected_columns) > 0:
         table = table.select(selected_columns)
 
+    if selection is not None:
+        table = _filter(selection, table)
     return table
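
For illustration (not part of the patch): the flat list of (column, operator,
value) triples that `to_dnf` produces is the shape pyarrow's `filters=`
argument accepts, which is what `parquet_decoder` relies on. A minimal,
self-contained sketch; the file name, column names, and values here are
invented for the demo:

    import pyarrow as pa
    import pyarrow.parquet as pq

    # write a small parquet file to push a filter into
    table = pa.table({"user": ["alice", "bob", "carol"], "score": [10, 25, 40]})
    pq.write_table(table, "demo.parquet")

    # a flat list of (column, op, value) triples is a conjunction in pyarrow's
    # DNF filter form, the same shape to_dnf() returns for pushed predicates
    filtered = pq.read_table("demo.parquet", filters=[("score", ">", 20)])
    print(filtered.to_pydict())  # {'user': ['bob', 'carol'], 'score': [25, 40]}

Readers with no native filter support (ORC, JSONL, CSV, Arrow) have the same
predicate applied by `_filter` immediately after decoding, so a pushed
selection is never silently dropped.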