In [1]:
!ls

Areas_in_blore.xlsx
Data_Quality_Engineer_Code_Test__(1).pdf
Prueba Inclusion.ipynb
data_file_20210527182730 copy.csv
data_file_20210527182730.csv
data_file_20210528182554.csv
data_file_20210528182844.csv
helper_functions.py


In [2]:
#!pip install validators
#!pip install phonenumbers
#!pip install openpyxl

In [3]:
import pandas as pd
import os
import glob
import re
import numpy as np

from helper_functions import is_zero_file, validate_url, empty_string, validate_phone
from helper_functions import strip_character, search_character, unique_list

# installed libraries
import openpyxl

In [4]:
def check_files(path):
    """Checks if there is any file in the path...
       
       Input Arguments: path - path where the files arrive.  
    """
    print('\n','***Looking for new files...','\n')
    
    file_paths = []
    
    # get all files matching extension from directory
    for root, dirs, files in os.walk(path):
        files = glob.glob(os.path.join(root,'*.csv'))
        for f in files :
            file_paths.append(os.path.abspath(f))

    # get total number of files found
    num_files = len(file_paths)
    print('{} files found in {}'.format(num_files, path))
    print('\n'.join(map(str, file_paths)))
    
    return file_paths


def is_file_loaded(filepath):
    """Checks if a file path is in the log_file.txt
       
       Input Arguments: filepath - path of a file 
    """        
    # creates the file if it doesn't exist
    f = open("log_file.txt", "a+")
    f.seek(0)
    lines = f.read().split('\n')
    f.close()
    
    processed_files = pd.Series(lines)
    res = sum(processed_files.isin([filepath]))

    return res


def log_processed_files(filePathList):
    """Saves the paths of the loaded files in a txt file.
       
       Input Arguments: filePathList - paths of the loaded files 
    """
    f = open("log_file.txt", "a")
    
    for path in filePathList:
        f.write(path+'\n')
        
    f.close()


def validate_files(filepath):
    """Checks if a file is empty or if the file was previously loaded
       I'm assuming that each file has a different name, so I'll not 
       load a file if it has the same name as a file already loaded
       
       Input Arguments: filepath - file to be checked or validated 
    """

    load_file = True
    
    if is_zero_file(filepath):
        print('Empty file found!!! ignoring file...','\n')
        load_file = False
    
    if is_file_loaded(filepath):
        print('File already loaded!!! ignoring file...','\n')
        load_file = False
        
    return load_file
        
    
def load_files(filepaths):
    """Loads the files found in the check_files function
       
       Input Arguments: filepaths - paths of the existing files.  
    """
    print('\n','***Loading files...','\n')
    
    cols = ['url', 'address', 'name', 'rate', 'votes', 'phone', 'location', 
            'rest_type', 'dish_liked', 'cuisines', 'reviews_list']
    df = pd.DataFrame()
    n = 0
    processed_files = []
    
    for path in filepaths:
        print(f'Loading file: {path}','\n')
        
        if validate_files(path):
            df_temp = pd.read_csv(path, usecols = cols)
            df = pd.concat([df, df_temp])
            print(f'{len(df_temp)} rows loaded!!!','\n')
            n = n + 1
            processed_files.append(path)
            
    # resets index to start from 0
    df = df.reset_index(drop = True)
    # replaces nan for '' to handle null as empty strings
    df = df.replace(np.nan, '', regex = True)
            
    # save processed files in a metadata file.
    log_processed_files(processed_files)
    
    print(f'***{n} files were loaded','\n')
    
    return df
    

