# (EX) News article processing (with ML pipeline)

# `agnews` Dataset

In [1]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0 29.3M    0  5512    0     0   5973      0  1:25:51 --:--:--  1:25:51  5991
  0 29.3M    0  208k    0     0   105k      0  0:04:45  0:00:01  0:04:44  105k
  2 29.3M    2  656k    0     0   228k      0  0:02:11  0:00:02  0:02:09  229k
  3 29.3M    3  944k    0     0   244k      0  0:02:03  0:00:03  0:02:00  244k
  3 29.3M    3 1150k    0     0   226k      0  0:02:12  0:00:05  0:02:07  226k
  4 29.3M    4 1326k    0     0   225k      0  0:02:13  0:00:05  0:02:08  266k
  5 29.3M    5 1662k    0     0   241k      0  0:02:04  0:00:06  0:01:58  296k
  6 29.3M    6 1838k    0     0   233k      0  0:02:08  0:00:07  0:02:01  236k
  7 29.3M    7 2158k    0     0   240k      0  0:02:05  0:00:08  0:01:57  237k
  8 29.3M    8 2446k    0     0   247k      0  0:02

# Pipelining with PySpark MLlib

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline # pipeline to transform data


In [3]:
spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate()
        )
sc = spark.sparkContext

In [10]:
# load dataset
df = spark.read.csv("agnews.csv", inferSchema=True, header=True)
df.show(5)

+-----------+--------------------+--------------------+
|Class Index|               Title|         Description|
+-----------+--------------------+--------------------+
|          3|Wall St. Bears Cl...|Reuters - Short-s...|
|          3|Carlyle Looks Tow...|Reuters - Private...|
|          3|Oil and Economy C...|Reuters - Soaring...|
|          3|Iraq Halts Oil Ex...|Reuters - Authori...|
|          3|Oil prices soar t...|AFP - Tearaway wo...|
+-----------+--------------------+--------------------+
only showing top 5 rows


# Arrange columns

In [11]:
from pyspark.sql.functions import concat_ws, col # to concatinate cols

# renaming 'Class Index' col to 'label'
df = df.withColumnRenamed('Class Index', 'label')

# add a column with concatenated text called 'text
df = df.withColumn('text', concat_ws(' ', col('Title'), col('Description')))

# concatenating texts
# df = df.select('label', 'text')

df.show(10)

