diff --git a/docs/changes.rst b/docs/changes.rst
index c537a6cf..8a736611 100644
--- a/docs/changes.rst
+++ b/docs/changes.rst
@@ -1,6 +1,16 @@
 Changes
 =======
 
+Version 1.4.0
+-------------
+
+* Added functions :func:`petl.io.avro.fromavro`, :func:`petl.io.avro.toavro`,
+  and :func:`petl.io.avro.appendavro` for reading from and writing to
+  `Apache Avro <https://avro.apache.org/docs/current/spec.html>`_ files. Avro
+  is generally faster and safer than text formats such as JSON, XML or CSV.
+  By :user:`juarezr`, :issue:`490`.
+
+
 Version 1.3.0
 -------------
diff --git a/docs/io.rst b/docs/io.rst
index 2e225bfb..4db4888f 100644
--- a/docs/io.rst
+++ b/docs/io.rst
@@ -288,6 +288,52 @@ Text indexes (Whoosh)
 
+.. module:: petl.io.avro
+.. _io_avro:
+
+Avro files (fastavro)
+---------------------
+
+.. note::
+
+    The following functions require `fastavro
+    <https://fastavro.readthedocs.io/>`_ to be installed, e.g.::
+
+        $ pip install fastavro
+
+.. autofunction:: petl.io.avro.fromavro
+.. autofunction:: petl.io.avro.toavro
+.. autofunction:: petl.io.avro.appendavro
+
+.. literalinclude:: ../petl/test/io/test_avro_schemas.py
+    :name: logical_schema
+    :language: python
+    :caption: Avro schema for logical types
+    :start-after: begin_logical_schema
+    :end-before: end_logical_schema
+
+.. literalinclude:: ../petl/test/io/test_avro_schemas.py
+    :name: nullable_schema
+    :language: python
+    :caption: Avro schema with nullable fields
+    :start-after: begin_nullable_schema
+    :end-before: end_nullable_schema
+
+.. literalinclude:: ../petl/test/io/test_avro_schemas.py
+    :name: array_schema
+    :language: python
+    :caption: Avro schema with array values in fields
+    :start-after: begin_array_schema
+    :end-before: end_array_schema
+
+.. literalinclude:: ../petl/test/io/test_avro_schemas.py
+    :name: complex_schema
+    :language: python
+    :caption: Example of a recursive complex Avro schema
+    :start-after: begin_complex_schema
+    :end-before: end_complex_schema
+
 .. module:: petl.io.sources
 .. _io_helpers:
 
 I/O helper classes
 ------------------
diff --git a/optional_requirements.txt b/optional_requirements.txt
index 50cb7ea8..cef34460 100644
--- a/optional_requirements.txt
+++ b/optional_requirements.txt
@@ -13,3 +13,4 @@ SQLAlchemy==1.3.6
 Whoosh==2.7.4
 xlrd==1.2.0
 xlwt==1.3.0
+fastavro>=0.23.4
diff --git a/petl/io/__init__.py b/petl/io/__init__.py
index d554ae8f..6dc1c8da 100644
--- a/petl/io/__init__.py
+++ b/petl/io/__init__.py
@@ -35,3 +35,5 @@ searchtextindexpage, totextindex, appendtextindex
 
 from petl.io.bcolz import frombcolz, tobcolz, appendbcolz
+
+from petl.io.avro import fromavro, toavro, appendavro
diff --git a/petl/io/avro.py b/petl/io/avro.py
new file mode 100644
index 00000000..b6f7932b
--- /dev/null
+++ b/petl/io/avro.py
@@ -0,0 +1,531 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, division, print_function
+
+import sys
+import math
+from collections import OrderedDict
+from datetime import datetime, date, time
+from decimal import Decimal
+
+from petl.compat import izip, izip_longest, text_type, PY3
+from petl.io.sources import read_source_from_arg, write_source_from_arg
+from petl.transform.headers import skip, setheader
+from petl.util.base import Table, dicts, fieldnames, iterpeek
+
+# region API
+
+
+def fromavro(source, limit=None, skip=0, **avro_args):
+    """Extract a table from the records of an avro file.
+
+    The `source` argument (string, file-like object, or fastavro.reader)
+    can be the path of the file, a file-like input stream, or an instance
+    of fastavro.reader.
+
+    The `limit` and `skip` arguments can be used to limit the range of rows
+    to extract.
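+
+    For example, a call like ``fromavro('data.avro', limit=2, skip=1)``
+    (illustrative file name) would skip the first record of the file and
+    read at most the next two.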
+
+    The fields of the rows read from the file can have scalar values like
+    int, string, float, datetime, date and decimal, but can also have
+    compound types like enum, :ref:`array <array_schema>`, map, union and
+    record. Field types can also have recursive structures, defined
+    in :ref:`complex schemas <complex_schema>`.
+
+    Fields with :ref:`logical types <logical_schema>` are read and
+    translated to the corresponding Python types: long timestamp-millis and
+    long timestamp-micros to datetime.datetime, int date to datetime.date,
+    bytes decimal and fixed decimal to Decimal, and int time-millis and
+    long time-micros to datetime.time.
+
+    Example usage for reading files::
+
+        >>> # set up an Avro file to demonstrate with
+        ...
+        >>> schema1 = {
+        ...     'doc': 'Some people records.',
+        ...     'name': 'People',
+        ...     'namespace': 'test',
+        ...     'type': 'record',
+        ...     'fields': [
+        ...         {'name': 'name', 'type': 'string'},
+        ...         {'name': 'friends', 'type': 'int'},
+        ...         {'name': 'age', 'type': 'int'},
+        ...     ]
+        ... }
+        ...
+        >>> records1 = [
+        ...     {'name': 'Bob', 'friends': 42, 'age': 33},
+        ...     {'name': 'Jim', 'friends': 13, 'age': 69},
+        ...     {'name': 'Joe', 'friends': 86, 'age': 17},
+        ...     {'name': 'Ted', 'friends': 23, 'age': 51}
+        ... ]
+        ...
+        >>> import fastavro
+        >>> parsed_schema1 = fastavro.parse_schema(schema1)
+        >>> with open('example-file-to-read.avro', 'wb') as f1:
+        ...     fastavro.writer(f1, parsed_schema1, records1)
+        ...
+        >>> # now demonstrate the use of fromavro()
+        >>> import petl as etl
+        >>> tbl1 = etl.fromavro('example-file-to-read.avro')
+        >>> tbl1
+        +-------+---------+-----+
+        | name  | friends | age |
+        +=======+=========+=====+
+        | 'Bob' |      42 |  33 |
+        +-------+---------+-----+
+        | 'Jim' |      13 |  69 |
+        +-------+---------+-----+
+        | 'Joe' |      86 |  17 |
+        +-------+---------+-----+
+        | 'Ted' |      23 |  51 |
+        +-------+---------+-----+
+
+    .. versionadded:: 1.4.0
+
+    """
+
+    source2 = read_source_from_arg(source)
+    return AvroView(source=source2,
+                    limit=limit,
+                    skip=skip,
+                    **avro_args)
+
+
+def toavro(table, target, schema=None, sample=9,
+           codec='deflate', compression_level=None, **avro_args):
+    """
+    Write the table into a new avro file according to the schema passed.
+
+    This method assumes that each column has values of the same type
+    for all rows of the source `table`.
+
+    `Apache Avro`_ is a data serialization framework. It is used for data
+    serialization (especially in the Hadoop ecosystem), for data exchange
+    with databases (e.g., Redshift) and in RPC protocols (e.g., Kafka). It
+    has libraries for many languages and is generally faster and safer
+    than text formats such as JSON, XML or CSV.
+
+    The `target` argument is the file path for creating the avro file.
+    Note that if a file already exists at the given location, it will be
+    overwritten.
+
+    The `schema` argument (dict) defines the field structure of the rows.
+    Check the fastavro `documentation`_ and the Avro schema `reference`_
+    for details.
+
+    The `sample` argument (int, optional) defines how many rows are
+    inspected for discovering the field types and building a schema for
+    the avro file when the `schema` argument is not passed.
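+
+    For example, a call like ``toavro(table, 'out.avro', sample=20)``
+    (illustrative) would inspect up to 20 rows when inferring the schema.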
+
+    The `codec` argument (string, optional) sets the compression codec used
+    to shrink the data in the file. It can be 'null', 'deflate' (the
+    default) or 'bzip2'; 'snappy', 'zstandard', 'lz4' and 'xz' are also
+    available when the corresponding codec library is installed.
+
+    The `compression_level` argument (int, optional) sets the level of
+    compression to use with the specified codec, if the codec supports it.
+
+    Additionally, extra options can be passed in the argument
+    `**avro_args`; these are forwarded directly to fastavro. Check the
+    fastavro `documentation`_ for reference.
+
+    The avro file format preserves type information, i.e., reading and
+    writing is round-trippable for tables with non-string data values.
+    However, the conversion from Python value types to avro fields is not
+    perfect. Use the `schema` argument to define the proper types for the
+    conversion.
+
+    The following avro types are supported by the schema: null, boolean,
+    string, int, long, float, double, bytes, fixed, enum,
+    :ref:`array <array_schema>`, map, union, record, and recursive types
+    defined in :ref:`complex schemas <complex_schema>`.
+
+    :ref:`Logical types <logical_schema>` are also supported and translated
+    to the corresponding Python types: long timestamp-millis, long
+    timestamp-micros, int date, bytes decimal, fixed decimal, string uuid,
+    int time-millis and long time-micros.
+
+    Example usage for writing files::
+
+        >>> # set up a table to demonstrate with
+        >>> table2 = [['name', 'friends', 'age'],
+        ...           ['Bob', 42, 33],
+        ...           ['Jim', 13, 69],
+        ...           ['Joe', 86, 17],
+        ...           ['Ted', 23, 51]]
+        ...
+        >>> schema2 = {
+        ...     'doc': 'Some people records.',
+        ...     'name': 'People',
+        ...     'namespace': 'test',
+        ...     'type': 'record',
+        ...     'fields': [
+        ...         {'name': 'name', 'type': 'string'},
+        ...         {'name': 'friends', 'type': 'int'},
+        ...         {'name': 'age', 'type': 'int'},
+        ...     ]
+        ... }
+        ...
+        >>> # now demonstrate writing with toavro()
+        >>> import petl as etl
+        >>> etl.toavro(table2, 'example-file-to-write.avro', schema=schema2)
+        ...
+        >>> # this is what was saved above
+        >>> tbl2 = etl.fromavro('example-file-to-write.avro')
+        >>> tbl2
+        +-------+---------+-----+
+        | name  | friends | age |
+        +=======+=========+=====+
+        | 'Bob' |      42 |  33 |
+        +-------+---------+-----+
+        | 'Jim' |      13 |  69 |
+        +-------+---------+-----+
+        | 'Joe' |      86 |  17 |
+        +-------+---------+-----+
+        | 'Ted' |      23 |  51 |
+        +-------+---------+-----+
+
+    .. versionadded:: 1.4.0
+
+    .. _Apache Avro: https://avro.apache.org/docs/current/spec.html
+    .. _reference: https://avro.apache.org/docs/current/spec.html#schemas
+    .. _documentation: https://fastavro.readthedocs.io/en/latest/writer.html
+
+    """
+    target2 = write_source_from_arg(target)
+    _write_toavro(table,
+                  target=target2,
+                  mode='wb',
+                  schema=schema,
+                  sample=sample,
+                  codec=codec,
+                  compression_level=compression_level,
+                  **avro_args)
+
+
+def appendavro(table, target, schema=None, sample=9, **avro_args):
+    """
+    Append rows to an existing avro file, or create a new one.
+
+    The `target` argument can be either an existing avro file or the file
+    path for creating a new one.
+
+    The `schema` argument is checked against the schema of the existing
+    file, so it must be the same schema used by `toavro()` when the file
+    was created.
+
+    The `sample` argument (int, optional) defines how many rows are
+    inspected for discovering the field types and building a schema for
+    the avro file when the `schema` argument is not passed.
+
+    Additionally, extra options can be passed in the argument
+    `**avro_args`; these are forwarded directly to fastavro. Check the
+    fastavro documentation for reference.
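+
+    A minimal sketch (the file name and ``more_rows`` are illustrative; the
+    target file either does not exist yet or was created with a matching
+    schema)::
+
+        import petl as etl
+        etl.appendavro(more_rows, 'example-file-to-append.avro')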
+
+    See :func:`petl.io.avro.toavro` for more information and examples.
+
+    .. versionadded:: 1.4.0
+
+    """
+    target2 = write_source_from_arg(target)
+    _write_toavro(table,
+                  target=target2,
+                  mode='a+b',
+                  schema=schema,
+                  sample=sample,
+                  **avro_args)
+
+# endregion API
+
+# region Implementation
+
+
+class AvroView(Table):
+    '''Read rows from an avro file with their types and logical types'''
+
+    def __init__(self, source, limit, skip, **avro_args):
+        self.source = source
+        self.limit = limit
+        self.skip = skip
+        self.avro_args = avro_args
+        self.avro_schema = None
+
+    def get_avro_schema(self):
+        '''gets the schema stored in the avro file header'''
+        return self.avro_schema
+
+    def __iter__(self):
+        with self.source.open('rb') as source_file:
+            avro_reader = self._open_reader(source_file)
+            header = self._decode_schema(avro_reader)
+            yield header
+            for row in self._read_rows_from(avro_reader, header):
+                yield row
+
+    def _open_reader(self, source_file):
+        '''This could raise an error when the file is corrupt or is not avro'''
+        # delay the import of fastavro to avoid breaking when unused
+        import fastavro
+        avro_reader = fastavro.reader(source_file, **self.avro_args)
+        return avro_reader
+
+    def _decode_schema(self, avro_reader):
+        '''extract the header from the schema stored in the avro file'''
+        self.avro_schema = avro_reader.writer_schema
+        if self.avro_schema is None:
+            return None
+        schema_fields = self.avro_schema['fields']
+        header = tuple(col['name'] for col in schema_fields)
+        return header
+
+    def _read_rows_from(self, avro_reader, header):
+        count = 0
+        maximum = self.limit if self.limit is not None else sys.maxsize
+        for i, record in enumerate(avro_reader):
+            if i < self.skip:
+                continue
+            if count >= maximum:
+                break
+            count += 1
+            row = self._map_row_from(header, record)
+            yield row
+
+    def _map_row_from(self, header, record):
+        '''
+        fastavro automatically converts logical types defined in the avro
+        schema to the corresponding python types.
        E.g.:
+        - avro type: long  logicalType: timestamp-millis -> python datetime
+        - avro type: int   logicalType: date             -> python date
+        - avro type: bytes logicalType: decimal          -> python Decimal
+        '''
+        if header is None or PY3:
+            r = tuple(record.values())
+        else:
+            # fastavro on python2 does not respect dict order
+            r = tuple(record.get(col) for col in header)
+        return r
+
+
+def _write_toavro(table, target, mode, schema, sample,
+                  codec='deflate', compression_level=None, **avro_args):
+    if table is None or len(table) <= 0:
+        return
+    # build a schema when not defined by the user
+    if not schema:
+        schema, table2 = _build_schema_from_values(table, sample)
+    else:
+        table2 = _fix_missing_headers(table, schema)
+
+    # fastavro expects an iterator of dicts
+    rows = dicts(table2) if PY3 else _ordered_dict_iterator(table2)
+
+    with target.open(mode) as target_file:
+        # delay the import of fastavro to avoid breaking when unused
+        import fastavro
+        parsed_schema = fastavro.parse_schema(schema)
+        # this could raise an error when a value is not of a supported type
+        fastavro.writer(fo=target_file,
+                        schema=parsed_schema,
+                        records=rows,
+                        codec=codec,
+                        codec_compression_level=compression_level,
+                        **avro_args)
+
+# endregion Implementation
+
+# region Helpers
+
+
+def _build_schema_from_values(table, sample):
+    # table2: try not to advance iterators
+    samples, table2 = iterpeek(table, sample + 1)
+    props = fieldnames(samples)
+    peek = skip(samples, 1)
+    schema_fields = _build_schema_fields_from_values(peek, props)
+    schema_source = _build_schema_with(schema_fields)
+    return schema_source, table2
+
+
+def _build_schema_with(schema_fields):
+    schema = {
+        'type': 'record',
+        'name': 'output',
+        'namespace': 'avro',
+        'doc': 'generated by petl',
+        'fields': schema_fields,
+    }
+    return schema
+
+
+def _build_schema_fields_from_values(peek, props):
+    # store the previous values to calculate the max precision and max scale
+    previous = OrderedDict()
+    # set a default when the value is None in the first row,
+    # but allow overriding it afterwards
+    fill_missing = True
+    fields = OrderedDict()
+    # iterate over the sample rows for dealing with columns with None values
+    for row in peek:
+        _update_field_defs_from(props, row, fields, previous, fill_missing)
+        fill_missing = False
+
+    schema_fields = [item for item in fields.values()]
+    return schema_fields
+
+
+def _update_field_defs_from(props, row, fields, previous, fill_missing):
+    for prop, val in izip_longest(props, row):
+        if prop is None:
+            break
+        dprev = previous.get(prop + '_prec')
+        fprev = previous.get(prop + '_prop')
+        fcurr = None
+        if isinstance(val, dict):
+            # get the fields from a recursive definition of a record inside this field
+            tdef, dcurr, fcurr = _get_definition_from_record(prop, val, fprev, dprev, fill_missing)
+        else:
+            # get the field definition for building the schema
+            tdef, dcurr = _get_definition_from_type_of(prop, val, dprev)
+
+        if tdef is not None:
+            fields[prop] = {'name': prop, 'type': ['null', tdef]}
+        elif fill_missing:
+            fields[prop] = {'name': prop, 'type': ['null', 'string']}
+        if dcurr is not None:
+            previous[prop + '_prec'] = dcurr
+        if fcurr is not None:
+            previous[prop + '_prop'] = fcurr
+
+
+def _get_definition_from_type_of(prop, val, prev):
+    # TODO: get type for record, enum, map
+    tdef = None
+    curr = None
+    # note: bool must be checked before int because bool is a subclass of int
+    if isinstance(val, bool):
+        tdef = 'boolean'
+    elif isinstance(val, int):
+        tdef = 'long'
+    elif isinstance(val, float):
+        tdef = 'double'
+    elif isinstance(val, datetime):
+        tdef = {'type': 'long', 'logicalType': 'timestamp-millis'}
+    elif isinstance(val, time):
+        tdef = {'type': 'int', 'logicalType': 'time-millis'}
+    elif isinstance(val, date):
+        tdef = {'type': 'int', 'logicalType': 'date'}
+    elif isinstance(val, Decimal):
+        curr, precision, scale = _get_precision_from_decimal(curr, val, prev)
+        tdef = {'type': 'bytes', 'logicalType': 'decimal',
+                'precision': precision, 'scale': scale, }
+    elif isinstance(val, bytes):
+        tdef = 'bytes'
+    elif isinstance(val, list):
+        tdef, curr = _get_definition_from_array(prop, val, prev)
+    elif val is not None:
+        tdef = 'string'
+    else:
+        return None, None
+    return tdef, curr
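+
+
+# For illustration only: given a sample value, _get_definition_from_type_of
+# yields avro field definitions along these lines (a sketch, not executed):
+#   True            -> 'boolean'
+#   42              -> 'long'
+#   datetime.now()  -> {'type': 'long', 'logicalType': 'timestamp-millis'}
+#   Decimal('1.23') -> {'type': 'bytes', 'logicalType': 'decimal', ...}
+#   ['a', 'b']      -> {'type': 'array', 'items': 'string'}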
+
+
+def _get_definition_from_array(prop, val, prev):
+    afield = None
+    for item in iter(val):
+        if item is None:
+            continue
+        field2, curr2 = _get_definition_from_type_of(prop, item, prev)
+        if field2 is not None:
+            afield = field2
+        if curr2 is not None:
+            prev = curr2
+
+    bfield = 'string' if afield is None else afield
+    tdef = {'type': 'array', 'items': bfield}
+    return tdef, prev
+
+
+def _get_definition_from_record(prop, val, fprev, dprev, fill_missing):
+    if fprev is None:
+        fprev = OrderedDict()
+    if dprev is None:
+        dprev = OrderedDict()
+
+    props = list(val.keys())
+    row = list(val.values())
+
+    _update_field_defs_from(props, row, fprev, dprev, fill_missing)
+
+    schema_fields = [item for item in fprev.values()]
+    tdef = {
+        'type': 'record',
+        'name': prop + '_record',
+        'namespace': 'avro',
+        'fields': schema_fields,
+    }
+    return tdef, dprev, fprev
+
+
+def _get_precision_from_decimal(curr, val, prev):
+    if val is None:
+        prec = scale = bytes_req = num = 0
+    else:
+        prec, scale, bytes_req, num = precision_and_scale(val)
+    if prev is not None:
+        # get the greatest precision and scale of the sample
+        prec0, scale0 = prev.get('precision'), prev.get('scale')
+        prec, scale = max(prec, prec0), max(scale, scale0)
+    prec = max(prec, 8)
+    curr = {'precision': prec, 'scale': scale, }
+    return curr, prec, scale
+
+
+def precision_and_scale(numeric_value):
+    sign, digits, exp = numeric_value.as_tuple()
+    number = 0
+    for digit in digits:
+        number = (number * 10) + digit
+    # delta = exp + scale
+    delta = 1
+    number = 10 ** delta * number
+    inumber = int(number)
+
+    bits_req = inumber.bit_length() + 1
+    bytes_req = (bits_req + 8) // 8
+    if sign:
+        inumber = - inumber
+    prec = int(math.ceil(math.log10(abs(inumber))))
+    scale = abs(exp)
+    return prec, scale, bytes_req, inumber
+
+
+def _fix_missing_headers(table, schema):
+    '''add missing column headers from the schema'''
+    if schema is None or 'fields' not in schema:
+        return table
+    # table2: try not to advance iterators
+    sample, table2 = iterpeek(table, 2)
+    cols = fieldnames(sample)
+    fields = schema.get('fields')
+    if len(cols) >= len(fields):
+        return table2
+    header = [field.get('name') for field in fields]
+    table3 = setheader(table2, header)
+    return table3
+
+
+def _ordered_dict_iterator(table):
+    it = iter(table)
+    hdr = next(it)
+    flds = [text_type(f) for f in hdr]
+    for row in it:
+        items = list()
+        for i, f in enumerate(flds):
+            try:
+                v = row[i]
+            except IndexError:
+                v = None
+            items.append((f, v))
+        yield OrderedDict(items)
+
+
+# endregion
diff --git a/petl/test/io/test_avro.py b/petl/test/io/test_avro.py
new file mode 100644
index 00000000..79a55254
--- /dev/null
+++ b/petl/test/io/test_avro.py
@@ -0,0 +1,333 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, print_function, division
+
+import sys
+import math
+
+from datetime import datetime, date, time
+from decimal import Decimal
+from tempfile import NamedTemporaryFile
+
+from nose.tools import eq_
+
+from petl.compat import izip_longest, PY3
+from petl.transform.basics import cat
+from petl.util.base import dicts, records
+from petl.util.vis import look
+
+from petl.io.avro import fromavro, toavro, appendavro
+
+from petl.test.io.test_avro_schemas import schema0, schema1, schema2, \
+    schema3, schema4, schema5, schema6
+
+if PY3:
+    from datetime import timezone
+
+try:
+    import fastavro
+    # import fastavro dependencies
+    import pytz
+except ImportError as e:
+    print('SKIP avro tests: %s' % e, file=sys.stderr)
+else:
+    # region Test Cases
+
+    def test_fromavro11():
+        _read_from_avro_file(table1, schema1)
+
+    def test_fromavro22():
+        _read_from_avro_file(table2, schema2)
+
+    def test_fromavro33():
+        _read_from_avro_file(table3, schema3)
+
+    def test_toavro11():
+        _write_to_avro_file(table1, schema1)
+
+    def test_toavro22():
+        _write_to_avro_file(table2, schema2)
+
+    def test_toavro33():
+        _write_to_avro_file(table3, schema3)
+
+    def test_toavro10():
+        _write_to_avro_file(table1, None)
+
+    def test_toavro13():
+        _write_to_avro_file(table01, schema0, table10)
+
+    def test_toavro20():
+        _write_to_avro_file(table2, None)
+
+    def test_toavro30():
+        _write_to_avro_file(table3, None)
+
+    def test_toavro44():
+        _write_to_avro_file(table4, schema4)
+
+    def test_toavro55():
+        _write_to_avro_file(table5, schema5)
+
+    def test_toavro50():
+        _write_to_avro_file(table5, None)
+
+    def test_toavro70():
+        _write_to_avro_file(table71, None)
+
+    def test_toavro80():
+        _write_to_avro_file(table8, None)
+
+    def test_toavro90():
+        _write_to_avro_file(table9, None)
+
+    def test_toavro61():
+        _write_to_avro_file(table61, schema6, print_tables=False)
+
+    def test_toavro62():
+        _write_to_avro_file(table62, schema6, print_tables=False)
+
+    def test_toavro63():
+        _write_to_avro_file(table63, schema6, print_tables=False)
+
+    def test_toavro60():
+        _write_to_avro_file(table60, schema6, print_tables=False)
+
+    def test_appendavro11():
+        _append_to_avro_file(table11, table12, schema1, table1)
+
+    def test_appendavro22():
+        _append_to_avro_file(table21, table22, schema2, table2)
+
+    def test_appendavro10():
+        _append_to_avro_file(table11, table12, schema1)
+
+    # endregion
+
+    # region Execution
+
+    def _read_from_avro_file(test_rows, test_schema, test_expect=None, print_tables=True):
+        _show__expect_rows(test_rows, print_tables)
+        test_filename = _create_avro_example(test_schema, test_rows)
+        test_actual = fromavro(test_filename)
+        test_expect2 = test_rows if test_expect is None else test_expect
+        _assert_rows_are_equals(test_expect2, test_actual, print_tables)
+        return test_filename
+
+    def _write_to_avro_file(test_rows, test_schema, test_expect=None, print_tables=True):
+        _show__expect_rows(test_rows, print_tables)
+        test_filename = _get_tempfile_path()
+        print("Writing avro file:", test_filename)
+        toavro(test_rows, test_filename, schema=test_schema)
+
+        test_actual = fromavro(test_filename)
+        test_expect2 = test_rows if test_expect is None else test_expect
+        _assert_rows_are_equals(test_expect2, test_actual, print_tables)
+
+    def _append_to_avro_file(test_rows1, test_rows2, test_schema, test_expect=None, print_tables=True):
+        _show__expect_rows(test_rows1, print_tables)
+        _show__expect_rows(test_rows2, print_tables)
+        test_filename = _get_tempfile_path()
+        toavro(test_rows1, test_filename, schema=test_schema)
+        appendavro(test_rows2, test_filename, schema=test_schema)
+
+        test_actual = fromavro(test_filename)
+        if test_expect is not None:
+            test_expect2 = test_expect
+        else:
+            test_expect2 = cat(test_rows1, test_rows2)
        _assert_rows_are_equals(test_expect2, test_actual, print_tables)
+
+    # endregion
+
+    # region Helpers
+
+    def _assert_rows_are_equals(test_expect, test_actual, print_tables=True):
+        if print_tables:
+            _show__rows_from('Actual:', test_actual)
+            avro_schema = test_actual.get_avro_schema()
+            print('\nSchema:\n', avro_schema)
+        _eq_rows(test_expect, test_actual)
+        _eq_rows(test_expect, test_actual)  # verify can iterate twice
+
+    def _show__expect_rows(test_rows, print_tables=True, limit=0):
+        if print_tables:
+            _show__rows_from('\nExpected:', test_rows, limit)
+
+    def _show__rows_from(label, test_rows, limit=0):
+        print(label)
+        print(look(test_rows, limit=limit))
+
+    def _eq_rows(expect, actual, cast=None):
+        '''test whether values are equal for each row and column'''
+        ie = iter(expect)
+        ia = iter(actual)
+        for re, ra in izip_longest(ie, ia, fillvalue=None):
+            if cast:
+                ra = cast(ra)
+            for ve, va in izip_longest(re, ra, fillvalue=None):
+                if isinstance(ve, list):
+                    for je, ja in izip_longest(ve, va, fillvalue=None):
+                        _eq2(je, ja, re, ra)
+                elif not isinstance(ve, dict):
+                    _eq2(ve, va, re, ra)
+
+    def _eq2(ve, va, re, ra):
+        try:
+            eq_(ve, va)
+        except AssertionError as ea:
+            print('\nrow: ', re, ' != ', ra)
+            print('val: ', ve, ' != ', va)
+            raise ea
+
+    def _decs(float_value, rounding=12):
+        return Decimal(str(round(float_value, rounding)))
+
+    def _utc(year, month, day, hour=0, minute=0, second=0, microsecond=0):
+        u = datetime(year, month, day, hour, minute, second, microsecond)
+        if PY3:
+            return u.replace(tzinfo=timezone.utc)
+        return u.replace(tzinfo=pytz.utc)
+
+    def _get_tempfile_path(delete_on_close=False):
+        f = NamedTemporaryFile(delete=delete_on_close, mode='r')
+        test_filename = f.name
+        f.close()
+        return test_filename
+
+    def _create_avro_example(test_schema, test_table):
+        parsed_schema = fastavro.parse_schema(test_schema)
+        rows = dicts(test_table)
+        with NamedTemporaryFile(delete=False, mode='wb') as fo:
+            fastavro.writer(fo, parsed_schema, rows)
+        return fo.name
+
+    # endregion
+
+    # region Mockup data
+
+    header1 = [u'name', u'friends', u'age']
+
+    rows1 = [[u'Bob', 42, 33],
+             [u'Jim', 13, 69],
+             [u'Joe', 86, 17],
+             [u'Ted', 23, 51]]
+
+    table1 = [header1] + rows1
+
+    table11 = [header1] + rows1[0:2]
+    table12 = [header1] + rows1[2:]
+
+    table01 = [header1[0:2]] + [item[0:2] for item in rows1]
+    table10 = [header1] + [item[0:2] + [None] for item in rows1]
+
+    table2 = [[u'name', u'age', u'birthday', u'death', u'insurance'],
+              [u'pete', 17, date(2012, 10, 11),
+               _utc(2018, 10, 14, 15, 16, 17, 18000), Decimal('1.100')],
+              [u'mike', 27, date(2002, 11, 12),
+               _utc(2015, 12, 13, 14, 15, 16, 17000), Decimal('1.010')],
+              [u'zack', 37, date(1992, 12, 13),
+               _utc(2010, 11, 12, 13, 14, 15, 16000), Decimal('123.456')],
+              [u'gene', 47, date(1982, 12, 25),
+               _utc(2009, 10, 11, 12, 13, 14, 15000), Decimal('-1.010')]]
+
+    table21 = table2[0:3]
+    table22 = [table2[0]] + table2[3:]
+
+    table3 = [[u'name', u'age', u'birthday', u'death'],
+              [u'pete', 17, date(2012, 10, 11),
+               _utc(2018, 10, 14, 15, 16, 17, 18000)],
+              [u'mike', 27, date(2002, 11, 12),
+               _utc(2015, 12, 13, 14, 15, 16, 17000)],
+              [u'zack', 37, date(1992, 12, 13),
+               _utc(2010, 11, 12, 13, 14, 15, 16000)],
+              [u'gene', 47, date(1982, 12, 25),
+               _utc(2009, 10, 11, 12, 13, 14, 15000)]]
+
+    table4 = [[u'name', u'friends', u'age', u'birthday'],
+              [u'Bob', 42, 33, date(2012, 10, 11)],
+              [u'Jim', 13, 69, None],
+              [None, 86, 17, date(1992, 12, 13)],
+              [u'Ted', 23, None, date(1982, 12, 25)]]
+
+    table5 = [[u'palette', u'colors'],
+              [u'red', [u'red', u'salmon', u'crimson', u'firebrick', u'coral']],
+              [u'pink', [u'pink', u'rose']],
+              [u'purple', [u'purple', u'violet', u'fuchsia',
+                           u'magenta', u'indigo', u'orchid', u'lavender']],
+              [u'green', [u'green', u'lime', u'seagreen',
+                          u'grass', u'olive', u'forest', u'teal']],
+              [u'blue', [u'blue', u'cyan', u'aqua', u'aquamarine',
+                         u'turquoise', u'royal', u'sky', u'navy']],
+              [u'gold', [u'gold', u'yellow', u'khaki',
+                         u'mocassin', u'papayawhip', u'lemonchiffon']],
+              [u'black', None]]
+
+    header6 = [u'array_string', u'array_record', u'nulable_date',
+               u'multi_union_time', u'array_bytes_decimal',
+               u'array_fixed_decimal']
+
+    rows61 = [[u'a', u'b', u'c'],
+              [{u'f1': u'1', u'f2': Decimal('654.321')}],
+              date(2020, 1, 10),
+              _utc(2020, 12, 19, 18, 17, 16, 15000),
+              [Decimal('123.456')],
+              [Decimal('987.654')], ]
+
+    rows62 = [[u'a', u'b', u'c'],
+              [{u'f1': u'1', u'f2': Decimal('654.321')}],
+              date(2020, 1, 10),
+              _utc(2020, 12, 19, 18, 17, 16, 15000),
+              [Decimal('123.456'), Decimal('456.789')],
+              [Decimal('987.654'), Decimal('321.123'), Decimal('456.654')]]
+
+    table61 = [header6, rows61]
+
+    table62 = [header6, rows62]
+
+    table63 = [header6, rows61, rows62]
+
+    table60 = [header6, [rows61[0], rows61[1], ]]
+
+    header7 = [u'col', u'sqrt_pow_ij']
+
+    rows70 = [[j, [round(math.sqrt(math.pow(i*j, i+j)), 9)
+                   for i in range(1, j+1)]] for j in range(1, 7)]
+
+    rows71 = [[j, [Decimal(str(round(math.sqrt(math.pow(i*j, i+j)), 9)))
+                   for i in range(1, j+1)]] for j in range(1, 7)]
+
+    table70 = [header7] + rows70
+    table71 = [header7] + rows71
+
+    header8 = [u'number', u'properties']
+
+    rows8 = [[_decs(x), {
+        u'atan': _decs(math.atan(x)),
+        u'sin': math.sin(x),
+        u'cos': math.cos(x),
+        u'tan': math.tan(x),
+        u'square': x*x,
+        u'sqrt': math.sqrt(x),
+        u'log': math.log(x),
+        u'log10': math.log10(x),
+        u'exp': math.exp(x),
+        u'power_x': x**x,
+        u'power_minus_x': x**-x,
+    }] for x in range(1, 12)]
+
+    table8 = [header8] + rows8
+
+    rows9 = [[1, {u'name': u'Bob', u'age': 20}],
+             [2, {u'name': u'Ted', u'budget': _decs(54321.25)}],
+             [2, {u'name': u'Jim', u'color': u'blue'}],
+             [2, {u'name': u'Joe', u'alias': u'terminator'}]]
+
+    table9 = [header8] + rows9
+
+    # endregion
+
+    # region testing
+
+    # endregion
+
+# end of tests #
diff --git a/petl/test/io/test_avro_schemas.py b/petl/test/io/test_avro_schemas.py
new file mode 100644
index 00000000..699d1a83
--- /dev/null
+++ b/petl/test/io/test_avro_schemas.py
@@ -0,0 +1,239 @@
+# -*- coding: utf-8 -*-
+
+# begin_nullable_schema
+
+schema0 = {
+    'doc': 'Nullable records.',
+    'name': 'anyone',
+    'namespace': 'test',
+    'type': 'record',
+    'fields': [
+        {'name': 'name', 'type': ['null', 'string']},
+        {'name': 'friends', 'type': ['null', 'int']},
+        {'name': 'age', 'type': ['null', 'int']},
+    ],
+}
+
+# end_nullable_schema
+
+# begin_basic_schema
+
+schema1 = {
+    'doc': 'Some people records.',
+    'name': 'People',
+    'namespace': 'test',
+    'type': 'record',
+    'fields': [
+        {'name': 'name', 'type': 'string'},
+        {'name': 'friends', 'type': 'int'},
+        {'name': 'age', 'type': 'int'},
+    ],
+}
+
+# end_basic_schema
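+
+# Note: the begin_*/end_* marker comments in this module delimit the
+# snippets pulled into docs/io.rst by the literalinclude directives
+# (via their start-after/end-before options).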
+
+# begin_logicalType_schema
+
+schema2 = {
+    'doc': 'Some random people.',
+    'name': 'Crowd',
+    'namespace': 'test',
+    'type': 'record',
+    'fields': [
+        {'name': 'name', 'type': 'string'},
+        {'name': 'age', 'type': 'int'},
+        {'name': 'birthday', 'type': {
+            'type': 'int',
+            'logicalType': 'date'
+        }},
+        {'name': 'death', 'type': {
+            'type': 'long',
+            'logicalType': 'timestamp-millis'
+        }},
+        {'name': 'insurance', 'type': {
+            'type': 'bytes',
+            'logicalType': 'decimal',
+            'precision': 12,
+            'scale': 3
+        }},
+    ],
+}
+
+# end_logicalType_schema
+
+# begin_micros_schema
+
+schema3 = {
+    'doc': 'Some random people.',
+    'name': 'Crowd',
+    'namespace': 'test',
+    'type': 'record',
+    'fields': [
+        {'name': 'name', 'type': 'string'},
+        {'name': 'age', 'type': 'int'},
+        {'name': 'birthday', 'type': {
+            'type': 'int',
+            'logicalType': 'date'
+        }},
+        {'name': 'death', 'type': {
+            'type': 'long',
+            'logicalType': 'timestamp-micros'
+        }},
+    ],
+}
+
+# end_micros_schema
+
+# begin_mixed_schema
+
+schema4 = {
+    'doc': 'Some people records.',
+    'name': 'People',
+    'namespace': 'test',
+    'type': 'record',
+    'fields': [
+        {'name': 'name', 'type': ['null', 'string']},
+        {'name': 'friends', 'type': ['null', 'long']},
+        {'name': 'age', 'type': ['null', 'int']},
+        {'name': 'birthday',
+         'type': ['null', {'type': 'int', 'logicalType': 'date'}]
+         }
+    ],
+}
+
+# end_mixed_schema
+
+# begin_array_schema
+
+schema5 = {
+    'name': 'palettes',
+    'namespace': 'color',
+    'type': 'record',
+    'fields': [
+        {'name': 'palette', 'type': 'string'},
+        {'name': 'colors',
+         'type': ['null', {'type': 'array', 'items': 'string'}]
+         }
+    ],
+}
+
+# end_array_schema
+
+# begin_complex_schema
+
+schema6 = {
+    'fields': [
+        {
+            'name': 'array_string',
+            'type': {'type': 'array', 'items': 'string'}
+        },
+        {
+            'name': 'array_record',
+            'type': {'type': 'array', 'items': {
+                'type': 'record',
+                'name': 'some_record',
+                'fields': [
+                    {
+                        'name': 'f1',
+                        'type': 'string'
+                    },
+                    {
+                        'name': 'f2',
+                        'type': {'type': 'bytes',
+                                 'logicalType': 'decimal',
+                                 'precision': 18,
+                                 'scale': 6, }
+                    }
+                ]
+            }
+            }
+        },
+        {
+            'name': 'nulable_date',
+            'type': ['null', {'type': 'int',
+                              'logicalType': 'date'}]
+        },
+        {
+            'name': 'multi_union_time',
+            'type': ['null', 'string', {'type': 'long',
+                                        'logicalType': 'timestamp-micros'}]
+        },
+        {
+            'name': 'array_bytes_decimal',
+            'type': ['null', {'type': 'array',
+                              'items': {'type': 'bytes',
+                                        'logicalType': 'decimal',
+                                        'precision': 18,
+                                        'scale': 6, }
+                              }]
+        },
+        {
+            'name': 'array_fixed_decimal',
+            'type': ['null', {'type': 'array',
+                              'items': {'type': 'fixed',
+                                        'name': 'FixedDecimal',
+                                        'size': 8,
+                                        'logicalType': 'decimal',
+                                        'precision': 18,
+                                        'scale': 6, }
+                              }]
+        },
+    ],
+    'namespace': 'namespace',
+    'name': 'name',
+    'type': 'record'
+}
+
+# end_complex_schema
+
+# begin_logical_schema
+
+logical_schema = {
+    'fields': [
+        {
+            'name': 'date',
+            'type': {'type': 'int', 'logicalType': 'date'}
+        },
+        {
+            'name': 'datetime',
+            'type': {'type': 'long', 'logicalType': 'timestamp-millis'}
+        },
+        {
+            'name': 'datetime2',
+            'type': {'type': 'long', 'logicalType': 'timestamp-micros'}
+        },
+        {
+            'name': 'uuid',
+            'type': {'type': 'string', 'logicalType': 'uuid'}
+        },
+        {
+            'name': 'time',
+            'type': {'type': 'int', 'logicalType': 'time-millis'}
+        },
+        {
+            'name': 'time2',
+            'type': {'type': 'long', 'logicalType': 'time-micros'}
+        },
+        {
+            'name': 'Decimal',
+            'type':
+            {
+                'type': 'bytes', 'logicalType': 'decimal',
+                'precision': 15, 'scale': 6
+            }
+        },
+        {
+            'name': 'Decimal2',
+            'type':
+            {
+                'type': 'fixed', 'size': 8,
+                'logicalType': 'decimal', 'precision': 15, 'scale': 3
+            }
+        }
+    ],
+    'namespace': 'namespace',
+    'name': 'name',
+    'type': 'record'
+}
+
+# end_logical_schema
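+
+# A minimal usage sketch (illustrative only; assumes petl and fastavro are
+# installed and `some_table` is any petl table matching logical_schema):
+#
+#     import petl as etl
+#     etl.toavro(some_table, 'logical.avro', schema=logical_schema)
+#     print(etl.fromavro('logical.avro'))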