In [1]:
!pip install pyspark
!pip install spark-nlp
!pip install nltk



In [2]:
import sparknlp
from pyspark.sql import SparkSession

# Check if a SparkSession is already active
if 'spark' not in globals() or not isinstance(spark, SparkSession):
    # start the spark nlp session
    spark = sparknlp.start()
else:
    print("SparkSession is already active.")

In [3]:
import json
import random
import pandas as pd
# reading data

with open("/home/ouyassine/Documents/projects/data_engineering_1/data/data.json") as f:
     data = json.load(f)

random.shuffle(data)

with open("/home/ouyassine/Documents/projects/data_engineering_1/data/data.json", 'w') as f:
     json.dump(data, f, indent=2)

data = pd.read_json("/home/ouyassine/Documents/projects/data_engineering_1/data/data.json")

df = spark.createDataFrame(data)
df.show(truncate=False)

+------------------------------------------------------------------------------------+---------+
|title                                                                               |topic    |
+------------------------------------------------------------------------------------+---------+
|توقيف مبحوث عنها وطنيا من أجل النصب والاحتيال في إطار التسوق الهرمي                 |divers   |
|تنسيق أمني يطيح بمغتصب قاصر احتجزها بالعنف بالدار البيضاء                           |politique|
|حزب النهج بالرباط يدعو للتصدي للسياسات الطبقية المعادية للعمال وعموم الفئات الشعبية |politique|
|تنسيقي استخبارات أمني يطيح بتاجر للأقراص المهلوسة بالقنيطرة                         |divers   |
|الحداوي يهدي المغرب الذهبية الثانية في دورة الألعاب البارالمبية بباريس              |sport    |
|تنسيق أمني يطيح بمغتصب قاصر احتجزها بالعنف بالدار البيضاء                           |divers   |
|الاحتلال الإسرائيلي يعلن توقف عملياته في قطاع غزة بشكل عام                          |politique|
|القبض على شخصين من أجل الاتجا

In [28]:
!pip install nltk



In [30]:
from nltk.corpus import stopwords
stopwords_list = stopwords.words('arabic')

In [31]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer


# Tokenization
tokenizer = Tokenizer(inputCol="title", outputCol="tokens")

# Remove Arabic stopwords
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=stopwords_list)

# HashingTF to convert tokens to numerical features
hashing_tf = HashingTF(inputCol="filtered_tokens", outputCol="raw_features")

# Apply TF-IDF
idf = IDF(inputCol="raw_features", outputCol="features")

# Convert the label (topic) into numerical index
label_indexer = StringIndexer(inputCol="topic", outputCol="label")

# 4. Logistic Regression Model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 5. Build the pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf, label_indexer, lr])

In [32]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [33]:
model = pipeline.fit(train_data)

In [38]:
predictions = model.transform(test_data)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy}")

Test Accuracy: 0.8798882681564246


In [39]:
model.write().overwrite().save("path_to_save_model")


In [37]:
predictions.select("title", "topic", "prediction").show(truncate=False)

+----------------------------------------------------------------------------------------------------+---------+----------+
|title                                                                                               |topic    |prediction|
+----------------------------------------------------------------------------------------------------+---------+----------+
|120 مليون أورو من البنك الإفريقي للتنمية لمشروع لبناء منطقة صناعية بميناء الناظور الجديد            |economie |0.0       |
|50 ألفا يؤدون صلاة الجمعة فى المسجد الأقصى وسط الإجراءات المشددة للاحتلال                           |politique|1.0       |
|9474 طنا هي كمية مفرغات الصيد الساحلي والتقليدي بالموانئ المغربية المتوسطية حتى متم يوليوز المنصرم  |economie |0.0       |
|9474 طنا هي كمية مفرغات الصيد الساحلي والتقليدي بالموانئ المغربية المتوسطية حتى متم يوليوز المنصرم  |economie |0.0       |
|أحرقها صديقها..وفاة العداءة الأولمبية الأوغندية “ريبيكا تشيبتيغي”                                   |sport    |2.0       |
|أحرقها 