In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from time import time
import pandas as pd

# load data

In [2]:
rowRdd = sc.textFile("/home/hsiehpinghan/git/python/spark-python-2/data/kaggle/stumbleupon/train.tsv")

In [3]:
headerRow = rowRdd.first()
headerRowColList = headerRow.replace("\"", "").split("\t")
headerRowColList

[u'url',
 u'urlid',
 u'boilerplate',
 u'alchemy_category',
 u'alchemy_category_score',
 u'avglinksize',
 u'commonlinkratio_1',
 u'commonlinkratio_2',
 u'commonlinkratio_3',
 u'commonlinkratio_4',
 u'compression_ratio',
 u'embed_ratio',
 u'framebased',
 u'frameTagRatio',
 u'hasDomainLink',
 u'html_ratio',
 u'image_ratio',
 u'is_news',
 u'lengthyLinkDomain',
 u'linkwordscore',
 u'news_front_page',
 u'non_markup_alphanum_characters',
 u'numberOfLinks',
 u'numwords_in_url',
 u'parametrizedLinkRatio',
 u'spelling_errors_ratio',
 u'label']

In [4]:
rowRddColList = rowRdd \
    .filter(lambda row : row != headerRow) \
    .map(lambda row : row.replace("\"", "")) \
    .map(lambda row : row.split("\t"))

# data cleaning

In [5]:
rowRddColList = rowRddColList \
    .map(lambda colList : \
        map(lambda col : None if col == "?" else col, colList)
    )
rowRddRow = rowRddColList.map(lambda colList : 
    Row(
        alchemy_category = colList[3],
        alchemy_category_score = float(colList[4]) if colList[4] else 0.0,
        avglinksize = float(colList[5]) if colList[5] else 0.0,
        commonlinkratio_1 = float(colList[6]) if colList[6] else 0.0,
        commonlinkratio_2 = float(colList[7]) if colList[7] else 0.0,
        commonlinkratio_3 = float(colList[8]) if colList[8] else 0.0,
        commonlinkratio_4 = float(colList[9]) if colList[9] else 0.0,
        compression_ratio = float(colList[10]) if colList[10] else 0.0,
        embed_ratio = float(colList[11]) if colList[11] else 0.0,
        framebased = int(colList[12]) if colList[12] else 0,
        frameTagRatio = float(colList[13]) if colList[13] else 0.0,
        hasDomainLink = int(colList[14]) if colList[14] else 0,
        html_ratio = float(colList[15]) if colList[15] else 0.0,
        image_ratio = float(colList[16]) if colList[16] else 0.0,
        is_news = int(colList[17]) if colList[17] else 0,
        lengthyLinkDomain = int(colList[18]) if colList[18] else 0,
        linkwordscore = float(colList[19]) if colList[19] else 0.0,
        news_front_page = int(colList[20]) if colList[20] else 0,
        non_markup_alphanum_characters = int(colList[21]) if colList[21] else 0,
        numberOfLinks = int(colList[22]) if colList[22] else 0,
        numwords_in_url = float(colList[23]) if colList[23] else 0.0,
        parametrizedLinkRatio = float(colList[24]) if colList[24] else 0.0,
        spelling_errors_ratio = float(colList[25]) if colList[25] else 0.0,
        label = int(colList[26]) if colList[26] else 0,
    )
)
rowRddRow.take(1)

[Row(alchemy_category=u'business', alchemy_category_score=0.789131, avglinksize=2.055555556, commonlinkratio_1=0.676470588, commonlinkratio_2=0.205882353, commonlinkratio_3=0.047058824, commonlinkratio_4=0.023529412, compression_ratio=0.443783175, embed_ratio=0.0, frameTagRatio=0.09077381, framebased=0, hasDomainLink=0, html_ratio=0.245831182, image_ratio=0.003883495, is_news=1, label=0, lengthyLinkDomain=1, linkwordscore=24.0, news_front_page=0, non_markup_alphanum_characters=5424, numberOfLinks=170, numwords_in_url=8.0, parametrizedLinkRatio=0.152941176, spelling_errors_ratio=0.079129575)]

# extract feature

In [6]:
trainDf = spark.createDataFrame(rowRddRow)
trainDf.show(1)

