In [1]:
# Ignore 'dask' warning
import pandas as pd
import gcsfs
from google.cloud import storage
from pandas import DataFrame
from IPython.display import HTML
from google.cloud.storage import Blob
import datalab.storage as gcs_datalab
import numpy as np

In [2]:
# Required constants: the GCP project id and the GCS bucket the input files are read from.
project, bucket_name = 'graydon-moving-indicator', 'graydon-data'

In [3]:
# Initialise the GCS clients from the constants above (instead of repeating the
# project id literal): `fs` streams file contents, `bucket` is used to list blobs.
fs = gcsfs.GCSFileSystem(project=project)
gcs = storage.Client(project=project)
bucket = gcs.get_bucket(bucket_name)

In [4]:
# Columns read from every monthly CSV (passed to pd.read_csv as `usecols`).
# Only regrouped for readability -- the order is the same as before.
selected_columns = [
    # company / financial fields
    'date_month', 'id_company', 'id_branch', 'is_discontinued',
    'financial_calamity_outcome', 'date_established',
    'qty_employees', 'year_qty_employees', 'id_company_creditproxy',
    'score_payment_assessment', 'amt_revenue', 'year_revenue',
    'amt_consolidated_revenue', 'year_consolidated_revenue',
    'amt_consolidated_operating_result', 'year_consolidated_operating_result',
    'perc_credit_limit_adjustment', 'color_credit_status', 'rat_pd',
    'score_pd', 'has_increased_risk',
    'is_sole_proprietor', 'code_SBI_2', 'code_SBI_1',
    'qty_address_mutations_total', 'qty_address_mutations_month',
    'has_relocated', 'has_name_change', 'code_discontinuation',
    'code_financial_calamity', 'qty_issued_credit_reports',
    # board-member role columns (summed by aggregate_board_members after
    # column-name normalisation)
    'Associate', 'Authorized official', 'Board member', 'Chairman',
    'Commissioner', 'Director', 'Liquidator', 'Major', 'Managing clerk',
    'Managing partner', 'Member of the partnership', 'Miscellaneous',
    'Owner', 'Secretary', 'Secretary/Treasurer', 'Treasurer', 'Unknown',
    'Vice President',
    # remaining fields
    'amt_operating_result', 'code_legal_form',
    'date_financial_calamity_started', 'date_financial_calamity_stopped',
    'date_start', 'from_date_start',
    'qty_stopped_names', 'qty_started_names', 'year_operating_result',
]

In [5]:
def aggregate_board_members(df):
    """Aggregate the per-role board-member columns into one total column.

    The 18 role columns are summed row-wise into
    ``total_changeof_board_members_`` and then dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with normalised column names (lower-case, spaces replaced by
        underscores) that contains every role column listed below.

    Returns
    -------
    pandas.DataFrame
        A new frame. Unlike before, the caller's ``df`` is not modified.
    """
    col_list_to_sum = ['associate', 'authorized_official', 'board_member', 'chairman', 'commissioner',
                       'director', 'liquidator', 'major', 'managing_clerk', 'managing_partner',
                       'member_of_the_partnership', 'miscellaneous', 'owner', 'secretary',
                       'secretary/treasurer', 'treasurer', 'unknown', 'vice_president']
    # assign() builds a new frame instead of adding the column to the caller's df.
    return (df.assign(total_changeof_board_members_=df[col_list_to_sum].sum(axis=1))
              .drop(columns=col_list_to_sum))

In [6]:
def read_one_month_csv_from_bucket(year, month, last_day_of_month, dir_prefix = '', selected_columns= ''):
    """Read one month of data from the GCS bucket and return it as a pandas DataFrame.

    Blobs are listed under ``dir_prefix + '/' + year``. A blob is read when its
    name contains ``month + '-' + last_day_of_month`` (e.g. ``'01-31'``). If
    several blobs match, the one listed last is returned.

    Column names are normalised: surrounding whitespace is stripped, names are
    lower-cased, spaces become underscores, and parentheses are removed.

    Parameters
    ----------
    year, month, last_day_of_month : str
        Path / file-name fragments, e.g. ``'2018'``, ``'01'``, ``'31'``.
    dir_prefix : str
        Folder inside the bucket that holds the per-year sub-folders.
    selected_columns : list of str, optional
        Passed to ``pd.read_csv(usecols=...)``. ``''``, ``None`` or an empty
        list reads every column.

    Returns
    -------
    pandas.DataFrame
        The month's data, or an empty DataFrame when no blob matches.
    """
    dir_prefix = dir_prefix + '/' + year
    print(dir_prefix)
    # The old check `selected_columns == '' or None` never matched None
    # (`or None` is always falsy), so test for "nothing selected" explicitly.
    read_all_columns = selected_columns is None or len(selected_columns) == 0
    usecols = None if read_all_columns else selected_columns
    month_marker = month + '-' + last_day_of_month

    one_month_df = pd.DataFrame()
    for blob in bucket.list_blobs(prefix=dir_prefix):
        if month_marker in blob.name:
            print('Processing file: ', blob.name)
            # gcsfs paths are '<bucket>/<object>'. Take the bucket name from the
            # same bucket the blobs were listed from.
            with fs.open(bucket.name + '/' + blob.name) as f:
                one_month_df = pd.read_csv(f, sep=';', usecols=usecols)

    if len(one_month_df.columns) == 0:
        # Nothing matched. Skip renaming: `.str` is unavailable on the
        # RangeIndex that newer pandas gives an empty frame's columns.
        return one_month_df
    # regex=False: '(' and ')' are literal characters, not regex groups.
    one_month_df.columns = (one_month_df.columns.str.strip().str.lower()
                            .str.replace(' ', '_', regex=False)
                            .str.replace('(', '', regex=False)
                            .str.replace(')', '', regex=False))
    return one_month_df

