# Data Ingestion & Brief EDA notebook
This notebook is for incremental development of modularized classes for ingesting the various datasets in the loan application dataset

The goals for data ingestion in src are to
1) Ingest all data sources (bronze tables) as individual pandas dataframes
2) Merge all datasets into one large dataframe suitable for analysis (one silver table)

Keep in mind that overall project goal is to predict the loan outcome for finished loans at the time of loan start

## EDA
Start developing data ingestion and merging code. Do not prioritize modularizing the code. Investigation OK here

In [35]:
import pandas as pd
import numpy as np
import datetime

import pandera as pa
import logging
import yaml

import matplotlib.pyplot as plt

# url to raw content in repo
data_url= 'https://raw.githubusercontent.com/vvbauman/sample-work-loan-application/feature/data-ingestion/dataset/'

In [36]:
# ingest all data sources as pandas dataframes - these are saved as bronze tables
account= pd.read_csv(data_url + 'account.txt', sep= ';')
card= pd.read_csv(data_url + 'card.txt', sep= ';')
client= pd.read_csv(data_url + 'client.txt', sep= ';')
disp= pd.read_csv(data_url + 'disp.txt',  sep= ';')
district= pd.read_csv(data_url + 'district.txt',  sep= ';')
loan= pd.read_csv(data_url + 'loan.txt', sep= ';')
order= pd.read_csv(data_url + 'order.txt', sep= ';')
transactions= pd.read_csv(data_url + 'trans.txt', sep= ';')

In [37]:
# check for null values. If less than 5% of overall data, drop rows with nulls
# option to provide list subset, used in pd.dropna() to only consider a subset of rows when counting null values
def drop_nulls(df : pd.DataFrame, subset : list = []):
    if len(subset) == 0:
        percent_null= df.isnull().sum().sum() / len(df)
    else:
        percent_null= df[subset].isnull().sum().sum() / len(df)

    if percent_null == 0:
        print('No null values in dataframe. Returned dataframe is same as input dataframe')
        return df
    elif percent_null < 0.05:
        print('Less than 5% of rows are missing data. Returned dataframe is same as input dataframe with these rows dropped')
        return df.dropna(subset= subset)
    else:
        print('More than 5% of rows are missing data. Returning dataframe without dropping rows')
        return df

account= drop_nulls(account)
card= drop_nulls(card)
client= drop_nulls(client)
disp= drop_nulls(disp)
district= drop_nulls(district)
loan= drop_nulls(loan)
order= drop_nulls(order)
transactions= drop_nulls(transactions, subset= ['trans_id', 'account_id', 'date'])

# if any rows are dropped, this would be a good place to save these tables as silver tables, since they have undergone some preprocessing

No null values in dataframe. Returned dataframe is same as input dataframe
No null values in dataframe. Returned dataframe is same as input dataframe
No null values in dataframe. Returned dataframe is same as input dataframe
No null values in dataframe. Returned dataframe is same as input dataframe
No null values in dataframe. Returned dataframe is same as input dataframe
No null values in dataframe. Returned dataframe is same as input dataframe
No null values in dataframe. Returned dataframe is same as input dataframe
No null values in dataframe. Returned dataframe is same as input dataframe


