Merge pull request #454 from mabel-dev/FIX/#453
FIX/#453 Large Expression Results PyArrow bug
joocer committed Aug 30, 2022
2 parents fdbe217 + 5436cdc commit ff93aae
Showing 11 changed files with 67 additions and 43 deletions.
12 changes: 8 additions & 4 deletions blog/20220205 Writing a Query Engine.md
@@ -2,21 +2,25 @@

## Motivation

No-one in their right mind would write a SQL Engine if they didn't need to. There are a lot of options in the space of providing SQL query access to distributed data - with a few players dominating the market from Facebook and from the Apache Foundation.
No-one in their right mind would write a SQL Engine if they didn't need to. There are a lot of options in the space of providing SQL query access to distributed data - with a few players dominating the market like Trino, DuckDB and SQLite.

We had a problem where we wanted a SQL interface to our data, but none of the existing tools were a good fit for our situation. We could have changed our approach to fit an existing toolset, but we wanted to explore other options before committing to a vendor-defined design.

## Prior Attempts

The data store we're working with was designed to be transactional (read a row of data, process it, save the result, repeat). We use JSON Lines files, which for this use case, we were unable to find anything better in the sweet spot of human and machine readable, and performance to read and write.
The data store we're working with was designed to be transactional (read a row of data, process it, save the result, repeat). We use JSON Lines files; for this use case we were unable to find anything better in the sweet spot of being both human- and machine-readable and performing well for reads and writes.

With this as the datastore, our first attempt at a SQL engine was also transactional, following what is known as the Volcano Model. This aligned well with the tools we had written to process the data, so most of the effort was in translating the SQL syntax into filters that the existing tools could understand. Functionality like GROUP BY was added to make it feel more like a database and less like a log viewer.
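
In the Volcano Model, each operator exposes an iterator and pulls one row at a time from the operator beneath it. A minimal sketch of the idea (illustrative only, not the Mabel/Opteryx code):

~~~python
# Row-at-a-time (Volcano-style) execution: each operator pulls single rows
# from its child operator. Names here are hypothetical, for illustration.
class Scan:
    def __init__(self, rows):
        self.rows = rows

    def execute(self):
        yield from self.rows


class Filter:
    def __init__(self, child, predicate):
        self.child = child
        self.predicate = predicate

    def execute(self):
        for row in self.child.execute():
            if self.predicate(row):
                yield row


plan = Filter(Scan([{"user": "alice"}, {"user": "bob"}]), lambda r: r["user"] == "bob")
print(list(plan.execute()))  # [{'user': 'bob'}]
~~~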

This provided an acceptable level of functionality for single-table queries (the existing tools only ever read from one table and write to one table) and the engine was integrated into user-facing systems.

As data grew, we started to hit problems. When reading tens of millions of rows, constraints outside the control of the system meant that jobs running for longer than 180 seconds were terminated. This generally meant that queries over more than about 30 million records (or fewer records but with calculations) timed out. A lot of queries could still be run, as not everything hit these thresholds, but it couldn't be used for large data analysis.
As data grew, we started to hit problems. When reading tens of millions of rows, constraints outside the control of the system meant that jobs running for longer than 180 seconds were terminated. This generally meant that queries over more than about 30 million records (or far fewer records but with calculations) timed out. A lot of queries could still be run, as not everything hit these thresholds, but it couldn't be used for large data analysis.

## Redesign

The redesigned SQL Engine, called Opteryx, is leveraging Parquet to help improve performance. Parquet was assessed for the transactional use case but the optimized JSONL implementation in Mabel consistently outperformed Parquet. However, reassessing performance for a SQL engine, Parquet outperforms JSONL.
The decision

The redesigned SQL Engine, called Opteryx, leverages Parquet to help improve performance. Parquet was assessed for the transactional use case, but the optimized JSONL implementation in Mabel consistently outperformed it. However, when reassessing performance for a SQL engine, Parquet outperforms JSONL.

The previous SQL Engine had a fixed execution plan; this meant that no matter what your query was, it followed the same steps, with some steps doing nothing. This was simpler to write, but will have affected performance. Opteryx creates a query plan; the initial version doesn't optimize this plan by doing things like running selections and projections early, but it does only add steps to the plan that are required.
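
As a hedged illustration of the difference (hypothetical names, not Opteryx's actual planner), a plan built this way only contains the steps the query asks for:

~~~python
# Hypothetical sketch: build a plan as an ordered list of steps, adding a
# step only when the query needs it, rather than always running every step.
def build_plan(dataset, where=None, columns=None):
    plan = [("scan", dataset)]             # always read the data
    if where is not None:
        plan.append(("select", where))     # filter only if there is a WHERE
    if columns is not None:
        plan.append(("project", columns))  # trim columns only if requested
    return plan


print(build_plan("tests.data", where="user = 'bob'"))
# [('scan', 'tests.data'), ('select', "user = 'bob'")]
~~~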

41 changes: 21 additions & 20 deletions docs/Contributor Guide/95 Project Structure.md
@@ -5,25 +5,26 @@
Opteryx's repository folder structure is described below:

~~~
├── connectors <- modules to connect to data sources
├── functions <- modules to execute functions within SQL statements
├── managers <- libraries responsible for key functional units
│ ├── cache <- modules implementing the caching mechanism
│ ├── expression <- modules implementing expression evaluation
│ ├── process <- modules implementing process management
│ ├── query
│ │ └── planner <- modules implementing query planning
│ └── schemes <- modules implementing storage schemes
├── models <- internal data models
├── operators <- modules implementing steps in the query plan
├── samples <- sample data
├── third_party <- third party code
│ ├── distogram
│ ├── fuzzy
│ ├── hyperloglog
│ ├── pyarrow_ops
│ └── ...
├── utils <- helper libraries
└── ...
/
├── connectors/ <- modules to connect to data sources
├── functions/ <- modules to execute functions within SQL statements
├── managers/ <- libraries responsible for key functional units
│ ├── cache/ <- modules implementing the caching mechanism
│ ├── expression/ <- modules implementing expression evaluation
│ ├── process/ <- modules implementing process management
│ ├── query/
│ │ └── planner/ <- modules implementing query planning
│ └── schemes/ <- modules implementing storage schemes
├── models/ <- internal data models
├── operators/ <- modules implementing steps in the query plan
├── samples/ <- sample data
├── third_party/ <- third party code
│ ├── distogram/
│ ├── fuzzy/
│ ├── hyperloglog/
│ ├── pyarrow_ops/
│ └── ...
├── utils/ <- helper libraries
└── ...
~~~

28 changes: 14 additions & 14 deletions docs/Deployment/10 Storage.md
@@ -15,7 +15,7 @@ Local Disk | DiskStorage | Blob/File Store

Connectors are registered with the storage engine using the `register_store` method. Multiple prefixes can be added, each using a different connector, so multiple storage types can be combined in a single query.

~~~
~~~python
opteryx.storage.register_store("tests", DiskStorage)
~~~
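
For example, two prefixes can be registered against different connectors and then referenced from the same query. A hedged sketch (the dataset names and the join are hypothetical):

~~~python
import opteryx
from opteryx.connectors import DiskStorage, GcsStorage

# hypothetical prefixes: one backed by local disk, one by GCS
opteryx.register_store("local_data", DiskStorage)
opteryx.register_store("your_bucket", GcsStorage)

cursor = opteryx.connect().cursor()
# a single query can read datasets from both registered stores
cursor.execute(
    "SELECT * "
    "  FROM local_data.orders AS orders "
    " INNER JOIN your_bucket.customers AS customers "
    "    ON orders.customer_id = customers.id"
)
~~~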

@@ -26,8 +26,8 @@ import opteryx
from opteryx.connectors import GcsStorage

# Tell the storage engine that datasets with the prefix 'your_bucket' are
# to be read using the GcsStorage adapter. Multiple prefixes can be added
# and do not need to be the same adapter.
# to be read using the GcsStorage connector. Multiple prefixes can be added
# and do not need to be the same connector.
opteryx.register_store("your_bucket", GcsStorage)

connection = opteryx.connect()
@@ -45,13 +45,13 @@ Opteryx references datasets using their relative path as the table name. For exa

~~~
/
├── products/
├── customers/
├── profiles/
└── preferences/
├── marketing/
└── site/
└── purchases/
├── products/
├── customers/
│   ├── profiles/
│   └── preferences/
│       ├── marketing/
│       └── site/
└── purchases/
~~~

Would have the following datasets available (assuming leaf folders have data files within them)
@@ -75,10 +75,10 @@ To enable temporal queries, data must be structured into date hierarchy folders

~~~
/
└── products/
└── year_2022/
└── month_05/
└── day_01/
└── products/
    └── year_2022/
        └── month_05/
            └── day_01/
~~~

To query the data for today with this structure, you can execute:
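
A hedged sketch of such a query (assuming Opteryx's `FOR TODAY` temporal clause and the connection API shown above):

~~~python
import opteryx

# assumed example: FOR TODAY reads only the partition for the current day,
# resolved from the year_YYYY/month_MM/day_DD folder hierarchy
connection = opteryx.connect()
cursor = connection.cursor()
cursor.execute("SELECT * FROM products FOR TODAY")
print(cursor.fetchone())
~~~
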
1 change: 1 addition & 0 deletions docs/Release Notes/Change Log.md
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

- [[#448](https://github.com/mabel-dev/opteryx/issues/448)] `VERSION()` failed and missing from regression suite. ([@joocer](https://github.com/joocer))
- [[#404](https://github.com/mabel-dev/opteryx/issues/404)] `COALESCE` fails for NaN values. ([@joocer](https://github.com/joocer))
- [[#453](https://github.com/mabel-dev/opteryx/issues/453)] PyArrow bug with long lists creating new columns. ([@joocer](https://github.com/joocer))

## [0.3.0] - 2022-08-28

13 changes: 9 additions & 4 deletions opteryx/managers/expression/__init__.py
@@ -287,6 +287,7 @@ def evaluate(expression: ExpressionTreeNode, table: Table):

columns = Columns(table)
result = _inner_evaluate(root=expression, table=table, columns=columns)

if not isinstance(result, (pyarrow.Array, numpy.ndarray)):
result = numpy.array(result)
return result
@@ -334,11 +335,15 @@ def evaluate_and_append(expressions, table: Table):

new_column = evaluate(statement, table)

# Strings need special handling
if isinstance(new_column, (pyarrow.lib.StringScalar)):
new_column = new_column.as_py()
# large arrays appear to have a bug in PyArrow where they're automatically
# converted to a chunked array, but the internal function can't handle
# chunked arrays
if new_column.nbytes > 50000000:
new_column = [[i] for i in new_column]
else:
new_column = [new_column]

table = table.append_column(new_column_name, [new_column])
table = table.append_column(new_column_name, new_column)

# add the column to the schema and because it's been evaluated and added to
# the table, it's an IDENTIFIER rather than a FUNCTION
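
For context, a small standalone sketch (not the Opteryx code above) of the `append_column` behaviour the workaround leans on: the new column can be supplied either as a single array or as a list of chunks, and very large evaluated results are handed over as chunks.

~~~python
import pyarrow

table = pyarrow.table({"a": [1, 2, 3]})

# one array -> a single-chunk column
values = pyarrow.array([10, 20, 30])
table_single = table.append_column("b", [values])

# a list of arrays -> a chunked column; passing per-row chunks is roughly
# what the size-based fallback above does for very large results
chunks = [pyarrow.array([10]), pyarrow.array([20]), pyarrow.array([30])]
table_chunked = table.append_column("c", chunks)

print(table_single.column("b").num_chunks)   # 1
print(table_chunked.column("c").num_chunks)  # 3
~~~
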
2 changes: 2 additions & 0 deletions opteryx/models/query_statistics.py
@@ -57,6 +57,7 @@ def __init__(self):
self.time_selecting: float = 0
self.time_aggregating: float = 0
self.time_ordering: float = 0
self.time_evaluating: float = 0

self.start_time: int = 0
self.end_time: int = 0
@@ -107,6 +108,7 @@ def as_dict(self):
"time_selecting": self._ns_to_s(self.time_selecting),
"time_aggregating": self._ns_to_s(self.time_aggregating),
"time_ordering": self._ns_to_s(self.time_ordering),
"time_evaluating": self._ns_to_s(self.time_evaluating),
"partitions_found": self.partitions_found,
"partitions_scanned": self.partitions_scanned,
"partitions_read": self.partitions_read,
2 changes: 2 additions & 0 deletions opteryx/operators/aggregate_node.py
@@ -220,7 +220,9 @@ def execute(self) -> Iterable:
)

# Allow grouping by functions by evaluating them
start_time = time.time_ns()
columns, self._groups, table = evaluate_and_append(self._groups, table)
self._statistics.time_evaluating += time.time_ns() - start_time

start_time = time.time_ns()
group_by_columns = [
4 changes: 3 additions & 1 deletion opteryx/operators/function_dataset_node.py
@@ -19,10 +19,10 @@
"""
import os
import sys
import time

from typing import Iterable

import numpy
import pyarrow

from opteryx.models import Columns, QueryDirectives, QueryStatistics
@@ -129,7 +129,9 @@ def name(self): # pragma: no cover
def execute(self) -> Iterable:

try:
start_time = time.time_ns()
data = FUNCTIONS[self._function](self._alias, *self._args) # type:ignore
self._statistics.time_data_read += time.time_ns() - start_time
except TypeError as err: # pragma: no cover
print(str(err))
if str(err).startswith("_unnest() takes 2"):
4 changes: 4 additions & 0 deletions opteryx/operators/projection_node.py
@@ -18,6 +18,8 @@
This Node eliminates columns that are not needed in a Relation. This is also the Node
that performs column renames.
"""
import time

from typing import Iterable

import pyarrow
@@ -92,9 +94,11 @@ def execute(self) -> Iterable:
for page in data_pages.execute():

# If any of the columns are FUNCTIONs, we need to evaluate them
start_time = time.time_ns()
_columns, self._expressions, page = evaluate_and_append(
self._expressions, page
)
self._statistics.time_evaluating += time.time_ns() - start_time

# first time round we're going to work out what we need from the metadata
if columns is None:
1 change: 1 addition & 0 deletions opteryx/operators/selection_node.py
@@ -85,6 +85,7 @@ def execute(self) -> Iterable:

start_selection = time.time_ns()
mask = evaluate(self._filter, page)
self._statistics.time_evaluating += time.time_ns() - start_selection

# if the mask is a boolean array, we've called a function that
# returns booleans
2 changes: 2 additions & 0 deletions tests/sql_battery/test_battery_shape.py
@@ -590,6 +590,8 @@
("SELECT VERSION()", 1, 1),
# COALESCE doesn't work with NaNs [#404]
("SELECT is_reply_to FROM tests.data.formats.parquet WITH(NO_PARTITION) WHERE COALESCE(is_reply_to, -1) < 0", 74765, 1),
# Large results can't be added to pages #453
("SELECT SHA512(column_0) FROM FAKE(150000, 1)", 150000, 1),
]
# fmt:on

