# Coursework Part 1: Detecting Spam with Spark

IN432 Big Data coursework 2017, part 1. Classifying messages to detect spam.

Coursework by Gediminas Sadaunykas and Vaida Gulbinskaitė. 


## Task a) & b) Read some files and prepare a (f,w) RDD 
a) Start by reading the directory with text files from the distributed file system (e.g. `hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1`), and loading all text files using wholeTextFiles(), which loads the text per file, i.e. tuples (f,t). (5%)

b) Split the text into words (lower case), creating a (file,word) RDD. (10%)

For both tasks you can use the code from the labs. Don't remove finals 's' (we have already lemmatised data to work with later). 

It is very useful to pack the code into a function that takes a directory as an argument and returns an RDD with (f,w) structure, e.g. `read_fw_RDD`.

Please write two lines of code at the end of the cell that run a little example and print some output. You can comment them out after you have verified that your code works. 

In [2]:
# PREPROCESING

dirPath = 'hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1'
import re

### b. split words
def splitFileWords(filenameContent): # your splitting function
    f,c = filenameContent # split the input tuple  
    fwLst = [] # the new list for (filename,word) tuples
    wLst = re.split('\W+',c) # create a word list wLst
    for w in wLst: # iterate through the list
        fwLst.append((f,(w.lower()))) # append (f,w)  
    return fwLst #return a list of (f,w) tuples 

### a. Read files 
def read_fw_RDD(argDir): 
    fwL_RDD = sc.wholeTextFiles(argDir)
    fw_RDD = fwL_RDD.flatMap(splitFileWords)
    return fw_RDD
    
    #print('Read {} files from directory {}'.format(5,argDir)) # status message for testing, can be disabled later on
    #print('file word count histogram') # the histogram can be useful for checking later 
    #print(fwL_RDD.map(lambda fwL: (len(fwL[1]))).histogram([0,10,100,1000,10000]))
    

fw_RDD = read_fw_RDD(dirPath) # for testing
print(fw_RDD.take(3))

[('hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1/3-1msg1.txt', 'subject'), ('hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1/3-1msg1.txt', 're'), ('hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1/3-1msg1.txt', '2')]


## Task c) Normalised word count lists
Use the code from the labs to generate the \texttt{[(word,count), ...]} list per file and to create a word frequency vector. 

Normalise the term frequency (TF) vector by the total word count per file. (15\%)

This is mostly reusing the lab code. The interesting part here is the normalisation. For normalisation we need to total word count per file. You can use a nested list comprehension for this (go through the (w,c) list and divide each c by the sum of all c, which you can get with a list). 

Another option is to use a separate RDD with (f,twc), where 'twc' is for total word count, and which you can create from the (f,[(w,c), ... ]) RDD. This new RDD can then be joined with the (f,[(w,c), ... ]) RDD and then the (w,c) list be normalised in a list comprehension. 

It would actually be more efficient to do the word counting when you split the words, but that would mix the tasks too much, so please don't do that in your coursework submission.

Again, put your code into a function, and add a short test that can be commented out.

In [39]:
# NORMALIZATION

from operator import add

def reGrpLst(fw_c): # reorganise the tuples
    fw,c = fw_c
    f,w = fw
    return (f,[(w,c)]) #returns (f,[(w,c)]) structure

def normalizeLstReduced(lst):
    f_wcLn_RDD = []
    for x in lst:
        f_wcLn_RDD.append((x[0], x[1]/sum(x[1] for x in lst)))
    return f_wcLn_RDD


def make_f_tfLn_RDD(argDir): 
    fw_RDD = read_fw_RDD(argDir) # call function from task a & b
    fw_1_RDD = fw_RDD.map(lambda x: (x,1))  # change (f,w) to ((f,w),1)
    #print('-'*80)
    #print(fw_1_RDD.take(10)) # TESTING
    
    fw_c_RDD = fw_1_RDD.reduceByKey(add) # change ((f,w),1) to ((f,w),c) where c is sum of unique word instances
    #print('-'*80)
    #print(fw_c_RDD.take(10)) # TESTING
    
    f_wcL_RDD = fw_c_RDD.map(reGrpLst) # reorganize the tupples ((f,w), c) to (f, [(w,c)])
    #print('-'*80)
    #print(f_wcL_RDD.take(10)) # TESTING    
    
    f_wcL_RDD2 = f_wcL_RDD.reduceByKey(add) # reogrganise the tupples ((f,w), c) to (f, [(w,c),...])) 
    #print(''*80)
    #print(f_wcL_RDD2.take(1)) # TESTING
    
    f_tfLn_RDD = f_wcL_RDD2.map(lambda f_wcl: (f_wcl[0], normalizeLstReduced(f_wcl[1]))) # list normalization
    #print(''*80)
    #print(f_tfLn_RDD.take(1)) # TESTING
    return f_tfLn_RDD


