<h3>Classe associada à criação de um modelo de classificação</h3>

In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import MulticlassMetrics 
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import ChiSqSelector
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

In [None]:
class MLModel:
    """Build, train and score a Spark ML binary-classification pipeline.

    Typical call order: ``setTrainLimit`` -> (optional) ``reduceCols`` ->
    ``createPipe`` -> ``getPipeMetrics`` (or ``getLogisticTunningParam``
    which runs the last two steps over a parameter grid).
    """

    def __init__(self,df,spark,cols,label):
        """Store the inputs; no Spark job is triggered here.

        Args:
            df: Spark DataFrame holding both the feature columns and the label.
            spark: SparkSession used to build the test DataFrame.
            cols: list of feature column names fed to the VectorAssembler.
            label: name of the label column.
        """
        self.df = df
        self.spark = spark
        self.cols = cols
        self.label = label

    def setTrainLimit(self,limit):
        """Split ``self.df`` into ``self.df_train`` and ``self.df_test``.

        The test set is the last 10% of the rows; the train set is the first
        ``90% * limit`` rows, so ``limit`` controls how much of the training
        portion is actually used.

        Args:
            limit: fraction of the training portion to keep (0 < limit <= 1).

        Raises:
            ValueError: if ``limit`` is outside ``(0, 1]``. Values above 1
                would make the train rows overlap the test rows.
        """
        if not 0 < limit <= 1:
            raise ValueError(
                "limit must satisfy 0 < limit <= 1, got {!r}".format(limit)
            )
        num_records = self.df.count()
        train_count = int(num_records * 0.9 * limit)
        test_count = int(num_records * 0.1)
        # Reuse the original schema instead of re-inferring it from the rows:
        # this keeps column types intact and also works when the tail is empty.
        # NOTE(review): tail() collects the rows to the driver, and both
        # tail()/limit() rely on the DataFrame's row order - confirm the input
        # is small/ordered enough for this to be acceptable.
        self.df_test = self.spark.createDataFrame(
            self.df.tail(test_count), self.df.schema
        )
        self.df_train = self.df.limit(train_count)

    def reduceCols(self,criteria,value,cat_cols,cont_cols):
        """Reduce the categorical columns with a Chi-Square selection test.

        The selector is fitted on the training set only. Afterwards
        ``self.cols`` becomes the selected categorical columns followed by
        ``cont_cols``, and both train and test DataFrames are projected onto
        those columns plus the label.

        Args:
            criteria: ``"topFeatures"`` to keep the ``value`` best features, or
                ``"p-value"`` to keep features whose p-value is below ``value``.
            value: number of features or p-value threshold, per ``criteria``.
            cat_cols: categorical column names to be tested.
            cont_cols: continuous column names, always kept.

        Raises:
            ValueError: if ``criteria`` is not one of the supported values.
        """
        if criteria not in ("topFeatures", "p-value"):
            raise ValueError(
                "criteria must be 'topFeatures' or 'p-value', got {!r}".format(criteria)
            )

        vec_assembler = VectorAssembler(inputCols=cat_cols, outputCol="features")
        vec_df_train = vec_assembler.transform(self.df_train)

        if criteria == "topFeatures":
            selector = ChiSqSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol=self.label, numTopFeatures=value)
        else:
            # selectorType must be switched to "fpr"; otherwise the selector
            # stays in its default "numTopFeatures" mode and ignores `fpr`.
            selector = ChiSqSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol=self.label, selectorType="fpr", fpr=value)

        selector_model = selector.fit(vec_df_train)
        # selectedFeatures holds indices into the assembled vector, i.e. into cat_cols.
        good_cat_cols = [cat_cols[i] for i in selector_model.selectedFeatures]

        self.cols = good_cat_cols + list(cont_cols)
        kept = self.cols + [self.label]
        self.df_train = self.df_train.select(kept)
        self.df_test = self.df_test.select(kept)

    def createPipe(self,model_name,maxIter=100,regParam=0,elasticNetParam=0):
        """Create a two-stage Pipeline (vector assembler + classifier).

        Args:
            model_name: ``"logistic"``, ``"tree"`` or ``"forest"``.
            maxIter: max iterations (logistic regression only).
            regParam: regularization strength (logistic regression only).
            elasticNetParam: elastic-net mixing (logistic regression only).

        Raises:
            ValueError: if ``model_name`` is not a supported model; previously
                this produced a Pipeline with a ``None`` stage that only
                failed later, at fit time.
        """
        if model_name == "logistic":
            model = LogisticRegression(labelCol=self.label, maxIter=maxIter,
                                       regParam=regParam, elasticNetParam=elasticNetParam)
        elif model_name == "tree":
            model = DecisionTreeClassifier(labelCol=self.label)
        elif model_name == "forest":
            model = RandomForestClassifier(labelCol=self.label)
        else:
            raise ValueError(
                "model_name must be 'logistic', 'tree' or 'forest', got {!r}".format(model_name)
            )
        vec_assembler = VectorAssembler(inputCols=self.cols, outputCol="features")
        self.pipeline = Pipeline(stages=[vec_assembler, model])

    def getPipeMetrics(self):
        """Fit the pipeline on the train set and score it on the test set.

        Returns:
            Tuple ``(accuracy, precision, recall, f1)`` as produced by
            ``getMetrics``.
        """
        model = self.pipeline.fit(self.df_train)
        # Evaluate on held-out data; scoring on df_train would only report
        # how well the model memorised its own training rows.
        prediction = model.transform(self.df_test)
        # Use the configured label column rather than a hard-coded name, and
        # cast it to float so it pairs with the floating-point prediction.
        preds_and_labels = prediction.select(
            F.col("prediction"),
            F.col(self.label).cast(FloatType()).alias("label"),
        )
        metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
        return self.getMetrics(metrics.confusionMatrix().toArray())

    @staticmethod
    def _ratio(numerator, denominator):
        """Return numerator/denominator rounded to 5 places, or 0.0 if undefined."""
        if denominator == 0:
            return 0.0
        return round(numerator / denominator, 5)

    def getMetrics(self,confusion_matrix):
        """Compute accuracy, precision, recall and F1 from a 2x2 confusion matrix.

        Args:
            confusion_matrix: 2x2 indexable matrix laid out as
                ``[[tn, fp], [fn, tp]]`` (row = actual class, column =
                predicted class).

        Returns:
            Tuple ``(accuracy, precision, recall, f1)``, each rounded to 5
            decimal places. A metric whose denominator is zero (e.g. no
            predicted positives) is reported as ``0.0`` instead of raising
            or yielding NaN.
        """
        tp = confusion_matrix[1][1]
        tn = confusion_matrix[0][0]
        fp = confusion_matrix[0][1]
        fn = confusion_matrix[1][0]
        accuracy = self._ratio(tp + tn, tp + tn + fp + fn)
        precision = self._ratio(tp, tp + fp)
        recall = self._ratio(tp, tp + fn)
        # F1 is derived from the already-rounded precision/recall, as before.
        f1 = self._ratio(2 * recall * precision, recall + precision)
        return accuracy,precision,recall,f1

    def getLogisticTunningParam(self,maxIterV,regParamV,elasticNetParamV):
        """Grid-search logistic-regression hyper-parameters.

        Every combination of the three value lists is trained and scored via
        ``createPipe`` + ``getPipeMetrics``; the full results dictionary is
        printed.

        Args:
            maxIterV: iterable of ``maxIter`` values to try.
            regParamV: iterable of ``regParam`` values to try.
            elasticNetParamV: iterable of ``elasticNetParam`` values to try.

        Returns:
            ``[accuracy, precision, recall, f1]`` of the combination with the
            highest accuracy, or ``None`` if no combination scored above 0.
        """
        logistictunningmetrics = {}
        best_param = None
        best_acc = 0
        for maxIter in maxIterV:
            for regParam in regParamV:
                for elasticNetParam in elasticNetParamV:
                    self.createPipe("logistic", maxIter=maxIter, regParam=regParam,
                                    elasticNetParam=elasticNetParam)
                    scores = list(self.getPipeMetrics())
                    key = "metrics_{}_{}_{}".format(maxIter, regParam, elasticNetParam)
                    logistictunningmetrics[key] = scores
                    # Strict comparison: the first combination wins ties.
                    if scores[0] > best_acc:
                        best_acc = scores[0]
                        best_param = scores

        print(logistictunningmetrics)
        return best_param