# Dependencies

In [1]:
import pandas as pd
import pyspark
import re

# Setup

In [2]:
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
pd.set_option('display.expand_frame_repr', False)

In [3]:
spark.conf.get('spark.driver.memory')

u'6g'

# Task 3

Reading the saved parquet file for fitting 2 separate models

In [4]:
shoesDfOriginal = spark.read.parquet("hdfs:///data/exercise/shoes.parquet")

### Print schema

In [5]:
shoesDfOriginal.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- upc_id: string (nullable = true)
 |-- descr: string (nullable = true)
 |-- vendor_catalog_url: string (nullable = true)
 |-- buy_url: string (nullable = true)
 |-- manufacturer_name: string (nullable = true)
 |-- sale_price: decimal(38,18) (nullable = true)
 |-- retail_price: decimal(38,18) (nullable = true)
 |-- manufacturer_part_no: string (nullable = true)
 |-- country: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- category_id: integer (nullable = true)



## Original Data

In [6]:
pd.DataFrame(shoesDfOriginal.take(5), columns=shoesDfOriginal.columns)

Unnamed: 0,product_id,name,upc_id,descr,vendor_catalog_url,buy_url,manufacturer_name,sale_price,retail_price,manufacturer_part_no,country,vendor_id,category_name,category_code,category_id
0,530987,i play Swim Shoes (Kids) - Aqua-4,,i play Swim Shoes Kids - Hot Pink Imported Imp...,http://www.shopstyle.com/p/i-play-swim-shoes-k...,http://www.shopstyle.com/action/apiVisitRetail...,I Play,8.99,9.99,,,,Boys' Shoes,boys-shoes,1599
1,530519,K-Swiss 501 Classic Tennis Shoe (Little Kid),,K Swiss Classic Retro Shoes,http://www.shopstyle.com/p/k-swiss-501-classic...,http://www.shopstyle.com/action/apiVisitRetail...,K-Swiss,25.32,48.0,,,,Boys' Shoes,boys-shoes,1599
2,530581,K-Swiss 801 Classic Tennis Shoe (Big Kid),,K Swiss Classic Luxury Retro Shoes,http://www.shopstyle.com/p/k-swiss-801-classic...,http://www.shopstyle.com/action/apiVisitRetail...,K-Swiss,43.1,50.0,,,,Boys' Shoes,boys-shoes,1599
3,535774,Dinosoles Kids' Dinorama Veloci Raptor,,Dinosoles Kids' Dinorama Veloci Raptor,http://www.shopstyle.com/p/dinosoles-kids-dino...,http://www.shopstyle.com/action/apiVisitRetail...,Dinosoles,25.87,36.95,,,,Boys' Shoes,boys-shoes,1599
4,535775,Dinosoles Kids' Dino Ankylosaurus Tod/Pr,,Dinosoles Kids' Dino Ankylosaurus Tod/Pr,http://www.shopstyle.com/p/dinosoles-kids-dino...,http://www.shopstyle.com/action/apiVisitRetail...,Dinosoles,29.6,37.0,,,,Boys' Shoes,boys-shoes,1599


## Keep only relevent data tha can help us classify a tweet to a product category

In [7]:
shoesDf = shoesDfOriginal.select('name', 'descr', 'manufacturer_name', 'sale_price', 'country', 'category_name', 'category_code', 'category_id')

### Number of Items per category

In [8]:
shoesDf.groupBy("category_code", 'category_id').count().show()

+-------------------+-----------+-----+
|      category_code|category_id|count|
+-------------------+-----------+-----+
|         boys-shoes|       1599|15400|
|        girls-shoes|       1612|21632|
| mens-lace-up-shoes|       1564|12353|
|     shoes-athletic|       1976| 4899|
|mens-shoes-athletic|       1561| 7935|
|      evening-shoes|       1773|  901|
|       bridal-shoes|       1386|  848|
+-------------------+-----------+-----+



## Keep only relevent data tha can help us classify a tweet to a product category

In [9]:
shoesDf = shoesDf.select('name', 'descr', 'manufacturer_name', 'sale_price', 'country', 'category_name', 'category_code', 'category_id')

### Example of data

In [10]:
pd.DataFrame(shoesDf.take(20), columns=shoesDf.columns)

