In [22]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [43]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier,DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import stddev, avg

## chargement du dataset

In [55]:
df

DataFrame[Age: string, Attrition: string, BusinessTravel: string, DailyRate: string, Department: string, DistanceFromHome: string, Education: string, EducationField: string, EmployeeNumber: string, EnvironmentSatisfaction: string, Gender: string, HourlyRate: string, JobInvolvement: string, JobLevel: string, JobRole: string, JobSatisfaction: string, MaritalStatus: string, MonthlyIncome: string, MonthlyRate: string, NumCompaniesWorked: string, OverTime: string, PercentSalaryHike: string, PerformanceRating: string, RelationshipSatisfaction: string, StockOptionLevel: string, TotalWorkingYears: string, TrainingTimesLastYear: string, WorkLifeBalance: string, YearsAtCompany: string, YearsInCurrentRole: string, YearsSinceLastPromotion: string, YearsWithCurrManager: string]

In [44]:
spark = SparkSession.builder.appName('attrition').getOrCreate()
file_location = "HR-Employee-Attrition.csv"
file_type = "csv"
# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","
# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.option("header", first_row_is_header) \
.option("sep", delimiter) \
.load(file_location)

df = df.drop("Over18")
df = df.drop("EmployeeCount")
df = df.drop("StandardHours")

## initialisation des modèles et des paramètres

In [54]:
"""j'ai mis le 1er bloque en commentaire car si vous tester l'entrainement
avec bcp d'hyperparamètres ca peut prendre du temps. Vous pouvez lenlever des commentaire
et mettre le deuxieme bloc en commentaire si vous construisez votre modèle
"""

"j'ai mis ce bloque en commentaire car si vous tester l'entrainement\navec bcp d'hyperparamètre ca peut prendre du temps. Vous pouvez l'enlever des commentaire\net mettre le deuxieme bloc en commentaire si vous construisez votre modèle\n"

In [45]:
# gbt = GBTClassifier(featuresCol='features', labelCol='label')
# param_grid_gbt = ParamGridBuilder() \
#     .addGrid(gbt.maxDepth, [2, 4, 6]) \
#     .addGrid(gbt.maxBins, [20, 30]) \
#     .addGrid(gbt.maxIter, [10, 20, 30]) \
#     .build()

# rf = RandomForestClassifier(featuresCol='features', labelCol='label')
# param_grid_rf = ParamGridBuilder() \
#     .addGrid(rf.maxDepth, [2, 5, 10]) \
#     .addGrid(rf.maxBins, [10, 20, 30]) \
#     .addGrid(rf.numTrees, [10, 50, 100]) \
#     .addGrid(rf.impurity, ['gini', 'entropy']) \
#     .build()

# lr = LogisticRegression(featuresCol='features', labelCol='label')
# param_grid_lr = ParamGridBuilder() \
#     .addGrid(lr.regParam, [0.01, 0.05, 0.1]) \
#     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
#     .build()

In [46]:
gbt = GBTClassifier(featuresCol='features', labelCol='label')
param_grid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2]) \
    .addGrid(gbt.maxBins, [20]) \
    .addGrid(gbt.maxIter, [10]) \
    .build()

rf = RandomForestClassifier(featuresCol='features', labelCol='label')
param_grid_rf = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2]) \
    .addGrid(rf.maxBins, [10]) \
    .addGrid(rf.numTrees, [10]) \
    .addGrid(rf.impurity, ['gini']) \
    .build()

lr = LogisticRegression(featuresCol='features', labelCol='label')
param_grid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01]) \
    .addGrid(lr.elasticNetParam, [0.0]) \
    .build()

## fonctions

In [9]:
def convert_to_int(df):
    # convertir les colonnes string en double quand cela est possible
    for column in df.columns:
        try:
            if int(df.select(column).first()[0]):
                df = df.withColumn(column, col(column).cast("double"))
        except:
            pass
    return df
        
def remove_outliers(df):

    # Boucle pour enlever les valeurs aberrantes de chaque colonne
    for col in df.columns:
        # Calcul des statistiques pour la colonne
        stats = df.select(avg(col), stddev(col)).first()
        mean = stats[0]
        std = stats[1]  
        if mean is not None and std is not None:
            # Calcul du seuil pour déterminer les valeurs aberrantes
            threshold = 3 * std + mean     
            # Suppression des valeurs aberrantes pour la colonne
            df = df.filter(df[col] <= threshold)
    return df


