1. Start Hadoop allstart.sh
2. start spark--> pysparknb

In [1]:
import numpy as np

from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml import Pipeline

#ML Logisitic Regression
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import time


In [2]:
df=spark.read.csv(path="adult.data", sep=",", header=True, inferSchema=True)

In [3]:
df.show()

+---+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|  fnlwgt|    education|education_num|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|native-country|income|
+---+-----------------+--------+-------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516.0|    Bachelors|         13.0|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|
| 50| Self-emp-not-inc| 83311.0|    Bachelors|         13.0|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|         0.0|         0.0|   

## Data Exploration

In [8]:
df.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native-country',
 'income']

In [9]:
len(df.columns)

15

In [10]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [11]:
df.head(2)

[Row(age=39, workclass=' State-gov', fnlwgt=77516.0, education=' Bachelors', education_num=13.0, marital_status=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174.0, capital_loss=0.0, hours_per_week=40.0, native-country=' United-States', income=' <=50K'),
 Row(age=50, workclass=' Self-emp-not-inc', fnlwgt=83311.0, education=' Bachelors', education_num=13.0, marital_status=' Married-civ-spouse', occupation=' Exec-managerial', relationship=' Husband', race=' White', sex=' Male', capital_gain=0.0, capital_loss=0.0, hours_per_week=13.0, native-country=' United-States', income=' <=50K')]

In [12]:
df.select("income").show(10)

+------+
|income|
+------+
| <=50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
| <=50K|
|  >50K|
|  >50K|
|  >50K|
+------+
only showing top 10 rows



In [13]:
df.count()

32561

In [14]:
df.groupby('income').count().show()

+------+-----+
|income|count|
+------+-----+
|  >50K| 7841|
| <=50K|24720|
+------+-----+



In [15]:
df.describe().show()

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|summary|               age|   workclass|            fnlwgt|    education|    education_num|marital_status|       occupation|relationship|               race|    sex|      capital_gain|    capital_loss|    hours_per_week|native-country|income|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|            32561|         32561|            32561|       32561|              32561|  32561|             32561|           32561|             32561|         32561| 32561|
|   mean| 38.58164675532

In [16]:
df.select("age").show()

+---+
|age|
+---+
| 39|
| 50|
| 38|
| 53|
| 28|
| 37|
| 49|
| 52|
| 31|
| 42|
| 37|
| 30|
| 23|
| 32|
| 40|
| 34|
| 25|
| 32|
| 38|
| 43|
+---+
only showing top 20 rows



In [17]:
df.select(df['age']).show(3)

+---+
|age|
+---+
| 39|
| 50|
| 38|
+---+
only showing top 3 rows



In [18]:
df.select(["age","income"]).show()

+---+------+
|age|income|
+---+------+
| 39| <=50K|
| 50| <=50K|
| 38| <=50K|
| 53| <=50K|
| 28| <=50K|
| 37| <=50K|
| 49| <=50K|
| 52|  >50K|
| 31|  >50K|
| 42|  >50K|
| 37|  >50K|
| 30|  >50K|
| 23| <=50K|
| 32| <=50K|
| 40|  >50K|
| 34| <=50K|
| 25| <=50K|
| 32| <=50K|
| 38| <=50K|
| 43|  >50K|
+---+------+
only showing top 20 rows



In [19]:
df.corr('age','capital_gain')

0.07767449816599412

In [20]:

vc=VectorAssembler(
    inputCols=['age',"capital_gain",'capital_loss','hours_per_week'],
    outputCol='numeric')
dx=vc.transform(df).select('numeric')

In [21]:
dx.show()

