### Predict whether income exceeds $50K/yr based on census data.

#### Tech stack:

1. Data Analysis : PySpark

In [3]:
#pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 41 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 43.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=260dc37ca3243a4460d4aed815561c0fc4446e731976c23f5124969666fb84c3
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [81]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

import pyspark
from pyspark.sql import SparkSession 
from pyspark import SparkFiles
from pyspark.ml  import Pipeline     
from pyspark.sql import SQLContext 
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

In [23]:
from google.colab import files

uploads = files.upload()

In [59]:

sc =SparkSession.builder.getOrCreate()
sqlcontext = SQLContext(sc)




In [60]:
# loading the dataset
df = sqlcontext.read.csv(SparkFiles.get('/content/data.csv'),header=True,inferSchema=True)

In [61]:
# checking dataframe schema

df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [62]:
df.show(5)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  x|age|workclass|fnlwgt|   education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  1| 25|  Private|226802|        11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|
|  2| 38|  Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|           0|           0|            50| United-States| <=50K|
|  3| 28|Local-gov|336951|  Assoc-acdm|             12|Married-civ-spouse|  Protective-ser

In [63]:
# converting the continious variable to right format
def convert_col(df, names, type):

    for name in names:
        df = df.withColumn(name, df[name].cast(type))
    return df

In [64]:
# list of all the continuíous features

features_continious = ['age', 'fnlwgt','educational-num','capital-gain','capital-loss','hours-per-week']

# convert the datatype

df_cont = convert_col(df, features_continious, FloatType())

In [65]:
df_cont.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [66]:
# education col groupby to check different education level occurance
df.groupBy("education").count().sort("count",ascending=True).show()


+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [67]:

df.drop('education-num','x').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'educational-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']

In [68]:
df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



In [69]:
# creating a new feature: age2

age_square = df.select(col("age")**2)

In [70]:
#Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

In [71]:
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'marital-status',
           'occupation', 'relationship', 'race', 'gender', 'capital-gain', 'capital-loss',
           'hours-per-week', 'native-country']
df = df.select(COLUMNS)
df.first() #gets the first row from the dataframe

Row(age=25, age_square=625.0, workclass='Private', fnlwgt=226802, education='11th', marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States')

In [73]:

# checking the counts with the countries people associate to.
df.groupby('native-country').agg({'native-country': 'count'}).sort(asc("count(native-country)")).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|          Yugoslavia|                   23|
|Outlying-US(Guam-...|                   23|
|                Laos|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [88]:
# The feature native_country has only one household coming from Netherland. You exclude it.
df_remove = df.filter(df['native-country'] !='Holand-Netherlands')
# When a group within a feature has only one observation, it brings no information to the model. 


df_remove.limit(3).show()


+---+----------+---------+------+----------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+
|age|age_square|workclass|fnlwgt| education|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|
+---+----------+---------+------+----------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+
| 25|     625.0|  Private|226802|      11th|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States|
| 38|    1444.0|  Private| 89814|   HS-grad|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|           0|           0|            50| United-States|
| 28|     784.0|Local-gov|336951|Assoc-acdm|Married-civ-spouse|  Protective-serv|     Husband|White|  Male|           0|           0|            40| United-States|
+---+----------+

In [75]:
# building a pipeline to maintain the structure of the data as it will be very convineant to the data to the model and get the feedback

# data transformation

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")

In [76]:
model = stringIndexer.fit(df)		
indexed = model.transform(df)

In [79]:
# Create the news columns based on the group. For instance, if there are 10 groups in the feature, the new matrix will have 10 columns, one for each group.

encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
ohe = encoder.fit(indexed)
encoded = ohe.transform(indexed)
encoded.show(5)

+---+----------+---------+------+------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+-----------------+-------------+
|age|age_square|workclass|fnlwgt|   education|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|workclass_encoded|workclass_vec|
+---+----------+---------+------+------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+-----------------+-------------+
| 25|     625.0|  Private|226802|        11th|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States|              0.0|(9,[0],[1.0])|
| 38|    1444.0|  Private| 89814|     HS-grad|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|           0|           0|            50| United-States|              0.0|(9,[0],[1.0])|
| 28|     

In [82]:
# OHE the categorical feature
CATEGORICAL_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = [] # stages in our Pipeline
for categoricalCol in CATEGORICAL_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [99]:
# Convert label into label indices using the StringIndexer

label_stringIdx =  StringIndexer(inputCol="income", outputCol="newincome")
stages += [label_stringIdx]


[StringIndexer_55b5e64e22bb, OneHotEncoder_43fa8aadf4ee, StringIndexer_cd2b702b91d0, OneHotEncoder_bb2574154b81, StringIndexer_0dc2f725a6be, OneHotEncoder_7774b0c781f9, StringIndexer_c39465dbc7f6, OneHotEncoder_6f15eec74e8c, StringIndexer_d0255c843fc1, OneHotEncoder_b74320329d85, StringIndexer_33fc9d64d297, OneHotEncoder_64ebe807d8fa, StringIndexer_1d00c95fb29d, OneHotEncoder_7fd58eaf7c29, StringIndexer_d34a0d644937, OneHotEncoder_7ccdb8ca6560, StringIndexer_9f14393af2b4, StringIndexer_e375748c7c8a, VectorAssembler_a129b9dda1ca, StringIndexer_566112ce1a3d, StringIndexer_a0d3fe9ad636]


In [85]:
# adding continious variable
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'capital-loss', 'hours-per-week']
#The inputCols of the VectorAssembler is a list of columns. You can create a new list containing all the new columns. 
#The code below popluate the list with encoded categorical features and the continuous features.

assemblerInputs = [c + "classVec" for c in CATEGORICAL_FEATURES] + CONTI_FEATURES

In [86]:
# Finally, you pass all the steps in the VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [103]:
#  push the data to the pipeline.
# Create a Pipeline.
pipeline = Pipeline(stages=stages)



In [101]:
pipelineModel = pipeline.fit(df_remove)

Py4JJavaError: ignored