f_tfLn_RDD= make_f_tfLn_RDD(dirPath) 
print('-'*80)
wcLn = f_tfLn_RDD.take(1)[0][1] # get the first normalised word count list. # TESTING
print(sum([cn for (w,cn) in wcLn])) # the sum of normalised counts should be close to 1. TESTING



--------------------------------------------------------------------------------
1.0000000000000007


## Task d) Creating hashed feature vectors 
Use the hashing trick to create fixed size TF vectors. (10%)

Use the code from the week 2lecture to create the hash vectors.

As before, make it a function and add a short test.

In [4]:
def hashing_vectorizer(word_count_list, N): 
    v = [0] * N  # create fixed size vector of 0s
    for word_count in word_count_list:
        word,count = word_count # unpack tuple of normalized word counts
        h = hash(word) 	# get hash value
        v[h % N] = v[h % N] + count # add count
    return v 	# return hashed word vector

def make_f_wVn_RDD(f_tfLn_RDD, N):
    f_wVn_RDD = f_tfLn_RDD.map(lambda f_wc: (f_wc[0],hashing_vectorizer(f_wc[1],N))) # apply hashing_vectorizer on normalized word count list 
    return f_wVn_RDD
    
N=25 # initialize N

f_wVn_RDD = make_f_wVn_RDD(make_f_tfLn_RDD(dirPath), N)
print('-'*117)
print(f_wVn_RDD.take(1)[0][1]) # TESTING
print('-'*117)
print(sum(f_wVn_RDD.take(1)[0][1])) # TESTING


---------------------------------------------------------------------------------------------------------------------
[0.039603960396039604, 0.099009900990099, 0.07920792079207921, 0.039603960396039604, 0.019801980198019802, 0.09900990099009901, 0.0297029702970297, 0.009900990099009901, 0.009900990099009901, 0.0297029702970297, 0, 0.0297029702970297, 0.009900990099009901, 0.019801980198019802, 0.0297029702970297, 0.019801980198019802, 0.0297029702970297, 0.04950495049504951, 0.019801980198019802, 0.04950495049504951, 0.04950495049504951, 0.0792079207920792, 0.039603960396039604, 0.05940594059405941, 0.05940594059405941]
---------------------------------------------------------------------------------------------------------------------
1.0000000000000004


## Task e) Create Labeled Points

Determine whether the file is spam (i.e. the filename contains ’spmsg’) and replace the filename by a 1 (spam) or 0 (ham) accordingly. Use map() to create an RDD of LabeledPoint objects. 

See here [http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression](http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression) for an example, and here [http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint](http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint) for the `LabeledPoint` documentation. (15%)

It's useful to take the RDD with normalised word lists as input. 

