# Attività di EDA su Wikipedia

## Descrizione del dataset

Il dataset offerto è composto da 4 colonne:

- **title**: indica il titolo dell'articolo
- **summary**: contiene l'introduzione dell'articolo
- **documents**: contiene l'articolo completo
- **categoria**: contiene la categoria associata all'articolo

## Obiettivi

### 1. Attività EDA:
È necessario svolgere un'attività di EDA per analizzare e valutare statisticamente tutto il contenuto informativo offerto da Wikipedia. Il dataset fornito possiede le seguenti categorie:

- 'culture'
- 'economics'
- 'energy'
- 'engineering'
- 'finance'
- 'humanities'
- 'medicine'
- 'pets'
- 'politics'
- 'research'
- 'science'
- 'sports'
- 'technology'
- 'trade'
- 'transport'

Per ogni categoria, calcolare le seguenti informazioni:

1. Numero di articoli
2. Numero medio di parole utilizzate
3. Numero massimo di parole presenti nell'articolo più lungo
4. Numero minimo di parole presenti nell'articolo più corto
5. Per ogni categoria, individuare la nuvola di parole più rappresentativa


### 2. Sviluppo classificatore NLP articoli :

Dopo aver svolto l'analisi richiesta, addestrare e testare un classificatore testuale capace di classificare gli articoli (secondo le categorie presenti nel dataset) che saranno in futuro inseriti.


## 2. Implementazione Classificatore Testuale :


In questa fase del nostro progetto, ci concentreremo su due obiettivi principali sullo sviluppare e valutare un modello di classificazione per articoli futuri

Il nostro obiettivo è creare un modello capace di classificare automaticamente nuovi articoli nelle categorie appropriate. <br>
Per fare ciò andremo ad eseguire le seguenti fasi implementative :

1. Preparazione dei Dati:
   - Divideremo il nostro dataset in set di addestramento e di test
   - Utilizziamo i dati già vettorizzati in precedenza per avere una rappresentazione numerica adatta al machine learning

2. Addestramento del Modello:
   - Sceglieremo un Logistic Regression come algoritmo di classificazione appropriato 
   - Addestreremo il modello sui dati di training

3. Valutazione del Modello:
   - Testeremo il modello sul set di dati di test
   - Valuteremo le performance usando metriche come accuratezza, precisione, recall e F1-score

Queste 4 fasi le effettueremo sia sul df "summary" che su "documents" a fine di verificare quale dei due fornisce le performance migliori.

**Risultato Atteso**

Al termine di questo processo, avremo un modello di classificazione che sarà in grado di:
- Analizzare il contenuto di nuovi articoli
- Assegnare a questi articoli la categoria più probabile basandosi sulle caratteristiche apprese dal nostro corpus

Questo strumento sarà prezioso per automatizzare la classificazione di futuri articoli, rendendo più efficiente il processo di categorizzazione e organizzazione dei contenuti.

Nelle prossime celle di codice, implementeremo questi passaggi utilizzando le funzionalità di machine learning di PySpark.

Iniziamo il progetto andandoci a scaricare il dataset di lavoro :

In [0]:
!wget https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv

--2024-08-13 10:08:41--  https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv
Resolving proai-datasets.s3.eu-west-3.amazonaws.com (proai-datasets.s3.eu-west-3.amazonaws.com)... 3.5.204.157, 52.95.155.102
Connecting to proai-datasets.s3.eu-west-3.amazonaws.com (proai-datasets.s3.eu-west-3.amazonaws.com)|3.5.204.157|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1003477941 (957M) [text/csv]
Saving to: ‘wikipedia.csv.1’


2024-08-13 10:09:49 (14.3 MB/s) - ‘wikipedia.csv.1’ saved [1003477941/1003477941]



Ci predisponiamo il Dataframe spark di lavoro seguendo il seguente codice che va a leggere da un cluster AWS S3 il file csv e lo importa in un dataframe spark.

