In [1]:
# from pyspark.ml.linalg import Vectors
from pyspark.mllib.linalg import SparseVector, Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel, RandomForest
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.feature import Normalizer
import time

## Read and prepare the dataset

In [2]:
rawData = sc.textFile("./datasets/covertype.data")
# rawData = sc.textFile("*.csv")

def prepare(line):
    values = line.split(',')
    map(float, values)
    featureVector = Vectors.dense(values[:-1])
    # Decision tree labels varies from 0 to n-1
    label = float(values[-1])-1
    return LabeledPoint(label, featureVector)

def new_prepare(line):
    values = line.split(',')
    map(float, values)
    wilderness = values[10:14].index('1')
    soil = values[14:54].index('1')
    featureVector = Vectors.dense(values[0:10]+[wilderness,soil])
    # Decision tree labels varies from 0 to n-1
    label = float(values[-1])-1
    return LabeledPoint(label, featureVector)

data = rawData.map(lambda line : new_prepare(line))

trainData, cvData, testData = data.randomSplit([0.8, 0.1, 0.1])

trainData.cache()
cvData.cache()
testData.cache()

PythonRDD[4] at RDD at PythonRDD.scala:43

## First Decision Tree

In [3]:
# Treino: trainData
# Teste: cvData
# Primeira Decision Tree, ainda não temos os hyperparemeters ideais

first_data = rawData.map(lambda line : prepare(line))

first_trainData, first_cvData, first_testData = first_data.randomSplit([0.8, 0.1, 0.1])

first_trainData.cache()
first_cvData.cache()
first_testData.cache()

start_time = time.time()

model = DecisionTree.trainClassifier(first_trainData, numClasses=7, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=4, maxBins=100)
predictions = model.predict(first_cvData.map(lambda x: x.features))
labelsAndPredictions = first_cvData.map(lambda lp: lp.label).zip(predictions) 
m = MulticlassMetrics(labelsAndPredictions)

print m.precision()
print("--- %s seconds ---" % (time.time() - start_time))

for category in range(7):
    # não existe o label 5 no dataset
    if category != 4:
        print(m.precision(category), m.recall(category))
        
        
# 0.702317136434
# --- 72.0438899994 seconds ---

# (0.6835904230150468, 0.684205541084537)
# (0.7848303819074035, 0.7289210343802636)
# (0.8460889138998312, 0.6264583333333333)
# (0.3979591836734694, 0.5043103448275862)
# (0.0, 0.0) não existe label 5
# (0.03575547866205306, 0.7126436781609196)
# (0.4487684729064039, 0.6917236142748672)

0.698907394384
--- 84.6671640873 seconds ---
(0.6730000470300522, 0.6759246138585802)
(0.7876652291294134, 0.7251686626424352)
(0.8382227285673597, 0.6348144952545298)
(0.425531914893617, 0.5128205128205128)
(0.03143534994068802, 0.7464788732394366)
(0.40645773979107314, 0.7074380165289256)


## Tuning Decision Trees

In [19]:
# Treino: trainData
# Teste: cvData
# Descobrir a configuração dos hyperparameters

start_time = time.time()

impurity = ['gini', 'entropy']
depth = [1, 20]
bins = [10, 300]

for imp in impurity:
    for dep in depth:
        for b in bins:
            model = DecisionTree.trainClassifier(first_trainData, numClasses=7, categoricalFeaturesInfo={},
                                                 impurity=imp, maxDepth=dep, maxBins=b)
            predictions = model.predict(first_cvData.map(lambda x: x.features))
            labelsAndPredictions = first_cvData.map(lambda lp: lp.label).zip(predictions) 
            m = MulticlassMetrics(labelsAndPredictions)

            print((imp, dep, b), m.precision())
            
print("--- %s seconds ---" % (time.time() - start_time))

# (('gini', 1, 10), 0.6360193671104963)
# (('gini', 1, 300), 0.6350855957115684)
# (('gini', 20, 10), 0.8906277018848349)
# (('gini', 20, 300), 0.9035448729033374)
# (('entropy', 1, 10), 0.4862873940861145)
# (('entropy', 1, 300), 0.4862873940861145)
# (('entropy', 20, 10), 0.8960228255230849)
# (('entropy', 20, 300), 0.9099775203181739)
# --- 1304.84925485 seconds ---

(('gini', 1, 10), 0.6360193671104963)
(('gini', 1, 300), 0.6350855957115684)
(('gini', 20, 10), 0.8906277018848349)
(('gini', 20, 300), 0.9035448729033374)
(('entropy', 1, 10), 0.4862873940861145)
(('entropy', 1, 300), 0.4862873940861145)
(('entropy', 20, 10), 0.8960228255230849)
(('entropy', 20, 300), 0.9099775203181739)
--- 1304.84925485 seconds ---