Unnamed: 0,name,descr,manufacturer_name,sale_price,country,category_name,category_code,category_id
0,i play Swim Shoes (Kids) - Aqua-4,i play Swim Shoes Kids - Hot Pink Imported Imp...,I Play,8.99,,Boys' Shoes,boys-shoes,1599
1,K-Swiss 501 Classic Tennis Shoe (Little Kid),K Swiss Classic Retro Shoes,K-Swiss,25.32,,Boys' Shoes,boys-shoes,1599
2,K-Swiss 801 Classic Tennis Shoe (Big Kid),K Swiss Classic Luxury Retro Shoes,K-Swiss,43.1,,Boys' Shoes,boys-shoes,1599
3,Dinosoles Kids' Dinorama Veloci Raptor,Dinosoles Kids' Dinorama Veloci Raptor,Dinosoles,25.87,,Boys' Shoes,boys-shoes,1599
4,Dinosoles Kids' Dino Ankylosaurus Tod/Pr,Dinosoles Kids' Dino Ankylosaurus Tod/Pr,Dinosoles,29.6,,Boys' Shoes,boys-shoes,1599
5,Puma Drez S Jr - New Navy/Geranium-4,Puma Drez S Jr - Dark Shadow/Limestone Gray,Puma,13.75,,Boys' Shoes,boys-shoes,1599
6,Puma Suede 2 Straps Kids - Black/White-4,Puma Suede 2 Straps Kids - Green Sheen,Puma,20.09,,Boys' Shoes,boys-shoes,1599
7,Slip-On Sneakers,<ul> <li>Textile/manmade material</li> <li>Spo...,Crazy 8,11.19,,Boys' Shoes,boys-shoes,1599
8,Lace-Up Sneakers,<ul> <li>Textile/manmade material</li> <li>Spo...,Crazy 8,11.19,,Boys' Shoes,boys-shoes,1599
9,OshKosh B'Gosh Octo Sneaker (Toddler/Little Kid),OshKosh B'Gosh Octo Sneaker Toddler/Little Kid,Osh Kosh,16.34,,Boys' Shoes,boys-shoes,1599


## Preprocessing

## Custom Transformers

In [11]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

### Define Custom Transformer to remove non alphanumeric charaters

