In [1]:
%sql

select * from default.dataset

_c0,Title,Section
0,The WTO’s outgoing boss leaves behind a hobbled body,finance and economics
1,"For fixed-income investors, hell is other bondholders",finance and economics
2,Psychological scars of downturns could depress growth for decades,finance and economics
3,Nicolai Tangen pays a big price for his new gig,finance and economics
4,The explosion at Beirut’s port will blow a hole in insurers’ balance-sheets,finance and economics
5,Efforts to rein in house prices are fuelling discontent in Seoul,finance and economics
6,"In twenty years, exchanges have gone from clubby firms to huge conglomerates",finance and economics
7,Ant Group IPO filing shows its might,finance and economics
8,"Phil Hogan, Europe’s trade commissioner, resigns",finance and economics
9,The Fed makes its biggest inflation-policy change in decades,finance and economics


In [2]:
%sql

select count(Title) from default.dataset

count(Title)
13951


In [3]:
%sql

select count(distinct Title) from default.dataset

count(DISTINCT Title)
7200


In [4]:
%sql 

select Section, count(distinct Title) num from default.dataset group by Section order by num desc  

Section,num
finance and economics,4562
science-and-technology,1539
books-and-arts,1158


In [5]:
ds_dataset = spark.sql("select * from dataset where _c0 < 1250 or _c0 > 4500").dropDuplicates(['Title'])

In [6]:
ds_dataset.createOrReplaceTempView("ds_dataset")

In [7]:
%sql

select Section, count(Title) num from ds_dataset group by Section order by num desc

Section,num
science-and-technology,1524
finance and economics,1498
books-and-arts,1148


In [8]:
train, test = ds_dataset.randomSplit([0.8, 0.2], seed=12345)

In [9]:
train.createOrReplaceTempView("train")
train_dataset = spark.sql("select * from train")

display(train_dataset.select("*").limit(10))

_c0,Title,Section
230,Kreppanomics,finance and economics
877,Difference of opinion,finance and economics
1217,Long ranger,finance and economics
4691,Bohemia’s fading rhapsody,finance and economics
4859,Covid-19 has many faces,science-and-technology
4944,Trials and transparency,science-and-technology
5430,How to save 1m children a year,science-and-technology
5992,Beneath the Southern Cross I stand,science-and-technology
6088,Bursting into life,science-and-technology
6320,Silicon waves,science-and-technology


In [10]:
from pyspark.ml.feature import RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

regexTokenizer = RegexTokenizer(inputCol= "Title", outputCol = "words", pattern = "\\W")
tokenized = regexTokenizer.transform(train_dataset)

countTokens = udf(lambda words: len(words), IntegerType())
tokenized_counts = tokenized.select("Title", "words","Section").withColumn("tokens", countTokens(col("words")))

##remove rows with only one token
tokenized_counts.createOrReplaceTempView("tokenized_counts")
tokenized = spark.sql("select * from tokenized_counts where tokens > 1")
##

display(tokenized_counts.select("*").limit(10))

Title,words,Section,tokens
Kreppanomics,List(kreppanomics),finance and economics,1
Difference of opinion,"List(difference, of, opinion)",finance and economics,3
Long ranger,"List(long, ranger)",finance and economics,2
Bohemia’s fading rhapsody,"List(bohemia, s, fading, rhapsody)",finance and economics,4
Covid-19 has many faces,"List(covid, 19, has, many, faces)",science-and-technology,5
Trials and transparency,"List(trials, and, transparency)",science-and-technology,3
How to save 1m children a year,"List(how, to, save, 1m, children, a, year)",science-and-technology,7
Beneath the Southern Cross I stand,"List(beneath, the, southern, cross, i, stand)",science-and-technology,6
Bursting into life,"List(bursting, into, life)",science-and-technology,3
Silicon waves,"List(silicon, waves)",science-and-technology,2


In [11]:
from pyspark.sql.functions import explode, desc

tokens = tokenized.select(explode(col("words")).alias("word")).groupby(col("word")).count().orderBy(desc("count"))
display(tokens.select("*").limit(20))

word,count
the,947
a,411
of,393
and,214
in,210
s,192
to,172
for,117
on,112
it,81


In [12]:
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol = "words", outputCol = "cleaned", caseSensitive = False)

In [13]:
remover.setStopWords(StopWordsRemover.loadDefaultStopWords("english"))
cleaned = remover.transform(tokenized)

cleaned.show()

In [14]:
tokens = cleaned.select(explode(col("cleaned")).alias("word")).groupBy(col("word")).count().orderBy(desc("count"))
display(tokens.select("*").limit(20))

word,count
new,61
big,34
world,29
man,27
life,27
time,25
light,25
first,21
money,20
one,20


In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# define our custom function to remove too short terms
def filter_by_len(words):
  filtered = [word for word in words if len(word) > 2]
  return filtered

# register our function as udf
filter_by_len_udf = udf(filter_by_len, ArrayType(StringType()))

In [16]:
filtered = cleaned.withColumn("filtered", filter_by_len_udf(col("cleaned")))
display(filtered.select("*").limit(20))

