In [6]:
"""
Basic read and write
"""

from fastavro import writer, reader, parse_schema

# Output path shared by the write and the read step below; a single constant
# keeps the two steps from drifting apart.
WEATHER_AVRO_PATH = "weather.avro"

schema = {
    "doc": "A weather reading.",
    "name": "Weather",
    "namespace": "test",
    "type": "record",
    "fields": [
        {"name": "station", "type": "string"},
        {"name": "time", "type": "long"},
        {"name": "temp", "type": "int"},
    ],
}
parsed_schema = parse_schema(schema)

# 'records' can be an iterable (including generator)
records = [
    {"station": "011990-99999", "temp": 0, "time": 1433269388},
    {"station": "011990-99999", "temp": 22, "time": 1433270389},
    {"station": "011990-99999", "temp": -11, "time": 1433273379},
    {"station": "012650-99999", "temp": 111, "time": 1433275478},
]

# Writing: writer() produces an Avro object container file, which stores the
# writer schema in its header.
with open(WEATHER_AVRO_PATH, "wb") as out:
    writer(out, parsed_schema, records)

# Reading: no schema is passed, so reader() decodes using the schema embedded
# in the file. Fields come back in schema order (station, time, temp).
with open(WEATHER_AVRO_PATH, "rb") as fo:
    for record in reader(fo):
        print(record)

{'station': '011990-99999', 'time': 1433269388, 'temp': 0}
{'station': '011990-99999', 'time': 1433270389, 'temp': 22}
{'station': '011990-99999', 'time': 1433273379, 'temp': -11}
{'station': '012650-99999', 'time': 1433275478, 'temp': 111}


In [31]:
"""
Reader and writer schema
"""

from io import BytesIO
import fastavro

# Writer schema: the layout the records are encoded with.
ws = {
    "name": "ReaderWriterSchema",
    "type": "record",
    "namespace": "com",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "value", "type": "string"},
    ],
}

# Reader schema: identical name/type/namespace to the writer (copied from `ws`),
# but with its own field list -- writer fields are picked up through `aliases`
# under new names, and "value" is not declared at all.
rs = {
    **ws,
    "fields": [
        {"name": "eid", "aliases": ["id"], "type": "int"},
        {"name": "reader_new_name", "aliases": ["name"], "type": "string"},
    ],
}

# Three sample records; each "value" is "Value" followed by the record's id.
data = [
    {"id": idx, "name": person, "value": f"Value{idx}"}
    for idx, person in enumerate(("John", "Alice", "Bob"), start=1)
]


# Write each record with the writer schema, then read it back with the reader schema.
for record in data:
    # A fresh buffer per record. Re-using one buffer with only seek(0) leaves
    # stale trailing bytes whenever a record encodes shorter than the previous
    # one (e.g. "Bob" after "Alice"), and getvalue() would pass them to the reader.
    bdata = BytesIO()
    try:
        # strict=True: raise if the record's fields don't exactly match the writer schema.
        fastavro.schemaless_writer(bdata, ws, record=record, strict=True)
    except Exception as e:
        # Demo behaviour: report the problem and continue with the next record.
        print(e)
    else:
        # Schema resolution: the reader schema picks up writer fields through its
        # aliases ("id" -> "eid", "name" -> "reader_new_name"); "value" is not
        # declared in the reader schema and is dropped.
        decoded_json = fastavro.schemaless_reader(
            BytesIO(bdata.getvalue()), reader_schema=rs, writer_schema=ws
        )
        print(decoded_json)

{'eid': 1, 'reader_new_name': 'John'}
{'eid': 2, 'reader_new_name': 'Alice'}
{'eid': 3, 'reader_new_name': 'Bob'}


In [5]:
"""
Custom logical type
"""

from io import BytesIO

import fastavro


def encode_ctv_cm_m(data: str, *_extra):
    """Write hook for the custom ``cvt_cm_m`` logical type.

    The value is handed back untouched, so the centimetre string is stored
    as-is; any additional positional arguments are accepted and ignored.
    No unit conversion happens on the write side.
    """
    stored_value = data
    return stored_value


