# 9) Perform any feature engineering and explain the reasons behind. E.g. new features from existing, re-grouping categorical variable categories, meaningful encoding, binning etc. 

In [1]:
import sys
sys.path.append("../src/")
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import ChiSqSelector, VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import QuantileDiscretizer
# create sparksession
spark = SparkSession \
    .builder \
    .appName("Pysparkexample") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
from config import *

In [2]:
pd.options.display.max_columns=999
pd.options.display.max_rows=999

In [3]:
df = spark.read.parquet("../data/census_income.parquet")

In [4]:
df_test = spark.read.csv("../data/census-income.test", header=False, inferSchema=True)

In [5]:
df_test = df_test.drop("_c24")

In [6]:
df_descr = pd.read_csv("../data/column_name.csv")

In [7]:
columns = list(df_descr["Column Name"])

In [8]:
df_test = df_test.toDF(*columns)

In [9]:
df_test.count(), len(df_test.columns)

(99762, 41)

In [10]:
def count_missings(spark_df,sort=True):
    df = spark_df.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for c in spark_df.columns]).toPandas()

    if len(df) == 0:
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return df

In [11]:
count_missings(df_test)

Unnamed: 0,count
age,0
state_prev_res,0
det_hh_summ,0
mig_chg_msa,0
mig_chg_reg,0
mig_move_reg,0
mig_same,0
mig_prev_sunbelt,0
num_emp,0
fam_under_18,0


## Create the number of years of education variable, it's a numeric version of education where the numeric difference has a meaning (more year of education tipically brings to an higher income)

In [12]:
dict_enc_edu = pd.read_csv("../data/edu_encoding.csv")
dict_enc_edu["education"] = " " + dict_enc_edu["education"]
dict_enc_edu = dict_enc_edu.set_index("education").to_dict()["edu_year"]

In [13]:
dict_enc_edu

{' Children': 0.0,
 ' Less than 1st grade': 0.5,
 ' 1st 2nd 3rd or 4th grade': 2.5,
 ' 5th or 6th grade': 5.5,
 ' 7th and 8th grade': 7.5,
 ' 9th grade': 9.0,
 ' 10th grade': 10.0,
 ' 11th grade': 11.0,
 ' 12th grade no diploma': 12.0,
 ' High school graduate': 12.0,
 ' Some college but no degree': 14.0,
 ' Associates degree-academic program': 14.0,
 ' Associates degree-occup /vocational': 14.0,
 ' Bachelors degree(BA AB BS)': 16.0,
 ' Masters degree(MA MS MEng MEd MSW MBA)': 18.0,
 ' Prof school degree (MD DDS DVM LLB JD)': 20.0,
 ' Doctorate degree(PhD EdD)': 21.0}

In [14]:
dict_enc_edu.get("10th grade")

In [15]:
from pyspark.sql.functions import col, create_map, lit
from itertools import chain

mapping_expr = create_map([lit(x) for x in chain(*dict_enc_edu.items())])

df = df.withColumn("edu_year", mapping_expr.getItem(col("education")))
df_test = df_test.withColumn("edu_year", mapping_expr.getItem(col("education")))

In [16]:
df.select("education", "edu_year").distinct().show()

