In [None]:
import decisiontree as dtree
import numpy as np
import time
import pyspark
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session for this notebook and grab its SparkContext,
# which the RDD-based cells below use as `sc`.
# NOTE(review): per the Spark docs, master "local" runs a single worker thread, so the
# RDD partitions are processed one at a time; use "local[N]" if parallelism is intended.
spark = (
    SparkSession.builder
    .master("local")
    .appName("DTREE-SUSY")
    .getOrCreate()
)
sc = spark.sparkContext


In [None]:
# 1. Read the dataset: an RDD with one string element per line of SUSY.csv.
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint
rdd = sc.textFile('../datasets/SUSY.csv')
def append_ret(a, b):
    """Append ``b`` to the end of list ``a`` in place and return ``a`` itself.

    ``list.append`` returns ``None``; returning the mutated list lets the
    append be used inside a ``lambda`` passed to ``RDD.map``.
    """
    a.insert(len(a), b)
    return a
# 2. Split each CSV line into columns and move the label from the first column
#    to the end of the row, then wrap every row in a DenseVector.
rdd = (
    rdd.map(lambda line: line.split(','))
    .map(lambda cols: append_ret(cols[1:], cols[0]))
    .map(DenseVector)
    # gpu_work/cpu_work return one tree per partition, so this partition count
    # is also the number of trees trained by mapPartitions.
    .repartition(12)
)

In [8]:
def gpu_work(max_depth=4, min_samples_per_node=1):
    """Build a ``mapPartitions`` callback that trains one tree per partition with ``dtree.train_tree``.

    The callback materialises the partition's rows into a float32 NumPy matrix
    (one row per record) and returns a one-element list holding the trained
    tree, so ``rdd.mapPartitions(gpu_work(...)).collect()`` yields one tree per
    partition. ``dtree.train_tree`` is presumably the GPU trainer (the caller
    labels its timings "[GPU]") — confirm against the dtree module.

    Args:
        max_depth: forwarded to ``dtree.train_tree``.
        min_samples_per_node: forwarded to ``dtree.train_tree``.
    """
    def _train_partition(rows):
        matrix = np.asarray(list(rows), dtype=np.float32)
        tree = dtree.train_tree(matrix, max_depth, min_samples_per_node)
        return [tree]

    return _train_partition

def cpu_work(max_depth=4, min_samples_per_node=1):
    """Build a ``mapPartitions`` callback that trains one tree per partition with ``dtree.train_tree_np``.

    The callback materialises the partition's rows into a float32 NumPy matrix
    (one row per record) and returns a one-element list holding the trained
    tree, so ``rdd.mapPartitions(cpu_work(...)).collect()`` yields one tree per
    partition. ``dtree.train_tree_np`` is presumably the NumPy/CPU trainer (the
    caller labels its timings "[CPU]") — confirm against the dtree module.

    Args:
        max_depth: forwarded to ``dtree.train_tree_np``.
        min_samples_per_node: forwarded to ``dtree.train_tree_np``.
    """
    def _cpu_work(data):
        inp = np.asarray(list(data), dtype=np.float32)
        return [dtree.train_tree_np(inp, max_depth, min_samples_per_node)]

    # Return this factory's own closure; `_gpu_work` only exists inside gpu_work,
    # so returning it here raised NameError on every cpu_work() call.
    return _cpu_work

In [9]:
def predict(sample, trees):
    """Majority-vote prediction for ``sample`` over the ensemble ``trees``.

    Sums ``dtree.predict(sample, tree)`` over every tree and returns 1 when the
    total is strictly greater than half the number of trees, otherwise 0 (so a
    tie, or an empty ``trees``, yields 0). Assumes each per-tree prediction is
    a 0/1 vote — TODO confirm against dtree.predict.
    """
    votes = sum(dtree.predict(sample, tree) for tree in trees)
    return int(votes > len(trees) / 2)

def evaluar(dataset, trees):
    """Return the accuracy (a fraction in [0, 1]) of the ``trees`` ensemble on ``dataset``.

    The last element of each row is the true label; a row is a hit when
    ``int(predict(row, trees)) == int(row[-1])``. ``dataset`` must expose
    ``shape[0]`` (e.g. a NumPy array) for the row count, so an empty dataset
    raises ZeroDivisionError.
    """
    hits = sum(1 for row in dataset if int(predict(row, trees)) == int(row[-1]))
    return hits / dataset.shape[0]


