# MLE challenge - Features engineering

### Notebook 1

In this notebook we compute five features for the **credit risk** dataset. 
Each row in the dataset consists of the credit that a user took on a given date.

These features are roughly defined as follows:

**nb_previous_loans:** number of loans granted to a given user, before the current loan.

**avg_amount_loans_previous:** average amount of loans granted to a user, before the current rating.

**age:** user age in years.

**years_on_the_job:** years the user has been in employment.

**flag_own_car:** flag that indicates if the user has his own car.

We have the following problem: the feature `avg_amount_loans_previous` takes just too long to be computed for all the rows of the dataset (at least the way it's implemented).




In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import time

In [2]:
df = pd.read_csv('data/dataset_credit_risk.csv')

In [3]:
df.shape

(777715, 24)

In [4]:
df.head()

Unnamed: 0,loan_id,id,code_gender,flag_own_car,flag_own_realty,cnt_children,amt_income_total,name_income_type,name_education_type,name_family_status,...,flag_work_phone,flag_phone,flag_email,occupation_type,cnt_fam_members,status,birthday,job_start_date,loan_date,loan_amount
0,208089,5044500,F,N,Y,0,45000.0,Pensioner,Secondary / secondary special,Widow,...,0,0,0,,1.0,0,1955-08-04,3021-09-18,2019-01-01,133.714974
1,112797,5026631,F,N,Y,0,99000.0,Working,Secondary / secondary special,Separated,...,0,0,0,Medicine staff,1.0,0,1972-03-30,1997-06-05,2019-01-01,158.800558
2,162434,5036645,M,Y,N,0,202500.0,Working,Incomplete higher,Married,...,0,0,0,Drivers,2.0,0,1987-03-24,2015-02-22,2019-01-01,203.608487
3,144343,5033584,F,N,Y,0,292500.0,Working,Higher education,Married,...,0,0,0,,2.0,0,1973-03-15,2009-06-29,2019-01-01,113.204964
4,409695,5085755,F,Y,Y,1,112500.0,Commercial associate,Secondary / secondary special,Civil marriage,...,0,0,0,Core staff,3.0,0,1989-10-15,2019-07-03,2019-01-01,109.37626


In [5]:
df = df.sort_values(by=["id", "loan_date"])
df = df.reset_index(drop=True)
df["loan_date"] = pd.to_datetime(df.loan_date)
df

Unnamed: 0,loan_id,id,code_gender,flag_own_car,flag_own_realty,cnt_children,amt_income_total,name_income_type,name_education_type,name_family_status,...,flag_work_phone,flag_phone,flag_email,occupation_type,cnt_fam_members,status,birthday,job_start_date,loan_date,loan_amount
0,1008,5008804,M,Y,Y,0,427500.0,Working,Higher education,Civil marriage,...,1,0,0,,2.0,0,1988-11-04,2009-04-11,2019-02-01,102.283361
1,1000,5008804,M,Y,Y,0,427500.0,Working,Higher education,Civil marriage,...,1,0,0,,2.0,0,1988-11-04,2009-04-11,2019-02-15,136.602049
2,1012,5008804,M,Y,Y,0,427500.0,Working,Higher education,Civil marriage,...,1,0,0,,2.0,0,1988-11-04,2009-04-11,2019-02-17,114.733694
3,1011,5008804,M,Y,Y,0,427500.0,Working,Higher education,Civil marriage,...,1,0,0,,2.0,0,1988-11-04,2009-04-11,2019-05-20,103.539050
4,1003,5008804,M,Y,Y,0,427500.0,Working,Higher education,Civil marriage,...,1,0,0,,2.0,0,1988-11-04,2009-04-11,2019-07-05,112.948147
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
777710,172506,5150487,M,Y,N,0,202500.0,Working,Secondary / secondary special,Married,...,0,0,0,Drivers,2.0,0,1968-08-08,2015-10-13,2020-09-10,117.792205
777711,172513,5150487,M,Y,N,0,202500.0,Working,Secondary / secondary special,Married,...,0,0,0,Drivers,2.0,0,1968-08-08,2015-10-13,2020-10-13,105.778335
777712,172512,5150487,M,Y,N,0,202500.0,Working,Secondary / secondary special,Married,...,0,0,0,Drivers,2.0,0,1968-08-08,2015-10-13,2020-10-16,112.319242
777713,172500,5150487,M,Y,N,0,202500.0,Working,Secondary / secondary special,Married,...,0,0,0,Drivers,2.0,0,1968-08-08,2015-10-13,2020-11-25,113.627617


#### Feature nb_previous_loans

In [6]:
df_grouped = df.groupby("id")
df["nb_previous_loans"] = df_grouped["loan_date"].rank(method="first") - 1

#### Feature avg_amount_loans_previous

#### Método propuesto:

#### En el cálculo de la columna avg_amount_loans_previous simplifico el cómputo sistituyendo la función avg_amount_loans_prev() por un rolling de pandas. El método rolling de pandas te permite calcular la media de forma móvil para una ventana de datos, donde cada ventana contienen justo los datos crediticios para cada id. El tiempo aproximado de cómputo original es del orden O(UxI) donde U es la cardinalidad de los únicos Id e I es la cardinalidad de los índices para cada Id, de allí que, el tiempo aproximado de cómputo para la nueva versión es sólo del orden O(U) considerando que para ambos casos al cómputo de la media es O(1). Este ejercico es muy interesante, acá se puede probar con un groupby que contenga el rolling y además paralelizando el ciclo for con joblib, pero debemos tener cuidado que algunas veces el costos computacional de estas operaciones pueden relentizar el cáculo de la media.

In [7]:
start = time.time()
avg_amount_loans_previous = pd.Series()
for user in df.id.unique():
    df_user = df.loc[df.id == user, :]
    avg_amount_loans_previous = avg_amount_loans_previous.append(df_user["loan_amount"].rolling(df_user.shape[0], min_periods=1).mean().shift(periods=1))
df["avg_amount_loans_previous"] = avg_amount_loans_previous
end = time.time()
print("--- %s seconds ---" % (end - start))

  


--- 152.88485550880432 seconds ---


#### Tiempo de ejecución del método propuesto: 152.88485550880432 segundos para todo el set de datos.

#### Método actual:

In [8]:
def avg_amount_loans_prev(df):
    avg = pd.Series(index=df.index)
    for i in df.index:
        df_aux = df.loc[df.loan_date < df.loan_date.loc[i], :]
        avg.at[i] = df_aux.loan_amount.mean()
    return avg

In [9]:
start = time.time()
avg_amount_loans_previous = pd.Series()
# the following cycle is the one that takes forever if we try to compute it for the whole dataset
for user in df.id.unique():
    df_user = df.loc[df.id == user, :]
    avg_amount_loans_previous = avg_amount_loans_previous.append(avg_amount_loans_prev(df_user))
end = time.time()
print("--- %s seconds ---" % (end - start))

  


--- 820.6095194816589 seconds ---


#### Tiempo de ejecución del método actual: 820.6095194816589 segundos para todo el set de datos. Aproximadamente 6 veces más lento que enfoque propuesto.

In [10]:
df["avg_amount_loans_previous"] = avg_amount_loans_previous

#### Feature age

In [8]:
from datetime import datetime, date

In [9]:
df['birthday'] = pd.to_datetime(df['birthday'], errors='coerce')

In [10]:
df['age'] = (pd.to_datetime('today').normalize() - df['birthday']).dt.days // 365

#### Feature years_on_the_job

In [11]:
df['job_start_date'] = pd.to_datetime(df['job_start_date'], errors='coerce')

In [12]:
df['years_on_the_job'] = (pd.to_datetime('today').normalize() - df['job_start_date']).dt.days // 365

#### Feature flag_own_car

In [13]:
df['flag_own_car'] = df.flag_own_car.apply(lambda x : 0 if x == 'N' else 1)

#### En esta celda obtengo el schema del dataframe de pandas para el riesgo crediticio.

In [14]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 777715 entries, 0 to 777714
Data columns (total 28 columns):
 #   Column                     Non-Null Count   Dtype         
---  ------                     --------------   -----         
 0   loan_id                    777715 non-null  int64         
 1   id                         777715 non-null  int64         
 2   code_gender                777715 non-null  object        
 3   flag_own_car               777715 non-null  int64         
 4   flag_own_realty            777715 non-null  object        
 5   cnt_children               777715 non-null  int64         
 6   amt_income_total           777715 non-null  float64       
 7   name_income_type           777715 non-null  object        
 8   name_education_type        777715 non-null  object        
 9   name_family_status         777715 non-null  object        
 10  name_housing_type          777715 non-null  object        
 11  days_birth                 777715 non-null  int64   

#### En esta parte de la notebook construiré un dataframe de spark, su esquema análogo a la de pandas y guardo este dataframe en un archivo parquet. Es probable que lo use más adelante. Cuando uso spark, prefiero usar archivos en formato parquet porque están diseñados para operaciones de Big Data con una alta compresión, operaciones de E/S pequeñas y almacenamiento columnar.

In [15]:
import sys
sys.path.append('/home')
from aimodels.universal_function import *

In [16]:
path = "wasbs://data@stg0ia0prod0001.blob.core.windows.net/data_spark/"

In [17]:
spark = spark_init(name = "Test")

#### Schema del acrivo parquet para todas las features

In [18]:
schema = StructType([
    StructField("loan_id", IntegerType()),
    StructField("id", IntegerType()),
    StructField("code_gender", StringType()),
    StructField("flag_own_car", StringType()),
    StructField("flag_own_realty", StringType()),
    StructField("cnt_children", IntegerType()),
    StructField("amt_income_total", FloatType()),
    StructField("name_income_type", StringType()),
    StructField("name_education_type", StringType()),
    StructField("name_family_status", StringType()),
    StructField("name_housing_type", StringType()),
    StructField("days_birth", IntegerType()),
    StructField("days_employed", IntegerType()),
    StructField("flag_mobil", IntegerType()),
    StructField("flag_work_phone", IntegerType()),
    StructField("flag_phone", IntegerType()),
    StructField("flag_email", IntegerType()),
    StructField("occupation_type", StringType()),
    StructField("cnt_fam_members", FloatType()),
    StructField("status", IntegerType()),
    StructField("birthday", StringType()),
    StructField("job_start_date", StringType()),
    StructField("loan_date", StringType()),
    StructField("loan_amount", FloatType()),
    StructField("nb_previous_loans", FloatType()),
    StructField("avg_amount_loans_previous", FloatType()),
    StructField("age", IntegerType()),
    StructField("years_on_the_job", FloatType())
])

In [21]:
sdf = spark.createDataFrame(df,schema)

In [22]:
sdf.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- code_gender: string (nullable = true)
 |-- flag_own_car: string (nullable = true)
 |-- flag_own_realty: string (nullable = true)
 |-- cnt_children: integer (nullable = true)
 |-- amt_income_total: float (nullable = true)
 |-- name_income_type: string (nullable = true)
 |-- name_education_type: string (nullable = true)
 |-- name_family_status: string (nullable = true)
 |-- name_housing_type: string (nullable = true)
 |-- days_birth: integer (nullable = true)
 |-- days_employed: integer (nullable = true)
 |-- flag_mobil: integer (nullable = true)
 |-- flag_work_phone: integer (nullable = true)
 |-- flag_phone: integer (nullable = true)
 |-- flag_email: integer (nullable = true)
 |-- occupation_type: string (nullable = true)
 |-- cnt_fam_members: float (nullable = true)
 |-- status: integer (nullable = true)
 |-- birthday: string (nullable = true)
 |-- job_start_date: string (nullable = true)
 |-- loan_da

#### Archivo en formato parquet

In [23]:
sdf.write.format("parquet").mode("overwrite").save(path)

## Save dataset for model training

In [24]:
df = df[['id', 'age', 'years_on_the_job', 'nb_previous_loans', 'avg_amount_loans_previous', 'flag_own_car', 'status']]

In [27]:
df.to_csv('data/train_model.csv', index=False)

#### Acá una muestra:

In [28]:
df[df["id"]==5008804]

Unnamed: 0,id,age,years_on_the_job,nb_previous_loans,avg_amount_loans_previous,flag_own_car,status
0,5008804,33,12.0,0.0,,1,0
1,5008804,33,12.0,1.0,102.283361,1,0
2,5008804,33,12.0,2.0,119.442705,1,0
3,5008804,33,12.0,3.0,117.873035,1,0
4,5008804,33,12.0,4.0,114.289538,1,0
5,5008804,33,12.0,5.0,114.02126,1,0
6,5008804,33,12.0,6.0,116.291398,1,0
7,5008804,33,12.0,7.0,114.677626,1,0
8,5008804,33,12.0,8.0,122.594931,1,0
9,5008804,33,12.0,9.0,127.885915,1,0
