# Analisi di Wikipedia

## Descrizione del Progetto

**Wikidata Insights**, un'azienda leader nella gestione di contenuti digitali, è stata incaricata da **Wikipedia** per ottimizzare l'analisi e la categorizzazione dei contenuti di Wikipedia.
Per supportare la loro continua espansione e migliorare l'organizzazione delle informazioni, Wikidata Insights ha deciso di condurre un progetto avanzato di **data analysis e machine learning**.
L'obiettivo principale è comprendere meglio il vasto patrimonio di contenuti informativi offerti da Wikipedia e sviluppare un sistema di **classificazione automatica** che consenta di categorizzare efficacemente i nuovi articoli futuri.

## Obiettivi

### 1. Analisi Descrittiva dei Contenuti

Il primo obiettivo del progetto è condurre un'**analisi esplorativa dei dati (EDA)** per capire le caratteristiche dei contenuti di Wikipedia suddivisi in diverse categorie tematiche, come ad esempio:

* Cultura
* Economia
* Medicina
* Tecnologia
* Politica
* Scienza
  e altre.

L'analisi esplorativa prevede:

* il **conteggio degli articoli** presenti per ogni categoria.
* il **numero medio di parole** per articolo.
* la lunghezza dell'articolo **più lungo** e di quello **più corto** per ciascuna categoria.
* la creazione di **nuvole di parole** rappresentative per ogni categoria, per identificare i termini più frequenti e rilevanti.

### 2. Sviluppo di un Classificatore Automatico

Il secondo obiettivo è creare un modello di **machine learning** capace di classificare automaticamente gli articoli in base alla loro categoria.

Il sistema di classificazione verrà addestrato utilizzando dati di testo presenti nelle seguenti colonne del dataset:

* **Sommario** (`summary`): Introduzione breve dell'articolo.
* **Testo Completo** (`documents`): Contenuto completo dell'articolo.

### 3. Identificazione di Nuovi Insights

L'analisi consentirà anche di ottenere preziosi insights sui contenuti di Wikipedia, come la densità di articoli per categoria o le tendenze linguistiche associate a determinati argomenti.
Queste informazioni possono aiutare Wikimedia a migliorare l'organizzazione delle pagine e a ottimizzare i propri sforzi editoriali.

## Workflow del Progetto

### Caricamento dei Dati

I dati è salvato su S3 e reperibile al seguente link:
`https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv`

Utilizzando un framework distribuito come **Databricks**, i dati vengono processati in modo efficiente, partendo da un **Pandas DataFrame** per essere successivamente convertiti in un **Spark DataFrame** e salvati come una tabella chiamata `Wikipedia`.

Per caricare il dataset e trasformarlo in una table basta eseguire su Notebook Databricks le seguenti righe di codice:

```python
!wget https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv
import pandas as pd

dataset = pd.read_csv("/databricks/driver/wikipedia.csv")
spark_df = spark.createDataFrame(dataset)
spark_df = spark_df.drop("Unnamed: 0")
spark_df.write.saveAsTable("wikipedia")
```

**N.B.** Durante il loading del dataset, ci appoggiamo ad un dataframe Pandas. Questa non è una procedura comune e del tutto corretta.
In questo caso ci permette di leggere correttamente (superando con poco sforzo il limite dei separatori) i dati con cui definire un DataFrame Spark e una Table `Wikipedia`.

## Risultati Attesi

### 1. Ottimizzazione dell'Organizzazione dei Contenuti

L'analisi esplorativa fornirà a Wikimedia una visione chiara e dettagliata della distribuzione e delle caratteristiche dei propri contenuti.
Sarà possibile identificare quali categorie necessitano di maggiore attenzione o dove sono presenti opportunità di espansione.

### 2. Classificazione Automatica

Il sistema di classificazione sviluppato permetterà a Wikimedia di automatizzare il processo di categorizzazione dei nuovi articoli, migliorando l'efficienza operativa e garantendo una migliore navigabilità per gli utenti.

### 3. Nuovi Insights Strategici

Grazie agli strumenti dell'analisi esplorativa e della classificazione permetteranno a Wikimedia di ottimizzare l'allocazione delle risorse editoriali, con la possibilità di orientare le proprie campagne informative in modo più mirato.