+--------------------+--------+
|           education|edu_year|
+--------------------+--------+
| Prof school degr...|    20.0|
|          10th grade|    10.0|
| Doctorate degree...|    21.0|
| Some college but...|    14.0|
| Bachelors degree...|    16.0|
| Associates degre...|    14.0|
| Associates degre...|    14.0|
| Less than 1st grade|     0.5|
|    5th or 6th grade|     5.5|
| 1st 2nd 3rd or 4...|     2.5|
| Masters degree(M...|    18.0|
|   7th and 8th grade|     7.5|
|          11th grade|    11.0|
|           9th grade|     9.0|
|            Children|     0.0|
| High school grad...|    12.0|
| 12th grade no di...|    12.0|
+--------------------+--------+



In [17]:
df_test.select("education", "edu_year").distinct().show()

+--------------------+--------+
|           education|edu_year|
+--------------------+--------+
| Prof school degr...|    20.0|
|          10th grade|    10.0|
| Doctorate degree...|    21.0|
| Some college but...|    14.0|
| Bachelors degree...|    16.0|
| Associates degre...|    14.0|
| Associates degre...|    14.0|
| Less than 1st grade|     0.5|
|    5th or 6th grade|     5.5|
| 1st 2nd 3rd or 4...|     2.5|
| Masters degree(M...|    18.0|
|   7th and 8th grade|     7.5|
|          11th grade|    11.0|
|           9th grade|     9.0|
|            Children|     0.0|
| High school grad...|    12.0|
| 12th grade no di...|    12.0|
+--------------------+--------+



In [18]:
dict_unique = pd.read_parquet("../reports/unique_values.parquet").set_index("Var").to_dict()["Unique"]

## Binning of the continous variable with 100 bin in order to retain the information inside the variables (excluding age, the other ones have a lot of zeros)

In [19]:
for col in ['age', 'wage_per_hour', 'capital_gains', 'capital_losses', 'stock_dividends', 'weeks_worked']:
    qd = QuantileDiscretizer(numBuckets=100, inputCol=col,outputCol=col + "_q").fit(df)
    df = qd.transform(df)
    df = df.drop(col)
    df_test = qd.transform(df_test)
    df_test = df_test.drop(col)

In [20]:
string_cols = [col for col, dtype in df.dtypes if dtype=="string"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_ind").fit(df) for col in string_cols]

In [21]:
pipeline = Pipeline(stages=indexers)
p = pipeline.fit(df)
df = p.transform(df)
df_test = p.transform(df_test)

In [22]:
for col in string_cols:
    df = df.drop(col)
    df_test = df_test.drop(col)

In [23]:
# put features into a feature vector column
assembler = VectorAssembler(inputCols=[col for col in df.columns if col != "income_50k_ind"], 
                            outputCol="features") 

In [24]:
df = assembler.transform(df)
df_test = assembler.transform(df_test)

## Feature selection

In [25]:
selector = ChiSqSelector(featuresCol="features", selectorType="fdr",
                         outputCol="selectedFeatures", labelCol="income_50k_ind")
s = selector.fit(df)
df = s.transform(df)
df_test = s.transform(df_test)

## Standardization, this is a best practice, it usually brings to better performance using algorithms based on trees and it's necessary using regularization in parametric models (for example logistic regressions and NN)

In [26]:
ss = StandardScaler(inputCol="selectedFeatures", outputCol="stdSelectedFeatures")

In [27]:
ss = ss.fit(df)
df = ss.transform(df)
df_test = ss.transform(df_test)

## 10) Choose a ML algorithm to build an acceptable predictive model in PySpark and explain the reasons for the selection of such algorithm.

### I've chosen a random forest classifier, it's a good baseline, the training time is not too much (with few trees) and has good performance (it used to be the big winner in kaggle competition in the structured data category until xgboost cropped up, now it's either xgboost, lightgbm or catboost that are all part of the same category of Gradient Boosting Decision Tree algorithms)

In [28]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="income_50k_ind", featuresCol="stdSelectedFeatures", seed=101)

In [29]:
rf = rf.fit(df)

In [30]:
df = rf.transform(df)

In [31]:
df.limit(5).toPandas()

Unnamed: 0,det_ind_code,det_occ_code,num_emp,own_or_self,vet_benefits,year,edu_year,age_q,wage_per_hour_q,capital_gains_q,capital_losses_q,stock_dividends_q,weeks_worked_q,class_worker_ind,education_ind,hs_college_ind,marital_stat_ind,major_ind_code_ind,major_occ_code_ind,race_ind,hisp_origin_ind,sex_ind,union_member_ind,unemp_reason_ind,full_or_part_emp_ind,tax_filer_stat_ind,region_prev_res_ind,state_prev_res_ind,det_hh_fam_stat_ind,det_hh_summ_ind,mig_chg_msa_ind,mig_chg_reg_ind,mig_move_reg_ind,mig_same_ind,mig_prev_sunbelt_ind,fam_under_18_ind,country_father_ind,country_mother_ind,country_self_ind,citizenship_ind,vet_question_ind,income_50k_ind,features,selectedFeatures,stdSelectedFeatures,rawPrediction,probability,prediction
0,37.0,38.0,2.0,0.0,2.0,95.0,10.0,58.0,1.0,1.0,1.0,2.0,17.0,1.0,5.0,0.0,2.0,7.0,9.0,0.0,0.0,1.0,0.0,0.0,1.0,2.0,0.0,0.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,"(37.0, 38.0, 2.0, 0.0, 2.0, 95.0, 10.0, 58.0, ...","(37.0, 38.0, 2.0, 0.0, 2.0, 95.0, 10.0, 58.0, ...","(2.047918095562956, 2.628992936581328, 0.84562...","[18.141034505440196, 1.858965494559804]","[0.9070517252720098, 0.0929482747279902]",0.0
1,0.0,0.0,0.0,0.0,0.0,94.0,0.0,15.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 94.0, 0.0, 15.0, 1.0...","(0.0, 0.0, 0.0, 0.0, 0.0, 94.0, 0.0, 15.0, 1.0...","(0.0, 0.0, 0.0, 0.0, 0.0, 187.99956939709733, ...","[19.862500807975604, 0.1374991920243942]","[0.9931250403987804, 0.006874959601219711]",0.0
2,0.0,0.0,0.0,0.0,0.0,94.0,0.0,10.0,1.0,1.0,1.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 94.0, 0.0, 10.0, 1.0...","(0.0, 0.0, 0.0, 0.0, 0.0, 94.0, 0.0, 10.0, 1.0...","(0.0, 0.0, 0.0, 0.0, 0.0, 187.99956939709733, ...","[19.862500807975604, 0.1374991920243942]","[0.9931250403987804, 0.006874959601219711]",0.0
3,0.0,0.0,0.0,0.0,2.0,94.0,12.0,64.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,2.0,0.0,0.0,3.0,0.0,1.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"(0.0, 0.0, 0.0, 0.0, 2.0, 94.0, 12.0, 64.0, 1....","(0.0, 0.0, 0.0, 0.0, 2.0, 94.0, 12.0, 64.0, 1....","(0.0, 0.0, 0.0, 0.0, 2.3488694944822215, 187.9...","[19.61517102409822, 0.38482897590177906]","[0.9807585512049111, 0.019241448795088954]",0.0
4,32.0,42.0,5.0,2.0,2.0,94.0,12.0,22.0,1.0,1.0,1.0,1.0,15.0,1.0,0.0,0.0,1.0,13.0,8.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,2.0,32.0,2.0,2.0,8.0,6.0,6.0,2.0,2.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,"[32.0, 42.0, 5.0, 2.0, 2.0, 94.0, 12.0, 22.0, ...","[32.0, 42.0, 5.0, 2.0, 2.0, 94.0, 12.0, 22.0, ...","[1.7711724069733672, 2.905729035168836, 2.1140...","[19.024104725219367, 0.9758952747806311]","[0.9512052362609685, 0.04879476373903156]",0.0


In [32]:
df_r = df.select("probability", "income_50k_ind").toPandas()

In [33]:
df_r["probability"] = df_r["probability"].map(lambda x: x[1])

# 11) Print the model performance measurements such as ROC, AUC, Confusion Matrix for both train and test data.

In [34]:
from sklearn.metrics import roc_auc_score

In [35]:
roc_auc_score(df_r.income_50k_ind, df_r["probability"])

0.9224782940047836

In [36]:
df_test = rf.transform(df_test)

In [37]:
df_test_r = df_test.select("probability", "income_50k_ind").toPandas()

In [38]:
df_test_r["probability"] = df_test_r["probability"].map(lambda x: x[1])

In [39]:
roc_auc_score(df_test_r.income_50k_ind, df_test_r["probability"])

0.9234119623121495

In [40]:
rf.featureImportances

SparseVector(41, {0: 0.0156, 1: 0.0909, 2: 0.0159, 6: 0.2234, 7: 0.0318, 8: 0.0004, 9: 0.1892, 10: 0.0126, 11: 0.0933, 12: 0.033, 13: 0.027, 14: 0.0311, 16: 0.0012, 17: 0.0548, 18: 0.0725, 20: 0.0026, 21: 0.0376, 22: 0.0, 23: 0.0001, 24: 0.0001, 25: 0.0063, 27: 0.0001, 28: 0.0257, 29: 0.0343, 30: 0.0, 32: 0.0, 34: 0.0001, 36: 0.0001, 37: 0.0001, 38: 0.0001, 39: 0.0, 40: 0.0})

In [41]:
df.limit(1).toPandas()

Unnamed: 0,det_ind_code,det_occ_code,num_emp,own_or_self,vet_benefits,year,edu_year,age_q,wage_per_hour_q,capital_gains_q,capital_losses_q,stock_dividends_q,weeks_worked_q,class_worker_ind,education_ind,hs_college_ind,marital_stat_ind,major_ind_code_ind,major_occ_code_ind,race_ind,hisp_origin_ind,sex_ind,union_member_ind,unemp_reason_ind,full_or_part_emp_ind,tax_filer_stat_ind,region_prev_res_ind,state_prev_res_ind,det_hh_fam_stat_ind,det_hh_summ_ind,mig_chg_msa_ind,mig_chg_reg_ind,mig_move_reg_ind,mig_same_ind,mig_prev_sunbelt_ind,fam_under_18_ind,country_father_ind,country_mother_ind,country_self_ind,citizenship_ind,vet_question_ind,income_50k_ind,features,selectedFeatures,stdSelectedFeatures,rawPrediction,probability,prediction
0,37.0,38.0,2.0,0.0,2.0,95.0,10.0,58.0,1.0,1.0,1.0,2.0,17.0,1.0,5.0,0.0,2.0,7.0,9.0,0.0,0.0,1.0,0.0,0.0,1.0,2.0,0.0,0.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0.0,0.0,0.0,"(37.0, 38.0, 2.0, 0.0, 2.0, 95.0, 10.0, 58.0, ...","(37.0, 38.0, 2.0, 0.0, 2.0, 95.0, 10.0, 58.0, ...","(2.047918095562956, 2.628992936581328, 0.84562...","[18.141034505440196, 1.858965494559804]","[0.9070517252720098, 0.0929482747279902]",0.0


In [42]:
df.groupBy("det_occ_code").agg(F.mean("income_50k_ind")).orderBy("avg(income_50k_ind)", ascending=False).show()

+------------+-------------------+
|det_occ_code|avg(income_50k_ind)|
+------------+-------------------+
|         7.0| 0.6839945280437757|
|        11.0| 0.6593406593406593|
|         4.0|0.49780058651026393|
|         5.0| 0.3707602339181287|
|         6.0| 0.3492063492063492|
|         2.0|0.32217907720420286|
|        18.0| 0.2742382271468144|
|        17.0| 0.2715979672501412|
|         9.0| 0.2696476964769648|
|         1.0|0.24448529411764705|
|        46.0| 0.2222222222222222|
|        15.0|0.21226993865030674|
|         3.0|                0.2|
|        16.0| 0.1753265602322206|
|         8.0| 0.1724779172477917|
|        28.0| 0.1559301625526791|
|        21.0|0.15196998123827393|
|        12.0|0.13682634730538923|
|        14.0| 0.1298283261802575|
|        45.0|0.11627906976744186|
+------------+-------------------+
only showing top 20 rows



In [43]:
df.groupBy("age_q").agg(F.mean("income_50k_ind")).orderBy("avg(income_50k_ind)", ascending=False).show()

+-----+-------------------+
|age_q|avg(income_50k_ind)|
+-----+-------------------+
| 48.0|0.17101967799642218|
| 47.0|0.16974431818181818|
| 53.0|0.16973995271867612|
| 49.0|0.16929460580912864|
| 51.0|0.16576332429990967|
| 52.0| 0.1598194130925508|
| 54.0|0.15390946502057612|
| 46.0|0.15384615384615385|
| 55.0|0.15375722543352602|
| 50.0|0.15172735760971054|
| 44.0|0.14780200761509174|
| 45.0|0.14429289303661164|
| 43.0|0.13989983305509182|
| 42.0| 0.1333758774728781|
| 58.0|           0.128125|
| 40.0|0.12786259541984732|
| 57.0|0.12700369913686807|
| 56.0|0.12573099415204678|
| 41.0| 0.1252408477842004|
| 39.0|0.11687519072322246|
+-----+-------------------+
only showing top 20 rows



In [44]:
df.groupBy("weeks_worked_q").agg(F.mean("income_50k_ind")).orderBy("avg(income_50k_ind)", ascending=False).show()

+--------------+--------------------+
|weeks_worked_q| avg(income_50k_ind)|
+--------------+--------------------+
|          17.0| 0.14800750917313765|
|          16.0| 0.10790906179955172|
|          15.0| 0.09114470842332613|
|          13.0| 0.06345634563456345|
|          12.0| 0.04941176470588235|
|          14.0| 0.04316546762589928|
|          11.0| 0.04202440126525079|
|          10.0|0.037407797681770286|
|           9.0| 0.03349514563106796|
|           8.0| 0.03199658703071672|
|           7.0|0.017815646785437646|
|           6.0|0.017633410672853827|
|           5.0|0.016436903499469777|
|           3.0|0.013102480112306972|
|           4.0| 0.01054481546572935|
|           2.0|0.008583690987124463|
|           1.0| 0.00633977928936931|
+--------------+--------------------+



In [45]:
df.groupBy("class_worker_ind").agg(F.mean("income_50k_ind")).orderBy("avg(income_50k_ind)", ascending=False).show()

+----------------+--------------------+
|class_worker_ind| avg(income_50k_ind)|
+----------------+--------------------+
|             5.0|  0.3473200612557427|
|             6.0|  0.2041025641025641|
|             2.0|  0.1290704558910598|
|             4.0| 0.11473858528507215|
|             3.0| 0.10881294964028777|
|             1.0| 0.10165491197867496|
|             0.0|0.009017906129981546|
|             8.0|0.006060606060606061|
|             7.0|0.004555808656036446|
+----------------+--------------------+



In [46]:
df.groupBy("capital_gains_q").agg(F.mean("income_50k_ind")).orderBy("avg(income_50k_ind)", ascending=False).show()

+---------------+--------------------+
|capital_gains_q| avg(income_50k_ind)|
+---------------+--------------------+
|            4.0|  0.7266320474777448|
|            3.0|  0.2637144745538665|
|            1.0| 0.05159364507835012|
|            2.0|0.025310173697270472|
+---------------+--------------------+



In [47]:
df.groupBy("edu_year").agg(F.mean("income_50k_ind")).orderBy("avg(income_50k_ind)", ascending=False).show()

+--------+--------------------+
|edu_year| avg(income_50k_ind)|
+--------+--------------------+
|    20.0|  0.5404350250976018|
|    21.0|  0.5201900237529691|
|    18.0|  0.3115731539519951|
|    16.0| 0.19708029197080293|
|    14.0| 0.06957726219333529|
|    12.0| 0.03785645024043694|
|    11.0|0.010180337405468295|
|     7.5|0.008992131884600974|
|    10.0|0.008204313881169776|
|     2.5|0.007226236798221234|
|     5.5|0.006713457430576747|
|     9.0|0.006099518459069021|
|     0.5|0.001221001221001221|
|     0.0|                 0.0|
+--------+--------------------+



In [48]:
from sklearn.metrics import classification_report

In [49]:
df_r = df.select("probability", "income_50k_ind", "prediction").toPandas()
df_r["probability"] = df_r["probability"].map(lambda x: x[1])
df_test_r = df_test.select("probability", "income_50k_ind", "prediction").toPandas()
df_test_r["probability"] = df_test_r["probability"].map(lambda x: x[1])

In [52]:
print(classification_report(df_r.income_50k_ind, df_r.prediction))

              precision    recall  f1-score   support

         0.0       0.94      1.00      0.97    187141
         1.0       0.90      0.09      0.17     12382

    accuracy                           0.94    199523
   macro avg       0.92      0.55      0.57    199523
weighted avg       0.94      0.94      0.92    199523



In [53]:
print(classification_report(df_test_r.income_50k_ind, df_test_r.prediction))

              precision    recall  f1-score   support

         0.0       0.94      1.00      0.97     93576
         1.0       0.91      0.09      0.17      6186

    accuracy                           0.94     99762
   macro avg       0.93      0.55      0.57     99762
weighted avg       0.94      0.94      0.92     99762



### The default threshold 0.5 is not good enough, we can try different thresholds, there's a trade-off between precision and recall based on the threshold, we should choose the best one based on the business use of the algorithm understanding what's more important

In [57]:
print(classification_report(df_r.income_50k_ind, df_r.probability.map(lambda x: 1 if x > 0.2 else 0)))

              precision    recall  f1-score   support

         0.0       0.97      0.97      0.97    187141
         1.0       0.51      0.54      0.53     12382

    accuracy                           0.94    199523
   macro avg       0.74      0.75      0.75    199523
weighted avg       0.94      0.94      0.94    199523



In [58]:
print(classification_report(df_test_r.income_50k_ind, df_test_r.probability.map(lambda x: 1 if x > 0.2 else 0)))

              precision    recall  f1-score   support

         0.0       0.97      0.97      0.97     93576
         1.0       0.52      0.55      0.53      6186

    accuracy                           0.94     99762
   macro avg       0.74      0.76      0.75     99762
weighted avg       0.94      0.94      0.94     99762

