In [None]:
import matplotlib.pyplot as plt
from datetime import datetime
from dateutil import parser
from pyspark.sql.functions import unix_timestamp, date_format, col, when
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator



In [None]:
''' 
Exécutez les lignes suivantes pour créer un DataFrame Spark en collant le code dans une nouvelle cellule.
Cette étape récupère les données via l'API Open Datasets. L'extraction de toutes ces données génère environ
1,5 milliard de lignes.
Selon la taille de votre pool Apache Spark serverless, il est possible que les données brutes soient trop
volumineuses ou que leur exploitation prenne trop de temps. Vous pouvez filtrer ces données pour en réduire
le volume. L'exemple de code suivant utilise start_date et end_date pour appliquer un filtre qui retourne un
seul mois de données.
'''
from azureml.opendatasets import NycTlcYellow
from datetime import datetime
from dateutil import parser

end_date = parser.parse('2018-05-08 00:00:00')
start_date = parser.parse('2018-05-01 00:00:00')

nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())


# Le code suivant réduit le jeu de données à environ 2 000 lignes
sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)



StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
'''
Le code suivant offre deux façons d’afficher les données. La première est basique. La deuxième offre une
expérience de grille beaucoup plus riche ainsi que la possibilité de visualiser les données sous forme graphique.
'''
#sampled_taxi_df.show(5)
display(sampled_taxi_df)

'''
    Selon la taille du jeu de données généré et la nécessité ou non d’expérimenter ou d’exécuter le notebook
    plusieurs fois, vous pouvez mettre en cache le jeu de données localement dans l’espace de travail.
    Il existe trois façons d'effectuer une mise en cache explicite :
        Enregistrez le DataFrame localement en tant que fichier.
        Enregistrez le DataFrame en tant que table ou vue temporaire.
        Enregistrez le DataFrame en tant que table permanente.

Les deux premières approches sont incluses dans les exemples de code suivants.
'''
sampled_taxi_df.createOrReplaceTempView("nytaxi")



StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
'''
Dans le code suivant, vous effectuez quatre classes d’opérations :

    suppression des valeurs hors norme ou incorrectes par filtrage ;
    suppression des colonnes superflues ;
    création de colonnes dérivées des données brutes pour un assurer un fonctionnement plus efficace du modèle. Cette opération est parfois appelée caractérisation.
    étiquetage. Comme vous procédez à une classification binaire (y aura-t-il ou non un pourboire à l’issue d’une course donnée), il est nécessaire de convertir le montant du pourboire en valeur 0 ou 1.
'''
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                                )


# Vous effectuez ensuite une deuxième passe sur les données pour ajouter les caractéristiques finales.
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))



StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
'''
La dernière tâche consiste à convertir les données étiquetées en format analysable par régression logistique.
L'entrée dans un algorithme de régression logistique doit être un jeu de paires de vecteurs étiquette/caractéristique,
où le vecteur caractéristique est un vecteur de nombres qui représentent le point d'entrée.

Vous devez donc convertir les colonnes catégoriques en nombres. Plus précisément, vous devez convertir les colonnes
trafficTimeBins et weekdayString en représentations entières. Cette conversion peut être effectuée en suivant plusieurs
approches. L'exemple ci-dessous suit l'approche OneHotEncoder, qui est courante.
'''
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Cette action génère un nouveau DataFrame dont les colonnes sont toutes dans un format adapté à l’entraînement d’un modèle
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)



StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
'''
La première tâche consiste à diviser le jeu de données en un jeu d’entraînement et un jeu de test ou de validation.
La division est ici arbitraire. Faites des essais avec différents paramètres de division pour voir s’ils ont un effet sur
le modèle.
'''
# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234
# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)



StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
'''
Maintenant qu’il existe deux DataFrames, la tâche suivante consiste à créer la formule du modèle et à l’exécuter sur le DataFrame d’entraînement. Vous pouvez ensuite effectuer une validation par rapport au DataFrame de test. Faites des essais avec différentes versions de la formule du modèle pour voir l’impact de différentes combinaisons.
'''
## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# La sortie devrait être : Area under ROC = 0.9779470729751403



StatementMeta(, , -1, Cancelled, , Cancelled)

In [None]:
'''
Vous pouvez désormais construire une visualisation finale pour faciliter l’examen des résultats de ce test.
La courbe ROC est une façon d’examiner les résultats.
'''
## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

StatementMeta(, , -1, Cancelled, , Cancelled)