## Data description
- Convert U.S. to US to avoid periods.
- Convert Unknown to "?".

Data Set Information:

Extraction was done by Barry Becker from the 1994 Census database. A set of reasonably clean records was extracted using the following conditions: ((AAGE>16) && (AGI>100) && (AFNLWGT>1)&& (HRSWK>0))

Prediction task is to determine whether a person makes over 50K a year.

- age: continuous.
- workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked.
- fnlwgt(final weight): The number of people the census takers believe that observation represents, continuous.
- education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc, 9th, 7th-8th, 12th, Masters, 1st-4th, 10th, Doctorate, 5th-6th, Preschool.
- education-num: continuous.
- marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent, Married-AF-spouse.
- occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners, Machine-op-inspct, Adm-clerical, Farming-fishing, Transport-moving, Priv-house-serv, Protective-serv, Armed-Forces.
- relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried.
- race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black.
- sex: Female, Male.
- capital-gain: continuous.
- capital-loss: continuous.
- hours-per-week: continuous.
- native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany, Outlying-US(Guam-USVI-etc), India, Japan, Greece, South, China, Cuba, Iran, Honduras, Philippines, Italy, Poland, Jamaica, Vietnam, Mexico, Portugal, Ireland, France, Dominican-Republic, Laos, Ecuador, Taiwan, Haiti, Columbia, Hungary, Guatemala, Nicaragua, Scotland, Thailand, Yugoslavia, El-Salvador, Trinadad&Tobago, Peru, Hong, Holand-Netherlands.
- salary (target): >50K, <=50K.

- references:
- https://archive.ics.uci.edu/ml/datasets/adult

In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [68]:
# The usual suspects
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# PySpark imports
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
# The star import shadows Python's built-in sum() and round(); later cells rely on the Spark versions.
from pyspark.sql.functions import *
from pyspark.sql.functions import col, isnan, when, count
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor, GBTRegressor, LinearRegression
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

In [3]:
spark = pyspark.sql.SparkSession.builder.config("spark.executor.memory", "8g").config("spark.driver.memory", "8g").getOrCreate()

## Pandas

In [4]:
# dfp=pd.read_csv('adult1m.csv', header=None)

# columns_names=['age','workclass','fnlwgt','education','education_num','marital_status','occupation','relationship','race','gender','capital_gain','capital_loss','hours_per_week','native_country','salary']
# i=0
# for clm in columns_names:
#     dfp.rename(columns = {i:'{}'.format(clm)}, inplace = True)
#     i=i+1
# print(dfp.shape)
# dfp

In [5]:
# # dfp[dfp.eq("?").any(1)]
# dfp=dfp.replace('?', np.NaN)
# dfp.isnull().sum()

## Preprocessing

1. Replace '?' with Not-specified in the columns that contain it.
2. Make the values for education consistent.
3. Change the values for salary to 0 if <=50K and 1 if >50K.
4. Sum the fnlwgt column and express each row's fnlwgt as a percentage of the total.
5. Compute the difference between capital gain and capital loss.
6. Flag whether someone is a workaholic (1) or not (0).

## PySpark

In [4]:
# Load the dataset.
dfs = spark.read.csv('adult1m.csv',inferSchema=True,header=False)

In [5]:
# Rename columns
columns_names=['age','workclass','fnlwgt','education','education_num','marital_status','occupation','relationship','race','gender','capital_gain','capital_loss','hours_per_week','native_country','salary']
for i, colm in enumerate(columns_names):
    dfs = dfs.withColumnRenamed('_c{}'.format(i), colm)

In [6]:
# The data Shape
print(dfs.count(), len(dfs.columns))
dfs.printSchema()

1000000 15
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- salary: string (nullable = true)



In [7]:
dfs.show(5)

+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education_num|    marital_status|       occupation| relationship| race|gender|capital_gain|capital_loss|hours_per_week|native_country|salary|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|          Divorced|Handlers-cleaners|Not-i

In [8]:
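# Register a temporary view so we can do EDA in SQL
# (registerTempTable is deprecated in favour of createOrReplaceTempView)
dfs.createOrReplaceTempView('dfs')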



In [9]:
# Value counts for each column
for i in dfs.columns:
    dfs_result = spark.sql('SELECT `{}`, count(`{}`) FROM dfs group by 1 order by 2 desc'.format(i,i)) # count rows per distinct value, most frequent first
    dfs_result.show(50)
# No weird values
# Missing values are encoded as '?'

+---+----------+
|age|count(age)|
+---+----------+
| 37|     14347|
| 35|     14257|
| 31|     14250|
| 30|     14241|
| 24|     14224|
| 29|     14215|
| 28|     14205|
| 25|     14188|
| 36|     14182|
| 41|     14172|
| 44|     14145|
| 45|     14103|
| 39|     14091|
| 43|     14080|
| 34|     14069|
| 22|     14052|
| 33|     14045|
| 20|     14042|
| 23|     14032|
| 38|     14031|
| 47|     14020|
| 26|     14018|
| 42|     13990|
| 32|     13971|
| 40|     13940|
| 53|     13917|
| 48|     13888|
| 27|     13883|
| 18|     13854|
| 21|     13848|
| 46|     13802|
| 49|     13793|
| 50|     13789|
| 51|     13782|
| 17|     13758|
| 19|     13756|
| 58|     13725|
| 57|     13719|
| 55|     13703|
| 54|     13671|
| 52|     13636|
| 66|     13619|
| 56|     13566|
| 65|     13564|
| 59|     13518|
| 70|     13511|
| 64|     13490|
| 67|     13460|
| 63|     13438|
| 78|     13411|
+---+----------+
only showing top 50 rows

+----------------+----------------+
|       workclass|co

+--------------+---------------------+
|hours_per_week|count(hours_per_week)|
+--------------+---------------------+
|            40|                25612|
|            50|                13029|
|            45|                12327|
|            60|                11804|
|            20|                11540|
|            35|                11485|
|            30|                11470|
|            25|                10936|
|            48|                10894|
|            15|                10841|
|            55|                10785|
|            38|                10709|
|            32|                10620|
|            36|                10617|
|            70|                10593|
|             8|                10583|
|            44|                10581|
|            16|                10550|
|            56|                10541|
|             2|                10499|
|            10|                10498|
|            90|                10495|
|            42|         

In [10]:
dfs.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dfs.columns]).show()

+---+---------+------+---------+-------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education_num|marital_status|occupation|relationship|race|gender|capital_gain|capital_loss|hours_per_week|native_country|salary|
+---+---------+------+---------+-------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+
|  0|        0|     0|        0|            0|             0|         0|           0|   0|     0|           0|           0|             0|             0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+------+------------+------------+--------------+--------------+------+



### Replacing ? in workclass with Not-specified

In [11]:
dfs.select('workclass').distinct().collect()

[Row(workclass='Self-emp-not-inc'),
 Row(workclass='Local-gov'),
 Row(workclass='State-gov'),
 Row(workclass='Private'),
 Row(workclass='Without-pay'),
 Row(workclass='Federal-gov'),
 Row(workclass='Never-worked'),
 Row(workclass='?'),
 Row(workclass='Self-emp-inc')]

In [12]:
dfs=dfs.withColumn('workclass', regexp_replace('workclass', "\\?", 'Not-specified'))
dfs.select('workclass').distinct().collect()

[Row(workclass='Self-emp-not-inc'),
 Row(workclass='Local-gov'),
 Row(workclass='State-gov'),
 Row(workclass='Not-specified'),
 Row(workclass='Private'),
 Row(workclass='Without-pay'),
 Row(workclass='Federal-gov'),
 Row(workclass='Never-worked'),
 Row(workclass='Self-emp-inc')]
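
`regexp_replace` treats its pattern as a regular expression, which is why the cell above escapes the question mark: unescaped, `?` is a quantifier rather than a literal character. A minimal plain-Python sketch of the same escaping rule, using the standard `re` module instead of Spark. The sample values are copied from the output above, and `mark_missing` is a hypothetical helper name used only for illustration.

```python
import re

# Sample values copied from the workclass output above.
workclass_values = ["Private", "?", "State-gov"]

def mark_missing(value, placeholder="Not-specified"):
    # "?" is a regex quantifier, so it is escaped to match a literal question mark.
    return re.sub(r"\?", placeholder, value)

cleaned = [mark_missing(v) for v in workclass_values]
print(cleaned)
```

Values without a question mark pass through unchanged, so the same call can be applied to every row of a column.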

### Replacing ? in occupation with Not-specified

In [13]:
dfs.select('occupation').distinct().collect()

[Row(occupation='Sales'),
 Row(occupation='Exec-managerial'),
 Row(occupation='Prof-specialty'),
 Row(occupation='Handlers-cleaners'),
 Row(occupation='Farming-fishing'),
 Row(occupation='Craft-repair'),
 Row(occupation='Transport-moving'),
 Row(occupation='Priv-house-serv'),
 Row(occupation='Protective-serv'),
 Row(occupation='Other-service'),
 Row(occupation='Tech-support'),
 Row(occupation='Machine-op-inspct'),
 Row(occupation='Armed-Forces'),
 Row(occupation='?'),
 Row(occupation='Adm-clerical')]

In [14]:
dfs=dfs.withColumn('occupation', regexp_replace('occupation', "\\?", 'Not-specified'))
dfs.select('occupation').distinct().collect()

[Row(occupation='Sales'),
 Row(occupation='Exec-managerial'),
 Row(occupation='Prof-specialty'),
 Row(occupation='Handlers-cleaners'),
 Row(occupation='Farming-fishing'),
 Row(occupation='Not-specified'),
 Row(occupation='Craft-repair'),
 Row(occupation='Transport-moving'),
 Row(occupation='Priv-house-serv'),
 Row(occupation='Protective-serv'),
 Row(occupation='Other-service'),
 Row(occupation='Tech-support'),
 Row(occupation='Machine-op-inspct'),
 Row(occupation='Armed-Forces'),
 Row(occupation='Adm-clerical')]

### Replacing ? in native_country with Not-specified

In [15]:
dfs.select('native_country').distinct().collect()

[Row(native_country='Philippines'),
 Row(native_country='Germany'),
 Row(native_country='Cambodia'),
 Row(native_country='France'),
 Row(native_country='Greece'),
 Row(native_country='Taiwan'),
 Row(native_country='Ecuador'),
 Row(native_country='Nicaragua'),
 Row(native_country='Hong'),
 Row(native_country='Peru'),
 Row(native_country='India'),
 Row(native_country='China'),
 Row(native_country='Italy'),
 Row(native_country='Holand-Netherlands'),
 Row(native_country='Cuba'),
 Row(native_country='South'),
 Row(native_country='Iran'),
 Row(native_country='Ireland'),
 Row(native_country='Thailand'),
 Row(native_country='Laos'),
 Row(native_country='El-Salvador'),
 Row(native_country='Mexico'),
 Row(native_country='Guatemala'),
 Row(native_country='Honduras'),
 Row(native_country='Yugoslavia'),
 Row(native_country='Puerto-Rico'),
 Row(native_country='Jamaica'),
 Row(native_country='Canada'),
 Row(native_country='United-States'),
 Row(native_country='Dominican-Republic'),
 Row(native_countr

In [16]:
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "\\?", 'Not-specified'))
dfs.select('native_country').distinct().collect()

[Row(native_country='Philippines'),
 Row(native_country='Germany'),
 Row(native_country='Cambodia'),
 Row(native_country='France'),
 Row(native_country='Greece'),
 Row(native_country='Taiwan'),
 Row(native_country='Ecuador'),
 Row(native_country='Nicaragua'),
 Row(native_country='Hong'),
 Row(native_country='Peru'),
 Row(native_country='India'),
 Row(native_country='China'),
 Row(native_country='Not-specified'),
 Row(native_country='Italy'),
 Row(native_country='Holand-Netherlands'),
 Row(native_country='Cuba'),
 Row(native_country='South'),
 Row(native_country='Iran'),
 Row(native_country='Ireland'),
 Row(native_country='Thailand'),
 Row(native_country='Laos'),
 Row(native_country='El-Salvador'),
 Row(native_country='Mexico'),
 Row(native_country='Guatemala'),
 Row(native_country='Honduras'),
 Row(native_country='Yugoslavia'),
 Row(native_country='Puerto-Rico'),
 Row(native_country='Jamaica'),
 Row(native_country='Canada'),
 Row(native_country='United-States'),
 Row(native_country='Do

### Make the values for education consistent

In [17]:
dfs.select('education').distinct().collect()

[Row(education='Masters'),
 Row(education='10th'),
 Row(education='5th-6th'),
 Row(education='Assoc-acdm'),
 Row(education='Assoc-voc'),
 Row(education='7th-8th'),
 Row(education='9th'),
 Row(education='HS-grad'),
 Row(education='Bachelors'),
 Row(education='11th'),
 Row(education='1st-4th'),
 Row(education='Preschool'),
 Row(education='12th'),
 Row(education='Doctorate'),
 Row(education='Some-college'),
 Row(education='Prof-school')]

In [18]:
dfs=dfs.withColumn('education', regexp_replace('education', "5th-6th", '6th'))
dfs=dfs.withColumn('education', regexp_replace('education', "7th-8th", '8th'))
dfs=dfs.withColumn('education', regexp_replace('education', "1st-4th", '4th'))

dfs.select('education').distinct().collect()

[Row(education='Masters'),
 Row(education='10th'),
 Row(education='6th'),
 Row(education='Assoc-acdm'),
 Row(education='Assoc-voc'),
 Row(education='9th'),
 Row(education='HS-grad'),
 Row(education='Bachelors'),
 Row(education='11th'),
 Row(education='Preschool'),
 Row(education='8th'),
 Row(education='12th'),
 Row(education='Doctorate'),
 Row(education='4th'),
 Row(education='Some-college'),
 Row(education='Prof-school')]

### Change the values for salary to 0 if <=50K and 1 if >50K

In [19]:
dfs.select('salary').distinct().collect()

[Row(salary='<=50K'), Row(salary='>50K')]

In [20]:
dfs=dfs.withColumn('salary', regexp_replace('salary', "<=50K", '50000'))
dfs=dfs.withColumn('salary', regexp_replace('salary', ">50K", '50001'))
dfs = dfs.withColumn("salary", dfs["salary"].cast('int'))

dfs.select('salary').distinct().collect()

[Row(salary=50000), Row(salary=50001)]

In [32]:
# pandasDF = dfs.toPandas()
# pandasDF

In [33]:
# pandasDF.to_csv("clean_data_for_eda_pig.csv",index=False)


In [17]:
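# Convert the intermediate 50000/50001 encoding to the binary label 0/1.
# This cell must run before the modelling cells; otherwise the classifiers see labels 50000 and 50001.
dfs = dfs.withColumn("salary", dfs["salary"].cast('string'))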

dfs=dfs.withColumn('salary', regexp_replace('salary', "50000", '0'))
dfs=dfs.withColumn('salary', regexp_replace('salary', "50001", '1'))
dfs = dfs.withColumn("salary", dfs["salary"].cast('int'))

dfs.select('salary').distinct().collect()

                                                                                

[Row(salary=1), Row(salary=0)]
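
The two-step encoding above (first 50000/50001, then 0/1) amounts to a single lookup from the raw label to the binary target. A plain-Python sketch of that lookup, not PySpark code; `SALARY_TO_LABEL` and `encode_salary` are illustrative names that do not appear in the notebook.

```python
# Raw salary strings as they appear in the dataset, mapped to the binary target.
SALARY_TO_LABEL = {"<=50K": 0, ">50K": 1}

def encode_salary(raw):
    # Strip surrounding whitespace defensively; unknown labels raise KeyError.
    return SALARY_TO_LABEL[raw.strip()]

labels = [encode_salary(s) for s in ["<=50K", ">50K", "<=50K"]]
print(labels)
```

A direct mapping like this leaves no intermediate state that a later cell could pick up by accident.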

### Summing the values for the fnlwgt column, and getting the percentage of fnlwgt

In [21]:
sum_fnlwgt=dfs.select(sum('fnlwgt')).collect()[0][0]
sum_fnlwgt

196336661945

In [22]:
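# Each row's share of the total weight, in percent. The shares are tiny (on the order of 1e-5 %),
# so rounding to 2 decimals collapses them to 0.0, as in the output below.
dfs=dfs.withColumn('fnlwgt_percentage',round((dfs.fnlwgt/sum_fnlwgt)*100,2))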
dfs.show()

+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+
|age|       workclass|fnlwgt|   education|education_num|      marital_status|       occupation| relationship|              race|gender|capital_gain|capital_loss|hours_per_week|native_country|salary|fnlwgt_percentage|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States| 50000|              0.0|
| 50|Self-emp-not-inc| 83311|   Bachelors|           13|  Married-civ-spouse|  Exec-managerial|      Husband|             White|  Ma
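
The 0.0 in the `fnlwgt_percentage` column follows from the arithmetic. A short worked example in plain Python, using the first row's `fnlwgt` and the total computed above; here `round` is Python's built-in, not the Spark function used in the notebook.

```python
fnlwgt = 77516              # first row in the output above
sum_fnlwgt = 196336661945   # total computed above

# Share of the total weight, expressed in percent.
share_percent = fnlwgt / sum_fnlwgt * 100
print(share_percent)

# Two decimals are far too coarse for a value this small.
print(round(share_percent, 2))
```

If the feature is meant to carry information, keeping more decimals or using the unrounded fraction may be worth considering.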

### The difference between capital gain and capital loss

In [24]:
dfs=dfs.withColumn('diffrence_capital_gain_loos',(dfs.capital_gain-dfs.capital_loss))
dfs.show()

+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+---------------------------+
|age|       workclass|fnlwgt|   education|education_num|      marital_status|       occupation| relationship|              race|gender|capital_gain|capital_loss|hours_per_week|native_country|salary|fnlwgt_percentage|diffrence_capital_gain_loos|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+---------------------------+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States| 50000|              0.0|                       2174|
| 50|Self-emp-not-in

In [25]:
dfs=dfs.withColumn('profit',when((dfs.diffrence_capital_gain_loos >0), lit(1)).when((dfs.diffrence_capital_gain_loos ==0), lit(0)).when((dfs.diffrence_capital_gain_loos <0), lit(-1)))
dfs.show()

+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+---------------------------+------+
|age|       workclass|fnlwgt|   education|education_num|      marital_status|       occupation| relationship|              race|gender|capital_gain|capital_loss|hours_per_week|native_country|salary|fnlwgt_percentage|diffrence_capital_gain_loos|profit|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+---------------------------+------+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States| 50000|              0.0|                       2174|

In [26]:
dfs.select('profit').distinct().collect()

[Row(profit=-1), Row(profit=1), Row(profit=0)]
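
The `profit` column is the sign of the net capital result. A plain-Python sketch of the same rule, with inputs taken from rows shown in this notebook; `profit_sign` is a hypothetical helper, not part of the notebook.

```python
def profit_sign(capital_gain, capital_loss):
    # Net capital result: positive -> 1, zero -> 0, negative -> -1.
    diff = capital_gain - capital_loss
    return (diff > 0) - (diff < 0)

print(profit_sign(2174, 0))   # gain only
print(profit_sign(0, 0))      # neither gain nor loss
print(profit_sign(0, 2042))   # loss only
```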

In [27]:
dfs.filter(dfs.profit == -1).show(truncate=False)

+---+----------------+------+------------+-------------+------------------+----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+---------------------------+------+
|age|workclass       |fnlwgt|education   |education_num|marital_status    |occupation      |relationship |race              |gender|capital_gain|capital_loss|hours_per_week|native_country|salary|fnlwgt_percentage|diffrence_capital_gain_loos|profit|
+---+----------------+------+------------+-------------+------------------+----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+---------------------------+------+
|43 |Private         |117037|11th        |7            |Married-civ-spouse|Transport-moving|Husband      |White             |Male  |0           |2042        |40            |United-States |50000 |0.0              |-2042                      |-1    |
|45 

### Flagging whether someone is a workaholic (1 if hours_per_week >= 70, otherwise 0)

In [28]:
dfs=dfs.withColumn('workaholic',when((dfs.hours_per_week >=70), lit(1)).when((dfs.hours_per_week <70), lit(0)))
dfs.show()

+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+---------------------------+------+----------+
|age|       workclass|fnlwgt|   education|education_num|      marital_status|       occupation| relationship|              race|gender|capital_gain|capital_loss|hours_per_week|native_country|salary|fnlwgt_percentage|diffrence_capital_gain_loos|profit|workaholic|
+---+----------------+------+------------+-------------+--------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+-----------------+---------------------------+------+----------+
| 39|       State-gov| 77516|   Bachelors|           13|       Never-married|     Adm-clerical|Not-in-family|             White|  Male|        2174|           0|            40| United-States| 50000|             

### Grouping native_country into regions

In [29]:
dfs.select('native_country').distinct().collect()

[Row(native_country='Philippines'),
 Row(native_country='Germany'),
 Row(native_country='Cambodia'),
 Row(native_country='France'),
 Row(native_country='Greece'),
 Row(native_country='Taiwan'),
 Row(native_country='Ecuador'),
 Row(native_country='Nicaragua'),
 Row(native_country='Hong'),
 Row(native_country='Peru'),
 Row(native_country='India'),
 Row(native_country='China'),
 Row(native_country='Not-specified'),
 Row(native_country='Italy'),
 Row(native_country='Holand-Netherlands'),
 Row(native_country='Cuba'),
 Row(native_country='South'),
 Row(native_country='Iran'),
 Row(native_country='Ireland'),
 Row(native_country='Thailand'),
 Row(native_country='Laos'),
 Row(native_country='El-Salvador'),
 Row(native_country='Mexico'),
 Row(native_country='Guatemala'),
 Row(native_country='Honduras'),
 Row(native_country='Yugoslavia'),
 Row(native_country='Puerto-Rico'),
 Row(native_country='Jamaica'),
 Row(native_country='Canada'),
 Row(native_country='United-States'),
 Row(native_country='Do

In [30]:
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Philippines", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Germany", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Cambodia", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "France", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Greece", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Taiwan", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Ecuador", 'South_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Nicaragua", 'Central_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Hong", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Peru", 'South_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "India", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "China", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Italy", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Holand-Netherlands", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Cuba", 'Caribbean'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Iran", 'Middle_East'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Ireland", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Thailand", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Laos", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "El-Salvador", 'Central_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Mexico", 'North_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Guatemala", 'Central_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Honduras", 'Central_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Yugoslavia", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Puerto-Rico", 'Caribbean'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Jamaica", 'Caribbean'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Canada", 'North_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "United-States", 'North_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Dominican-Republic", 'Caribbean'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', r"Outlying-US\(Guam-USVI-etc\)", 'Caribbean'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Japan", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "England", 'United_kingdom'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Haiti", 'Caribbean'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Poland", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Portugal", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Columbia", 'South_America'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Scotland", 'United_kingdom'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Hungary", 'Europe'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Vietnam", 'Asia'))
dfs=dfs.withColumn('native_country', regexp_replace('native_country', "Trinadad&Tobago", 'Caribbean'))

In [31]:
# 'South' was not mapped to a region above; drop those rows
dfs=dfs.where(dfs.native_country!='South')

In [32]:
dfs.select('native_country').distinct().collect()

[Row(native_country='Middle_East'),
 Row(native_country='Europe'),
 Row(native_country='Central_America'),
 Row(native_country='Not-specified'),
 Row(native_country='Caribbean'),
 Row(native_country='South_America'),
 Row(native_country='North_America'),
 Row(native_country='United_kingdom'),
 Row(native_country='Asia')]
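
Because `regexp_replace` matches substrings, a long chain of replacements only stays correct as long as no pattern occurs inside another value. One alternative way to express the grouping is an exact-match lookup table. The plain-Python sketch below uses a subset of the mapping applied above; `COUNTRY_TO_REGION` and `to_region` are illustrative names, not part of the notebook.

```python
# Exact-match lookup from country to region; a subset of the mapping used above.
COUNTRY_TO_REGION = {
    "United-States": "North_America",
    "Canada": "North_America",
    "Mexico": "North_America",
    "Cuba": "Caribbean",
    "Jamaica": "Caribbean",
    "Germany": "Europe",
    "England": "United_kingdom",
    "Scotland": "United_kingdom",
    "India": "Asia",
    "Hong": "Asia",
    "Peru": "South_America",
    "Iran": "Middle_East",
}

def to_region(country):
    # Unmapped values such as 'Not-specified' or 'South' are returned unchanged.
    return COUNTRY_TO_REGION.get(country, country)

regions = [to_region(c) for c in ["Canada", "Hong", "Not-specified", "South"]]
print(regions)
```

Keeping the mapping in one dictionary also makes it easier to review which countries land in which region.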

In [33]:
dfs.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- fnlwgt_percentage: double (nullable = true)
 |-- diffrence_capital_gain_loos: integer (nullable = true)
 |-- profit: integer (nullable = true)
 |-- workaholic: integer (nullable = true)



In [34]:
# pandasDF = dfs.toPandas()
# pandasDF

In [35]:
# pandasDF.to_csv("clean_data_for_dashboard.csv",index=False)

# MODEL

In [34]:
#This step will label encode all the categorical columns and store them in different columns with the same name + '_idx', 
#so category will become category_idx 
cat_cols = ['workclass', 'education', 'marital_status','occupation','relationship','race','gender','native_country']

for c in cat_cols: 
    indexer = StringIndexer(inputCol=c, outputCol=c+'_idx') 
    dfs = indexer.fit(dfs).transform(dfs) 
    
final_df = dfs.drop(*cat_cols) 
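
By default `StringIndexer` orders labels by descending frequency, so the most common value in a column receives index 0.0. A plain-Python sketch of that idea, not the Spark implementation; `fit_string_index` is a hypothetical helper, and the alphabetical tie-break is an assumption made to keep the sketch deterministic.

```python
from collections import Counter

def fit_string_index(values):
    # Most frequent label gets index 0.0; ties are broken alphabetically here
    # (an assumption made for determinism in this sketch).
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

gender = ["Male", "Female", "Male", "Male", "Female"]
index = fit_string_index(gender)
encoded = [index[g] for g in gender]
print(index)
print(encoded)
```

The indices carry no natural order beyond frequency, which is worth keeping in mind for models that treat them as numeric magnitudes.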

In [35]:
final_df.show()

+---+------+-------------+------------+------------+--------------+------+-----------------+---------------------------+------+----------+-------------+-------------+------------------+--------------+----------------+--------+----------+------------------+
|age|fnlwgt|education_num|capital_gain|capital_loss|hours_per_week|salary|fnlwgt_percentage|diffrence_capital_gain_loos|profit|workaholic|workclass_idx|education_idx|marital_status_idx|occupation_idx|relationship_idx|race_idx|gender_idx|native_country_idx|
+---+------+-------------+------------+------------+--------------+------+-----------------+---------------------------+------+----------+-------------+-------------+------------------+--------------+----------------+--------+----------+------------------+
| 39| 77516|           13|        2174|           0|            40| 50000|              0.0|                       2174|     1|         0|          4.0|          2.0|               1.0|           5.0|             1.0|     0.0|   

In [36]:
# Cast fnlwgt_percentage and the indexed (double) columns to integers.
int_cols = ['fnlwgt_percentage'] + [c + '_idx' for c in cat_cols]
for c in int_cols:
    final_df = final_df.withColumn(c, col(c).cast('int'))


## Classification model

In [69]:
cols = final_df.columns # Extract the column names from the dataframe
cols.remove('salary') # remove salary because it is our classification target

#vector assembler will take all the columns and convert them into one column called features
assembler = VectorAssembler(inputCols=cols, outputCol='features')
final_df = assembler.transform(final_df)

In [76]:
final_df.show(5,truncate=False)

+---+------+-------------+------------+------------+--------------+------+-----------------+---------------------------+------+----------+-------------+-------------+------------------+--------------+----------------+--------+----------+------------------+--------------------------------------------------------------------------------------+
|age|fnlwgt|education_num|capital_gain|capital_loss|hours_per_week|salary|fnlwgt_percentage|diffrence_capital_gain_loos|profit|workaholic|workclass_idx|education_idx|marital_status_idx|occupation_idx|relationship_idx|race_idx|gender_idx|native_country_idx|features                                                                              |
+---+------+-------------+------------+------------+--------------+------+-----------------+---------------------------+------+----------+-------------+-------------+------------------+--------------+----------------+--------+----------+------------------+--------------------------------------------------------

#### Let's Split

70% of the rows go to the training set and 30% to the testing set.

In [77]:
# We will now create a new dataframe only consisting of the features column and the label column
df_data = final_df.select(col('features'), col('salary').alias('label'))

#simple data splitting
df_train, df_test = df_data.randomSplit([0.7, 0.3])
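
A random split of this kind assigns each row independently, so the resulting sizes are only approximately 70% and 30%, and they change between runs unless a seed is fixed (`randomSplit` accepts an optional `seed` argument). The following pure-Python sketch illustrates the idea; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to a split by drawing one uniform number per row."""
    total = sum(weights)
    # cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point rounding

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to 7000 and 3000, but not exactly
```

Calling the function twice with the same seed returns the same split, which makes results comparable across runs.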

### Model Building
Our data is now ready for model building.

In [78]:
# Decision Tree
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
model_dt = dt.fit(df_train)

IllegalArgumentException: requirement failed: Classifier inferred 50002 from label values in column DecisionTreeClassifier_e6022f70b9d0__labelCol, but this exceeded the max numClasses (100) allowed to be inferred from values.  To avoid this error for labels with > 100 classes, specify numClasses explicitly in the metadata; this can be done by applying StringIndexer to the label column.
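
The message points to the remedy itself: the label column must hold class indices 0, 1, ... rather than raw values, and it suggests applying `StringIndexer` to the label. The inferred count of 50002 suggests that the largest value in `salary` is 50001; this is an inference from the error text, not something shown in the notebook. The pure-Python sketch below illustrates frequency-ordered label indexing, which is the default ordering of `StringIndexer`. The helper `fit_label_index` and the sample labels are hypothetical.

```python
from collections import Counter

def fit_label_index(labels):
    """Map each distinct label to 0.0, 1.0, ... with the most frequent label first."""
    counts = Counter(labels)
    # sort by descending frequency; ties are broken by label value for determinism
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical raw labels: only two distinct values, but far from 0 and 1
raw = [50000, 50000, 50001, 50000, 50001]

mapping = fit_label_index(raw)
indexed = [mapping[v] for v in raw]
print(mapping)  # {50000: 0.0, 50001: 1.0}
print(indexed)  # [0.0, 0.0, 1.0, 0.0, 1.0]
```

With labels in this form a classifier sees two classes instead of tens of thousands.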

In [None]:
# Random Forest
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
model_rf = rf.fit(df_train)

In [None]:
# Logistic Regression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol="label", featuresCol="features")
model_lr = lr.fit(df_train)

In [None]:
# Gradient Boost
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
model_gbt = gbt.fit(df_train)

In [45]:
# Let's make predictions on the testing set
pred_dt = model_dt.transform(df_test)
pred_rf = model_rf.transform(df_test)
pred_lr = model_lr.transform(df_test)
pred_gbt = model_gbt.transform(df_test)
pred_svc = model_svc.transform(df_test)

### Evaluation

In [46]:
# Accuracy Metric
evaluator_A = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# F1 Metric
evaluator_F = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Weighted Precision
evaluator_P = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")

# Weighted Recall
evaluator_R = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")

# Predictions of our models on the testing set
models = [pred_dt, pred_rf, pred_lr, pred_gbt]

# Empty lists that will store the scores for each metric for each model.
accuracy = []
F1 = []
precision = []
recall = []

# loop to populate the empty lists with scores of models for each metric.
for model in models:
    accuracy.append(evaluator_A.evaluate(model))
    F1.append(evaluator_F.evaluate(model))
    precision.append(evaluator_P.evaluate(model))
    recall.append(evaluator_R.evaluate(model))

                                                                                

In [47]:
# We will convert all lists created above into a dataframe for easy viewing.
df_ev = pd.DataFrame(list(zip(accuracy, F1, precision, recall)), 
                     columns = ['Accuracy', 'F1-Score', 'Weighted Precision', 'Weighted Recall'],
                     index = ['Decision Tree', 'Random Forest', 'Logistic Regression', 'Gradient Boost'])

In [48]:
df_ev

| Model | Accuracy | F1-Score | Weighted Precision | Weighted Recall |
|---|---|---|---|---|
| Decision Tree | 0.512797 | 0.489749 | 0.51063 | 0.512797 |
| Random Forest | 0.509923 | 0.509031 | 0.511078 | 0.509923 |
| Logistic Regression | 0.509821 | 0.344302 | 0.259917 | 0.509821 |
| Gradient Boost | 0.512452 | 0.512355 | 0.513006 | 0.512452 |
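
Two patterns in these numbers follow from how the metrics are defined. Weighted recall always equals accuracy, because weighting each class's recall by its share of the rows adds up to the overall fraction of correct predictions. Also, a model that predicts a single class for every row gets a weighted precision equal to the square of its accuracy, which is roughly what the Logistic Regression row shows (0.509821² ≈ 0.2599), so that model may be predicting only one class. The pure-Python sketch below computes the four metrics on hypothetical labels; `weighted_metrics` is an illustrative helper, not the evaluator's implementation.

```python
from collections import Counter

def weighted_metrics(y_true, y_pred):
    """Return accuracy and support-weighted precision, recall and F1."""
    n = len(y_true)
    support = Counter(y_true)
    accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / n
    w_prec = w_rec = w_f1 = 0.0
    for c, n_c in support.items():
        tp = sum(t == c and p == c for t, p in zip(y_true, y_pred))
        predicted_c = sum(p == c for p in y_pred)
        prec = tp / predicted_c if predicted_c else 0.0
        rec = tp / n_c
        f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
        weight = n_c / n  # share of rows that truly belong to class c
        w_prec += weight * prec
        w_rec += weight * rec
        w_f1 += weight * f1
    return accuracy, w_prec, w_rec, w_f1

# Hypothetical case: balanced labels, model always predicts class 0
y_true = [0, 0, 1, 1]
y_pred = [0, 0, 0, 0]

acc, w_prec, w_rec, w_f1 = weighted_metrics(y_true, y_pred)
print(acc, w_prec, w_rec)  # 0.5 0.25 0.5
```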


### Hyperparameter Tuning

Let's pick the best model and use grid search with cross-validation to get a more reliable estimate of its performance.

In [49]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

# initialize our grid over maxDepth (DecisionTreeClassifier has no maxIter parameter)
grid = ParamGridBuilder().addGrid(dt.maxDepth, [5, 10, 15, 20]).build()

cv = CrossValidator(estimator=dt, estimatorParamMaps=grid, evaluator=evaluator_A, parallelism=2)

cvModel = cv.fit(df_train)

In [None]:
# average cross-validated accuracy for each of the 4 grid settings, in grid order
cvModel.avgMetrics

# larger values in the grid slowly improved our model performance, but the improvement was minimal

In [None]:
# let's get the accuracy of the tuned model on the testing set
evaluator_A.evaluate(cvModel.transform(df_test))
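
For readers new to the technique, the sketch below shows in plain Python what a cross-validated grid search does: for every grid value it trains on all folds but one, scores on the held-out fold, and averages the scores. `CrossValidator` uses 3 folds when `numFolds` is not set. Everything here is a simplified stand-in: the helpers, the toy threshold "model", and the interleaved fold assignment are assumptions made for illustration, and Spark assigns rows to folds randomly.

```python
def k_fold_indices(n, num_folds):
    """Split range(n) into num_folds interleaved validation folds."""
    return [list(range(i, n, num_folds)) for i in range(num_folds)]

def cross_validate(data, param_values, fit, score, num_folds=3):
    """Return the mean validation score for each parameter value, in grid order."""
    folds = k_fold_indices(len(data), num_folds)
    avg_metrics = []
    for param in param_values:
        fold_scores = []
        for held_out in folds:
            held = set(held_out)
            train = [row for i, row in enumerate(data) if i not in held]
            valid = [data[i] for i in held_out]
            model = fit(train, param)
            fold_scores.append(score(model, valid))
        avg_metrics.append(sum(fold_scores) / num_folds)
    return avg_metrics

# Toy stand-in for an estimator: classify x as 1 when x >= threshold.
# Nothing is learned in fit; the threshold itself is the hyperparameter.
def fit(train, threshold):
    return threshold

def score(threshold, valid):
    return sum((x >= threshold) == bool(y) for x, y in valid) / len(valid)

data = [(x, int(x >= 5)) for x in range(12)]  # hypothetical (feature, label) pairs
grid = [3, 5, 7]

avg_metrics = cross_validate(data, grid, fit, score, num_folds=3)
best = grid[avg_metrics.index(max(avg_metrics))]
print(best)  # 5
```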

## Regression Model 

In [37]:
cols = final_df.columns 
cols.remove('capital_gain') #remove capital_gain because it is our target in regression 

#vector assembler will take all the columns and convert them into one column called features
assembler = VectorAssembler(inputCols=cols, outputCol='features')
assembler_df = assembler.transform(final_df)

In [38]:
# We will now create a new dataframe only consisting of the features column and the label column 
df_reg = assembler_df.select(col('features'), col('capital_gain').alias('label'))

#simple data splitting
df_train, df_test = df_reg.randomSplit([0.7, 0.3])

### Build the models

In [39]:
#Linear Regression
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
r_lr = lr.fit(df_train)

In [40]:
#Random Forest Regressor
rf = RandomForestRegressor(featuresCol='features', labelCol='label')
r_rf = rf.fit(df_train)

In [41]:
#Gradient Boosted Tree Regressor
gbt = GBTRegressor(featuresCol="features", labelCol='label', maxIter=10)
r_gbt = gbt.fit(df_train)

In [42]:
#Decision Tree Regressor
dt = DecisionTreeRegressor(featuresCol="features", labelCol='label')
r_dt = dt.fit(df_train)

In [43]:
# Let's make predictions on the testing set
pred_dt = r_dt.transform(df_test)
pred_rf = r_rf.transform(df_test)
pred_lr = r_lr.transform(df_test)
pred_gbt = r_gbt.transform(df_test)

### Evaluate the models

In [44]:
evaluator = RegressionEvaluator()

In [45]:
# list the predictions of our regression models
models = [pred_rf, pred_gbt, pred_dt, pred_lr]

evaluator_R = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='r2')

evaluator_RMSE = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='rmse')

evaluator_MAE = RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='mae')

# Empty lists that will store the scores for each metric for each model.
R2 = []
RMSE = []
MAE = []

# loop to populate the empty lists with scores of models for each metric.
for model in models:
    R2.append(evaluator_R.evaluate(model))
    RMSE.append(evaluator_RMSE.evaluate(model))
    MAE.append(evaluator_MAE.evaluate(model))

In [46]:
# We will convert all lists created above into a dataframe for easy viewing.
df_ev_reg = pd.DataFrame(list(zip(R2, RMSE, MAE)), 
                     columns = ['R-squared', 'Root Mean Squared Error', 'Mean Absolute Error'],
                     index = ['Random Forest Regressor', 'Gradient Boosted Trees Regressor', 'Decision Tree Regressor', 'Linear Regression'])

In [47]:
df_ev_reg

| Model | R-squared | Root Mean Squared Error | Mean Absolute Error |
|---|---|---|---|
| Random Forest Regressor | 0.688717 | 6061.972452 | 2721.057924 |
| Gradient Boosted Trees Regressor | 0.77354 | 5170.496612 | 1148.874226 |
| Decision Tree Regressor | 0.78556 | 5031.400892 | 1378.514607 |
| Linear Regression | 1.0 | 0.393204 | 0.242423 |
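
As a reminder of what the three columns measure, here is a small pure-Python sketch of R-squared, RMSE and MAE. The helper `regression_metrics` and the sample values are hypothetical. The example uses a target that is mostly zero with one large value, which shows why RMSE can be several times larger than MAE, as it is for the tree models above: squaring gives a few large errors much more weight.

```python
import math

def regression_metrics(y_true, y_pred):
    """Return (R-squared, RMSE, MAE) for paired true and predicted values."""
    n = len(y_true)
    mean_y = sum(y_true) / n
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))  # unexplained variation
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)             # total variation
    r2 = 1.0 - ss_res / ss_tot
    rmse = math.sqrt(ss_res / n)
    mae = sum(abs(t - p) for t, p in zip(y_true, y_pred)) / n
    return r2, rmse, mae

# Hypothetical target values: mostly zero, one large value that is underpredicted
y_true = [0.0, 0.0, 0.0, 10000.0]
y_pred = [0.0, 0.0, 0.0, 6000.0]

r2, rmse, mae = regression_metrics(y_true, y_pred)
print(round(r2, 4), rmse, mae)  # 0.7867 2000.0 1000.0
```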


### Hyperparameter Tuning - Best Two Models

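We tune the Decision Tree and Gradient Boosted Trees instead of Linear Regression. The Linear Regression score (R-squared of 1.0 with near-zero error) is too good to be trusted: it points to overfitting or, more likely, to feature columns derived from capital_gain (such as diffrence_capital_gain_loos) that let the model reconstruct the target.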
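
A perfect R-squared with near-zero error is often a sign of target leakage. The sketch below shows how that could happen here under one assumption that this section does not confirm: that the engineered column `diffrence_capital_gain_loos` was computed as capital_gain minus capital_loss. If so, a linear model with weight 1 on that column and weight 1 on capital_loss reproduces the target exactly. The rows and helper functions are hypothetical.

```python
# Hypothetical rows: (capital_gain, capital_loss)
rows = [(2174.0, 0.0), (0.0, 1902.0), (15024.0, 0.0), (0.0, 0.0)]

def make_features(gain, loss):
    # ASSUMPTION: the engineered column is capital_gain - capital_loss
    return {"capital_loss": loss, "diffrence_capital_gain_loos": gain - loss}

def predict(features):
    # A linear model with weight 1.0 on both features and intercept 0.0
    return 1.0 * features["diffrence_capital_gain_loos"] + 1.0 * features["capital_loss"]

errors = [abs(gain - predict(make_features(gain, loss))) for gain, loss in rows]
print(max(errors))  # 0.0
```

If the assumption holds, removing such derived columns from the feature list before assembling would give a fairer comparison between the models.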

In [48]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel

#### Decision Tree

In [60]:
#initialize our grid 
grid2 = ParamGridBuilder().addGrid(dt.maxDepth, [5, 10, 15, 20 , 25]).build()

In [61]:
#CrossValidator
cv_reg2 = CrossValidator(estimator=dt, estimatorParamMaps=grid2, evaluator=evaluator_R, parallelism=2, numFolds=5)

In [62]:
# fit on training set
cv_reg2Model = cv_reg2.fit(df_train)

In [63]:
# R-squared on the testing set
evaluator_R.evaluate(cv_reg2Model.transform(df_test))

0.7855603713446851

In [64]:
# average cross-validated R-squared for each of the 5 maxDepth values, in grid order
cv_reg2Model.avgMetrics 

[0.7676971909331833,
 0.7515430847124803,
 0.6788231268298603,
 0.5848873792827355,
 0.5476986306856377]
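
The values in `avgMetrics` are in the same order as the grid, so they can be paired with the `maxDepth` values to see which setting won. The short sketch below does this with the numbers printed above.

```python
max_depth_grid = [5, 10, 15, 20, 25]
avg_metrics = [
    0.7676971909331833,
    0.7515430847124803,
    0.6788231268298603,
    0.5848873792827355,
    0.5476986306856377,
]

for depth, r2 in zip(max_depth_grid, avg_metrics):
    print(f"maxDepth={depth:>2}  mean CV R-squared={r2:.4f}")

# R-squared is a larger-is-better metric, so the best setting is the maximum
best_depth, best_r2 = max(zip(max_depth_grid, avg_metrics), key=lambda pair: pair[1])
print(best_depth)  # 5
```

The cross-validated score falls steadily as the tree gets deeper, which suggests that deeper trees overfit the training folds. The best value, 5, is also the default `maxDepth` of `DecisionTreeRegressor`, which would explain why the tuned model's test score matches the untuned Decision Tree score reported earlier.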

#### Gradient Boosted Trees 

In [51]:
#initialize our grid 
grid = ParamGridBuilder().addGrid(gbt.maxIter, [40, 30, 15, 20 , 25]).build()

In [52]:
#CrossValidator
cv_reg = CrossValidator(estimator=gbt, estimatorParamMaps=grid, evaluator=evaluator_R, parallelism=2, numFolds=5)

In [53]:
# fit on training set
cv_regModel = cv_reg.fit(df_train)

In [54]:
# R-squared on the testing set
evaluator_R.evaluate(cv_regModel.transform(df_test))

0.7738580245984696

In [55]:
# average cross-validated R-squared for each of the 5 maxIter values, in grid order
cv_regModel.avgMetrics 

[0.7681499392478811,
 0.7687889959484207,
 0.7687584917374692,
 0.7689195453152784,
 0.7688423981078625]
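
Because the grid above is not sorted (40, 30, 15, 20, 25), the metrics have to be read against the grid order rather than as an increasing sequence. The sketch below pairs the printed numbers with their `maxIter` values and also measures how far apart the best and worst settings are.

```python
max_iter_grid = [40, 30, 15, 20, 25]
avg_metrics = [
    0.7681499392478811,
    0.7687889959484207,
    0.7687584917374692,
    0.7689195453152784,
    0.7688423981078625,
]

best_iter, best_r2 = max(zip(max_iter_grid, avg_metrics), key=lambda pair: pair[1])
spread = max(avg_metrics) - min(avg_metrics)

print(best_iter)         # 20
print(round(spread, 4))  # 0.0008
```

The difference between the best and worst setting is below 0.001, so for this data the number of boosting iterations in this range has almost no effect on the cross-validated score.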

### Pipeline - Best Two Models

In [56]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA

#### Decision Tree

In [65]:
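# create a pipeline containing PCA and Decision Tree
# note: dt_1 keeps the default featuresCol='features', so it trains on the original
# features; pass featuresCol='pcaFeature' to train on the PCA output instead
pca = PCA(k=5, inputCol='features', outputCol='pcaFeature')
dt_1 = DecisionTreeRegressor(maxDepth=10)
pipeline = Pipeline(stages=[pca, dt_1])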

In [66]:
# fit on training set
pipeline_model = pipeline.fit(df_train)

In [67]:
# R-squared on the testing set
evaluator_R.evaluate(pipeline_model.transform(df_test))

0.7699695122500189

#### Gradient Boosted Trees 

In [57]:
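# create a pipeline containing PCA and Gradient Boosted Trees
# note: GBTR_1 keeps the default featuresCol='features', so it trains on the original
# features; pass featuresCol='pcaFeature' to train on the PCA output instead
pca1 = PCA(k=5, inputCol='features', outputCol='pcaFeature')
GBTR_1 = GBTRegressor(maxIter=20)
pipeline2 = Pipeline(stages=[pca1, GBTR_1])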

In [58]:
# fit on training set
pipeline_model2 = pipeline2.fit(df_train)

In [59]:
# R-squared on the testing set
evaluator_R.evaluate(pipeline_model2.transform(df_test))

0.7738580245984696
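
This score is identical, digit for digit, to the test score of the cross-validated Gradient Boosted Trees model. That is what one would expect if the regressor in the pipeline still reads the `features` column (its default) rather than `pcaFeature`, so the PCA stage has no effect on the predictions. The toy pipeline below illustrates the mechanism in plain Python: each stage reads its input by column name, and an earlier stage only matters if a later stage reads the column it wrote. The classes are hypothetical stand-ins, not Spark classes.

```python
class AddScaled:
    """Toy transformer: writes a rescaled copy of input_col to output_col."""
    def __init__(self, input_col, output_col, factor):
        self.input_col = input_col
        self.output_col = output_col
        self.factor = factor

    def transform(self, rows):
        return [{**r, self.output_col: r[self.input_col] * self.factor} for r in rows]

class EchoPredictor:
    """Toy model: the prediction is whatever value sits in features_col."""
    def __init__(self, features_col="features"):
        self.features_col = features_col

    def transform(self, rows):
        return [{**r, "prediction": r[self.features_col]} for r in rows]

def run_pipeline(stages, rows):
    """Apply each stage in order, feeding its output to the next stage."""
    for stage in stages:
        rows = stage.transform(rows)
    return rows

rows = [{"features": 1.0}, {"features": 2.0}]
first_stage = AddScaled("features", "pcaFeature", factor=10.0)

ignores = run_pipeline([first_stage, EchoPredictor()], rows)
uses = run_pipeline([first_stage, EchoPredictor(features_col="pcaFeature")], rows)

print([r["prediction"] for r in ignores])  # [1.0, 2.0]
print([r["prediction"] for r in uses])     # [10.0, 20.0]
```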

# THANK YOU :)