# Transcript from pandas to Spark

The Notebook transcripts the methods applied in pandas for the feature
engineering generation into a Spark Dataframe and methods applied to result
into the same, or similar outcome.

At the end of the notebook the original df result will be compared with the
spark functions outcome to compare and accept if the resulting functions are
sufficient to accept them as a valid result.

### Import dependencies and read data

In [304]:
import pandas as pd

from pyspark.sql import functions as f
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import rank
from pyspark.sql.functions import monotonically_increasing_id, current_date, \
    when, lit, col, months_between, floor

spark = SparkSession \
    .builder \
    .appName("Credit Features Processing") \
    .getOrCreate()

In [305]:
df = pd.read_csv('../data/raw_data/dataset_credit_risk.csv')

In [306]:
df_func = spark.read.csv("../data/raw_data/dataset_credit_risk.csv",
                         header=True,
                         sep=',',
                         inferSchema=False)

### Sort and cast

In [307]:
df = df.sort_values(by=["id", "loan_date"])
df = df.reset_index(drop=True)
df["loan_date"] = pd.to_datetime(df.loan_date)

In [308]:
# spark
def order_by_id_and_loan_date(data_frame):
    data_frame = data_frame.orderBy("id", "loan_date")
    # Index only to compare with pandas
    data_frame = data_frame.withColumn("index",
                                       monotonically_increasing_id())
    return data_frame

def cast_dates_to_datetime(data_frame):
    # loan_date
    data_frame = data_frame.withColumn('loan_date',
                                       data_frame['loan_date'].cast('date'))
    data_frame = data_frame.withColumn('loan_date',
                                       when(col("loan_date")
                                            <= current_date()+1,
                                            col("loan_date")))
    #birthday
    data_frame = data_frame.withColumn('birthday',
                                       data_frame['birthday'].cast('date'))
    data_frame = data_frame.withColumn('birthday',
                                       when(col("birthday")
                                            <= current_date()+1,
                                            col("birthday")))
    # job_start_date
    data_frame = data_frame.withColumn('job_start_date',
                                       data_frame['job_start_date'].cast('date'))
    data_frame = data_frame.withColumn('job_start_date',
                                       when(col("job_start_date")
                                            <= current_date()+1,
                                            col("job_start_date")))
    return data_frame

df_func = order_by_id_and_loan_date(df_func)
df_func = cast_dates_to_datetime(df_func)

### Feature nb_previous_loans

In [309]:
df_grouped = df.groupby("id")
df["nb_previous_loans"] = df_grouped["loan_date"].rank(method="first") - 1
df["nb_previous_loans"] = df["nb_previous_loans"].astype('int')

In [310]:
def generate_nb_previous_loans(data_frame):
    window = Window.partitionBy("id").orderBy("loan_date")
    data_frame = data_frame.withColumn("nb_previous_loans",
                                       rank().over(window) -1)
    return data_frame
df_func = generate_nb_previous_loans(df_func)

### Feature avg_amount_loans_previous

In [311]:
def avg_amount_loans_prev(df):
    avg = pd.Series(index=df.index)
    for i in df.index:
        df_aux = df.loc[df.loan_date < df.loan_date.loc[i], :]
        avg.at[i] = df_aux.loan_amount.mean()
    return avg

In [312]:
%%time  
avg_amount_loans_previous = pd.Series()
# the following cycle is the one that takes forever if we try to compute it for the whole dataset
for user in df.id.unique():
    df_user = df.loc[df.id == user, :]
    avg_amount_loans_previous = avg_amount_loans_previous.append(avg_amount_loans_prev(df_user))

