Commit

Merge pull request #503 from mabel-dev/FEATURE/#395
Feature/#395 CSV reader
joocer committed Sep 9, 2022
2 parents 3946e74 + 743a698 commit 5e94524
Showing 12 changed files with 60,029 additions and 56 deletions.
11 changes: 8 additions & 3 deletions docs/Deployment/10 Storage.md
@@ -102,9 +102,14 @@ This is the default structure created by [Mabel](https://github.com/mabel-dev/ma

 Opteryx is primarily designed for use with [Parquet](https://parquet.apache.org/) to store data, Parquet is fast to process and offers optimizations not available for other formats, however, in some benchmarks [ORC](https://orc.apache.org/) out performs Parquet.

-Opteryx also supports [JSONL](https://jsonlines.org/) files, and JSONL files which have been [Zstandard](http://facebook.github.io/zstd/) compressed (`.zstd`). `.jsonl` and `.zstd` format files are the default storage for [Mabel](https://github.com/mabel-dev/mabel).
-
-Opteryx also has support for Feather (Arrow) files.
+Opteryx supports:
+
+- Parquet formatted files
+- CSV formatted files
+- [JSONL](https://jsonlines.org/) formatted files
+- JSONL formatted files which have been [Zstandard](http://facebook.github.io/zstd/) compressed (`.zstd`)
+- ORC formatted files
+- Feather (Arrow) formatted files

 ### File Sizes

1 change: 1 addition & 0 deletions docs/Release Notes/Change Log.md
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [[#375](https://github.com/mabel-dev/opteryx/issues/375)] Results to an Arrow Table. ([@joocer](https://github.com/joocer))
 - [[#486](https://github.com/mabel-dev/opteryx/issues/486)] Support functions on aggregates and aggregates on functions. ([@joocer](https://github.com/joocer))
 - Initial support for `INTERVAL`s. ([@joocer](https://github.com/joocer))
+- [[#395](https://github.com/mabel-dev/opteryx/issues/395)] Support reading CSV files. ([@joocer](https://github.com/joocer))

 **Changed**

36 changes: 20 additions & 16 deletions docs/SQL Reference/06 Functions.md
@@ -1,5 +1,9 @@
 # Functions

+This document describes the built-in SQL functions and operators supported by Opteryx.
+
+Generally functions will return `Null` on `Null` input, although note that this is not true in all circumstances.
+
 Definitions noted with a 🔻 accept different input arguments.

 ## Conversion Functions
@@ -41,7 +45,7 @@ Definitions noted with a 🔻 accept different input arguments.
     Return timestamp of **seconds** seconds since the Unix Epoch.

 !!! function "`TRY_CAST` (**any**: _any_ AS **type**) → _[type]_"
-    Cast **any** to **type**, if cast is not possible, returns `NULL`.
+    Cast **any** to **type**, if cast is not possible, returns `Null`.

 !!! function "`VARCHAR` (_any_) → _varchar_"
     Cast **any** to a string, raises an error if cast is not possible.
@@ -175,17 +179,17 @@ For more details, see [Working with Lists](https://mabel-dev.github.io/opteryx/S
     Returns the number of elements in **array**.

 !!! function "`LIST_CONTAINS` (**array**: _list_, **value**) → _boolean_"
-    Return `true` if **array** contains **value**.
+    Return `True` if **array** contains **value**.
     See also `SEARCH`(**array**, **value**).

 !!! function "`LIST_CONTAINS_ANY` (**array**: _list_, **values**: _list_) → _boolean_"
-    Return `true` if **array** contains any elements in **values**.
+    Return `True` if **array** contains any elements in **values**.

 !!! function "`LIST_CONTAINS_ALL` (**array**: _list_, **values**: _list_) → _boolean_"
-    Return `true` if **array** contains all of elements in **values**.
+    Return `True` if **array** contains all of elements in **values**.

 !!! function "`SEARCH` (**array**: _list_, **value**) → _boolean_ 🔻"
-    Return `true` if **array** contains **value**.
+    Return `True` if **array** contains **value**.

 !!! function "`SORT` (**array**: _list_) → _list_"
     Return **array** in ascending order.
@@ -274,7 +278,7 @@ Functions for examining and manipulating string values.
     Related: `CONCAT`.

 !!! function "`ENDS_WITH` (**str**: _varchar_, **value**: _varchar_) → _boolean_"
-    Return True if **str** ends with **value**.
+    Return `True` if **str** ends with **value**.
     Related: `STARTS_WITH`.

 !!! function "`GET` (**str**: _varchar_, **index**: _numeric_) → _varchar_ 🔻"
@@ -305,10 +309,10 @@ Functions for examining and manipulating string values.
     Returns a character string containing the phonetic representation of char. See [Soundex 🡕](https://en.wikipedia.org/wiki/Soundex).

 !!! function "`SEARCH` (**str**: _varchar_, **value**: _varchar_) → _boolean_ 🔻"
-    Return True if **str** contains **value**.
+    Return `True` if **str** contains **value**.

 !!! function "`STARTS_WITH` (**str**: _varchar_, **value**: _varchar_) → _boolean_"
-    Return True if **str** starts with **value**.
+    Return `True` if **str** starts with **value**.
     Related: `ENDS_WITH`

 !!! function "`TITLE` (**str**: _varchar_) → _varchar_"
@@ -333,7 +337,7 @@ For more details, see [Working with Structs](https://mabel-dev.github.io/opteryx
     Alias of **object**`[`**key**`]`.

 !!! function "`SEARCH` (**object**: _struct_, **value**: _varchar_) → **boolean** 🔻"
-    Return `TRUE` if any of the values in **object** is **value**. Note `SEARCH` does not match struct keys.
+    Return `True` if any of the values in **object** is **value**. Note `SEARCH` does not match struct keys.

 ## System Functions

@@ -359,7 +363,7 @@ For more details, see [Working with Structs](https://mabel-dev.github.io/opteryx
     Related: `BASE85_DECODE`.

 !!! function "`COALESCE` (**arg1**, **arg2**, ...) → _[input type]_"
-    Return the first item from args which is not `NULL`.
+    Return the first item from args which is not `Null`.
     Related: `IFNULL`.

 !!! function "`GENERATE_SERIES` (**stop**: _numeric_) → _list_<_numeric_> 🔻"
@@ -378,25 +382,25 @@ For more details, see [Working with Structs](https://mabel-dev.github.io/opteryx
     Return a list of IP addresses from a given **cidr**.

 !!! function "`IFNULL` (**check_expression**: _any_, **replacement_value**: _any_) → _[input type]_"
-    Returns **check_expression** if not `NULL`, otherwise returns **replacement_value**.
+    Returns **check_expression** if not `Null`, otherwise returns **replacement_value**.
     Related: `COALESCE`.

 !!! function "`HASH` (**any**) → _varchar_"
     Calculate the [CityHash](https://opensource.googleblog.com/2011/04/introducing-cityhash.html) (64 bit).

 !!! function "`HEX_DECODE` (**any**) → _varchar_"
-    Decode a value which has been encoded using HEX (BASE16) encoding.
+    Decode a value which has been encoded using hexadecimal (Base16) encoding.
     Related: `HEX_ENCODE`.

 !!! function "`HEX_ENCODE` (**any**) → _varchar_"
-    Encode value with HEX (BASE16) encoding.
+    Encode value with hexadecimal (Base16) encoding.
     Related: `HEX_DECODE`.

-!!! function "`IIF` (**condition**, **true_value**, **false_value***) → _[input type]_"
-    Return the **true_value** if the condition evaluates to True, otherwise return the **false_value**.
+!!! function "`IIF` (**condition**, **true_value**, **false_value**) → _[input type]_"
+    Return the **true_value** if the condition evaluates to `True`, otherwise return the **false_value**.

 !!! function "`NORMAL` () → _numeric_"
-    Random number from a normal (Gaussian) distribution; distribution is centred at 0.0 and have a standard deviation of 1.0.
+    Random number from a normal (Gaussian) distribution; distribution is centred at 0.0 and has a standard deviation of 1.0.

 !!! function "`MD5` (**any**) → _varchar_"
     Calculate the MD5 hash.
4 changes: 2 additions & 2 deletions opteryx/connectors/gcp_cloudstorage_connector.py
@@ -42,7 +42,7 @@ def __init__(self, project: Optional[str] = None, credentials=None, **kwargs):
     def read_blob(self, blob_name):

         bucket, object_path, name, extension = paths.get_parts(blob_name)
-        bucket = bucket.replace("_data", "-data")
+        bucket = bucket.replace("va_data", "va-data")
         bucket = bucket.replace("data_", "data-")

         blob = get_blob(
@@ -55,7 +55,7 @@ def read_blob(self, blob_name):

     def get_blob_list(self, partition=None):
         bucket, object_path, name, extension = paths.get_parts(partition)
-        bucket = bucket.replace("_data", "-data")
+        bucket = bucket.replace("va_data", "va-data")
         bucket = bucket.replace("data_", "data-")

         # print(bucket, object_path, name, extension)
4 changes: 2 additions & 2 deletions opteryx/functions/__init__.py
@@ -21,7 +21,6 @@
 from pyarrow import ArrowNotImplementedError

 import numpy
-import pyarrow

 import opteryx

@@ -31,7 +30,7 @@
 from opteryx.functions import other_functions
 from opteryx.functions import string_functions
 from opteryx.third_party.date_trunc import date_trunc
-from opteryx.utils import dates
+from opteryx.utils import dates, arrays


 def get_random():
@@ -240,6 +239,7 @@ def _coalesce(*args):
     "GREATEST": _iterate_single_parameter(numpy.nanmax),
     "LEAST": _iterate_single_parameter(numpy.nanmin),
     "IIF": other_functions.iif,
+    "GENERATE_SERIES": arrays.generate_series,

     # NUMERIC
     "ROUND": number_functions.round,
1 change: 1 addition & 0 deletions opteryx/operators/blob_reader_node.py
@@ -54,6 +54,7 @@ def do_nothing(stream, projection=None):
     "complete": (do_nothing, ExtentionType.CONTROL),
     "ignore": (do_nothing, ExtentionType.CONTROL),
     "arrow": (file_decoders.arrow_decoder, ExtentionType.DATA),  # feather
+    "csv": (file_decoders.csv_decoder, ExtentionType.DATA),
     "jsonl": (file_decoders.jsonl_decoder, ExtentionType.DATA),
     "orc": (file_decoders.orc_decoder, ExtentionType.DATA),
     "parquet": (file_decoders.parquet_decoder, ExtentionType.DATA),
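
The mapping in this hunk keys decoders by file extension, which is how `.csv` blobs become readable. As a rough illustration of how such a table could be consulted, here is a minimal sketch. The `DECODERS` dict, the string stand-ins for the decoder functions, and `decoder_for` are hypothetical names chosen for this example and are not taken from the Opteryx source.

```python
import os
from typing import Dict, Optional

# Hypothetical stand-in for the extension table in the hunk above; the real
# table maps each extension to a (decoder function, extension type) tuple.
DECODERS: Dict[str, str] = {
    "arrow": "arrow_decoder",
    "csv": "csv_decoder",
    "jsonl": "jsonl_decoder",
    "orc": "orc_decoder",
    "parquet": "parquet_decoder",
}


def decoder_for(blob_name: str) -> Optional[str]:
    """Return the decoder registered for a blob's extension, or None."""
    # splitext returns the extension with its leading dot, e.g. ".csv"
    _, extension = os.path.splitext(blob_name)
    # normalise to the lower-case, dot-free form used as the dict key
    return DECODERS.get(extension.lstrip(".").lower())
```

Under this assumed design, a blob with an unregistered extension yields `None`, so the caller decides whether to skip it or raise an error.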
33 changes: 3 additions & 30 deletions opteryx/operators/function_dataset_node.py
@@ -29,39 +29,12 @@
 from opteryx.managers.expression import NodeType, evaluate
 from opteryx.operators import BasePlanNode
 from opteryx.exceptions import SqlError
+from opteryx.utils import arrays


 def _generate_series(alias, *args):
-
-    from opteryx.utils import intervals, dates
-
-    arg_len = len(args)
-    arg_vals = [i.value for i in args]
-    first_arg_type = args[0].token_type
-
-    # if the parameters are numbers, generate series is an alias for range
-    if first_arg_type == NodeType.LITERAL_NUMERIC:
-        if arg_len not in (1, 2, 3):
-            raise SqlError("generate_series for numbers takes 1,2 or 3 parameters.")
-        return [{alias: i} for i in intervals.generate_range(*arg_vals)]
-
-    # if the params are timestamps, we create time intervals
-    if first_arg_type == NodeType.LITERAL_TIMESTAMP:
-        if arg_len != 3:
-            raise SqlError(
-                "generate_series for dates needs start, end, and interval parameters"
-            )
-        return [{alias: i} for i in dates.date_range(*arg_vals)]
-
-    # if the param is a CIDR, we create network ranges
-    if first_arg_type == NodeType.LITERAL_VARCHAR:
-        if arg_len not in (1,):
-            raise SqlError("generate_series for strings takes 1 CIDR parameter.")
-
-        import ipaddress
-
-        ips = ipaddress.ip_network(arg_vals[0], strict=False)
-        return [{alias: str(ip)} for ip in ips]
+    value_array = arrays.generate_series(*args)
+    return [{alias: value} for value in value_array]


 def _unnest(alias, values):
37 changes: 37 additions & 0 deletions opteryx/utils/arrays.py
@@ -0,0 +1,37 @@
+import numpy
+
+from opteryx.exceptions import SqlError
+from opteryx.utils import intervals, dates
+
+
+def generate_series(*args):
+
+    from opteryx.managers.expression import NodeType
+
+    arg_len = len(args)
+    arg_vals = [i.value for i in args]
+    first_arg_type = args[0].token_type
+
+    # if the parameters are numbers, generate series is an alias for range
+    if first_arg_type in (NodeType.LITERAL_NUMERIC, numpy.float64):
+        if arg_len not in (1, 2, 3):
+            raise SqlError("generate_series for numbers takes 1,2 or 3 parameters.")
+        return intervals.generate_range(*arg_vals)
+
+    # if the params are timestamps, we create time intervals
+    if first_arg_type == NodeType.LITERAL_TIMESTAMP:
+        if arg_len != 3:
+            raise SqlError(
+                "generate_series for dates needs start, end, and interval parameters"
+            )
+        return dates.date_range(*arg_vals)
+
+    # if the param is a CIDR, we create network ranges
+    if first_arg_type == NodeType.LITERAL_VARCHAR:
+        if arg_len not in (1,):
+            raise SqlError("generate_series for strings takes 1 CIDR parameter.")
+
+        import ipaddress
+
+        ips = ipaddress.ip_network(arg_vals[0], strict=False)
+        return [str(ip) for ip in ips]
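
The CIDR branch of `generate_series` above relies on the standard library's `ipaddress` module. The standalone sketch below isolates that step to show what `strict=False` does. The helper name `expand_cidr` is hypothetical and is not part of the commit.

```python
import ipaddress
from typing import List


def expand_cidr(cidr: str) -> List[str]:
    """Return every address in a CIDR block as a string (hypothetical helper)."""
    # strict=False masks off any host bits, so "192.168.0.1/30" is treated
    # as the network 192.168.0.0/30 instead of raising a ValueError
    network = ipaddress.ip_network(cidr, strict=False)
    # iterating a network yields every address in the block, including the
    # network and broadcast addresses
    return [str(ip) for ip in network]


if __name__ == "__main__":
    # prints ['192.168.0.0', '192.168.0.1', '192.168.0.2', '192.168.0.3']
    print(expand_cidr("192.168.0.1/30"))
```

Because the whole block is materialised as a list, a wide prefix such as a `/8` would produce millions of strings, so callers may want to bound the prefix length.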
40 changes: 38 additions & 2 deletions opteryx/utils/file_decoders.py
@@ -56,6 +56,15 @@ def orc_decoder(stream, projection: List = None):
     import pyarrow.orc as orc

     orc_file = orc.ORCFile(stream)
+
+    selected_columns = None
+    if isinstance(projection, (list, set)) and "*" not in projection:
+        orc_metadata = orc_file.schema
+        selected_columns = list(set(orc_metadata.names).intersection(projection))
+        # if nothing matched, there's been a problem - maybe HINTS confused for columns
+        if len(selected_columns) == 0:
+            selected_columns = None
+
     table = orc_file.read()
     return table

@@ -67,8 +76,27 @@ def jsonl_decoder(stream, projection: List = None):
     table = pyarrow.json.read_json(stream)

     # the read doesn't support projection, so do it now
-    # if projection:
-    # table = table.select(projection)
+    if projection and "*" not in projection:
+        selected_columns = list(set(table.column_names).intersection(projection))
+        # if nothing matched, don't do a thing
+        if len(selected_columns) > 0:
+            table = table.select(selected_columns)
+
+    return table
+
+
+def csv_decoder(stream, projection: List = None):
+
+    import pyarrow.csv
+
+    table = pyarrow.csv.read_csv(stream)
+
+    # the read doesn't support projection, so do it now
+    if projection and "*" not in projection:
+        selected_columns = list(set(table.column_names).intersection(projection))
+        # if nothing matched, don't do a thing
+        if len(selected_columns) > 0:
+            table = table.select(selected_columns)

     return table

@@ -78,4 +106,12 @@ def arrow_decoder(stream, projection: List = None):
     import pyarrow.feather as pf

     table = pf.read_table(stream)
+
+    # we can't get the schema before reading the file, so do selection now
+    if projection and "*" not in projection:
+        selected_columns = list(set(table.column_names).intersection(projection))
+        # if nothing matched, don't do a thing
+        if len(selected_columns) > 0:
+            table = table.select(selected_columns)
+
     return table
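
The JSONL, CSV and Arrow decoders above repeat the same "intersect the projection with the file's columns, fall back to everything if nothing matches" step. A minimal sketch of that step as one function follows. `resolve_projection` is a hypothetical name, not something in the commit, and unlike the set intersection in the diff it keeps the file's own column order.

```python
from typing import Iterable, List, Optional


def resolve_projection(
    available: List[str], projection: Optional[Iterable[str]]
) -> Optional[List[str]]:
    """Hypothetical helper: the columns to keep, or None to keep them all."""
    if projection is None:
        return None
    wanted = set(projection)
    # an empty projection or a wildcard means "read everything"
    if not wanted or "*" in wanted:
        return None
    # walk the file's columns so the result keeps the file's column order
    selected = [name for name in available if name in wanted]
    # if nothing matched, fall back to reading everything
    return selected or None
```

A decoder could then call `table.select(columns)` only when the result is not `None`. Separately, the ORC hunk computes `selected_columns` but still calls `orc_file.read()` with no arguments; if the intent was to push the projection into the read, passing it as `orc_file.read(columns=selected_columns)` would be one way to do so.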
