Merge pull request #166 from mabel-dev/FEATURE/#165
Feature/#165
joocer committed Jun 4, 2022
2 parents d64e7be + ab85735 commit f875e99
Showing 18 changed files with 323 additions and 112 deletions.
7 changes: 7 additions & 0 deletions docs/Release Notes/Change Log.md
@@ -3,6 +3,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

### [Unreleased]

#### Added

- [#20](https://github.com/mabel-dev/opteryx/issues/20) Split query planner and query plan into different modules. ([@joocer](https://github.com/joocer))
- [#165](https://github.com/mabel-dev/opteryx/issues/165) Support S3/MinIO data stores for blobs. ([@joocer](https://github.com/joocer))

### [0.0.2] - 2022-06-03

#### Added
145 changes: 145 additions & 0 deletions opteryx/engine/planner/execution_tree.py
@@ -0,0 +1,145 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class ExecutionTree:
"""
The execution tree is defined separately from the planner to keep the complex
planner code apart from the structure that describes the plan.
"""

def __init__(self):
"""create empty plan"""
self._nodes: dict = {}
self._edges: list = []

def add_operator(self, nid, operator):
"""
Add a step to the DAG
Parameters:
nid: string
The id of the step, must be unique
operator: BaseOperator
The Operator to add
"""
self._nodes[nid] = operator

def link_operators(self, source_operator, target_operator, direction=None):
"""
Link steps in a flow.
Parameters:
source_operator: string
The id of the source step
target_operator: string
The id of the target step
direction: string (optional)
Left/Right/None, the direction of the incoming node
"""
edge = (
source_operator,
target_operator,
direction,
)
if edge not in self._edges:
self._edges.append(edge)

def get_outgoing_links(self, nid):
"""
Get the ids of outgoing nodes from a given step.
Parameters:
nid: string
The id of the step to search from
"""
retval = {target for source, target, direction in self._edges if source == nid}
return sorted(retval)

def get_incoming_links(self, nid):
"""
Get the ids of incoming nodes for a given step.
Parameters:
nid: string
The id of the step to search from
"""
retval = {
(
source,
direction,
)
for source, target, direction in self._edges
if target == nid
}
return sorted(retval)

def get_exit_points(self):
"""
Get steps in the flow with no outgoing steps.
"""
sources = {source for source, target, direction in self._edges}
retval = {
target for source, target, direction in self._edges if target not in sources
}
return sorted(retval)

def get_entry_points(self):
"""
Get steps in the flow with no incoming steps.
"""
if len(self._nodes) == 1:
return list(self._nodes.keys())
targets = {target for source, target, direction in self._edges}
retval = {
source for source, target, direction in self._edges if source not in targets
}
return sorted(retval)

def get_operator(self, nid):
"""
Get the Operator class by id.
Parameters:
nid: string
The id of the step
"""
return self._nodes.get(nid)

def is_acyclic(self):
"""
Test if the graph is acyclic
"""
# repeatedly peel off a layer of exit nodes from the graph
# if edges remain but there are no exits, the graph is cyclic
my_edges = self._edges.copy()

while len(my_edges) > 0:
# find all of the exits
sources = {source for source, target, direction in my_edges}
exits = {
target
for source, target, direction in my_edges
if target not in sources
}

if len(exits) == 0:
return False

# remove the exits
new_edges = [
(source, target, direction)
for source, target, direction in my_edges
if target not in exits
]
my_edges = new_edges
return True
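
A minimal sketch of how the new ExecutionTree might be driven; the node ids and the plain-string "operators" below are placeholders, a real plan stores operator node instances:

from opteryx.engine.planner.execution_tree import ExecutionTree

# a tiny two-branch plan: two readers feeding a join, then a limit
tree = ExecutionTree()
tree.add_operator("read_left", "LeftReader")    # placeholder operators
tree.add_operator("read_right", "RightReader")
tree.add_operator("join", "InnerJoin")
tree.add_operator("limit", "Limit")

tree.link_operators("read_left", "join", direction="left")
tree.link_operators("read_right", "join", direction="right")
tree.link_operators("join", "limit")

print(tree.get_entry_points())          # ['read_left', 'read_right']
print(tree.get_exit_points())           # ['limit']
print(tree.get_incoming_links("join"))  # [('read_left', 'left'), ('read_right', 'right')]
print(tree.is_acyclic())                # True
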
1 change: 1 addition & 0 deletions opteryx/engine/planner/operations/__init__.py
@@ -18,6 +18,7 @@
from .distinct_node import DistinctNode # remove duplicate records
from .evaluation_node import EvaluationNode # aliases and evaluations
from .explain_node import ExplainNode # EXPLAIN queries
from .inner_join_node import InnerJoinNode # INNER JOIN
from .limit_node import LimitNode # select the first N records
from .offset_node import OffsetNode # skip a number of records
from .outer_join_node import OuterJoinNode # LEFT/RIGHT/FULL OUTER JOIN
99 changes: 10 additions & 89 deletions opteryx/engine/planner/planner.py
@@ -40,12 +40,13 @@
these are not supported by SqlOxide and so are in a different module which strips
temporal aspects out of the query.
"""
import datetime
import numpy

from opteryx.engine.attribute_types import TOKEN_TYPES
from opteryx.engine.functions import is_function
from opteryx.engine.planner.execution_tree import ExecutionTree
from opteryx.engine.planner.operations import *
from opteryx.engine.planner.operations.inner_join_node import InnerJoinNode
from opteryx.engine.planner.temporal import extract_temporal_filters
from opteryx.exceptions import SqlError
from opteryx.utils import dates
@@ -68,16 +69,13 @@
}


class QueryPlanner(object):
class QueryPlanner(ExecutionTree):
def __init__(self, statistics, reader, cache, partition_scheme):
"""
Plan represents Directed Acyclic Graphs which are used to describe data
pipelines.
Planner creates a plan (an Execution Tree, or DAG) which describes how to
respond to the query.
"""
import datetime

self.nodes: dict = {}
self.edges: list = []
super().__init__()

self._ast = None

Expand All @@ -86,8 +84,8 @@ def __init__(self, statistics, reader, cache, partition_scheme):
self._cache = cache
self._partition_scheme = partition_scheme

self._start_date = datetime.datetime.today()
self._end_date = datetime.datetime.today()
self._start_date = datetime.datetime.utcnow().date()
self._end_date = datetime.datetime.utcnow().date()

def __repr__(self):
return "QueryPlanner"
@@ -266,9 +264,9 @@ def _build_dnf_filters(self, filters):
def _extract_relations(self, ast):
""" """

def _safe_get(l, i):
def _safe_get(iterable, index):
try:
return l[i]
return iterable[index]
except IndexError:
return None

@@ -790,83 +788,6 @@ def _inner_explain(operator_name, depth):
table = Columns.create_table_metadata(table, table.num_rows, "plan", None)
yield table

def add_operator(self, name, operator):
"""
Add a step to the DAG
Parameters:
name: string
The name of the step, must be unique
Operator: BaseOperator
The Operator
"""
self.nodes[name] = operator

def link_operators(self, source_operator, target_operator):
"""
Link steps in a flow.
Parameters:
source_operator: string
The name of the source step
target_operator: string
The name of the target step
"""
edge = (source_operator, target_operator)
if edge not in self.edges:
self.edges.append((source_operator, target_operator))

def get_outgoing_links(self, name):
"""
Get the names of outgoing links from a given step.
Parameters:
name: string
The name of the step to search from
"""
retval = {target for source, target in self.edges if source == name}
return sorted(retval)

def get_exit_points(self):
"""
Get steps in the flow with no outgoing steps.
"""
sources = {source for source, target in self.edges}
retval = {target for source, target in self.edges if target not in sources}
return sorted(retval)

def get_entry_points(self):
"""
Get steps in the flow with no incoming steps.
"""
if len(self.nodes) == 1:
return list(self.nodes.keys())
targets = {target for source, target in self.edges}
retval = {source for source, target in self.edges if source not in targets}
return sorted(retval)

def get_operator(self, name):
"""
Get the Operator class by name.
Parameters:
name: string
The name of the step
"""
return self.nodes.get(name)

def merge(self, assimilatee):
"""
Merge a flow into the current flow.
Parameters:
assimilatee: Flow
The flow to assimilate into the current flows
"""
self.nodes = {**self.nodes, **assimilatee.nodes}
self.edges += assimilatee.edges
self.edges = list(set(self.edges))

# def __repr__(self):
# return "\n".join(list(self._draw()))

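
With the deletions above, QueryPlanner no longer keeps its own copy of the graph code; add_operator, link_operators and the entry/exit-point helpers are inherited from ExecutionTree. A rough sketch, assuming None is acceptable for the constructor arguments (the engine normally supplies its statistics, reader, cache and partition-scheme objects):

from opteryx.engine.planner.planner import QueryPlanner

# None placeholders stand in for the engine-supplied objects
planner = QueryPlanner(statistics=None, reader=None, cache=None, partition_scheme=None)

# the DAG API now comes from ExecutionTree
planner.add_operator("scan", "DatasetReader")   # placeholder operator
planner.add_operator("limit", "Limit")
planner.link_operators("scan", "limit")
print(planner.get_exit_points())  # ['limit']
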
5 changes: 3 additions & 2 deletions opteryx/storage/adapters/__init__.py
@@ -10,5 +10,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from opteryx.storage.adapters.local.disk_store import DiskStorage
from opteryx.storage.adapters.network.gcs_store import GcsStorage
from opteryx.storage.adapters.blob.disk_store import DiskStorage
from opteryx.storage.adapters.blob.gcs_store import GcsStorage
from opteryx.storage.adapters.blob.minio_store import MinIoStorage
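
All three blob adapters are now importable from the same package. A small sketch, assuming the base adapter's constructor takes no arguments; the file path is illustrative:

from opteryx.storage.adapters import DiskStorage, GcsStorage, MinIoStorage

# DiskStorage reads blobs straight from the local filesystem
store = DiskStorage()
blob = store.read_blob("tests/data/tweets/tweets-0000.jsonl")  # illustrative path
print(len(blob.read()))
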
@@ -11,3 +11,5 @@
# limitations under the License.

from .disk_store import DiskStorage
from .gcs_store import GcsStorage
from .minio_store import MinIoStorage
@@ -15,9 +15,6 @@


class DiskStorage(BaseStorageAdapter):
def __init__(self):
pass

def read_blob(self, blob_name):
import io

File renamed without changes.
47 changes: 47 additions & 0 deletions opteryx/storage/adapters/blob/minio_store.py
@@ -0,0 +1,47 @@
"""
MinIO Reader - also works with AWS S3
"""
from opteryx.exceptions import MissingDependencyError
from opteryx.storage import BaseStorageAdapter
from opteryx.utils import paths

try:
from minio import Minio # type:ignore

minio_installed = True
except ImportError: # pragma: no cover
minio_installed = False


class MinIoStorage(BaseStorageAdapter):
def __init__(self, end_point: str, access_key: str, secret_key: str, **kwargs):

if not minio_installed: # pragma: no cover
raise MissingDependencyError(
"`minio` is missing, please install or include in requirements.txt"
)

super().__init__(**kwargs)
secure = kwargs.get("secure", True)
self.minio = Minio(end_point, access_key, secret_key, secure=secure)

def get_blob_list(self, partition):
bucket, object_path, _, _ = paths.get_parts(partition)
blobs = self.minio.list_objects(
bucket_name=bucket, prefix=object_path, recursive=True
)
yield from (
bucket + "/" + blob.object_name
for blob in blobs
if not blob.object_name.endswith("/")
)

def read_blob(self, blob_name):
import io

bucket, object_path, name, extension = paths.get_parts(blob_name)
stream = self.minio.get_object(bucket, object_path + name + extension)
try:
return io.BytesIO(stream.read())
finally:
# only close the stream once it has been opened successfully
stream.close()
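
A usage sketch for the new adapter; the endpoint, credentials and partition path are made-up values for a local MinIO instance, and an optional secure keyword (TLS, on by default) can be passed through kwargs:

from opteryx.storage.adapters.blob.minio_store import MinIoStorage

store = MinIoStorage(
    end_point="localhost:9000",   # illustrative endpoint
    access_key="minioadmin",      # illustrative credentials
    secret_key="minioadmin",
)

# list blobs under a (hypothetical) partition, then read each one
for blob_name in store.get_blob_list("my-bucket/space_missions"):
    blob = store.read_blob(blob_name)
    print(blob_name, len(blob.read()))
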
11 changes: 0 additions & 11 deletions opteryx/storage/adapters/network/__init__.py

This file was deleted.