For finding the spam messages use `re.search()` see here[https://docs.python.org/3/library/re.html?highlight=re%20search#re.search](https://docs.python.org/3/library/re.html?highlight=re%20search#re.search) for documentation. Search for 'spmsg' in the filename and check whether the result is `None`. The relevan syntax here is <b>`0 if <yourCondition> else 1`</b>, i.e. 0 if 'spmsg' is not in the filename (not spam) and 1 if it is (it's spam).

In [5]:
from pyspark.mllib.regression import LabeledPoint
 
def make_lp_RDD(f_tfLn_RDD, argN): # Takes in normalized RDD as input
    
    ##make a vector
    f_wVec_RDD = f_tfLn_RDD.map(lambda f_wc: (f_wc[0],hashing_vectorizer(f_wc[1],argN)))  
    
    ##Detect spam by filename and transform into LabeledPoint objects re.search()  
    lp_RDD = f_wVec_RDD.map(lambda f_wVec: LabeledPoint(1 if (re.search('spmsg',f_wVec[0])) else 0,f_wVec[1])) ##1 if spam, 0 if ham
    return lp_RDD  

   

N=25
lp_RDD = make_lp_RDD(make_f_tfLn_RDD('hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1'),N)
print('-'*117)
print(lp_RDD.take(10)) # TESTING

---------------------------------------------------------------------------------------------------------------------
[LabeledPoint(0.0, [0.039603960396,0.0990099009901,0.0792079207921,0.039603960396,0.019801980198,0.0990099009901,0.029702970297,0.00990099009901,0.00990099009901,0.029702970297,0.0,0.029702970297,0.00990099009901,0.019801980198,0.029702970297,0.019801980198,0.029702970297,0.049504950495,0.019801980198,0.049504950495,0.049504950495,0.0792079207921,0.039603960396,0.0594059405941,0.0594059405941]), LabeledPoint(0.0, [0.0121212121212,0.0666666666667,0.0545454545455,0.0363636363636,0.0181818181818,0.0909090909091,0.0363636363636,0.0242424242424,0.0242424242424,0.0484848484848,0.0363636363636,0.0363636363636,0.0121212121212,0.030303030303,0.0727272727273,0.030303030303,0.0363636363636,0.0969696969697,0.0363636363636,0.0242424242424,0.0363636363636,0.0545454545455,0.0181818181818,0.0424242424242,0.0242424242424]), LabeledPoint(0.0, [0.0588235294118,0.0411764705882,0.0470588235

## Task f) Train a classifier 

Use the `LabeledPoint` objects to train the `LogisticRegression` and calculate the accuracy of the model on the training set (again, follow this example [http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression](http://spark.apache.org/docs/2.0.0/ml-classification-regression.html#logistic-regression) and here is the documentation [http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithLBFGS](http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.LogisticRegressionWithLBFGS).  (15%) 

It's useful to start with a normalised word list as input again (because we can later also use it with TF.IDF values).  

In [6]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionWithSGD, NaiveBayes
from pyspark.mllib.util import MLUtils

path = 'hdfs://saltdean/data/spam/stop/part1'

def trainModel(f_tfLn_RDD,N):
    trainData = make_lp_RDD(f_tfLn_RDD, N) # trainsform into LabeledPoint objects.
    model = LogisticRegressionWithSGD.train(trainData)
    correct = trainData.map( lambda lp: 1 if model.predict(lp.features) == lp.label else 0).sum() 
    count = trainData.count()
    accuracy = correct/count
    print('training data items: {}, correct: {}'.format(trainData.count(), correct)) # output raw numbers
    print('training accuracy {:.1%}'.format(accuracy)) # and accuracy
    return model 

N=100
f_tfLn_RDD = make_f_tfLn_RDD(path) # Making normalized word RDD 
modelTrained = trainModel(f_tfLn_RDD,N) # Train the model

  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


---------------------------------------------------------------------------------------------------------------------
training data items: 289, correct: 246
training accuracy 85.1%


## Task g) Test the classifier

Use the files from \texttt{.../data/extra/spam/bare/part10} and prepare them like in task~a)-e) (use the function you created in task e) and before. Then use the trained model to predict the label for each vector you have and compare it to the original to test the performance of your classifier. (10\%) 

In [7]:
def testModel(model,f_tfLn_RDD,N):
    
    #like with trainModel, transform the data and evaluate it.
    trainData = make_lp_RDD(f_tfLn_RDD, N) # Labeled point Rdd
    correct = trainData.map( lambda lp: 1 if model.predict(lp.features) == lp.label else 0).sum()
    count = trainData.count()
    accuracy = correct/count 
    print('test data items: {}, correct:{}'.format(trainData.count(),correct)) # raw numbers
    print('testing accuracy {:.1%}'.format(accuracy)) # accuracy

testModel(modelTrained,make_f_tfLn_RDD('hdfs://saltdean/data/spam/stop/part10'),N) # for testing

test data items: 291, correct:234
testing accuracy 80.4%


## Task h) Run experiments 

Package the whole classifier training and evaluation in one function. Then apply it to the files from `/data/extra/spam/lemm`, `/data/extra/spam/stop` and `/data/extra/spam/lemm_stop` in addition to `/data/extra/spam/bare`  and evaluate the accuracy of your classifier. 

Comment on the effect of *lemmatisation* and *stopword removal* on classification accuracy. Further, evaluate the use of larger training sets and the effect of different vector sizes. Print out the results of your experiments in readable form. (20%) 

You need to create one small fuction that combines tasks f) and g), and then apply it to different datasets sizes, vector sizes, and different preprocessings. 

The combination of the part1-part9 datasets can be achieved by using 'glob' patterns in the filename ('part[1-9]'). This is a feature of the Hadoop filesystem and not well documented in Spark (or anywhere else). You can find a description of its Python implementation here: [https://docs.python.org/3/library/glob.html](https://docs.python.org/3/library/glob.html). You can also supply multiple comma-separated paths, but you'll need to test what works, when you use this feature. Recursive patterns don't seem to work.

Alternatively, you can create unions of RDDs for each part. However, this seems to lead to slower execution. With the latter, it is useful to created arrays of the directory names (part1, ...). When you work with unions, it may be useful to start with an empty RDD. That can be created with `sc.parallelize([])`.

A useful tool for creatng multiple long paths variants is the use of the Python string format() method as used below. There is a good set of example here: [https://docs.python.org/3/library/string.html#format-examples](https://docs.python.org/3/library/string.html#format-examples) and the specification is here: [https://docs.python.org/3/library/string.html#format-specification-mini-language](https://docs.python.org/3/library/string.html#format-specification-mini-language).


In [8]:
# Experiment 1
from time import time


def trainTestModel(train_RDD, test_RDD,N):
    LRModel = LogisticRegressionWithSGD.train(train_RDD) # Train the odel
    print('training Logistic Regression') 
    correct = train_RDD.map( lambda lp: 1 if LRModel.predict(lp.features) == lp.label else 0).sum() # correctly classified data points
    count = train_RDD.count() # total size of training set
    print('train set correct: {}, of total: {}, accuracy: {}'.format(correct,count,correct/count)) 
    correct = test_RDD.map( lambda lp: 1 if LRModel.predict(lp.features) == lp.label else 0).sum() # correctly classified data points
    count = test_RDD.count() # total size of training set
    print('test set correct: {}, of total: {}, accuracy: {}'.format(correct,count,correct/count))
    return LRModel    
    
# directories and the path
dirPattern = 'hdfs://saltdean/data/spam/bare/part[1-{}]' #dirPattern.format(i)
# path for the test set
testPath = 'hdfs://saltdean/data/spam/bare/part10'
# N (size of hashed vector)
N=100

print('EXPERIMENT 1: Testing different training set sizes')
print('Path = {}, N = {}'.format(dirPattern,N)) # Record the parameters of the experiment
test_RDD= make_lp_RDD(make_f_tfLn_RDD(testPath), N) # Creates testing data RDD
for i in range(1,10): #loop over i the number of parts for training (1-9)
    t1 = time()
    trainPath = dirPattern.format(i) # initialize the training path
    train_RDD= make_lp_RDD(make_f_tfLn_RDD(trainPath), N) # Creates training data RDD
    print(trainPath)
    trainTestModel(train_RDD,test_RDD,N)
    t2 = time()
    print('Time Cost: {}s'.format(t2-t1))
    print('-'*117)    



EXPERIMENT 1: Testing different training set sizes
Path = hdfs://saltdean/data/spam/bare/part[1-{}], N = 100
hdfs://saltdean/data/spam/bare/part[1-1]


  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


training Logistic Regression
train set correct: 241, of total: 289, accuracy: 0.8339100346020761
test set correct: 241, of total: 291, accuracy: 0.8281786941580757
Time Cost: 13.153444528579712s
---------------------------------------------------------------------------------------------------------------------
hdfs://saltdean/data/spam/bare/part[1-2]
training Logistic Regression
train set correct: 482, of total: 578, accuracy: 0.8339100346020761
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time Cost: 15.756645917892456s
---------------------------------------------------------------------------------------------------------------------
hdfs://saltdean/data/spam/bare/part[1-3]
training Logistic Regression
train set correct: 723, of total: 867, accuracy: 0.8339100346020761
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time Cost: 21.86052894592285s
--------------------------------------------------------------------------------------------------

Different training set sizes has significant effect on the duration needed to train the model. 13.15s for the size of 289 instances (+291 test data) , 54.18s for the size of 2602 (+291 test data). 

However, increase in training data size has no effec on the accuracy of an algorithm. Accuracy remains steady at arround 83%.

Possible explanation is the usage of LinearRegressionWithSGD instead of LogisticRegressionWithLBFGS, as a result of an error. Former model had been used at the beginning of a project, with the expected output - increase in accuracy with increase in training set size. 

In [9]:
# Experiment 2

print('\nEXPERIMENT 2: Testing different vector sizes')

                       
for N in (3,10,30,100,300):
    print('=== N = ',N)
    t3 = time()
    test_RDD = make_lp_RDD(make_f_tfLn_RDD(testPath), N)
    train_RDD = make_lp_RDD(make_f_tfLn_RDD(dirPattern.format(9)), N)
    trainTestModel(train_RDD,test_RDD,N)
    t4 = time()
    print('Time cost: {}s'.format(t4-t3))




EXPERIMENT 2: Testing different vector sizes
=== N =  3


  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time cost: 55.641963481903076s
=== N =  10
training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time cost: 55.36869764328003s
=== N =  30
training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time cost: 54.25946259498596s
=== N =  100
training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time cost: 55.853522539138794s
=== N =  300
training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.831615120274914

Different sizes of the hashed vector being tested (3,10,30,10,300). Larger size is not associated with a susbstantial increase in the duration of an evaluation. (54s for the N=3; 57s for the N=300) 

Accuracy, same as in the Experiment 1, remains steady as well (83%). Arguably, for the same reasons. Increase in accuracy with increase in N was expected. 

In [10]:
# Experiment 3

N = 100 # initialize N

setDict = {'No preprocessing':'hdfs://saltdean/data/spam/bare/',
           'Stopwords removed':'hdfs://saltdean/data/spam/stop/',
           'Lemmatised':'hdfs://saltdean/data/spam/lemm/',
           'Lemmatised and stopwords removed':'hdfs://saltdean/data/spam/lemm_stop/'}


print('\nEXPERIMENT 3: Testing differently preprocessed data sets')
print('training on parts 1-9, N = {}'.format(N))
for sp in setDict:
    print('=== ',sp)
    t5 = time()
    trainPath = setDict[sp]+'part[1-9]'
    test_RDD = make_lp_RDD(make_f_tfLn_RDD(testPath), N) # test RDD
    train_RDD = make_lp_RDD(make_f_tfLn_RDD(trainPath), N) # train RDD
    trainTestModel(train_RDD, test_RDD, N)
    t6 = time()
    print('Time cost: {}s'.format(t6-t5))

print('\n====== Done ======')


EXPERIMENT 3: Testing differently preprocessed data sets
training on parts 1-9, N = 100
===  Stopwords removed


  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time cost: 53.081204652786255s
===  Lemmatised
training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time cost: 57.22625470161438s
===  No preprocessing
training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time cost: 59.30894470214844s
===  Lemmatised and stopwords removed
training Logistic Regression
train set correct: 2170, of total: 2602, accuracy: 0.8339738662567256
test set correct: 242, of total: 291, accuracy: 0.8316151202749141
Time cost: 46.06318664550781s



Different preprocessing techniques, has negligible effect on the time cost of training. (arround 54s on average) Except, for the combined procedure of 'Lemmatization' and 'Stopword removal' which leads to the reduced training time. (46s)

Accuracy, same as in the Experiment 1 and 2, remains steady as well (83%). Arguably, for the same reasons. Increase in accuracy with greater preprocessing effor was expected. 



## Task i) (Task for pairs) TF.IDF vectors
You need to address this task if you are working as a pair. 

Calculate the IDF values for each word and generate fixed size TF.IDF vectors for each document (word frequencies still normalised by total document word count). Also evaluate the use of TF.IDF compared to normalised word counts in terms of accuracy. (25%)

To calculate the IDF values you need to create an RDD (w,f) pairs. You can use the function `RDD.distinct()` to remove duplicates and reorganise to create (w,[f, ...]) lists. The length of the list is the document frequency and can be used to calculate the IDF





In [78]:
# TF.IDF experiment

from operator import add
from math import log

trainPath = 'hdfs://saltdean.nsqdc.city.ac.uk./data/spam/lemm_stop/part[1-9]'
testPath = 'hdfs://saltdean.nsqdc.city.ac.uk./data/spam/lemm_stop/part10'


def make_f_wtfiL_RDD(path):
    # Calculuate the IDFs
    
    fw_RDD = read_fw_RDD(path)
    fw_RDD1 = fw_RDD.distinct() # keep only unique (f,w) pairs    
    fw_RDD2 = fw_RDD1.map(lambda x: (x[1],[x[0]]))    # (f,w) to (w,[f])
    #print(fw_RDD2.take(3))
    wfL_RDD = fw_RDD2.reduceByKey(add)  #join the lists of files with reduceByKey
    vocSize = wfL_RDD.count() #calculate the vocabulary size
    #print('vocSize: {}'.format(vocSize))
    #print('-'*117)

    #Calculate the IDF values per word using len() on the list of files
    wfLn_RDD = wfL_RDD.map(lambda wf: (wf[0], len(wf[1])))
    wIdf_RDD = wfLn_RDD.map(lambda wfn: (wfn[0], log(vocSize/(1+wfn[1]))))
    #print('wIdf_RDD.count(): ',wIdf_RDD.count()) # TESTING
    #print('wIdf:')
    #print(wIdf_RDD.take(1)) # TESTING
    #print('-'*117)


   # Get the normalised word counts (TFs) and organise by word (w,(f,cn)) 
    
    f_tfLn_RDD = make_f_tfLn_RDD(dirPath) # create the normalised word count lists  (f, [(w, cn)...])
    #print('f_tfLn:')
    #print(f_tfLn_RDD.take(3))
    #print('-'*117)

    #print('f_tfLn_RDD: ',f_tfLn_RDD.map(
    #        lambda x: sum([c for (w,c) in x[1]]).histogram([0,10,100,1000,10000]))) # check for the per-file word counts
    w_fcn_RDD = f_tfLn_RDD.flatMap(lambda f_tf: [(w, (f_tf[0], cn)) for (w,cn) in f_tf[1]])     # create a list of tuples [(w,(f,cn)), ..] and use flatmap 
    #print('w_fcn_RDD:')
    #print('w_fcn_RDD.count(): {}'.format(w_fcn_RDD.count())) # for testing
    #print(w_fcn_RDD.take(2)) # for testing
    #print('-'*117)

    #join the IFDs and TFs by the words (w,(f,cn)) join (w,idf) to (w,((f,cn),idf))
    w_fcnIdf_RDD = w_fcn_RDD.join(wIdf_RDD) #use RDD.join()
    #print( 'w_fcnIdf_RDD.count(): ', w_fcnIdf_RDD.count())
    #print('w_fcIdf:')
    #print( w_fcnIdf_RDD.take(3))
    #print('-'*117)
    
    # calculate the TF.IDF per file and word (f,[(w,cn*idf)]).
    #map to (f,[(w,cn*idf)])
    f_wtfiL_RDD = w_fcnIdf_RDD.map(lambda w_fcnIdf: (w_fcnIdf[1][0][0], [(w_fcnIdf[0], (w_fcnIdf[1][0][1]*w_fcnIdf[1][1]))]))
    # print('f_wtfiL_RDD.count():', f_wtfiL_RDD.count())
    #print('f_wtfil:')
    #print(str(f_wtfiL_RDD.take(3)))
    #print('-'*117)

    #reduce by key (files) to get [(w,tfidf), ...] lists per file.
    f_wtfiL2_RDD = f_wtfiL_RDD.reduceByKey(add)#<<< reduceByKey
    # print('# of files with TF.IDF vectors: {}'.format(f_wtfiL2_RDD.count()))
    #print('f_wtfil2:')
    #print(str(f_wtfiL2_RDD.take(3)))
    #print('-'*117)

    return f_wtfiL2_RDD


N=100
t1=time()
print('N: {}, trainPath: {}'.format(N,trainPath))
trainedModel = trainModel(make_f_wtfiL_RDD(trainPath), N)
t2=time()
print('Time Cost: {}s'.format(t2-t1))
print('-'*117)

t3=time()
print('N: {}, testPath: {}'.format(N, testPath))
testModel(trainedModel, make_f_wtfiL_RDD(testPath), N)
t4=time()
print('Time Cost:{}s'.format(t4-t3))

print('\n====== Done ======')

N: 100, trainPath: hdfs://saltdean.nsqdc.city.ac.uk./data/spam/lemm_stop/part[1-9]


  "Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "


---------------------------------------------------------------------------------------------------------------------
training data items: 289, correct: 244
training accuracy 84.4%
Time Cost: 27.635865688323975s
---------------------------------------------------------------------------------------------------------------------
N: 100, testPath: hdfs://saltdean.nsqdc.city.ac.uk./data/spam/lemm_stop/part10
test data items: 289, correct:250
testing accuracy 86.5%
Time Cost:6.357905149459839s



Training time doubled in comparison to the similar training data size in Experiment 1, o part (h). Some positive effect on accuracy is observed (86.5% as opposed to 83%).

It is arguable whether the effect would smaller or larger with the LogisticRegressionWithLBFGS, however similar positive behaviour is expected. 