def assembler_for_pipeline(df, label):
    categoricalColumns = [col for (col, dtype) in df.dtypes if dtype == "string" and col != label]
    stages = []

    for categoricalCol in categoricalColumns:
        stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
        stages += [stringIndexer, encoder]
    
    label_stringIdx = StringIndexer(inputCol=label, outputCol="label")
    stages += [label_stringIdx]

    numericCols = [col for (col, dtype) in df.dtypes if dtype.startswith('double') and col != "label"]
    assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
    return assembler, stages

def model_for_pipeline(classifier, param_grid):
    evaluator = BinaryClassificationEvaluator(labelCol='label')
    cv = CrossValidator(estimator=classifier, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3).setSeed(1234)
    return cv

def view_feature_importances(df):
    
    assembler, stages = assembler_for_pipeline(df, 'Attrition')
    rf = RandomForestClassifier(labelCol="label", featuresCol="features")
    paramGrid = ParamGridBuilder() \
        .addGrid(rf.numTrees, [10, 20]) \
        .addGrid(rf.maxDepth, [5, 10]) \
        .build()
    
    evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    train, test = df.randomSplit([0.8, 0.2])
    stages += [assembler, rf]
    pipeline = Pipeline(stages=stages)
    
    pipelineModel = pipeline.fit(df)
    predictions = pipelineModel.transform(df)
    auc = evaluator.evaluate(predictions)
    print("AUC-ROC = %g" % auc)
    
    importances = pipelineModel.stages[-1].featureImportances.toArray()
    cols = df.columns
    selectedcols = ["label", "features"] + cols
    cols_importances = list(zip(selectedcols, importances))
    sorted_cols_importances = sorted(cols_importances, key=lambda x: x[1], reverse=True)
    print("Colonnes triées par ordre d'importance décroissante :")
    for col, importance in sorted_cols_importances:
        print(col, "=", importance)
        
    return sorted_cols_importances
        
def pipelinedata(df, assembler, stages, model):
    
    stages += [assembler, model]
    train, test = df.randomSplit([0.8, 0.2])
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(train)
    predictions = pipelineModel.transform(test)
    predictions.take(1)
    selected = predictions.select("label", "prediction", "rawPrediction", "probability")
    display(selected)
    return model, predictions, selected

def metrics(predictions, model):

    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
    auc_roc = evaluator.evaluate(predictions)
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
    rmse = evaluator.evaluate(predictions)

    print(model + ": courbe ROC = %g" % auc_roc)
    print(model + ": précision = %g" % accuracy)
    print(model + ": RMSE = %g" % rmse)

    return accuracy
    
def filter_most_important_columns(df, sorted_cols_importances, label):
    selected_cols = [col for col, importance in sorted_cols_importances if importance > 0.01]
    if label not in selected_cols:
        selected_cols.append(label)
    if "features" in selected_cols:
        selected_cols.remove("features")
    if "label" in selected_cols:
        selected_cols.remove("label")
    df = df.select(selected_cols)
    return df

# PART1 . ANALYSE

## nettoyage du dataframe

In [10]:
df = convert_to_int(df)
df = remove_outliers(df)

## voir les champs les plus importants

In [11]:
sorted_cols_importances = view_feature_importances(df)

AUC-ROC = 0.919119
Colonnes triées par ordre d'importance décroissante :
MonthlyRate = 0.0983397388908216
NumCompaniesWorked = 0.03756839918566912
MonthlyIncome = 0.03746662993664787
features = 0.02484224106688456
JobLevel = 0.014621880098258258
OverTime = 0.01434072393497946
Attrition = 0.011603479972813755
DistanceFromHome = 0.010027570527431829
YearsWithCurrManager = 0.009372115765421535
MaritalStatus = 0.009059242489480389
YearsInCurrentRole = 0.008896542439843457
PerformanceRating = 0.006405850536698984
EmployeeNumber = 0.0063590962155242615
TotalWorkingYears = 0.0057390272753579845
label = 0.005246639765268216
PercentSalaryHike = 0.004550331139239345
HourlyRate = 0.004333128155377987
Gender = 0.004055819560471669
DailyRate = 0.003562612347840823
Age = 0.0034841170066732276
EducationField = 0.003354173937055974
Department = 0.0033048991277145985
YearsSinceLastPromotion = 0.003200894872720736
EnvironmentSatisfaction = 0.0030973149737195996
YearsAtCompany = 0.0028187770209029558
Job

## filtrer df pour ne garder que les champs retenus 

In [12]:
df_most_important = filter_most_important_columns(df, sorted_cols_importances, 'Attrition')

In [13]:
df_most_important.show(10)

