
Commit

Merge b45ab29 into 9ae3fc4
juarezr committed Jul 30, 2020
2 parents 9ae3fc4 + b45ab29 commit fc420f2
Showing 2 changed files with 82 additions and 31 deletions.
90 changes: 64 additions & 26 deletions petl/io/avro.py
@@ -7,15 +7,15 @@
from datetime import datetime, date, time
from decimal import Decimal

-from petl.compat import izip, izip_longest, text_type, PY3
+from petl.compat import izip_longest, text_type, string_types, PY3
from petl.io.sources import read_source_from_arg, write_source_from_arg
from petl.transform.headers import skip, setheader
-from petl.util.base import Table, dicts, fieldnames, iterpeek
+from petl.util.base import Table, dicts, fieldnames, iterpeek, wrap

# region API


-def fromavro(source, limit=None, skip=0, **avro_args):
+def fromavro(source, limit=None, skips=0, **avro_args):
"""Extract a table from the records of a avro file.
The `source` argument (string or file-like or fastavro.reader) can either
@@ -92,7 +92,7 @@ def fromavro(source, limit=None, skip=0, **avro_args):
source2 = read_source_from_arg(source)
return AvroView(source=source2,
limit=limit,
-skip=skip,
+skips=skips,
**avro_args)


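After this change, callers pass `skips` instead of `skip`; nothing else about reading moves. A minimal usage sketch, with an illustrative file name:

    import petl as etl

    # read at most 100 records, ignoring the first 2 records of the file
    table = etl.fromavro('example.avro', limit=100, skips=2)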
@@ -193,9 +193,8 @@ def toavro(table, target, schema=None, sample=9,
.. _documentation : https://fastavro.readthedocs.io/en/latest/writer.html
"""
-target2 = write_source_from_arg(target)
_write_toavro(table,
-target=target2,
+target=target,
+mode='wb',
schema=schema,
sample=sample,
@@ -228,9 +227,8 @@ def appendavro(table, target, schema=None, sample=9, **avro_args):
.. versionadded:: 1.4.0
"""
-target2 = write_source_from_arg(target, mode='ab')
_write_toavro(table,
-target=target2,
+target=target,
+mode='a+b',
schema=schema,
sample=sample,
@@ -244,10 +242,10 @@ def appendavro(table, target, schema=None, sample=9, **avro_args):
class AvroView(Table):
'''Read rows from an avro file with their types and logical types'''

-def __init__(self, source, limit, skip, **avro_args):
+def __init__(self, source, limit, skips, **avro_args):
self.source = source
self.limit = limit
-self.skip = skip
+self.skip = skips
self.avro_args = avro_args
self.avro_schema = None

@@ -316,21 +314,31 @@ def _write_toavro(table, target, mode, schema, sample,
schema, table2 = _build_schema_from_values(table, sample)
else:
table2 = _fix_missing_headers(table, schema)

# fastavro expects an iterator of dicts
rows = dicts(table2) if PY3 else _ordered_dict_iterator(table2)

-with target.open(mode) as target_file:
+target2 = write_source_from_arg(target, mode=mode)
+with target2.open(mode) as target_file:
# delay the import of fastavro so petl does not break when it is unused
-import fastavro
-parsed_schema = fastavro.parse_schema(schema)
-# this could raise a error when any value is not of supported tupe
-fastavro.writer(fo=target_file,
+from fastavro import parse_schema
+from fastavro.write import Writer
+
+parsed_schema = parse_schema(schema)
+writer = Writer(fo=target_file,
schema=parsed_schema,
-records=rows,
codec=codec,
-codec_compression_level=compression_level,
+compression_level=compression_level,
**avro_args)
+num = 1
+for record in rows:
+try:
+writer.write(record)
+num = num + 1
+except ValueError as err:
+details = _get_error_details(target, num, err, record, schema)
+_raise_error(details)
+# finish writing
+writer.flush()

# endregion Implementation

@@ -449,7 +457,6 @@ def _get_definition_from_record(prop, val, fprev, dprev, fill_missing):
fprev = OrderedDict()
if dprev is None:
dprev = OrderedDict()
-
props = list(val.keys())
row = list(val.values())

@@ -467,9 +474,9 @@ def _get_definition_from_record(prop, val, fprev, dprev, fill_missing):

def _get_precision_from_decimal(curr, val, prev):
if val is None:
-prec = scale = bytes_req = num = 0
+prec = scale = 0
else:
-prec, scale, bytes_req, num = precision_and_scale(val)
+prec, scale, _, _ = precision_and_scale(val)
if prev is not None:
# get the greatest precision and scale of the sample
prec0, scale0 = prev.get('precision'), prev.get('scale')
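For reference, `precision_and_scale` (defined just below; its body is not shown in this diff) returns four values, of which the new code keeps only precision and scale. A standalone sketch of how those two numbers can be derived from a value; the `digits` helper here is hypothetical, not part of petl:

    from decimal import Decimal

    def digits(value):
        # total significant digits, and digits to the right of the decimal point
        tup = Decimal(str(value)).as_tuple()
        scale = max(0, -tup.exponent)
        precision = max(len(tup.digits), scale)
        return precision, scale

    print(digits(123.45))  # -> (5, 2)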
@@ -500,19 +507,50 @@ def precision_and_scale(numeric_value):

def _fix_missing_headers(table, schema):
'''add missing column headers from schema'''
-if schema is None or not 'fields' in schema:
+if schema is None or 'fields' not in schema:
return table
# table2: try not to advance the iterators
sample, table2 = iterpeek(table, 2)
cols = fieldnames(sample)
-fields = schema.get('fields')
-if len(cols) >= len(fields):
+headers = _get_schema_header_names(schema)
+if len(cols) >= len(headers):
return table2
-header = [field.get('name') for field in fields]
-table3 = setheader(table2, header)
+table3 = setheader(table2, headers)
return table3


+def _get_error_details(target, num, err, record, schema):
+'''show the last row when writing fails, for troubleshooting'''
+headers = _get_schema_header_names(schema)
+if isinstance(record, dict):
+table = [headers, list(record.values())]
+else:
+table = [headers, record]
+example = wrap(table).look()
+dest = " output: %s" % target if isinstance(target, string_types) else ''
+printed = "failed writing on row #%d: %s\n%s\n schema: %s\n%s"
+details = printed % (num, err, dest, schema, example)
+return details


+def _get_schema_header_names(schema):
+fields = schema.get('fields')
+if fields is None:
+return []
+header = [field.get('name') for field in fields]
+return header


+def _raise_error(details):
+if PY3:
+raise ValueError(details).with_traceback(sys.exc_info()[2])
+else:
+import traceback
+stacktrace = traceback.format_exc(sys.exc_info())
+err = "%s%s" % (stacktrace, details)
+raise ValueError(err)


def _ordered_dict_iterator(table):
it = iter(table)
hdr = next(it)
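The heart of this commit is the switch from the bulk `fastavro.writer(...)` call to fastavro's incremental `Writer`, which lets a schema-validation failure be reported with the offending row number. A minimal sketch of that pattern, assuming fastavro is installed; the schema, records, and file name are illustrative:

    from fastavro import parse_schema
    from fastavro.write import Writer

    # every field is an optional string, so the non-string value in row 2
    # makes fastavro raise ValueError for that record
    schema = {
        'type': 'record', 'name': 'example',
        'fields': [{'name': 'name', 'type': ['null', 'string']},
                   {'name': 'age', 'type': ['null', 'string']}],
    }
    records = [{'name': 'Ada', 'age': '36'}, {'name': 'Alan', 'age': 41}]

    with open('example.avro', 'wb') as fo:
        writer = Writer(fo=fo, schema=parse_schema(schema), codec='deflate')
        for num, record in enumerate(records, start=1):
            try:
                writer.write(record)
            except ValueError as err:
                # surface the failing row instead of an opaque bulk failure
                raise ValueError('failed writing on row #%d: %s' % (num, err))
        writer.flush()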
23 changes: 18 additions & 5 deletions petl/test/io/test_avro.py
@@ -4,15 +4,15 @@
import sys
import math

-from datetime import datetime, date, time
+from datetime import datetime, date
from decimal import Decimal
from tempfile import NamedTemporaryFile

from nose.tools import eq_

from petl.compat import izip_longest, PY3
from petl.transform.basics import cat
-from petl.util.base import dicts, records
+from petl.util.base import dicts
from petl.util.vis import look

from petl.io.avro import fromavro, toavro, appendavro
@@ -101,6 +101,17 @@ def test_appendavro22():
def test_appendavro10():
_append_to_avro_file(table11, table12, schema1)

+def test_toavro_troubleshooting():
+wrong_schema = dict(schema0)
+schema_fields = wrong_schema['fields']
+for field in schema_fields:
+field['type'] = ['null', 'string']
+try:
+_write_temp_avro_file(table1, wrong_schema)
+except ValueError:
+return
+assert False, 'Failed schema conversion'

# endregion

# region Execution
@@ -113,12 +124,15 @@ def _read_from_mavro_file(test_rows, test_schema, test_expect=None, print_tables=True):
_assert_rows_are_equals(test_expect2, test_actual, print_tables)
return test_filename

-def _write_to_avro_file(test_rows, test_schema, test_expect=None, print_tables=True):
-_show__expect_rows(test_rows, print_tables)
+def _write_temp_avro_file(test_rows, test_schema):
test_filename = _get_tempfile_path()
print("Writing avro file:", test_filename)
toavro(test_rows, test_filename, schema=test_schema)
+return test_filename
+
+def _write_to_avro_file(test_rows, test_schema, test_expect=None, print_tables=True):
+_show__expect_rows(test_rows, print_tables)
+test_filename = _write_temp_avro_file(test_rows, test_schema)
test_actual = fromavro(test_filename)
test_expect2 = test_rows if test_expect is None else test_expect
_assert_rows_are_equals(test_expect2, test_actual, print_tables)
@@ -182,7 +196,6 @@ def _eq2(ve, va, re, ra):
def _decs(float_value, rounding=12):
return Decimal(str(round(float_value, rounding)))

-
def _utc(year, month, day, hour=0, minute=0, second=0, microsecond=0):
u = datetime(year, month, day, hour, minute, second, microsecond)
if PY3:

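End to end, the behaviour exercised by test_toavro_troubleshooting above looks roughly like this from the caller's side; the table, schema, and file name are illustrative:

    import petl as etl

    table = [['name', 'age'], ['Ada', 36]]
    wrong_schema = {
        'type': 'record', 'name': 'example',
        'fields': [{'name': 'name', 'type': ['null', 'string']},
                   {'name': 'age', 'type': ['null', 'string']}],  # 36 is not a string
    }
    try:
        etl.toavro(table, 'example.avro', schema=wrong_schema)
    except ValueError as err:
        print(err)  # includes the row number, the schema, and a preview of the row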