# File Ingestion and Schema Validation

The cell below writes `utilityfile.py`, a utilities module that contains all of the validation functions and the imports they need.

In [1]:
%%writefile utilityfile.py

# necessary imports

import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re

# file reading

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed contents.

    Parameters
    ----------
    filepath : str
        Path of the file to open (read in text mode).

    Returns
    -------
    The object produced by ``yaml.safe_load``. When the content is not
    valid YAML the parser error is logged (not raised) and ``None`` is
    returned instead.
    """
    parsed_config = None
    with open(filepath, 'r') as stream:
        try:
            parsed_config = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Best effort: report the problem and fall through to return None.
            logging.error(exc)
    return parsed_config
            
# function to replace characters
            
def replace(string, char):
    """Collapse consecutive repeats of ``char`` in ``string`` into one occurrence.

    ``char`` is treated as literal text, never as a regular expression, so
    regex metacharacters such as ``.``, ``*`` or a backslash are handled
    correctly. A multi-character ``char`` is collapsed as a whole unit
    (``'ababab'`` with ``char='ab'`` becomes ``'ab'``).

    Parameters
    ----------
    string : str
        Text to clean up.
    char : str
        The character (or literal substring) whose repeated runs are collapsed.

    Returns
    -------
    str
        ``string`` with every run of two or more ``char`` replaced by a
        single ``char``. An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        # Nothing to collapse; also avoids building a pattern that repeats nothing.
        return string
    # Escape so metacharacters match literally; group so the quantifier
    # applies to the whole of ``char`` rather than only its last character.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A function replacement inserts ``char`` verbatim (no template escapes).
    return re.sub(pattern, lambda _match: char, string)

# standardizes columns and validates dataframe YAML against validation YAML

def col_header_valid(df, table_config):
    '''
    Standardize the dataframe's column names and validate them against
    the columns listed in the validation config.

    Standardization: each column name is lower-cased, every run of
    non-word characters and/or underscores is collapsed into a single
    underscore, and leading/trailing underscores are removed. The
    standardized names are written back to ``df.columns``, so the
    caller's dataframe is modified.

    Validation: the standardized names must match the lower-cased
    expected names exactly, ignoring column order. Both sides are
    sorted before comparing, so a file whose header is not in
    alphabetical order still validates.

    Parameters
    ----------
    df : dataframe-like
        Object whose ``columns`` attribute can be read and assigned.
    table_config : mapping
        Parsed config; ``table_config['columns']`` lists the expected
        column names.

    Returns
    -------
    int
        1 when validation passes, 0 otherwise. On failure the columns
        missing on either side are printed and both column lists are
        logged at INFO level.
    '''
    # Single pass equivalent to: lower -> non-word chars to '_' ->
    # strip '_' -> collapse repeated '_'. str() keeps non-string labels
    # (e.g. integer headers) from raising.
    df.columns = [
        re.sub(r'[\W_]+', '_', str(name).lower()).strip('_')
        for name in df.columns
    ]
    actual_col = list(df.columns)
    expected_col = sorted(str(name).lower() for name in table_config['columns'])

    # List equality also covers the length check; sorting both sides makes
    # the comparison independent of column order.
    if sorted(actual_col) == expected_col:
        print("Column name and column length validation passed.")
        return 1

    print("Column name and column length validation failed.")
    # Sorted so the diagnostics are deterministic from run to run.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following file columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {df.columns}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Writing utilityfile.py


### Write Validation YAML

In order for our validation function to work as intended, we'll need to create a baseline for the columns of incoming files to be tested against. This is where our validation YAML file comes in.<br><br>

We can write this file with the following code, specifying the schema and columns we expect:

In [2]:
%%writefile validation_yaml.csv

file_type: csv
dataset_name: validation
file_name: validation_yaml
table_name: example
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - date
    - game_size
    - match_id
    - match_mode
    - party_size
    - player_assists
    - player_dbno
    - player_dist_ride
    - player_dist_walk
    - player_dmg
    - player_kills
    - player_name
    - player_survive_time
    - team_id
    - team_placement

Writing validation_yaml.csv


### Write Data YAMLs

Now we'll create a YAML file describing the dataset itself. It contains the columns within our dataset, which is what we'll expect every time a file is ingested within the pipeline.

In [3]:
%%writefile pubgdata.yaml

file_type: yaml
dataset_name: pubg_agg_match
file_name: pubg_agg_match_stats_0
table_name: ast
inbound_delimiter: ','
outbound_delimiter: '|'
skip_leading_rows: 1
columns: 
    - date
    - game_size
    - match_id
    - match_mode
    - party_size
    - player_assists
    - player_dbno
    - player_dist_ride
    - player_dist_walk
    - player_dmg
    - player_kills
    - player_name
    - player_survive_time
    - team_id
    - team_placement

Writing pubgdata.yaml


In [4]:
# Read the validation configuration into a dict.
# Note: despite its .csv extension, validation_yaml.csv holds YAML text
# (written by the %%writefile cell above) and is parsed with
# yaml.safe_load inside util.read_config_file.

import utilityfile as util

config_data = util.read_config_file("validation_yaml.csv")

In [5]:
config_data['file_type']

'csv'

In [6]:
config_data['file_name']

'validation_yaml'

In [7]:
# inspecting data of validation file

config_data

{'file_type': 'csv',
 'dataset_name': 'validation',
 'file_name': 'validation_yaml',
 'table_name': 'example',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['date',
  'game_size',
  'match_id',
  'match_mode',
  'party_size',
  'player_assists',
  'player_dbno',
  'player_dist_ride',
  'player_dist_walk',
  'player_dmg',
  'player_kills',
  'player_name',
  'player_survive_time',
  'team_id',
  'team_placement']}

Next comes the actual ingestion of a file. In order to more closely simulate a data pipeline, we will ingest a file over 4GB in size.<br><br>

When we consider a large, constant influx of data in our pipeline, we begin to understand the importance of processing time with regard to file reading. Although conventional pandas reads are fast enough for most files under 1GB, in a data pipeline we could be expecting several gigabytes of data to be loaded at once.<br><br>

This is where libraries like Dask come in handy. Dask is a parallel processing library within Python that allows workloads to be distributed across multiple cores (and even multiple pay-as-you-go machines should the scaling of your processes require it). This drastically reduces the processing times of large files such as the one below.

In [8]:
%%time

# Baseline timing: pandas loads the whole CSV into an in-memory
# DataFrame before head() can show the first rows.
import pandas as pd

df_sample = pd.read_csv("/kaggle/input/pubg-match-deaths/aggregate/agg_match_stats_0.csv", delimiter = ',')
df_sample.head()

CPU times: user 22.6 s, sys: 4.12 s, total: 26.7 s
Wall time: 56.5 s


Unnamed: 0,date,game_size,match_id,match_mode,party_size,player_assists,player_dbno,player_dist_ride,player_dist_walk,player_dmg,player_kills,player_name,player_survive_time,team_id,team_placement
0,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,1,2870.724,1784.84778,117,1,SnuffIes,1106.32,4,18
1,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,1,2938.40723,1756.07971,127,1,Ozon3r,1106.315,4,18
2,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,0,0.0,224.157562,67,0,bovize,235.558,5,33
3,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,0,0.0,92.93515,0,0,sbahn87,197.553,5,33
4,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,0,2619.07739,2510.447,175,2,GeminiZZZ,1537.495,14,11


While running the previous code on my laptop, the CPU total time was 40.5 seconds for a single 4GB file. The wall time clocked in at 48.1 seconds.

In [9]:
%%time

# Same file read through Dask; `ddf` is reused by the validation and
# summary cells further down.
# NOTE(review): per the Dask docs, head() by default only computes on the
# first partition, so this timing is not a full-file read - confirm before
# comparing it directly with the pandas cell.
import dask.dataframe as dd

ddf = dd.read_csv("/kaggle/input/pubg-match-deaths/aggregate/agg_match_stats_0.csv", delimiter = ',')
ddf.head()

CPU times: user 1.32 s, sys: 536 ms, total: 1.86 s
Wall time: 1.99 s


Unnamed: 0,date,game_size,match_id,match_mode,party_size,player_assists,player_dbno,player_dist_ride,player_dist_walk,player_dmg,player_kills,player_name,player_survive_time,team_id,team_placement
0,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,1,2870.724,1784.84778,117,1,SnuffIes,1106.32,4,18
1,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,1,2938.40723,1756.07971,127,1,Ozon3r,1106.315,4,18
2,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,0,0.0,224.157562,67,0,bovize,235.558,5,33
3,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,0,0.0,92.93515,0,0,sbahn87,197.553,5,33
4,2017-11-26T20:59:40+0000,37,2U4GBNA0YmnNZYkzjkfgN4ev-hXSrak_BSey_YEG6kIuDG...,tpp,2,0,0,2619.07739,2510.447,175,2,GeminiZZZ,1537.495,14,11


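Performing the same task with Dask's parallel processing had a CPU time of 2 seconds flat; a 90% reduction in processing time. The wall time was also significantly lower at 3.26 seconds. Keep in mind that this is not a like-for-like comparison: Dask reads lazily, so `ddf.head()` only has to parse the beginning of the file, whereas pandas parses all of it before returning.<br><br>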

We can perform the data ingestion and subsequent validation even faster if we utilize Dask's "lazy" dataframes. These dataframes return the datatypes and column names without loading any actual data; that is, until we perform a computation or request to view it.

In [10]:
%%time

# No head()/compute() here: displaying the lazy dataframe only shows its
# structure (column names, dtypes, npartitions), as in the output below.
test_ddf = dd.read_csv("/kaggle/input/pubg-match-deaths/aggregate/agg_match_stats_0.csv", delimiter = ',')
test_ddf

CPU times: user 13.8 ms, sys: 1.95 ms, total: 15.7 ms
Wall time: 4.88 s


Unnamed: 0_level_0,date,game_size,match_id,match_mode,party_size,player_assists,player_dbno,player_dist_ride,player_dist_walk,player_dmg,player_kills,player_name,player_survive_time,team_id,team_placement
npartitions=33,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
,object,int64,object,object,int64,int64,int64,float64,float64,int64,int64,object,float64,int64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


Performing the code above on my laptop returned a CPU total time of 0 nanoseconds and a wall time of 17.8 milliseconds.

In [11]:
test_ddf.columns

Index(['date', 'game_size', 'match_id', 'match_mode', 'party_size',
       'player_assists', 'player_dbno', 'player_dist_ride', 'player_dist_walk',
       'player_dmg', 'player_kills', 'player_name', 'player_survive_time',
       'team_id', 'team_placement'],
      dtype='object')

Dask dataframes support most pandas functions as well. Above we check the column names just as we might in pandas.

In an actual pipeline, we might also want to make the reading of the files dynamic. We can do so by creating several variables for the parts of a file directory:
- directory/
- config_data['file_name']
- f'.{file_type}'

Then we would simply add all three parts together within a variable, and perform dd.read_csv(final_variable) to get the desired result. Due to the limitations of local directories within Kaggle, we will have to omit this process in practice.

In [12]:
# validate the header of the file

util.col_header_valid(ddf,config_data)

Column name and column length validation passed.


1

We can see that our validation file correctly validated the file with the expected columns. Let's see how it handles files with some irrelevant data.

In [13]:
import pandas as pd

invalid = pd.read_csv("/kaggle/input/validation-test-invalid-data/test_data (1).csv")
invalid

Unnamed: 0,city,age,Country
0,Delhi,34,India
1,Lima,30,Peru
2,Istanbul,16,Turkey
3,Riyadh,33,Saudi Arabia


In [14]:
util.col_header_valid(invalid, config_data)

Column name and column length validation failed.
Following file columns are not in the YAML file ['age', 'city', 'country']
Following YAML columns are not in the file uploaded ['player_dist_walk', 'match_id', 'date', 'player_assists', 'match_mode', 'player_dist_ride', 'team_id', 'player_name', 'player_dbno', 'player_dmg', 'player_survive_time', 'team_placement', 'party_size', 'game_size', 'player_kills']


0

As expected, the function returns with a failure. Below that message, we see the differences in columns between the uploaded file and the validation YAML.

Knowing that the validation functions work as intended, we can write an if statement to direct files to separate locations depending on whether they passed validation. For instance, we can send files that passed to a directory for further cleaning and manipulation, and direct files that failed to their own separate folder for reinspection.

In [15]:
# Route the file according to the header-validation result
# (col_header_valid returns 1 on success and 0 on failure).
header_is_valid = util.col_header_valid(ddf,config_data) != 0

if header_is_valid:
    print("Column validation passed.")
    # write code to perform further action
    # in the pipeline
else:
    print("Validation failed.")
    # write code to reject the file

Column name and column length validation passed.
Column validation passed.


We might also want to write the ingested file's columns to a text file. We can do that with the following:

In [16]:
import csv

# Write the ingested file's column names to output.txt as one delimited row.
# The delimiter is taken from the validation config ("outbound_delimiter",
# currently "|") instead of being hard-coded, so the output format follows
# the YAML if it is ever changed.
columns = ddf.columns
with open("output.txt", "w", newline="") as out:
    writer = csv.writer(out, delimiter=config_data["outbound_delimiter"])
    writer.writerow(columns)

To finish off, we'll summarize the shape of the ingested file with two simple commands.

In [17]:
# file summary: row count (computed from the Dask dataframe) and column count

row_count = ddf.shape[0].compute()
column_count = len(ddf.columns)

print('There are',row_count,'rows in the PUBG Player Stats 00 dataset.')
print('There are',column_count,'columns in the PUBG Player Stats 00 dataset.')

There are 13849287 rows in the PUBG Player Stats 00 dataset.
There are 15 columns in the PUBG Player Stats 00 dataset.
