# BBC Text Categorization using Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover, RegexTokenizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
# Start the Spark session used by the rest of the notebook
# (getOrCreate() returns an already-running session if there is one).
spark = (
    SparkSession.builder
    .appName("Spark ML")
    .getOrCreate()
)

# Load the BBC articles; the "header" option makes the first CSV line
# supply the column names.
csv_reader = spark.read.option("header", True)
df = csv_reader.csv("data/bbc-text.csv")

In [3]:
# Peek at the first ten rows to confirm what was loaded.
df.take(10)

[Row(category='tech', text='tv future in the hands of viewers with home theatre systems  plasma high-definition tvs  and digital video recorders moving into the living room  the way people watch tv will be radically different in five years  time.  that is according to an expert panel which gathered at the annual consumer electronics show in las vegas to discuss how these new technologies will impact one of our favourite pastimes. with the us leading the trend  programmes and other content will be delivered to viewers via home networks  through cable  satellite  telecoms companies  and broadband service providers to front rooms and portable devices.  one of the most talked-about technologies of ces has been digital and personal video recorders (dvr and pvr). these set-top boxes  like the us s tivo and the uk s sky+ system  allow people to record  store  play  pause and forward wind tv programmes when they want.  essentially  the technology allows for much more personalised tv. they are 

In [4]:
# Stratified split: sample (approximately) the same fraction of every category
# into the training set. The strata are read from the data instead of being
# hard-coded, because sampleBy() treats any stratum missing from the dict as
# fraction 0, which would silently keep that category out of training.
TRAIN_FRACTION = 0.7
SPLIT_SEED = 10  # fixed seed so the split is reproducible

# dropna() keeps null categories out of the dict keys; such rows are simply
# never sampled into the training set.
categories = [
    row["category"]
    for row in df.select("category").dropna().distinct().collect()
]
train_fractions = {category: TRAIN_FRACTION for category in categories}
df_train = df.sampleBy("category", train_fractions, seed=SPLIT_SEED)

# Everything that was not sampled for training becomes the test set.
# Note: subtract() has EXCEPT DISTINCT semantics, so duplicate rows are
# collapsed and any row whose exact content also occurs in df_train is
# excluded from df_test.
df_test = df.subtract(df_train)

In [5]:
# Encode the category strings as a numeric "label" column. The indexer is
# fitted on the training rows and the fitted model is kept in `indexer`.
label_indexer = StringIndexer(inputCol="category", outputCol="label")
indexer = label_indexer.fit(df_train)
df_train = indexer.transform(df_train)

In [6]:
# Tokenize the article text using the non-word-character pattern \W,
# then keep only the columns needed downstream.
regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="tokens",
    pattern="\\W",
)
tokenized_train = regexTokenizer.transform(df_train)
df_train = tokenized_train.select("label", "tokens")

In [7]:
# Drop stop words from the token lists; the result goes to "tokens_filtered".
remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="tokens_filtered",
)
filtered_train = remover.transform(df_train)
df_train = filtered_train.select("label", "tokens_filtered")

In [8]:
# Inspect the first five preprocessed training rows.
df_train.take(5)

[Row(label=3.0, tokens_filtered=['tv', 'future', 'hands', 'viewers', 'home', 'theatre', 'systems', 'plasma', 'high', 'definition', 'tvs', 'digital', 'video', 'recorders', 'moving', 'living', 'room', 'way', 'people', 'watch', 'tv', 'radically', 'different', 'five', 'years', 'time', 'according', 'expert', 'panel', 'gathered', 'annual', 'consumer', 'electronics', 'show', 'las', 'vegas', 'discuss', 'new', 'technologies', 'impact', 'one', 'favourite', 'pastimes', 'us', 'leading', 'trend', 'programmes', 'content', 'delivered', 'viewers', 'via', 'home', 'networks', 'cable', 'satellite', 'telecoms', 'companies', 'broadband', 'service', 'providers', 'front', 'rooms', 'portable', 'devices', 'one', 'talked', 'technologies', 'ces', 'digital', 'personal', 'video', 'recorders', 'dvr', 'pvr', 'set', 'top', 'boxes', 'like', 'us', 'tivo', 'uk', 'sky', 'system', 'allow', 'people', 'record', 'store', 'play', 'pause', 'forward', 'wind', 'tv', 'programmes', 'want', 'essentially', 'technology', 'allows', 'm

In [9]:
# Turn each filtered token list into a hashed term-frequency vector ("tf").
hashingTF = HashingTF(
    inputCol="tokens_filtered",
    outputCol="tf",
)
df_train = hashingTF.transform(df_train)

In [10]:
# Convert term frequencies into TF-IDF weights. The IDF statistics are fitted
# on the training frame and the fitted model is kept in `idfModel`.
idf = IDF(
    inputCol="tf",
    outputCol="tfidf",
    minDocFreq=5,
)
idfModel = idf.fit(df_train)
df_train = idfModel.transform(df_train)

In [11]:
# Fit a regularised multinomial logistic regression on the TF-IDF features.
mlr = LogisticRegression(
    featuresCol="tfidf",
    labelCol="label",
    family="multinomial",
    regParam=0.1,
)
mlrModel = mlr.fit(df_train)

In [None]:
# Disabled cell: training-set accuracy check, kept commented out.
# Note: if re-enabled, the first statement reassigns df_train to the model's
# output (the frame with the prediction column added).
# #find train accuracy
# df_train = mlrModel.transform(df_train)
# lp = df_train.select('label', 'prediction')
# evaluator = MulticlassClassificationEvaluator()
# evaluator.evaluate(lp, {evaluator.metricName: "accuracy"})

In [12]:
# Run the held-out rows through the same stages that were fitted/configured
# on the training data: label indexing, tokenizing, stop-word removal,
# term-frequency hashing and finally the IDF model.
test_labeled = indexer.transform(df_test)
test_tokens = regexTokenizer.transform(test_labeled).select("label", "tokens")
test_filtered = remover.transform(test_tokens).select("label", "tokens_filtered")
test_tf = hashingTF.transform(test_filtered)
df_test = idfModel.transform(test_tf)

In [13]:
# Score the test rows with the trained model and report accuracy.
df_test = mlrModel.transform(df_test)
lp = df_test.select("label", "prediction")

# The metric is overridden per call through a param map, so the evaluator
# object itself keeps its default configuration.
evaluator = MulticlassClassificationEvaluator()
accuracy_params = {evaluator.metricName: "accuracy"}
evaluator.evaluate(lp, accuracy_params)

0.9737274220032841

### TO DO:

#### 1) Hyperparameter tuning using grid search
#### 2) Compare with other classifiers