# File Ingestion and Schema Validation

In [1]:
import dask.dataframe as dd
from pyarrow import csv, parquet
import yaml
import logging
import json
from pprint import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as PysparkFunc
import pyspark.sql.types as PysparkType
import re
import os
import shutil

def read_config_file(filepath):
    with open(filepath, 'r') as file:
        try:
            return yaml.safe_load(file)
        except yaml.YAMLError as error:
            print(error)
            
def remove_dir(dirpath):
    try:
        if os.path.exists(dirpath) and os.path.isdir(dirpath):
            shutil.rmtree(dirpath)
            print(f'Existing {dirpath} removed.')
    except OSError as error:
        print(error)
        
def normalize_inbound(name):
    name = name.strip().lower()
    name = re.sub(r'[\W_]*','',name)
    return name

## 1. Read file

### - Write `config.yml`:

**Note:** *filename:*`bikes-1` is a valid while *filename:*`bikes-2` is invalid.

In [2]:
%%writefile config.yml
inbound:
    folder: data/input/
    filename: bikes-1
    filetype: csv
    header: True
    delimiter: ','
    skip_rows: 0
outbound:
    folder: data/output/
    filename: bikes-1
    filetype: gzip
    header: True
    delimiter: '|'
columns:
    - bike_id: int
    - start_time: timestamp
    - end_time: timestamp   
    - start_station_name: string
    - end_station_name: string
column_name_splitter: '_'

Overwriting config.yml


In [3]:
config = read_config_file('config.yml')
pprint(config)

{'column_name_splitter': '_',
 'columns': [{'bike_id': 'int'},
             {'start_time': 'timestamp'},
             {'end_time': 'timestamp'},
             {'start_station_name': 'string'},
             {'end_station_name': 'string'}],
 'inbound': {'delimiter': ',',
             'filename': 'bikes-1',
             'filetype': 'csv',
             'folder': 'data/input/',
             'header': True,
             'skip_rows': 0},
 'outbound': {'delimiter': '|',
              'filename': 'bikes-1',
              'filetype': 'gzip',
              'folder': 'data/output/',
              'header': True}}


### - Convert `csv` file into `parquet` format

In [4]:
config['inbound']

{'folder': 'data/input/',
 'filename': 'bikes-1',
 'filetype': 'csv',
 'header': True,
 'delimiter': ',',
 'skip_rows': 0}

In [5]:
%%time
# dask dataframe
# create parquet file
inboundfile = config['inbound']['folder']+config['inbound']['filename']+'.'+config['inbound']['filetype']
incolnames = []
parquetfile = config['inbound']['folder']+config['inbound']['filename']+'.parquet'
# remove parquetfile if exists
remove_dir(parquetfile)
df = None
if config['inbound']['filetype'] == 'csv':
    df = dd.read_csv(inboundfile, 
                     delimiter=config['inbound']['delimiter'],
                     header='infer' if config['inbound']['header'] else None,
                     skiprows=config['inbound']['skip_rows'] if config['inbound']['skip_rows'] else None,
                     assume_missing=True)
    df.to_parquet(parquetfile) # time-consuming

Existing data/input/bikes-1.parquet removed.
Wall time: 45.1 s


### - Load `parquet` file into `pyspark` dataframe

In [6]:
spark = SparkSession.builder.appName('FileIngestion')\
                            .getOrCreate()

In [7]:
%%time
df = spark.read.format('parquet').options(header=config['inbound']['header'], 
                                          inferSchema='True').load(parquetfile) 

df.printSchema()
df.show(10)

