In [1]:
from pyspark.sql import SparkSession
import time

In [2]:
# Start the Spark session used by the rest of the notebook.
# The "secondaryPreferred" read preference asks the MongoDB driver to read
# from a secondary member of the replica set whenever one is available.

spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
    .getOrCreate()
)

In [3]:
# Read the dataset from MongoDB.
# The connection string carries credentials, so it is read from the
# MONGODB_URI environment variable instead of being hardcoded in the notebook.
# Expected shape:
#   mongodb+srv://<user>:<password>@cluster0-hkzmd.mongodb.net/test?retryWrites=true&w=majority
import os

mongo_uri = os.environ.get("MONGODB_URI")
if not mongo_uri:
    raise RuntimeError(
        "Set the MONGODB_URI environment variable to the MongoDB connection "
        "string before running this cell."
    )

syn_dataset_mongo = (
    spark.read.format("mongo")
    .option("spark.mongodb.input.uri", mongo_uri)
    .option("database", "test")
    .option("collection", "test.syn-dos_double")
    .load()
    # Drop the _id column, which is not needed downstream
    .drop("_id")
)

In [4]:
# Explore the dataset: print the first 20 rows, truncating long cell values
# (these are the defaults of DataFrame.show, spelled out explicitly)
syn_dataset_mongo.show(n=20, truncate=True)

In [5]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import RFormula

# Feature preparation for training.
# RFormula builds the "features" vector and the "label" column, and indexes
# the text field used as the label.
assemblerFormula = RFormula(formula="Class ~ .")
trainingTF = assemblerFormula.fit(syn_dataset_mongo)
# Cache the prepared data so the actions below (the two counts and the model
# fit) do not each re-read MongoDB and re-apply the transform. The counts
# materialize the cache, so the reported training time no longer includes
# data loading.
syn_dataset_prepared = (
    trainingTF.transform(syn_dataset_mongo)
    .select("features", "label")
    .cache()
)
syn_dataset_prepared.printSchema()

# Create the training set and the test set (70/30 weights, fixed seed)
training, test = syn_dataset_prepared.randomSplit([70.0, 30.0], seed=13)
print("Training Tot instances: ", training.count())
print("Test Tot instances: ", test.count())

# TRAINING using the Random Forest Classifier
# https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#random-forest-classifier
alg_label = "RANDOM FOREST CLASSIFIER"
start_time = time.time()
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=30, maxDepth=10)
rf_fitted = rf.fit(training)
print("%s Training time: %5.2f seconds ---" % (alg_label, (time.time() - start_time)))

In [6]:
# Print the fitted model's debug string
model_description = rf_fitted.toDebugString
print(model_description)

In [7]:
# Evaluate the model's accuracy on the test set
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf_predictions_and_labels = rf_fitted.transform(test)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(rf_predictions_and_labels)
print("Accuracy: %3.2f%%" % (accuracy * 100))


In [8]:
# Register the predictions as a temporary view for SQL exploration.
# createOrReplaceTempView keeps this cell re-runnable: createTempView raises
# when a view named "syn_dos_prediction" already exists in the session.
rf_predictions_and_labels.createOrReplaceTempView("syn_dos_prediction")

In [9]:
# Explore the predictions with SQL: show up to 10 rows whose prediction is 1.0
sample_query = "select * from syn_dos_prediction where prediction=1.0 limit 10"
spark.sql(sample_query).show()