In [1]:
#### les variables d'environnement doivent etre définies de cette manière à chaque fois qu'on utilise Spark 
import os
import sys

os.environ["PYSPARK_PYTHON"] = "/data/Anaconda/Anaconda3-5.1.0/bin/python3"
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
os.environ["SPARK_MAJOR_VERSION"] = '2' 
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")
from pyspark.sql.functions import * 

In [2]:
# Création de ta spark session
from pyspark.sql import SparkSession

spark = (SparkSession
            .builder
            .master("yarn")
            .config("spark.executor.memory", "2g") # la mémoire de chaque exécuteur
            .config("spark.driver.memory", "4g") # la mémoire du driver: le driver ici est le edge node 
            .config("spark.executor.instances",'2') # le nombre des exécuteurs
            .config("spark.executor.cores", "4") # le nombre de cores par exécuteur
            .appName("test thomas") # le nom de ton application
            .getOrCreate())

In [3]:
import pandas

pdf = pandas.read_excel('df_991.xlsx') # les 3 types de dataset utilisé : shuffle.xlsx df_199.xlsx df_991.xlsx

pdf.Adresses = pdf.Adresses.astype(str)
df = spark.createDataFrame(pdf)

df.show()
df.printSchema()

# you can give sheetname as option if your excel sheet has multiple sheets

# .option("sheetName", "Sheet2")

+--------------------+------+
|            Adresses|Target|
+--------------------+------+
|               75001|     0|
|3 rue d'italie, 6...|     1|
|143 bis rue jean ...|     1|
|13 rue antoine de...|     1|
|14 rue du chapeau...|     1|
|26 quai de la rap...|     1|
|26 avenue de thie...|     1|
|route departement...|     1|
|z.i d'engachies, ...|     1|
|2 rue malbec, 310...|     1|
|mairie rue des ec...|     1|
|30 rue de la gare...|     1|
|42 bis rue d'arme...|     1|
|370 route de marc...|     1|
|11 boulevard char...|     1|
|1 allee andre che...|     1|
|4, avenue des pro...|     1|
|10 rue paul iris ...|     1|
|31 place bellecou...|     1|
|bp 81007, 95931, ...|     1|
+--------------------+------+
only showing top 20 rows

root
 |-- Adresses: string (nullable = true)
 |-- Target: long (nullable = true)



In [4]:
import pyspark
def spark_shape(self):
    return (self.count(), len(self.columns))
pyspark.sql.dataframe.DataFrame.shape = spark_shape

df.shape() #pour vérifier la shape de mon tableau au cas ou

(100001, 2)

In [5]:
from pyspark.sql.functions  import col, split
df = df.withColumn("adresse_clean", split(col("adresses")," "))
# df.show()

In [6]:
# splitting dataset into train and test set
(train, test) = df.randomSplit([0.7, 0.3])

In [7]:
# train.head(5)

# On travail l'échantillon d'entrainement avant de l'utiliser dans le modèle

In [9]:
#source = https://spark.apache.org/docs/2.2.0/ml-features.html#countvectorizer
# https://spark.apache.org/docs/2.0.0/ml-features.html#countvectorizer

#On vectorise le dataframe afin qu'il soit compris par le modèle
from pyspark.ml.feature import CountVectorizer

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="adresse_clean", outputCol="count_vect", vocabSize=3, minDF=2.0)
model = cv.fit(train)
result_tr = model.transform(train)
result_tr.show()

