## Dataset and approach:
Data is from Kaggle competiotion [Home Credit Default Risk](https://www.kaggle.com/c/home-credit-default-risk). 

I implement an automated feature engineering approach with an open-source library [Featuretools](https://www.featuretools.com/). 


In [24]:
import pandas as pd
import numpy as np
import time

In [25]:
import featuretools as ft
import featuretools.variable_types as vtypes

In [26]:
import sys
import psutil
import os

![](../images/home_credit_data.png)

#### Convert Data Types

In [27]:
def convert_types(df):
    """Convert pandas data types for memory reduction."""
    
    # Iterate through each column
    for c in df:
        
        # Convert ids and booleans to integers
        if ('SK_ID' in c):
            df[c] = df[c].fillna(0).astype(np.int32)
            
        # Convert objects to category
        elif (df[c].dtype == 'object') and (df[c].nunique() < df.shape[0]):
            df[c] = df[c].astype('category')
        
        # Booleans mapped to integers
        elif list(df[c].unique()) == [1, 0]:
            df[c] = df[c].astype(bool)
        
        # Float64 to float32
        elif df[c].dtype == float:
            df[c] = df[c].astype(np.float32)
            
        # Int64 to int32
        elif df[c].dtype == int:
            df[c] = df[c].astype(np.int32)
        
    return df

#### Import data

In [28]:
app_train = pd.read_csv('../data/application_train.csv', sep=',')
app_test = pd.read_csv('../data/application_test.csv')
bureau = pd.read_csv('../data/bureau.csv')
bureau_balance = pd.read_csv('../data/bureau_balance.csv')
cash = pd.read_csv('../data/POS_CASH_balance.csv')
credit = pd.read_csv('../data/credit_card_balance.csv')
previous = pd.read_csv('../data/previous_application.csv')
installments = pd.read_csv('../data/installments_payments.csv')

In [29]:
app_train.name = 'app_train'
app_test.name = 'app_test'
bureau.name = 'bureau'
bureau_balance.name = 'bureau_balance'
cash.name = 'cash'
credit.name = 'credit'
previous.name = 'previous'
installments.name = 'installments'

datasets_list = [app_train, app_test, bureau, bureau_balance, cash, credit, previous, installments]

In [30]:
# replace the anomalous values
for ds in datasets_list:
    ds.replace({365243: np.nan}, inplace=True)

In [31]:
# Numbers of rows:
for ds in datasets_list:
    print('{}\t - \t{} rows'.format(ds.name , ds.iloc[:, 0].count()))

app_train	 - 	307510 rows
app_test	 - 	48744 rows
bureau	 - 	1716420 rows
bureau_balance	 - 	27299925 rows
cash	 - 	10001358 rows
credit	 - 	3840312 rows
previous	 - 	1670214 rows
installments	 - 	13605401 rows


In [32]:
# Join train and test set to make sure, that the same feature are created for each set. 
app_test['TARGET'] = np.nan
app = app_train.append(app_test, ignore_index=True)

# Need 'SK_ID_CURR in each table (for make partitioning possible )
bureau_balance = bureau_balance.merge(bureau[['SK_ID_CURR', 'SK_ID_BUREAU']], how='left', on = 'SK_ID_BUREAU')


# Convert types to reduce memory usage
app = convert_types(app)
bureau = convert_types(bureau)
bureau_balance = convert_types(bureau_balance)
cash = convert_types(cash)
credit = convert_types(credit)
previous = convert_types(previous)
installments = convert_types(installments)


# Set the index for locating
for dataset in [app, bureau, bureau_balance, cash, credit, previous, installments]:
    dataset.set_index('SK_ID_CURR', inplace = True)

In [33]:
memory_sum = round(np.sum([app_train.memory_usage().sum()/ 1e9 for x in datasets_list]), 2)

In [34]:
print('Total size of data: {} GB'.format(memory_sum))

Total size of data: 2.4 GB


#### Prepare data (partitioning) for parallel computation

In [35]:
external_path = 'D:/DYSK/home_credit'

In [36]:
def create_partition(user_list, partition, external_path):
    """Creates and saves a dataset with only the users in `user_list`."""
    
    # Make the directory
    directory = external_path + '/data/partitions/p%d' % (partition + 1)
    if os.path.exists(directory):
        return
    
    else:
        os.makedirs(directory)
        
        # Subset based on user list
        app_subset = app[app.index.isin(user_list)].copy().reset_index()
        bureau_subset = bureau[bureau.index.isin(user_list)].copy().reset_index()

        # Drop SK_ID_CURR from bureau_balance, cash, credit, and installments
        bureau_balance_subset = bureau_balance[bureau_balance.index.isin(user_list)].copy().reset_index(drop = True)
        cash_subset = cash[cash.index.isin(user_list)].copy().reset_index(drop = True)
        credit_subset = credit[credit.index.isin(user_list)].copy().reset_index(drop = True)
        previous_subset = previous[previous.index.isin(user_list)].copy().reset_index()
        installments_subset = installments[installments.index.isin(user_list)].copy().reset_index(drop = True)
        

        # Save data to the directory
        app_subset.to_csv('%s/app.csv' % directory, index = False)
        bureau_subset.to_csv('%s/bureau.csv' % directory, index = False)
        bureau_balance_subset.to_csv('%s/bureau_balance.csv' % directory, index = False)
        cash_subset.to_csv('%s/cash.csv' % directory, index = False)
        credit_subset.to_csv('%s/credit.csv' % directory, index = False)
        previous_subset.to_csv('%s/previous.csv' % directory, index = False)
        installments_subset.to_csv('%s/installments.csv' % directory, index = False)

        if partition % 10 == 0:
            print('Saved all files in partition {} to {}.'.format(partition + 1, directory))

In [38]:
# Create id_list of indecies
chunk_size = app.shape[0] // 103
id_list = [ list(app.iloc[i: i + chunk_size ].index) for i in range(0, app.shape[0], chunk_size) ]

In [41]:
#### Create partitions
start = time.time()

for i, ids in enumerate(id_list):
    create_partition(user_list= ids, partition= i, external_path= external_path)
    
end = time.time()

print('Partitioning took {} sec.'.format(round(end - start), 1))

Saved all files in partition 81 to D:/DYSK/home_credit/data/partitions/p81.
Saved all files in partition 91 to D:/DYSK/home_credit/data/partitions/p91.
Saved all files in partition 101 to D:/DYSK/home_credit/data/partitions/p101.
Partitioning took 916 sec.


In [66]:
# Load features definitions previously created

In [4]:
feature_defs = ft.load_features('../input/features.txt')

In [5]:
print(len(feature_defs))

1820


In [13]:
# Function to Create EntitySet from Partition

In [14]:
def entityset_from_partition(path):
    """Create an EntitySet from a partition of data specified as a path.
       Returns a dictionary with the entityset and the number used for saving the feature matrix."""
    
    external_path = 'D:/DYSK/home_credit'
    
    partition_num = int(path[18:])
    
    # Read in data
    app = pd.read_csv('%s/app.csv' % (external_path + path))
    bureau = pd.read_csv('%s/bureau.csv' % (external_path + path))
    bureau_balance = pd.read_csv('%s/bureau_balance.csv' % (external_path + path))
    previous = pd.read_csv('%s/previous.csv' % (external_path + path))
    credit = pd.read_csv('%s/credit.csv' % (external_path + path))
    installments = pd.read_csv('%s/installments.csv' % (external_path + path))
    cash = pd.read_csv('%s/cash.csv' % (external_path + path))
    
    # Empty entityset
    es = ft.EntitySet(id = 'clients')
    
    # Entities with a unique index
    es = es.entity_from_dataframe(entity_id = 'app', dataframe = app, index = 'SK_ID_CURR')

    es = es.entity_from_dataframe(entity_id = 'bureau', dataframe = bureau, index = 'SK_ID_BUREAU')

    es = es.entity_from_dataframe(entity_id = 'previous', dataframe = previous, index = 'SK_ID_PREV')

    # Entities that do not have a unique index
    es = es.entity_from_dataframe(entity_id = 'bureau_balance', dataframe = bureau_balance, 
                                  make_index = True, index = 'bureaubalance_index')

    es = es.entity_from_dataframe(entity_id = 'cash', dataframe = cash, 
                                  make_index = True, index = 'cash_index')

    es = es.entity_from_dataframe(entity_id = 'installments', dataframe = installments,
                                  make_index = True, index = 'installments_index')

    es = es.entity_from_dataframe(entity_id = 'credit', dataframe = credit,
                                  make_index = True, index = 'credit_index')
    
    # Relationship between app_train and bureau
    r_app_bureau = ft.Relationship(es['app']['SK_ID_CURR'], es['bureau']['SK_ID_CURR'])

    # Relationship between bureau and bureau balance
    r_bureau_balance = ft.Relationship(es['bureau']['SK_ID_BUREAU'], es['bureau_balance']['SK_ID_BUREAU'])

    # Relationship between current app and previous apps
    r_app_previous = ft.Relationship(es['app']['SK_ID_CURR'], es['previous']['SK_ID_CURR'])

    # Relationships between previous apps and cash, installments, and credit
    r_previous_cash = ft.Relationship(es['previous']['SK_ID_PREV'], es['cash']['SK_ID_PREV'])
    r_previous_installments = ft.Relationship(es['previous']['SK_ID_PREV'], es['installments']['SK_ID_PREV'])
    r_previous_credit = ft.Relationship(es['previous']['SK_ID_PREV'], es['credit']['SK_ID_PREV'])
    
    # Add in the defined relationships
    es = es.add_relationships([r_app_bureau, r_bureau_balance, r_app_previous,
                               r_previous_cash, r_previous_installments, r_previous_credit])

    return ({'es': es, 'num': partition_num})

In [15]:
path = '/data/partitions/p1'

In [16]:
es_dict = entityset_from_partition(path)

In [17]:
es_dict['es']

Entityset: clients
  Entities:
    app [Rows: 3458, Columns: 123]
    bureau [Rows: 3458, Columns: 18]
    previous [Rows: 3458, Columns: 38]
    bureau_balance [Rows: 3458, Columns: 5]
    cash [Rows: 3458, Columns: 9]
    installments [Rows: 3458, Columns: 9]
    credit [Rows: 3458, Columns: 24]
  Relationships:
    bureau.SK_ID_CURR -> app.SK_ID_CURR
    bureau_balance.SK_ID_BUREAU -> bureau.SK_ID_BUREAU
    previous.SK_ID_CURR -> app.SK_ID_CURR
    cash.SK_ID_PREV -> previous.SK_ID_PREV
    installments.SK_ID_PREV -> previous.SK_ID_PREV
    credit.SK_ID_PREV -> previous.SK_ID_PREV

In [18]:
# Function to Create Feature Matrix from EntitySet

In [19]:
def feature_matrix_from_entityset(es_dict, feature_defs, return_fm = False):
    """Run deep feature synthesis from an entityset and feature definitions. 
    Saves feature matrix based on partition.""" 
    
    external_path = 'D:/DYSK/home_credit'
    
    
    # Extract the entityset
    es = es_dict['es']
    
    # Calculate the feature matrix and save
    feature_matrix = ft.calculate_feature_matrix(feature_defs, 
                                                 entityset=es, 
                                                 n_jobs = 1, 
                                                 verbose = 0,
                                                 chunk_size = es['app'].df.shape[0])
    
    feature_matrix.to_csv( external_path + '/data/fm/p%d_fm.csv' % es_dict['num'], index = True)
    
    if return_fm:
        return feature_matrix

In [20]:
start = time.time()

fm1 = feature_matrix_from_entityset(es_dict=es_dict, feature_defs= feature_defs, return_fm=True)
    
end = time.time()

print('Computation took {} sec.'.format(round(end - start), 1))

Computation took 24 sec.


In [21]:
fm1.shape

(3458, 1820)

In [None]:
# Clear the system memory for a full run of Dask.

In [98]:
import  gc

# Free up all system memory 
gc.enable()
del app, bureau, bureau_balance, previous, credit, cash, installments
gc.collect()

2416

In [86]:
import  dask.bag as db
from dask.distributed import Client

# Use all 8 cores
client = Client(processes = True)

In [87]:
client.ncores()

{'tcp://127.0.0.1:59491': 1,
 'tcp://127.0.0.1:59492': 1,
 'tcp://127.0.0.1:59495': 1,
 'tcp://127.0.0.1:59504': 1}

In [89]:
paths = [external_path + '/data/partitions/p%d' %  i for i in range(1, 105)]

### Dask bag

In [None]:
# Create a bag object
b = db.from_sequence(paths)

# Map entityset function
b = b.map(entityset_from_partition)

# Map feature matrix function
b = b.map(feature_matrix_from_entityset, feature_defs = feature_defs)
    
b

In [None]:
start = time.time()

b.compute()
    
end = time.time()

print('Computation took {} sec.'.format(round(end - start), 1))