In [4]:
# Treino: trainData + cvData
# Teste: testData
# Verificar a precisão da Decision Tree usando o conjunto de teste

start_time = time.time()

model = DecisionTree.trainClassifier(first_trainData.union(first_cvData), numClasses=7, categoricalFeaturesInfo={},
                                     impurity='entropy', maxDepth=20, maxBins=300)
predictions = model.predict(first_testData.map(lambda x: x.features))
labelsAndPredictions = first_testData.map(lambda lp: lp.label).zip(predictions) 
m = MulticlassMetrics(labelsAndPredictions)

print m.precision()
print("--- %s seconds ---" % (time.time() - start_time))

# 0.913047216204
# --- 153.932090998 seconds ---

0.913047216204
--- 153.932090998 seconds ---


## Categorical Features Revisited

In [12]:
# Treino: trainData
# Teste: cvData
# Descobrir a configuração dos hyperparameters com as features categóricas informadas

start_time = time.time()


model = DecisionTree.trainClassifier(trainData, numClasses=7, categoricalFeaturesInfo={10: 4, 11: 40},
                                     impurity='entropy', maxDepth=20, maxBins=300)
predictions = model.predict(cvData.map(lambda x: x.features))
labelsAndPredictions = cvData.map(lambda lp: lp.label).zip(predictions) 
m = MulticlassMetrics(labelsAndPredictions)

print m.precision()
print("--- %s seconds ---" % (time.time() - start_time))

# 0.923581051717
# --- 132.968389988 seconds ---

0.926365591398
--- 186.282171965 seconds ---


In [13]:
# Treino: trainData + cvData
# Teste: testData
# Verificar a precisão da Decision Tree usando o conjunto de teste com as features categóricas informadas
start_time = time.time()


model = DecisionTree.trainClassifier(trainData.union(cvData), numClasses=7, categoricalFeaturesInfo={10: 4, 11: 40}, 
                                     impurity='entropy', maxDepth=20, maxBins=300)
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) 
m = MulticlassMetrics(labelsAndPredictions)

print m.precision()
print("--- %s seconds ---" % (time.time() - start_time))

# 0.926020983265
# --- 170.648295164 seconds ---

0.926315789474
--- 153.400055885 seconds ---


## Random Forest

In [14]:
# Treino: trainData + cvData
# Teste: testData
# Random Forest com 20 Decision Trees

start_time = time.time()


forest = RandomForest.trainClassifier(trainData.union(cvData), numClasses=7, categoricalFeaturesInfo={10: 4, 11: 40}, numTrees=20,
                                      featureSubsetStrategy='auto', impurity='entropy', maxDepth=30, maxBins=300)
predictions = forest.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions) 
k = MulticlassMetrics(labelsAndPredictions)
print k.precision()
print("--- %s seconds ---" % (time.time() - start_time))

# Utilizando maxBins=200
# 0.948918123594
# --- 2439.131598 seconds ---

# Utilizando maxBins=300
# 0.966402070751
# --- 4255.0600481 seconds ---

0.966402070751
--- 4255.0600481 seconds ---


## Predict

In [24]:
input = "2709,125,28,67,23,3224,253,207,61,6094,0,29"
vector = input.split(',')
map(float, vector)
featureVector = Vectors.dense(vector)
print(forest.predict(featureVfeature_selection.ipynbor))

# 4.0 (Label 5)

4.0


## Neural Network

In [None]:
# Treino: trainData + cvData
# Teste: testData
# Multilayer Perceptron com 1 camada oculta

# MLP não suporta feature categórica
data = rawData.map(lambda line : prepare(line))

trainData, cvData, testData = data.randomSplit([0.8, 0.1, 0.1])

trainData.cache()
cvData.cache()
testData.cache()

start_time = time.time()

trainDF = trainData.union(cvData).map(lambda x: (x.label, Normalizer().transform(x.features).asML())).toDF(["label", "features"])
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[54, 100, 7], blockSize=128, seed=123)
model = mlp.fit(trainDF)
testDF = testData.map(lambda x: (Normalizer().transform(x.features).asML(),)).toDF(["features"])

predictions = model.transform(testDF).select("prediction")
labelsAndPredictions = testData.map(lambda x: x.label).zip(predictions.rdd.map(lambda x: x[0])) 
m = MulticlassMetrics(labelsAndPredictions)
print(m.precision())

print("--- %s seconds ---" % (time.time() - start_time))

# 0.687774846086
# --- 329.418580055 seconds ---