## Conclusioni

Il progetto offre a **Wikimedia** un potente strumento di **analisi dei dati** e **classificazione automatica** per migliorare la gestione dei propri contenuti.
Attraverso l'utilizzo di tecniche avanzate di **data science** e **machine learning**, Wikimedia sarà in grado di ottimizzare la propria infrastruttura informativa e offrire un servizio di qualità superiore agli utenti di tutto il mondo.


--- 



# Premessa

A causa del recente aggiornamento delle politiche gratuite di Databricks ho riscontrato alcune difficoltà nel completare il compito in cloud, perché:

* il download del dataset tramite `wget` era limitato a **500 MB**;
* nella versione gratuita di Databricks le librerie **spark.ml** risultano bloccate.

Per aggirare questi vincoli ho configurato e ottimizzato Apache Zeppelin in locale, svolgendo l’esercizio in quell’ambiente (pur con le relative limitazioni).

Di seguito trovate:

1. la **repository pubblica** con la mia configurazione di Zeppelin;
2. la **repository** contenente il compito in formato **`.zpln`**;
3. il **notebook Databricks** sul quale ho copiato e versionato gli stessi contenuti, quindi esportato in **`.ipynb`** come richiesto per l’import in questo notebook Colab.

Ecco i tre link alle repository:
- [Zeppelin Docker Local Tuned](https://github.com/fedevita/zeppelin-docker-local-tuned.git)  
- [Progetto 8 – Zeppelin (Org. personale dedicata al master)](https://github.com/profession-ai-data-engineering-master/profession_ai_data_engineering_progetto8_zeppelin.git)  
- [Progetto 8 – Databricks (Org. personale dedicata al master)](https://github.com/profession-ai-data-engineering-master/profession_ai_data_engineering_progetto8_databricks.git)



# SETUP

In [4]:
%pyspark
import os
import math
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from wordcloud import WordCloud
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import (
    avg, 
    max, 
    min, 
    length, 
    count, 
    desc, 
    regexp_replace, 
    col, 
    lower,
    explode,
    asc,
    desc,
    row_number,
    struct,
    map_from_entries,
    collect_list,
    concat_ws
)
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql import Window
from pyspark.ml.feature import (
    StringIndexer,
    Tokenizer,
    StopWordsRemover,
    CountVectorizer,
    HashingTF,
    IDF,
    StandardScaler,
    PCA
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [5]:
%pyspark
# da eliminare per databricks
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .getOrCreate()

conf = spark.sparkContext.getConf()

print("App Name:       ", spark.sparkContext.getConf().get("spark.app.name"))
print("Master:         ", spark.sparkContext.master)
print("Spark Version:  ", spark.version)
print("Deploy Mode:    ", spark.sparkContext.getConf().get("spark.submit.deployMode", "N/D"))
print("Application ID: ", spark.sparkContext.applicationId)
print("Spark Master:          ", conf.get("spark.master"))
print("Spark Driver Memory:   ", conf.get("spark.driver.memory"))
print("Spark Executor Memory: ", conf.get("spark.executor.memory"))
print("Spark Cores:           ", conf.get("spark.driver.cores", "default=1"))
print("Default Parallelism:   ", spark.sparkContext.defaultParallelism)


## **O – Obtain (Ottenere i dati)**
### Obiettivo:
Recuperare e caricare i dati in un ambiente adatto per l'analisi e la modellazione.
### Task:
#### 1. Scaricare il dataset da `http://proai-dataset.s3.eu-west-3.amazonaws.com/wikipedia.csv`

In [7]:
%sh
wget --progress=dot:mega -O /opt/zeppelin/data/wikipedia.csv https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv


In [8]:
%pyspark
wikipedia_raw = (
    spark.read.option("header", True)
              .option("multiLine", True)
              .option("quote", '"')
              .option("escape", '"')
              .csv("/opt/zeppelin/data/wikipedia.csv")
)


## **S – Scrub (Pulizia e preparazione dei dati)**
### Obiettivo:
Pulire i dati ed effettuare operazioni preliminari per renderli pronti per l’analisi e il modello.

In [10]:
%pyspark
# 1. Rimuovere righe duplicate o inconsistenti (es. articoli vuoti)
# 2. Gestire eventuali valori nulli (in title, summary, documents, categoria)
# 3. Rimuovere o normalizzare caratteri speciali e markup da documents

CLEAN_HTML = "<[^>]+>"
CLEAN_SPECIAL_CHARS = "[^a-zA-Z0-9\\s]"

wikipedia_clean = (
    wikipedia_raw
    .drop("_c0")
    .dropDuplicates()
    .dropna(subset=["title", "summary", "documents", "categoria"])
    .withColumn(
        "documents",
        lower(regexp_replace(
            regexp_replace(
                col("documents"),
                CLEAN_HTML,
                ""
            ),
            CLEAN_SPECIAL_CHARS,
            ""
        ))
    )
)
wikipedia_clean.persist(StorageLevel.DISK_ONLY)
wikipedia_clean.count()
wikipedia_clean.createOrReplaceTempView("wikipedia_clean")

In [11]:
%sql
select * from wikipedia_clean limit 3

## **E – Explore (Analisi esplorativa dei dati)**

### Obiettivo:

Capire la struttura, la distribuzione e le peculiarità dei dati per categoria.

### Task:

#### 1. Calcolare il **conteggio articoli per categoria**

In [14]:
%pyspark
_ = (wikipedia_clean
.groupBy("categoria")
.agg(
    count("*").alias('documents_cnt')
)
.sort(desc("documents_cnt"))
.createOrReplaceTempView("wikipedia_e1")
)

In [15]:
%sql
SELECT * FROM wikipedia_e1


#### 2. Calcolare la **lunghezza media, minima e massima** per articolo per categoria

In [17]:
%pyspark
_ = (wikipedia_clean
.select("categoria",length("documents").alias('documents_len'))
.groupBy("categoria")
.agg(
    max("documents_len").alias("max_documents_len"),
    min("documents_len").alias("min_documents_len"),
    avg("documents_len").alias("avg_documents_len")
)
.sort(desc("avg_documents_len"))
.createOrReplaceTempView("wikipedia_e2"))

In [18]:
%sql
SELECT * FROM wikipedia_e2

#### 3. Creare **nuvole di parole** per ogni categoria (basate su `documents`)

**Creo il dataframe da usare nei grafici wordclouds**

In [21]:
%pyspark
# **stabilisco le variabili per effettuare le trasformazioni necessarie al fine di creare grafici di tipo word cloud**
LANG   = "english"
TOP_K  = 50
MIN_DF = 2
BASE_SW     = StopWordsRemover.loadDefaultStopWords(LANG)
NUM_WORDS   = ["zero","one","two","three","four","five","six","seven","eight","nine",
               "ten","eleven","twelve","thirteen","fourteen","fifteen","sixteen",
               "seventeen","eighteen","nineteen","twenty"]
HIGH_DF_SW  = ["new","also","first","second","one","two","three","later"]
CUSTOM_SW   = ["’s","“","”","—","http","https",""]
stopwords   = BASE_SW + NUM_WORDS + HIGH_DF_SW + CUSTOM_SW

# **Divido in Tokens la colonna documents**
tokenizer = Tokenizer(inputCol="documents", outputCol="document_tokens")
df_tokens = tokenizer.transform(wikipedia_clean)

# **Rimuovo le Stop Words**
remover = StopWordsRemover(
    inputCol="document_tokens",
    outputCol="clean_tokens",
    stopWords=stopwords,
    caseSensitive=False
)

df_no_sw = remover.transform(df_tokens)
df_clean = df_no_sw.select("categoria", "clean_tokens").repartition("categoria")

# **Conteggio token per categoria**
df_counts = (
    df_clean
    .select("categoria", explode("clean_tokens").alias("token"))
    .groupBy("categoria", "token")
    .count()
    .where(col("count") >= MIN_DF)
)

# **Effettuo il ranking dei token tramite windows function**
# Effettuo il ranking e filtro per `TOP_K`, successivamente raggruppo per categoria e uso `map_from_entries` per creare la colonna `freqs` che sarà un dizionario contenente come chiave la parola e come valore il 
# conteggio di quante volte la parola compare in quella categoria.
w = Window.partitionBy("categoria").orderBy(desc("count"), asc("token"))

df_wordcloud = (
    df_counts
    .withColumn("rank", row_number().over(w))
    .where(col("rank") <= TOP_K)
    .groupBy("categoria")
    .agg(
        map_from_entries(
            collect_list(struct("token", "count"))
        ).alias("freqs")
    )
)

# **Mostro e salvo il risultato finale**
df_wordcloud.createOrReplaceTempView("df_wordcloud")

In [22]:
%sql
select * from df_wordcloud

**Creo i grafici wordcloud**

In [24]:
%pyspark
# **Converto Dataframe da Spark a Pandas**
# Per agevolarmi nella creazione dei grafici converto il Dataframe da Spark a Pandas.
df_wordcloud_pandas = df_wordcloud.sort("categoria").toPandas()

# **Calcolo le dimensioni ottimali della griglia in base al numero di categorie**
n_cat  = len(df_wordcloud_pandas)
n_cols = 5
n_rows = math.ceil(n_cat / n_cols)

# **Creazione grafici**
# Creo la figura e gli `n_rows * n_cols` subplot, impostando dinamicamente la grandezza della figura in pollici 
# (ho fatto vari tentativi).
fig, axes = plt.subplots(
    n_rows, n_cols,
    figsize=(n_cols * 8, n_rows * 6),
)

axes = axes.flatten()
for idx, (_, row) in enumerate(df_wordcloud_pandas.iterrows()):
    ax = axes[idx]
    freqs = {token: int(count) for token, count in row["freqs"].items()}
    wc = WordCloud(
        width=400, height=200,
        background_color="white",
        colormap="tab10",
        prefer_horizontal=1.0
    ).generate_from_frequencies(freqs)
    ax.imshow(wc, interpolation="bilinear")
    ax.set_title(row["categoria"], fontsize=20, pad=10)
    ax.axis("off")

plt.tight_layout()
plt.show()

## **M – Model (Modellazione e machine learning)**

### Obiettivo:

Addestrare un classificatore per prevedere la categoria degli articoli.

In [26]:
%pyspark
# Concatenare le due colonne testuali
wikipedia_m0 = wikipedia_clean.withColumn("content", concat_ws(" ", wikipedia_clean["summary"], wikipedia_clean["documents"])).drop("title","summary","documents").persist(StorageLevel.DISK_ONLY)
wikipedia_m0.count()

# Preprocessing e pipeline
label_indexer = StringIndexer(inputCol="categoria", outputCol="label")
tokenizer = Tokenizer(inputCol="content", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
vectorizer = CountVectorizer(inputCol="tokens", outputCol="counts",vocabSize = 10000)
scaler1 = StandardScaler(inputCol="counts", outputCol="scaled_counts")

lr = LogisticRegression(featuresCol="scaled_counts", labelCol="label")

pipeline = Pipeline(stages=[
label_indexer,
tokenizer,
remover,
vectorizer,
scaler1,
lr
])

# Addestramento
train_data, test_data = wikipedia_m0.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_data)

# Valutazione
predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy:.4f}")

precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_evaluator.evaluate(predictions)
print(f"Weighted Precision: {precision:.4f}")

recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = recall_evaluator.evaluate(predictions)
print(f"Weighted Recall: {recall:.4f}")

f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")
f1 = f1_evaluator.evaluate(predictions)
print(f"Weighted F1 Score: {f1:.4f}")

prediction_and_labels = predictions.select("prediction", "label").rdd.map(tuple)
metrics = MulticlassMetrics(prediction_and_labels)
label_indexer_model = label_indexer.fit(wikipedia_m0)
labels = label_indexer_model.labels
cm = metrics.confusionMatrix().toArray()

plt.figure(figsize=(12,10))
sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', xticklabels=labels, yticklabels=labels)
plt.xlabel('Predetto')
plt.ylabel('Reale')
plt.title('Confusion Matrix')
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()


In [27]:
%pyspark
# Recupero gli stadi della pipeline
vec   = model.stages[3] # CountVectorizerModel
lr_md = model.stages[-1] # LogisticRegressionModel

vocab  = vec.vocabulary # lista dei token
coefs  = lr_md.coefficientMatrix.toArray() # shape = (nClassi, vocabSize)

import numpy as np, pandas as pd

global_imp = np.abs(coefs).mean(axis=0)
top20_idx  = global_imp.argsort()[-20:][::-1]

top20_dfp = pd.DataFrame({
    "token": [vocab[i] for i in top20_idx],
    "peso" : global_imp[top20_idx]
})

top20_df = spark.createDataFrame(top20_dfp)

top20_df.createOrReplaceTempView("top20_df")

In [28]:
%sql
select * from top20_df


## Risultati del modello di classificazione

> *Disclaimer*: Il modello attuale è **molto semplice e limitato** perché sto lavorando in ambiente **locale** con **risorse ridotte**.  
> L'obiettivo è ottenere una baseline funzionante, non ottimizzare al massimo le prestazioni.

- **Accuracy**: 0.8491  
- **Weighted Precision**: 0.8505  
- **Weighted Recall**: 0.8491  
- **Weighted F1 Score**: 0.8497  

### Considerazioni
- Le metriche sono **coerenti** e indicano un **modello bilanciato**.  
- La **confusion matrix** conferma che la maggior parte delle classi è ben gestita, con solo lievi confusioni tra classi affini.



## **N – iNterpret (Interpretazione dei risultati)**

### Obiettivo:

Fornire una lettura chiara e strategica dei risultati ottenuti.

### Task:

### 1. Sintesi dei risultati dell’EDA

* **Distribuzione delle categorie**

  * Gli articoli non sono equamente distribuiti.
  * `medicine` domina con **8.311 articoli**.
  * `politics`, `culture` e `sports` contano meno della metà di `medicine`.
  * *Rischio*: i modelli potrebbero favorire le classi più frequenti.

* **Lunghezza dei testi per categoria**

  * `politics` contiene in media i testi più lunghi, mentre `pets` ha lunghezze medie basse.
  * Tutte le categorie presentano lunghezza minima e massima molto distanti, il che è un forte segnale di varianza interna.
  * *Effetto sui modelli*: sequenze molto eterogenee rendono l'apprendimento instabile e penalizzano le categorie con meno contenuto.

* **Word‑Cloud per categoria**

  * Buona coerenza lessicale in `medicine`, `research`, `energy`, `transport`, `politics`.
  * Rumore/ambiguità:

    * `trade` contiene termini geografici‑storici.
    * `culture` mostra parole legate a rituali funebri (*cemetery*).
    * `engineering` è contaminata da contenuti geografico‑culturali.
  * Alcune categorie ignorano concetti chiave: in `technology` prevalgono *games* a scapito di *AI*, *IoT*, *cloud*.

### 2. Analisi dei 20 token più importanti in relazione all'EDA
 
* **Bias sanitario**: 4 token su 20 (hospital, medical, medicine, research) appartengono a `medicine`, accentuando lo sbilanciamento già evidenziato nella distribuzione delle categorie.

* **Contaminazione geografica**: *dresden* e *saxony* occupano posizioni alte; documenti storici‑locali penetrano categorie economiche (`trade`) o ingegneristiche.

* **Ambiguità lessicale**: token come *polo*, *bridge* e *station* mostrano la sovrapposizione di significati (sport vs. moda, infrastruttura vs. monumento).

### 3. Raccomandazioni per l'organizzazione dei contenuti su Wikipedia

* **Disambiguazione geografica**: Introdurre un tag dove collocare voci come `dresden`, `saxony`, `bridge` (monumenti storici) e `cemetery`. 

* **Uniformità nella lunghezza dei contenuti**: Introdurre linee guida editoriali per garantire una lunghezza coerente tra articoli della stessa categoria.

* **Revisione semantica delle categorie**: Eseguire un audit semantico per ricollocare articoli fuori tema: ad esempio, spostare contenuti su games fuori da technology se non trattano 
innovazione tecnologica, o separare rituali funebri dalla categoria culture. Questo ridurrebbe l'ambiguità lessicale e aumenterebbe la coerenza semantica delle categorie.