In [0]:
!wget https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv
import pandas as pd
dataset = pd.read_csv('/databricks/driver/wikipedia.csv')
spark_df = spark.createDataFrame(dataset)
spark_df = spark_df.drop("Unnamed: 0")
#spark_df.write.saveAsTable("wikipedia")

--2024-05-03 08:34:14--  https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv
Resolving proai-datasets.s3.eu-west-3.amazonaws.com (proai-datasets.s3.eu-west-3.amazonaws.com)... 3.5.224.150, 52.95.154.94
Connecting to proai-datasets.s3.eu-west-3.amazonaws.com (proai-datasets.s3.eu-west-3.amazonaws.com)|3.5.224.150|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1003477941 (957M) [text/csv]
Saving to: ‘wikipedia.csv.1’


2024-05-03 08:35:11 (16.9 MB/s) - ‘wikipedia.csv.1’ saved [1003477941/1003477941]



In [0]:
from pyspark.sql import functions as F
spark_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()

+-----+-------+---------+---------+
|title|summary|documents|categoria|
+-----+-------+---------+---------+
|    0|    928|      928|        0|
+-----+-------+---------+---------+



Si può notare che ci sono 928 articoli che hanno le colonne "summary" e "documents" vuote, si possono pertanto rimuovere dal dataset.

In [0]:
df = spark_df.na.drop(subset=["documents"])

In [0]:
count = df.groupBy("categoria").agg(F.count("*").alias("conteggio per categoria"))
count.show()

+-----------+-----------------------+
|  categoria|conteggio per categoria|
+-----------+-----------------------+
|  economics|                  10110|
|   politics|                  11358|
|    culture|                  10155|
|    science|                  10166|
|     sports|                  10066|
|     energy|                  10033|
|    finance|                   9863|
| humanities|                  10116|
|       pets|                  10016|
|      trade|                  10064|
| technology|                  10082|
|  transport|                  10111|
|   medicine|                  10015|
|engineering|                  10219|
|   research|                   9930|
+-----------+-----------------------+



Parte documents) 

In questa prima parte calcoliamo il numero medio di parole utilizzate in un articolo per categoria. Procediamo innanzitutto con la rimozione della punteggiatura e con la tokenizzazione; poi contiamo il numero di parole presenti in ogni articolo tramite la funzione size.
A questo punto creiamo due tabelle per la somma e il numero di articoli per categoria; uniamo le due tabelle con una inner join e troviamo la media per categoria come rapporto tra somma e conteggio.

In [0]:
from pyspark.ml.feature import Tokenizer

punteggiatura_regex = "[^\w\s]"

df = df.withColumn("documents", F.regexp_replace(F.col("documents"), punteggiatura_regex, ""))


tokenizer = Tokenizer(inputCol="documents", outputCol="words")


tokenized_df = tokenizer.transform(df)


tokenized_df = tokenized_df.withColumn("words", F.array_remove(F.col("words"), ""))


word_count_df = tokenized_df.withColumn("word_count", F.size("words"))


word_count_df = word_count_df.na.drop(subset=["documents"])

In [0]:
sum_word_per_category = word_count_df.groupBy("categoria").agg(F.sum("word_count").alias("sum_words_per_category"))

count_article_per_category = spark_df.groupBy("categoria").agg(F.count("*").alias("count_article_per_category"))

res = sum_word_per_category.join(count_article_per_category,  sum_word_per_category["categoria"] == count_article_per_category["categoria"], "inner" )

res = res.withColumn( "media parole articolo per categoria" , F.col("sum_words_per_category")/ F.col("count_article_per_category") )

res = res.select(sum_word_per_category["categoria"] , "media parole articolo per categoria" )

res.show()

+-----------+-----------------------------------+
|  categoria|media parole articolo per categoria|
+-----------+-----------------------------------+
|  economics|                  979.1021760633037|
|   politics|                   1512.54146856841|
|    culture|                  634.5190898573081|
|    science|                 1854.7100429855411|
|     sports|                  598.8125744934446|
|     energy|                  583.7126219390802|
|    finance|                  1847.064487545535|
| humanities|                 1042.6440015631106|
|       pets|                 420.22857427915795|
|      trade|                  638.5128128724672|
| technology|                  884.4902426944032|
|  transport|                  625.4437314906219|
|   medicine|                   783.616514489877|
|engineering|                  721.5844422700587|
|   research|                  691.1322108199661|
+-----------+-----------------------------------+