In [12]:
class CleanText(Transformer, HasInputCol, HasOutputCol):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(CleanText, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
        
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    
    def _transform(self, dataset):
        def f(text):
            text = re.sub("\W", " ", text)
            text = re.sub(" +", " ", text)
            
            text = re.sub(r"’", "\'", text)
            text = re.sub(r'([^\x00-\x7f])',r'', text)
            return text
        
        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

### Define Custom Transformer to remove HTML

In [13]:
from bs4 import BeautifulSoup

class BeutifulSoupTagRemover(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(BeutifulSoupTagRemover, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):

        def f(s):
            cleaned_post = BeautifulSoup(s).text
            return cleaned_post

        t = StringType()
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

### Clone original dataframe to start preprocessing

In [14]:
shoesDf = shoesDfOriginal

### Clean Data
- make all text lowercase
- remove html tags from the name
- remove 
- remove stopwords

In [15]:
from pyspark.sql.functions import lower, col
shoesDf = shoesDf
shoesDf = shoesDf.withColumn("descr", lower(col("descr")))

cleanText  = CleanText(inputCol="name", outputCol="name1")
beutifulSoupTagRemover = BeutifulSoupTagRemover(inputCol="name1", outputCol="name2")
tokenizer = pyspark.ml.feature.Tokenizer(inputCol="name2", outputCol="name3")
stopWordsRemover = pyspark.ml.feature.StopWordsRemover(inputCol="name3", outputCol="name_preprocessed")

preprocessingPipeline = pyspark.ml.Pipeline(stages=[
    cleanText,
    beutifulSoupTagRemover,
    tokenizer,
    stopWordsRemover
])

shoesDf = preprocessingPipeline.fit(shoesDf).transform(shoesDf)

In [16]:
shoesDf = shoesDf.persist(pyspark.StorageLevel.MEMORY_AND_DISK)

In [17]:
pd.DataFrame(shoesDf.take(100), columns=shoesDf.columns)

Unnamed: 0,product_id,name,upc_id,descr,vendor_catalog_url,buy_url,manufacturer_name,sale_price,retail_price,manufacturer_part_no,country,vendor_id,category_name,category_code,category_id,name1,name2,name3,name_preprocessed
0,530987,i play Swim Shoes (Kids) - Aqua-4,,i play swim shoes kids - hot pink imported imp...,http://www.shopstyle.com/p/i-play-swim-shoes-k...,http://www.shopstyle.com/action/apiVisitRetail...,I Play,8.99,9.99,,,,Boys' Shoes,boys-shoes,1599,i play Swim Shoes Kids Aqua 4,i play Swim Shoes Kids Aqua 4,"[i, play, swim, shoes, kids, aqua, 4]","[play, swim, shoes, kids, aqua, 4]"
1,530519,K-Swiss 501 Classic Tennis Shoe (Little Kid),,k swiss classic retro shoes,http://www.shopstyle.com/p/k-swiss-501-classic...,http://www.shopstyle.com/action/apiVisitRetail...,K-Swiss,25.32,48.0,,,,Boys' Shoes,boys-shoes,1599,K Swiss 501 Classic Tennis Shoe Little Kid,K Swiss 501 Classic Tennis Shoe Little Kid,"[k, swiss, 501, classic, tennis, shoe, little,...","[k, swiss, 501, classic, tennis, shoe, little,..."
2,530581,K-Swiss 801 Classic Tennis Shoe (Big Kid),,k swiss classic luxury retro shoes,http://www.shopstyle.com/p/k-swiss-801-classic...,http://www.shopstyle.com/action/apiVisitRetail...,K-Swiss,43.1,50.0,,,,Boys' Shoes,boys-shoes,1599,K Swiss 801 Classic Tennis Shoe Big Kid,K Swiss 801 Classic Tennis Shoe Big Kid,"[k, swiss, 801, classic, tennis, shoe, big, kid]","[k, swiss, 801, classic, tennis, shoe, big, kid]"
3,535774,Dinosoles Kids' Dinorama Veloci Raptor,,dinosoles kids' dinorama veloci raptor,http://www.shopstyle.com/p/dinosoles-kids-dino...,http://www.shopstyle.com/action/apiVisitRetail...,Dinosoles,25.87,36.95,,,,Boys' Shoes,boys-shoes,1599,Dinosoles Kids Dinorama Veloci Raptor,Dinosoles Kids Dinorama Veloci Raptor,"[dinosoles, kids, dinorama, veloci, raptor]","[dinosoles, kids, dinorama, veloci, raptor]"
4,535775,Dinosoles Kids' Dino Ankylosaurus Tod/Pr,,dinosoles kids' dino ankylosaurus tod/pr,http://www.shopstyle.com/p/dinosoles-kids-dino...,http://www.shopstyle.com/action/apiVisitRetail...,Dinosoles,29.6,37.0,,,,Boys' Shoes,boys-shoes,1599,Dinosoles Kids Dino Ankylosaurus Tod Pr,Dinosoles Kids Dino Ankylosaurus Tod Pr,"[dinosoles, kids, dino, ankylosaurus, tod, pr]","[dinosoles, kids, dino, ankylosaurus, tod, pr]"
5,539893,Puma Drez S Jr - New Navy/Geranium-4,,puma drez s jr - dark shadow/limestone gray,http://www.shopstyle.com/p/puma-drez-s-jr/4294...,http://www.shopstyle.com/action/apiVisitRetail...,Puma,13.75,55.0,,,,Boys' Shoes,boys-shoes,1599,Puma Drez S Jr New Navy Geranium 4,Puma Drez S Jr New Navy Geranium 4,"[puma, drez, s, jr, new, navy, geranium, 4]","[puma, drez, jr, new, navy, geranium, 4]"
6,539934,Puma Suede 2 Straps Kids - Black/White-4,,puma suede 2 straps kids - green sheen,http://www.shopstyle.com/p/puma-suede-2-straps...,http://www.shopstyle.com/action/apiVisitRetail...,Puma,20.09,40.0,,,,Boys' Shoes,boys-shoes,1599,Puma Suede 2 Straps Kids Black White 4,Puma Suede 2 Straps Kids Black White 4,"[puma, suede, 2, straps, kids, black, white, 4]","[puma, suede, 2, straps, kids, black, white, 4]"
7,531019,Slip-On Sneakers,,<ul> <li>textile/manmade material</li> <li>spo...,http://www.shopstyle.com/p/crazy-8-slip-on-sne...,http://www.shopstyle.com/action/apiVisitRetail...,Crazy 8,11.19,19.88,,,,Boys' Shoes,boys-shoes,1599,Slip On Sneakers,Slip On Sneakers,"[slip, on, sneakers]","[slip, sneakers]"
8,531020,Lace-Up Sneakers,,<ul> <li>textile/manmade material</li> <li>spo...,http://www.shopstyle.com/p/crazy-8-lace-up-sne...,http://www.shopstyle.com/action/apiVisitRetail...,Crazy 8,11.19,19.88,,,,Boys' Shoes,boys-shoes,1599,Lace Up Sneakers,Lace Up Sneakers,"[lace, up, sneakers]","[lace, sneakers]"
9,531058,OshKosh B'Gosh Octo Sneaker (Toddler/Little Kid),,oshkosh b'gosh octo sneaker toddler/little kid,http://www.shopstyle.com/p/osh-kosh-octo-sneak...,http://www.shopstyle.com/action/apiVisitRetail...,Osh Kosh,16.34,16.34,,,,Boys' Shoes,boys-shoes,1599,OshKosh B Gosh Octo Sneaker Toddler Little Kid,OshKosh B Gosh Octo Sneaker Toddler Little Kid,"[oshkosh, b, gosh, octo, sneaker, toddler, lit...","[oshkosh, b, gosh, octo, sneaker, toddler, lit..."


### Split dataset

- 0.8 % used for training
- 0.2 % used for testing

In [18]:
# .sample(True, 0.1)
(training, test) = shoesDf.sample(True, 0.1).randomSplit([0.8, 0.2], seed=1987)

## Verbose Cross Validation

In [19]:
import numpy as np

from pyspark.ml.tuning import CrossValidator, CrossValidatorModel
from pyspark.sql.functions import rand


class CrossValidatorVerbose(CrossValidator):

    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()

        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        for i in range(nFolds):
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)
                metric = eva.evaluate(model.transform(validation, paramMap))
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum
                print("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics))

## Logistic Regression

### Model Training

In [20]:
%%time

from pyspark.ml.feature import *

label_indexer_lr = StringIndexer(inputCol="category_id", outputCol="category_id_indexed")
hashingTF_lr = pyspark.ml.feature.HashingTF(inputCol="name_preprocessed", outputCol="features", numFeatures=10)
lr = pyspark.ml.classification.LogisticRegression(maxIter=10, regParam=0.01, featuresCol='features', labelCol="category_id_indexed")

pipeline_lr = pyspark.ml.Pipeline(stages=[
    label_indexer_lr,
    hashingTF_lr, 
    lr
])

lrModel = pipeline_lr.fit(training)

CPU times: user 41.2 ms, sys: 7.39 ms, total: 48.6 ms
Wall time: 3.67 s


In [21]:
pipeline_lr.getStages()

[StringIndexer_cc2b72f174f4,
 HashingTF_878c485240c2,
 LogisticRegression_d81bdbd9ccb9]

### Model Parameters

In [22]:
# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.stages[-1].coefficientMatrix))
print("Intercept: " + str(lrModel.stages[-1].interceptVector))

Coefficients: 
DenseMatrix([[-0.26715459,  0.36778312,  0.24932515, -0.34870887,  0.3189703 ,
               0.16421328,  0.05211823,  0.0688311 , -0.10108216,  0.41872814],
             [-0.21438976,  0.28528888,  0.27902749, -0.36274238, -0.11659786,
               0.10042673,  0.30232345,  0.01820837,  0.08177082,  0.39796326],
             [ 0.25143419, -0.25856971, -0.21349432,  0.21844803,  0.01020006,
              -0.12719524, -0.56723017, -0.31709821, -0.51459766,  0.08954914],
             [ 0.07504628, -0.22496379, -0.15153154,  0.82886636, -0.12154805,
              -0.08233997, -0.54114432, -0.32882842,  0.32210137, -0.40721314],
             [-0.07873442, -0.37277331, -0.23107384,  0.11781403, -0.17950036,
              -0.18106532,  0.67221275, -0.22428094,  0.29831999, -0.22275605],
             [ 0.10605562,  0.06610271,  0.07960076, -0.21948921,  0.04909961,
               0.10150021,  0.04376101,  0.26992509, -0.01424266, -0.11627448],
             [ 0.12774269,  0.1

### Check training summary

In [23]:
trainingSummary = lrModel.stages[-1].summary

objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

objectiveHistory:
1.598841274
1.52788208073
1.42776507336
1.4195461772
1.40702601954
1.40422951834
1.39570606091
1.38558738319
1.38369218579
1.37294935707
1.3704019748
False positive rate by label:
label 0: 0.47983014862
label 1: 0.0975544922913
label 2: 0.11709253287
label 3: 0.0662251655629
label 4: 0.0130208333333
label 5: 0.0
label 6: 0.0
True positive rate by label:
label 0: 0.73562537048
label 1: 0.218494271686
label 2: 0.457502623295
label 3: 0.380165289256
label 4: 0.103723404255
label 5: 0.0
label 6: 0.0
Precision by label:
label 0: 0.439603258944
label 1: 0.421135646688
label 2: 0.480176211454
label 3: 0.442307692308
label 4: 0.393939393939
label 5: 0.0
label 6: 0.0
Recall by label:
label 0: 0.73562537048
label 1: 0.218494271686
label 2: 0.457502623295
label 3: 0.380165289256
label 4: 0.103723404255
label 5: 0.0
label 6: 0.0
F-measure by label:
label 0: 0.550332594235
label 1: 0.287715517241
label 2: 0.46856528748
label 3: 0.408888888889
label 4: 0.164210526316
label 5: 0.0
l

### Model evaluation

In [24]:
prediction = lrModel.transform(test)

evaluator_lr = pyspark.ml.evaluation.MulticlassClassificationEvaluator(labelCol="category_id_indexed", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator_lr.evaluate(prediction)
print("Accuracy = %g " % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

Accuracy = 0.451349 
Test Error = 0.548651 


## Logistic Regression Cross Validation

In [25]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.tuning import  TrainValidationSplit

In [26]:
hashingTF_lr_cross = pyspark.ml.feature.HashingTF(inputCol="name_preprocessed", outputCol="features")
lr_cross = pyspark.ml.classification.LogisticRegression(maxIter=10, featuresCol='features', labelCol="category_id_indexed")

pipeline_lr_cross = pyspark.ml.Pipeline(stages=[
    label_indexer_lr,
    hashingTF_lr_cross,
    lr_cross
])

paramGrid_lr = ParamGridBuilder() \
    .addGrid(hashingTF_lr_cross.numFeatures, [100, 1000, 1500, 2000, 3000]) \
    .addGrid(lr_cross.maxIter, [10, 20, 30]) \
    .addGrid(lr_cross.regParam, [0.1, 0.01, 0.001]) \
    .build()

evaluator_lr_cross = pyspark.ml.evaluation.MulticlassClassificationEvaluator(
    labelCol="category_id_indexed", 
    predictionCol="prediction", 
    metricName="accuracy"
)

crossval_lr = CrossValidatorVerbose(
    estimator=pipeline_lr_cross,
    estimatorParamMaps=paramGrid_lr,
    evaluator=evaluator_lr_cross,
    numFolds=3
) 

### Run CrossValidator

In [27]:
# Run cross-validation, and choose the best set of parameters.
crossval_lr_model = crossval_lr.fit(training)

Comparing models on fold 1
params: {'regParam': 0.1, 'numFeatures': 100, 'maxIter': 10}	accuracy: 0.610259	avg: 0.610259
params: {'regParam': 0.1, 'numFeatures': 100, 'maxIter': 20}	accuracy: 0.617925	avg: 0.617925
params: {'regParam': 0.1, 'numFeatures': 100, 'maxIter': 30}	accuracy: 0.617335	avg: 0.617335
params: {'regParam': 0.01, 'numFeatures': 100, 'maxIter': 10}	accuracy: 0.617925	avg: 0.617925
params: {'regParam': 0.01, 'numFeatures': 100, 'maxIter': 20}	accuracy: 0.621462	avg: 0.621462
params: {'regParam': 0.01, 'numFeatures': 100, 'maxIter': 30}	accuracy: 0.620283	avg: 0.620283
params: {'regParam': 0.001, 'numFeatures': 100, 'maxIter': 10}	accuracy: 0.613797	avg: 0.613797
params: {'regParam': 0.001, 'numFeatures': 100, 'maxIter': 20}	accuracy: 0.611439	avg: 0.611439
params: {'regParam': 0.001, 'numFeatures': 100, 'maxIter': 30}	accuracy: 0.612028	avg: 0.612028
params: {'regParam': 0.1, 'numFeatures': 1000, 'maxIter': 10}	accuracy: 0.709906	avg: 0.709906
params: {'regParam': 0.

params: {'regParam': 0.01, 'numFeatures': 3000, 'maxIter': 20}	accuracy: 0.724073	avg: 0.715220
params: {'regParam': 0.01, 'numFeatures': 3000, 'maxIter': 30}	accuracy: 0.722187	avg: 0.714278
params: {'regParam': 0.001, 'numFeatures': 3000, 'maxIter': 10}	accuracy: 0.709617	avg: 0.705339
params: {'regParam': 0.001, 'numFeatures': 3000, 'maxIter': 20}	accuracy: 0.712759	avg: 0.705141
params: {'regParam': 0.001, 'numFeatures': 3000, 'maxIter': 30}	accuracy: 0.722187	avg: 0.708676
Comparing models on fold 3
params: {'regParam': 0.1, 'numFeatures': 100, 'maxIter': 10}	accuracy: 0.631703	avg: 0.618052
params: {'regParam': 0.1, 'numFeatures': 100, 'maxIter': 20}	accuracy: 0.635239	avg: 0.621786
params: {'regParam': 0.1, 'numFeatures': 100, 'maxIter': 30}	accuracy: 0.635239	avg: 0.622008
params: {'regParam': 0.01, 'numFeatures': 100, 'maxIter': 10}	accuracy: 0.620507	avg: 0.618132
params: {'regParam': 0.01, 'numFeatures': 100, 'maxIter': 20}	accuracy: 0.631703	avg: 0.625557
params: {'regParam

### Get Predictions from Best Model

In [28]:
# Make predictions on test documents. cvModel uses the best model found (lrModel).
crossval_lr_prediction = crossval_lr_model.transform(test)

crossval_lr_accuracy = evaluator_lr_cross.evaluate(crossval_lr_prediction)
print("Test Accuracy = %g " % (crossval_lr_accuracy))
print("Test Error = %g " % (1.0 - crossval_lr_accuracy))

#get the best model
best_lr = crossval_lr_model.bestModel

Test Accuracy = 0.736713 
Test Error = 0.263287 


### Best Model Summary

In [29]:
#show the sets of params to be evaluated
print crossval_lr_model.explainParams()

#show the stages of the model
print best_lr.stages

#show num of features of best model
print best_lr.stages[1].getNumFeatures()

print crossval_lr_model.bestModel.stages[0].explainParams()

estimator: estimator to be cross-validated (current: Pipeline_e714827feb8c)
estimatorParamMaps: estimator param maps (current: [{Param(parent=u'HashingTF_e2ec5e790eca', name='numFeatures', doc='number of features.'): 100, Param(parent=u'LogisticRegression_71f931934eef', name='maxIter', doc='max number of iterations (>= 0).'): 10, Param(parent=u'LogisticRegression_71f931934eef', name='regParam', doc='regularization parameter (>= 0).'): 0.1}, {Param(parent=u'HashingTF_e2ec5e790eca', name='numFeatures', doc='number of features.'): 100, Param(parent=u'LogisticRegression_71f931934eef', name='maxIter', doc='max number of iterations (>= 0).'): 20, Param(parent=u'LogisticRegression_71f931934eef', name='regParam', doc='regularization parameter (>= 0).'): 0.1}, {Param(parent=u'HashingTF_e2ec5e790eca', name='numFeatures', doc='number of features.'): 100, Param(parent=u'LogisticRegression_71f931934eef', name='maxIter', doc='max number of iterations (>= 0).'): 30, Param(parent=u'LogisticRegression_

### Checking training summary on best model

In [30]:
crossval_lr_model_trainingSummary = crossval_lr_model.bestModel.stages[-1].summary

# Obtain the objective per iteration
objectiveHistory = crossval_lr_model_trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(crossval_lr_model_trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(crossval_lr_model_trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(crossval_lr_model_trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(crossval_lr_model_trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(crossval_lr_model_trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = crossval_lr_model_trainingSummary.accuracy
falsePositiveRate = crossval_lr_model_trainingSummary.weightedFalsePositiveRate
truePositiveRate = crossval_lr_model_trainingSummary.weightedTruePositiveRate
fMeasure = crossval_lr_model_trainingSummary.weightedFMeasure()
precision = crossval_lr_model_trainingSummary.weightedPrecision
recall = crossval_lr_model_trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

objectiveHistory:
1.598841274
0.928998326892
0.772294172088
0.762976547499
0.726487377031
0.721514151343
0.71650480228
0.712727945294
0.711422197322
0.711146692195
0.71099175979
0.710809495681
0.71062603517
0.710301185005
0.710157040228
0.709935082915
0.709854840973
0.70983839907
0.709826099087
0.709821732247
0.709817600177
0.709815083754
0.709809843448
0.709803904523
0.7097958271
0.70978334007
0.709777868864
0.709775761776
0.709774833304
0.709774580831
0.709774358937
False positive rate by label:
label 0: 0.0958447073097
label 1: 0.0555555555556
label 2: 0.0131481022079
label 3: 0.00867778031514
label 4: 0.00303819444444
label 5: 0.000815660685155
label 6: 0.000203128173878
True positive rate by label:
label 0: 0.895672791938
label 1: 0.788870703764
label 2: 0.970619097587
label 3: 0.902479338843
label 4: 0.837765957447
label 5: 0.675
label 6: 0.55737704918
Precision by label:
label 0: 0.827038861522
label 1: 0.821824381927
label 2: 0.945807770961
label 3: 0.934931506849
label 4: 0.95

In [77]:
# Accessing _java_obj shouldn't be necessary in Spark 2.3+
{x._java_obj.getOutputCol(): x.labels for x in best_lr.stages if isinstance(x, StringIndexerModel)}

{u'category_id_indexed': [u'1612',
  u'1599',
  u'1564',
  u'1561',
  u'1976',
  u'1773',
  u'1386']}

### Save Logistinc Regression Best Model

In [31]:
best_lr.write().overwrite().save("hdfs:///data/exercise/LogRegCrossvalModel")

## Random Forest

In [32]:
from pyspark.ml.classification import RandomForestClassifier

### Define Simple Model Pipeline

In [33]:
%%time

from pyspark.ml.feature import *

label_indexer_rf = StringIndexer(inputCol="category_id", outputCol="category_id_index")
hashingTF_rf = pyspark.ml.feature.HashingTF(inputCol="name_preprocessed", outputCol="features", numFeatures=1000)
rf = pyspark.ml.classification.RandomForestClassifier(numTrees=1000, featuresCol='features', labelCol="category_id_index")

pipeline_rf = pyspark.ml.Pipeline(stages=[
    label_indexer_rf,
    hashingTF_rf, 
    rf
])

rf_model = pipeline_rf.fit(training)

CPU times: user 29.6 ms, sys: 21.8 ms, total: 51.4 ms
Wall time: 21.9 s


In [34]:
rf_predictions = rf_model.transform(test)

# Select example rows to display.
# rf_predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
rf_evaluator = pyspark.ml.evaluation.MulticlassClassificationEvaluator(labelCol="category_id_index", predictionCol="prediction", metricName="accuracy")
rf_accuracy = rf_evaluator.evaluate(rf_predictions)
print("Test Accuracy = %g" % (rf_accuracy))
print("Test Error = %g" % (1.0 - rf_accuracy))

Test Accuracy = 0.522486
Test Error = 0.477514


## CrossValidation for Random Forest

In [37]:
label_indexer_rf = StringIndexer(inputCol="category_id", outputCol="category_id_index")
hashingTF_rf_cross = pyspark.ml.feature.HashingTF(inputCol="name_preprocessed", outputCol="features")
rf_cross = pyspark.ml.classification.RandomForestClassifier(featuresCol='features', labelCol="category_id_index")

pipeline_rf_cross = pyspark.ml.Pipeline(stages=[
    label_indexer_rf,
    hashingTF_rf_cross,
    rf_cross
])

paramGrid_rf = ParamGridBuilder() \
    .addGrid(hashingTF_lr_cross.numFeatures, [1000, 1500, 2000]) \
    .addGrid(rf_cross.numTrees, [1000, 1500]) \
    .build()

evaluator_rf_cross = pyspark.ml.evaluation.MulticlassClassificationEvaluator(
    labelCol="category_id_index", 
    predictionCol="prediction", 
    metricName="accuracy"
)

crossval_rf = CrossValidatorVerbose(
    estimator=pipeline_rf_cross,
    estimatorParamMaps=paramGrid_rf,
    evaluator=evaluator_rf_cross,
    numFolds=3
) 

## Run Random Forest CrossValidation

In [38]:
# Run cross-validation, and choose the best set of parameters.
crossval_rf_model = crossval_rf.fit(training)

Comparing models on fold 1
params: {'numTrees': 1000, 'numFeatures': 1000}	accuracy: 0.341981	avg: 0.341981
params: {'numTrees': 1500, 'numFeatures': 1000}	accuracy: 0.341981	avg: 0.341981
params: {'numTrees': 1000, 'numFeatures': 1500}	accuracy: 0.341981	avg: 0.341981
params: {'numTrees': 1500, 'numFeatures': 1500}	accuracy: 0.341981	avg: 0.341981
params: {'numTrees': 1000, 'numFeatures': 2000}	accuracy: 0.341981	avg: 0.341981
params: {'numTrees': 1500, 'numFeatures': 2000}	accuracy: 0.341981	avg: 0.341981
Comparing models on fold 2
params: {'numTrees': 1000, 'numFeatures': 1000}	accuracy: 0.339409	avg: 0.340695
params: {'numTrees': 1500, 'numFeatures': 1000}	accuracy: 0.339409	avg: 0.340695
params: {'numTrees': 1000, 'numFeatures': 1500}	accuracy: 0.339409	avg: 0.340695
params: {'numTrees': 1500, 'numFeatures': 1500}	accuracy: 0.339409	avg: 0.340695
params: {'numTrees': 1000, 'numFeatures': 2000}	accuracy: 0.339409	avg: 0.340695
params: {'numTrees': 1500, 'numFeatures': 2000}	accurac

## Get Predictions from Best Model

In [39]:
# Make predictions on test documents. cvModel uses the best model found (lrModel).
crossval_rf_prediction = crossval_rf_model.transform(test)

crossval_rf_accuracy = evaluator_rf_cross.evaluate(crossval_rf_prediction)
print("Test Accuracy = %g " % (crossval_rf_accuracy))
print("Test Error = %g " % (1.0 - crossval_rf_accuracy))

#get the best model
best_rf = crossval_rf_model.bestModel

Test Accuracy = 0.345871 
Test Error = 0.654129 


## Best Model Summary

In [48]:
#show the sets of params to be evaluated
print crossval_rf_model.explainParams()

#show the stages of the model
print best_rf.stages

#show num of features of best model
# print best_rf.stages[1].getNumFeatures()

print crossval_rf_model.bestModel.stages[0].explainParams()

estimator: estimator to be cross-validated (current: Pipeline_4e153972edf6)
estimatorParamMaps: estimator param maps (current: [{Param(parent=u'HashingTF_e2ec5e790eca', name='numFeatures', doc='number of features.'): 1000, Param(parent=u'RandomForestClassifier_e42647b054a7', name='numTrees', doc='Number of trees to train (>= 1).'): 1000}, {Param(parent=u'HashingTF_e2ec5e790eca', name='numFeatures', doc='number of features.'): 1000, Param(parent=u'RandomForestClassifier_e42647b054a7', name='numTrees', doc='Number of trees to train (>= 1).'): 1500}, {Param(parent=u'HashingTF_e2ec5e790eca', name='numFeatures', doc='number of features.'): 1500, Param(parent=u'RandomForestClassifier_e42647b054a7', name='numTrees', doc='Number of trees to train (>= 1).'): 1000}, {Param(parent=u'HashingTF_e2ec5e790eca', name='numFeatures', doc='number of features.'): 1500, Param(parent=u'RandomForestClassifier_e42647b054a7', name='numTrees', doc='Number of trees to train (>= 1).'): 1500}, {Param(parent=u'Hash

In [50]:
# crossval_rf_model_trainingSummary = crossval_rf_model.bestModel.stages[-1].summary

# # Obtain the objective per iteration
# objectiveHistory = crossval_rf_model_trainingSummary.objectiveHistory
# print("objectiveHistory:")
# for objective in objectiveHistory:
#     print(objective)

# # for multiclass, we can inspect metrics on a per-label basis
# print("False positive rate by label:")
# for i, rate in enumerate(crossval_rf_model_trainingSummary.falsePositiveRateByLabel):
#     print("label %d: %s" % (i, rate))

# print("True positive rate by label:")
# for i, rate in enumerate(crossval_rf_model_trainingSummary.truePositiveRateByLabel):
#     print("label %d: %s" % (i, rate))

# print("Precision by label:")
# for i, prec in enumerate(crossval_rf_model_trainingSummary.precisionByLabel):
#     print("label %d: %s" % (i, prec))

# print("Recall by label:")
# for i, rec in enumerate(crossval_rf_model_trainingSummary.recallByLabel):
#     print("label %d: %s" % (i, rec))

# print("F-measure by label:")
# for i, f in enumerate(crossval_rf_model_trainingSummary.fMeasureByLabel()):
#     print("label %d: %s" % (i, f))

# accuracy = crossval_rf_model_trainingSummary.accuracy
# falsePositiveRate = crossval_rf_model_trainingSummary.weightedFalsePositiveRate
# truePositiveRate = crossval_rf_model_trainingSummary.weightedTruePositiveRate
# fMeasure = crossval_rf_model_trainingSummary.weightedFMeasure()
# precision = crossval_rf_model_trainingSummary.weightedPrecision
# recall = crossval_rf_model_trainingSummary.weightedRecall
# print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
#       % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

### Save Best Random Forest Model

In [51]:
best_rf.write().overwrite().save("hdfs:///data/exercise/RandomForestCrossvalModel")

## OneVsRest Logistic Regression

### The best logistic regression has these parameters

params: {'regParam': 0.01, 'numFeatures': 1000, 'maxIter': 30}	accuracy: 0.716705

### Define Simple Model Pipeline

In [53]:
from pyspark.ml.feature import *

label_indexer_ovr_lr = StringIndexer(inputCol="category_id", outputCol="category_id_index")
hashingTF_ovr_lr = pyspark.ml.feature.HashingTF(inputCol="name_preprocessed", outputCol="features", numFeatures=1000)
lr = pyspark.ml.classification.LinearSVC(maxIter=30, regParam=0.01)
ovr_lr = pyspark.ml.classification.OneVsRest(classifier=lr, featuresCol='features', labelCol="category_id_index")

pipeline_ovr_lr = pyspark.ml.Pipeline(stages=[
    label_indexer_ovr_lr,
    hashingTF_ovr_lr, 
    ovr_lr
])

ovr_lr_model = pipeline_ovr_lr.fit(training)

### Get Pipeline Stage

In [54]:
pipeline_ovr_lr.getStages()

[StringIndexer_fb1643014cb8, HashingTF_cd320ad4981d, OneVsRest_4c455b24aefc]

### Model Parameters

### Model evaluation

In [63]:
prediction = ovr_lr_model.transform(test)

evaluator_lr = pyspark.ml.evaluation.MulticlassClassificationEvaluator(labelCol="category_id_index", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator_lr.evaluate(prediction)
print("Accuracy = %g " % (accuracy))
print("Test Error = %g " % (1.0 - accuracy))

Accuracy = 0.736713 
Test Error = 0.263287 


### Save OneVsRest with Best LogisticRegression Model

In [62]:
pipeline_ovr_lr.write().overwrite().save("hdfs:///data/exercise/OneVsRestLogRegModel")