+-----------+------------------+-------------+--------+--------+---------+----------------+
|MonthlyRate|NumCompaniesWorked|MonthlyIncome|JobLevel|OverTime|Attrition|DistanceFromHome|
+-----------+------------------+-------------+--------+--------+---------+----------------+
|    19479.0|               8.0|       5993.0|     2.0|     Yes|      Yes|             1.0|
|    24907.0|               1.0|       5130.0|     2.0|      No|       No|             8.0|
|     2396.0|               6.0|       2090.0|     1.0|     Yes|      Yes|             2.0|
|    23159.0|               1.0|       2909.0|     1.0|     Yes|       No|             3.0|
|    16632.0|               9.0|       3468.0|     1.0|      No|       No|             2.0|
|    11864.0|               0.0|       3068.0|     1.0|      No|       No|             2.0|
|     9964.0|               4.0|       2670.0|     1.0|     Yes|       No|             3.0|
|    13335.0|               1.0|       2693.0|     1.0|      No|       No|      

In [40]:
"""Tout ce qui vient avant "DEMARRAGE DU TRAIN" devra rester dans la partie analyse et non pas dans le train
puisque si au niveau de l'analyse les colonnes les plus importantes changent, notre interface ne fonctionnera plus.
il faut garder une cohérence entre notre dataset et les champs du html
Il faut donc que le dataset reste statique
"""

'Tout ce qui vient avant "DEMARRAGE DU TRAIN" devra rester dans la partie analyse et non pas dans le train\npuisque si au niveau de l\'analyse les colonnes les plus importantes changent, notre interface ne fonctionnera plus.\nil faut garder une cohérence entre notre dataset et les champs du html\nIl faut donc que le dataset reste statique\n'

# PART2. DEMARRAGE DU TRAIN

## entrainement du modèle et selectionner le meilleur

In [50]:
def select_most_efficient_model(df):

    best_model = None
    best_accuracy = 0
    models = {}
    
    df = convert_to_int(df)
    df = remove_outliers(df)
    # je selectionne les colonnes de manières brut pour qu'elles ne se modifie pas
    df_most_important = df.select(['MonthlyRate', 'NumCompaniesWorked', 'MonthlyIncome', 'JobLevel', 'OverTime', 'Attrition', 'DistanceFromHome'])

    df_gbt = df_most_important
    df_rf = df_most_important
    df_lr = df_most_important

    # model random forest
    assembler, stages = assembler_for_pipeline(df_rf, 'Attrition')
    model_rf = model_for_pipeline(rf, param_grid_rf)
    model_rf, predictions_rf, selected_rf = pipelinedata(df_rf, assembler, stages, model_rf)
    accuracy = metrics(predictions_rf, 'rf')
    models["rf"] = {"model" : model_rf, "accuracy": accuracy}


    # model gradient boosting
    assembler, stages = assembler_for_pipeline(df_gbt, 'Attrition')
    model_gbt = model_for_pipeline(gbt, param_grid_gbt)
    model_gbt, predictions_gbt, selected_gbt = pipelinedata(df_gbt, assembler, stages, model_gbt)
    accuracy = metrics(predictions_gbt, 'gbt')
    models["gbt"] = {"model" : model_gbt, "accuracy": accuracy}

    # model logistic regression
    assembler, stages = assembler_for_pipeline(df_lr, 'Attrition')
    model_lr = model_for_pipeline(lr, param_grid_lr)
    model_lr, predictions_lr, selected_lr = pipelinedata(df_lr, assembler, stages, model_lr)
    accuracy = metrics(predictions_lr, 'lr')
    models["lr"] = {"model" : model_lr, "accuracy": accuracy}

    for model_name, model_info in models.items():
        accuracy = model_info['accuracy']
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_model = model_info['model']

    return best_model

In [51]:
model = select_most_efficient_model(df)

DataFrame[label: double, prediction: double, rawPrediction: vector, probability: vector]

rf: courbe ROC = 0.746821
rf: précision = 0.826667
rf: RMSE = 0.416333


DataFrame[label: double, prediction: double, rawPrediction: vector, probability: vector]

gbt: courbe ROC = 0.758846
gbt: précision = 0.845324
gbt: RMSE = 0.393289


DataFrame[label: double, prediction: double, rawPrediction: vector, probability: vector]

lr: courbe ROC = 0.781599
lr: précision = 0.834586
lr: RMSE = 0.406711


## sauvegarde et enregistrement du zip (commande à utiliser sur Colab)

In [52]:
model

CrossValidator_f3fdd3f62418

In [31]:
# import shutil
# model.save("Model")

In [32]:
# shutil.make_archive('Model', 'zip','Model')