In [4]:
#pip install sparknlp

Collecting sparknlp
  Downloading sparknlp-1.0.0-py3-none-any.whl (1.4 kB)
Collecting spark-nlp
  Downloading spark_nlp-3.4.2-py2.py3-none-any.whl (142 kB)
[K     |████████████████████████████████| 142 kB 3.3 MB/s eta 0:00:01
[?25hInstalling collected packages: spark-nlp, sparknlp
Successfully installed spark-nlp-3.4.2 sparknlp-1.0.0
Note: you may need to restart the kernel to use updated packages.


In [5]:
#pip install nltk

Collecting nltk
  Downloading nltk-3.7-py3-none-any.whl (1.5 MB)
[K     |████████████████████████████████| 1.5 MB 5.6 MB/s eta 0:00:01
Collecting regex>=2021.8.3
  Downloading regex-2022.3.15-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (764 kB)
[K     |████████████████████████████████| 764 kB 7.0 MB/s eta 0:00:01
Installing collected packages: regex, nltk
Successfully installed nltk-3.7 regex-2022.3.15
Note: you may need to restart the kernel to use updated packages.


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, lower, regexp_replace,countDistinct
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType,StructField
import pandas as pd
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')

#from nltk.stem.snowball import SnowballStemmer

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [2]:
spark = SparkSession.builder.getOrCreate()

# Chargement des données

In [3]:


#telechargement des données de trainning
train_review = "/home/jovyan/work/data/dataset_booking_prepared2.csv"

# lecture d'un fichier de manière la plus brute
schema = StructType([
    StructField('c0', IntegerType(),True),
    StructField('hotel_name', StringType(),True),
    StructField('lat', FloatType(),True),
    StructField('long', FloatType(),True),
    StructField('average_score', FloatType(),True),
    StructField('review', StringType(),True),
    StructField('polarity', IntegerType(),True),
    StructField('word_counts', IntegerType(),True),
    StructField('tags', StringType(),True),
])

df = spark.read.format('csv').options(header=True).options(delimiter= ";").schema(schema).load(train_review )
df.dtypes

[('c0', 'int'),
 ('hotel_name', 'string'),
 ('lat', 'float'),
 ('long', 'float'),
 ('average_score', 'float'),
 ('review', 'string'),
 ('polarity', 'int'),
 ('word_counts', 'int'),
 ('tags', 'string')]

In [4]:
df= df.dropna(how="any")
df.filter(df.polarity.isNull()).count()

0

In [5]:
df.count()

746400

In [6]:
type(df)

pyspark.sql.dataframe.DataFrame

In [7]:
#df.agg(countDistinct("polarity")).show()

In [8]:
df.select('polarity').where(df.polarity==0).count()

337284

In [9]:
df.select('polarity').where(df.polarity==1).count()

409116

# Pipeline

In [10]:
# Tokenize text
tokenizer = Tokenizer(inputCol='review', outputCol='words_token')

#StopWord
stopwordList = nltk.corpus.stopwords.words('english')
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean', stopWords=stopwordList )

#Ngram
bigram = NGram(n=2, inputCol="words_clean", outputCol="bigrams")

#HashingTF
hashingTF = HashingTF(inputCol="bigrams", outputCol="rawFeatures", numFeatures=1000000)

#IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")


In [11]:
from pyspark.ml import Pipeline

pipeline = Pipeline() \
     .setStages([
           tokenizer,
           remover,
           bigram,
           hashingTF,
           idf
     ])

### Enregistrement du pipeline

In [14]:
# test du modele 
pipeline= pipeline.fit(df)
pipeline.write().overwrite().save("myPipeline")

In [15]:
df_prepared= df_prepared.transform(df)

## Separation du test et du train

In [16]:
split= df_prepared.randomSplit([0.7,0.3],100)

In [17]:
train= split[0]
train.count()

522889

In [18]:
test=split[1]
test.count()

223511

## Model LR

In [19]:
# Entrainement du Modèle

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'features', labelCol = 'polarity', maxIter=10)
lrModel = lr.fit(train)

#print('Coefficient:' + str(lrModel.coefficients))
#print('Intercept:' + str(lrModel.intercept))

### Enregistrement du modèle

In [20]:
# Save
lrModel.save("mylrModel")



In [21]:
lrPreds= lrModel.transform(test)

In [22]:
lrPreds.select('hotel_name','polarity','prediction').tail(20)

[Row(hotel_name='Alma Barcelona GL', polarity=1, prediction=0.0),
 Row(hotel_name='Radisson Blu Edwardian Mercer Street', polarity=1, prediction=1.0),
 Row(hotel_name='The Rockwell', polarity=1, prediction=1.0),
 Row(hotel_name='Sofitel London St James', polarity=1, prediction=1.0),
 Row(hotel_name='NH Amsterdam Caransa', polarity=1, prediction=1.0),
 Row(hotel_name='Hotel Kaiserin Elisabeth', polarity=1, prediction=1.0),
 Row(hotel_name='Crowne Plaza Milan City', polarity=1, prediction=1.0),
 Row(hotel_name='The Marylebone Hotel', polarity=1, prediction=1.0),
 Row(hotel_name='Britannia International Hotel Canary Wharf', polarity=1, prediction=1.0),
 Row(hotel_name='Shaftesbury Metropolis London Hyde Park', polarity=1, prediction=1.0),
 Row(hotel_name='Petit Palace Boqueria Garden', polarity=1, prediction=1.0),
 Row(hotel_name='Mercure Hotel Amsterdam City South', polarity=1, prediction=1.0),
 Row(hotel_name='LHP Hotel Napoleon', polarity=1, prediction=0.0),
 Row(hotel_name='The Hoxton

## Evaluation du modèle

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval_lr = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='polarity', metricName='areaUnderROC')
my_eval_lr.evaluate(lrPreds)

0.8781162897013849

In [24]:
preds_and_labels= lrPreds.select('prediction','polarity')

y_true = preds_and_labels.select(['polarity']).collect()
y_pred = preds_and_labels.select(['prediction']).collect()

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

           0       0.84      0.90      0.87    101261
           1       0.91      0.86      0.88    122250

    accuracy                           0.88    223511
   macro avg       0.88      0.88      0.88    223511
weighted avg       0.88      0.88      0.88    223511



In [25]:
# Matrice de confusion du RamdomForestClassifier
conf_matrix_lr=confusion_matrix(y_true, y_pred)
conf_matrix_lr = pd.DataFrame(conf_matrix_lr)
conf_matrix_lr

Unnamed: 0,0,1
0,90958,10303
1,17362,104888


# Modèle avec Grindsearch et CV

In [26]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np

In [27]:
lr_best = LogisticRegression(featuresCol = 'features', labelCol = 'polarity', maxIter = 10)

paramGrid = ParamGridBuilder() \
    .addGrid(lr_best.regParam, [0, 0.01]) \
    .build()


crossval_lr = CrossValidator(estimator=lr_best,
                          estimatorParamMaps = paramGrid,
                          evaluator=BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='polarity'),
                          numFolds= 2) 

In [28]:
cvModel_lr = crossval_lr.fit(train)

Py4JJavaError: An error occurred while calling o1405.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 70.0 failed 1 times, most recent failure: Lost task 1.0 in stage 70.0 (TID 223) (dd1593637302 executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.reflect.Array.newInstance(Array.java:78)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2066)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2102)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569)
	at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1230)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2297)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1222)
	at org.apache.spark.ml.stat.Summarizer$.getClassificationSummarizers(Summarizer.scala:232)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:510)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:494)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:285)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.reflect.Array.newInstance(Array.java:78)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2066)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2102)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569)
	at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1230)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)


In [17]:
lrPreds2= cvModel_lr.transform(test)

NameError: name 'cvModel_lr' is not defined