In [14]:
# 4. Benchmark the per-partition tree trainers (GPU and CPU) and measure their accuracy
def _train_forest(training_data, work, repeats, tag):
    """Train with ``training_data.mapPartitions(work)`` ``repeats + 1`` times; return the last result.

    The first run is an untimed warm-up (presumably to exclude one-off start-up
    costs from the measurements). Each of the next ``repeats`` runs is timed and
    printed as ``Tiempo utilizado [<tag>]: <seconds>``.

    Args:
        training_data: RDD whose partitions are each trained on.
        work: per-partition callback, e.g. ``gpu_work(...)`` or ``cpu_work(...)``.
        repeats: number of timed runs after the warm-up (0 = warm-up only).
        tag: label printed in the timing line, e.g. "GPU" or "CPU".

    Returns:
        The list collected from the last run (one tree per partition for
        gpu_work/cpu_work).
    """
    trees = training_data.mapPartitions(work).collect()
    for _ in range(repeats):
        start = time.perf_counter()
        trees = training_data.mapPartitions(work).collect()
        print('Tiempo utilizado [{}]:'.format(tag), time.perf_counter() - start)
    return trees


def eval_model(rdd, repeats, max_depth=4, seed=0):
    """Benchmark and evaluate the GPU (``gpu_work``) and CPU (``cpu_work``) trainers on ``rdd``.

    ``rdd`` is split 90/10 into train/test with ``randomSplit(seed=seed)``. For
    each backend, one tree per training partition is trained once untimed and
    then ``repeats`` more times with the elapsed time printed. The trees from
    that backend's last run are scored with ``evaluar`` on the test rows, and
    the accuracy is printed as a percentage. Each backend is always scored on
    its own trees, including when ``repeats == 0``.

    Args:
        rdd: RDD of rows whose last element is the label (features first).
        repeats: number of timed training runs per backend.
        max_depth: maximum tree depth passed to gpu_work/cpu_work.
        seed: seed for the train/test split.

    NOTE(review): train and test are computed by separate Spark jobs. If ``rdd``
    is not cached upstream and its row order is not deterministic (e.g. after
    ``repartition``), the two splits may not be exact complements — consider
    caching ``rdd`` before calling this.
    """
    trainingData, testData = rdd.randomSplit([0.9, 0.1], seed=seed)
    trainingData = trainingData.cache()
    try:
        # Collect the test rows once; both backends are scored on the same array.
        test_samples = np.float32(testData.collect())
        for tag, work_factory in (('GPU', gpu_work), ('CPU', cpu_work)):
            trees = _train_forest(trainingData, work_factory(max_depth=max_depth), repeats, tag)
            precision = evaluar(test_samples, trees) * 100
            print('Precisión [{}]'.format(tag), precision)
    finally:
        trainingData.unpersist()


# Alias kept so cells that call `eval_model_gpu(...)` keep working.
eval_model_gpu = eval_model
    
    
    

In [None]:
eval_model_gpu(rdd, repeats=5, max_depth=4, seed=0)

Tiempo utilizado [GPU]: 29.33268117904663


In [None]:
eval_model_gpu(rdd, 225, 6, 1)

In [None]:
eval_model_gpu(rdd, 225, 6, 2)

In [None]:
eval_model_gpu(rdd, 225, 6, 3)

In [None]:
eval_model_gpu(rdd, 225, 6, 4)

In [None]:
eval_model_gpu(rdd, 225, 6, 5)

In [None]:
eval_model_gpu(rdd, 225, 6, 6)

In [None]:
eval_model_gpu(rdd, 225, 6, 7)

In [None]:
eval_model_gpu(rdd, 225, 6, 8)

In [None]:
eval_model_gpu(rdd, 225, 6, 9)

In [None]:
eval_model_gpu(rdd, 225, 7, 0)

In [None]:
eval_model_gpu(rdd, 225, 7, 1)

In [None]:
eval_model_gpu(rdd, 225, 7, 2)

