# Classification using Spark MLlib

This notebook builds a machine learning pipeline with Spark MLlib and uses it to classify articles in the Online News Popularity dataset by whether their share count exceeds a threshold.

Group Project for Big Data Programming, Fall 2017

### Project Contributors:
Caleb Hulburt   
Mohammad Azim   
Yao Jin   
Xian Lai   

========================================================



In [2]:
ls

how to set up AWS cluster for spark.md*  Tweets_analysis_SparkStreaming.ipynb*
README.md*                               TweetsListener.py*
Spark_machine_learning_pipeline.ipynb*   TweetsStreamingPlot.py*
Tweets_analysis_spark.ipynb*


In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
import pyspark.ml.feature as f
import pyspark.ml.classification as C
from pyspark.ml.evaluation import BinaryClassificationEvaluator as BCE
import pyspark.ml.tuning as t

## Configure Spark
Set up the Spark configuration and create the SparkContext.

In [4]:
conf = SparkConf().setMaster("local[*]").setAppName("machine learning pipeline")
sc   = SparkContext(conf=conf)

Set up a SparkSession; its DataFrameReader is used later to parse and read the CSV file.

In [5]:
ss = SparkSession.builder\
    .master("local[*]")\
    .appName("machine learning pipeline example")\
    .config(conf=conf)\
    .getOrCreate()

## Preprocessing

We read the CSV dataset through the SparkSession, so the resulting DataFrame is partitioned and can be distributed across the cluster.

In [6]:
df = ss.read.csv(\
    path="../data/OnlineNewsPopularity.csv",\
    inferSchema=True,\
    header=True,\
    ignoreLeadingWhiteSpace=True)

Drop some non-predictive features and keep a random sample of roughly one tenth of the rows, drawn without replacement.

In [7]:
df = df.drop('url', 'timedelta', 'is_weekend')
df = df.sample(False, 0.1)
df1 = df.alias("df1")

Some features in the dataset are binary and related (the channel and weekday indicators), so we merge each group into a single feature and use integers to encode its values. This reduces the dimensionality.

In [8]:
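def mergeFeatures(dataframe, old_feats, new_feat):
    """Merge one-hot binary columns into one integer-encoded column.

    The i-th column in old_feats is encoded as float(i). Rows where none
    of the columns is set keep the initial value 0.0, the same code as
    the first column.
    """
    count = 0.0
    dataframe = dataframe.withColumn(new_feat, F.lit(count))
    for old_feat in old_feats:
        # Write this column's code where its indicator is set;
        # otherwise keep the value assigned so far.
        dataframe = dataframe.withColumn(new_feat,
            F.when(dataframe[old_feat]==1.0,count).
            otherwise(dataframe[new_feat]))

        dataframe = dataframe.drop(old_feat)
        count += 1
    return dataframe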

In [9]:
channels = ['data_channel_is_lifestyle',
            'data_channel_is_entertainment',
            'data_channel_is_bus',
            'data_channel_is_socmed',
            'data_channel_is_tech',
            'data_channel_is_world',]

weekdays = ['weekday_is_monday',
            'weekday_is_tuesday',
            'weekday_is_wednesday',
            'weekday_is_thursday',
            'weekday_is_friday',
            'weekday_is_saturday',
            'weekday_is_sunday',]

df1 = mergeFeatures(df1, channels, 'data_channel')
df1 = mergeFeatures(df1, weekdays, 'weekday')
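
To make the integer encoding concrete, here is a minimal plain-Python sketch of the same rule applied to a single row, without Spark. The helper `merge_row`, the shortened column list `toy_channels`, and the toy rows are hypothetical and exist only for illustration. The second row shows that a row with no indicator set receives code 0.0, the same code as the first category.

```python
def merge_row(row, old_feats):
    """Mimic mergeFeatures on one dict-like row (illustrative helper)."""
    code = 0.0  # initial value, as in F.lit(count) with count = 0.0
    for i, feat in enumerate(old_feats):
        if row[feat] == 1.0:
            code = float(i)  # a later indicator overwrites an earlier one
    return code

# Shortened, hypothetical column list for the example.
toy_channels = ['data_channel_is_lifestyle',
                'data_channel_is_entertainment',
                'data_channel_is_bus']

bus_row = {'data_channel_is_lifestyle': 0.0,
           'data_channel_is_entertainment': 0.0,
           'data_channel_is_bus': 1.0}
no_channel_row = dict.fromkeys(toy_channels, 0.0)

print(merge_row(bus_row, toy_channels))         # 2.0
print(merge_row(no_channel_row, toy_channels))  # 0.0
```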

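Remove the outliers: drop every row in which any column lies more than `n_std` standard deviations from that column's mean.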

In [10]:
def removeOutlier(df_0, n_std=5):
    """
    remove the rows with values more than n std away 
    from mean.
    """
    avgExpr = []
    stdExpr = []
    columns = df_0.columns
    
    for col in columns:
        avgExpr.append(F.mean(df_0[col]).alias(col))
        stdExpr.append(F.stddev(df_0[col]).alias(col))
        
    avgs = df_0.agg(*avgExpr).collect()[0]
    stds = df_0.agg(*stdExpr).collect()[0]

    filterExpr = F.lit(True)
    for col in columns:
        mask = (F.abs(df_0[col] - avgs[col]) <= n_std * stds[col])
        filterExpr = filterExpr & mask

    return df_0.filter(filterExpr)

In [11]:
df2 = df1.alias("df2")
df2 = removeOutlier(df2)
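
The filtering rule can be sketched on a single column in plain Python. This is only an illustration under stated assumptions: `keep_within` and the `toy` values are made up, and `statistics.stdev` is used because it is the sample standard deviation, which is also what Spark's `F.stddev` computes.

```python
import statistics

def keep_within(values, n_std):
    """Keep values within n_std sample standard deviations of the mean."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation
    return [v for v in values if abs(v - mean) <= n_std * std]

toy = [10, 12, 11, 13, 12, 11, 500]  # made-up values with one extreme point

print(keep_within(toy, 2))  # the 500 is dropped
print(keep_within(toy, 5))  # nothing is dropped
```

Because the extreme value itself inflates the standard deviation, a wide band such as the default `n_std=5` removes only very distant points.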

## Pipeline construction

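Cast the label column `shares` to double so that the `Binarizer` stage of the pipeline can binarize it, then split the data into training and test sets (90% / 10%).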

In [12]:
df2 = df2.withColumn('shares', df2.shares.cast(DoubleType()))
train, test = df2.randomSplit([0.9, 0.1])

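Configure an ML pipeline with five stages: assemble the feature vector, binarize `shares` at 1400 to produce the label, index the label, reduce the features with PCA, and fit a logistic regression.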

In [13]:
features = list(train.columns)
features.remove('shares')
vectorAssembler = f.VectorAssembler(\
    inputCols=features, 
    outputCol="features")

binarizer = f.Binarizer(\
    threshold=1400, 
    inputCol='shares', 
    outputCol='label')

stringIndexer = f.StringIndexer(\
    inputCol='label',
    outputCol = 'indexed')

pca = f.PCA(\
    inputCol="features",\
    outputCol="pca_features")

lr = C.LogisticRegression(\
    featuresCol="pca_features",\
    labelCol="indexed",\
    standardization=True)

pipeline = Pipeline(stages=[\
    vectorAssembler,\
    binarizer,\
    stringIndexer,\
    pca,\
    lr])
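
As a small illustration of the labeling rule, Spark's `Binarizer` maps values strictly greater than the threshold to 1.0 and all other values to 0.0. The plain-Python sketch below mirrors that rule; the function `binarize` and the share counts are hypothetical.

```python
def binarize(value, threshold=1400):
    """Return 1.0 if value is strictly greater than threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0

toy_shares = [593.0, 1400.0, 1401.0, 12000.0]  # made-up share counts
print([binarize(s) for s in toy_shares])  # [0.0, 0.0, 1.0, 1.0]
```

An article with exactly 1400 shares therefore falls into the 0.0 class.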

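Grid search and cross-validation: the grid has 2 × 2 × 3 × 3 = 36 parameter combinations, and each one is scored with 5-fold cross-validation.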

In [14]:
paramGrid = t.ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .addGrid(pca.k, [8, 16, 24])\
    .build()
    
cv = t.CrossValidator(\
    estimator=pipeline,\
    estimatorParamMaps=paramGrid,\
    evaluator=BCE(),\
    numFolds=5)
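
To get a feel for the cost of this search, the sketch below enumerates the same candidate values with `itertools.product`. The variable names are illustrative and not part of the notebook. `CrossValidator` fits the pipeline once per parameter combination per fold, then refits once on the full training set with the best combination.

```python
from itertools import product

# Same candidate values as in the ParamGridBuilder call.
reg_params = [0.1, 0.01]
fit_intercepts = [False, True]
elastic_net_params = [0.0, 0.5, 1.0]
pca_ks = [8, 16, 24]

grid = list(product(reg_params, fit_intercepts, elastic_net_params, pca_ks))
num_folds = 5

print(len(grid))                  # 36 parameter combinations
print(len(grid) * num_folds)      # 180 fits during cross-validation
print(len(grid) * num_folds + 1)  # 181 including the final refit
```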

## Pipeline fitting and prediction

In [15]:
model = cv.fit(train)

In [16]:
model.bestModel

PipelineModel_49cdbc936d5affd57467

In [17]:
model.avgMetrics

[0.6207929035684879,
 0.6316329348940822,
 0.6777289591238491,
 0.5627055973000475,
 0.5505872227370847,
 0.6383518404589555,
 0.4972413351392801,
 0.511202552167608,
 0.5852187965473435,
 0.6225375153926213,
 0.6416940157243853,
 0.6800620713797927,
 0.590914550351561,
 0.6011479554059808,
 0.6403497837318773,
 0.5,
 0.5,
 0.5,
 0.6221402663219725,
 0.632800251663499,
 0.6782703431160273,
 0.6206429183844815,
 0.6307319083278913,
 0.6770392265066962,
 0.6193327699114692,
 0.6285893175997477,
 0.6753668183986277,
 0.6233543226174348,
 0.6424057138396138,
 0.68026711987045,
 0.6231380864664087,
 0.6399883380573146,
 0.6776448437132119,
 0.6234792992812712,
 0.6374529785477373,
 0.6763572763188984]
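
`avgMetrics` holds one cross-validated average per parameter combination, in the same order as the `estimatorParamMaps` passed to `CrossValidator`. A minimal helper for looking up the winning combination could look like the sketch below. The name `best_param_map` and the toy inputs are hypothetical, and the helper assumes that a larger metric is better.

```python
def best_param_map(avg_metrics, param_maps):
    """Return (index, metric, param_map) for the highest average metric."""
    best_index = max(range(len(avg_metrics)), key=avg_metrics.__getitem__)
    return best_index, avg_metrics[best_index], param_maps[best_index]

# Toy inputs standing in for model.avgMetrics and paramGrid.
toy_metrics = [0.62, 0.68, 0.50]
toy_maps = [{'k': 8}, {'k': 16}, {'k': 24}]

print(best_param_map(toy_metrics, toy_maps))  # (1, 0.68, {'k': 16})
```

In this notebook the call would presumably be `best_param_map(model.avgMetrics, paramGrid)`; that usage has not been run here.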

In [18]:
prediction = model.transform(test)

## Evaluation

In [19]:
evaluator = BCE()
evaluator.evaluate(prediction)

0.7212447051156727
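
With its default settings, `BinaryClassificationEvaluator` reports the area under the ROC curve, so the value above is the test-set AUC. One way to read that number is as the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one. The sketch below computes AUC by that pairwise definition in plain Python. It illustrates the metric and is not how Spark computes it internally; `roc_auc` and the toy inputs are hypothetical.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Tied pairs count as one half.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
print(roc_auc([0, 0, 1, 1], [0.5, 0.5, 0.5, 0.5]))   # 0.5
```

The second call shows that a model giving every example the same score has an AUC of 0.5, which is a likely explanation for the entries equal to 0.5 in `avgMetrics`.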