## 2.7 Machine Learning for Ranking - Spark MLLIB

<img src='files/resources/spark-logo-hd.png' align='left'>

We saw Spark on Day 1 for data processing at scale.  Today we are going to take a quick look at MLLIB.

First we need to get a spark context:

In [1]:
import findspark
import os
findspark.init(os.getenv('HOME') + '/spark-1.6.0-bin-hadoop2.6')
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'

In [2]:
import pyspark
try: 
    print(sc)
except NameError:
    sc = pyspark.SparkContext()
    print(sc)

<pyspark.context.SparkContext object at 0x7fdc0c6212d0>


We use the csv reader to load the data from earlier into a Spark DataFrame:

In [3]:
from pyspark.sql import SQLContext
import os

sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options() \
        .options(header='true', inferSchema='true', delimiter=',') \
        .load(os.getcwd() + '/data/data_balanced.csv') 
        
df.schema
df.dropna()

DataFrame[key: int, query: string, Title: string, LeafCats: string, ItemID: int, X_unit_id: int, SCORE: string, label_relevanceGrade: int, label_relevanceBinary: double, feature_1: double, feature_2: double, feature_3: double, feature_4: double, feature_5: double, feature_6: double, feature_7: double, feature_8: double, feature_9: int, feature_10: double]

Now we can extract the features and the target for the machine learning algorithms:

In [27]:
sqlContext.registerDataFrameAsTable(df,'dataset')
sqlContext.tableNames()

data_full = sqlContext.sql("select label_relevanceBinary, feature_1, feature_2, feature_3, feature_4 \
                       feature_5, feature_6, feature_7, feature_8, feature_9, feature_10 \
               from dataset").rdd

We have to work pretty hard to get the data ready for MLLIB.  We apply a `StandardScaler()` to standardise the data.  

There is a bit of jostling to get the DataFrame into an RDD and then back into an RDD of LabeledPoints:

In [30]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import StandardScaler

label = data_full.map(lambda row: row[0])
features = data_full.map(lambda row: row[1:])

model = StandardScaler().fit(features)
features_transform = model.transform(features)

# Now combine and convert back to labelled points:
transformedData = label.zip(features_transform)
transformedData = transformedData.map(lambda row: LabeledPoint(row[0],[row[1]]))

transformedData.take(5)

[LabeledPoint(0.0, [3.2387300086,0.0,-6.60489559286,-5.50047933656,1.23398964824,0.0,0.0,3.3855297815,-2.06632619799]),
 LabeledPoint(0.0, [1.45057308082,2.00321832209,-6.57299970845,-5.32406174515,0.09470938613,0.00117524594638,-1.93134360958,5.12665938341,0.023928989607]),
 LabeledPoint(0.0, [0.705616371946,2.00321832209,-5.31114724493,-4.41307798391,0.0,1.08739179172,0.203299022375,5.15084173899,-0.0103003575681]),
 LabeledPoint(0.0, [2.59385456766,2.00321832209,-10.698396655,-4.69578028733,0.848444319911,0.00979070276873,0.0675294293089,3.31298271475,-2.06632619799]),
 LabeledPoint(0.0, [3.2387300086,0.0,-7.848391211,-4.31833981006,0.374249007948,0.0,0.0,4.30445929362,-2.06632619799])]

We also split the data into test and validation data sets - splitting 75%:25% between the training and test sets:

In [40]:
data_train, data_test = transformedData.randomSplit([.75,.25],seed=1973)

print('Training data records = ' + str(data_train.count()))
print('Training data records = ' + str(data_test.count()))

Training data records = 14977
Training data records = 5023


## Support Vector Machine

In [41]:
model = SVMWithSGD.train(data_train, iterations=100)

In [42]:
model

(weights=[0.722869566986,0.793625177911,0.197281899878,0.723098005828,0.884004348317,0.260247880808,-0.119935774873,-0.147394050516,0.0753756727842], intercept=0.0)

In [43]:
preds = data_test.map(lambda row: (model.predict(row.features[0]),row.label))
err = preds.filter(lambda (v, p): v != p).count() / float(data_test.count())
print("Accuracy = " + str(1-err))

Accuracy = 0.619749153892


## Random Forest

In [28]:
from pyspark.mllib.tree import RandomForest

model = RandomForest.trainClassifier(data_train, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=400, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=10, maxBins=32)

In [29]:
predictions = model.predict(data_test.map(lambda x: x.features))
labelsAndPredictions = data_test.map(lambda lp: lp.label).zip(predictions)
err = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(data_test.count())
print("Accuracy = " + str(1-err))

Accuracy = 0.684252438782


## Where next?

Loads of examples on the [MLLIB website](http://spark.apache.org/docs/latest/mllib-guide.html).