+----------------+----------------------+-----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------+-------------+----------+-------------+-----------+-----------+-------+-----+-----------------+-------------+---------------+------------------------------+-------------+---------------+---------------------+---------------------+
|alchemy_category|alchemy_category_score|avglinksize|commonlinkratio_1|commonlinkratio_2|commonlinkratio_3|commonlinkratio_4|compression_ratio|embed_ratio|frameTagRatio|framebased|hasDomainLink| html_ratio|image_ratio|is_news|label|lengthyLinkDomain|linkwordscore|news_front_page|non_markup_alphanum_characters|numberOfLinks|numwords_in_url|parametrizedLinkRatio|spelling_errors_ratio|
+----------------+----------------------+-----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------+-------------+----------+-------------+-----------+-----------+-------+--

In [7]:
alchemy_category_index = "alchemy_category_index"
alchemyCategoryStringIndexer = StringIndexer(inputCol="alchemy_category", outputCol=alchemy_category_index)
alchemyCategoryStringIndexerModel = alchemyCategoryStringIndexer.setHandleInvalid("keep").fit(trainDf)
indexedTrainDf = alchemyCategoryStringIndexerModel.transform(trainDf)
indexedTrainDf.show(1)

+----------------+----------------------+-----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------+-------------+----------+-------------+-----------+-----------+-------+-----+-----------------+-------------+---------------+------------------------------+-------------+---------------+---------------------+---------------------+----------------------+
|alchemy_category|alchemy_category_score|avglinksize|commonlinkratio_1|commonlinkratio_2|commonlinkratio_3|commonlinkratio_4|compression_ratio|embed_ratio|frameTagRatio|framebased|hasDomainLink| html_ratio|image_ratio|is_news|label|lengthyLinkDomain|linkwordscore|news_front_page|non_markup_alphanum_characters|numberOfLinks|numwords_in_url|parametrizedLinkRatio|spelling_errors_ratio|alchemy_category_index|
+----------------+----------------------+-----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------+-------------+----------+--

In [8]:
alchemy_category_one_hot_vec = "alchemy_category_one_hot_vec"
oneHotEncoderEstimator = OneHotEncoderEstimator(inputCols=[alchemy_category_index],
                                 outputCols=[alchemy_category_one_hot_vec])
oneHotEncoderModel = oneHotEncoderEstimator.setHandleInvalid("keep").fit(indexedTrainDf)
oneHoteddDf = oneHotEncoderModel.transform(indexedTrainDf)
oneHoteddDf.show(1)

+----------------+----------------------+-----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------+-------------+----------+-------------+-----------+-----------+-------+-----+-----------------+-------------+---------------+------------------------------+-------------+---------------+---------------------+---------------------+----------------------+----------------------------+
|alchemy_category|alchemy_category_score|avglinksize|commonlinkratio_1|commonlinkratio_2|commonlinkratio_3|commonlinkratio_4|compression_ratio|embed_ratio|frameTagRatio|framebased|hasDomainLink| html_ratio|image_ratio|is_news|label|lengthyLinkDomain|linkwordscore|news_front_page|non_markup_alphanum_characters|numberOfLinks|numwords_in_url|parametrizedLinkRatio|spelling_errors_ratio|alchemy_category_index|alchemy_category_one_hot_vec|
+----------------+----------------------+-----------+-----------------+-----------------+-----------------+-----------------

In [9]:
vectorAssembler = VectorAssembler(
    inputCols=filter(lambda col : col not in ["label", "alchemy_category", "alchemy_category_index"], oneHoteddDf.columns),
    outputCol="features")

vectorAssembledDf = vectorAssembler.transform(oneHoteddDf)
labeledPointRdd = vectorAssembledDf.select("features", "label").rdd.map(lambda row : LabeledPoint(row.label, Vectors.fromML(row.features)))
labeledPointRdd.take(1)

[LabeledPoint(0.0, (36,[0,1,2,3,4,5,6,8,11,12,13,14,15,17,18,19,20,21,24],[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.09077381,0.245831182,0.003883495,1.0,1.0,24.0,5424.0,170.0,8.0,0.152941176,0.079129575,1.0]))]

# training

In [10]:
(trainLabeledPointRdd, validationLabeledPointRdd, testLabeledPointRdd) = labeledPointRdd.randomSplit([8, 1, 1])
trainLabeledPointRdd.persist()
validationLabeledPointRdd.persist()
testLabeledPointRdd.persist()

PythonRDD[36] at RDD at PythonRDD.scala:48

In [11]:
decisionTreeModel = DecisionTree.trainClassifier(data=trainLabeledPointRdd, numClasses=2, categoricalFeaturesInfo={}, impurity="entropy", maxDepth=5, maxBins=5)

