<h1 style="text-align: center;">&nbsp;<img style="font-size: 0.9em;" src="https://www.hospitalitynet.org/picture/153007157/travelers-push-tripadvisor-past-1-billion-reviews-opinions.jpg?t=1587981992" alt="" width="300" height="100" /><span style="font-family: tahoma, arial, helvetica, sans-serif; font-size: large;"><span style="font-size: x-large;"> Preprocessing des données avec PySpark</span></span><span style="font-family: tahoma, arial, helvetica, sans-serif; font-size: large;">&nbsp; &nbsp; &nbsp;&nbsp;</span>&nbsp;<img src="https://i0.wp.com/mosefparis1.fr/wp-content/uploads/2022/10/cropped-image-1.png?fit=532%2C540&amp;ssl=1" alt="" width="150" height="150" />&nbsp;</h1>
<p style="text-align: center;">&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;de Lucie Gabagnou et Yanis Rehoune</p>

Dans ce second notebook, nous effectuons une pipeline de preprocessing des données: Les données brutes qui ont été extraites par Webscraping ne sont pas forcément dans le format attendu. Il faut ainsi nettoyer les données et extraire des features pertinents.




# Processing avec PySpark
Dans cette partie, nous réalisons le processing de nos données: on nettoie (le texte particulièrement) et éventuellement créer de nouveaux features à partir de la base brute récoltée. Ici, il faut s'assurer que les types soient les bons, que le texte soit exploitable pour le NLP, et que tous les features soient exploitables (typiquement l'adresse doit devenir un ensemble de coordonnées géographiques..).




 Dans notre cas, Pyspark s'avère pratique pour exécuter facilement des fonctions sur un nombre de lignes  important. 
A l'issue de cette étape, les données seront prêtes pour l'analyse exploratoire et le ML.
Remarque: il n'y a pas d'étapes intermédiaires pour voir les données car on évite d'utiliser les fonctions .show() qui puisent dans la mémoire vive et sont longues.



REMARQUE: 
- Si vous utilisez PySpark en local, assurez-vous d'avoir bien installé PySpark (cf README)
- Si vous utilisez PySpark sur google colab, assurez-vous d'avoir un dossier sur le drive comportant le projet!


#### Installation de l'environnement

In [1]:

import findspark
import os
findspark.init()
findspark.find()

'/Users/luciegabagnou/opt/anaconda3/envs/scrap/lib/python3.9/site-packages/pyspark'

On définit un chemin actuel à la racine du projet pour accéder facilement aux contenus des autres modules:

In [2]:
current_path=os.path.dirname(os.getcwd())
os.chdir(current_path)
print("Current path",os.getcwd())

Current path /Users/luciegabagnou/Documents/MOSEF/PYTHON/projet_trip_advisor/sentiment_analysis_tripadvisor


### Création d'un SparkDataFrame


In [3]:
!pip install -r requirements.txt
#!python -m spacy download fr_core_news_md



In [4]:
from pyspark import  HiveContext , SparkContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DoubleType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
import re
from scripts.utils import get_digits
from scripts.preprocessor.global_processor import geocode_address,separate_price_and_cuisine
from scripts.preprocessor.text_processor import clean_text_sentiment_analysis
import nltk
import spacy
from unidecode import unidecode
from nltk.corpus import stopwords
import spacy


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


Dans un premier temps, on créer une session Spark, celle sur laquelle on va load le dataframe et effectuer nos modifications. Dans le notebook, on load un DataFrame, mais dans la partie développement, on exectuera cela sur la base MySQL. 

In [33]:
spark = SparkSession.builder.appName("Load JSON").getOrCreate()
df = spark.read.option("multiline","true").json("data/fetch_data.json")
df.show()

                                                                                

