diff --git a/docs/Release Notes/Change Log.md b/docs/Release Notes/Change Log.md
index 5cb7e150..89c9c74e 100644
--- a/docs/Release Notes/Change Log.md
+++ b/docs/Release Notes/Change Log.md
@@ -3,6 +3,13 @@
 All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+### [Unreleased]
+
+#### Added
+
+- [#20](https://github.com/mabel-dev/opteryx/issues/20) Split query planner and query plan into different modules. ([@joocer](https://github.com/joocer))
+- [#165](https://github.com/mabel-dev/opteryx/issues/165) Support S3/MinIO data stores for blobs. ([@joocer](https://github.com/joocer))
+
 ### [0.0.2] - 2022-06-03
 
 #### Added
diff --git a/opteryx/engine/planner/execution_tree.py b/opteryx/engine/planner/execution_tree.py
new file mode 100644
index 00000000..ec9d9a9a
--- /dev/null
+++ b/opteryx/engine/planner/execution_tree.py
@@ -0,0 +1,145 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class ExecutionTree:
+    """
+    The execution tree is defined separately to the planner to simplify the
+    complex code which is the planner from the tree that describes the plan.
+    """
+
+    def __init__(self):
+        """create empty plan"""
+        self._nodes: dict = {}
+        self._edges: list = []
+
+    def add_operator(self, nid, operator):
+        """
+        Add a step to the DAG
+
+        Parameters:
+            id: string
+                The id of the step, must be unique
+            Operator: BaseOperator
+                The Operator
+        """
+        self._nodes[nid] = operator
+
+    def link_operators(self, source_operator, target_operator, direction=None):
+        """
+        Link steps in a flow.
+
+        Parameters:
+            source_operator: string
+                The id of the source step
+            target_operator: string
+                The id of the target step
+            direction: string (optional)
+                Left/Right/None, the direction of the incoming node
+        """
+        edge = (
+            source_operator,
+            target_operator,
+            direction,
+        )
+        if edge not in self._edges:
+            self._edges.append(edge)
+
+    def get_outgoing_links(self, nid):
+        """
+        Get the ids of outgoing nodes from a given step.
+
+        Parameters:
+            name: string
+                The name of the step to search from
+        """
+        retval = {target for source, target, direction in self._edges if source == nid}
+        return sorted(retval)
+
+    def get_incoming_links(self, nid):
+        """
+        Get the ids of incoming nodes for a given step.
+
+        Parameters:
+            nid: string
+                The name of the step to search from
+        """
+        retval = {
+            (
+                source,
+                direction,
+            )
+            for source, target, direction in self._edges
+            if target == nid
+        }
+        return sorted(retval)
+
+    def get_exit_points(self):
+        """
+        Get steps in the flow with no outgoing steps.
+        """
+        sources = {source for source, target, direction in self._edges}
+        retval = (
+            target for source, target, direction in self._edges if target not in sources
+        )
+        return sorted(retval)
+
+    def get_entry_points(self):
+        """
+        Get steps in the flow with no incoming steps.
+        """
+        if len(self._nodes) == 1:
+            return list(self._nodes.keys())
+        targets = {target for source, target, direction in self._edges}
+        retval = (
+            source for source, target, direction in self._edges if source not in targets
+        )
+        return sorted(retval)
+
+    def get_operator(self, nid):
+        """
+        Get the Operator class by id.
+
+        Parameters:
+            nid: string
+                The id of the step
+        """
+        return self._nodes.get(nid)
+
+    def is_acyclic(self):
+        """
+        Test if the graph is acyclic
+        """
+        # cycle over the graph removing a layer of exits each cycle
+        # if we have nodes but no exits, we're cyclic
+        my_edges = self._edges.copy()
+
+        while len(my_edges) > 0:
+            # find all of the exits
+            sources = {source for source, target, direction in my_edges}
+            exits = {
+                target
+                for source, target, direction in my_edges
+                if target not in sources
+            }
+
+            if len(exits) == 0:
+                return False
+
+            # remove the exits
+            new_edges = [
+                (source, target, direction)
+                for source, target, direction in my_edges
+                if target not in exits
+            ]
+            my_edges = new_edges
+        return True
diff --git a/opteryx/engine/planner/operations/__init__.py b/opteryx/engine/planner/operations/__init__.py
index a0432f93..27521ee9 100644
--- a/opteryx/engine/planner/operations/__init__.py
+++ b/opteryx/engine/planner/operations/__init__.py
@@ -18,6 +18,7 @@
 from .distinct_node import DistinctNode  # remove duplicate records
 from .evaluation_node import EvaluationNode  # aliases and evaluations
 from .explain_node import ExplainNode  # EXPLAIN queries
+from .inner_join_node import InnerJoinNode  # INNER JOIN
 from .limit_node import LimitNode  # select the first N records
 from .offset_node import OffsetNode  # skip a number of records
 from .outer_join_node import OuterJoinNode  # LEFT/RIGHT/FULL OUTER JOIN
diff --git a/opteryx/engine/planner/planner.py b/opteryx/engine/planner/planner.py
index ecd6957f..9a94392d 100644
--- a/opteryx/engine/planner/planner.py
+++ b/opteryx/engine/planner/planner.py
@@ -40,12 +40,13 @@ these are not
supported by SqlOxide and so are in a different module which strips temporal aspects out of the query. """ +import datetime import numpy from opteryx.engine.attribute_types import TOKEN_TYPES from opteryx.engine.functions import is_function +from opteryx.engine.planner.execution_tree import ExecutionTree from opteryx.engine.planner.operations import * -from opteryx.engine.planner.operations.inner_join_node import InnerJoinNode from opteryx.engine.planner.temporal import extract_temporal_filters from opteryx.exceptions import SqlError from opteryx.utils import dates @@ -68,16 +69,13 @@ } -class QueryPlanner(object): +class QueryPlanner(ExecutionTree): def __init__(self, statistics, reader, cache, partition_scheme): """ - PLan represents Directed Acyclic Graphs which are used to describe data - pipelines. + Planner creates a plan (Execution Tree or DAG) which presents the plan to + respond to the query. """ - import datetime - - self.nodes: dict = {} - self.edges: list = [] + super().__init__() self._ast = None @@ -86,8 +84,8 @@ def __init__(self, statistics, reader, cache, partition_scheme): self._cache = cache self._partition_scheme = partition_scheme - self._start_date = datetime.datetime.today() - self._end_date = datetime.datetime.today() + self._start_date = datetime.datetime.utcnow().date() + self._end_date = datetime.datetime.utcnow().date() def __repr__(self): return "QueryPlanner" @@ -266,9 +264,9 @@ def _build_dnf_filters(self, filters): def _extract_relations(self, ast): """ """ - def _safe_get(l, i): + def _safe_get(iterable, index): try: - return l[i] + return iterable[index] except IndexError: return None @@ -790,83 +788,6 @@ def _inner_explain(operator_name, depth): table = Columns.create_table_metadata(table, table.num_rows, "plan", None) yield table - def add_operator(self, name, operator): - """ - Add a step to the DAG - - Parameters: - name: string - The name of the step, must be unique - Operator: BaseOperator - The Operator - """ - 
self.nodes[name] = operator - - def link_operators(self, source_operator, target_operator): - """ - Link steps in a flow. - - Parameters: - source_operator: string - The name of the source step - target_operator: string - The name of the target step - """ - edge = (source_operator, target_operator) - if edge not in self.edges: - self.edges.append((source_operator, target_operator)) - - def get_outgoing_links(self, name): - """ - Get the names of outgoing links from a given step. - - Paramters: - name: string - The name of the step to search from - """ - retval = {target for source, target in self.edges if source == name} - return sorted(retval) - - def get_exit_points(self): - """ - Get steps in the flow with no outgoing steps. - """ - sources = {source for source, target in self.edges} - retval = {target for source, target in self.edges if target not in sources} - return sorted(retval) - - def get_entry_points(self): - """ - Get steps in the flow with no incoming steps. - """ - if len(self.nodes) == 1: - return list(self.nodes.keys()) - targets = {target for source, target in self.edges} - retval = {source for source, target in self.edges if source not in targets} - return sorted(retval) - - def get_operator(self, name): - """ - Get the Operator class by name. - - Parameters: - name: string - The name of the step - """ - return self.nodes.get(name) - - def merge(self, assimilatee): - """ - Merge a flow into the current flow. 
- - Parameters: - assimilatee: Flow - The flow to assimilate into the current flows - """ - self.nodes = {**self.nodes, **assimilatee.nodes} - self.edges += assimilatee.edges - self.edges = list(set(self.edges)) - # def __repr__(self): # return "\n".join(list(self._draw())) diff --git a/opteryx/storage/adapters/__init__.py b/opteryx/storage/adapters/__init__.py index 5fc12924..c5fa8d94 100644 --- a/opteryx/storage/adapters/__init__.py +++ b/opteryx/storage/adapters/__init__.py @@ -10,5 +10,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from opteryx.storage.adapters.local.disk_store import DiskStorage -from opteryx.storage.adapters.network.gcs_store import GcsStorage +from opteryx.storage.adapters.blob.disk_store import DiskStorage +from opteryx.storage.adapters.blob.gcs_store import GcsStorage +from opteryx.storage.adapters.blob.minio_store import MinIoStorage diff --git a/opteryx/storage/adapters/local/__init__.py b/opteryx/storage/adapters/blob/__init__.py similarity index 88% rename from opteryx/storage/adapters/local/__init__.py rename to opteryx/storage/adapters/blob/__init__.py index 0265b01a..72802b76 100644 --- a/opteryx/storage/adapters/local/__init__.py +++ b/opteryx/storage/adapters/blob/__init__.py @@ -11,3 +11,5 @@ # limitations under the License. 
from .disk_store import DiskStorage +from .gcs_store import GcsStorage +from .minio_store import MinIoStorage diff --git a/opteryx/storage/adapters/local/disk_store.py b/opteryx/storage/adapters/blob/disk_store.py similarity index 96% rename from opteryx/storage/adapters/local/disk_store.py rename to opteryx/storage/adapters/blob/disk_store.py index d63bdec3..9ca7736f 100644 --- a/opteryx/storage/adapters/local/disk_store.py +++ b/opteryx/storage/adapters/blob/disk_store.py @@ -15,9 +15,6 @@ class DiskStorage(BaseStorageAdapter): - def __init__(self): - pass - def read_blob(self, blob_name): import io diff --git a/opteryx/storage/adapters/network/gcs_store.py b/opteryx/storage/adapters/blob/gcs_store.py similarity index 100% rename from opteryx/storage/adapters/network/gcs_store.py rename to opteryx/storage/adapters/blob/gcs_store.py diff --git a/opteryx/storage/adapters/blob/minio_store.py b/opteryx/storage/adapters/blob/minio_store.py new file mode 100644 index 00000000..ea77bbd7 --- /dev/null +++ b/opteryx/storage/adapters/blob/minio_store.py @@ -0,0 +1,47 @@ +""" +MinIo Reader - also works with AWS +""" +from opteryx.exceptions import MissingDependencyError +from opteryx.storage import BaseStorageAdapter +from opteryx.utils import paths + +try: + from minio import Minio # type:ignore + + minio_installed = True +except ImportError: # pragma: no cover + minio_installed = False + + +class MinIoStorage(BaseStorageAdapter): + def __init__(self, end_point: str, access_key: str, secret_key: str, **kwargs): + + if not minio_installed: # pragma: no cover + raise MissingDependencyError( + "`minio` is missing, please install or include in requirements.txt" + ) + + super().__init__(**kwargs) + secure = kwargs.get("secure", True) + self.minio = Minio(end_point, access_key, secret_key, secure=secure) + + def get_blob_list(self, partition): + bucket, object_path, _, _ = paths.get_parts(partition) + blobs = self.minio.list_objects( + bucket_name=bucket, prefix=object_path, 
recursive=True
+        )
+        yield from (
+            bucket + "/" + blob.object_name
+            for blob in blobs
+            if not blob.object_name.endswith("/")
+        )
+
+    def read_blob(self, blob_name):
+        import io
+
+        bucket, object_path, name, extension = paths.get_parts(blob_name)
+        stream = self.minio.get_object(bucket, object_path + name + extension)
+        try:
+            return io.BytesIO(stream.read())
+        finally:
+            stream.close()
diff --git a/opteryx/storage/adapters/network/__init__.py b/opteryx/storage/adapters/network/__init__.py
deleted file mode 100644
index 4d9a9249..00000000
--- a/opteryx/storage/adapters/network/__init__.py
+++ /dev/null
@@ -1,11 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/opteryx/storage/base_storage_adapter.py b/opteryx/storage/base_storage_adapter.py index 711f9985..d9c5c88d 100644 --- a/opteryx/storage/base_storage_adapter.py +++ b/opteryx/storage/base_storage_adapter.py @@ -22,7 +22,7 @@ class BaseStorageAdapter(abc.ABC): - def __init__(self): + def __init__(self, *args, **kwargs): pass def get_partitions( diff --git a/tests/performance/test_performance.py b/tests/performance/test_performance.py index 39efc488..f1543638 100644 --- a/tests/performance/test_performance.py +++ b/tests/performance/test_performance.py @@ -4,7 +4,7 @@ sys.path.insert(1, os.path.join(sys.path[0], "../..")) import opteryx -from opteryx.storage.adapters.local import DiskStorage +from opteryx.storage.adapters import DiskStorage def simple_query(): diff --git a/tests/query_planning/test_execution_plan.py b/tests/query_planning/test_execution_plan.py new file mode 100644 index 00000000..32e4a52b --- /dev/null +++ b/tests/query_planning/test_execution_plan.py @@ -0,0 +1,42 @@ +""" +Tests for the execution of flows. Create a basic flow +and push a payload through it. 
+""" +import os +import sys + +sys.path.insert(1, os.path.join(sys.path[0], "../..")) + +from opteryx.engine.planner.execution_tree import ExecutionTree + + +def test_linear_execution_tree(): + """ + Test an execution tree where each item has no more than one incoming edge + """ + tree = ExecutionTree() + tree.add_operator("p", print) + tree.add_operator("m", max) + tree.link_operators("p", "m") + + assert len(tree._nodes) == 2 + assert ["m", "p"] == sorted(tree._nodes.keys()) + assert tree.get_operator("p") == print + assert len(tree._edges) == 1 + assert tree.get_entry_points() == ["p"] + assert tree.get_exit_points() == ["m"] + assert tree.is_acyclic() + assert tree.get_outgoing_links("p") == ["m"] + assert tree.get_incoming_links("m") == [("p", None)] + + tree.add_operator("n", min) + tree.link_operators("m", "n") + + assert len(tree._nodes) == 3 + assert ["m", "n", "p"] == sorted(tree._nodes.keys()) + assert len(tree._edges) == 2 + + +if __name__ == "__main__": # pragma: no cover + test_linear_execution_tree() + print("okay") diff --git a/tests/requirements.txt b/tests/requirements.txt index 6d61b5a6..a5237bac 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -7,4 +7,5 @@ rich zstandard pymemcache mypy -google-cloud-storage \ No newline at end of file +google-cloud-storage +minio \ No newline at end of file diff --git a/tests/storage/test_gcs_storage.py b/tests/storage/test_gcs_storage.py index a9693513..75a9665c 100644 --- a/tests/storage/test_gcs_storage.py +++ b/tests/storage/test_gcs_storage.py @@ -35,7 +35,7 @@ def populate_gcs(): def test_gcs_storage(): import opteryx - from opteryx.storage.adapters.network.gcs_store import GcsStorage + from opteryx.storage.adapters import GcsStorage populate_gcs() @@ -51,7 +51,6 @@ def test_gcs_storage(): # PROCESS THE DATA IN SOME WAY cur = conn.cursor() cur.execute(f"SELECT COUNT(*) FROM {BUCKET_NAME}.data.tweets GROUP BY userid;") - rows = cur.fetchall() rows = list(cur.fetchall()) assert len(rows) 
== 2 diff --git a/tests/storage/test_in_memory_cache.py b/tests/storage/test_in_memory_cache.py index 793bd7d7..5f0a3984 100644 --- a/tests/storage/test_in_memory_cache.py +++ b/tests/storage/test_in_memory_cache.py @@ -12,7 +12,7 @@ def test_in_memory_cache(): import opteryx from opteryx.storage.cache.memory_cache import InMemoryCache - from opteryx.storage.adapters.local import DiskStorage + from opteryx.storage.adapters import DiskStorage cache = InMemoryCache(size=5) diff --git a/tests/storage/test_memcached_cache.py b/tests/storage/test_memcached_cache.py index 82091605..3cc0982b 100644 --- a/tests/storage/test_memcached_cache.py +++ b/tests/storage/test_memcached_cache.py @@ -13,7 +13,7 @@ def test_memcached_cache(): import opteryx from opteryx.storage.cache.memcached_cache import MemcachedCache - from opteryx.storage.adapters.local import DiskStorage + from opteryx.storage.adapters import DiskStorage cache = MemcachedCache(server="localhost:11211") diff --git a/tests/storage/test_minio_storage.py b/tests/storage/test_minio_storage.py new file mode 100644 index 00000000..ac1ea668 --- /dev/null +++ b/tests/storage/test_minio_storage.py @@ -0,0 +1,59 @@ +""" +Test we can read from MinIO +""" +import os +import sys + +sys.path.insert(1, os.path.join(sys.path[0], "../..")) + +from minio import Minio # type:ignore + +BUCKET_NAME = "opteryx" +END_POINT = "localhost:9000" +SECRETS = "minioadmin" + + +def populate_minio(): + + import io + + client = Minio(END_POINT, SECRETS, SECRETS, secure=False) + if not client.bucket_exists(BUCKET_NAME): + client.make_bucket(BUCKET_NAME) + + data = open("tests/data/tweets/tweets-0000.jsonl", mode="rb").read() + + client.put_object( + BUCKET_NAME, "data/tweets/data.jsonl", io.BytesIO(data), len(data) + ) + + +def test_minio_storage(): + + import opteryx + from opteryx.storage.adapters.blob import MinIoStorage + + populate_minio() + + storage = MinIoStorage( + end_point=END_POINT, access_key=SECRETS, secret_key=SECRETS, secure=False 
+ ) + conn = opteryx.connect(reader=storage, partition_scheme=None) + + # SELECT EVERYTHING + cur = conn.cursor() + cur.execute(f"SELECT * FROM {BUCKET_NAME}.data.tweets;") + rows = list(cur.fetchall()) + assert len(rows) == 25 + + # PROCESS THE DATA IN SOME WAY + cur = conn.cursor() + cur.execute(f"SELECT COUNT(*) FROM {BUCKET_NAME}.data.tweets GROUP BY userid;") + rows = list(cur.fetchall()) + assert len(rows) == 2 + + conn.close() + + +if __name__ == "__main__": + test_minio_storage()