#### importing libraries

In [1]:
import pyspark

In [4]:
from pyspark import SparkContext
sc = SparkContext()

In [5]:
from pyspark.sql import Row, SQLContext
sqlContext = SQLContext(sc)

## Machine Learning with pyspark

## 1. Basic Operation

In [210]:
from pyspark import SparkFiles

In [211]:
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"

In [212]:
sc.addFile(url)

In [213]:
df1 = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

In [214]:
df1.printSchema

<bound method DataFrame.printSchema of DataFrame[x: int, age: int, workclass: string, fnlwgt: int, education: string, educational-num: int, marital-status: string, occupation: string, relationship: string, race: string, gender: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, income: string]>

In [215]:
df1.show(2, truncate = False)

+---+---+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education|educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th     |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad  |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
+---+---+---------+------+---------+---------------+------------------+-----------------+------------+---

In [216]:
df1 = df1.drop('x')

In [217]:
df1.show(5, truncate = False)

+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv  |Husband     |White|Mal

In [218]:
df1.select('age','fnlwgt').show(5)

+---+------+
|age|fnlwgt|
+---+------+
| 25|226802|
| 38| 89814|
| 28|336951|
| 44|160323|
| 18|103497|
+---+------+
only showing top 5 rows



## - Count by group

In [219]:
df1.groupBy("education").count().sort("count",ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



## - Describe data

In [220]:
df1.describe().show()

+-------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|             48842|         48842| 48842|
|   mean| 38.64358543876172|       n

In [221]:
df1.describe('capital-gain').show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|                 0|
|    max|             99999|
+-------+------------------+



## - Crosstab computation
#### descoptive statistics b/w 2 pairwise columns.

In [222]:
df1.select('income').show(5)

+------+
|income|
+------+
| <=50K|
| <=50K|
|  >50K|
|  >50K|
| <=50K|
+------+
only showing top 5 rows



In [223]:
df1.crosstab('age','income').sort("age_income").show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|        17|  595|   0|
|        18|  862|   0|
|        19| 1050|   3|
|        20| 1112|   1|
|        21| 1090|   6|
|        22| 1161|  17|
|        23| 1307|  22|
|        24| 1162|  44|
|        25| 1119|  76|
|        26| 1068|  85|
|        27| 1117| 115|
|        28| 1101| 179|
|        29| 1025| 198|
|        30| 1031| 247|
|        31| 1050| 275|
|        32|  957| 296|
|        33| 1045| 290|
|        34|  949| 354|
|        35|  997| 340|
|        36|  948| 400|
+----------+-----+----+
only showing top 20 rows



In [224]:
# df1 = df1.drop('educational-num')

In [225]:
df1.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'educational-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']

In [226]:
df1.filter(df1.age > 40).count()

20211

## - Descriptive statistics by group

In [227]:
df1.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



# 2. Data Preprocessing

In [228]:
from pyspark.sql.functions import *

In [229]:
# add age square
age_square = df1.select(col("age")**2)

In [230]:
df1 = df1.withColumn("age_square", col("age")**2)

In [231]:
df1.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)



In [232]:
df1.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'educational-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income',
 'age_square']

In [233]:
# rearranging columns
COLUMNS = ['age',
 'age_square',
 'workclass',
 'fnlwgt',
 'education',
 'educational-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']
df1 = df1.select(COLUMNS)

In [234]:
df1.first()