Parte summary)

Ripetiamo gli stessi ragionamenti sulla colonna summary per ottenere la media di parole di un riassunto per ogni categoria.

In [0]:
punteggiatura_regex = "[^\w\s]"

df_summary = df.withColumn("summary", F.regexp_replace(F.col("summary"), punteggiatura_regex, ""))


tokenizer_summary = Tokenizer(inputCol="summary", outputCol="words")


tokenized_df_summary = tokenizer_summary.transform(df_summary)


tokenized_df_summary = tokenized_df_summary.withColumn("words", F.array_remove(F.col("words"), ""))


word_count_df_summary = tokenized_df_summary.withColumn("word_count", F.size("words"))


word_count_df_summary = word_count_df_summary.na.drop(subset=["summary"])

In [0]:
sum_word_per_category_summary = word_count_df_summary.groupBy("categoria").agg(F.sum("word_count").alias("sum_words_per_category"))

res_summary = sum_word_per_category_summary.join(count_article_per_category,  sum_word_per_category_summary["categoria"] == count_article_per_category["categoria"], "inner" )

res_summary = res_summary.withColumn( "media parole riassunto per categoria" , F.col("sum_words_per_category")/ F.col("count_article_per_category") )

res_summary = res_summary.select(sum_word_per_category["categoria"] , "media parole riassunto per categoria" )

res_summary.show()

+-----------+------------------------------------+
|  categoria|media parole riassunto per categoria|
+-----------+------------------------------------+
|  economics|                  111.53135509396637|
|   politics|                    177.117362211657|
|    culture|                   99.99045507134593|
|    science|                  104.67497069167644|
|     sports|                   80.25784664282877|
|     energy|                   96.41319928329683|
|    finance|                  110.85290932361917|
| humanities|                   85.59632669011333|
|       pets|                   72.15594133492966|
|      trade|                  107.62604290822408|
| technology|                  103.19039128281328|
|  transport|                   85.21510365251727|
|   medicine|                   98.56331877729258|
|engineering|                   88.44422700587084|
|   research|                   90.37561024210422|
+-----------+------------------------------------+



Sfruttando la colonna di conteggio presente nel dataset "word_count_df", si possono trovare massimo e minimo numero di parole di un articolo per categoria con una groupby.

In [0]:
result_max = word_count_df.groupBy("categoria").agg(F.max("word_count").alias("max_words_per_category"))
result_max.show()

+-----------+----------------------+
|  categoria|max_words_per_category|
+-----------+----------------------+
|  economics|                 23833|
|   politics|                 20112|
|    culture|                 15446|
|    science|                 29369|
|     sports|                 19215|
|     energy|                 23201|
|    finance|                 33442|
| humanities|                 23186|
|       pets|                 13217|
|      trade|                 19264|
| technology|                 18123|
|  transport|                 22117|
|   medicine|                 18412|
|engineering|                 11837|
|   research|                 27211|
+-----------+----------------------+



In [0]:
result_min = word_count_df.groupBy("categoria").agg(F.min("word_count").alias("min_words_per_category"))
result_min.show()

+-----------+----------------------+
|  categoria|min_words_per_category|
+-----------+----------------------+
|  economics|                     8|
|   politics|                     9|
|    culture|                     9|
|    science|                    13|
|     sports|                    13|
|     energy|                     9|
|    finance|                     1|
| humanities|                     7|
|       pets|                    10|
|      trade|                    14|
| technology|                     2|
|  transport|                     6|
|   medicine|                    10|
|engineering|                     8|
|   research|                    15|
+-----------+----------------------+



Partendo dalla tabella "tokenized_df" ottenuta al punto 2, si rimuovono innanzitutto le stopwords, che saranno poco indicative di una categoria. Poi viene creato un modello countvectorizer e applicato al dataframe "filtered_df" per tenere traccia del conteggio delle parole presenti nella colonna "filtered_words".

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(tokenized_df)


cv = CountVectorizer(inputCol="filtered_words", outputCol="features")
cv_model = cv.fit(filtered_df)
count_vectorized_df = cv_model.transform(filtered_df)

A questo punto per trovare le parole più rappresentative per categoria, a partire dal vocabolario fornito dal count vectorizer possiamo trovare le 10 parole più frequenti (per questo viene implementata la funzione getTopWordContainer). Successivamente, si calcola la media delle features per categoria nel dataframe "count_vectorized_df" e quindi si usa la funzione definita prima per trovare le parole principali.