In [None]:
eval_model_gpu(rdd, 225, 7, 3)

In [None]:
eval_model_gpu(rdd, 225, 7, 4)

In [None]:
eval_model_gpu(rdd, 225, 7, 5)

In [None]:
eval_model_gpu(rdd, 225, 7, 6)

In [None]:
eval_model_gpu(rdd, 225, 7, 7)

In [None]:
eval_model_gpu(rdd, 225, 7, 8)

In [None]:
eval_model_gpu(rdd, 225, 7, 9)

In [None]:
"""
Versión de Spark, se espera que sea más precisa, pero mucho más lenta.
"""
import time
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rdd2= sc.textFile('../datasets/SUSY.csv')
rdd2 = rdd2.map(lambda line: line.split(','))
def append_ret(a,b):
    a.append(b)
    return a
rdd2 = rdd2.map(lambda line: append_ret(line[1:], line[0]))

rdd2 = rdd2.map(lambda x: Row(label=float(x[-1]), features=Vectors.dense(x[:-2])))
df = rdd2.toDF()

def eval_spark(ntrees=225, max_depth=6, seed=0):
    """Train a Spark ML random forest on ``df`` and print its training time and test metrics.

    ``df`` is the module-level DataFrame with ``features`` and ``label`` columns.
    It is split 90/10 into train/test with ``seed``, and the same seed is used for
    the forest. The printed time covers the split and ``fit``.

    Printed metrics:
      - 'Precisión': test accuracy x 100. This is comparable with the accuracy
        computed by ``evaluar`` for the GPU/CPU trees; previously this label
        showed the ROC AUC instead.
      - 'AUC ROC': test area under the ROC curve x 100.

    Args:
        ntrees: number of trees (``numTrees``).
        max_depth: maximum tree depth (``maxDepth``).
        seed: seed for the split and the forest.

    NOTE(review): ``subsamplingRate=1/ntrees`` trains each tree on roughly a
    1/ntrees fraction of the training rows — presumably to mirror the
    one-tree-per-partition GPU setup; confirm.
    """
    from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

    inicio = time.time()
    (train, test) = df.randomSplit([0.9, 0.1], seed=seed)
    rf = RandomForestClassifier(featuresCol='features', labelCol='label', numTrees=ntrees,
                                featureSubsetStrategy='all', subsamplingRate=1 / ntrees,
                                maxDepth=max_depth, seed=seed)
    rfModel = rf.fit(train)
    fin = time.time()

    # Score the test split once and reuse the predictions for both metrics.
    result = rfModel.transform(test).cache()
    try:
        accuracy = MulticlassClassificationEvaluator(metricName="accuracy").evaluate(result)
        auc = BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(result)
    finally:
        result.unpersist()

    print('Tiempo utilizado:', fin - inicio)
    print('Precisión', accuracy * 100)
    print('AUC ROC', auc * 100)

In [None]:
eval_spark(225, 6, 0)

In [None]:
eval_spark(225, 6, 1)

In [None]:
eval_spark(225, 6, 2)

In [None]:
eval_spark(225, 6, 3)

In [None]:
eval_spark(225, 6, 4)

In [None]:
eval_spark(225, 6, 5)

In [None]:
eval_spark(225, 6, 6)

In [None]:
eval_spark(225, 6, 7)

In [None]:
eval_spark(225, 6, 8)


In [None]:
eval_spark(225, 6, 9)



In [None]:
eval_spark(225, 7, 0)

In [None]:
eval_spark(225, 7, 1)

In [None]:
eval_spark(225, 7, 2)

In [None]:
eval_spark(225, 7, 3)

In [None]:
eval_spark(225, 7, 4)

In [None]:
eval_spark(225, 7, 5)

In [None]:
eval_spark(225, 7, 6)

In [None]:
eval_spark(225, 7, 7)

In [None]:
eval_spark(225, 7, 8)

In [None]:
eval_spark(225, 7, 9)

In [None]:
# NOTE(review): `susy` is not defined in any earlier cell of this notebook, so this
# line raises NameError on Restart & Run All. Define `susy` first, or remove this
# leftover cell.
susy = np.float32(susy)