+--------------------+
|             numeric|
+--------------------+
|[39.0,2174.0,0.0,...|
| [50.0,0.0,0.0,13.0]|
| [38.0,0.0,0.0,40.0]|
| [53.0,0.0,0.0,40.0]|
| [28.0,0.0,0.0,40.0]|
| [37.0,0.0,0.0,40.0]|
| [49.0,0.0,0.0,16.0]|
| [52.0,0.0,0.0,45.0]|
|[31.0,14084.0,0.0...|
|[42.0,5178.0,0.0,...|
| [37.0,0.0,0.0,80.0]|
| [30.0,0.0,0.0,40.0]|
| [23.0,0.0,0.0,30.0]|
| [32.0,0.0,0.0,50.0]|
| [40.0,0.0,0.0,40.0]|
| [34.0,0.0,0.0,45.0]|
| [25.0,0.0,0.0,35.0]|
| [32.0,0.0,0.0,40.0]|
| [38.0,0.0,0.0,50.0]|
| [43.0,0.0,0.0,45.0]|
+--------------------+
only showing top 20 rows



In [22]:
abc=df.toPandas()

In [23]:
abc.head(2)

Unnamed: 0,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native-country,income
0,39,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
1,50,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K


In [24]:
abc.rename(columns={"income": "new-income"}, inplace=True)

In [25]:
df.select('*').where(df.income.isNull()).count()

0

In [26]:
from pyspark.sql.functions import isnan, when, count, col
def get_null_value_count(data):
    data.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in data.columns]).show()

In [27]:
get_null_value_count(df)

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education|education_num|marital_status|occupation|relationship|race|sex|capital_gain|capital_loss|hours_per_week|native-country|income|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|        0|     0|        0|            0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+



In [28]:
df.dtypes

[('age', 'int'),
 ('workclass', 'string'),
 ('fnlwgt', 'double'),
 ('education', 'string'),
 ('education_num', 'double'),
 ('marital_status', 'string'),
 ('occupation', 'string'),
 ('relationship', 'string'),
 ('race', 'string'),
 ('sex', 'string'),
 ('capital_gain', 'double'),
 ('capital_loss', 'double'),
 ('hours_per_week', 'double'),
 ('native-country', 'string'),
 ('income', 'string')]

In [29]:
 cat_cols=[c[0] for c in df.dtypes if c[1]=="string"]

In [30]:
cat_cols

['workclass',
 'education',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'native-country',
 'income']

In [32]:
cat_cols=cat_cols[0:8]

In [59]:
num_cols=[c[0] for c in df.dtypes if c[1]!="string"]
num_cols

['age',
 'fnlwgt',
 'education_num',
 'capital_gain',
 'capital_loss',
 'hours_per_week']

In [60]:
ohe_cols=[c+"_ohe" for c in cat_cols]
ohe_cols

['workclass_ohe',
 'education_ohe',
 'marital_status_ohe',
 'occupation_ohe',
 'relationship_ohe',
 'race_ohe',
 'sex_ohe',
 'native-country_ohe']

In [61]:
len(cat_cols)

8

In [62]:
cat_cols_si= [c+"_index" for c in cat_cols]
print (cat_cols_si)

['workclass_index', 'education_index', 'marital_status_index', 'occupation_index', 'relationship_index', 'race_index', 'sex_index', 'native-country_index']


In [34]:
cat_cols_ohe= [c+"_ohe" for c in cat_cols]
print (cat_cols_ohe)

['workclass_ohe', 'education_ohe', 'marital_status_ohe', 'occupation_ohe', 'relationship_ohe', 'race_ohe', 'sex_ohe', 'native-country_ohe']


In [35]:
df.groupby('workclass').count().show()

+-----------------+-----+
|        workclass|count|
+-----------------+-----+
|        State-gov| 1298|
|      Federal-gov|  960|
| Self-emp-not-inc| 2541|
|        Local-gov| 2093|
|          Private|22696|
|                ?| 1836|
|     Self-emp-inc| 1116|
|      Without-pay|   14|
|     Never-worked|    7|
+-----------------+-----+



In [36]:
df.groupby('workclass').count().orderBy("count", ascending=False).first()[0]

' Private'

In [37]:
[[i, df.groupby(i).count().orderBy("count", ascending=False).first()[0]] for i in df.columns]

[['age', 36],
 ['workclass', ' Private'],
 ['fnlwgt', 203488.0],
 ['education', ' HS-grad'],
 ['education_num', 9.0],
 ['marital_status', ' Married-civ-spouse'],
 ['occupation', ' Prof-specialty'],
 ['relationship', ' Husband'],
 ['race', ' White'],
 ['sex', ' Male'],
 ['capital_gain', 0.0],
 ['capital_loss', 0.0],
 ['hours_per_week', 40.0],
 ['native-country', ' United-States'],
 ['income', ' <=50K']]

In [38]:
df=df.fillna('Private', subset=['workclass'])
df=df.fillna('Prof-specialty', subset=['occupation'])
df=df.fillna('United-States', subset=['native-country'])


## String To Label

In [39]:
lable_To_integer=StringIndexer(
                inputCol="income",
                outputCol="label"
                )

model=lable_To_integer.fit(df.select("income"))
model.transform(df.select('income')).take(10)

[Row(income=' <=50K', label=0.0),
 Row(income=' <=50K', label=0.0),
 Row(income=' <=50K', label=0.0),
 Row(income=' <=50K', label=0.0),
 Row(income=' <=50K', label=0.0),
 Row(income=' <=50K', label=0.0),
 Row(income=' <=50K', label=0.0),
 Row(income=' >50K', label=1.0),
 Row(income=' >50K', label=1.0),
 Row(income=' >50K', label=1.0)]

In [45]:
## Pipeline
pipe = Pipeline(
                stages=[
                    StringIndexer(
                        inputCol="income",
                        outputCol="label"
                    ),
                    StringIndexer(
                        inputCols=cat_cols,
                        outputCols=cat_cols_si
                    ),
                    OneHotEncoder(
                        inputCols=cat_cols_si,
                        outputCols=ohe_cols
                    ),
                    VectorAssembler(
                        inputCols=cat_cols_si+num_cols,
                        outputCol="ass_features"
                    ),
                    StandardScaler(
                        inputCol="ass_features",
                        outputCol="features"
                    )
                ]
    )

In [58]:
cat_cols

['workclass',
 'education',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'native-country']

In [46]:
model=pipe.fit(df)

df_trans=model.transform(df)

In [47]:
print (df_trans.columns)

['age', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss', 'hours_per_week', 'native-country', 'income', 'label', 'native-country_index', 'workclass_index', 'relationship_index', 'occupation_index', 'marital_status_index', 'race_index', 'education_index', 'sex_index', 'occupation_ohe', 'education_ohe', 'race_ohe', 'native-country_ohe', 'marital_status_ohe', 'workclass_ohe', 'sex_ohe', 'relationship_ohe', 'ass_features', 'features']


In [48]:
len(df.columns)

15

In [49]:
len(df_trans.columns)

34

In [63]:
pipe = Pipeline(
                stages=[
                    StringIndexer(
                        inputCol="income",
                        handleInvalid='keep',
                        outputCol="label"
                    ),
                    StringIndexer(
                        inputCols=cat_cols,
                        handleInvalid='keep',
                        outputCols=cat_cols_si
                    ),
                    OneHotEncoder(
                        inputCols=cat_cols_si,
                        outputCols=ohe_cols
                    ),
                    VectorAssembler(
                        inputCols=cat_cols_si+num_cols,
                        outputCol="ass_features"
                    ),
                    StandardScaler(
                        inputCol="ass_features",
                        outputCol="features"
                    ),
                    LogisticRegression(
                        featuresCol='features',
                        labelCol='label',
                        maxIter=30
                    )
                ]
    )

In [70]:
train, test=df.randomSplit([0.8,0.2], seed=12345)

In [71]:
train.count()

25954

In [72]:
test.count()

6607

In [74]:
# Training of model
start=time.time()
lrModel=pipe.fit(train)
end=time.time()
(end-start)/60

CPU times: user 5 µs, sys: 1 µs, total: 6 µs
Wall time: 10.7 µs


0.19802921613057453

## Predictions

In [75]:
predictions=lrModel.transform(test)

In [76]:
predictions.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native-country',
 'income',
 'label',
 'native-country_index',
 'workclass_index',
 'relationship_index',
 'occupation_index',
 'marital_status_index',
 'race_index',
 'education_index',
 'sex_index',
 'occupation_ohe',
 'education_ohe',
 'race_ohe',
 'native-country_ohe',
 'marital_status_ohe',
 'workclass_ohe',
 'sex_ohe',
 'relationship_ohe',
 'ass_features',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [81]:
predictions.select("label","prediction","rawprediction","probability").show(5)

+-----+----------+--------------------+--------------------+
|label|prediction|       rawprediction|         probability|
+-----+----------+--------------------+--------------------+
|  0.0|       0.0|[6.86441231926255...|[0.99129260327131...|
|  0.0|       0.0|[7.05038921689764...|[0.99073973720765...|
|  0.0|       0.0|[7.28663173769653...|[0.99573421635914...|
|  0.0|       0.0|[7.02569159398365...|[0.99596945634491...|
|  0.0|       0.0|[7.38042075174856...|[0.98153001849759...|
+-----+----------+--------------------+--------------------+
only showing top 5 rows



# Evaluation Of Model

In [87]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

evaluator.evaluate(predictions)



0.8442676311726919

In [88]:
evaluator.getMetricName()

'areaUnderROC'

In [89]:
pipe.getStages()

[StringIndexer_d2e693810652,
 StringIndexer_4fb65393d5fd,
 OneHotEncoder_8b5af07ba805,
 VectorAssembler_2e8c83a16fd4,
 StandardScaler_5cfc6a109b1f,
 LogisticRegression_e5aa685b8142]

# Model Tuning

In [91]:
lr=pipe.getStages()[5]

In [95]:
grid=(ParamGridBuilder().addGrid(lr.regParam, [0.01, 1, 2.0, 5.0]) \
      .addGrid(lr.elasticNetParam, [0.0,0.5,1.0]) \
      .addGrid(lr.maxIter,[1,20]).build()
     
     )

In [96]:
evaluator=BinaryClassificationEvaluator()

cv=CrossValidator(estimator =pipe, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3
              )

In [98]:
cvModel=cv.fit(train)

In [99]:
predictions=cvModel.transform(test)

In [100]:
predictions.select("label","prediction","rawprediction","probability").show(5)

+-----+----------+--------------------+--------------------+
|label|prediction|       rawprediction|         probability|
+-----+----------+--------------------+--------------------+
|  0.0|       0.0|[2.92229751048516...|[0.93337648239949...|
|  0.0|       0.0|[3.01600438138145...|[0.93928192876697...|
|  0.0|       0.0|[3.20048787155207...|[0.95792950376958...|
|  0.0|       0.0|[3.11391707012402...|[0.95538919803026...|
|  0.0|       0.0|[3.15609100466437...|[0.93481752353647...|
+-----+----------+--------------------+--------------------+
only showing top 5 rows



In [101]:

evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

evaluator.evaluate(predictions)



0.8761939226880918