# Documentation

# Sample

In [2]:
# Data Acquisitor
# Downloads the Iris CSV, loads it into a Spark DataFrame and exposes it,
# minus the "Id" column, as `marvin_initial_dataset`.
import tempfile

import findspark
import numpy as np

# Must run before the first pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Create (or reuse) a Spark session using the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

sc = spark.sparkContext

from marvin_python_toolbox.common.data import MarvinData

# The helper's return value is used directly as the path handed to the CSV reader.
file_path = MarvinData.download_file(
    url="https://s3.amazonaws.com/marvin-engines-data/Iris.csv"
)

# The first CSV row is treated as the header; no schema inference is requested.
iris_df = spark.read.csv(file_path, header="true")

# "Id" is dropped before the data is handed to the next step.
marvin_initial_dataset = iris_df.drop("Id")

In [3]:
# Training Preparator
# Casts the four measurement columns to double, indexes "Species" into a
# numeric "label" column, assembles the measurements into a "features" vector
# and splits the result 70/30 into `marvin_dataset['train']` / `['test']`.

from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.regression import LabeledPoint

# Single source of truth for the feature columns (used for casting and assembling).
l_atributos = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]

# Fixed seed so the train/test split (and therefore the trained model and its
# metrics) is the same on every run.
SPLIT_SEED = 42

dataset = marvin_initial_dataset


# Changing attribute types to double (the CSV is read without schema inference,
# so the columns are cast explicitly before being used as numeric features).
for coluna in l_atributos:
    dataset = dataset.withColumn(coluna, dataset[coluna].cast(DoubleType()))


# Mapping column "Species" to a numerical value in a new column named "label"
label_indexer = StringIndexer().setInputCol("Species").setOutputCol("label")

dataset = label_indexer.fit(dataset).transform(dataset)


# Concatenating all features into a single vector and naming the resulting column "features"
assembler = VectorAssembler(inputCols=l_atributos, outputCol="features")
dataset = assembler.transform(dataset)


# 70% of the rows go to training, 30% to testing.
(train, test) = dataset.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

marvin_dataset = {'train': train, 'test': test}

In [4]:
# Model Training
# Fits a one-stage pipeline (NaiveBayes with default parameters) on the
# training split and stores the fitted pipeline in `marvin_model`.
import findspark

# findspark.init() has to run BEFORE pyspark is imported; calling it after the
# pyspark imports (as this cell used to) cannot help locate pyspark when the
# cell is executed on its own.
findspark.init()

from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes


# No columns are configured here, so the estimator uses its default
# feature/label column names.
nb = NaiveBayes()

pipeline = Pipeline().setStages([nb])

marvin_model = pipeline.fit(marvin_dataset['train'])

In [5]:
# Model Evaluation
# Scores the held-out test split with the trained model and stores the
# resulting accuracy in `marvin_metrics`.

import findspark

# findspark.init() has to run BEFORE pyspark is imported; calling it after the
# pyspark import (as this cell used to) cannot help locate pyspark when the
# cell is executed on its own.
findspark.init()

from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Adds the model's output columns (including "prediction") to the test rows.
predictions = marvin_model.transform(marvin_dataset['test'])

# Compares the "prediction" column against the "label" column using accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
marvin_metrics = evaluator.evaluate(predictions)

In [6]:
# Sample input: a flat list of four measurements. The Predictor cell maps the
# values, in order, to the columns SepalLengthCm, SepalWidthCm, PetalLengthCm
# and PetalWidthCm.
input_message = [5.8, 2.7, 5.1, 1.9]


In [7]:
# Pass-through: the message is forwarded unchanged. The self-assignment has no
# effect other than requiring that `input_message` is already defined.
# NOTE(review): presumably a placeholder for a prediction-preparation step — confirm.
input_message = input_message

In [8]:
# Predictor
# Turns `input_message` (a flat list of four measurements) into a one-row
# Spark DataFrame, assembles the "features" vector, scores it with
# `marvin_model` and stores the value of the "prediction" column in
# `final_prediction`.
#
# `input_message` itself is left untouched, so this cell can be re-run without
# re-running the cell that defines the message.

import findspark

# findspark.init() has to run BEFORE pyspark is imported; calling it after the
# pyspark imports (as this cell used to) cannot help locate pyspark when the
# cell is executed on its own.
findspark.init()

from pyspark.sql.types import FloatType
from pyspark.sql.types import StructType, StructField
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Building SparkSession (getOrCreate reuses an already running session, if any)
spark = SparkSession.builder \
   .master("local") \
   .appName("Spark MLlib") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

sc = spark.sparkContext


# Feature columns, in the order the values appear in `input_message`.
colunas = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]

# One nullable float field per feature column.
field = [StructField(coluna, FloatType(), True) for coluna in colunas]

input_schema = StructType(field)

# A single row; values are coerced to float because Spark's schema verification
# rejects Python ints (e.g. 5 instead of 5.0) for FloatType fields.
input_rows = [tuple(float(valor) for valor in input_message)]

# The schema already types every column as float, so no further cast is needed.
input_df = spark.createDataFrame(input_rows, schema=input_schema)

assembler = VectorAssembler(inputCols=colunas, outputCol="features")
input_df = assembler.transform(input_df)


prediction_df = marvin_model.transform(input_df)

# Single input row -> take the "prediction" value of the first (only) result row.
final_prediction = prediction_df.select('prediction').collect()[0][0]