In [None]:
import sys
!{sys.executable} -m pip install fastavro

In [None]:
import sys

# Prefer an installed bi_etl. Only if that import fails, fall back to the
# repository checkout two directories above this notebook.
repo_root = '../../'
if repo_root in sys.path:
    # Drop an entry appended by a previous execution of this cell
    # (list.remove only drops the first occurrence).
    sys.path.remove(repo_root)

try:
    import bi_etl
except ImportError:
    sys.path.append(repo_root)
    import bi_etl

print("Imported")

In [None]:
import random
from datetime import datetime

from bi_etl.components.row.row import Row
from bi_etl.components.row.row_iteration_header import RowIterationHeader

# Fixed seed so Restart & Run All reproduces the same null pattern
# (the datetime values themselves still come from datetime.now()).
random.seed(0)

NUM_ROWS = 10000
NON_NULL_PROBABILITY = 0.9  # each nullable cell is None roughly 10% of the time

# 26 columns: column_a .. column_z
columns = [f'column_{chr(n)}' for n in range(ord('a'), ord('z') + 1)]


def _avro_field_type(col_num):
    """Return the Avro field type for the column at position ``col_num``.

    Position 0 is a non-null ``long`` row number. The remaining columns cycle
    through nullable int (``% 3 == 0``), string (``% 3 == 1``) and
    datetime-as-epoch-seconds (``% 3 == 2``).

    The datetime columns use ``double``. Avro ``float`` is a 32-bit IEEE
    value, which rounds present-day epoch timestamps to 128-second steps and
    makes the datetime round trip lossy.
    """
    if col_num == 0:
        return 'long'
    if col_num % 3 == 0:
        return ['int', 'null']
    if col_num % 3 == 1:
        return ['string', 'null']
    return ['double', 'null']


def _sample_value(col_num, col, row_num):
    """Return the generated value for one cell.

    Column 0 is always the row number. Every other column draws exactly one
    random number and yields ``None`` when it is >= NON_NULL_PROBABILITY.
    """
    if col_num == 0:
        return row_num
    if random.random() >= NON_NULL_PROBABILITY:
        return None
    if col_num % 3 == 0:
        return col_num
    if col_num % 3 == 1:
        return f'Value {col_num} for {col} on row {row_num}'
    return datetime.now()


avro_schema = {
    'doc': 'Test data cols A-Z.',
    'name': 'TestData',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': col, 'type': _avro_field_type(col_num)}
        for col_num, col in enumerate(columns)
    ],
}

iteration_header = RowIterationHeader(logical_name='Test', columns_in_order=columns)
source_rows = [
    Row(
        iteration_header,
        data=[_sample_value(col_num, col, row_num) for col_num, col in enumerate(columns)],
    )
    for row_num in range(NUM_ROWS)
]

# Only a cell's last expression is rendered automatically, so show the sample explicitly.
display(source_rows[:2])
avro_schema

In [None]:
import fastavro

# Parse the dict schema once. The parsed form is what the writer calls in the
# later cells receive.
parsed_schema = fastavro.parse_schema(schema=avro_schema)
parsed_schema

In [None]:
def row_generator():
    """Yield each row of ``source_rows`` as a dict ready for encoding.

    Values in the datetime columns (positions 2, 5, 8, ..., i.e.
    ``col_num % 3 == 2``) are replaced by ``value.timestamp()`` (epoch seconds
    as a float). ``None`` values are left untouched.

    Raises:
        ValueError: if a non-None value in one of those columns has no
            ``timestamp()`` method.
    """
    # Derived from `columns` instead of the hard-coded 26, so every datetime
    # column is still converted if the column layout changes.
    datetime_cols = range(2, len(columns), 3)
    for row in source_rows:
        # NOTE(review): assumes `Row.as_dict` builds a fresh dict per access. If it
        # returned the row's own storage, the in-place conversion below would
        # mutate `source_rows` and a second pass would raise ValueError.
        row_dict = row.as_dict
        for col in datetime_cols:
            col_name = columns[col]
            value = row_dict[col_name]
            if value is not None:
                try:
                    row_dict[col_name] = value.timestamp()
                except AttributeError as err:
                    raise ValueError(f'{value} in col {col} on row {row}') from err
        yield row_dict
        