In [12]:
scoreRdd = decisionTreeModel.predict(validationLabeledPointRdd.map(lambda labeledPoint : labeledPoint.features))
labelRdd = validationLabeledPointRdd.map(lambda labeledPoint : labeledPoint.label)
scoreAndLabelRdd = scoreRdd.zip(labelRdd)
binaryClassificationMetrics = BinaryClassificationMetrics(scoreAndLabels=scoreAndLabelRdd)
binaryClassificationMetrics.areaUnderROC

0.6519684019684019

In [13]:
def calculateAUC(decisionTreeModel, labeledPointRdd):
    scoreRdd = decisionTreeModel.predict(labeledPointRdd.map(lambda labeledPoint : labeledPoint.features))
    labelRdd = labeledPointRdd.map(lambda labeledPoint : labeledPoint.label)
    scoreAndLabelRdd = scoreRdd.zip(labelRdd)
    binaryClassificationMetrics = BinaryClassificationMetrics(scoreAndLabels=scoreAndLabelRdd)
    AUC = binaryClassificationMetrics.areaUnderROC
    return AUC
    
def evaluateModel(trainLabeledPointRdd, validationLabeledPointRdd, testLabeledPointRdd, impurity, maxDepth, maxBins):
    startTime = time()
    decisionTreeModel = DecisionTree.trainClassifier(data=trainLabeledPointRdd, numClasses=2, categoricalFeaturesInfo={}, impurity=impurity, maxDepth=maxDepth, maxBins=maxBins)
    validationAUC = calculateAUC(decisionTreeModel, validationLabeledPointRdd)
    testAUC = calculateAUC(decisionTreeModel, testLabeledPointRdd)
    duration = time() - startTime
    return (impurity, maxDepth, maxBins, duration, validationAUC, testAUC)

In [14]:
impurityList = ["gini", "entropy"]
maxDepthList = [5, 10, 15]
maxBinsList = [5, 10, 15]
metrics = [
    evaluateModel(trainLabeledPointRdd, validationLabeledPointRdd, testLabeledPointRdd, impurity, maxDepth, maxBins)
    for impurity in impurityList
    for maxDepth in maxDepthList
    for maxBins in maxBinsList
]
resultDf = pd.DataFrame(data=metrics, columns=["impurity", "maxDepth", "maxBins", "duration", "validationAUC", "testAUC"])
resultDf

Unnamed: 0,impurity,maxDepth,maxBins,duration,validationAUC,testAUC
0,gini,5,5,3.404577,0.656695,0.673663
1,gini,5,10,2.038335,0.646335,0.676447
2,gini,5,15,1.571143,0.644878,0.664982
3,gini,10,5,1.886011,0.619431,0.642051
4,gini,10,10,1.702022,0.651191,0.659853
5,gini,10,15,1.827772,0.661066,0.669634
6,gini,15,5,1.934184,0.574171,0.655348
7,gini,15,10,2.29787,0.609881,0.631502
8,gini,15,15,1.968123,0.63468,0.645458
9,entropy,5,5,1.034031,0.651968,0.678095


# print decision tree

In [15]:
decisionTreeModel = DecisionTree.trainClassifier(data=trainLabeledPointRdd, numClasses=2, categoricalFeaturesInfo={}, impurity="entropy", maxDepth=5, maxBins=5)
print(decisionTreeModel.toDebugString())

DecisionTreeModel classifier of depth 5 with 57 nodes
  If (feature 17 <= 1231.5)
   If (feature 23 <= 0.5)
    If (feature 25 <= 0.5)
     If (feature 7 <= 9.0E-5)
      If (feature 8 <= 0.038475772500000005)
       Predict: 0.0
      Else (feature 8 > 0.038475772500000005)
       Predict: 0.0
     Else (feature 7 > 9.0E-5)
      If (feature 8 <= 0.081178475)
       Predict: 0.0
      Else (feature 8 > 0.081178475)
       Predict: 0.0
    Else (feature 25 > 0.5)
     If (feature 8 <= 0.081178475)
      If (feature 3 <= 0.1680163935)
       Predict: 1.0
      Else (feature 3 > 0.1680163935)
       Predict: 1.0
     Else (feature 8 > 0.081178475)
      If (feature 6 <= 0.626463362)
       Predict: 1.0
      Else (feature 6 > 0.626463362)
       Predict: 0.0
   Else (feature 23 > 0.5)
    If (feature 15 <= 12.5)
     Predict: 0.0
    Else (feature 15 > 12.5)
     If (feature 19 <= 5.5)
      If (feature 8 <= 0.026010224)
       Predict: 0.0
      Else (feature 8 > 0.026010224)
       Pre