+-----+--------------------+--------------------+--------------------+
|label|               Title|         Description|                text|
+-----+--------------------+--------------------+--------------------+
|    3|Wall St. Bears Cl...|Reuters - Short-s...|Wall St. Bears Cl...|
|    3|Carlyle Looks Tow...|Reuters - Private...|Carlyle Looks Tow...|
|    3|Oil and Economy C...|Reuters - Soaring...|Oil and Economy C...|
|    3|Iraq Halts Oil Ex...|Reuters - Authori...|Iraq Halts Oil Ex...|
|    3|Oil prices soar t...|AFP - Tearaway wo...|Oil prices soar t...|
|    3|Stocks End Up, Bu...|Reuters - Stocks ...|Stocks End Up, Bu...|
|    3|Money Funds Fell ...|AP - Assets of th...|Money Funds Fell ...|
|    3|Fed minutes show ...|USATODAY.com - Re...|Fed minutes show ...|
|    3|Safety Net (Forbe...|"Forbes.com - Aft...|Safety Net (Forbe...|
|    3|Wall St. Bears Cl...| NEW YORK (Reuter...|Wall St. Bears Cl...|
+-----+--------------------+--------------------+--------------------+
only s

# Tokenize

In [12]:
from pyspark.ml.feature import RegexTokenizer # tokenizer

# convert sentences to list of words
tokenizer = RegexTokenizer(inputCol='text', outputCol='words', pattern='\\W')

# applies tokenizer
df = tokenizer.transform(df)

df.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+
|label|               Title|         Description|                text|               words|
+-----+--------------------+--------------------+--------------------+--------------------+
|    3|Wall St. Bears Cl...|Reuters - Short-s...|Wall St. Bears Cl...|[wall, st, bears,...|
|    3|Carlyle Looks Tow...|Reuters - Private...|Carlyle Looks Tow...|[carlyle, looks, ...|
|    3|Oil and Economy C...|Reuters - Soaring...|Oil and Economy C...|[oil, and, econom...|
|    3|Iraq Halts Oil Ex...|Reuters - Authori...|Iraq Halts Oil Ex...|[iraq, halts, oil...|
|    3|Oil prices soar t...|AFP - Tearaway wo...|Oil prices soar t...|[oil, prices, soa...|
+-----+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows


# Stopwords

In [13]:
from pyspark.ml.feature import StopWordsRemover

# remove stopwords
stopwords_remover = StopWordsRemover(inputCol='words', outputCol='filtered')

df = stopwords_remover.transform(df)

df.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|               Title|         Description|                text|               words|            filtered|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|    3|Wall St. Bears Cl...|Reuters - Short-s...|Wall St. Bears Cl...|[wall, st, bears,...|[wall, st, bears,...|
|    3|Carlyle Looks Tow...|Reuters - Private...|Carlyle Looks Tow...|[carlyle, looks, ...|[carlyle, looks, ...|
|    3|Oil and Economy C...|Reuters - Soaring...|Oil and Economy C...|[oil, and, econom...|[oil, economy, cl...|
|    3|Iraq Halts Oil Ex...|Reuters - Authori...|Iraq Halts Oil Ex...|[iraq, halts, oil...|[iraq, halts, oil...|
|    3|Oil prices soar t...|AFP - Tearaway wo...|Oil prices soar t...|[oil, prices, soa...|[oil, prices, soa...|
+-----+--------------------+--------------------+--------------------+--------------------+-----

# Term frequency, Inverse document frequency

In [15]:
from pyspark.ml.feature import HashingTF

# calculate term frequency in each article (row)
hashing_tf = HashingTF(inputCol='filtered', outputCol='raw_features', numFeatures=16384)  

# 16384 --> in BERT we represent one item with 780 features
# compressing text into 780 numbers
# within all possible words kept, only 16384 words are kept
# 2^14 === 16384

# applie term frequency
featured_data = hashing_tf.transform(df)

featured_data.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|               Title|         Description|                text|               words|            filtered|        raw_features|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    3|Wall St. Bears Cl...|Reuters - Short-s...|Wall St. Bears Cl...|[wall, st, bears,...|[wall, st, bears,...|(16384,[906,1198,...|
|    3|Carlyle Looks Tow...|Reuters - Private...|Carlyle Looks Tow...|[carlyle, looks, ...|[carlyle, looks, ...|(16384,[98,156,14...|
|    3|Oil and Economy C...|Reuters - Soaring...|Oil and Economy C...|[oil, and, econom...|[oil, economy, cl...|(16384,[338,1612,...|
|    3|Iraq Halts Oil Ex...|Reuters - Authori...|Iraq Halts Oil Ex...|[iraq, halts, oil...|[iraq, halts, oil...|(16384,[180,2731,...|
|    3|Oil prices soar t...|AFP - Tearaway wo...|Oil prices so

In [16]:
from pyspark.ml.feature import IDF

# inverse document frequency
idf = IDF(inputCol='raw_features', outputCol='features')

# applies IDF
idf_vectorizer = idf.fit(featured_data)

rescaled_data = idf_vectorizer.transform(featured_data)

rescaled_data.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|               Title|         Description|                text|               words|            filtered|        raw_features|            features|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    3|Wall St. Bears Cl...|Reuters - Short-s...|Wall St. Bears Cl...|[wall, st, bears,...|[wall, st, bears,...|(16384,[906,1198,...|(16384,[906,1198,...|
|    3|Carlyle Looks Tow...|Reuters - Private...|Carlyle Looks Tow...|[carlyle, looks, ...|[carlyle, looks, ...|(16384,[98,156,14...|(16384,[98,156,14...|
|    3|Oil and Economy C...|Reuters - Soaring...|Oil and Economy C...|[oil, and, econom...|[oil, economy, cl...|(16384,[338,1612,...|(16384,[338,1612,...|
|    3|Iraq Halts Oil Ex...|Reuters - Authori...|Iraq Halts Oil Ex...|

In [17]:
rescaled_data.select('raw_features').show(2, truncate=False)
rescaled_data.select('features').show(2, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|raw_features                                                                                                                                                                                                                                  |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(16384,[906,1198,4756,5540,5638,5831,6235,7372,8905,11170,11790,12343,12766,13441,14118,16126],[1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0])                                                                             |
|(16384,[98,156,1445,1913,2309,2586,

# Training a multinomial logistic regression

In [18]:
# split data into training and testing
(train, test) = rescaled_data.randomSplit([0.75, 0.25])
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 95811
Test Dataset Count: 31789


In [19]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features',
                        labelCol='label',
                        family='multinomial',
                        regParam=0,
                        maxIter=20)

In [20]:
pipeline = Pipeline(stages=[tokenizer,
                            stopwords_remover,
                            hashing_tf,
                            idf,
                            lr])

pipelineFit = pipeline.fit(df)

dataset = pipelineFit.transform(df)

IllegalArgumentException: Output column words already exists.

# Prediction and evaluation

In [None]:
# predict on test data
predictions = lrModel.transform(test)

In [None]:
from pyspark.sql.functions import avg
from pyspark.sql.types import FloatType

# accuracy calculation

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
# labels = ["World", "Sports", "Business","Science"]

# take only the predictions
preds_and_labels = 


In [None]:
# confusion matrix
metrics = 

# Pipelining, from start to finish

In [None]:
# load dataset
df = spark.read.csv("agnews.csv", inferSchema=True, header=True)

def arrangeColumns(df):
  # Renaming 'Class Index' col to 'label'
  df = df.withColumnRenamed('Class Index', 'label')

  # Add a new column 'text' by joining 'Title' and 'Description'
  df = df.withColumn("text", concat_ws(" ", "Title", 'Description'))

  # Select new text feature and labels
  df = df.select('label', 'text')
  return df

df = arrangeColumns(df)

# tokenizer
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

# stopwords
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# term frequency
hashing_tf = HashingTF(inputCol="filtered",
                       outputCol="raw_features",
                       numFeatures=16384)

# Inverse Document Frequency
idf = IDF(inputCol="raw_features", outputCol="features")

# model
lr = LogisticRegression(featuresCol='features',
                        labelCol='label',
                        family="multinomial",
                        regParam=0.3,
                        elasticNetParam=0,
                        maxIter=20)



In [None]:
# Put everything in pipeline
pipeline = Pipeline(stages=[tokenizer,
                            stopwords_remover,
                            hashing_tf,
                            idf,
                            lr])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(df)

# transform and train
dataset = pipelineFit.transform(df)