# AUTOMATIC CLASSIFICATION SYSTEM BASED ON BIG-DATA DATASET FOR EFFECTIVE CATEGORIZATION OF FUTURE ARTICLES

### Real-case Task with real data
data source: https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv

Descriptive Content Analysis: 
The first objective of the project is to conduct an exploratory data analysis (EDA) to understand the characteristics of Wikipedia content divided into different thematic categories, such as Culture, Economy, Medicine, Technology, Politics, Science, and others.
The exploratory analysis includes:
- Counting the number of articles present for each category.
- The average number of words per article.
- The length of the longest and shortest article for each category.
- Creating word clouds for each category to identify the most frequent and relevant terms.

Development of an Automatic Classifier: 
The second objective is to create a machine learning model capable of automatically classifying articles based on their category. The classification system will be trained using text data present in the following columns of the dataset:
- Summary: A brief introduction to the article.
- Full Text: The complete content of the article.

Coded on Databricks

# INDEX

### 1. LIBRARIES AND DATA COLLECTION
    1.1 - Library collection
    1.2 - Data ingestion 
### 2. EDA AND PREPROCESSING
    2.1 - Null Check
    2.2 - Null Removal
    2.3 - Article count by category
    2.4 - Average number of words per article
    2.5 - Category max words
    2.6 - Category min words
    2.7 - Representative Wordcloud by Category
### 3. MODEL
    3.1 - Auto-Classifier
    3.2 - Pipeline components declaration (Indexer, Tokenizer, Stopwords, Feature extraction, Logistic Regression)
    3.3 - Pipeline composition
    3.4 - Model Fit
    3.5 - Model evaluation
    3.7 - Metrics - Accuracy
    3.8 - Metrics - F1 score

# 1. LIBRARIES AND DATA COLLECTION

### 1.1 - LIBRARY COLLECTION

In [0]:
%pip install wordcloud

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


In [0]:
# Import
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, size, avg, max, min, coalesce, lit, col, when, isnan, collect_list
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### 1.2 - DATA INGESTION
Note: Normally, one would load the file directly into Spark with a command like
spark_df = spark.read.csv("s3://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv", header=True, inferSchema=True)
Here, however, pandas is used as an intermediary because its CSV reader infers data types and handles separators and quoting more permissively than Spark's, which would need stricter, explicit configuration.
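To illustrate why a tolerant CSV parser matters for article text, here is a minimal sketch using Python's standard `csv` module on a made-up sample (the sample row is an assumption, not a row from the dataset). It shows a quoted field that contains both a comma and a line break, which a naive line-by-line split would break apart. In Spark, options such as `multiLine=True` on `spark.read.csv` are the usual way to handle such records.

```python
import csv
import io

# Hypothetical sample: the summary field of the data row contains a comma and a newline.
raw = 'title,summary,categoria\n"free trade","free trade is a policy,\nsee also tariffs",economics\n'

# Naive approach: split on newlines, then on commas (breaks quoted fields).
naive_rows = [line.split(",") for line in raw.strip().split("\n")]

# Quote-aware approach: the csv module keeps the quoted field intact.
parsed_rows = list(csv.reader(io.StringIO(raw)))

print(len(naive_rows))   # number of "rows" seen by the naive split
print(len(parsed_rows))  # number of rows seen by the quote-aware parser
print(parsed_rows[1])
```

The naive split sees one row more than the quote-aware parser, because it treats the line break inside the quoted summary as a record boundary.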

In [0]:
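# Download the CSV to the driver's working directory
!wget https://proai-datasets.s3.eu-west-3.amazonaws.com/wikipedia.csv
# Parse with pandas, then convert to a Spark DataFrame
dataset = pd.read_csv('/databricks/driver/wikipedia.csv')
spark_df = spark.createDataFrame(dataset)
# Drop the leftover index column from the CSV export
spark_df = spark_df.drop("Unnamed: 0")
# Persist as a table; overwrite so the cell can be re-run
spark_df.write.mode("overwrite").saveAsTable("wiki")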

# 2. EDA AND PREPROCESSING

In [0]:
dataset.head()

Unnamed: 0.1,Unnamed: 0,title,summary,documents,categoria
0,0,economics,economics () is a social science that studies ...,economics () is a social science that studies ...,economics
1,1,index of economics articles,this aims to be a complete article list of eco...,this aims to be a complete article list of eco...,economics
2,2,cryptoeconomics,cryptoeconomics is an evolving economic paradi...,cryptoeconomics is an evolving economic paradi...,economics
3,3,economic impact analysis,an economic impact analysis (eia) examines the...,an economic impact analysis (eia) examines the...,economics
4,4,economic trend,economic trend may refer to: all the economic ...,economic trend may refer to: all the economic ...,economics


In [0]:
spark_df.show()

+--------------------+--------------------+--------------------+---------+
|               title|             summary|           documents|categoria|
+--------------------+--------------------+--------------------+---------+
|           economics|economics () is a...|economics () is a...|economics|
|index of economic...|this aims to be a...|this aims to be a...|economics|
|     cryptoeconomics|cryptoeconomics i...|cryptoeconomics i...|economics|
|economic impact a...|an economic impac...|an economic impac...|economics|
|      economic trend|economic trend ma...|economic trend ma...|economics|
|    entrepreneurship|entrepreneurship ...|entrepreneurship ...|economics|
|    factor investing|factor investing ...|factor investing ...|economics|
|          filtering |in housing econom...|in housing econom...|economics|
|          free trade|free trade is a t...|free trade is a t...|economics|
|         liquidation|liquidation is th...|liquidation is th...|economics|
|       profit motive|in 

In [0]:
%sql
select * from wiki limit 10

### 2.1 NULL CHECK

In [0]:
documents_null_check = spark_df.filter(col("documents").isNull()).select(col("documents")).first() is not None
print(documents_null_check)

True


In [0]:
summary_null_check = spark_df.filter(col("summary").isNull()).select(col("summary")).first() is not None
print(summary_null_check)

True


### 2.2 - NULL REMOVAL

In [0]:
spark_df = spark_df.dropna(subset=['summary'])

In [0]:
spark_df = spark_df.dropna(subset=['documents'])

### Correction check

In [0]:
documents_null_check = spark_df.filter(col("documents").isNull()).select(col("documents")).first() is not None
print(documents_null_check)

False


### 2.3 - Article count by Category
With pandas, one could call .unique(), but pandas loads everything onto a single machine and so would not leverage Spark's distributed execution. Spark therefore calls for SQL-like operations such as distinct(), count(), and collect(). To count the articles per category, a slow for loop over the categories can be avoided by using groupBy followed by a count aggregation.
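The aggregation idea can be sketched in plain Python, independent of Spark: a single pass over the labels that keeps a running count per category, instead of one filtering pass per distinct category. The toy labels are made up for illustration. In PySpark, the equivalent shorthand is `spark_df.groupBy('categoria').count()`.

```python
from collections import Counter

# Toy category labels (illustrative only, not taken from the dataset).
labels = ["economics", "politics", "economics", "science", "politics", "economics"]

# Loop-per-category approach: one full scan of the data for every distinct value.
slow_counts = {cat: sum(1 for x in labels if x == cat) for cat in set(labels)}

# Grouped approach: a single scan that updates one counter per category.
fast_counts = Counter(labels)

print(dict(fast_counts))
```

Both approaches give the same counts; the grouped version simply avoids rescanning the data once per category.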

In [0]:
tipi_categorie = spark_df.select('categoria').distinct().collect()

In [0]:
print([row['categoria'] for row in tipi_categorie])

['economics', 'politics', 'culture', 'science', 'sports', 'energy', 'finance', 'humanities', 'pets', 'trade', 'technology', 'transport', 'medicine', 'engineering', 'research']


In [0]:
numero_per_categoria = spark_df.groupBy('categoria').agg({'documents': 'count'})
numero_per_categoria.show()

+-----------+----------------+
|  categoria|count(documents)|
+-----------+----------------+
|  economics|           10110|
|   politics|           11358|
|    culture|           10155|
|    science|           10166|
|     sports|           10066|
|     energy|           10033|
|    finance|            9863|
| humanities|           10116|
|       pets|           10016|
|      trade|           10064|
| technology|           10082|
|  transport|           10111|
|   medicine|           10015|
|engineering|           10219|
|   research|            9930|
+-----------+----------------+



### 2.4 - Average Number of Words per Article
Since we will also need to calculate the max and min length for each category, it's reasonable to create an additional column with the lengths of individual records, obtained using split(), on which we can then calculate the average.
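As a hedged aside on the word count itself: splitting on a single space produces empty strings whenever two spaces occur in a row, so the result can be slightly higher than the number of actual words. The plain-Python sketch below shows the effect on a made-up string. Spark's `split` takes a regular expression, so a pattern such as `\s+` would be one way to split on runs of whitespace instead.

```python
import re

# Made-up text containing two double spaces between words.
text = "economics  is  a social science"

# Splitting on a single space, analogous to size(split(col, " ")).
single_space_tokens = text.split(" ")

# Splitting on runs of whitespace, analogous to the regex pattern \s+.
whitespace_tokens = re.split(r"\s+", text.strip())

print(len(single_space_tokens), single_space_tokens)
print(len(whitespace_tokens), whitespace_tokens)
```

For long articles the difference is usually small relative to the total, so the statistics reported here remain a reasonable approximation.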

In [0]:
spark_df_with_word_count = spark_df.withColumn("word_count", size(split(spark_df["documents"], " ")))

In [0]:
# Reuse the word_count column created above instead of recomputing the split
average_word_count = spark_df_with_word_count.select(avg("word_count").alias("average_word_count")).collect()[0][0]

print(f"Average word count: {average_word_count}")

Average word count: 937.0802933606471


### 2.5 - Category max words
### 2.6 - Category min words

In [0]:
word_count_stats_2 = spark_df.groupBy("categoria").agg(
    max(size(split(spark_df["documents"], " "))).alias("max_word_count"),
    min(size(split(spark_df["documents"], " "))).alias("min_word_count")
)

word_count_stats_2.show()

+-----------+--------------+--------------+
|  categoria|max_word_count|min_word_count|
+-----------+--------------+--------------+
|  economics|         24022|            10|
|   politics|         20140|            11|
|    culture|         15538|            11|
|    science|         29419|            15|
|     sports|         19227|            14|
|     energy|         23223|             9|
|    finance|         33479|             3|
| humanities|         23198|             7|
|       pets|         13222|            12|
|      trade|         19275|            16|
| technology|         18144|             2|
|  transport|         22150|            10|
|   medicine|         18422|            12|
|engineering|         11856|             8|
|   research|         27223|            17|
+-----------+--------------+--------------+



### 2.7 - Representative Wordcloud by Category

In [0]:
categories_df = spark_df.groupBy("categoria").agg(collect_list("documents").alias("texts"))

In [0]:
categories_data = categories_df.collect()

### Wordcloud with graph

In [0]:
for row in categories_data:
    categoria = row['categoria']
    texts = row['texts']
    
    all_texts = " ".join(texts)
    
    
    wordcloud = WordCloud(width=800, height=400, background_color="white").generate(all_texts)
        
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.title(f"Word Cloud for Category: {categoria}")
    plt.show()

### Word frequencies: complete list, then limited to the top 10 (max_words=10)

In [0]:
for row in categories_data:
    categoria = row['categoria']
    texts = row['texts']
    
    all_texts = " ".join(texts)
    
    wordcloud = WordCloud(width=800, height=400, background_color="white").generate(all_texts)
    
    word_freq = wordcloud.words_

    sorted_words = sorted(word_freq.items(), key=lambda item: item[1], reverse=True)
    
    print(f"Word Frequency for Category: {categoria}")
    for word, freq in sorted_words:
        print(f"{word}: {freq}")
    print("\n")

Word Frequency for Category: finance
master: 1.0
man: 0.6702896422066859
company: 0.666705833969802
voiced: 0.6246695258797955
one: 0.5853651371834793
time: 0.5386581281945831
universe classics: 0.5382468715116621
released: 0.5286704658950708
power: 0.478849656306915
1980s series: 0.4648669290875977
classics toyline: 0.3983314728864344
new york: 0.38246871511662067
part: 0.3721872980435932
use: 0.36299277363257154
character: 0.3456318665178309
million: 0.31960519358439576
member: 0.31951706715234124
appear: 0.318635802831796
action figure: 0.3167557722812996
united states: 0.30653310616297513
used: 0.30444744727101813
first: 0.3040949415428001
may: 0.30280242053933376
film: 0.2990423594383409
new: 0.29730920627460194
made: 0.29249162798895484
well: 0.2909641031666765
end: 0.2704306444979731
u s: 0.26843311203807063
ra: 0.25970859526467305
name: 0.2500734386933788
based: 0.24604899829622232
serie: 0.2455789906585982
work: 0.24496210563421655
year: 0.23535632454027378
snake men: 0.234122

In [0]:
for row in categories_data:
    categoria = row['categoria']
    texts = row['texts']
    
    all_texts = " ".join(texts)
    
    wordcloud = WordCloud(width=800, height=400, background_color="white", max_words=10).generate(all_texts)
    
    word_freq = wordcloud.words_

    sorted_words = sorted(word_freq.items(), key=lambda item: item[1], reverse=True)
    
    print(f"Word Frequency for Category: {categoria}")
    for word, freq in sorted_words:
        print(f"{word}: {freq}")
    print("\n")

Word Frequency for Category: finance
master: 1.0
man: 0.6702896422066859
company: 0.666705833969802
voiced: 0.6246695258797955
one: 0.5853651371834793
time: 0.5386581281945831
universe classics: 0.5382468715116621
released: 0.5286704658950708
power: 0.478849656306915
1980s series: 0.4648669290875977


Word Frequency for Category: medicine
hospital: 1.0
medicine: 0.540743957157331
new york: 0.5394413084382689
one: 0.528971872436918
first: 0.44709798813142276
work: 0.3968253968253968
patient: 0.38051816471269356
university: 0.3801804409707145
time: 0.3248902397838568
may: 0.29931972789115646


Word Frequency for Category: research
university: 1.0
medicine: 0.5713464413015983
work: 0.5582959641255605
research: 0.48712199609060597
professor: 0.4418765091410831
first: 0.42278946763251696
one: 0.4223870300103484
science: 0.4083592043233299
department: 0.3991031390134529
award: 0.3936414855697367


Word Frequency for Category: energy
power station: 1.0
power plant: 0.5935321522086578
project:
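
The scores printed above are relative frequencies: `WordCloud` scales the counts so that the most frequent term gets 1.0, and `max_words` keeps only the top entries. A minimal plain-Python sketch of that normalization follows, on a made-up text and with a tiny hypothetical stopword set. It ignores the two-word phrases that `WordCloud` also reports.

```python
from collections import Counter

# Hypothetical stopword set and sample text, for illustration only.
stopwords = {"the", "a", "of", "in", "and"}
text = "the hospital and the patient in the hospital of the university hospital"

# Count the remaining words after lowercasing and stopword removal.
counts = Counter(w for w in text.lower().split() if w not in stopwords)

def relative_frequencies(counts, max_words):
    """Return the top `max_words` terms scaled so the most frequent is 1.0."""
    top = counts.most_common(max_words)
    highest = top[0][1]
    return {word: n / highest for word, n in top}

freqs = relative_frequencies(counts, max_words=2)
print(freqs)
```

Because the scores are relative to the top term within each category, they are not directly comparable across categories.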

# 3. MODEL

### 3.1 - AUTOMATIC CLASSIFIER
Since lemmatization and stemming are not explicitly required by the assignment, the model is trained without these steps. The pipeline declared in the next section takes the summary column as its input text.

### 3.2 - Pipeline components declaration (Indexer, Tokenizer, Stopwords, Feature extraction, Logistic Regression)

In [0]:
indexer = StringIndexer(inputCol="categoria", outputCol="categoriaIndex")

tokenizer = Tokenizer(inputCol="summary", outputCol="words")

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

lr = LogisticRegression(featuresCol="features", labelCol="categoriaIndex")
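
To make the feature-extraction stages concrete, here is a simplified plain-Python sketch of the hashing trick followed by IDF weighting. It is not Spark's implementation: the hash function (`zlib.crc32`), the tiny bucket count, and the toy documents are assumptions chosen for readability, while the smoothed formula `log((N + 1) / (df + 1))` is the one documented for Spark's `IDF`.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # the notebook uses 10000; kept small here for readability

def hashed_tf(tokens, num_features=NUM_FEATURES):
    """Map each token to a bucket index and count occurrences per bucket."""
    return Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in tokens)

def idf_weights(tf_vectors):
    """Smoothed inverse document frequency per bucket: log((N + 1) / (df + 1))."""
    n_docs = len(tf_vectors)
    doc_freq = Counter(bucket for tf in tf_vectors for bucket in tf)
    return {b: math.log((n_docs + 1) / (df + 1)) for b, df in doc_freq.items()}

# Toy documents, already tokenized and stopword-filtered (illustrative only).
docs = [["free", "trade", "policy"], ["trade", "tariff"], ["power", "plant"]]

tf_vectors = [hashed_tf(d) for d in docs]
idf = idf_weights(tf_vectors)
tfidf_vectors = [{b: tf[b] * idf[b] for b in tf} for tf in tf_vectors]

print(tfidf_vectors[0])
```

With few buckets, different words can land in the same bucket, which is one reason to choose a larger `numFeatures` as the notebook does.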

### 3.3 - Pipeline composition

In [0]:
pipeline = Pipeline(stages=[indexer, tokenizer, remover, hashingTF, idf, lr])

In [0]:
train_data, test_data = spark_df.randomSplit([0.8, 0.2], seed=0)

### 3.4 - Model Fit

In [0]:
model = pipeline.fit(train_data)

### 3.5 - Model evaluation

In [0]:
predictions = model.transform(test_data)
predictions.select("summary", "categoria", "prediction").show()

+--------------------+---------+----------+
|             summary|categoria|prediction|
+--------------------+---------+----------+
|abraham "abba" pt...|economics|       2.0|
|abbott payson ush...|economics|       8.0|
|abraham wald (; h...|economics|       8.0|
|abraham wald (; h...|economics|       8.0|
|abraham wald (; h...|economics|       8.0|
|the acoma party (...|economics|       8.0|
|the acoma party (...|economics|       8.0|
|ada margarita álv...|economics|       8.0|
|adolph abramovich...|economics|       8.0|
|adolph abramovich...|economics|       8.0|
|adrian j. slywotz...|economics|       8.0|
|the african peopl...|economics|       8.0|
|the african peopl...|economics|       8.0|
|agis stinas (1900...|economics|       8.0|
|ailsa horton land...|economics|       8.0|
|alec stuart "al" ...|economics|       8.0|
|alec stuart "al" ...|economics|       8.0|
|alec stuart "al" ...|economics|       8.0|
|alec stuart "al" ...|economics|       8.0|
|alain krivine (fr...|economics|
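
The `prediction` column holds label indices rather than category names, so it cannot be compared with `categoria` by eye. By default, `StringIndexer` assigns index 0 to the most frequent label, 1 to the next, and so on. The plain-Python sketch below mimics that ordering on made-up counts to show how an index maps back to a name. In Spark, `IndexToString` with the fitted indexer's `labels` is the usual tool for this.

```python
from collections import Counter

# Made-up training labels; the real index order depends on the training split.
train_labels = ["politics"] * 5 + ["science"] * 3 + ["economics"] * 2

# Most frequent label first, mirroring StringIndexer's default frequency ordering.
ordered_labels = [label for label, _ in Counter(train_labels).most_common()]
label_to_index = {label: float(i) for i, label in enumerate(ordered_labels)}

def index_to_label(prediction):
    """Translate a numeric prediction back to its category name."""
    return ordered_labels[int(prediction)]

print(label_to_index)
print(index_to_label(1.0))
```

Selecting `categoriaIndex` next to `prediction` is another simple way to compare like with like.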

### 3.7 - Metrics: Accuracy

In [0]:
evaluator = MulticlassClassificationEvaluator(labelCol="categoriaIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy:.2f}")

Model Accuracy: 0.91


### 3.8 - Metrics: F1 score

In [0]:
f1_evaluator = MulticlassClassificationEvaluator(labelCol="categoriaIndex", predictionCol="prediction", metricName="f1")
f1 = f1_evaluator.evaluate(predictions)
print(f"Model weighted average F1-score: {f1:.2f}")

Model weighted average F1-score: 0.91
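
For reference, the two metrics can be computed by hand. The sketch below is a plain-Python illustration on made-up labels, not the evaluator's code: accuracy is the share of correct predictions, and the weighted F1 averages the per-class F1 scores using each class's share of the true labels as its weight.

```python
from collections import Counter

# Toy true labels and predictions (illustrative only).
y_true = [0, 0, 0, 1, 1, 2]
y_pred = [0, 0, 1, 1, 1, 0]

def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def weighted_f1(y_true, y_pred):
    """Per-class F1 averaged with weights equal to each class's share of true labels."""
    support = Counter(y_true)
    total = 0.0
    for label, n in support.items():
        tp = sum(t == label and p == label for t, p in zip(y_true, y_pred))
        fp = sum(t != label and p == label for t, p in zip(y_true, y_pred))
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * n / len(y_true)
    return total

print(f"accuracy={accuracy(y_true, y_pred):.2f}, weighted F1={weighted_f1(y_true, y_pred):.2f}")
```

With categories of similar size, as in this dataset, the two metrics tend to be close, which is consistent with both being reported as 0.91.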