In [7]:
def read_all_csv_months_yearly_from_bucket_merged(years_to_read_in_list, dir_prefix = '', selected_columns = ''):
    """Read, filter and clean every monthly CSV for the given years and stack them.

    For each year, every blob under ``dir_prefix + '/' + year`` whose name
    contains ``'CSV'`` is read with ``sep=';'``. Each file is then:

    1. filtered to rows with ``is_sole_proprietor == 0``;
    2. given normalised column names (stripped, lower-case, spaces -> '_',
       parentheses removed);
    3. passed through ``aggregate_board_members`` and ``clean_data_per_year``.

    Parameters
    ----------
    years_to_read_in_list : iterable of str
        Years to read, e.g. ``['2018']``.
    dir_prefix : str
        Folder inside the bucket holding the per-year sub-folders.
    selected_columns : list of str, optional
        Passed to ``pd.read_csv(usecols=...)``. ``''``, ``None`` or an empty
        list reads every column (previously the ``''`` default made read_csv raise).

    Returns
    -------
    pandas.DataFrame
        All processed months concatenated. Per-file row indices are kept, so
        index values repeat across files. Empty if no CSV blob was found.
    """
    read_all_columns = selected_columns is None or len(selected_columns) == 0
    usecols = None if read_all_columns else selected_columns
    # Collect the frames and concatenate once: DataFrame.append in a loop is
    # quadratic and was removed in pandas 2.0.
    monthly_frames = []
    rows_so_far = 0
    for year in years_to_read_in_list:
        print('Starting with year: ', year)
        # Use a fresh variable per year. Reassigning `dir_prefix` here made later
        # years nest under earlier ones ('01_input/2018/2019').
        year_prefix = dir_prefix + '/' + year
        for blob in bucket.list_blobs(prefix=year_prefix):
            if 'CSV' in blob.name:
                print('Processing file: ', blob.name)
                with fs.open(bucket.name + '/' + blob.name) as f:
                    one_month_df = pd.read_csv(f, sep=';', usecols=usecols)
                # .copy() detaches the filtered slice, so the column assignments
                # below do not trigger SettingWithCopyWarning.
                # NOTE: an extra `& (is_discontinued == 0)` filter was previously
                # left commented out here; it is not applied.
                one_month_df = one_month_df[one_month_df['is_sole_proprietor'] == 0].copy()
                one_month_df.columns = (one_month_df.columns.str.strip().str.lower()
                                        .str.replace(' ', '_', regex=False)
                                        .str.replace('(', '', regex=False)
                                        .str.replace(')', '', regex=False))
                one_month_df = aggregate_board_members(one_month_df)
                one_month_df = clean_data_per_year(one_month_df)
                monthly_frames.append(one_month_df)
                rows_so_far += len(one_month_df)
            print('The number of rows so far is: ', rows_so_far)
    if not monthly_frames:
        return pd.DataFrame()
    return pd.concat(monthly_frames)

In [8]:
def _strip_and_flag_na(series, missing_value=np.nan):
    """Strip surrounding whitespace and replace the literal text 'NA' with ``missing_value``.

    Columns pandas already parsed as numeric (or bool) are returned unchanged:
    they hold no padded text, and the ``.str`` accessor would raise on them.
    """
    if pd.api.types.is_numeric_dtype(series):
        return series
    stripped = series.str.strip()
    return stripped.mask(stripped == 'NA', missing_value)


def _comma_decimal_to_point(series):
    """Replace decimal commas with points ('1,5' -> '1.5'), keeping missing values missing."""
    if pd.api.types.is_numeric_dtype(series):
        return series
    # No astype(str) here: that used to turn NaN into the literal text 'nan'.
    return series.str.replace(',', '.', regex=False)