N.b:
Nel codice che segue, eseguiremo un campionamento stratificato del nostro dataset. Questo approccio è necessario per due ragioni principali poiché in fase di sviluppo stiamo lavorando in un ambiente Databricks Community, pertanto avremo accesso a risorse computazionali limitate.

**Processo di campionamento**

1. Estrarremo un campione bilanciato del dataset originale.
2. Utilizzeremo un metodo di campionamento stratificato basato sulla colonna "categoria".
3. Questo significa che la distribuzione delle categorie nel nostro campione sarà proporzionalmente la stessa del dataset originale.

Questo approccio ci permetterà di lavorare con un dataset più piccolo e gestibile, mantenendo allo stesso tempo la rappresentatività dei nostri dati originali.

In [0]:

import pandas as pd
from sklearn.model_selection import train_test_split

dataset = pd.read_csv('/databricks/driver/wikipedia.csv')
categoria_col = 'categoria'  

# Riduciamo la size per agevolare il running nella versione community di databricks effettuatndo un campionamento stratificato (in modo che si mantiene bilanciato)
dataset_sample, _ = train_test_split(dataset, test_size=0.0001, stratify=dataset[categoria_col], random_state=42)

spark_df_ = spark.createDataFrame(dataset_sample)
spark_df_ = spark_df_.drop("Unnamed: 0")
spark_df_.write.mode("overwrite").saveAsTable("wikipedia")




Mostriamo il contenuto del dataframe spark di lavoro:

In [0]:
spark_df_.show()