def cleaning_cols(df):
    """Identifies and Cleans the errors in each column of the dataframe
       
       Input Arguments: df - data frame with data loaded from the files
    """
    print('\n','***Cleaning Data...','\n')
    re_str = r'[^a-zA-Z \\!@#$%&*_+-=|\:"<>,./()[\]{}\']'
    re_rate = r"[^0-5+\.0-9+/0-5+|NEW]"
    
    print('* Checking Url Column','\n')
    temp = df['url'].apply(validate_url)
    url_ind_list = temp[temp == False].index.to_list()
    print(f'Rows with errors in Url: {len(url_ind_list)}','\n')
    
    print('* Checking Address Column','\n')
    temp2 = df['address'].apply(search_character, reg = re_str)
    address_ind_list = temp2[temp2.notna()].index.to_list()
    print(f'Rows with errors in Address: {len(address_ind_list)}','\n')

    print('-> Cleaning Address...','\n')
    df['address'] = df['address'].apply(strip_character, reg = re_str)
    
    print('* Checking Name Column','\n')
    temp3 = df['name'].apply(search_character, reg = re_str)
    name_ind_list = temp3[temp3.notna()].index.to_list()
    name_nan_rows = df[df['name'] == ''].index.to_list()
    
    print(f'Rows with null values in name: {len(name_nan_rows)}','\n')
    print(f'Rows with errors in Name: {len(name_ind_list)}','\n')
    
    print('-> Cleaning Name...','\n')
    df['name'] = df['name'].apply(strip_character, reg = re_str)
    
    print('* Checking Rate Column','\n')
    temp4 = df['rate'].apply(search_character, reg = re_rate)
    rate_ind_list = temp4[temp4.notna()].index.to_list()
    print(f'Rows with errors in Rate: {len(rate_ind_list)}','\n')
    
    print('-> Cleaning Rate...','\n')
    df['rate'] = df['rate'].apply(strip_character, reg = re_rate)
    
    print('* Checking Phone Column','\n')
    df[['phone1','phone2']] = df['phone'].str.split('[\\r|\\n]{1,}', n=1, expand = True)
    # replacing + and whitespaces
    df['phone1'] = df['phone1'].str.replace('[\s\+]','', regex = True)
    df['phone2'] = df['phone2'].str.replace('[\s\+]','', regex = True)
    print('* Phone column was slipt into columns: Phone1 & Phone2','\n')
    
    # checking valid phone numbers
    temp5 = df['phone1'].apply(validate_phone)
    phone_ind_list = temp5[temp5 == False].index.to_list()
    temp6 = df['phone2'].apply(validate_phone)
    phone_ind_list2 = temp6[temp6 == False].index.to_list()
    
    # checking null values
    temp7 = df.apply(lambda x: empty_string(x['phone1']) & empty_string(x['phone2']), axis = 1)
    phone_nan_rows = temp7[temp7 == True].index.to_list()
    # transform phone from string to numeric
    df['phone1'] = pd.to_numeric(df['phone1'].str.replace('na|n','',regex = True))
    df['phone2'] = pd.to_numeric(df['phone2'].str.replace('na|n','',regex = True))
    # dropping phone column
    df = df.drop('phone', axis=1)
    
    print(f'Rows with null values in Phone: {len(phone_nan_rows)}','\n')
    print(f'Rows with errors in Phone1: {len(phone_ind_list)}','\n')
    print(f'Rows with errors in Phone2: {len(phone_ind_list2)}','\n')
    
    print('* Checking Location Column','\n')
    # splitting location into 2 columns, because some records have a street included in the area. 
    df[['loc1','loc2']] = df['location'].str.split(',', n=1, expand = True)
    # extracting the area from the split columns
    df['area'] = df.apply(lambda x: x['loc1'] if empty_string(x['loc2']) else x['loc2'].replace(' ',''), axis = 1)
    
    df_locations =  pd.read_excel('Areas_in_Blore.xlsx')
    
    # looking up the area in the Areas_in_Blore.xlsx file
    temp8 = df['area'].isin(df_locations['Area'])
    loc_ind_list = temp8[temp8 == False].index.to_list()
    
    # checking null values
    temp9 = df['location'].apply(empty_string)
    loc_nan_rows = temp9[temp9 == True].index.to_list()
    
    # dropping columns
    df = df.drop(['loc1','loc2','area'], axis=1)
    
    print(f'Rows with null values in Location: {len(loc_nan_rows)}','\n')
    print(f'Rows not found in Areas_in_Blore.xlsx: {len(loc_ind_list)}','\n')
    
    bad_row_list = url_ind_list + address_ind_list + name_ind_list + name_nan_rows \
                   + rate_ind_list + phone_ind_list + phone_ind_list2 + phone_nan_rows \
                   + loc_ind_list + loc_nan_rows
    
    with open("metadata.bad", "w") as f:
        f.write("Type_of_issue,"+'Row_num_list'+'\n')
        f.write("Invalid url,"+' '.join([str(item) for item in url_ind_list])+'\n')
        f.write("Address with junk characters,"+' '.join([str(item) for item in address_ind_list])+'\n')
        f.write("Name with junk characters,"+' '.join([str(item) for item in name_ind_list])+'\n')
        f.write("Name with null values,"+' '.join([str(item) for item in name_nan_rows])+'\n')
        f.write("Rate with spaces or - char,"+' '.join([str(item) for item in rate_ind_list])+'\n')
        f.write("Invalid Phone1,"+' '.join([str(item) for item in phone_ind_list])+'\n')
        f.write("Invalid Phone2,"+' '.join([str(item) for item in phone_ind_list2])+'\n')
        f.write("Phones with null values,"+' '.join([str(item) for item in phone_nan_rows])+'\n')
        f.write("Incorrect location,"+' '.join([str(item) for item in loc_ind_list])+'\n')
        f.write("Location with null values,"+' '.join([str(item) for item in loc_nan_rows])+'\n')
        
    return df, bad_row_list


