# File Ingestion and Schema Validation Project

In [14]:
# create a utility file
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [15]:
%%writefile file.yaml
file_type: csv
dataset_name: ucars
file_name: used_cars_data
table_name: cars
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - vin
    - back_legroom
    - bed
    - bed_height
    - bed_length
    - body_type
    - cabin
    - city
    - city_fuel_economy
    - combine_fuel_economy
    - daysonmarket
    - dealer_zip
    - description
    - engine_cylinders
    - engine_displacement
    - engine_type
    - exterior_color
    - fleet
    - frame_damaged
    - franchise_dealer
    - franchise_make
    - front_legroom
    - fuel_tank_volume
    - fuel_type
    - has_accidents
    - height
    - highway_fuel_economy
    - horsepower
    - interior_color
    - isCab
    - is_certified
    - is_cpo
    - is_new
    - is_oemcpo
    - latitude
    - length
    - listed_date
    - listing_color
    - listing_id
    - longitude
    - main_picture_url
    - major_options
    - make_name
    - maximum_seating
    - mileage
    - model_name
    - owner_count
    - power
    - price
    - salvage
    - savings_amount
    - seller_rating
    - sp_id
    - sp_name
    - theft_title
    - torque
    - transmission
    - transmission_display
    - trimId
    - trim_name
    - vehicle_damage_category
    - wheel_system
    - wheel_system_display
    - wheelbase
    - width
    - year

Overwriting file.yaml


In [16]:
import testutility as util
config_data = util.read_config_file('file.yaml')

In [17]:
config_data

{'file_type': 'csv',
 'dataset_name': 'ucars',
 'file_name': 'used_cars_data',
 'table_name': 'cars',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['vin',
  'back_legroom',
  'bed',
  'bed_height',
  'bed_length',
  'body_type',
  'cabin',
  'city',
  'city_fuel_economy',
  'combine_fuel_economy',
  'daysonmarket',
  'dealer_zip',
  'description',
  'engine_cylinders',
  'engine_displacement',
  'engine_type',
  'exterior_color',
  'fleet',
  'frame_damaged',
  'franchise_dealer',
  'franchise_make',
  'front_legroom',
  'fuel_tank_volume',
  'fuel_type',
  'has_accidents',
  'height',
  'highway_fuel_economy',
  'horsepower',
  'interior_color',
  'isCab',
  'is_certified',
  'is_cpo',
  'is_new',
  'is_oemcpo',
  'latitude',
  'length',
  'listed_date',
  'listing_color',
  'listing_id',
  'longitude',
  'main_picture_url',
  'major_options',
  'make_name',
  'maximum_seating',
  'mileage',
  'model_name',
  'owner_count',
  'power',
  '

In [20]:
# read the file using config file
import pandas as pd
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  df = pd.read_csv(source_file,config_data['inbound_delimiter'])
  df = pd.read_csv(source_file,config_data['inbound_delimiter'])


Unnamed: 0,vin,back_legroom,bed,bed_height,bed_length,body_type,cabin,city,city_fuel_economy,combine_fuel_economy,...,transmission,transmission_display,trimId,trim_name,vehicle_damage_category,wheel_system,wheel_system_display,wheelbase,width,year
0,ZACNJABB5KPJ92081,35.1 in,,,,SUV / Crossover,,Bayamon,,,...,A,9-Speed Automatic Overdrive,t83804,Latitude FWD,,FWD,Front-Wheel Drive,101.2 in,79.6 in,2019
1,SALCJ2FX1LH858117,38.1 in,,,,SUV / Crossover,,San Juan,,,...,A,9-Speed Automatic Overdrive,t86759,S AWD,,AWD,All-Wheel Drive,107.9 in,85.6 in,2020
2,JF1VA2M67G9829723,35.4 in,,,,Sedan,,Guaynabo,17.0,,...,M,6-Speed Manual,t58994,Base,,AWD,All-Wheel Drive,104.3 in,78.9 in,2016
3,SALRR2RV0L2433391,37.6 in,,,,SUV / Crossover,,San Juan,,,...,A,8-Speed Automatic Overdrive,t86074,V6 HSE AWD,,AWD,All-Wheel Drive,115 in,87.4 in,2020
4,SALCJ2FXXLH862327,38.1 in,,,,SUV / Crossover,,San Juan,,,...,A,9-Speed Automatic Overdrive,t86759,S AWD,,AWD,All-Wheel Drive,107.9 in,85.6 in,2020


In [31]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3000040 entries, 0 to 3000039
Data columns (total 66 columns):
 #   Column                   Dtype  
---  ------                   -----  
 0   vin                      object 
 1   back_legroom             object 
 2   bed                      object 
 3   bed_height               object 
 4   bed_length               object 
 5   body_type                object 
 6   cabin                    object 
 7   city                     object 
 8   city_fuel_economy        float64
 9   combine_fuel_economy     float64
 10  daysonmarket             int64  
 11  dealer_zip               object 
 12  description              object 
 13  engine_cylinders         object 
 14  engine_displacement      float64
 15  engine_type              object 
 16  exterior_color           object 
 17  fleet                    object 
 18  frame_damaged            object 
 19  franchise_dealer         bool   
 20  franchise_make           object 
 21  front_le

In [22]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [32]:
# write the dataframe to gzipped CSV file
df.to_csv('used_cars_data.csv.gz', index=False, compression='gzip', sep=config_data['outbound_delimiter'])