Title,words,Section,tokens,cleaned,filtered
Difference of opinion,"List(difference, of, opinion)",finance and economics,3,"List(difference, opinion)","List(difference, opinion)"
Long ranger,"List(long, ranger)",finance and economics,2,"List(long, ranger)","List(long, ranger)"
Bohemia’s fading rhapsody,"List(bohemia, s, fading, rhapsody)",finance and economics,4,"List(bohemia, fading, rhapsody)","List(bohemia, fading, rhapsody)"
Covid-19 has many faces,"List(covid, 19, has, many, faces)",science-and-technology,5,"List(covid, 19, many, faces)","List(covid, many, faces)"
Trials and transparency,"List(trials, and, transparency)",science-and-technology,3,"List(trials, transparency)","List(trials, transparency)"
How to save 1m children a year,"List(how, to, save, 1m, children, a, year)",science-and-technology,7,"List(save, 1m, children, year)","List(save, children, year)"
Beneath the Southern Cross I stand,"List(beneath, the, southern, cross, i, stand)",science-and-technology,6,"List(beneath, southern, cross, stand)","List(beneath, southern, cross, stand)"
Bursting into life,"List(bursting, into, life)",science-and-technology,3,"List(bursting, life)","List(bursting, life)"
Silicon waves,"List(silicon, waves)",science-and-technology,2,"List(silicon, waves)","List(silicon, waves)"
Two authors wrestle with inequality and the allure of populism,"List(two, authors, wrestle, with, inequality, and, the, allure, of, populism)",books-and-arts,10,"List(two, authors, wrestle, inequality, allure, populism)","List(two, authors, wrestle, inequality, allure, populism)"


In [17]:
from pyspark.ml.feature import NGram
ngrams2 = NGram(n=2, inputCol = "cleaned", outputCol = "ngrams_2")
ngrams = ngrams2.transform(filtered)

ngrams.show()

In [18]:
def union_ngrams(c1,c2):
  return c1 + c2

union_ngrams_udf = udf(union_ngrams, ArrayType(StringType()))

ngrams_final = ngrams.filter("filtered is not Null").withColumn("ngrams", union_ngrams_udf(col("filtered"), col("ngrams_2")))
ngrams_final.show()

In [19]:
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF

hashing_tf = HashingTF(inputCol="ngrams", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
hash_dataset = hashing_tf.transform(ngrams_final)

idf_model = idf.fit(hash_dataset)
idf_dataset = idf_model.transform(hash_dataset)

In [20]:
from pyspark.ml.feature import IndexToString, StringIndexer

indexer = StringIndexer(inputCol = "Section", outputCol = "label")
indexer_model = indexer.fit(idf_dataset)
indexed = indexer_model.transform(idf_dataset)
converter = IndexToString(inputCol = "prediction", outputCol ="prediction_category", labels=indexer_model.labels)

In [21]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

nb = NaiveBayes(smoothing=1.0, modelType="multinomial", labelCol ="label", featuresCol = "features")
paramGrid = ParamGridBuilder()\
    .addGrid(nb.smoothing, [0.1,0.5,1.0]) \
    .build()

tvs = TrainValidationSplit(estimator=nb,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          trainRatio=0.8)

In [22]:
nb_model = tvs.fit(indexed)

In [23]:
test.createOrReplaceTempView("test")
test_dataset = spark.sql("select * from test")
test_tokenized = regexTokenizer.transform(test_dataset)
test_cleaned = remover.transform(test_tokenized)
test_filtered = test_cleaned.withColumn("filtered", filter_by_len_udf(col("cleaned")))
test_ngrams = ngrams2.transform(test_filtered)
test_ngrams_final = test_ngrams.filter("filtered is not Null").withColumn("ngrams", union_ngrams_udf(col("filtered"), col("ngrams_2")))

test_hash_dataset = hashing_tf.transform(test_ngrams_final)
test_idf_dataset = idf_model.transform(test_hash_dataset)
test_indexed = indexer_model.transform(test_idf_dataset)

In [24]:
predictions = nb_model.transform(test_indexed).select("Title", "label", "prediction", "Section")
predictions_decoded = converter.transform(predictions)
display(predictions_decoded.select("*").limit(20))

Title,label,prediction,Section,prediction_category
Trials of a vaccine and new drug raise hope of beating covid-19,0.0,0.0,science-and-technology,science-and-technology
A mystery no longer,0.0,1.0,science-and-technology,finance and economics
The life in an ocean wave,0.0,0.0,science-and-technology,science-and-technology
Think again,2.0,1.0,books-and-arts,finance and economics
The myth of structure,2.0,0.0,books-and-arts,science-and-technology
Independent eye,2.0,0.0,books-and-arts,science-and-technology
Unscrambling the eggs,1.0,2.0,finance and economics,books-and-arts
One qubit at a time,0.0,2.0,science-and-technology,books-and-arts
The collider calamity,0.0,2.0,science-and-technology,books-and-arts
The end of Mir,0.0,0.0,science-and-technology,science-and-technology


In [25]:
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol = "prediction", metricName = "weightedPrecision")

precision = evaluator_precision.evaluate(predictions)
print("Test set weighted precision = " + str(precision))

evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName = "weightedRecall")
recall = evaluator_recall.evaluate(predictions)
print("Test set weighted recall = " + str(recall))

evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName = "f1")

f1 = evaluator_f1.evaluate(predictions)
print("Test set f1 = " + str(f1))