# Coursework Part 1: Detecting Spam with Spark

IN432 Big Data coursework 2017, part 1. Classifying messages to detect spam.


## Task a) & b) Read some files and prepare a (f,w) RDD 
a) Start by reading the directory with text files from the distributed file system (e.g. `hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1`), and loading all text files using wholeTextFiles(), which loads the text per file, i.e. tuples (f,t). (5%)

b) Split the text into words (lower case), creating a (file,word) RDD. (10%)

For both tasks you can use the code from the labs. Don't remove the final 's' from words (we already have lemmatised data to work with later). 

It is very useful to pack the code into a function that takes a directory as an argument and returns an RDD with (f,w) structure, e.g. `read_fw_RDD`.

Please write two lines of code at the end of the cell that run a little example and print some output. You can comment them out after you have verified that your code works. 
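
As a Spark-free illustration of task b), the sketch below applies the split-and-lower-case step to a plain Python list of (f,t) tuples. The file names and texts are made up for illustration; in the coursework the tuples come from `wholeTextFiles()` and the per-file list is produced inside `flatMap()`. Dropping empty strings is an extra choice made here, not something the task requires.

```python
import re

# Hypothetical (file, text) tuples standing in for the output of wholeTextFiles()
ft_list = [('a.txt', 'Subject: Hello World'), ('b.txt', 'Buy now, buy cheap')]

def split_fw(ft):
    """Turn one (file, text) tuple into a list of (file, word) tuples."""
    f, t = ft
    # \W+ splits on runs of non-word characters; empty strings are dropped
    return [(f, w.lower()) for w in re.split(r'\W+', t) if w]

# Flatten the per-file lists, as flatMap() would do on an RDD
fw_list = [fw for ft in ft_list for fw in split_fw(ft)]
print(fw_list[:3])
```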

In [1]:
import re 
from operator import add
#prefix = 'hdfs://saltdean.nsqdc.city.ac.uk/data/'

prefix = '/data/tempstore/'
dirPath = prefix + 'spam/bare/part1'

# Takes a directory as argument, reads the files in it, and creates an (f,w) RDD by splitting the text into lower-case words
def read_fw_RDD( argDir ): # package tasks a/b into a function for later use
    ft_RDD = sc.wholeTextFiles(argDir) #<<< task a) read the files
    #print('Read {} files from directory {}'.format(<no of files>,argDir)) # status message for testing, can be disabled later on
    #print('file word count histogram') # the histogram can be useful for checking later 
    #print(fwL_RDD.map(lambda fwL: (len(fwL[1]))).histogram([0,10,100,1000,10000]))
    fw_RDD = ft_RDD.flatMap(lambda fc: [(fc[0],w.lower()) for w in re.split(r'\W+',fc[1])]) #<<< task b) split words (raw string avoids an invalid escape sequence)
    return fw_RDD #,fw_RDD

fw_RDD = read_fw_RDD(dirPath) # for testing
print(fw_RDD.take(3)) # for testing

[('file:/data/tempstore/spam/bare/part1/3-437msg3.txt', 'subject'), ('file:/data/tempstore/spam/bare/part1/3-437msg3.txt', 'becoming'), ('file:/data/tempstore/spam/bare/part1/3-437msg3.txt', 'a')]


## Task c) Normalised word count lists
Use the code from the labs to generate the `[(word,count), ...]` list per file and to create a word frequency vector. 

Normalise the term frequency (TF) vector by the total word count per file. (15%)

This is mostly reusing the lab code. The interesting part here is the normalisation. For normalisation we need the total word count per file. You can use a nested list comprehension for this (go through the (w,c) list and divide each c by the sum of all c, which you can get with `sum()` over a list of the counts). 
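
A minimal sketch of the normalisation step on a single (w,c) list, using made-up counts. The function name `normalise` is hypothetical, and the sketch assumes the list is non-empty (an empty list would cause a division by zero).

```python
def normalise(wc_list):
    """Divide each count by the total count of the list."""
    total = sum(c for (w, c) in wc_list)
    return [(w, c / total) for (w, c) in wc_list]

wc_list = [('spam', 3), ('offer', 1), ('free', 4)]  # made-up counts
tf_list = normalise(wc_list)
print(tf_list)
print(sum(cn for (w, cn) in tf_list))  # should be close to 1
```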

Another option is to use a separate RDD with (f,twc), where 'twc' stands for total word count, which you can create from the (f,[(w,c), ... ]) RDD. This new RDD can then be joined with the (f,[(w,c), ... ]) RDD, and the (w,c) list can then be normalised in a list comprehension. 

It would actually be more efficient to do the word counting when you split the words, but that would mix the tasks too much, so please don't do that in your coursework submission.

Again, put your code into a function, and add a short test that can be commented out.

In [12]:
from operator import add

# Normalise a word count list: sum all counts, then divide each count by that total.
# Takes a list of (w,c) tuples and returns a list of (w,c/total) tuples.
def norm(wcL): 
    total = 0
    for (w,c) in wcL:
        total += c     
    return [(w,c/total) for w,c in wcL]
          
def reGrpLst(fw_c): # reorganise the tuples
    fw, c = fw_c 
    f,w = fw
    return (f,[(w,c)])

# Takes a directory as argument and returns an RDD with the structure [[f,[(w,c),(w,c), ...]],[f,[(w,c), ...]], ...] with normalised word counts
def make_f_tfLn_RDD(argDir): 
    fw_RDD = read_fw_RDD( argDir ) # call function from task a & b
    fw_1_RDD = fw_RDD.map(lambda x: (x,1))
    f_wc2_RDD = fw_1_RDD.reduceByKey(add) # sum the 1s per (f,w) key to get ((f,w),c)
    f_wcL_RDD = f_wc2_RDD.map(reGrpLst)  # reorganise the tuples using the reGrpLst function
    f_wcL2_RDD = f_wcL_RDD.reduceByKey(add) 
    f_wcLn_RDD = f_wcL2_RDD.map(lambda f_wcL: [f_wcL[0], norm(f_wcL[1])]) #<<< and normalise by applying the normalization function for each file 
    return f_wcLn_RDD

f_wcLn_RDD = make_f_tfLn_RDD(prefix + 'spam/bare/part5') # for testing
print(f_wcLn_RDD.take(1)) # for testing
wcLn = f_wcLn_RDD.take(1)[0][1] # get the first normalised word count list
print(sum([cn for (w,cn) in wcLn])) # the sum of normalised counts should be close to 1 

[['file:/data/tempstore/spam/bare/part5/9-1220msg1.txt', [('washington', 0.0012853470437017994), ('paper', 0.002570694087403599), ('written', 0.0012853470437017994), ('advanced', 0.0012853470437017994), ('be', 0.002570694087403599), ('reconstruct', 0.0012853470437017994), ('she', 0.0012853470437017994), ('by', 0.005141388174807198), ('actors', 0.0012853470437017994), ('grammaticalization', 0.002570694087403599), ('2', 0.0012853470437017994), ('15', 0.0012853470437017994), ('u', 0.0012853470437017994), ('english', 0.0038560411311053984), ('among', 0.0012853470437017994), ('show', 0.0012853470437017994), ('later', 0.0012853470437017994), ('carolyn', 0.0012853470437017994), ('argue', 0.0012853470437017994), ('differently', 0.0012853470437017994), ('halftones', 0.002570694087403599), ('expressed', 0.0012853470437017994), ('to', 0.012853470437017995), ('examining', 0.0012853470437017994), ('goffman', 0.0012853470437017994), ('5', 0.0012853470437017994), ('volume', 0.0012853470437017994), ('

## Task d) Creating hashed feature vectors 
Use the hashing trick to create fixed size TF vectors. (10%)

Use the code from the week 2 lecture to create the hash vectors.

As before, make it a function and add a short test.
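
The sketch below illustrates the hashing trick on one made-up normalised word list. It swaps Python's built-in `hash()` for `zlib.crc32`, which is an assumption of this example and not the lecture code: in Python 3 the built-in hash of a string is randomised per interpreter process unless `PYTHONHASHSEED` is set, which may give inconsistent buckets across separate worker processes. The names `stable_hash` and `hash_vector` are hypothetical.

```python
import zlib

def stable_hash(word):
    """Deterministic hash: CRC32 of the UTF-8 bytes (an assumed stand-in for hash())."""
    return zlib.crc32(word.encode('utf-8'))

def hash_vector(word_count_list, n):
    """Map a [(word, count), ...] list onto a fixed-size vector of length n."""
    v = [0.0] * n
    for word, count in word_count_list:
        v[stable_hash(word) % n] += count  # colliding words share a bucket
    return v

tf_list = [('spam', 0.375), ('offer', 0.125), ('free', 0.5)]  # made-up TF values
vec = hash_vector(tf_list, 4)
print(len(vec), sum(vec))  # the vector length is fixed, the total weight is preserved
```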

In [21]:
def hashing_vectorizer(word_count_list, N):
    v = [0] * N  # create fixed size vector of 0s
    for word_count in word_count_list:
        word,count = word_count  # unpack tuple
        h = hash(word)  # get hash value (Python 3 randomises str hashes per process unless PYTHONHASHSEED is set)
        v[h % N] = v[h % N] + count  # add count
    return v  # return hashed word vector

# Takes an RDD of (f,[(w,cn), ...]) normalised word count lists and an integer argN, the size of the hash vector.
# Applies the hashing function to each word count list.
# Returns an RDD of (f, hashed vector) tuples.
def make_f_wVn_RDD(f_wcLn_RDD, argN):
    f_wVec_RDD = f_wcLn_RDD.map(lambda f_wc: (f_wc[0],hashing_vectorizer(f_wc[1],argN)))
    return f_wVec_RDD
    
N=10
f_wVn_RDD = make_f_wVn_RDD(make_f_tfLn_RDD(dirPath),N) # for testing
print(f_wVn_RDD.take(1)[0][1]) # for testing
print( sum(f_wVn_RDD.take(1)[0][1])) # for testing

[0.1953727506426738, 0.10025706940874031, 0.09254498714652953, 0.08226221079691513, 0.06041131105398455, 0.08868894601542412, 0.08483290488431872, 0.12596401028277632, 0.06041131105398456, 0.1092544987146529]
0.9999999999999998


## Task e) Create Labeled Points

Determine whether the file is spam (i.e. the filename contains 'spmsg') and replace the filename by a 1 (spam) or 0 (ham) accordingly. Use map() to create an RDD of LabeledPoint objects. 

See here [http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression](http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression) for an example, and here [http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint](http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint) for the `LabeledPoint` documentation. (15%)

It's useful to take the RDD with normalised word lists as input. 

For finding the spam messages use `re.search()`; see here [https://docs.python.org/3/library/re.html?highlight=re%20search#re.search](https://docs.python.org/3/library/re.html?highlight=re%20search#re.search) for documentation. Search for 'spmsg' in the filename and check whether the result is `None`. The relevant syntax here is <b>`0 if <yourCondition> else 1`</b>, i.e. 0 if 'spmsg' is not in the filename (not spam) and 1 if it is (it's spam).
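
A small sketch of the labelling rule on two file names. The first name is invented for illustration; the second appears in the task a) output above. The helper name `spam_label` is hypothetical.

```python
import re

def spam_label(filename):
    """Return 1 if the filename contains 'spmsg' (spam), otherwise 0 (ham)."""
    return 0 if re.search('spmsg', filename) is None else 1

print(spam_label('part1/spmsgc12.txt'))   # invented spam-style name
print(spam_label('part1/3-437msg3.txt'))  # ham file name from the task a) output
```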

In [24]:
from pyspark.mllib.regression import LabeledPoint

# Returns an RDD of LabeledPoints: the label marks a file as spam (1) or ham (0), the features are the file's hashed vector
def make_lp_RDD(f_tfLn_RDD,argN):
    f_wVn_RDD = make_f_wVn_RDD(f_tfLn_RDD, argN)
    lp_RDD = f_wVn_RDD.map(lambda f_wVn: LabeledPoint(1 if re.search('spmsg',f_wVn[0]) else 0,f_wVn[1]))
    return lp_RDD

lp_RDD = make_lp_RDD(make_f_tfLn_RDD('hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1'),10)
print(lp_RDD.take(3))

[LabeledPoint(0.0, [0.195372750643,0.100257069409,0.0925449871465,0.0822622107969,0.060411311054,0.0886889460154,0.0848329048843,0.125964010283,0.060411311054,0.109254498715]), LabeledPoint(0.0, [0.111471861472,0.101731601732,0.0984848484848,0.108225108225,0.0854978354978,0.135281385281,0.0887445887446,0.115800865801,0.0703463203463,0.0844155844156]), LabeledPoint(0.0, [0.132450331126,0.12582781457,0.0827814569536,0.105960264901,0.0761589403974,0.0794701986755,0.0662251655629,0.162251655629,0.046357615894,0.122516556291])]


## Task f) Train a classifier 

Use the `LabeledPoint` objects to train the `LogisticRegression` and calculate the accuracy of the model on the training set. Again, follow this example [http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression](http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression); the documentation is here [http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithLBFGS](http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithLBFGS). (15%) 

It's useful to start with a normalised word list as input again (because we can later also use it with TF.IDF values).  
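
The accuracy calculation can be sketched without Spark on a plain list of (label, prediction) pairs. The pairs below are made up; in the coursework they would come from mapping `model.predict()` over the `LabeledPoint` RDD. The helper name `accuracy` is hypothetical.

```python
def accuracy(label_pred_pairs):
    """Return (number of correct predictions, fraction of correct predictions)."""
    correct = sum(1 for (label, pred) in label_pred_pairs if label == pred)
    return correct, correct / len(label_pred_pairs)

pairs = [(0.0, 0), (1.0, 1), (0.0, 1), (1.0, 1)]  # made-up (label, prediction) pairs
correct, acc = accuracy(pairs)
print('items: {}, correct: {}, accuracy {:.1%}'.format(len(pairs), correct, acc))
```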

In [26]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, NaiveBayes
from pyspark.mllib.util import MLUtils

path = prefix + 'spam/stop/part1'

# Trains a logistic regression model on an RDD of [f,[(w,c), ...]] items to predict spam, and prints the training accuracy
def trainModel(f_wcL_RDD,N):
    lp_RDD = make_lp_RDD(f_wcL_RDD,N)
    model = LogisticRegressionWithLBFGS.train(lp_RDD) # create the model using the labelpoint and logistic regression method
    predict = lp_RDD.map(lambda x: (x.label, model.predict(x.features))) #make the prediction
    correct = predict.filter(lambda x: x[0] == x[1]).count() # calculate the number of values well predicted
    accuracy = correct / predict.count() #calculate the percentage of accuracy
    print('training data items: {}, correct: {}'.format(predict.count(), correct)) # output raw numbers
    print('training accuracy {:.1%}'.format(accuracy)) # and accuracy
    return model 

f_wcLn_RDD = make_f_tfLn_RDD(path) # for testing
model = trainModel(f_wcLn_RDD,N) # for testing

training data items: 290, correct: 246
training accuracy 84.8%


## Task g) Test the classifier

Use the files from `.../data/extra/spam/bare/part10` and prepare them as in tasks a)-e) (use the function you created in task e) and the ones before it). Then use the trained model to predict the label for each vector and compare it to the original label to test the performance of your classifier. (10%) 

In [29]:
# Test the model created previously on another directory
def testModel(model,f_wcL_RDD,N):
    testData=make_lp_RDD(f_wcL_RDD, N)     #<<< like with trainModel, transform the data and evaluate it.
    predict=testData.map(lambda x: (x.label, model.predict(x.features)))
    correct=predict.filter(lambda lp: lp[0]==lp[1]).count()
    accuracy = correct / predict.count()

    print('test data items: {}, correct:{}'.format(predict.count(),correct))
    print('testing accuracy {:.1%}'.format(accuracy))

testModel(model,make_f_tfLn_RDD('hdfs://saltdean/data/spam/stop/part10'),N) # for testing

test data items: 290, correct:246
testing accuracy 84.8%


## Task h) Run experiments 

Package the whole classifier training and evaluation in one function. Then apply it to the files from `/data/extra/spam/lemm`, `/data/extra/spam/stop` and `/data/extra/spam/lemm_stop` in addition to `/data/extra/spam/bare`  and evaluate the accuracy of your classifier. 

Comment on the effect of *lemmatisation* and *stopword removal* on classification accuracy. Further, evaluate the use of larger training sets and the effect of different vector sizes. Print out the results of your experiments in readable form. (20%) 

You need to create one small function that combines tasks f) and g), and then apply it to different dataset sizes, vector sizes, and preprocessing variants. 

The combination of the part1-part9 datasets can be achieved by using 'glob' patterns in the filename ('part[1-9]'). This is a feature of the Hadoop filesystem and not well documented in Spark (or anywhere else). Python's own glob module uses a similar pattern syntax, which is described here: [https://docs.python.org/3/library/glob.html](https://docs.python.org/3/library/glob.html). You can also supply multiple comma-separated paths, but you'll need to test what works when you use this feature. Recursive patterns don't seem to work.
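
To see which directory names a pattern such as 'part[1-9]' selects, the sketch below uses Python's `fnmatch` as a stand-in. This assumes the Hadoop glob treats the character class the same way; details of the two implementations may differ, so the result should still be checked on the cluster.

```python
from fnmatch import fnmatch

parts = ['part{}'.format(i) for i in range(1, 11)]  # part1 ... part10
pattern = 'part[1-9]'

# [1-9] matches exactly one character, so part10 is not selected
matched = [p for p in parts if fnmatch(p, pattern)]
print(matched)
```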

Alternatively, you can create unions of RDDs for each part. However, this seems to lead to slower execution. With the latter, it is useful to create lists of the directory names (part1, ...). When you work with unions, it may be useful to start with an empty RDD. That can be created with `sc.parallelize([])`.

A useful tool for creating multiple variants of long paths is the Python string format() method, as used below. There is a good set of examples here: [https://docs.python.org/3/library/string.html#format-examples](https://docs.python.org/3/library/string.html#format-examples) and the specification is here: [https://docs.python.org/3/library/string.html#format-specification-mini-language](https://docs.python.org/3/library/string.html#format-specification-mini-language).
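
A short sketch of building several path variants with `format()`. The path layout and the variable names are hypothetical and only illustrate how two `{}` placeholders can be filled per combination.

```python
dir_pattern = '/data/spam/{}/part[1-{}]'  # hypothetical layout: {preprocessing}/part[1-{last part}]
variants = ['bare', 'stop']

# One path per (preprocessing variant, number of training parts) combination
paths = [dir_pattern.format(v, i) for v in variants for i in (3, 9)]
for p in paths:
    print(p)
```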


In [None]:
# this function combines tasks f) and g)
def trainTestModel(trainPaths,testPath,N):
    
   model = trainModel(make_f_tfLn_RDD(trainPaths),N)  #<<< just combine training and testing here 
   testModel(model,make_f_tfLn_RDD(testPath),N)   
   return model
   
   
# prepare the part directories and the path
dirPattern = 'hdfs://saltdean/data/spam/bare/part[1-{}]' # the {} can be filled by 'dirPattern.format(i)' 
testPath = 'hdfs://saltdean/data/spam/bare/part10'
print('EXPERIMENT 1: Testing different training set sizes')
print('Path = {}, N = {}'.format(dirPattern,N)) # using format to make sure we record the parameters of the experiment
#<<< make the test set, it will be constant for this experiment
#<<< loop over i the number of parts for training (1-9)
for i in range(1,10):
   trainPaths = dirPattern.format(i) # in the loop you can create a path like this
   print(trainPaths) #just for testing, remove later
   #<<< create the trainRDD (using your make_f_tfLn_RDD method)
   trainTestModel(trainPaths,testPath,N)
print('\nEXPERIMENT 2: Testing different vector sizes')
# #<<< loop over different values for N. 3,10,30,100,300, ... is a good pattern
new_N=[3,10,30,100,300,1000]
for n in new_N: # iterate over every vector size in the list; trainPaths still holds the last pattern from experiment 1
   print('=== N = ',n)
   trainTestModel(trainPaths,testPath,n)
# change to what you feel is a good compromise between computation and accuracy
# # the dictionary below helps associate description and paths.
setDict = {'No preprocessing': prefix + 'spam/bare/',
           'Stopwords removed': prefix + 'spam/stop/',
           'Lemmatised': prefix + 'spam/lemm/',
           'Lemmatised and stopwords removed': prefix + 'spam/lemm_stop/'}
print('\nEXPERIMENT 3: Testing differently preprocessed data sets')
print('training on parts 1-9, N = {}'.format(N))
for sp in setDict:
   print('=== ',sp)
   trainPaths = setDict[sp] + 'part[1-9]'
   testPath = setDict[sp] + 'part10' # test on part10 with the same preprocessing as the training data
   #<<< make the training (part1-9) and test data (part10) RDDs and evaluate
   print(trainPaths) #just for testing, remove later
   #<<< create the trainRDD (using your make_f_tfLn_RDD method)
   trainTestModel(trainPaths,testPath,N)
print('\n====== Done ======')

EXPERIMENT 1: Testing different training set sizes
Path = hdfs://saltdean/data/spam/bare/part[1-{}], N = 100
hdfs://saltdean/data/spam/bare/part[1-1]
training data items: 290, correct: 290
training accuracy 100.0%
test data items: 290, correct:290
testing accuracy 100.0%
hdfs://saltdean/data/spam/bare/part[1-2]
training data items: 290, correct: 290
training accuracy 100.0%
test data items: 290, correct:290
testing accuracy 100.0%
hdfs://saltdean/data/spam/bare/part[1-3]
training data items: 290, correct: 290
training accuracy 100.0%
test data items: 290, correct:290
testing accuracy 100.0%
hdfs://saltdean/data/spam/bare/part[1-4]
training data items: 290, correct: 290
training accuracy 100.0%
test data items: 290, correct:290
testing accuracy 100.0%
hdfs://saltdean/data/spam/bare/part[1-5]
training data items: 290, correct: 290
training accuracy 100.0%
test data items: 290, correct:290
testing accuracy 100.0%
hdfs://saltdean/data/spam/bare/part[1-6]
training data items: 290, correct: 

## Task i) (Task for pairs) TF.IDF vectors
You need to address this task if you are working as a pair. 

Calculate the IDF values for each word and generate fixed size TF.IDF vectors for each document (word frequencies still normalised by total document word count). Also evaluate the use of TF.IDF compared to normalised word counts in terms of accuracy. (25%)

To calculate the IDF values you need to create an RDD of (w,f) pairs. You can use the function `RDD.distinct()` to remove duplicates and reorganise to create (w,[f, ...]) lists. The length of the list is the document frequency and can be used to calculate the IDF.
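
The steps can be sketched without Spark on two made-up documents. The IDF definition used here, log(number of documents / document frequency), is an assumption, since the task text does not fix a formula; the dictionaries stand in for the RDDs and joins of the actual solution.

```python
from math import log

# Made-up normalised word counts per file: {file: [(word, tf), ...]}
f_tf = {
    'f1': [('free', 0.5), ('offer', 0.5)],
    'f2': [('free', 0.25), ('meeting', 0.75)],
}

# Document frequency: the number of files each word occurs in
df = {}
for f, tf_list in f_tf.items():
    for w in set(w for (w, tf) in tf_list):
        df[w] = df.get(w, 0) + 1

n_docs = len(f_tf)
# Assumed IDF definition: log(number of documents / document frequency)
idf = {w: log(n_docs / d) for w, d in df.items()}

# TF.IDF per file: multiply each normalised count by the word's IDF
f_tfidf = {f: [(w, tf * idf[w]) for (w, tf) in tf_list] for f, tf_list in f_tf.items()}
print(f_tfidf)
```

With this definition a word that occurs in every document gets an IDF of 0 and so contributes nothing to the vector.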





In [None]:
from operator import add
from math import log

trainPath = 'hdfs://saltdean.nsqdc.city.ac.uk./data/spam/lemm_stop/part[1-9]'
testPath = 'hdfs://saltdean.nsqdc.city.ac.uk./data/spam/lemm_stop/part10'

def make_f_wtfiL_RDD(path):
    # Calculate the IDFs
    fw_RDD = read_fw_RDD(path)
    #<<< keep only unique (f,w) pairs
    #<<< (f,w) -> (w,[f])
    #<<< join the lists of files with reduceByKey
    vocSize = wfL_RDD.count() # calculate the vocabulary size
    print('vocSize: {}'.format(vocSize)) 
    # calculate the IDF values per word by using len() on the list of files
    print('wIdf_RDD.count(): ',wIdf_RDD.count()) # for testing
    print(wIdf_RDD.take(2)) # for testing

    # Get the normalised word counts (TFs) and organise by word (w,(f,cn))
    f_wcLn_RDD = make_f_tfLn_RDD(path) # create the normalised word count lists (function from task c)
    #print('f_wcLn_RDD: ',f_wcLn_RDD.map(
    #        lambda x: sum([c for (w,c) in x[1]]).histogram([0,10,100,1000,10000]))) # check for the per-file word counts
    #<<<< create a list of tuples [(w,(f,cn)), ..] and use flatmap 
    print('w_fcn_RDD.count(): {}'.format(w_fcn_RDD.count())) # for testing
    print(w_fcn_RDD.take(2)) # for testing

    # now we can join the IDFs and TFs by the words (w,(f,cn)) join (w,idf) to (w,((f,cn),idf))
    #<<< use RDD.join()
    print( 'w_fcnIdf_RDD.count(): ', w_fcnIdf_RDD.count())
    print( w_fcnIdf_RDD.take(2))

    # we have doubly nested tuples (w,((f,cn),idf)) in the RDD, 
    # but they let us calculate the TF.IDF per file and word (f,[(w,cn*idf)]).
    #<<<< map to (f,[(w,cn*idf)])
    print('f_wtfiL_RDD.count()', f_wtfiL_RDD.count())
    print(str(f_wtfiL_RDD.take(2)))

    # with that we can reduce by key (files) to get [(w,tfidf), ...] lists per file.
    #<<< reduceByKey
    print('# of files with TF.IDF vectors: {}'.format(f_wtfiL2_RDD.count()))
    print(f_wtfiL2_RDD.take(2))

    return f_wtfiL2_RDD


N=100 # choose a value yourself 
print('N: {}, trainPath: {}'.format(N,trainPath))
#<<< you can now apply trainModel and testModel to RDDs created with make_f_wtfiL_RDD()