In [0]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf
from pyspark.ml.stat import Summarizer

def getTopWordContainer(v):
    def getTopWord(vector):
        vectorConverted = vector.toArray().tolist()
        listSortedDesc= [i[0] for i in sorted(enumerate(vectorConverted), key=lambda x:x[1])][-10:][::-1]
        return [v[j] for j in listSortedDesc]
    return getTopWord

getTopWordInit = getTopWordContainer(cv_model.vocabulary)
getTopWord_udf = udf(getTopWordInit, ArrayType(StringType()))

top = count_vectorized_df.groupBy("categoria").agg(Summarizer.mean(F.col("features")).alias("means")) \
    .withColumn("topWord", getTopWord_udf(F.col('means'))) \
    .select("categoria", "topWord")

In [0]:
display(top)

categoria,topWord
finance,"List(series, also, company, masters, new, one, universe, first, heman, voiced)"
medicine,"List(hospital, health, medical, new, university, also, medicine, research, first, school)"
research,"List(research, university, medical, health, also, medicine, institute, national, new, professor)"
technology,"List(game, also, software, linux, new, released, version, system, first, one)"
energy,"List(power, station, plant, energy, company, electricity, mw, gas, also, electric)"
transport,"List(station, bridge, line, new, airport, also, railway, two, opened, terminal)"
politics,"List(one, party, nation, hanson, election, latham, australian, australia, new, queensland)"
culture,"List(film, meitei, language, also, manipur, khamba, manipuri, dance, one, indian)"
science,"List(aircraft, air, force, lockheed, first, flight, also, two, us, one)"
humanities,"List(university, also, later, first, new, film, one, archaeology, history, work)"


In questa parte ci occupiamo di addestrare e testare un modello per la classificazione del testo a partire dalla colonna "documents". Innazitutto creiamo una colonna "label" numerica a partire dalla colonna "categoria" testuale e dividiamo il dataset in 80% di train e 20% di test.

In [0]:
from pyspark.sql.functions import rand
from pyspark.ml.feature import StringIndexer


indexer = StringIndexer(inputCol="categoria", outputCol="label")

df_2 = indexer.fit(df).transform(df)
train_data, test_data = df_2.randomSplit([0.8, 0.2], seed=1234)

A questo punto, creiamo una pipeline divisa nelle seguenti fasi:
1) tokenizzazione del testo; 
2) rimozione delle stopwors; 
3) conteggio delle parole presenti a partire da un vocabolario di 1000 parole; 
4) addestramento di una rete neurale fully connected con 1000 neuroni di input (tanti quanti la dimensione del vocabolario), due hidden layer di ampiezza 64 e 32 e un output layer di 15 neuroni visto che abbiamo 15 classi.

In [0]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml import Pipeline


tokenizer = Tokenizer(inputCol="documents", outputCol="words")

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

cv = CountVectorizer(inputCol="filtered_words", outputCol="features", vocabSize = 1000)

nn =  MultilayerPerceptronClassifier(layers=[1000, 64, 32, 15], maxIter=100, solver='l-bfgs', tol=0.005, seed=1234)

pipeline = Pipeline(stages=[
    tokenizer, 
    remover, 
    cv,  
    nn
])


model = pipeline.fit(train_data)

Valutiamo quindi il modello sull'insieme di test, essendo bilanciato il dataset (come visto al punto 1) possiamo usare la accuracy come metrica.

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = ", accuracy)

Test Accuracy =  0.9087973640856672


Proviamo a vedere se cambia qualcosa addestrando il modello sulla colonna "summary" al posto della colonna "documents".

In [0]:
tokenizer = Tokenizer(inputCol="summary", outputCol="words")

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

cv = CountVectorizer(inputCol="filtered_words", outputCol="features", vocabSize = 1000)

nn =  MultilayerPerceptronClassifier(layers=[1000, 64, 32, 15], maxIter=100, solver='l-bfgs', tol=0.005, seed=1234)

pipeline_summary = Pipeline(stages=[
    tokenizer, 
    remover, 
    cv,  
    nn
])


model_summary = pipeline_summary.fit(train_data)

In [0]:
predictions_summary = model_summary.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_summary)
print("Test Accuracy = ", accuracy)

Test Accuracy =  0.8658978583196046


Si può osservare che lo stesso modello addestrato sulla colonna "documents" ha una accuracy maggiore rispetto a quella ottenuta nella cella precedente.