+--------------------+------+--------------------+-------------------+
|            Adresses|Target|       adresse_clean|         count_vect|
+--------------------+------+--------------------+-------------------+
|'espace antrium',...|     1|['espace, antrium...|          (3,[],[])|
|'le republicain' ...|     1|['le, republicain...|(3,[0,1],[1.0,1.0])|
|., 49360, 49360 t...|     1|[.,, 49360,, 4936...|          (3,[],[])|
|  /, 64300, argagnon|     1|[/,, 64300,, arga...|          (3,[],[])|
|02 av alcide de g...|     1|[02, av, alcide, ...|      (3,[1],[1.0])|
|02 rue gambetta, ...|     1|[02, rue, gambett...|      (3,[0],[1.0])|
|07 avenue de la g...|     1|[07, avenue, de, ...|      (3,[1],[1.0])|
|1 - 5, rue colome...|     1|[1, -, 5,, rue, c...|      (3,[0],[1.0])|
|1 3 rue st pierre...|     1|[1, 3, rue, st, p...|      (3,[0],[1.0])|
|1 a rue saint lau...|     1|[1, a, rue, saint...|      (3,[0],[1.0])|
|1 allee des colom...|     1|[1, allee, des, c...|          (3,[],[])|
|1 all

In [10]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer


# alternatively, CountVectorizer can also be used to get term frequency vectors
# On applique ensuite un tf-idf sur le tableau vectoriser afin d'y appliquer des poids probabiliste

idf = IDF(inputCol="count_vect", outputCol="tf_idf")
idfModel = idf.fit(result_tr)
rescaledData_tr = idfModel.transform(result_tr)
#petit bout de code toruvé sur un internet pour voir si ça a marché
for features_label in rescaledData_tr.select("tf_idf", "Target").take(3):
    print(features_label)

Row(tf_idf=SparseVector(3, {}), Target=1)
Row(tf_idf=SparseVector(3, {0: 0.7665, 1: 1.2064}), Target=1)
Row(tf_idf=SparseVector(3, {}), Target=1)


# On travail l'échantillon de test avant de l'utiliser dans le modèle

les exactes même étapes sont appliqués à l'échantillon de test et d'entrainement

In [11]:
from pyspark.ml.feature import CountVectorizer

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="adresse_clean", outputCol="count_vect", vocabSize=3, minDF=2.0)
model = cv.fit(test)
result_te = model.transform(test)
result_te.show()

+--------------------+------+--------------------+-------------------+
|            Adresses|Target|       adresse_clean|         count_vect|
+--------------------+------+--------------------+-------------------+
|'la procureuse', ...|     1|['la, procureuse'...|          (3,[],[])|
|'toutejoie', 8650...|     1|['toutejoie',, 86...|          (3,[],[])|
|06 rue de la flut...|     1|[06, rue, de, la,...|(3,[0,1],[1.0,2.0])|
|1 - 3 rue du 5eme...|     1|[1, -, 3, rue, du...|(3,[0,1],[1.0,1.0])|
|1 44 allees marin...|     1|[1, 44, allees, m...|          (3,[],[])|
|1 a rue de la pie...|     1|[1, a, rue, de, l...|(3,[0,1],[1.0,1.0])|
|1 allee andre che...|     1|[1, allee, andre,...|          (3,[],[])|
|1 allee calmeilh,...|     1|[1, allee, calmei...|          (3,[],[])|
|1 allee du musee,...|     1|[1, allee, du, mu...|          (3,[],[])|
|1 allee du parc, ...|     1|[1, allee, du, pa...|          (3,[],[])|
|1 allee henri dav...|     1|[1, allee, henri,...|          (3,[],[])|
|1 all

In [12]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer


# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="count_vect", outputCol="tf_idf")
idfModel = idf.fit(result_te)
rescaledData_te = idfModel.transform(result_te)
for features_label in rescaledData_te.select("tf_idf", "Target").take(3):
    print(features_label)

Row(tf_idf=SparseVector(3, {}), Target=1)
Row(tf_idf=SparseVector(3, {}), Target=1)
Row(tf_idf=SparseVector(3, {0: 0.7671, 1: 2.4329}), Target=1)


### On entraine le modèle et regardons le résultat de ses prédictions 

Modèle Naive Bayes :

In [13]:
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#source = https://towardsdatascience.com/natural-language-processing-with-spark-9efef3564270


## Fitting the model
nb = NaiveBayes(modelType="multinomial",labelCol="Target", featuresCol="tf_idf")
nbModel = nb.fit(rescaledData_tr)
nb_predictions = nbModel.transform(rescaledData_te)

## Evaluating the model
#doc avec les différentes métrics utilisable : https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.html#getMetricName--
evaluator = MulticlassClassificationEvaluator(labelCol="Target", predictionCol="prediction", metricName="f1")
nb_accuracy = evaluator.evaluate(nb_predictions)
print("F1 of NaiveBayes is = %g"% (nb_accuracy))

F1 of NaiveBayes is = 0.985906


Modèle Random Forest : 

In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier(labelCol="Target", featuresCol="tf_idf", numTrees=8)

# Train model.  This also runs the indexers.
model = rf.fit(rescaledData_tr)

# Make predictions.
rf_predictions = model.transform(rescaledData_te)

## Evaluating the model
#doc avec les différentes métrics utilisable : https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.html#getMetricName--
evaluator = MulticlassClassificationEvaluator(labelCol="Target", predictionCol="prediction", metricName="f1")
rf_accuracy = evaluator.evaluate(rf_predictions)
print("F1 of Random Forest is = %g"% (rf_accuracy))

F1 of Random Forest is = 0.985906


Modèle Regression logisitc : 

In [15]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol="Target", featuresCol="tf_idf")

# Fit the model
lrModel = lr.fit(rescaledData_tr)

# Make predictions.
lr_predictions = lrModel.transform(rescaledData_te)


## Evaluating the model
#doc avec les différentes métrics utilisable : https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.html#getMetricName--
evaluator = MulticlassClassificationEvaluator(labelCol="Target", predictionCol="prediction", metricName="f1")
lr_accuracy = evaluator.evaluate(lr_predictions)
print("F1 of Logistic Regression is = %g"% (lr_accuracy))

F1 of Logistic Regression is = 0.985906


### Corss validation et tunning des hyperparamètres 

source cross val : https://spark.apache.org/docs/2.1.0/ml-tuning.html#cross-validation

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



Alors cette case je la tire de la doc, je ne sais pas pourquoi elle ne marche plus, elle a marché à une époque. Mais apparement le fichier train n'est plus accepté alors qu'au dessus ça amrchait, peut être que je vais fait une erreur dans ma 
construction de pipeline. 

Pour ce qui est du alpha, le smoothing parameter dont je te parlais le truc le plus ressemblant dans la doc c'était le lambda que tu vois en annotation en dessous de paramGrid, je t'ai déjà mis les valeurs que je cherchais à regarder au cas ou. 

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = train

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
cv = CountVectorizer(inputCol="adresse_clean", outputCol="count_vect", vocabSize=3, minDF=2.0)
idf = IDF(inputCol=cv.getOutputCol(), outputCol="tf_idf")
nb = NaiveBayes(modelType="bernoulli",labelCol="Target", featuresCol="tf_idf")
pipeline = Pipeline(stages=[cv, idf, nb])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.

paramGrid = ParamGridBuilder() \
    .addGrid(idf.minDocFreq, [0, 1, 2, 3]) \
    .build() #.addGrid(nb.lambda_, [1, 0.1, 0.01, 0.001, 0.0001, 0.00001]) \

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="Target", predictionCol="prediction", metricName="f1"),
                          numFolds=8)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# # Prepare test documents, which are unlabeled.
# test = test

# Make predictions, cvModel uses the best model found (lrModel).
# nb_prediction = cvModel.transform(test)
# nb_accuracy = evaluator.evaluate(nb_predictions)
# print("F1 of NaiveBayes is = %g"% (nb_accuracy))

Py4JJavaError: An error occurred while calling o373.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 92.0 failed 4 times, most recent failure: Lost task 1.3 in stage 92.0 (TID 552, slmupd5hsn38.zres.ztech, executor 2): java.lang.IllegalArgumentException: requirement failed: Bernoulli naive Bayes requires 0 or 1 feature values but found (3,[0,1],[1.8365064021303383,2.255097892944781]).
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.classification.NaiveBayes$.requireZeroOneBernoulliValues(NaiveBayes.scala:243)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$5.apply(NaiveBayes.scala:143)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$5.apply(NaiveBayes.scala:143)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$7.apply(NaiveBayes.scala:166)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$7.apply(NaiveBayes.scala:164)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:191)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:190)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:196)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.ml.classification.NaiveBayes.trainWithLabelCheck(NaiveBayes.scala:174)
	at org.apache.spark.ml.classification.NaiveBayes.train(NaiveBayes.scala:118)
	at org.apache.spark.ml.classification.NaiveBayes.train(NaiveBayes.scala:78)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: Bernoulli naive Bayes requires 0 or 1 feature values but found (3,[0,1],[1.8365064021303383,2.255097892944781]).
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.classification.NaiveBayes$.requireZeroOneBernoulliValues(NaiveBayes.scala:243)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$5.apply(NaiveBayes.scala:143)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$5.apply(NaiveBayes.scala:143)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$7.apply(NaiveBayes.scala:166)
	at org.apache.spark.ml.classification.NaiveBayes$$anonfun$7.apply(NaiveBayes.scala:164)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:191)
	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:190)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:196)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [27]:
# Prepare test documents, which are unlabeled.
test = test
# Make predictions on test documents. cvModel uses the best model found (lrModel).
evaluator=MulticlassClassificationEvaluator(labelCol="Target", predictionCol="prediction", metricName="f1")
prediction = cvModel.transform(test)
nb_accuracy = evaluator.evaluate(prediction)
print("F1 of NaiveBayes is = %g"% (nb_accuracy))

F1 of NaiveBayes is = 0.825668


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="Target", predictionCol="prediction", metricName="f1")
nb_accuracy = evaluator.evaluate(selected)
print("Accuracy of NaiveBayes is = %g"% (nb_accuracy))