Commit

FIX/#199
joocer committed Jun 16, 2022
1 parent 505d8bc commit df98380
Showing 12 changed files with 1,124 additions and 134 deletions.
1 change: 1 addition & 0 deletions docs/Release Notes/Notices.md
@@ -4,6 +4,7 @@ Opteryx uses the following libraries and components:

Component | Licence
---------------------------------------------------------- | -------
[distogram](https://github.com/maki-nage/distogram) | [MIT](https://github.com/maki-nage/distogram/blob/master/LICENSE.txt)
[pyarrow](https://github.com/apache/arrow/) | [Apache 2.0](https://github.com/apache/arrow/blob/master/LICENSE.txt)
[sqloxide](https://github.com/wseaton/sqloxide) | [MIT](https://github.com/wseaton/sqloxide/blob/master/LICENSE)
[cython](https://github.com/cython/cython) | [Apache 2.0](https://github.com/cython/cython/blob/master/LICENSE.txt)
356 changes: 222 additions & 134 deletions opteryx/engine/planner/operations/show_columns.py
@@ -21,12 +21,14 @@
from typing import Iterable

import numpy
import orjson
import pyarrow

from opteryx.engine.query_statistics import QueryStatistics
from opteryx.engine.attribute_types import OPTERYX_TYPES, determine_type
from opteryx.engine.planner.operations.base_plan_node import BasePlanNode
from opteryx.exceptions import SqlError
from opteryx.third_party import distogram
from opteryx.utils.columns import Columns


@@ -43,9 +45,213 @@ def myhash(any):
    return CityHash64(str(any))


def _simple_collector(page):
    """
    Collect only summary type information; we read a single page to do this, so
    it's pretty quick - helpful if you want to know what fields are available
    programmatically.
    """
    columns = Columns(page)

    buffer = []
    for column in page.column_names:
        column_data = page.column(column)
        _type = determine_type(str(column_data.type))
        new_row = {"column_name": columns.get_preferred_name(column), "type": _type}
        buffer.append(new_row)

    table = pyarrow.Table.from_pylist(buffer)
    table = Columns.create_table_metadata(
        table=table,
        expected_rows=len(buffer),
        name="show_columns",
        table_aliases=[],
    )
    return table
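A note on the shared table-building tail: all three collectors accumulate one dict per column and hand the list to pyarrow. A tiny standalone illustration of that pattern (the buffer contents here are made up):

import pyarrow

# one dict per column, as the collectors build it
buffer = [
    {"column_name": "id", "type": "NUMERIC"},
    {"column_name": "name", "type": "VARCHAR"},
]
table = pyarrow.Table.from_pylist(buffer)
print(table.column_names)  # ['column_name', 'type']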


def _full_collector(pages):
    """
    Collect basic count information about columns; to do this we read the
    entire dataset.
    """

    EMPTY_PROFILE = orjson.dumps(
        {
            "column_name": None,
            "type": [],
            "count": 0,
            "min": None,
            "max": None,
            "missing": 0,
        }
    )

    columns = None
    profile_collector = {}

    for page in pages:
        if columns is None:
            columns = Columns(page)

        for column in page.column_names:
            column_data = page.column(column)
            profile = profile_collector.get(column, orjson.loads(EMPTY_PROFILE))
            _type = determine_type(str(column_data.type))
            if _type not in profile["type"]:
                profile["type"].append(_type)

            profile["count"] += len(column_data)
            profile["missing"] += column_data.null_count

            if _type == OPTERYX_TYPES.NUMERIC:
                # "is not None" so a legitimate minimum of zero isn't treated as unset
                if profile["min"] is not None:
                    profile["min"] = min(profile["min"], numpy.nanmin(column_data))
                    profile["max"] = max(profile["max"], numpy.nanmax(column_data))
                else:
                    profile["min"] = numpy.nanmin(column_data)
                    profile["max"] = numpy.nanmax(column_data)

            profile_collector[column] = profile

    buffer = []

    for column, profile in profile_collector.items():
        profile["column_name"] = columns.get_preferred_name(column)
        profile["type"] = ", ".join(profile["type"])
        buffer.append(profile)

    table = pyarrow.Table.from_pylist(buffer)
    table = Columns.create_table_metadata(
        table=table,
        expected_rows=len(buffer),
        name="show_columns",
        table_aliases=[],
    )
    return table
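The NUMERIC branch above merges per-page minima and maxima into a running pair rather than materialising the whole column. The same fold in isolation - a sketch with made-up pages standing in for pyarrow column chunks:

import numpy

running_min = None
running_max = None
for page in ([3.0, 1.0, 4.0], [1.0, 5.0, 9.0]):  # hypothetical pages
    if running_min is None:
        running_min = numpy.nanmin(page)
        running_max = numpy.nanmax(page)
    else:
        running_min = min(running_min, numpy.nanmin(page))
        running_max = max(running_max, numpy.nanmax(page))

print(running_min, running_max)  # 1.0 9.0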


def _extended_collector(pages):
    """
    Collect summary statistics about each column.
    """

    EMPTY_PROFILE = orjson.dumps(
        {
            "column_name": None,
            "type": [],
            "count": 0,
            "min": None,
            "max": None,
            "missing": 0,
            "mean": None,
            "quantiles": None,
            "histogram": None,
            "unique": 0,
            "most_frequent_values": None,
            "most_frequent_counts": None,
            "distogram": None,
        }
    )

    columns = None
    profile_collector = {}

    for page in pages:
        if columns is None:
            columns = Columns(page)

        for column in page.column_names:
            column_data = page.column(column)
            profile = profile_collector.get(column, orjson.loads(EMPTY_PROFILE))
            _type = determine_type(str(column_data.type))
            if _type not in profile["type"]:
                profile["type"].append(_type)

            profile["count"] += len(column_data)

            # calculate the missing count more robustly
            missing = reduce(
                lambda x, y: x + 1,
                (i for i in column_data if i in (None, numpy.nan)),
                0,
            )
            profile["missing"] += missing

            if _type in (OPTERYX_TYPES.LIST, OPTERYX_TYPES.STRUCT, OPTERYX_TYPES.OTHER):
                profile_collector[column] = profile
                continue

            # convert TIMESTAMP into a NUMERIC (seconds after the Unix epoch)
            if _type == OPTERYX_TYPES.TIMESTAMP:
                import datetime

                to_linux_epoch = (
                    lambda x: numpy.nan
                    if x.as_py() is None
                    else datetime.datetime.fromisoformat(
                        x.as_py().isoformat()
                    ).timestamp()
                )
                column_data = (to_linux_epoch(i) for i in column_data)
            else:
                column_data = (i.as_py() for i in column_data)

            # remove empty values
            column_data = numpy.array(
                [i for i in column_data if i not in (None, numpy.nan)]
            )

            if _type in (OPTERYX_TYPES.BOOLEAN, OPTERYX_TYPES.VARCHAR):
                # truncated in this commit - collection of unique counts and
                # most-frequent values for these types would go here
                pass

            if _type in (OPTERYX_TYPES.NUMERIC, OPTERYX_TYPES.TIMESTAMP):
                # populate the distogram
                if profile["distogram"] is None:
                    dgram = distogram.Distogram(10)
                else:
                    dgram = profile["distogram"]
                values, counts = numpy.unique(column_data, return_counts=True)
                for index, value in enumerate(values):
                    dgram = distogram.update(dgram, value=value, count=counts[index])
                profile["distogram"] = dgram

            profile_collector[column] = profile

    buffer = []

    for column, profile in profile_collector.items():
        profile["column_name"] = columns.get_preferred_name(column)
        profile["type"] = ", ".join(profile["type"])
        dgram = profile.pop("distogram")
        if dgram:
            profile["min"], profile["max"] = distogram.bounds(dgram)
            profile["mean"] = distogram.mean(dgram)

            histogram = distogram.histogram(dgram, bin_count=10)
            if histogram:
                profile["histogram"] = histogram[0]

            profile["quantiles"] = (
                distogram.quantile(dgram, value=0.25),
                distogram.quantile(dgram, value=0.5),
                distogram.quantile(dgram, value=0.75),
            )
        buffer.append(profile)

    table = pyarrow.Table.from_pylist(buffer)
    table = Columns.create_table_metadata(
        table=table,
        expected_rows=len(buffer),
        name="show_columns",
        table_aliases=[],
    )
    return table
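The extended collector never holds a column's raw values between pages; each page is folded into a distogram, a fixed-size streaming histogram, and the statistics are read back from that sketch at the end. A minimal standalone sketch, assuming the vendored opteryx.third_party.distogram module exposes the same calls used above (Distogram, update, bounds, mean, quantile):

import numpy

from opteryx.third_party import distogram

# fold two "pages" of values into one 10-bin distogram, as the collector does
dgram = distogram.Distogram(10)
for page in ([1.0, 2.0, 2.0, 3.0], [3.0, 4.0, 5.0]):
    values, counts = numpy.unique(page, return_counts=True)
    for index, value in enumerate(values):
        dgram = distogram.update(dgram, value=value, count=counts[index])

# the summary statistics come from the sketch - no second pass over the data
print(distogram.bounds(dgram))              # (1.0, 5.0)
print(distogram.mean(dgram))                # ~2.857
print(distogram.quantile(dgram, value=0.5))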


class ShowColumnsNode(BasePlanNode):
    def __init__(self, statistics: QueryStatistics, **config):
        self._full = (config.get("full"),)
        self._full = config.get("full")
        self._extended = config.get("extended")
        pass

@@ -67,136 +273,18 @@ def execute(self) -> Iterable:
        if data_pages is None:
            return None

        if self._full:
            dataset = pyarrow.concat_tables(data_pages.execute())
        else:
            dataset = next(data_pages.execute())

        source_metadata = Columns(dataset)

        buffer = []
        for column in dataset.column_names:
            column_data = dataset.column(column)
            _type = determine_type(str(column_data.type))
            new_row = {
                "column_name": source_metadata.get_preferred_name(column),
                "type": _type,
                "nulls": (column_data.null_count) > 0,
            }

            if self._extended:

                new_row = {
                    **new_row,
                    "count": -1,
                    "min": None,
                    "max": None,
                    "mean": None,
                    "quantiles": None,
                    "histogram": None,
                    "unique": -1,
                    "missing": -1,
                    "most_frequent_values": None,
                    "most_frequent_counts": None,
                }

                # Basic counting statistics
                new_row["count"] = len(column_data)
                new_row["missing"] = reduce(
                    lambda x, y: x + 1,
                    (i for i in column_data if i in (None, numpy.nan)),
                    0,
                )
                # Number of unique items in the column;
                # we use hashes because some types don't play nicely
                values = numpy.unique(
                    [hash(i) for i in column_data if i not in (None, numpy.nan)]
                )
                unique_values = len(values)
                new_row["unique"] = unique_values
                del values

                # LISTS and STRUCTS are complex, don't profile them
                if _type in (OPTERYX_TYPES.LIST, OPTERYX_TYPES.STRUCT):
                    continue

                # convert TIMESTAMP into a NUMERIC (seconds after the Unix epoch)
                if _type == OPTERYX_TYPES.TIMESTAMP:
                    import datetime

                    to_linux_epoch = (
                        lambda x: numpy.nan
                        if x.as_py() is None
                        else datetime.datetime.fromisoformat(
                            x.as_py().isoformat()
                        ).timestamp()
                    )
                    column_data = (to_linux_epoch(i) for i in column_data)
                else:
                    column_data = (i.as_py() for i in column_data)

                # remove empty values
                column_data = numpy.array(
                    [i for i in column_data if i not in (None, numpy.nan)]
                )

                # don't work with long strings
                if _type == OPTERYX_TYPES.VARCHAR:
                    if max(len(i) for i in column_data) > 32:
                        continue

                # For NUMERIC and TIMESTAMPS (now NUMERIC), get min, max, mean,
                # quantiles and distribution
                if _type in (
                    OPTERYX_TYPES.NUMERIC,
                    OPTERYX_TYPES.TIMESTAMP,
                ):
                    new_row["min"] = numpy.min(column_data)
                    new_row["max"] = numpy.max(column_data)

                    # Python has no practical limits on numbers, but Arrow does
                    if (
                        new_row["min"] < -9007199254740992
                        or new_row["max"] > 9007199254740992
                    ):
                        new_row["min"] = None
                        new_row["max"] = None
                    else:
                        new_row["mean"] = numpy.mean(column_data)
                        new_row["quantiles"] = numpy.percentile(
                            column_data, [25, 50, 75]
                        )
                        new_row["histogram"], boundaries = numpy.histogram(
                            column_data, min(unique_values, 10)
                        )
                        del boundaries

                # Don't work out frequencies for TIMESTAMPS
                if _type not in (OPTERYX_TYPES.TIMESTAMP,) and unique_values < 10:
                    column_data, counts = numpy.unique(column_data, return_counts=True)
                    # skip if everything occurs the same number of times
                    if max(counts) != min(counts):
                        top_counts = sorted(counts, reverse=True)[0:5]
                        most_frequent = {
                            str(v): c
                            for v, c in zip(column_data, counts)
                            if c in top_counts
                        }
                        new_row["most_frequent_values"] = list(most_frequent.keys())
                        new_row["most_frequent_counts"] = most_frequent.values()
                        del most_frequent
                    del counts

            del column_data

            buffer.append(new_row)

        table = pyarrow.Table.from_pylist(buffer)
        table = Columns.create_table_metadata(
            table=table,
            expected_rows=len(buffer),
            name="show_columns",
            table_aliases=[],
        )
        yield table
        return
        if not (self._full or self._extended):
            # if it's not FULL or EXTENDED, just get the list of columns and
            # their types
            yield _simple_collector(next(data_pages.execute()))
            return

        if self._full and not self._extended:
            # we're going to read the full table, so we can count things
            yield _full_collector(data_pages.execute())
            return

        if self._extended:
            # get everything we can reasonably get
            yield _extended_collector(data_pages.execute())
            return
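Taken together, the three collectors line up with the three forms of the statement. A quick way to compare them - a sketch assuming the standard opteryx connection API and the built-in $planets sample dataset:

import opteryx

conn = opteryx.connect()

# single page only: column names and types (_simple_collector)
cur = conn.cursor()
cur.execute("SHOW COLUMNS FROM $planets")
print(cur.fetchall())

# full read: adds count, min/max and missing counts (_full_collector)
cur = conn.cursor()
cur.execute("SHOW FULL COLUMNS FROM $planets")
print(cur.fetchall())

# full read plus distogram-backed statistics (_extended_collector)
cur = conn.cursor()
cur.execute("SHOW EXTENDED COLUMNS FROM $planets")
print(cur.fetchall())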