# convert cm to m
def decode_ctv_cm_m(data: str, *args):
    """Read hook for the custom ``cvt_cm_m`` logical type: centimetres -> metres.

    ``data`` is the stored value in centimetres (a numeric string). The result
    is the value divided by 100, formatted like a Python float and suffixed
    with ``" m"`` (e.g. ``"123"`` -> ``"1.23 m"``, ``"100"`` -> ``"1.0 m"``).
    Extra positional arguments are accepted and ignored.

    The division is done in ``decimal`` so fractional inputs do not pick up
    binary floating-point noise: ``"0.7"`` gives ``"0.007 m"`` instead of
    ``"0.006999999999999999 m"``. For integer inputs the result is identical
    to plain float division. Anything ``Decimal`` cannot handle falls back to
    the original float arithmetic, so a non-numeric string still raises
    ``ValueError``.
    """
    from decimal import Decimal  # local import keeps this notebook cell self-contained

    try:
        metres = float(Decimal(data) / 100)
    except (ArithmeticError, TypeError, ValueError):
        # decimal signals (InvalidOperation, Overflow, ...) derive from
        # ArithmeticError; defer to float so errors/edge cases match the original.
        metres = float(data) / 100
    return str(metres) + " m"


# Register the hooks under "<avro type>-<logicalType>": here the underlying
# Avro type is "string" and the logical type is "cvt_cm_m".
fastavro.write.LOGICAL_WRITERS["string-cvt_cm_m"] = encode_ctv_cm_m
fastavro.read.LOGICAL_READERS["string-cvt_cm_m"] = decode_ctv_cm_m

schema = {
    "type": "record",
    "name": "root",
    "fields": [
        {
            "name": "self_defined_type",
            # Nullable union; the custom logical type is attached to the string branch.
            "type": ["null", {"type": "string", "logicalType": "cvt_cm_m"}],
            # "doc" is the Avro attribute for field documentation
            # (the previous "description" key is not part of the Avro spec).
            "doc": "when reading, convert cm to m",
        },
    ],
}

valid_data = {"self_defined_type": "123"}

# A freshly created BytesIO is already positioned at 0, so no seek is needed.
bdata = BytesIO()
fastavro.schemaless_writer(bdata, schema, valid_data, strict_allow_default=True)
decoded_json = fastavro.schemaless_reader(
    BytesIO(bdata.getvalue()), reader_schema=schema, writer_schema=schema
)
print(decoded_json)

{'self_defined_type': '1.23 m'}


In [10]:
"""
Validation
"""

from io import BytesIO
import fastavro

schema = {
    "type": "record",
    "name": "root",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}

valid_data = {"name": "John", "age": 30}
invalid_data = {"name": "John", "age": "30"}  # "age" is a str, not an int


def validate(data, avro_schema=schema):
    """Check ``data`` against an Avro schema by round-tripping it through fastavro.

    The record is written with ``strict=True``. If writing fails, the error
    is printed as "Data is invalid"; otherwise the bytes are read back and
    the decoded record is printed. Nothing is returned.

    Args:
        data: the record (a dict) to check.
        avro_schema: schema to validate against. Defaults to this cell's
            ``schema``, bound when ``validate`` is defined, so re-running an
            earlier cell that reassigns the global ``schema`` cannot silently
            change what gets validated.
    """
    bdata = BytesIO()
    try:
        fastavro.schemaless_writer(bdata, avro_schema, data, strict=True)
    except Exception as e:
        # Report any writer failure (e.g. wrong type on a field) instead of raising.
        print("Data is invalid: ", e)
    else:
        decoded = fastavro.schemaless_reader(
            BytesIO(bdata.getvalue()), reader_schema=avro_schema, writer_schema=avro_schema
        )
        print("Data is valid, decoded as: ", decoded)


# Run the check on both samples: the first round-trips cleanly, the second
# carries "age" as a str and is rejected by the strict writer.
for sample in (valid_data, invalid_data):
    validate(sample)

Data is valid, decoded as:  {'name': 'John', 'age': 30}
Data is invalid:  an integer is required on field age
