In [106]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SentimentAnalysis") \
    .config("spark.jars", "/home/a2s/Documents/Documents/Corte/auto_apprendre/postgresql-42.7.4.jar") \
    .getOrCreate()

# Charger le CSV IMDb
df = spark.read.option("header", "true").csv("Tweets.csv")
df.show(5)


+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|      name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|tweet_location|       user_timezone|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|570306133677760513|          neutral|                         1.0|          NULL|                     NULL|Virgin America|                  NULL|   caird

In [107]:
# Ajouter colonnes calculées
# Crée une colonne "hashtags" avec le premier hashtag trouvé
from pyspark.sql.functions import regexp_extract, col,length,udf
from pyspark.sql.types import StringType
import re
def regexp_extract_all(text):
     if text:
        return ",".join(re.findall(r"#\w+", text))
     else:
        return ""

extract_hashtags_udf = udf(regexp_extract_all, StringType())


df = df.withColumn("text_length", length(col("text"))) \
        .withColumn("hashtags", extract_hashtags_udf(col("text")))
df.show(5)




+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+-----------+--------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|      name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|tweet_location|       user_timezone|text_length|hashtags|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+-----------+--------+
|570306133677760513|          neutral|                         1.0|          NULL|         

                                                                                

In [108]:
df.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- airline_sentiment_confidence: string (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_sentiment_gold: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negativereason_gold: string (nullable = true)
 |-- retweet_count: string (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)
 |-- text_length: integer (nullable = true)
 |-- hashtags: string (nullable = true)



In [109]:
#Convertir le texte en minuscules, supprimer ponctuation, etc.
from pyspark.sql.functions import regexp_replace,lower

df=df.withColumn("text_clean",regexp_replace(lower(col("text")),"[^a-zA-Z0-9\s#@]",""))


In [110]:
df.show(5)


+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+-----------+--------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|      name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|tweet_location|       user_timezone|text_length|hashtags|          text_clean|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+-----------+--------+--------------------+
|570306133677760513|        

                                                                                

In [111]:
from pyspark.sql.functions import split, col
df = df.withColumn("words", split(col("text_clean"), " "))


In [112]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(
    inputCol="airline_sentiment",
    outputCol="label",
    handleInvalid="keep"   # ou "keep"
)

df = indexer.fit(df).transform(df)

In [113]:
#df = df.drop("words")

In [114]:
df.select("words").show(5, truncate=False)


+------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                     |
+------------------------------------------------------------------------------------------------------------------------------------------+
|[@virginamerica, what, @dhepburn, said]                                                                                                   |
|[@virginamerica, plus, youve, added, commercials, to, the, experience, tacky]                                                             |
|[@virginamerica, i, didnt, today, must, mean, i, need, to, take, another, trip]                                                           |
|[@virginamerica, its, really, aggressive, to, blast, obnoxious, entertainment, in, your, guests, faces, amp, they, have, little, recourse]|
|[@virginamer

In [115]:
# Voir les lignes avec text NULL
df.filter(df["text"].isNull()).show(10, truncate=False)

# Compter le nombre de lignes nulles
print("Nombre de lignes nulles :", df.filter(df["text"].isNull()).count())


                                                                                

+----------------------------------------------------------------------+----------------------+----------------------------+--------------------------+--------------------------+--------------------------+----------------------+----+-------------------+-------------+----+-----------+-------------+--------------+-------------+-----------+--------+----------+-----+-----+
|tweet_id                                                              |airline_sentiment     |airline_sentiment_confidence|negativereason            |negativereason_confidence |airline                   |airline_sentiment_gold|name|negativereason_gold|retweet_count|text|tweet_coord|tweet_created|tweet_location|user_timezone|text_length|hashtags|text_clean|words|label|
+----------------------------------------------------------------------+----------------------+----------------------------+--------------------------+--------------------------+--------------------------+----------------------+----+-------------------+---

In [116]:
df = df.filter(df["text"].isNotNull())


In [117]:
print("Nombre de lignes nulles :", df.filter(df["text"].isNull()).count())


Nombre de lignes nulles : 0


In [118]:
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="text_clean", outputCol="words_tokenized")
df = tokenizer.transform(df)
df.show(5, truncate=False)


+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+----------+-------------------+-------------+----------------------------------------------------------------------------------------------------------------------------------+-----------+-------------------------+--------------+--------------------------+-----------+--------+------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+-----+------------------------------------------------------------------------------------------------------------------------------------------+
|tweet_id          |airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|airline       |airline_sentiment_gold|name      |negativereason

                                                                                

In [119]:
df.filter(df["words"].isNull()).show(10, truncate=False)


[Stage 105:>                                                        (0 + 1) / 1]

+--------+-----------------+----------------------------+--------------+-------------------------+-------+----------------------+----+-------------------+-------------+----+-----------+-------------+--------------+-------------+-----------+--------+----------+-----+-----+---------------+
|tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|airline|airline_sentiment_gold|name|negativereason_gold|retweet_count|text|tweet_coord|tweet_created|tweet_location|user_timezone|text_length|hashtags|text_clean|words|label|words_tokenized|
+--------+-----------------+----------------------------+--------------+-------------------------+-------+----------------------+----+-------------------+-------------+----+-----------+-------------+--------------+-------------+-----------+--------+----------+-----+-----+---------------+
+--------+-----------------+----------------------------+--------------+-------------------------+-------+----------------------+----

                                                                                

In [120]:
print(df.columns)


['tweet_id', 'airline_sentiment', 'airline_sentiment_confidence', 'negativereason', 'negativereason_confidence', 'airline', 'airline_sentiment_gold', 'name', 'negativereason_gold', 'retweet_count', 'text', 'tweet_coord', 'tweet_created', 'tweet_location', 'user_timezone', 'text_length', 'hashtags', 'text_clean', 'words', 'label', 'words_tokenized']


In [123]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="words", outputCol="features")
cv_model = cv.fit(df)
df = cv_model.transform(df)
df.select("words", "features").show(5, truncate=False)


                                                                                

