container/csv: add support for load without schema
MainRo committed Jan 23, 2024
1 parent a22272a commit d2dce85
Showing 2 changed files with 100 additions and 76 deletions.
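
With this change the csv loader can run without an explicit schema: when dtype is None, the first emitted row is taken as the header and every column is kept as a string. A minimal usage sketch based on the tests added in this commit (the "from rxsci.container import csv" import is assumed from the file path, and the record type name depends on schema_name):

import rx
from rxsci.container import csv

# No dtype given: the header row "index,f1,f2" defines the fields
# and every value is kept as a string.
rx.from_([
    "index,f1,f2",
    "42,the,True",
    "07,quick,False",
]).pipe(
    csv.load(),  # defaults to create_line_parser() with no schema
).subscribe(on_next=print)
# emits records equal to ('42', 'the', 'True') and ('07', 'quick', 'False')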
132 changes: 60 additions & 72 deletions rxsci/container/csv.py
@@ -1,6 +1,7 @@
import typing
import json
import csv
import logging
from collections import namedtuple
from datetime import datetime, timezone
from dateutil.parser import isoparse
@@ -42,7 +43,7 @@ def parse_decimal(ii):

return float(i) + r
except Exception:
#print("parse error on {}: {}".format(ii, e))
logging.error(f"parse error on {ii}: {e}")
return float(ii)


@@ -118,21 +119,23 @@ def merge_escape_parts(parts, separator, escapechar):

return merged_parts
except Exception as e:
print(e)
print(parts)
print(merged_parts)
logging.error(e)
logging.error(parts)
logging.error(merged_parts)
raise e


def create_line_parser(
dtype, none_values=[],
dtype=None, none_values=[],
separator=",", escapechar="\\",
ignore_error=False, schema_name='x'
):
''' creates a parser for csv lines
Args:
dtype: A list of (name, type) tuples, or a typing.NamedTuple class.
dtype: [Optional] A list of (name, type) tuples, or a typing.NamedTuple
class. When set to None, the csv header is used to create a
schema where all columns are parsed as strings.
none_values: [Optional] Values to consider as None values
separator: [Optional] Token used to separate columns
ignore_error: [Optional] when set to True, any line that does not
@@ -143,59 +146,9 @@
A Parsing function, that can parse text lines as specified in the
parameters.
'''
Item, columns, types = create_schema_factory(dtype, schema_name)
columns_parser = [type_parser(t) for t in types]
columns_len = len(columns)

#csv_file = CsvDataFile()
#reader = csv.reader(csv_file)

def parse_column(index, i):
return columns_parser[index](i)

def split(line, separator):
return line.split(separator)

"""
def parse_line_as_csv(line):
csv_file.set_data(line)
parts = next(reader)
if len(parts) != columns_len:
error = "invalid number of columns: expected {}, found {} on: {}".format(
columns_len, len(parts), line)
if ignore_error is True:
print(error)
return None
else:
raise ValueError(error)
for index, i in enumerate(parts):
if i in none_values:
parts[index] = None
else:
parts[index] = parse_column(index, i)
return Item(*parts)
#return parts
def parse_line_as_json(line):
line = "[{}]".format(line)
parts = json.loads(line)
if len(parts) != columns_len:
error = "invalid number of columns: expected {}, found {} on: {}".format(
columns_len, len(parts), line)
if ignore_error is True:
print(error)
return None
else:
raise ValueError(error)
return Item(*parts)
#return parts
"""

def parse_line(line):
def parse_line(line, columns_parser, columns_len, Item):
try:
parts = split(line, separator)
parts = line.split(separator)
if len(parts) != columns_len:
parts = merge_escape_parts(parts, separator, escapechar)
if len(parts) != columns_len:
@@ -210,49 +163,84 @@ def parse_line(line):
if i in none_values:
parts[index] = None
else:
parts[index] = parse_column(index, i)
parts[index] = columns_parser[index](i)

except Exception as e:
if ignore_error is True:
print("{}, \nignoring this line".format(e))
logging.error(f"{e}, \nignoring this line")
return None
else:
raise e
#return item(*parsed_parts)
return Item(*parts)
#return parts

#if separator == ',':
# print("parsing as json")
# return parse_line_as_json
return parse_line
#return parse_line_as_csv
def _parse(source):
def on_subscribe(observer, scheduler):
Item = None
columns_parser = None
columns_len = None

def on_next(i):
nonlocal Item
nonlocal columns_parser
nonlocal columns_len

if Item is None:
if dtype is not None:
Item, columns, types = create_schema_factory(dtype, schema_name)
else:
# create a dummy schema with all fields as string
parts = i.split(separator)
Item, columns, types = create_schema_factory(
[(p, str) for p in parts],
schema_name
)
columns_parser = [type_parser(t) for t in types]
columns_len = len(columns)

else:
try:
item = parse_line(i, columns_parser, columns_len, Item)
except Exception as e:
observer.on_error(e)
return
observer.on_next(item)

source.subscribe(
on_next=on_next,
on_completed=observer.on_completed,
on_error=observer.on_error,
scheduler=scheduler,
)

return rx.create(on_subscribe)

return _parse


def load(parse_line, skip=0):
def load(parse_line=create_line_parser(), skip=0):
''' Loads a csv observable.
The source observable must emit one csv row per item
The source must be an Observable.
Args:
parse_line: A line parser, e.g. created with create_line_parser
skip: number of items to skip before parsing
skip: number of items to skip before parsing (excluding the header)
Returns:
An observable of namedtuple items, where each key is a csv column
'''
def _load(source):
return source.pipe(
parse_line,
ops.skip(skip),
ops.map(parse_line),
ops.filter(lambda i: i is not None),
)

return _load


def load_from_file(filename, parse_line, skip=1, encoding=None):
def load_from_file(filename, parse_line=create_line_parser(), skip=0, encoding=None):
''' Loads a csv file.
This factory loads the provided file and returns its content as an
@@ -261,7 +249,7 @@ def load_from_file(filename, parse_line, skip=1, encoding=None):
Args:
filename: Path of the file to read or a file object
parse_line: A line parser, e.g. created with create_line_parser
skip: [Optional] Number of lines to skip before parsing
skip: [Optional] Number of lines to skip before parsing (excluding the header)
encoding [Optional] Encoding used to parse the text content
Returns:
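
The central change in csv.py above is that create_line_parser now returns an rx operator instead of a plain line-parsing function: schema creation is deferred to subscription time, the first line builds the schema (from dtype when given, otherwise from the header), and only the lines after it are parsed and emitted. A standalone sketch of that pattern in plain RxPY, with namedtuple standing in for rxsci's create_schema_factory and with type parsing and error handling left out:

import rx
from collections import namedtuple

def parse_with_header(separator=","):
    # Header-driven parsing: the first line defines the fields,
    # later lines are emitted as records of strings.
    def _parse(source):
        def on_subscribe(observer, scheduler):
            record = None

            def on_next(line):
                nonlocal record
                if record is None:
                    # First line is the header; build the record type
                    # and emit nothing for it.
                    record = namedtuple("Row", line.split(separator))
                else:
                    observer.on_next(record(*line.split(separator)))

            return source.subscribe(
                on_next=on_next,
                on_completed=observer.on_completed,
                on_error=observer.on_error,
                scheduler=scheduler,
            )

        return rx.create(on_subscribe)

    return _parse

rx.from_(["index,f1,f2", "42,the,True"]).pipe(
    parse_with_header(),
).subscribe(on_next=print)
# Row(index='42', f1='the', f2='True')

The committed version additionally threads columns_parser, columns_len and Item into parse_line, keeps the none_values and escape handling, and reports parse failures through observer.on_error.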
44 changes: 40 additions & 4 deletions tests/container/test_csv.py
@@ -38,15 +38,30 @@ def test_parser():
)

actual_data = process(rx.from_([
"index,f1,f2",
"42,the,True",
"07,quick,False",
]), [ops.map(parser)])
]), [parser])

assert len(actual_data) == 2
assert actual_data[0] == (42, 'the', True)
assert actual_data[1] == (7, 'quick', False)


def test_parser_auto():
parser = csv.create_line_parser()

actual_data = process(rx.from_([
"index,f1,f2",
"42,the,True",
"07,quick,False",
]), [parser])

assert len(actual_data) == 2
assert actual_data[0] == ('42', 'the', 'True')
assert actual_data[1] == ('07', 'quick', 'False')


def test_parser_typed():
class TestLine(typing.NamedTuple):
foo: int
@@ -56,9 +71,10 @@ class TestLine(typing.NamedTuple):
parser = csv.create_line_parser(dtype=TestLine)

actual_data = process(rx.from_([
"index,f1,f2",
"42,the,True",
"07,quick,False",
]), [ops.map(parser)])
]), [parser])

assert len(actual_data) == 2
assert actual_data[0] == (42, 'the', True)
@@ -74,8 +90,9 @@ def test_parser_empty_numbers():
)

actual_data = process(rx.from_([
"f1,f2",
",",
]), [ops.map(parser)])
]), [parser])

assert len(actual_data) == 1
assert actual_data[0] == (None, None)
@@ -91,8 +108,9 @@ def test_parser_invalid_numbers():

with pytest.raises(ValueError):
process(rx.from_([
"f1,f2",
"as,ds",
]), [ops.map(parser)])
]), [parser])


def test_load():
@@ -105,6 +123,7 @@ def test_load():
)

actual_data = process(rx.from_([
"index,f1,f2",
"42,the,True",
"07,quick,False",
"08,,False",
@@ -116,6 +135,20 @@
assert actual_data[2] == (8, '', False)


def test_load_auto():
actual_data = process(rx.from_([
"index,f1,f2",
"42,the,True",
"07,quick,False",
"08,,False",
]), [csv.load()])

assert len(actual_data) == 3
assert actual_data[0] == ('42', 'the', 'True')
assert actual_data[1] == ('07', 'quick', 'False')
assert actual_data[2] == ('08', '', 'False')


def test_load_quoted():
parser = csv.create_line_parser(
dtype=[
@@ -125,6 +158,7 @@ def test_load_quoted():
)

actual_data = process(rx.from_([
'index,f1,f2',
'1,"the, quick"',
'2,"\\"brown fox\\""',
'3,"a\"$#ܟ<a;.b^F ^M^E^Aa^Bov^D^\"[^BƆm^A^Q^]#lx"',
@@ -154,6 +188,7 @@ def test_load_quoted_with_escapechar():
)

actual_data = process(rx.from_([
'index,f1,f2',
'1,"the, quick"',
'2,"^"brown fox^""',
'3,"a\"$#ܟ<a;.b^F ^M^E^Aa^Bov^D^^\"[^BƆm^A^Q^]#lx"',
@@ -190,6 +225,7 @@ def on_error(e):
error = e

source = rx.from_([
"index,f1,f2",
"42,the,True",
"07",
])
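Because the parser now consumes the header row itself, the skip defaults of load and load_from_file drop to 0 and both can be used with no arguments beyond the source. A sketch of loading a file with the new defaults; data.csv is a hypothetical file, and this assumes load_from_file returns an observable of records as its docstring describes:

from rxsci.container import csv

# Hypothetical data.csv:
#   index,f1,f2
#   42,the,True
#   07,quick,False
rows = []
csv.load_from_file("data.csv").subscribe(on_next=rows.append)
# once the subscription completes, rows holds one namedtuple of strings
# per data row, e.g. ('42', 'the', 'True')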