df["avg_amount_loans_previous"] = avg_amount_loans_previous


  """Entry point for launching an IPython kernel.
  


CPU times: user 10min 2s, sys: 21.4 s, total: 10min 23s
Wall time: 10min 27s


In [313]:
%%time
def generate_avg_amount_loans_previous(data_frame):
    window = Window.partitionBy("id"). \
        rowsBetween(Window.unboundedPreceding, -1)

    data_frame = data_frame.sort(f.asc('loan_date')) \
        .withColumn("avg_amount_loans_previous",
                    f.mean('loan_amount').over(window))
    return data_frame

df_func = generate_avg_amount_loans_previous(df_func)

CPU times: user 5.11 ms, sys: 5.49 ms, total: 10.6 ms
Wall time: 152 ms


#### Feature age

In [314]:
df['birthday'] = pd.to_datetime(df['birthday'], errors='coerce')
df['age'] = (pd.to_datetime('today').normalize() -
             df['birthday']).dt.days // 365


In [315]:
def compute_age_features(data_frame):
    data_frame = data_frame.withColumn("age",
                          floor(months_between(current_date(),
                                               col("birthday"))/lit(12))
                          .cast('integer'))
    return data_frame

df_func = compute_age_features(df_func)

### Feature years_on_the_job

In [316]:
df['job_start_date'] = pd.to_datetime(df['job_start_date'], errors='coerce')
df['years_on_the_job'] = (pd.to_datetime('today').normalize() -
                          df['job_start_date']).dt.days // 365

In [317]:
def compute_years_on_the_job(data_frame):
    data_frame = data_frame.withColumn("years_on_the_job",
                          floor(months_between(current_date(),
                                               col("job_start_date"))/lit(12))
                          .cast('integer'))
    return data_frame

df_func = compute_years_on_the_job(df_func)

### Feature flag_own_car

In [318]:
df['flag_own_car'] = df.flag_own_car.apply(lambda x : 0 if x == 'N' else 1)

In [319]:
def add_feature_own_car_flag(data_frame):
    data_frame = data_frame.withColumn('flag_own_car',
                                       when(col("flag_own_car") == 'N',
                                            lit(0)).otherwise(1))
    return data_frame

df_func = add_feature_own_car_flag(df_func)

### Select subset of columns

In [320]:
df = df[['id',
         'age',
         'years_on_the_job',
         'nb_previous_loans',
         'avg_amount_loans_previous',
         'flag_own_car', 'status']]

In [321]:
def credit_fe_columns_subset(data_frame):
    data_frame = data_frame.select('id',
                             'age',
                             'years_on_the_job',
                             'nb_previous_loans',
                             'avg_amount_loans_previous',
                             'flag_own_car',
                             'status')
    return data_frame

df_func = credit_fe_columns_subset(df_func)

## Compare Results

In [322]:
df_copy = df.copy()
# df = df_copy.copy()

#### Order both dataset to do a 1:1 comparinson

In [323]:
df_func = df_func.orderBy('index') # Index only to compare with pd.Dataframe
df = df.sort_index() # pd.Dataframe

In [324]:
df_test = df_func.toPandas() # Transform PySpark dataframe into pd.Dataframe

                                                                                

In [325]:
df_test.info(verbose=True) # PySpark convert to pandas Resulting schema

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 777715 entries, 0 to 777714
Data columns (total 7 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   id                         777715 non-null  object 
 1   age                        777715 non-null  int32  
 2   years_on_the_job           649743 non-null  float64
 3   nb_previous_loans          777715 non-null  int32  
 4   avg_amount_loans_previous  741258 non-null  float64
 5   flag_own_car               777715 non-null  int32  
 6   status                     777715 non-null  object 
dtypes: float64(2), int32(3), object(2)
memory usage: 32.6+ MB


In [326]:
df.info(verbose=True) # pd.Dataframe resulting schema

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 777715 entries, 0 to 777714
Data columns (total 7 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   id                         777715 non-null  int64  
 1   age                        777715 non-null  int64  
 2   years_on_the_job           649743 non-null  float64
 3   nb_previous_loans          777715 non-null  int64  
 4   avg_amount_loans_previous  740750 non-null  float64
 5   flag_own_car               777715 non-null  int64  
 6   status                     777715 non-null  int64  
dtypes: float64(2), int64(5)
memory usage: 41.5 MB


Due to the differences presented in the schemas, the pyspark dataframe will be
casted into the schema of the pd.Dataframe in order to reproduce the results
and make them comparable 1:1. This is decided after seeing a comparinson
with no nulls in both outcomes.

In [327]:
df_test["id"] = df_test["id"].astype(int)
df_test["age"] = df_test["age"].astype(int)
df_test["flag_own_car"] = df_test["flag_own_car"].astype(int)
df_test["nb_previous_loans"] = df_test["nb_previous_loans"].astype(int)
df_test["status"] = df_test["status"].astype(int)


Cast float64 fields and round to same data type and same number of decimals
to avoid resolution misscomparison.

In [328]:
# Round years to zero decimals
df_test["years_on_the_job"] = \
    df_test["years_on_the_job"].astype('float64').round(0)
df["years_on_the_job"] = df["years_on_the_job"].round(0)

# Round float fields to 2 decimals
df_test["avg_amount_loans_previous"] = \
    df_test["avg_amount_loans_previous"].astype('float64').round(2)
df["avg_amount_loans_previous"] = df["avg_amount_loans_previous"].round(2)

df_test.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 777715 entries, 0 to 777714
Data columns (total 7 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   id                         777715 non-null  int64  
 1   age                        777715 non-null  int64  
 2   years_on_the_job           649743 non-null  float64
 3   nb_previous_loans          777715 non-null  int64  
 4   avg_amount_loans_previous  741258 non-null  float64
 5   flag_own_car               777715 non-null  int64  
 6   status                     777715 non-null  int64  
dtypes: float64(2), int64(5)
memory usage: 41.5 MB


Assertion of a 1:1 comparison

In [329]:
print("The outcomes are different than expected: ", df_test.equals(df))
print("Are the shapes of both datasets the same?:", (df.shape==df_test.shape))

The outcomes are different than expected:  False
Are the shapes of both datasets the same?: True


Because of side effects of the python floor division and traditional round
function in the years calculated in pandas the fields with those values will
be dropped after seing how many records have differences between both
dataframes and adress those fields are the discrepancy causes, just to follow
the asumption that the round vs floor libraries are the cause.

Disclaimer: The floor function was denoted bevause the age of a person has to
be quantizied after the birthday and not after 6 months or any other arbitrary
value for the months.

In [330]:
import numpy as np
def test_differences(df1, df2):
    different_records_df=pd.concat([df1,df2]).drop_duplicates(keep=False)
    print("Head of df of not matching records")
    print(different_records_df.sort_index().head(100))

    # Both different records are kept for the same row
    different_records = different_records_df.shape[0]/2
    total_expected_records = df.shape[0]
    difference_percentage = \
        np.round(different_records/total_expected_records*100,2)
    print(f"Difference percentage between DF: {difference_percentage}%")

In [331]:
test_differences(df, df_test)

Head of df of not matching records
           id  age  years_on_the_job  nb_previous_loans  \
125   5008811   52               8.0                 27   
125   5008811   52               8.0                 26   
130   5008811   52               8.0                 32   
130   5008811   52               8.0                 31   
212   5008821   48               3.0                  1   
...       ...  ...               ...                ...   
2632  5008980   61               NaN                  9   
2662  5008980   61               NaN                 39   
2662  5008980   61               NaN                 38   
2723  5008984   57               6.0                 26   
2723  5008984   57               6.0                 25   

      avg_amount_loans_previous  flag_own_car  status  
125                      131.11             0       0  
125                      130.47             0       0  
130                      127.70             0       0  
130                      128.85 

Following the hypothesis of the Age difference, we will drop this column and
repeat the previous test of finding different records and calculate the
differences.


In [332]:
df_no_head = df.drop("age", axis=1)
df_test_no_head = df_test.drop("age", axis=1)

In [333]:
test_differences(df_no_head , df_test_no_head )


Head of df of not matching records
           id  years_on_the_job  nb_previous_loans  avg_amount_loans_previous  \
125   5008811               8.0                 27                     131.11   
125   5008811               8.0                 26                     130.47   
130   5008811               8.0                 32                     127.70   
130   5008811               8.0                 31                     128.85   
212   5008821               3.0                  1                        NaN   
...       ...               ...                ...                        ...   
2632  5008980               NaN                  9                     128.43   
2662  5008980               NaN                 39                     123.78   
2662  5008980               NaN                 38                     123.17   
2723  5008984               6.0                 26                     136.27   
2723  5008984               6.0                 25                     135

The difference percentage reduced dramatically, proposing the years computing
is the causing of the differences potentially. We will remove the years on
job field and repeat the test.

In [334]:
df_no_years_job= df_no_head.drop("years_on_the_job", axis=1)
df_test_no_years_job= df_test_no_head.drop("years_on_the_job", axis=1)

In [335]:
test_differences(df_no_years_job , df_test_no_years_job)

Head of df of not matching records
           id  nb_previous_loans  avg_amount_loans_previous  flag_own_car  \
125   5008811                 27                     131.11             0   
125   5008811                 26                     130.47             0   
130   5008811                 31                     128.85             0   
130   5008811                 32                     127.70             0   
212   5008821                  0                     100.02             1   
...       ...                ...                        ...           ...   
2632  5008980                  9                     128.43             0   
2662  5008980                 39                     123.78             0   
2662  5008980                 38                     123.17             0   
2723  5008984                 26                     136.27             1   
2723  5008984                 25                     135.74             1   

      status  
125        0  
125       

In [None]:
The final differences seem to aim to a slight difference in 2% of the records
while calculating the nb_previous_loans. As regards the percentage of
differences between both dataframes, propagates a sligh error to the average
loan amount.

Even though further analysis could be done to explore if the nulls deal of the
functions are the root cause of the difference, or a different cause, the
percentage is too low and due time constraints it will be taken as an accepted
implementation in Spark to replicate the Feature Engineering process.