+------------+--------------------+--------------------+--------------+--------------------+---------+--------------------+--------------------+
|average_note|            location|                name|number_reviews|  price_and_cuisines|  ranking|             reviews|                 url|
+------------+--------------------+--------------------+--------------+--------------------+---------+--------------------+--------------------+
|         4,5|149 boulevard Vol...|        Cafe Leopard|            97|[€€-€€€, Français...|    Nº 39|[Café Leopard, al...|https://www.tripa...|
|         4,0|68 Rue de Grenell...|       Cuillier Café|             7|           [€, Café]|   Nº 708|[Bon petit goûter...|https://www.tripa...|
|         4,0|40 rue Gregoire d...|          Oenosteria|           138|[€€-€€€, Italienn...| Nº 2 485|[A éviter absolum...|https://www.tripa...|
|         4,5|9 Rue Joseph de M...|           La Bossue|           480|[€€-€€€, Français...|   Nº 174|[Superbe expérien...|https:/

Dans un premier temps, on s'assure que les types soient corrects:

In [34]:
df.printSchema()

root
 |-- average_note: string (nullable = true)
 |-- location: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number_reviews: string (nullable = true)
 |-- price_and_cuisines: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ranking: string (nullable = true)
 |-- reviews: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- url: string (nullable = true)



On voit que "average note" est en string alors qu'il doit s'agir de nombres décimaux, de même pour le nombre de reviews, le classement (ranking).


#### Type 
On corrige les types qui posaient problèmes précèdemment:

In [35]:
# Création d'une fonction définie par utilisateur (udf). On a repris la fonction get_digits disponibles dans les utils
# Application sur la colonne des classements pour récupérer les chiffres/nombres de la chaîne de caractères n°1 => 1
# Apply UDF to the column "tripadvisor rank"
from pyspark.sql.functions import col, lit, regexp_replace
get_digits_udf = udf(get_digits, DoubleType())
df = df.withColumn("average_note", regexp_replace(col("average_note"), ",", "."))
df = df.withColumn("ranking", get_digits_udf(df["ranking"]))
df = df.withColumn("average_note", get_digits_udf(df["average_note"]))
df = df.withColumn("number_reviews", get_digits_udf(df["number_reviews"]))


In [36]:
df.printSchema()

root
 |-- average_note: double (nullable = true)
 |-- location: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number_reviews: double (nullable = true)
 |-- price_and_cuisines: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ranking: double (nullable = true)
 |-- reviews: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- url: string (nullable = true)



In [37]:
df.show()

[Stage 32:>                                                         (0 + 1) / 1]

+------------+--------------------+--------------------+--------------+--------------------+-------+--------------------+--------------------+
|average_note|            location|                name|number_reviews|  price_and_cuisines|ranking|             reviews|                 url|
+------------+--------------------+--------------------+--------------+--------------------+-------+--------------------+--------------------+
|         4.5|149 boulevard Vol...|        Cafe Leopard|          97.0|[€€-€€€, Français...|   39.0|[Café Leopard, al...|https://www.tripa...|
|         4.0|68 Rue de Grenell...|       Cuillier Café|           7.0|           [€, Café]|  708.0|[Bon petit goûter...|https://www.tripa...|
|         4.0|40 rue Gregoire d...|          Oenosteria|         138.0|[€€-€€€, Italienn...| 2485.0|[A éviter absolum...|https://www.tripa...|
|         4.5|9 Rue Joseph de M...|           La Bossue|         480.0|[€€-€€€, Français...|  174.0|[Superbe expérien...|https://www.tripa...|

                                                                                

Les types ont bien été corrigés.

### Traitement des valeurs manquantes
On traite les valeurs manquantes sur les variables numériques, qui sont celles pouvant ne pas être remplies (pas de reviews => pas de nb reviews, pas de note moyenne)


In [38]:
df = df.na.fill({
    "average_note": 0,
    "ranking": 0,
    "number_reviews": 0
})

### Feature engineering:
- On va séparer le contenu de la colonne "price and cuisines", qui regroupe le prix et les types de cuisine. Nous ne l'avons pas fait lors du webscrapping sachant qu'il n'était pas évident de séparer le contenu: 
- Géolocalisation: on veut récupérer les coordonnées géographiques pour l'appli..

##### Price and cuisines

En effet, on voit qu'il n'est pas évident de trouver une règle simple de séparation car le nombre d'élements n'est pas le même selon les restaurants (parfois aucune information, parfois 4, parfois prix, parfois non, etc..):

In [39]:

# On impose un schéma pour faire en sorte d'avoir le format final souhaité, à savoir des listes/arrays de chaîne de caractères
udf_separate_price_and_cuisine = udf(separate_price_and_cuisine, StructType([
    StructField("price", ArrayType(StringType())),
    StructField("cuisine", ArrayType(StringType()))
])) # On renseigne ce schéma dans l' udf

#On applique les fonctions
df = df.withColumn("price", udf_separate_price_and_cuisine("price_and_cuisines").price) 
df = df.withColumn("cuisine", udf_separate_price_and_cuisine("price_and_cuisines").cuisine)
df = df.drop(*["price_and_cuisines"])

##### Localisation
On utilise le package geopy pour convertir nos adresses en coordonnées GPS: On créée une colonne pour la longitude et une pour la latitude.

In [40]:
from geopy.geocoders import Nominatim
from pyspark.sql.functions import udf,mean
from pyspark.sql.types import StructType, StructField, DoubleType
from scripts.preprocessor.global_processor import geocode_address

geocode_udf = udf(
            geocode_address,
            returnType = StructType([
    StructField("latitude", DoubleType()),
    StructField("longitude",DoubleType())]))

df = df.repartition(20) 
df = df.withColumn("longitude", geocode_udf("location").longitude)
df = df.withColumn("latitude", geocode_udf("location").latitude)

#On fill par la moyenne

avg_longitude = df.agg(mean(df["longitude"])).first()[0]
avg_latitude = df.agg(mean(df["latitude"])).first()[0]
df = df.fillna(avg_longitude, subset="longitude")
df = df.fillna(avg_latitude, subset="latitude")

[nltk_data] Downloading package stopwords to                       (0 + 8) / 20]
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package stopwords is already up-

##### Commentaires
Pour les commentaires, on applique notre pipeline NLP qui a pour objectif de nettoyer un ensemble de commentaires en vue d'une analyse de sentiment. Elle effectue plusieurs opérations de nettoyage, telles que la conversion en minuscules, la suppression de caractères spéciaux et de chiffres, la suppression de mots-clés, la lemmatisation et la suppression d'entités nommées. Elle utilise la bibliothèque spaCy pour la lemmatisation et la suppression d'entités nommées. Enfin, la fonction retourne une liste de commentaires nettoyés. De plus, elle récupère les notes associées aux commentaires dans une autre colonne pour compléter cette analyse.

In [42]:
udf_text_cleaning = udf(clean_text_sentiment_analysis, StructType([
    StructField("reviews", ArrayType(StringType())),
    StructField("ratings", ArrayType(StringType()))]))


df=df.withColumn("clean_reviews", udf_text_cleaning("reviews").reviews)
df=df.withColumn("ratings", udf_text_cleaning("reviews").ratings)

##### Conversion des données
On passe par Pandas car cela nous permet d'éviter des problèmes de conversion (les méthodes df.write... ne sont pas particulièrement adapté à nos données)

In [44]:
clean_data=df.toPandas() # Environ 10 min

[nltk_data] Downloading package stopwords to                        (0 + 1) / 1]
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to                       (0 + 8) / 20]
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_d

In [45]:
clean_data

Unnamed: 0,average_note,location,name,number_reviews,ranking,reviews,url,price,cuisine,longitude,latitude,clean_reviews,ratings
0,4.0,"39 Avenue de l'Opéra, 75002 Paris France",Bar E7,3.0,12392.0,[Bons cocktails et tapas-cuisine imparfaite:Un...,https://www.tripadvisor.fr/Restaurant_Review-g...,[],[],2.333227,48.868499,[bon cocktail tapa cuisine imparfait grand cho...,[4.0]
1,5.0,"54 rue Piat, 75020 Paris France",God Bless Broccoli,29.0,4346.0,[Déjeuner:Nous avons manger des très bonne piz...,https://www.tripadvisor.fr/Restaurant_Review-g...,[€€-€€€],"[Italienne, Pizza, Végétariens bienvenus]",2.383660,48.873115,[dejeuner manger bon pizza faire manger super ...,"[5.0, 4.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, ..."
2,4.0,"Avenue des 4 Chemins, 92330 Sceaux, 92330 Pari...",Sushi Robinson,2.0,12314.0,[Meilleur sushi du coin:En préambule j'ai touj...,https://www.tripadvisor.fr/Restaurant_Review-g...,[],[],2.335660,48.852900,[meilleur sushi coin preambule commande emport...,"[5.0, 3.0]"
3,4.5,"4 rue Affre, 75018 Paris France",Chez Mai,3.0,10965.0,[Super spot pour des plats africains de qualit...,https://www.tripadvisor.fr/Restaurant_Review-g...,[],[],2.355878,48.885218,[super spot plat africain qualite accueil chal...,"[4.0, 5.0, 5.0]"
4,4.0,"40 rue Saint Honore, 75001 Paris France",Restaurant le Moliere,152.0,2142.0,[Une brasserie classique:Très bon accueil du p...,https://www.tripadvisor.fr/Restaurant_Review-g...,[€€-€€€],"[Française, Européenne]",2.344651,48.861170,[brasserie classique bon accueil patron cuisin...,"[4.0, 3.0, 5.0, 4.0, 4.0, 4.0, 3.0, 5.0, 1.0, ..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,4.5,"17 rue Claude Tillier, 75012 Paris France",Trois Crabes,298.0,280.0,"[une merveille:service courtois, restaurant bi...",https://www.tripadvisor.fr/Restaurant_Review-g...,[€€-€€€],"[Asiatique, Vietnamienne, Végétariens bienvenus]",2.388023,48.848217,[merveill service courtois restaurer bien deco...,"[5.0, 5.0, 5.0, 5.0, 5.0, 3.0, 5.0, 4.0, 5.0, ..."
996,4.0,"32 Avenue Des Gobelins, 75013 Paris France",Dame Augustine,47.0,2903.0,[Jamais déçue !:Difficile de choisir tant la c...,https://www.tripadvisor.fr/Restaurant_Review-g...,[€€-€€€],[Française],2.352284,48.835753,[jamais decue difficile choisir carte allechan...,"[5.0, 5.0, 5.0, 4.0, 5.0, 5.0, 1.0, 5.0, 5.0, ..."
997,3.0,1 rue Basse Centre Commercial Forum Des Halles...,Brioche Dorée Paris 1er Forum des Halles,5.0,5996.0,[Déçues de nos achats au Brioche Dorée du Foru...,https://www.tripadvisor.fr/Restaurant_Review-g...,[],"[Française, Restauration rapide]",2.335660,48.852900,[decue achat brioche doree forum halle 02/12/2...,"[3.0, 1.0, 4.0, 5.0, 3.0]"
998,4.0,"20 Rue D'Artois, 75008 Paris France",Apicius,278.0,2776.0,[bon mais carte limité:Le restaurant Apicius é...,https://www.tripadvisor.fr/Restaurant_Review-g...,[€€€€],"[Française, Européenne, Végétariens bienvenus]",2.307241,48.873339,[bon carte limite restaurant apiciu liste alle...,"[4.0, 5.0, 5.0, 3.0, 5.0, 1.0, 5.0, 3.0, 1.0, ..."


In [50]:
clean_data.to_json("/Users/luciegabagnou/Documents/MOSEF/PYTHON/projet_trip_advisor/sentiment_analysis_tripadvisor/data/clean_data.json")

### Pipeline


In [None]:
from scripts.preprocessor.global_processor import ProcessingPipeline
ProcessingPipeline("data/fetch_data.json").run_pipeline()

In [14]:
ProcessingPipeline("data/fetch_data.json").run_pipeline()

[nltk_data] Downloading package stopwords to                       (0 + 8) / 20]
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/luciegabagnou/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/luciegabagnou/nltk_d

KeyboardInterrupt: 