Project 1 - File Format Converter Handout
Overview


The objective of this project is to implement a solution based on the design provided. In this case,
the source data was exported from a MySQL database as CSV files. Our downstream applications work more
easily with JSON than with CSV, so to streamline our data engineering pipelines we need to convert these
CSV files into JSON files. That conversion is the full scope of this project.

In [9]:
!pip install pandas



In [10]:
# Return the column names for one table (schemaName) in the schema.
# If sortKey is given, the columns are first sorted by that schema field.
def get_column_names(schemas, schemaName, sortKey=None):
    columns = list(schemas.get(schemaName))
    if sortKey is not None:
        columns = sorted(columns, key=lambda column: column.get(sortKey))
    return [column['column_name'] for column in columns]
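
The helper above can be tried on a small in-memory schema. This is a minimal sketch only: the `sample_schemas` dictionary and the `column_position` field are assumptions made for illustration, since the contents of `schemas.json` are not shown in this handout. The function is restated in compact form so the snippet runs on its own.

```python
# Hypothetical schema for illustration; the real schemas.json may differ.
sample_schemas = {
    "orders": [
        {"column_name": "order_date", "column_position": 2},
        {"column_name": "order_id", "column_position": 1},
        {"column_name": "order_status", "column_position": 3},
    ]
}

def get_column_names(schemas, schema_name, sort_key=None):
    # Look up the column definitions for one table.
    columns = list(schemas.get(schema_name))
    # Optionally order them by a field such as the (assumed) column_position.
    if sort_key is not None:
        columns = sorted(columns, key=lambda column: column.get(sort_key))
    return [column["column_name"] for column in columns]

unsorted_names = get_column_names(sample_schemas, "orders")
sorted_names = get_column_names(sample_schemas, "orders", sort_key="column_position")
print(unsorted_names)
print(sorted_names)
```

Without a sort key the names come back in file order, so the result is only safe to use as CSV headers if the schema already lists columns in the same order as the CSV files.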

In [11]:
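import glob

# Return the list of input file paths that match a glob pattern.
# Example pattern: 'data/retail_db/*/*'
# Note: recursive=True only changes matching when the pattern contains '**'.
def get_input_file_list(path):
    return glob.glob(path, recursive=True)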
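
To see which paths a pattern of this shape picks up, the sketch below builds a throwaway directory tree and compares a fixed-depth pattern with a `**` pattern. The temporary directory and the two table names are assumptions used only for the demonstration.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as root:
    # Build a tiny tree: <root>/<table>/part-00000 (hypothetical tables).
    for table in ("orders", "customers"):
        os.makedirs(os.path.join(root, table))
        open(os.path.join(root, table, "part-00000"), "w").close()

    # Fixed depth: exactly one directory level, then any file name.
    one_level = sorted(glob.glob(os.path.join(root, "*", "*")))

    # Any depth: '**' is expanded across directories only when recursive=True.
    any_depth = sorted(glob.glob(os.path.join(root, "**", "part-*"), recursive=True))

    # The parent directory name identifies the table each file belongs to.
    found_tables = [os.path.basename(os.path.dirname(p)) for p in one_level]

print(found_tables)
print(len(any_depth))
```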


In [12]:
import os

# Create the directory if it does not already exist.
# Example path: f'data/retail_db_json/{collection_name}'
def ensureDirectoryExists(path):
    os.makedirs(path,exist_ok=True)


In [13]:
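import pandas as pd

# Read a CSV file into a DataFrame, using the supplied column names.
# Because names= is passed without header=, pandas treats every row as data.
def read_csv(filePath, columns):
    return pd.read_csv(filePath, names=columns)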
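
As a quick check of how `names=` behaves, here is a small sketch that reads two CSV rows from memory. The sample rows and column names are made up for illustration and are not taken from the project data.

```python
import io

import pandas as pd

# Hypothetical header-less CSV content (two rows, four fields each).
csv_text = (
    "1,2013-07-25 00:00:00.0,11599,CLOSED\n"
    "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT\n"
)
columns = ["order_id", "order_date", "order_customer_id", "order_status"]

# With names= and no header=, the first line is read as data, not as a header.
df = pd.read_csv(io.StringIO(csv_text), names=columns)
print(df)
```

If a file did contain a header row, that row would show up as the first data record under this call, so it is worth confirming that the exported files carry no header.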


In [14]:
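import json

# Load the table schemas; the with-block closes the file after reading.
def get_schema():
    with open('./data/retail_db/schemas.json') as schema_file:
        return json.load(schema_file)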

In [15]:
def process_files(filesList):
    # load the schema once instead of re-reading it for every file
    schemas = get_schema()

    for file_name in filesList:
        # capture collection name from the parent directory of the file
        collection_name = os.path.basename(os.path.dirname(file_name))

        # ensure the output directory exists
        output_dir = f'data/retail_db_json/{collection_name}'
        ensureDirectoryExists(output_dir)

        print(f'Processing file : {file_name}')

        # csv headers
        column_headers = get_column_names(schemas, collection_name)

        # read csv
        collection = read_csv(file_name, column_headers)

        # write the collection as JSON (pandas default, column-oriented layout)
        collection.to_json(f'{output_dir}/part-00000')
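
Called with no arguments, `to_json` writes a column-oriented document: one object per column, keyed by row index. Downstream tools often expect one JSON object per line instead, which pandas can produce with `orient="records", lines=True`. The sketch below compares the two layouts on a made-up DataFrame; whether this project should switch layouts is a design decision this handout does not settle.

```python
import json

import pandas as pd

# Hypothetical two-row table for illustration.
df = pd.DataFrame(
    {"order_id": [1, 2], "order_status": ["CLOSED", "PENDING_PAYMENT"]}
)

# Default layout: {"column": {"row index": value, ...}, ...}
column_oriented = json.loads(df.to_json())

# Line-delimited layout: one JSON object per input row.
line_delimited = df.to_json(orient="records", lines=True)
records = [json.loads(line) for line in line_delimited.splitlines()]

print(column_oriented)
print(records)
```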


In [16]:
path = 'data/retail_db/*/*'
fileLists = get_input_file_list(path)

process_files(fileLists)

Processing file : data/retail_db/customers/part-00000
Processing file : data/retail_db/products/part-00000
Processing file : data/retail_db/departments/part-00000
Processing file : data/retail_db/order_items/part-00000
Processing file : data/retail_db/orders/part-00000
Processing file : data/retail_db/categories/part-00000