def create_outfile(df, bad_list):
    """Capture all the clean records and save them in log.out file
       
       Input Arguments: bad_list - bad records index list
    """
    print('\n','***Saving clean records in log.out file...','\n')
    lst = unique_list(bad_list)
    
    if len(lst) > 0:
        df_clean = df[~df.index.isin(lst)]
        df_clean.to_csv("log.out")
        
    print(f'Clean Rows loaded: {len(df_clean)}','\n')
        
        
def create_badfile(df, bad_list):
    """Capture all the bad records and save them in log.bad file
       
       Input Arguments: bad_list - bad records index list
    """
    print('\n','***Saving bad records in log.bad file...','\n')
    lst = unique_list(bad_list)
    
    if len(lst) > 0:
        df_bad = df[df.index.isin(lst)]
        df_bad.to_csv("log.bad")
        
    print(f'Bad Rows loaded: {len(df_bad)}','\n')

In [5]:
def main():
    
    print('****************************************')
    print('Pipeline Started!')
    print('****************************************')
    
    # checks if there are files to load
    filepaths = check_files(path = './')
    
    # loads existing files data into the df Data Frame
    if(len(filepaths) > 0):
        df = load_files(filepaths)
        # validates each column of df
        df, bad_row_list = cleaning_cols(df)
        # saves clean records
        create_outfile(df, bad_row_list)
        # saves bad records
        create_badfile(df, bad_row_list)
        
    print('****************************************')
    print('Pipeline Ended!')
    print('****************************************')
    
if __name__ == "__main__":
    main()

****************************************
Pipeline Started!
****************************************

 ***Looking for new files... 

4 files found in ./
/Users/jorge.rivas/Desktop/Prueba Tecnica Inclusion/data_file_20210528182554.csv
/Users/jorge.rivas/Desktop/Prueba Tecnica Inclusion/data_file_20210527182730 copy.csv
/Users/jorge.rivas/Desktop/Prueba Tecnica Inclusion/data_file_20210528182844.csv
/Users/jorge.rivas/Desktop/Prueba Tecnica Inclusion/data_file_20210527182730.csv

 ***Loading files... 

Loading file: /Users/jorge.rivas/Desktop/Prueba Tecnica Inclusion/data_file_20210528182554.csv 

7378 rows loaded!!! 

Loading file: /Users/jorge.rivas/Desktop/Prueba Tecnica Inclusion/data_file_20210527182730 copy.csv 

Empty file found!!! ignoring file... 

Loading file: /Users/jorge.rivas/Desktop/Prueba Tecnica Inclusion/data_file_20210528182844.csv 

7379 rows loaded!!! 

Loading file: /Users/jorge.rivas/Desktop/Prueba Tecnica Inclusion/data_file_20210527182730.csv 

7379 rows loaded!!!

In [6]:
!ls

Areas_in_blore.xlsx
Data_Quality_Engineer_Code_Test__(1).pdf
Prueba Inclusion.ipynb
[34m__pycache__[m[m
data_file_20210527182730 copy.csv
data_file_20210527182730.csv
data_file_20210528182554.csv
data_file_20210528182844.csv
helper_functions.py
log.bad
log.out
log_file.txt
metadata.bad
