In [0]:
import pyspark.pandas as ps

In [0]:
from pyspark.sql.functions import col,regexp_replace,to_date

In [0]:
loan_dataset_backup =  spark.read.table('crm_ml.loan_dataset')

In [0]:
loan_data = loan_dataset_backup.select('*')

In [0]:
# %python
display(loan_data.limit(5))

In [0]:
loan_data.printSchema()

In [0]:
loan_dataset_backup.select(col('emp_length')).distinct().display()

In [0]:
from pyspark.sql.functions import col, regexp_replace

# Ordered (regex pattern, replacement) rewrites applied to emp_length.
# The order is significant: the generic 'years' / ' year' rules run only after
# the more specific '+ years' and '< 1 year' rules have been applied.
emp_length_rewrites = [
    ('\\+ years', ''),    # e.g. "10+ years" -> "10"
    ('< 1 year', '0'),    # less than one year is encoded as 0
    ('n/a', '0'),         # 'n/a' is encoded as 0
    ('years', ''),        # e.g. "2 years" -> "2 " (the space is left in place)
    (' year', ''),        # e.g. "1 year" -> "1"
    # NOTE(review): this swaps the text ' reactors"' for '0', so a value such as
    # '5 reactors"' would become '50' - confirm this is the intended handling.
    (' reactors"', '0'),
]

for rewrite_pattern, rewrite_value in emp_length_rewrites:
    loan_data = loan_data.withColumn(
        'emp_length',
        regexp_replace(col('emp_length'), rewrite_pattern, rewrite_value),
    )

In [0]:
loan_data.select(col('emp_length')).distinct().display()

In [0]:
loan_data = loan_data.withColumn('emp_length', col('emp_length').cast("float"))

In [0]:
loan_data.printSchema()

In [0]:
# Before processing
loan_data.select(col('term')).distinct().display()

In [0]:
loan_data = loan_data.withColumn('term',regexp_replace(col('term'),' months',''))

In [0]:
# See the removed months 
loan_data.select(col('term')).distinct().display()

In [0]:
# Check the data type 
loan_data =  loan_data.withColumn('term',col('term').cast('float'))

In [0]:
loan_data = loan_data.withColumn('earliest_cr_line_date', to_date(col('earliest_cr_line'), 'MMM-yyyy'))

In [0]:
from pyspark.sql.functions import col, months_between, lit

reference_date = lit('2018-12-01')
loan_data = loan_data.withColumn('months_since_earliest_cr_line', months_between(reference_date, col('earliest_cr_line_date')))
display(loan_data.select(col('months_since_earliest_cr_line')))

In [0]:
loan_data.select(col('months_since_earliest_cr_line')).describe().show()

In [0]:
loan_data.printSchema()

In [0]:
# Peform similar process for issue data variable 
loan_data.select(col('issue_d')).distinct().display()
# How to do processing of this varibale? TBD 

In [0]:
# Remove a record that is not matching the required data type 
loan_data = loan_data.withColumn('issue_d',regexp_replace(col('issue_d'),'Source Verified',''))

In [0]:
loan_data.select(col('issue_d')).distinct().display()
# Now turn the string column into date

In [0]:
# Change that column to date 
loan_data = loan_data.withColumn('issue_d_date', to_date(col('issue_d'), 'MMM-yyyy'))

In [0]:
# Calculate the difference between dec 1st 2017 to this date in months 
reference_date = lit('2018-12-01')
loan_data = loan_data.withColumn('months_since_issue_date', months_between(reference_date, col('issue_d_date')))
display(loan_data.select(col('months_since_issue_date')))

In [0]:
# We can see some negative values here , Which is wierd and not the case 
loan_data.select(col('months_since_issue_date')).describe().show()

In [0]:
# Filter out the rows that had negative values 
loan_data.filter(col('months_since_issue_date')<0).select('issue_d','issue_d_date','months_since_issue_date').display()

- Create dummy variables from the categorical variables.
- Two categories need only one column, encoded with the binary values 0 and 1 (e.g. Male, Female).
- Likewise, N categories need N-1 dummy variables.

- pandas provides a built-in function, get_dummies, for creating dummy variables.

In [0]:
# Convert the Spark DataFrame to a pandas-on-Spark DataFrame.
# `pandas_api()` is the supported spelling; `to_pandas_on_spark()` is deprecated
# in its favour (NOTE(review): reportedly removed in PySpark 4.0 - confirm
# against the cluster's runtime version).
loan_data_ps = loan_data.pandas_api()

In [0]:
loan_data_dummies = [ps.get_dummies(loan_data_ps['grade'],prefix='grade',prefix_sep=':'),
                     ps.get_dummies(loan_data_ps['sub_grade'],prefix='sub_grade',prefix_sep=':'),
                     ps.get_dummies(loan_data_ps['home_ownership'],prefix='home_ownership',prefix_sep=':'),
                     ps.get_dummies(loan_data_ps['verification_status'],prefix='verification_status',prefix_sep=':'),
                     ps.get_dummies(loan_data_ps['loan_status'],prefix='loan_status',prefix_sep=':'),
                     ps.get_dummies(loan_data_ps['purpose'],prefix='purpose',prefix_sep=':'),
                     ps.get_dummies(loan_data_ps['addr_state'],prefix='addr_state',prefix_sep=':'),
                     ps.get_dummies(loan_data_ps['initial_list_status'],prefix='initial_list_status',prefix_sep=':')]

In [0]:
# Now you have a dataframe with columns grade:A, grade:B, etc.
loan_data_dummies_df = ps.concat(loan_data_dummies, axis=1)
display(loan_data_dummies_df)

In [0]:
# Display the memory consumed by loan_data_dummie vs oan_data_dummies_ohe TODO

In [0]:
%python
# Cell 33
# Skip this cell every time the notebook runs
if True:
    pass
else:
    # Doing the same as above but with using OHE and it consumes less space 
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.sql.functions import col
    from pyspark.ml import Pipeline

    categorical_columns = ['grade', 'sub_grade', 'home_ownership', 'verification_status', 'loan_status', 'purpose', 'addr_state', 'initial_list_status']
    indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in categorical_columns]
    encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_vec") for column in categorical_columns]

    pipeline = Pipeline(stages=indexers + encoders)
    loan_data_dummies_ohe = pipeline.fit(loan_data).transform(loan_data)

    loan_data_dummies_ohe.display()

Check for missing values 

In [0]:
loan_data_dummies_df.loc[:,~loan_data_dummies_df.columns.duplicated()].isnull().count()

In [0]:
loan_data_dummies_df.info()