# Write every generated row to an Avro object container file on disk.
with open('test.avro', mode='wb') as avro_file:
    fastavro.writer(avro_file, parsed_schema, records=row_generator())

In [None]:
from io import BytesIO

In [None]:
#%%timeit
# Same encode as the file write, but into an in-memory buffer that the next
# cell reads back.
# NOTE(review): if the `%%timeit` toggle above is re-enabled, `str_out` is
# presumably local to the timed code and will not be visible to the next cell.
str_out = BytesIO()
fastavro.writer(fo=str_out, schema=parsed_schema, records=row_generator())

In [None]:
# Rewind the in-memory container, decode every record, and turn the
# epoch-second columns back into datetimes. The record whose column_a == 1 is
# printed as a spot check.
str_out.seek(0)
# Derived from `columns` instead of the hard-coded 26.
datetime_cols = range(2, len(columns), 3)
for record in fastavro.reader(str_out):
    for col in datetime_cols:
        col_name = columns[col]
        value = record[col_name]
        if value is not None:
            try:
                record[col_name] = datetime.fromtimestamp(value)
            except Exception as e:
                raise ValueError(f'{e} on {value} in col {col} {col_name} on row {record}') from e
    if record['column_a'] == 1:
        print(record)
    

In [None]:
%%timeit
for row in row_generator():
    str_out = BytesIO()
    fastavro.schemaless_writer(str_out, parsed_schema, row)
    
    del row
    str_out.seek(0)
    
    #Mock msg receiver
    record = fastavro.schemaless_reader(str_out, parsed_schema)
    for col in range(2, 26, 3):            
        col_name = columns[col]
        value = record[col_name]
        # print(f'col {col} = {value}')
        if value is not None:
            try:
                record[col_name] = datetime.fromtimestamp(value)
            except Exception as e:
                raise ValueError(f'{e} on {value} in col {col} {col_name} on row {record}')
    

In [None]:
import json

In [None]:
%%timeit
for row in row_generator():    
    str_out = json.dumps(row)
    
    del row
    
    #Mock msg receiver
    record = json.loads(str_out)
    for col in range(2, 26, 3):            
        col_name = columns[col]
        value = record[col_name]
        # print(f'col {col} = {value}')
        if value is not None:
            try:
                record[col_name] = datetime.fromtimestamp(value)
            except Exception as e:
                raise ValueError(f'{e} on {value} in col {col} {col_name} on row {record}')
#     if record['column_a'] == 2:
#         print(record)

In [None]:
import csv

In [None]:
%%timeit
str_out = StringIO()
writer = csv.DictWriter(str_out, columns)
reader = csv.DictReader(str_out, columns)
for row in row_generator():     
    str_out.seek(0)
    writer.writerow(row)
    
    del row
    str_out.seek(0)
    
    #Mock msg receiver
    record = reader.__next__()
    record = dict(record)
    #print(record)
    
    for col_num, col_name in enumerate(columns):
        value = record[col_name]
        if value == '' or value is None:
            record[col_name] = None
            #print(f'{col_name}, {value}, {record[col_name]} {type(record[col_name])}, None')
        elif col_num == 0:            
            record[col_name] = int(value)
            #print(f'{col_name}, {value}, {record[col_name]} {type(record[col_name])}, pk')
        elif col_num % 3 == 0:
            record[col_name] = int(value)
            #print(f'{col_name}, {value}, {record[col_name]} {type(record[col_name])}, int')
        elif col_num % 3 == 1:
            pass            
            #print(f'{col_name}, {value}, {record[col_name]} {type(record[col_name])}, str')
        else:
            record[col_name] = datetime.fromtimestamp(float(value))                
            #print(f'{col_name}, {value}, {record[col_name]} {type(record[col_name])}, dt')
#     if record['column_a'] == 2:
#         print(record)
#         break