# Developing Features

In [1]:
import os

import numpy as np
import pandas as pd
import pyspark
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from pyspark.ml import Pipeline # pipeline to transform data
from pyspark.sql import SparkSession # to initiate spark
from pyspark.ml.linalg import Vectors # to allow us to work with VectorAssembler
from pyspark.ml.feature import VectorAssembler # to combine our feature columns to pass to LogisticRegression model
from pyspark.sql.types import FloatType
from pyspark.ml.feature import RegexTokenizer # tokenizer
from pyspark.ml.feature import HashingTF, IDF, IDFModel # vectorizer
from pyspark.ml.feature import StopWordsRemover # to remove stop words
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel # logisric regression ml model
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel # random forest ml model
from pyspark.ml.evaluation import BinaryClassificationEvaluator # to evaluate the model
from pyspark.ml.pipeline import PipelineModel

In [2]:
## start the Spark session for this notebook (or pick up one that already exists)
spark = (
    SparkSession.builder
    .appName("AABDW Assignment 3")
    .getOrCreate()
)

24/05/25 17:42:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Featurising  _'Title'_

Do all the subsetting _*before*_ handing the data to Spark; otherwise we use far too much memory and the session crashes.

In [4]:
## load the dataset as a pandas dataframe, reading only the label and the
## variable we wish to featurise so the unused columns never occupy memory
df = pd.read_csv("assignment3_full.csv", usecols=['frontpage', 'title'])
## fix the column order and turn our label into an integer variable
df = df[['frontpage', 'title']].astype({'frontpage': int})
## use a sample of the data to use less memory
# df = df.sample(frac=0.1, random_state=1)

In [5]:
## hold out 20% of the rows as the test set; the fixed random_state keeps the split repeatable
train, test = train_test_split(df, random_state=1, test_size=0.2)
print("Training Dataset Count: " + str(len(train)))
print("Test Dataset Count: " + str(len(test)))

Training Dataset Count: 14684
Test Dataset Count: 3672


In [6]:
## turn each pandas split into a Spark DataFrame and spread it over
## 4 partitions (may make it easier to process)
spark_train = spark.createDataFrame(train).repartition(4)
spark_test = spark.createDataFrame(test).repartition(4)

In [8]:
## tokenise the title text into a list of words using the \W regex pattern
tokenizer = RegexTokenizer(
    pattern="\\W",
    inputCol="title",
    outputCol="title_words",
)

## training data with the extra "title_words" column
spark_train_tokenised = tokenizer.transform(spark_train)

In [9]:
## filter common, uninformative words out of the token lists
stopwords_remover = StopWordsRemover(
    outputCol="f_title_words",
    inputCol="title_words",
)

## training data with the extra "f_title_words" column of filtered tokens
spark_train_stopwordless = stopwords_remover.transform(spark_train_tokenised)

In [10]:
## term frequency of the filtered words, hashed into 256 feature slots
hashing_tf = HashingTF(
    numFeatures=256,
    inputCol="f_title_words",
    outputCol="raw_features",
)

## training data with the raw term-frequency column added
featurized_data = hashing_tf.transform(spark_train_stopwordless)

In [11]:
## inverse document frequency stage: raw term frequencies in, rescaled vectors out
idf = IDF(
    outputCol="title_features",
    inputCol="raw_features",
)

## fit the IDF weights on the training term frequencies
idf_vectorizer = idf.fit(featurized_data)

## apply the fitted weights, adding the "title_features" vector column
rescaled_data = idf_vectorizer.transform(featurized_data)

                                                                                

In [12]:
## pack the TF-IDF column into the single "features" vector column the classifier reads
assembler = VectorAssembler(inputCols=['title_features'], outputCol="features")
## drop any "features" column left by a previous run of this cell first (a no-op
## on the first run), so re-running the cell does not fail on an existing output column
rescaled_data = assembler.transform(rescaled_data.drop("features"))

In [13]:
## model object: random forest with 500 trees; the seed is fixed so repeated
## fits use the same random sampling and the results can be reproduced
rf = RandomForestClassifier(featuresCol='features',
                            labelCol='frontpage',
                            numTrees=500,
                            seed=1)

## train the model (every other hyper-parameter is left at its default)
rfmodel = rf.fit(rescaled_data)

