In [13]:
import pyspark
from pyspark.sql import SparkSession,DataFrame
from typing import Optional
import logging

class SparkClass:
    def __init__(self):
        logging.info("Initialisation de spark !")
    
    def createSession(self,master:Optional['str']="local[*]", app_name:Optional['str']="BigData Machine Learning") -> SparkSession:
        spark = SparkSession.builder.master(master).appName(app_name).getOrCreate()
        print("Démarrage de la session")
        return spark
    
    def destroySession(self,spark:SparkSession):
        spark.stop() if isinstance(spark, SparkSession) else None

        
    

In [14]:
from typing import List
import pyspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import StringIndexer,VectorAssembler 
import pathlib, os, logging



class DataPreprocessing:
    
    def __init__(self):
        logging.info("Pré-traitement des données !")
    
    def createDataFrame(self,spark:SparkSession,filePath:str) -> DataFrame:
        if os.path.exists(filePath) and os.path.isfile(filePath) :
            extension = pathlib.Path(filePath).suffix
        else: raise Exception('Erreur, essayer encore')
        
        def fromCSV(filePath:str) -> DataFrame:
            df = spark.read.options(header='True', inferSchema='True', delimiter=',') \
                .csv(filePath)
            return df
        
        def fromJSON(filePath:str) -> DataFrame:
            df = spark.read.json(filePath)
            return df
        return fromCSV(filePath) if extension==".csv" else fromJSON(filePath) 
    
    def transformToNumeric(self,df:DataFrame, input:str, output:str) -> DataFrame:
        indexer = StringIndexer(inputCol=input, outputCol=output)
        df_transformed = indexer.fit(df).transform(df)
        df_transformed = df_transformed.drop(input)
        return df_transformed
    
    def indexerColumns(self,colums:List['str']) -> List[StringIndexer]:
        columnsIndexed = []
        for column in colums:
            col=""
            if column=="Classes":
                col="label"
            else:
                col = column+"_indexed"
            st = StringIndexer(inputCol=column,outputCol=col)
            st.setHandleInvalid("skip")
            columnsIndexed.append(st)
        return columnsIndexed
    
    def groupeColumns(self,df:DataFrame,vectorAssembler:VectorAssembler) -> DataFrame:
        assembled_df = vectorAssembler.transform(df)
        return assembled_df
    
    def renameCol(self,df:DataFrame,exist:str,newCol:str) -> DataFrame:
        n_df = df.withColumnRenamed(exist,newCol )
        return n_df

In [15]:
from pyspark.ml.feature import StringIndexer 
from pyspark.ml.classification import NaiveBayes 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import DataFrame
from pyspark.ml import Transformer
import logging
from typing import Optional

class ModelNaiveBayes:
    def __init__(self):
        logging.info("Apprentissage avec mon modéle")
        
        
    def train(self,df:DataFrame,nbModel:NaiveBayes) -> Transformer:
        model = nbModel.fit(df)
        return model
    
    def predict(self,df:DataFrame, tf:Transformer) -> DataFrame:
        predicted_df = tf.transform(df)
        return predicted_df
    
    def evaluateModel(self,df:DataFrame, evaluator:MulticlassClassificationEvaluator) -> float:
         accuracy = evaluator.evaluate(df)
         return accuracy
    

In [16]:
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
import os
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

classSpark = SparkClass()
session = classSpark.createSession()
print(session)
print("Session démarrée")
"""
Chargement des données
"""
# os.path.abspath(__file__)
fileCSV = "Algerian_forest_fires_dataset_UPDATE.csv"
fileJSON = "people.json"
dataPrep = DataPreprocessing()
df_l = dataPrep.createDataFrame(session,fileCSV)
"""
    Transformation des données catégorielles en données numériques
"""
print("Transformation des données catégorielles en données numériques")
indexers = dataPrep.indexerColumns(df_l.columns)
pipeline = Pipeline(stages=indexers) 
indexed_df = pipeline.fit(df_l).transform(df_l)

indexed_df = indexed_df.drop("day","month","year","Temperature","RH","Ws","Rain","FFMC","DMC","DC","ISI","BUI","FWI")
indexed_df.show(15)

cols = ["day_indexed","month_indexed","year_indexed","Temperature_indexed","RH_indexed","Ws_indexed","Rain_indexed","FFMC_indexed","DMC_indexed","DC_indexed","ISI_indexed","BUI_indexed","FWI_indexed"]
vec = VectorAssembler(inputCols=cols,outputCol="features")
df_ass = dataPrep.groupeColumns(indexed_df,vec)
df_ass.select("features","label").show(10,False)
"""
 Entrainement du modéle
"""
train, test= df_ass.select("features","label").randomSplit([0.8, 0.2])

nb = NaiveBayes(modelType="gaussian")
nb_model = ModelNaiveBayes()
transformer = nb_model.train(train,nb)

prediction_df = nb_model.predict(test,transformer)

prediction_df.show(20)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy =  nb_model.evaluateModel(prediction_df,evaluator)
print("Accuracy = ",accuracy)

Démarrage de la session
<pyspark.sql.session.SparkSession object at 0x000002B9388A3780>
Session démarrée
Transformation des données catégorielles en données numériques
+-----------+-----------+-------------+------------+-------------------+----------+----------+------------+------------+-----------+----------+-----------+-----------+-----------+-----+
|    Classes|day_indexed|month_indexed|year_indexed|Temperature_indexed|RH_indexed|Ws_indexed|Rain_indexed|FFMC_indexed|DMC_indexed|DC_indexed|ISI_indexed|BUI_indexed|FWI_indexed|label|
+-----------+-----------+-------------+------------+-------------------+----------+----------+------------+------------+-----------+----------+-----------+-----------+-----------+-----+
|not fire   |        0.0|          2.0|         0.0|                7.0|      37.0|       5.0|         0.0|        89.0|       10.0|       2.0|       14.0|       37.0|        4.0|  1.0|
|not fire   |        1.0|          2.0|         0.0|                7.0|      39.0|     