def clean_data_per_year(df):
    """Clean one month of merged data in place and return it.

    Steps:

    - parse ``date_month`` and ``date_established`` as datetimes
      (``date_established`` values earlier than 1700-12-31 become missing first);
    - fill missing ``financial_calamity_outcome`` with -1;
    - strip whitespace from the text columns and turn the literal ``'NA'`` into
      NaN (``year_revenue`` uses 0 instead);
    - convert decimal commas to points in the amount and ``score_pd`` columns
      (the values stay text);
    - cast ``has_increased_risk`` to bool.

    Parameters
    ----------
    df : pandas.DataFrame
        One month of data with normalised (lower-case) column names. It is
        modified in place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object.
    """
    df['date_month'] = pd.to_datetime(df['date_month'])
    df['financial_calamity_outcome'] = df['financial_calamity_outcome'].fillna(-1)

    # Text columns: strip padding, 'NA' -> NaN. Each column is now cleaned from
    # itself; previously year_operating_result was overwritten with
    # year_consolidated_operating_result (copy-paste bug).
    for column in ('qty_employees', 'year_qty_employees', 'year_consolidated_revenue',
                   'year_consolidated_operating_result', 'year_operating_result'):
        df[column] = _strip_and_flag_na(df[column])
    # NOTE(review): unlike the other year columns, a missing year_revenue becomes 0
    # rather than NaN. This is kept as-is; confirm it is intentional.
    df['year_revenue'] = _strip_and_flag_na(df['year_revenue'], missing_value=0)

    # Amount / score columns are written with a decimal comma in the source files.
    for column in ('amt_revenue', 'amt_consolidated_revenue', 'amt_consolidated_operating_result',
                   'score_pd', 'amt_operating_result'):
        df[column] = _comma_decimal_to_point(_strip_and_flag_na(df[column]))

    # NOTE(review): astype(bool) maps NaN to True. If this column can be missing,
    # confirm that "missing" should really mean "increased risk".
    df['has_increased_risk'] = df['has_increased_risk'].astype(bool)

    # Lexicographic comparison; assumes ISO 'YYYY-MM-DD' text (TODO confirm).
    # Blanking very old dates presumably keeps pd.to_datetime inside its
    # nanosecond range, which starts in 1677.
    df.loc[df.date_established < '1700-12-31', 'date_established'] = np.nan
    df['date_established'] = pd.to_datetime(df['date_established'])
    return df

In [9]:
def save_df_locally(df, dir_prefix, year, as_json= False):
    """Save ``df`` locally as ``<dir_prefix>/<year>_merged_cleaned.csv`` (or ``.json``).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to write. The CSV output includes the index, as before.
    dir_prefix : str
        Target directory. It is created if missing; ``''`` means the current
        directory (previously this wrote to the filesystem root).
    year : str
        Used as the file-name prefix, e.g. ``'2018'``.
    as_json : bool
        Write JSON via ``df.to_json`` instead of CSV.

    Returns
    -------
    str
        Path of the written file.
    """
    import os

    extension = 'json' if as_json else 'csv'
    file_path = os.path.join(dir_prefix, year + '_merged_cleaned.' + extension)
    # Create the directory up front instead of failing inside to_csv / to_json.
    if dir_prefix:
        os.makedirs(dir_prefix, exist_ok=True)
    if as_json:
        df.to_json(file_path)
    else:
        df.to_csv(file_path)
    return file_path

### Reading one year of data

In [10]:
%%time
df_one_year = read_all_csv_months_yearly_from_bucket_merged(dir_prefix ='01_input', 
                                                              selected_columns= selected_columns
                                                              ,years_to_read_in_list=['2018'])

Starting with year:  2018
Processing file:  01_input/2018/modelling_2018-01-01_2018-01-31.CSV


  call = lambda f, *a, **k: f(*a, **k)


The number of rows so far is:  579524
Processing file:  01_input/2018/modelling_2018-02-01_2018-02-28.CSV


  call = lambda f, *a, **k: f(*a, **k)


The number of rows so far is:  2493537
Processing file:  01_input/2018/modelling_2018-03-01_2018-03-31.CSV
The number of rows so far is:  4411795
Processing file:  01_input/2018/modelling_2018-04-01_2018-04-30.CSV
The number of rows so far is:  6335918
Processing file:  01_input/2018/modelling_2018-05-01_2018-05-31.CSV


  call = lambda f, *a, **k: f(*a, **k)


The number of rows so far is:  8264530
Processing file:  01_input/2018/modelling_2018-06-01_2018-06-30.CSV
The number of rows so far is:  10196724
Processing file:  01_input/2018/modelling_2018-07-01_2018-07-31.CSV
The number of rows so far is:  12135976
Processing file:  01_input/2018/modelling_2018-08-01_2018-08-31.CSV
The number of rows so far is:  14080404
Processing file:  01_input/2018/modelling_2018-09-01_2018-09-30.CSV
The number of rows so far is:  16028664
Processing file:  01_input/2018/modelling_2018-10-01_2018-10-31.CSV


  call = lambda f, *a, **k: f(*a, **k)


The number of rows so far is:  17983504
CPU times: user 13min 28s, sys: 2min 10s, total: 15min 38s
Wall time: 41min 55s


In [None]:
# Preview: first 20 rows rendered as an HTML table
HTML(df_one_year.head(20).to_html())

In [None]:
# (number of rows, number of columns) of the merged year
len(df_one_year.index), len(df_one_year.columns)

In [12]:
# Write the cleaned 2018 frame to files_to_bucket/2018_merged_cleaned.csv
save_df_locally(year='2018', dir_prefix='files_to_bucket', df=df_one_year)