+------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                     |features                                                                                                                                        |
+------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+
|[@virginamerica, what, @dhepburn, said]                                                                                                   |(1

In [124]:
df.select("text_clean", "words", "features", "label").show(5, truncate=False)


+------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|text_clean                                                                                                              |words                                                                                                                                     |features                                                                                                                                        |label|
+------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------

In [125]:
#Pour évaluer correctement un modèle, on sépare les données :

#80 % pour l’entraînement (train_df)

#20 % pour le test (test_df)

train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)


In [126]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_df)


25/09/30 16:57:12 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/09/30 16:57:23 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [127]:
#On utilise le modèle pour prédire sur le test_df et on vérifie la précision (accuracy) :
predictions = lr_model.transform(test_df)
predictions.select("text_clean", "words", "label", "prediction").show(10, truncate=False)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")


25/09/30 17:31:59 WARN DAGScheduler: Broadcasting large task binary with size 5.1 MiB
                                                                                

+---------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|text_clean                                                                                                                       |words                                                                                                                                                  |label|prediction|
+---------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|@united yes we waited in line for almost an hour to do so some passengers just left not wanting 

25/09/30 17:32:04 WARN DAGScheduler: Broadcasting large task binary with size 5.1 MiB
[Stage 219:>                                                        (0 + 1) / 1]

Accuracy: 0.689727463312369


                                                                                

In [None]:
#Pour mieux comprendre les erreurs du modèle :
predictions.filter(predictions.label != predictions.prediction) \
           .select("text_clean", "label", "prediction") \
           .show(10, truncate=False)


25/09/30 17:40:58 WARN DAGScheduler: Broadcasting large task binary with size 5.1 MiB
[Stage 222:>                                                        (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|text_clean                                                                                                                                    |label|prediction|
+----------------------------------------------------------------------------------------------------------------------------------------------+-----+----------+
|@usairways @americanair how r u supposed to change flights when u cant get thru to reservations #onehouronhold                                |0.0  |1.0       |
|@southwestair hoping you answer the phone today                                                                                               |0.0  |1.0       |
|@united what if business as usual meant dropping the bully mentality and fostering inspiration for a greater business #employeerelations      |0.0  |1.0       |
|@southwestair trying to fly

                                                                                