In [None]:
import glob
import os
import json
import re
import pandas as pd
def get_column_names(schemas, ds_name, sorting_key='column_position'):
    column_details = schemas.get(ds_name, [])
    columns = sorted(column_details, key=lambda col: col.get(sorting_key, 0)) 
    return [col['column_name'] for col in columns]
def read_csv(file, schemas):
    file_path_list = re.split('[/\\\]', file)
    ds_name = file_path_list[-2]
    file_name = file_path_list[-1]
    columns = get_column_names(schemas, ds_name)
    df = pd.read_csv(file, names=columns)
    return df
def to_json(df, tgt_base_dir, ds_name, file_name):
    json_file_path = f'{tgt_base_dir}/{ds_name}/{file_name}'
    os.makedirs(f'{tgt_base_dir}/{ds_name}', exist_ok=True)
    df.to_json(
        json_file_path,
        orient='records',
        lines=True
    )

def file_converter(src_base_dir,tgt_base_dir,ds_name):
    schemas = json.load(open(f'{src_base_dir}/schemas.json'))
    files = glob.glob(f'{src_base_dir}/{ds_name}/part-*')

    for file in files:
        print(f'Processing {file}')
        print(f'done {files}')
        df = read_csv(file, schemas)
        file_name = re.split('[/\\\]', file)[-1]
        to_json(df, tgt_base_dir, ds_name, file_name)
def process_files(ds_names=None):
    src_base_dir = '/Users/macintosh/Desktop/data_engineering_Esentials_using/data/retail_db'
    tgt_base_dir = '/Users/macintosh/Desktop/data_engineering_Esentials_using/data/retail_db_json'
    schemas = json.load(open(f'{src_base_dir}/schemas.json'))
    if not ds_names:
        ds_names = schemas.keys()
    for ds_name in ds_names:
        print(f'Processing {ds_name}')
        file_converter(src_base_dir,tgt_base_dir,ds_name)

In [16]:
process_files(['orders', 'order_items'])

Processing orders
Processing /Users/macintosh/Desktop/data_engineering_Esentials_using/data/retail_db/orders/part-00000
done ['/Users/macintosh/Desktop/data_engineering_Esentials_using/data/retail_db/orders/part-00000']
Processing order_items
Processing /Users/macintosh/Desktop/data_engineering_Esentials_using/data/retail_db/order_items/part-00000
done ['/Users/macintosh/Desktop/data_engineering_Esentials_using/data/retail_db/order_items/part-00000']


In [20]:
schemas = json.load(open('/Users/macintosh/Desktop/data_engineering_Esentials_using/data/retail_db/schemas.json'))
schemas

{'departments': [{'column_name': 'department_id',
   'data_type': 'integer',
   'column_position': 1},
  {'column_name': 'department_name',
   'data_type': 'string',
   'column_position': 2}],
 'categories': [{'column_name': 'category_id',
   'data_type': 'integer',
   'column_position': 1},
  {'column_name': 'category_department_id',
   'data_type': 'integer',
   'column_position': 2},
  {'column_name': 'category_name',
   'data_type': 'string',
   'column_position': 3}],
 'orders': [{'column_name': 'order_id',
   'data_type': 'integer',
   'column_position': 1},
  {'column_name': 'order_date', 'data_type': 'string', 'column_position': 2},
  {'column_name': 'order_customer_id',
   'data_type': 'timestamp',
   'column_position': 3},
  {'column_name': 'order_status',
   'data_type': 'string',
   'column_position': 4}],
 'products': [{'column_name': 'product_id',
   'data_type': 'integer',
   'column_position': 1},
  {'column_name': 'product_cateogry_id',
   'data_type': 'integer',
   'c

In [22]:
schemas.keys()

dict_keys(['departments', 'categories', 'orders', 'products', 'customers', 'order_items'])