root
 |-- rental_id: double (nullable = true)
 |-- duration: double (nullable = true)
 |-- bike_id: double (nullable = true)
 |-- end_rental_date_time: string (nullable = true)
 |-- end_station_id: double (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- start_rental_date_time: string (nullable = true)
 |-- start_station_id: double (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- __null_dask_index__: long (nullable = true)

+-----------+--------+-------+--------------------+--------------+--------------------+----------------------+----------------+--------------------+-------------------+
|  rental_id|duration|bike_id|end_rental_date_time|end_station_id|    end_station_name|start_rental_date_time|start_station_id|  start_station_name|__null_dask_index__|
+-----------+--------+-------+--------------------+--------------+--------------------+----------------------+----------------+--------------------+-------------------+
|7.2091291E7|   480.

## 2. Schema validation

In [8]:
%%time
validincolnames = []
validconfigcolnames = []
incolumns = list(set(df.dtypes))
incolnames = [name for name, dtype in incolumns]
configcolnames = [name for col in config['columns'] for name, dtype in col.items()]

for incol in incolumns:
    incolname = incol[0]
    incoldtype = incol[1]
    for configcol in config['columns']:
        for configcolname, configcoldtype in configcol.items():
            # validate incolname as per configcolname
            configcolname = configcolname.strip().lower()
            allparts = configcolname.split(config['column_name_splitter'])
            partsfound = 0
            for part in allparts:
                if part in normalize_inbound(incolname):
                    partsfound += 1
            if partsfound == len(allparts):
                validincolnames += [incolname]
                validconfigcolnames += [configcolname]
                print(f'{incolname} matches {configcolname}.')
                # validate incoldtype as per configcoldtype
                configcoldtype = configcoldtype.strip().lower()
                if configcoldtype in normalize_inbound(incoldtype):
                    print(f'{incolname} of dtype {incoldtype} matches dtype {configcoldtype}\n')
                else:
                    print(f'{incolname} of dtype {incoldtype} doesn\'t match dtype {configcoldtype}\n')

end_station_name matches end_station_name.
end_station_name of dtype string matches dtype string

start_rental_date_time matches start_time.
start_rental_date_time of dtype string doesn't match dtype timestamp

bike_id matches bike_id.
bike_id of dtype double doesn't match dtype int

start_station_name matches start_station_name.
start_station_name of dtype string matches dtype string

end_rental_date_time matches end_time.
end_rental_date_time of dtype string doesn't match dtype timestamp

Wall time: 3 ms


## 3. Validation result and decision

In [9]:
%%time
if len(validincolnames) == len(config['columns']):
    print('Schema validation passed and file accepted.')
    # rename INBOUND column names as per CONFIG column names
    colargs = []
    for i, validincolname in enumerate(validincolnames):
        colargs += [PysparkFunc.col(validincolname).alias(validconfigcolnames[i])]
    valid_df = df.select(*colargs)
    # add increasing index column
    valid_df = valid_df.withColumn('index', PysparkFunc.monotonically_increasing_id())
    # add inbound file path column
    valid_df = valid_df.withColumn('inbound_file', PysparkFunc.lit(inboundfile))
    # write outbound compressed file
    outboundfile = config['outbound']['folder']+config['outbound']['filename']
    # remove if exists
    remove_dir(outboundfile)
    print('Writing outbound compressed file ...',end=' ')
    valid_df.repartition(1).write.options(header=config['outbound']['header'],
                              delimiter=config['outbound']['delimiter'],
                              compression=config['outbound']['filetype']).csv(outboundfile)
    print('Done')
    # read ingested file
    print('Reading ingested file ...',end=' ')
    df = spark.read.format('csv').options(header=config['outbound']['header'],
                                          delimiter=config['outbound']['delimiter'],
                                          inferSchema='True').load(outboundfile)
    print('Done')
    # get rows count
    num_rows = df.count()
    # get columns count
    num_cols = len(df.columns)
    # get input file size
    inputfile = df.select(PysparkFunc.input_file_name()).first().__getitem__('input_file_name()').strip('file:///')
    inputfile = inputfile.replace('%20',' ').replace('/','\\').replace(os.getcwd()+'\\','')
    file_size = os.path.getsize(inputfile)
    # create summary file
    summary = {
        'filename':inputfile,
        'num_cols':num_cols,
        'num_rows':num_rows,
        'file_size_bytes':file_size
    }
    with open(config['outbound']['folder']+config['outbound']['filename']+'.yml','w') as file:
        yaml.dump(summary,file)
    pprint(summary)
else:
    print('Schema validation failed and file rejected.')
    print(f'Following CONFIG columns are not in INBOUND {list(set(configcolnames).difference(set(validconfigcolnames)))}')

if len(validincolnames) != len(incolnames):
    print(f'Following INBOUND columns are not in CONFIG {list(set(incolnames).difference(set(validincolnames)))}')

Schema validation passed and file accepted.
Writing outbound compressed file ... Done
Reading ingested file ... Done
{'file_size_bytes': 810570730,
 'filename': 'data\\output\\bikes-1\\part-00000-de65e3a6-0d58-4989-9884-19fdd99a13cd-c000.csv.gz',
 'num_cols': 7,
 'num_rows': 38215560}
Following INBOUND columns are not in CONFIG ['duration', '__null_dask_index__', 'end_station_id', 'rental_id', 'start_station_id']
Wall time: 4min 35s


In [10]:
spark.stop()