In [38]:
# get info for all dataframes, to understand how the star schema can be configured
for i in [account, card, client, disp, district, loan, order, transactions]:
    print(i.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4500 entries, 0 to 4499
Data columns (total 4 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   account_id   4500 non-null   int64 
 1   district_id  4500 non-null   int64 
 2   frequency    4500 non-null   object
 3   date         4500 non-null   int64 
dtypes: int64(3), object(1)
memory usage: 140.8+ KB
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 892 entries, 0 to 891
Data columns (total 4 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   card_id  892 non-null    int64 
 1   disp_id  892 non-null    int64 
 2   type     892 non-null    object
 3   issued   892 non-null    object
dtypes: int64(2), object(2)
memory usage: 28.0+ KB
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5369 entries, 0 to 5368
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype
---  ------        --------------  -----
 0   client_id     53

In [39]:
# take advantage of the fact there are common column names across dataframes and assume they mean the same thing across dataframes
# (i.e., will be able to join on these columns and there will be common elements within these columns across the multiple tables)

# confirm that the common column among account, loan, order, transactions, and disp is "account_id"
print(set(account.columns) & set(loan.columns) & set(order.columns) & set(transactions.columns) & set(disp.columns))

# confirm that the common column among disp and client is "client_id"
print(set(disp.columns) & set(client.columns))

# card can be joined with disp - disp has "account_id" and "disp_id columns"
# no common columns in the district dataframe with any of the other dataframes. Do merges for above dataframes, then see how district fits in

{'account_id'}
{'client_id'}


In [40]:
# inner-join account, loan, order, transactions, and disp dataframes on account_id
# we only want details on accounts with a loan
account_id_merge= (account
                    .merge(loan, on= ['account_id'], how= 'inner', suffixes= ('_account', '_loan')) 
                    .merge(order, on= ['account_id'], how= 'inner', suffixes= ('_loan', '_order'))
                    .merge(transactions, on= ['account_id'], how= 'inner', suffixes= ('_order', '_transactions'))
                    .merge(disp, on= ['account_id'], how= 'inner', suffixes= ('_transactions', '_disp'))
                     )
account_id_merge.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 330720 entries, 0 to 330719
Data columns (total 27 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   account_id             330720 non-null  int64  
 1   district_id            330720 non-null  int64  
 2   frequency              330720 non-null  object 
 3   date_account           330720 non-null  int64  
 4   loan_id                330720 non-null  int64  
 5   date_loan              330720 non-null  int64  
 6   amount_loan            330720 non-null  int64  
 7   duration               330720 non-null  int64  
 8   payments               330720 non-null  float64
 9   status                 330720 non-null  object 
 10  order_id               330720 non-null  int64  
 11  bank_to                330720 non-null  object 
 12  account_to             330720 non-null  int64  
 13  amount_order           330720 non-null  float64
 14  k_symbol_order         330720 non-nu

In [41]:
# inner-join disp and client dataframes on client_id, then merge with card on disp_id, then merge with account_id_merge on disp_id
client_id_merge= disp.merge(client, on= ['client_id'], how= 'inner', suffixes= ('_disp', '_client'))
disp_id_merge= client_id_merge.merge(card, on= ['disp_id'], how= 'inner', suffixes= ('_client_disp', '_card'))

silver= disp_id_merge.merge(account_id_merge, on= ['account_id'], how= 'inner', suffixes= ('', '_disp_id'))
silver.info() # silver dataframe with 7/8 tables merged has 75599 rows

<class 'pandas.core.frame.DataFrame'>
Int64Index: 75599 entries, 0 to 75598
Data columns (total 35 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   disp_id                75599 non-null  int64  
 1   client_id              75599 non-null  int64  
 2   account_id             75599 non-null  int64  
 3   type_client_disp       75599 non-null  object 
 4   birth_number           75599 non-null  int64  
 5   district_id            75599 non-null  int64  
 6   card_id                75599 non-null  int64  
 7   type_card              75599 non-null  object 
 8   issued                 75599 non-null  object 
 9   district_id_disp_id    75599 non-null  int64  
 10  frequency              75599 non-null  object 
 11  date_account           75599 non-null  int64  
 12  loan_id                75599 non-null  int64  
 13  date_loan              75599 non-null  int64  
 14  amount_loan            75599 non-null  int64  
 15  du

## Code modularization
Create custom ingestion class that accomplishes all tasks done in cells above

In [42]:
bronze_merge_schema= pa.DataFrameSchema({
    'disp_id' : pa.Column(object, nullable= True),
    'client_id' : pa.Column(object, nullable= True),
    'account_id' : pa.Column(object, nullable= True),
    'type_client_disp' : pa.Column(object, nullable= True),
    'birth_number' : pa.Column('datetime64[ns]', nullable= True),
    'district_id' : pa.Column(object, nullable= True),
    'card_id' : pa.Column(object, nullable= True),
    'type_card' : pa.Column(object, nullable= True),
    'issued' : pa.Column('datetime64[ns]', nullable= True),
    'district_id_disp_id' : pa.Column(object, nullable= True),
    'frequency' : pa.Column(object, nullable= True),
    'date_account' : pa.Column('datetime64[ns]', nullable= True),
    'loan_id' : pa.Column(object, nullable= True),
    'date_loan' : pa.Column('datetime64[ns]', nullable= True),
    'amount_loan' : pa.Column(int, nullable= True),
    'duration' : pa.Column(int, nullable= True),
    'payments' : pa.Column(float, nullable= True),
    'status' : pa.Column(object, nullable= True),
    'order_id' : pa.Column(object, nullable= True),
    'bank_to' : pa.Column(object, nullable= True),
    'account_to' : pa.Column(object, nullable= True),
    'amount_order' : pa.Column(float, nullable= True),
    'k_symbol_order' : pa.Column(object, nullable= True),
    'trans_id' : pa.Column(object, nullable= True),
    'date' : pa.Column('datetime64[ns]', nullable= True),
    'type_transactions' : pa.Column(object, nullable= True),
    'operation' : pa.Column(object, nullable= True),
    'amount' : pa.Column(float, nullable= True),
    'balance' : pa.Column(float, nullable= True),
    'k_symbol_transactions' : pa.Column(object, nullable= True),
    'bank' : pa.Column(object, nullable= True),
    'account' : pa.Column(object, nullable= True),
    'disp_id_disp_id' : pa.Column(object, nullable= True),
    'client_id_disp_id' : pa.Column(object, nullable= True),
    'type_disp' : pa.Column(object, nullable= True)
})

In [43]:
def validate_schema(df : pd.DataFrame, schema : pa.DataFrameSchema, cols : list = []) -> str:
    """ 
    Validate pandas dataframe schema
    First, run schema check using panderas. If schema check fails, check that columns of interest (specified in cols argument) are at least present. 
    If these columns are not present, throw an error. If they are present, return the dataframe. Regardless of outcome, return a message 
    (message intended to display in print statement or in a logger)

    Parameters
    ----------
    df : dataframe you want to validate the schema of
    schema : expected/desired schema of df
    cols : subset of columns in df you require the dataframe to have

    Returns
    ----------
    msg : str describing results of schema check
    """
    try:
        schema(df, lazy= True)
        msg= 'Correct schema, no action needed'
    except:
        if (len(cols) > 0) & (not set(df.columns) >= set(cols)):
            msg= 'Provided schema does not match dataframe schema. Dataframe also does not have expected columns'
        elif (len(cols) == 0) & (not set(df.columns) >= set(cols)):
            msg= 'Provided schema does not match dataframe schema. Did not check for subset of columns - no argument provided to cols argument'
        elif (len(cols) > 0) & (set(df.columns) >= set(cols)):
            msg= 'Provided schema does not match dataframe schema but dataframe has expected columns'
    return msg
    
def ingest_flat_file(path : str, filename : str, schema : pa.DataFrameSchema, sep : str = ','):
    """
    Ingest a flat file saved to path and validate schema of ingested dataframe
    Acceptable file formats are .csv, .txt, and .pkl. Any other provided formats will throw an error
    Schema of ingested dataframe must exactly match the input argument schema otherwise an error will be thrown

    Parameters
    ----------
    path : absolute path to flat file, not including the flat file name and extension itself
    filename : name and extension of flat file to be ingested
    schema : expected schema of the ingested file
    sep : separator to use in pd.read_csv() when attempting to ingest the flat file

    Returns
    ----------
    df : ingested data
    msg : str describing the results of data ingestion and schema check. Intended to display in print statement or log

    """
    try:
        df= pd.read_csv(path + filename, sep= sep)
    except: 
        try: 
            df= pd.read_pickle(path + filename)
        except:
            raise(Exception)

    try:
        validate_schema(df= df, schema= schema)
        msg= 'Dataframe successfully ingested and has expected schema'
    except:
        msg= 'Dataframe successfully ingested but does not have expected schema'

    return df, msg

def save_flat_file(path : str, filename : str, df : pd.DataFrame) -> str:
    """ 
    Save a pandas dataframe as a flat file to a specified location

    Parameters
    ----------
    path : absolute path to where the flat file should be saved. Do not include name or extension to save table as
    filename : name and extension to save dataframe under 
    df : pandas dataframe to save as a flat file. This function does not include any data validation, so it should be conducted before running this function

    Returns
    ----------
    msg : str describing the results of writing the dataframe. Intended to display in print statement or log
    """
    try: 
        df.to_csv(path + filename)
        msg= f'Dataframe successfully written to {path} (filename: {filename})'
    except:
        msg= f'Dataframe write unsuccessful. Check provided path and filename are correct (path: {path}, filename: {filename})'
    return msg
    

In [44]:
def drop_nulls(df : pd.DataFrame, subset : list = [], logger : logging.Logger = None) -> pd.DataFrame:
    """ 
    Function to drop rows in a dataframe containing null values, if the % of rows with null values is less than 5% of the entire dataframe length
    If logging.Logger object provided to input arguments, log messages on rows being dropped will be written to logger

    Parameters
    ----------
    df : dataframe to check for null values
    subset : list of str. Columns in df to consider when checking for null values. 
            If this argument is provided, any columns not in this list are not considered when checking for nulls
    logger : logging.Logger object to write log messages to

    Returns
    ----------
    df : same as input df but with rows with null values dropped (as long as they account for less than 5% of the data)
    """
    if len(subset) == 0:
        percent_null= df.isnull().sum().sum() / len(df)
    else:
        percent_null= df[subset].isnull().sum().sum() / len(df)

    if percent_null == 0:
        if isinstance(logger, logging.Logger):
            logger.info('No null values in dataframe. Returned dataframe is same as input dataframe')
        return df
    elif percent_null < 0.05:
        if isinstance(logger, logging.Logger):
            logger.warning('Less than 5% of rows are missing data. Returned dataframe is same as input dataframe with these rows dropped')
        return df.dropna(subset= subset)
    else:
        if isinstance(logger, logging.Logger):
            logger.warning('More than 5% of rows are missing data. Returning dataframe without dropping rows')
        return df

In [45]:
class IngestMergeLoanTables():
    def __init__(self, cf : dict, cf_m : dict) -> None:
        """
        Class to ingest and merge all (bronze) tables in the loan dataset, to create a silver table with all merged data
        
        Args
        ----------
        cf : configuration file with constants used across project
        cf_m : configuration file with constants used in this specific class

        Attributes
        ----------
        cf (dict) : same as cf input argument. Configuration file with constants used across projects
        cf_m (dict) : same as cf_m input argument. Configuration file with constants used in this specific class
        account (pd.DataFrame) : table containing account data. 
        
        """
        self.cf= cf
        self.cf_m= cf_m

        self.account= pd.DataFrame([])
        self.card= pd.DataFrame([])
        self.client= pd.DataFrame([])
        self.disp= pd.DataFrame([])
        self.district= pd.DataFrame([])
        self.loan= pd.DataFrame([])
        self.order= pd.DataFrame([])
        self.transactions= pd.DataFrame([])
        self.silver= pd.DataFrame([])

        # add logger

    def get_silver_table(self, schema : pa.DataFrameSchema):
        """ 
        Wrapper method for retrieving the silver table containing all merged bronze tables, by either ingesting and merging all bronze tables 
        using ingest_bronze_tables(), cast_bronze_tables(), and merge_bronze_tables() methods 
        or by ingesting a previously saved silver table using load_from_disk() method

        Option to save silver table to disk. Option available regardless if ingesting from scratch

        Populates self.silver, returns self.silver

        Parameters
        ----------
        schema : expected schema of final silver dataframe (7 of 8 bronze tables merged into a single dataframe)
        """
        if self.cf['FROM_SCRATCH']:
            self.silver= self.ingest_bronze_tables().cast_bronze_dtypes().merge_bronze_tables()
            # add validation 
        else:
            self.silver= self.load_from_disk(schema= schema)
            # add validation
        
        if self.cf['SAVE_SILVER']:
            save_flat_file(path= self.cf['SILVER_SAVE_DIR'], filename= self.cf_m['bronze_merged_save_name'], df= self.silver)
            # add log message

        return self.silver

    def ingest_bronze_tables(self):
        """ 
        Ingest bronze tables, dropping nulls when adjusting using drop_nulls utility functions
        Run validation to ensure all tables are what's expected

        Populates self attributes account, card, client, disp, district, loan, order, and transactions
        """
        self.account= drop_nulls(df= pd.read_csv(self.cf['DATA_URL'] + self.cf_m['bronze_account'], sep= self.cf_m['bronze_table_sep']))
        self.card= drop_nulls(df= pd.read_csv(self.cf['DATA_URL'] + self.cf_m['bronze_card'], sep= self.cf_m['bronze_table_sep']))
        self.client= drop_nulls(df= pd.read_csv(self.cf['DATA_URL'] + self.cf_m['bronze_client'], sep= self.cf_m['bronze_table_sep']))
        self.disp= drop_nulls(df= pd.read_csv(self.cf['DATA_URL'] + self.cf_m['bronze_disp'], sep= self.cf_m['bronze_table_sep']))
        self.district= drop_nulls(df= pd.read_csv(self.cf['DATA_URL'] + self.cf_m['bronze_district'], sep= self.cf_m['bronze_table_sep']))
        self.loan= drop_nulls(df= pd.read_csv(self.cf['DATA_URL'] + self.cf_m['bronze_loan'], sep= self.cf_m['bronze_table_sep']))
        self.order= drop_nulls(df= pd.read_csv(self.cf['DATA_URL'] + self.cf_m['bronze_order'], sep= self.cf_m['bronze_table_sep']))
        self.transactions= drop_nulls(
            df= pd.read_csv(self.cf['DATA_URL'] + self.cf_m['bronze_transactions'], sep= self.cf_m['bronze_table_sep']), 
            subset= self.cf_m['transactions_null_subset'])
        
        # add data validation on columns/dtypes

        return self
    
    def cast_bronze_dtypes(self):
        """ 
        Method to cast all bronze tables to their required dtypes. Does not include district table. Intended to be run before merging all bronze tables
        Modifies self attributes account, card, client, disp, loan, order, and transactions
        """
        self.account= self.date_convert(df= self.account.astype(self.cf_m['account_dtypes']), date_col= self.cf_m['account_date_col'], date_format= self.cf_m['date_format'])
        self.card= self.card.astype(self.cf_m['card_dtypes'])
        self.client= self.date_convert(df= self.client.astype(self.cf_m['client_dtypes']), date_col= self.cf_m['client_date_col'], date_format= self.cf_m['date_format'])
        self.disp= self.disp.astype(self.cf_m['disp_dtypes'])
        self.loan= self.date_convert(df= self.loan.astype(self.cf_m['loan_dtypes']), date_col= self.cf_m['loan_date_col'], date_format= self.cf_m['date_format'])
        self.order= self.order.astype(self.cf_m['order_dtypes'])
        self.transactions= self.date_convert(df= self.transactions.astype(self.cf_m['transactions_dtypes']), date_col= self.cf_m['transactions_date_col'], date_format= self.cf_m['date_format'])

        # add log message

        return self
    
    @staticmethod
    def date_convert(df : pd.DataFrame, date_col : str, date_format : str):
        """ 
        Method for converting a str column in a pandas dataframe to a date column. Expected str format for the date column is '%y%m%d'

        Parameters
        ----------
        df : dataframe containing the str column to be converted to dates
        date_col : name of column in df with the str values to be converted to dates

        Returns
        ----------
        df : same as input df but with all values in date_col converted to datetime.datetime

        """
        df[date_col]= pd.to_datetime(df[date_col], format= date_format, errors= 'coerce')
        return df

    def merge_bronze_tables(self) -> pd.DataFrame:
        """ 
        Method for merging all bronze tables into one silver table. Two merges are run in this method: first merges all dataframes with an "account_id"
        column, second merges all dataframes with a "client_id" column. These two dataframes are then merged to create the silver dataframe
        
        Try/except blocks are used to provide informative log messages if a merge fails

        Tables/dataframes being merged are self attributes account, loan, order, transactions, disp, client, and card

        Returns
        ----------
        silver : dataframe with all bronze tables merged

        """
        try:
            account_id_merge= (self.account
            .merge(self.loan, on= self.cf_m['account_id_merge_cols'], how = self.cf_m['account_id_merge_type'], suffixes= (self.cf_m['account_suffix'], self.cf_m['loan_suffix']))
            .merge(self.order, on= self.cf_m['account_id_merge_cols'], how = self.cf_m['account_id_merge_type'], suffixes= (self.cf_m['loan_suffix'], self.cf_m['order_suffix']))
            .merge(self.transactions, on= self.cf_m['account_id_merge_cols'], how = self.cf_m['account_id_merge_type'], suffixes= (self.cf_m['order_suffix'], self.cf_m['transactions_suffix']))
            .merge(self.disp, on= self.cf_m['account_id_merge_cols'], how = self.cf_m['account_id_merge_type'], suffixes= (self.cf_m['transactions_suffix'], self.cf_m['disp_suffix']))
            )
        except:
            raise(Exception)

        try:
            other_tables_merge= (self.disp
            .merge(self.client, on= self.cf_m['client_id_merge_cols'], how= self.cf_m['client_id_merge_type'], suffixes= (self.cf_m['disp_suffix'], self.cf_m['client_suffix']))
            .merge(self.card, on= self.cf_m['client_id_card_merge_cols'], how= self.cf_m['client_id_card_merge_type'], suffixes= (self.cf_m['client_disp_suffix'], self.cf_m['card_suffix']))
            )
        except:
            raise(Exception)

        try:
            silver= other_tables_merge.merge(account_id_merge, on= self.cf_m['account_id_merge_cols'], how= self.cf_m['account_id_merge_type'], suffixes= ('', self.cf_m['disp_id_suffix']))
            # log message
        except:
            raise(Exception)
        return silver
    
    def load_from_disk(self, schema : pa.DataFrameSchema):
        silver, msg= ingest_flat_file(path= self.cf['DATA_URL'], filename= self.cf['BRONZE_MERGED_FILE'], schema= schema)
        # add log message
        return silver

In [46]:
with open(r'C:\Users\valerie.bauman\Documents\GitHub\sample-work-loan-application\ingestion_config.yml', 'r') as stream:
    cf= yaml.safe_load(stream)
    
bronze_loader= IngestMergeLoanTables(cf= cf, cf_m= cf['ingest'])
bronze_merged= bronze_loader.get_silver_table(schema= bronze_merge_schema)

In [47]:
# STOP HERE

In [48]:
bronze_merge_schema.validate(bronze_merged)

SchemaError: expected series 'amount_loan' to have type int64, got int32

In [None]:
# final goal is to predict loan outcome for finished loans at the time of loan start
# in status column in loan, A is good loan that is finished, B is bad loan that is finished, 
# C is good loan that is unfinished, D is bad loan that is unfinished
district.head()

Unnamed: 0,A1,A2,A3,A4,A5,A6,A7,A8,A9,A10,A11,A12,A13,A14,A15,A16
0,1,Hl.m. Praha,Prague,1204953,0,0,0,1,1,100.0,12541,0.29,0.43,167,85677,99107
1,2,Benesov,central Bohemia,88884,80,26,6,2,5,46.7,8507,1.67,1.85,132,2159,2674
2,3,Beroun,central Bohemia,75232,55,26,4,1,5,41.7,8980,1.95,2.21,111,2824,2813
3,4,Kladno,central Bohemia,149893,63,29,6,2,6,67.4,9753,4.64,5.05,109,5244,5892
4,5,Kolin,central Bohemia,95616,65,30,4,1,6,51.4,9307,3.85,4.43,118,2616,3040


In [None]:
np.unique(loan.status.values, return_counts= True)

(array(['A', 'B', 'C', 'D'], dtype=object),
 array([203,  31, 403,  45], dtype=int64))