Row(age=25, age_square=625.0, workclass=u'Private', fnlwgt=226802, education=u'11th', educational-num=7, marital-status=u'Never-married', occupation=u'Machine-op-inspct', relationship=u'Own-child', race=u'Black', gender=u'Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country=u'United-States', income=u'<=50K')

In [235]:
df1.groupby('native-country').count().show()

+------------------+-----+
|    native-country|count|
+------------------+-----+
|       Philippines|  295|
|           Germany|  206|
|          Cambodia|   28|
|            France|   38|
|            Greece|   49|
|            Taiwan|   65|
|           Ecuador|   45|
|         Nicaragua|   49|
|              Hong|   30|
|              Peru|   46|
|             India|  151|
|             China|  122|
|             Italy|  105|
|Holand-Netherlands|    1|
|              Cuba|  138|
|             South|  115|
|              Iran|   59|
|           Ireland|   37|
|          Thailand|   30|
|              Laos|   23|
+------------------+-----+
only showing top 20 rows



In [236]:
# Excluding Holand-Netherlands : It has only 1 observation. It can lead to an error during cross-validation
df1 = df1.filter(df1["native-country"] != 'Holand-Netherlands')

## 3. Building a data procesiing pipeline

In [237]:
# Similar to scikit-learn, Pyspark has a pipeline API. A pipeline is very convenient to maintain the structure of the data. You push the data into the pipeline.
# Inside the pipeline, various operations are done, the output is used to feed the algorithm.

In [238]:
# The steps to transform the data are very similar to scikit-learn. you need to:
# Index the string to numeric
# Create the one hot encoder
# Transform the data

In [239]:
# Two APIs do the job
# StringIndexer, OneHotEncoder

#### (a). First of all, you select the string column to index. The inputCol is the name of the column in the dataset. outputCol is the new name given to the transformed column.

In [240]:
from pyspark.ml.feature import StringIndexer

In [125]:
indexer = StringIndexer(inputCol="workclass",outputCol="workclass_encoded")

#### (b). Fit the data and transform it.

In [126]:
model = indexer.fit(df1)

In [127]:
indexed = model.transform(df1)

#### (c). Create the news columns based on the group. For instance, if there are 10 groups in the feature, the new matrix will have 10 columns, one for each group.

In [241]:
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel

In [242]:
from pyspark.ml.feature import VectorAssembler

In [129]:
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")

In [130]:
encoded = encoder.transform(indexed)
encoded.show(5)

+---+----------+---------+------+------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
|age|age_square|workclass|fnlwgt|   education|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|workclass_encoded|workclass_vec|
+---+----------+---------+------+------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
| 25|     625.0|  Private|226802|        11th|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|              0.0|(9,[0],[1.0])|
| 38|    1444.0|  Private| 89814|     HS-grad|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|           0|           0|            50| United-States| <=50K|        

#### Build the pipeline

- Encode the categorical data
- Index the label feature
- Add continuous variable
- Assemble the steps.

In [243]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator

In [244]:
CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']

In [245]:
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [246]:
# Convert label into label indices using the StringIndexer
income_stringIdx =  StringIndexer(inputCol="income", outputCol="newincome")
stages += [income_stringIdx]

In [247]:
# Add continuous variable
CONTI_FEATURES = ['age','age_square','fnlwgt','educational-num','capital-gain','capital-loss','hours-per-week']
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

In [248]:
# Assemble the steps
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [249]:
# Push data to pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df1)
model = pipelineModel.transform(df1)

In [250]:
model.take(1)

[Row(age=25, age_square=625.0, workclass=u'Private', fnlwgt=226802, education=u'11th', educational-num=7, marital-status=u'Never-married', occupation=u'Machine-op-inspct', relationship=u'Own-child', race=u'Black', gender=u'Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country=u'United-States', income=u'<=50K', workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(40, {0: 1.0}), newincome=0.0, features=SparseVector(100, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94: 625.0,

# 4. Building the classifier : Logistic

#### To make the computation faster, you convert model to a DataFrame. You need to select newlabel and features from model using map.

In [253]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newincome"], DenseVector(x["features"])))

In [254]:
df_train = sqlContext.createDataFrame(input_data, ["income", "features"])

In [255]:
df_train.show(2)

+------+--------------------+
|income|            features|
+------+--------------------+
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
+------+--------------------+
only showing top 2 rows



### Create a train/test split

In [256]:
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [258]:
train_data.groupby('income').agg({'income': 'count'}).show()

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|        29675|
|   1.0|         9300|
+------+-------------+



In [259]:
test_data.groupby('income').agg({'income': 'count'}).show()

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|         7479|
|   1.0|         2387|
+------+-------------+



### Build the Logistic Regresoor

In [260]:
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="income",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [261]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.020024313100485282,-0.17067875759794013,-0.012915023810445462,-0.17222267772956035,-0.09976851617340401,0.154794366772181,0.2361143443774587,-0.2687921350996918,-0.1973967988518051,-0.04119249600202847,0.26574649956752183,0.4347710207881013,0.007956983316968,-0.2989167581493538,0.002811702944800906,-0.3324731404712408,-0.4495839219326918,0.5510390170709929,-0.37154429813121903,-0.19305729674359548,0.6140025791271341,-0.34474797023407316,-0.39427342320353964,0.3392219150893706,-0.311304120193761,-0.19059441919350795,-0.19381640215355017,-0.15572361740267784,-0.1007681814342528,0.25394694305619214,-0.07269388888484095,0.33519114668122957,-0.08246976081806602,0.060501943006044265,-0.2939267434110623,-0.21628724748705971,-0.17317790387715357,-0.14526894316186084,-0.29078873907877717,-0.36505897781845936,0.16344754786814078,0.1351310838722112,-0.27556252650716284,0.2664732611362402,-0.16398936723206994,-0.2843802285746094,-0.2294198714460625,0.49027248116902156,0.014093599

## 5. Train and Evaluate the model

In [262]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [263]:
predictions.printSchema()

root
 |-- income: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [264]:
selected = predictions.select("income", "prediction", "probability")
selected.show(20)

+------+----------+--------------------+
|income|prediction|         probability|
+------+----------+--------------------+
|   0.0|       0.0|[0.93986414748719...|
|   0.0|       0.0|[0.94546337719732...|
|   0.0|       0.0|[0.93964307780162...|
|   0.0|       0.0|[0.94024282426827...|
|   0.0|       0.0|[0.68392879879471...|
|   0.0|       0.0|[0.69767074646240...|
|   0.0|       0.0|[0.72629698228672...|
|   0.0|       0.0|[0.70474072820344...|
|   0.0|       0.0|[0.94700376237148...|
|   0.0|       0.0|[0.86281666810406...|
|   0.0|       0.0|[0.85311974584505...|
|   0.0|       0.0|[0.82182382760238...|
|   0.0|       0.0|[0.52861371256606...|
|   0.0|       0.0|[0.65652686349188...|
|   0.0|       0.0|[0.68085794866931...|
|   0.0|       0.0|[0.56875936042104...|
|   0.0|       0.0|[0.88166191140485...|
|   0.0|       0.0|[0.80615258609679...|
|   0.0|       0.0|[0.84036513509636...|
|   0.0|       0.0|[0.51313125042704...|
+------+----------+--------------------+
only showing top

### Evaluate the model

In [265]:
# creating a DataFrame with the label(income) and the `prediction

In [266]:
cm = predictions.select("income", "prediction")

In [267]:
# checking the number of class in the label and the prediction

In [269]:
cm.groupby('income').agg({'income': 'count'}).show()

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|         7479|
|   1.0|         2387|
+------+-------------+



In [270]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             8830|
|       1.0|             1036|
+----------+-----------------+



For instance, in the test set, there is 2387 household with an income above 50k and 7479 below. The classifier, however, predicted 1036 households with income above 50k.

We can compute the accuracy by computing the count when the label(income) are correctly classified over the total number of rows.

In [282]:
from __future__ import division

In [284]:
cm.filter(cm.income == cm.prediction).count() / cm.count()

0.8257652544090817

In [285]:
# wrapping everything together and writing a function to compute the accuracy.

In [286]:
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("income", "prediction")
    acc = cm.filter(cm.income == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)

Model accuracy: 82.577%


## ROC Metrics

In [293]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.setLabelCol("income").evaluate(predictions))
print(evaluator.getMetricName())

0.896776019636
areaUnderROC


In [294]:
print(evaluator.evaluate(predictions))

0.896776019636


# 6. Tune the hyperparameter

In [288]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

In [295]:
from time import *
t0 = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
print(time()-t0)

1155.75


In [296]:
accuracy_m(model = cvModel)

Model accuracy: 84.715%


In [297]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent=u'LogisticRegression_2c978ff3de8c', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent=u'LogisticRegression_2c978ff3de8c', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent=u'LogisticRegression_2c978ff3de8c', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent=u'LogisticRegression_2c978ff3de8c', name='labelCol', doc='label column name'): 'income',
 Param(parent=u'LogisticRegression_2c978ff3de8c', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent=u'LogisticRegression_2c978ff3de8c', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent=u'LogisticRegression_2c978ff3de8c', name='standardization', doc='whether to standardiz