+--------------------+--------------------+--------------------+-----------+
|               title|             summary|           documents|  categoria|
+--------------------+--------------------+--------------------+-----------+
|   leonard b. strang|leonard birnie st...|leonard birnie st...|   research|
|     pascal vasselon|pascal vasselon (...|pascal vasselon (...|engineering|
|       nada stotland|nada logan stotla...|nada logan stotla...|   medicine|
|  rikkie-lee tyrrell|rikkie-lee tyrrel...|rikkie-lee tyrrel...|   politics|
|pauline hanson's ...|pauline hanson's ...|pauline hanson's ...|   politics|
|2019 chilean air ...|on 9 december 201...|on 9 december 201...|    science|
|               pirna|pirna (german: [ˈ...|pirna (german: [ˈ...|engineering|
|            sam cox |samuel victor cox...|samuel victor cox...|   politics|
|   antiochis of tlos|antiochis of tlos...|antiochis of tlos...|   medicine|
|castle terrace, c...|castle terrace, o...|castle terrace, o...|engineering|

### b. Modello per campo "Documents"

Procedo a crearmi il Classificatore ed addestrarlo per la colonna "documents"; anche in questo caso mostriamo le performance del modello a fine di confrontarlo con il precedente 

Come già fatto in precedenza per il campo "summary" andiamo ad effettuare anche per il campo "documents" l'attività di Data Cleaning con un apposita Pipeline.

In [0]:
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import concat_ws, col, lit, split
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Definiamo la pipeline di Spark NLP che pulisce il testo che agisce su "documents".

document_assembler = DocumentAssembler().setInputCol("documents").setOutputCol("document")
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setCleanupPatterns(["[^\w\s]"]) \
    .setLowercase(True) \
    .setOutputCol("normalized") \
    .setLowercase(True)

lemmatizer = LemmatizerModel.pretrained().setInputCols(["normalized"]).setOutputCol("lemma")
stopwords_cleaner = StopWordsCleaner.pretrained().setInputCols(["lemma"]).setOutputCol("cleaned")

# Crea il pipeline
nlp_pipeline = Pipeline(stages=[
    document_assembler, 
    tokenizer, 
    normalizer, 
    lemmatizer, 
    stopwords_cleaner
])

# Funzione per applicare la pipeline e pulire il testo
def clean_text_spark_nlp(df, input_col="summary", output_col="cleaned_text"):
    # Fit della pipeline 
    fitted_pipeline = nlp_pipeline.fit(df)
    
    # Applica il pipeline al DataFrame
    result = fitted_pipeline.transform(df)

    # Estraiamo il testo pulito in una nuova colonna
    result_with_clean_text = result.withColumn(output_col, concat_ws(" ", "cleaned.result"))

    # Restituiamo il dataframe con la selezione delle colonne originali e la nuova colonna di testo pulito
    return result_with_clean_text.select(df.columns + [output_col])

# Applichiamo la funzione al tuo DataFrame
df_cleaned_documents = clean_text_spark_nlp(spark_df_)

# Tokenizziamo la colonna "cleaned_text" 
df_cleaned_documents = df_cleaned_documents.withColumn("words", split(df_cleaned_documents.cleaned_text, "\\s+"))


lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ][ / ][ — ][OK!]
stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[ | ][OK!]


Quello che ora dobbiamo fare analogamente per quanto fatto su "summary" sarà andare a definire la vettorizzazione tramite HashingTF più efficace del CountVectorizer in quanto mappa direttamente le parole in indici del vettore delle feature usando una funzione di hash. Questo riduce il tempo e la memoria necessari per costruire un vocabolario esplicito, come invece avviene con l'ausilio del  CountVectorizer :

In [0]:
from pyspark.ml.feature import HashingTF

# Definiamo HashingTF sulla colonna "words"
hashingTF = HashingTF(inputCol="words", outputCol="word_vector")  # Impostiamo il numero di feature

# Applico la trasformazione
df_cleaned_documents_cont_vect = hashingTF.transform(df_cleaned_documents)


Procediamo adesso con l'implementazione ed addestramento del modello stavolta per il campo "documents".

Andiamo innanzitutto a codificarci le "categorie" per impostare il dataset all'addestramento del modello e poi lo dividiamo in train e test:

In [0]:
from pyspark.sql.functions import col, create_map, lit
from pyspark.sql.types import IntegerType
from itertools import chain

def custom_category_indexer(df, input_col, output_col):
    # Ottieni tutte le categorie uniche
    categories = df.select(input_col).distinct().rdd.flatMap(lambda x: x).collect()
    
    # Crea un dizionario di mapping categoria -> indice
    category_dict = {cat: idx for idx, cat in enumerate(categories)}
from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

def optimized_category_indexer(df, input_col, output_col):
    # Crea un DataFrame con categorie uniche e indici
    category_df = df.select(input_col).distinct()
    
    # Usa row_number per assegnare indici univoci
    window = Window.orderBy(monotonically_increasing_id())
    category_df = category_df.withColumn(output_col, row_number().over(window) - 1)
    
    # Esegui un join per assegnare gli indici al DataFrame originale
    df_indexed = df.join(category_df, on=input_col, how="left")
    
    return df_indexed.withColumn(output_col, col(output_col).cast(IntegerType()))

# Uso della funzione
df_cleaned_documents_cont_vect = optimized_category_indexer(df_cleaned_documents_cont_vect, "categoria", "categoriaIndex")

# Dividiamo il dataset in training e test
train, test = df_cleaned_documents_cont_vect.randomSplit([0.7, 0.3], seed=42)




In [0]:
df_cleaned_documents_cont_vect.show(10)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:729)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:447)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:447)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecutio

Avendo diviso in train e test il dataset e codificato la variabile target procediamo adesso all'implementazione e all'addestramento del modello Logistic Regression :

In [0]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time


# Modello Naive Bayes
lr = LogisticRegression(featuresCol="word_vector", labelCol="categoriaIndex", maxIter=3)
model_documents = lr.fit(train)


# Predizioni
predictions_documents = model_documents.transform(train)


# Valutiamo il modello con la colonna corretta per la classificazione
evaluator = MulticlassClassificationEvaluator(labelCol="categoriaIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_documents)
print(f"Accuracy: {accuracy}")

# Mostriamo alcune predizioni, usando le colonne corrette
predictions_documents.select("categoria", "categoriaIndex", "prediction", "probability").show(10)