24/05/25 17:44:13 WARN DAGScheduler: Broadcasting large task binary with size 1098.4 KiB
24/05/25 17:44:13 WARN DAGScheduler: Broadcasting large task binary with size 1744.4 KiB
                                                                                

## Model Pipeline

In [14]:
# pipeline staging area: the same stages as the step-by-step cells, declared
# fresh so they can be chained into a single Pipeline
tokenizer = RegexTokenizer(inputCol="title", outputCol="title_words", pattern="\\W")
stopwords_remover = StopWordsRemover(inputCol="title_words", outputCol="f_title_words")
hashing_tf = HashingTF(inputCol="f_title_words",
                       outputCol="raw_features", 
                       numFeatures=256)
idf = IDF(inputCol="raw_features", outputCol="title_features")
assembler = VectorAssembler(inputCols=['title_features'],outputCol="features")
# fixed seed so that re-fitting the pipeline uses the same random sampling
rf = RandomForestClassifier(featuresCol='features',
                            labelCol='frontpage',
                            numTrees=500,
                            seed=1)

In [15]:
# chain the stages in the order the data flows through them
pipeline = Pipeline(
    stages=[
        tokenizer,
        stopwords_remover,
        hashing_tf,
        idf,
        assembler,
        rf,
    ]
)

In [16]:
# fit every stage of the pipeline on the training split
model = pipeline.fit(spark_train)

24/05/25 17:44:39 WARN DAGScheduler: Broadcasting large task binary with size 1098.4 KiB
24/05/25 17:44:40 WARN DAGScheduler: Broadcasting large task binary with size 1744.4 KiB
                                                                                

In [17]:
## where the fitted pipeline is persisted; set the AABDW_MODEL_PATH environment
## variable to save/load somewhere else without editing the notebook (the
## default below is the original, machine-specific location)
model_path = os.environ.get(
    "AABDW_MODEL_PATH",
    "/Users/Jovan/Library/Mobile Documents/com~apple~CloudDocs/Uni/Semester2/AdvancedAnalyticsInBusiness/Project/spark/project",
)
## replace any model previously saved at that location
model.write().overwrite().save(model_path)

24/05/25 17:44:46 WARN TaskSetManager: Stage 86 contains a task of very large size (1024 KiB). The maximum recommended task size is 1000 KiB.


In [18]:
## read the persisted pipeline back from disk, as a separate scoring job would
pred_model = PipelineModel.load(model_path)



In [19]:
## run the reloaded pipeline over the held-out test split
predictions = pred_model.transform(spark_test)

In [20]:
## preview the model output next to the true label
preview_columns = ["title", "probability", "prediction", "frontpage"]
predictions.select(preview_columns).show()

+--------------------+--------------------+----------+---------+
|               title|         probability|prediction|frontpage|
+--------------------+--------------------+----------+---------+
|             Pile-T5|[0.82440822642678...|       0.0|        1|
|Firefox: Apply fo...|[0.82265626497984...|       0.0|        0|
|US economic growt...|[0.82647763156330...|       0.0|        0|
|Novelists as Scho...|[0.82598951651131...|       0.0|        0|
|Show HN: I made a...|[0.85085676968993...|       0.0|        0|
|No Abstractions: ...|[0.82755194547151...|       0.0|        1|
|The Devil Went Do...|[0.82760571283405...|       0.0|        0|
|Show HN: 1/Month ...|[0.84903911295391...|       0.0|        0|
|Unraveling life's...|[0.82283474567546...|       0.0|        1|
|Show HN: Roast my...|[0.85308268615294...|       0.0|        0|
|'To the Future': ...|[0.81325033273763...|       0.0|        0|
|Finding the Balan...|[0.82547392246384...|       0.0|        0|
|Speculative decod...|[0.

24/05/25 17:44:52 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


In [21]:
## to evaluate the model: BinaryClassificationEvaluator scores with the area
## under the ROC curve (its default metric, made explicit here), not accuracy
evaluator = BinaryClassificationEvaluator(labelCol="frontpage",
                                          metricName="areaUnderROC")

## print the test-set AUC under its correct name
test_auc = evaluator.evaluate(predictions)
print("Test-set AUC (area under ROC) is : ", test_auc)

24/05/25 17:44:56 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


Test-set Accuracy is :  0.5799212050626649
