# Coursework Part 1: Detecting Spam with Spark

### Classifying messages to detect spam. 

The overall goal is to transform the data so that we can build a classifier. 
Then some aspects of the data and its preparation will be explored. We will 
specifically study the effect of 
* the size of training set 
* the size of the representation vector, and 
* the preprocessing with stopword removal and/or lemmatisation.


## Read some files and prepare a (f,w) RDD 
Reading the directory of text files from the distributed file system (`hdfs://saltdean.nsqdc.city.ac.uk./data/spam/bare/part1`) using `wholeTextFiles()`, which loads the text per file, i.e. as tuples (f,t) of filename and text.

Then splitting the text into lower-case words to create a (file,word) RDD.


In [15]:
import re 

# USE DEPENDING ON DATASTORE
#prefix = '/data/tempstore/'
prefix = 'hdfs://saltdean.nsqdc.city.ac.uk/data/'

dirPath = prefix + 'spam/bare/part1'


def read_fw_RDD( argDir ): # package tasks a/b into a function for later use
    ### task a) read the files
    fwL_RDD = sc.wholeTextFiles(argDir) #loading all text files from the distributed file system dirpath using wholeTextFiles(), which loads the tuples(f,t)
    #print('Read {} files from directory {}'.format(3,argDir)) # status message for testing, can be disabled later on
    #print('file word count histogram') # the histogram can be useful for checking later 
    #print(fwL_RDD.map(lambda fwL: (len(fwL[1]))).histogram([0,10,100,1000,10000]))
    ### task b) split words
    fw_RDD = fwL_RDD.flatMap(splitwords) # inputting the splitwords function defined below as a flatMap argument
    return fw_RDD # A fw_RDD should be returned

def splitwords(filenamecontent): # split the text of one file into lower-case words and return a list of (file,word) tuples ####
    f,c = filenamecontent #### unpack the (filename, content) tuple
    word_list = re.split(r'\W+',c.lower()) #### lower-case the text and split on runs of non-word characters (raw string keeps \W intact)
    # note: text that starts or ends with a non-word character yields an empty string '' as a token
    fw_list = [] #### creating an empty list 
    for w in word_list: #### looping through every element in the word list
        fw_list.append((f,w)) #### creating a (f,w) tuple and adding it to the fw_list
    return fw_list ####

fw_RDD = read_fw_RDD(dirPath) # for testing
#print(fw_RDD.take(3)) # for testing

[('hdfs://saltdean.nsqdc.city.ac.uk/data/spam/bare/part1/3-1msg1.txt', 'subject'), ('hdfs://saltdean.nsqdc.city.ac.uk/data/spam/bare/part1/3-1msg1.txt', 're'), ('hdfs://saltdean.nsqdc.city.ac.uk/data/spam/bare/part1/3-1msg1.txt', '2')]
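
The splitting step can be checked locally without a SparkContext. The sketch below is a minimal stand-alone version of the same `re.split` call; the function name `split_words`, the file name and the sample text are made up for illustration. It also shows that text ending in punctuation produces an empty-string token, which is one way `''` can end up in a word list.

```python
import re

def split_words(filename, content):
    """Return (filename, word) tuples for the lower-cased words in content."""
    # r'\W+' splits on runs of non-word characters (anything but letters, digits, '_')
    return [(filename, w) for w in re.split(r'\W+', content.lower())]

# hypothetical file name and text, for illustration only
pairs = split_words('3-1msg1.txt', 'Subject: Re: 2.0!')
words = [w for (_, w) in pairs]
print(words)
```

If the empty token is unwanted, it could be filtered out with a condition such as `if w` in the list comprehension; the notebook code keeps it.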


## Task c) Normalised word count lists
Generating the `[(word,count), ...]` list per file to create a word frequency vector, then normalising the term frequency (TF) vector by the total word count per file. 

For normalisation we need the total word count per file. There are a number of ways to do this. You can use a nested list comprehension (go through the (w,c) list and divide each c by the sum of all c, which you can get with a list comprehension over all [(w,c),...]). Alternatively, you can write a function in which you can create local variables, e.g. for the number of words per file. Another option is to use a separate RDD with (f,twc), where 'twc' stands for total word count, which you can create from the (f,[(w,c), ... ]) RDD. 
This new RDD can then be joined with the (f,[(w,c), ... ]) RDD, and the (w,c) list can then be normalised in a list comprehension. 
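
The second option, a function with a local variable for the total, can be sketched in plain Python as follows. The name `normalise` and the sample counts are assumptions for illustration; they are not part of the coursework template.

```python
def normalise(wc_list):
    """Divide every count by the total word count of the file."""
    total = sum(c for (_, c) in wc_list)  # local variable: total word count, computed once
    return [(w, c / total) for (w, c) in wc_list]

# hypothetical (word, count) list for a single file
wc_list = [('or', 2), ('lists', 2), ('greek', 1), ('latin', 1)]
tf = normalise(wc_list)
print(tf)
print(sum(cn for (_, cn) in tf))  # should be close to 1
```

On an (f,[(w,c), ... ]) RDD such a function could be applied with `mapValues(normalise)`, which avoids recomputing the sum for every word.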


In [16]:
from operator import add

def reGrpLst(fw_c): # reorganise the tuples
    fw,c = fw_c #### splitting the input into file,word tuple and count 
    f,w = fw #### splitting the file, word tuple into file and word 
    # Now that we have separated the f, w and c terms we can create the (f,[(w,c)]) structure we want 
    # This gives the [(word, count), ...] list per file and creates a word frequency vector  
    return (f,[(w,c)]) 
 
def make_f_tfLn_RDD(argDir):  
    fw_RDD = read_fw_RDD( argDir ) # call function from task a & b
    #<<< read as in the labs 
    fw_RDD = fw_RDD.map(lambda x: (x,1))  # change (f,w) to ((f,w),1) ####
    fw_c_RDD = fw_RDD.reduceByKey(add) # count the words c to give ((f,w),c) ####
    f_wcL_RDD = fw_c_RDD.map(reGrpLst) # use function above to convert ((f,w),c) to (f,[(w,c)]) ####
    f_wcL_RDD = f_wcL_RDD.reduceByKey(add) # Apply the reduce operation to concatenate the lists into [(w,c), ... ,(w,c)] per file ####
    #<<< Normalising the term frequency  (TF) vector by the total word count per file
    f_wcLn_RDD = f_wcL_RDD.map(lambda x: (x[0],[(w, c/sum([c for (w,c) in x[1]])) for (w,c) in x[1]])) ####
    return f_wcLn_RDD

f_wcLn_RDD = make_f_tfLn_RDD( prefix + 'spam/bare/part1') # for testing
#print(f_wcLn_RDD.take(1)) # for testing
wcLn = f_wcLn_RDD.take(1)[0][1] # get the first normalised word count list
#print('')
#print(sum([cn for (w,cn) in wcLn])) # the sum of normalised counts should be close to 1 

[('hdfs://saltdean.nsqdc.city.ac.uk/data/spam/bare/part1/3-550msg1.txt', [('query', 0.045454545454545456), ('anyone', 0.045454545454545456), ('annotext', 0.045454545454545456), ('', 0.045454545454545456), ('thanks', 0.045454545454545456), ('greek', 0.045454545454545456), ('subject', 0.045454545454545456), ('know', 0.045454545454545456), ('classical', 0.045454545454545456), ('michael', 0.045454545454545456), ('internet', 0.045454545454545456), ('dedicated', 0.045454545454545456), ('sikillian', 0.045454545454545456), ('or', 0.09090909090909091), ('does', 0.045454545454545456), ('lists', 0.09090909090909091), ('latin', 0.045454545454545456), ('bitnet', 0.045454545454545456), ('any', 0.045454545454545456), ('to', 0.045454545454545456)])]

0.9999999999999999


## Task d) Creating hashed feature vectors 
Use the hashing trick to create fixed size TF vectors. 


In [17]:
def hashing_vectorizer(word_count_list, N): 
    # use the code from the lecture
    v = [0] * N  # create fixed size vector of 0s ####
    for word_count in word_count_list:  ####
        word,count = word_count # unpack tuple ####
        h = hash(word) # get hash value ####
        v[h % N] = v[h % N] + count # add count ####
    return v # return hashed word vector ####

def make_f_wVn_RDD(f_wcLn_RDD, argN):
    # apply hashing_vectorizer in a lambda, this is only a one-liner
    f_wVec_RDD = f_wcLn_RDD.map(lambda f_wc: (f_wc[0],hashing_vectorizer(f_wc[1],argN)))# apply hashing_vectorizer ####
    return f_wVec_RDD ####
    
N=100
f_wVn_RDD = make_f_wVn_RDD(make_f_tfLn_RDD(dirPath),N) # for testing
#print(f_wVn_RDD.take(1)[0][1]) # for testing
#print('')
#print( sum(f_wVn_RDD.take(1)[0][1])) # for testing

[0.09090909090909091, 0.09090909090909091, 0, 0.09090909090909091, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0.045454545454545456, 0, 0.045454545454545456, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0.045454545454545456, 0.045454545454545456, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0.045454545454545456, 0, 0, 0, 0, 0.09090909090909091, 0.045454545454545456, 0, 0, 0.045454545454545456]

0.9999999999999997
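
One caveat with the built-in `hash()`: Python 3 salts the hash of strings per process unless `PYTHONHASHSEED` is set, so the position a word is mapped to can differ between runs. The sketch below swaps in CRC32 from the standard library as a deterministic hash. This is an alternative for illustration, not the code from the lecture, and the function name and sample values are assumptions.

```python
import zlib

def stable_hashing_vectorizer(word_count_list, N):
    """Hashing trick with a deterministic hash (CRC32) instead of the built-in hash()."""
    v = [0] * N  # fixed size vector of 0s
    for word, count in word_count_list:
        h = zlib.crc32(word.encode('utf-8'))  # same value in every process and run
        v[h % N] += count  # colliding words add up in the same position
    return v

# hypothetical normalised (word, count) list
wc = [('or', 0.5), ('lists', 0.25), ('greek', 0.25)]
vec = stable_hashing_vectorizer(wc, 10)
print(vec)
```

Because the counts are only added up, the sum of the vector stays the same as the sum of the input counts, whatever the collisions.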


## Task e) Create Labeled Points

Determining whether the file is spam (i.e. the filename contains 'spmsg') and replacing the filename by a 1 (spam) or 0 (ham) accordingly. Using map() to create an RDD of LabeledPoint objects. 
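
The labelling rule on its own can be sketched without Spark. The helper name `label_from_filename` and the spam file name below are hypothetical; only the 'spmsg' convention comes from the task description.

```python
import re

def label_from_filename(filename):
    """Return 1 for spam (filename contains 'spmsg') and 0 for ham."""
    return 1 if re.search('spmsg', filename) else 0

# hypothetical file names following the naming scheme described above
spam_label = label_from_filename('part1/spmsga1.txt')
ham_label = label_from_filename('part1/3-1msg1.txt')
print(spam_label, ham_label)
```

Note that a ham file name such as `3-1msg1.txt` contains 'msg' but not 'spmsg', so it is labelled 0.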


In [18]:
from pyspark.mllib.regression import LabeledPoint

def make_lp_RDD(f_tfLn_RDD,argN):
    #<<< make a vector
    f_wVec_RDD = f_tfLn_RDD.map(lambda f_wc:(f_wc[0],hashing_vectorizer(f_wc[1],argN)))# apply hashing_vectorizer ####
    #Detecting spam by filename and transforming into LabeledPoint objects
    ## The code below replaces the filename by a 1 (if it is spam) or 0 (if not spam) accordingly. 
    ## LabeledPoint is applied in map() to create an RDD of labeled points ####
    ## For the first argument of LabeledPoint, re.search() looks for 'spmsg' in the filename (x[0]); the label is 1 if it is found and 0 if not
    ## The second argument of LabeledPoint is the hash vector x[1]
    lp_RDD = f_wVec_RDD.map(lambda x:LabeledPoint(0 if (re.search('spmsg', x[0])) is None else 1,(x[1])))
    return lp_RDD

lp_RDD = make_lp_RDD(make_f_tfLn_RDD(prefix + 'spam/bare/part1'),100)
#print(lp_RDD.take(1)) #for testing 

[LabeledPoint(0.0, [0.0909090909091,0.0909090909091,0.0,0.0909090909091,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0454545454545,0.0,0.0454545454545,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0454545454545,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0454545454545,0.0,0.0,0.0,0.0,0.0909090909091,0.0454545454545,0.0,0.0,0.0454545454545])]


## Task f) Train a classifier 

Use the `LabeledPoint` objects to train the `LogisticRegression` and calculate the accuracy of the model on the training set 

In [19]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, NaiveBayes
from pyspark.mllib.util import MLUtils

path = prefix + 'spam/stop/part1'

N=100
def trainModel(f_wcLn_RDD,N):
    #<<< get the training data as LabeledPoint objects.
    trainData = make_lp_RDD(f_wcLn_RDD, N) ## build LabeledPoints from the RDD passed in as argument, using make_lp_RDD from the previous section 
    model = LogisticRegressionWithLBFGS.train(trainData) #### train model on training set 
    print('training Logistic Regression') 
    count = trainData.count() # total size of training set 
    # use model to calculate correctly classified data points####
    correct = trainData.map(lambda lp: 1 if model.predict(lp.features) == lp.label else 0).sum() 
    print('training data items: {}, correct: {}'.format(count, correct)) # output raw numbers
    accuracy = (correct/count) # define accuracy as the ratio of correctly classified points to the total count 
    print('training accuracy {:.1%}'.format(accuracy)) # print accuracy
    return model 

f_wcLn_RDD = make_f_tfLn_RDD(path) # for testing
model = trainModel(f_wcLn_RDD,N) # for testing

## The code above trains a logistic regression model on the entire training set.
## The model is evaluated on the same data it was trained on, so the 100% accuracy is optimistic 
## and mostly reflects how closely the model fits (or overfits) these particular files. 
## A better approach would be cross validation: the dataset is split into a set of folds which are used 
## as separate training and test datasets. Using 10 folds generates 10 (training, test) dataset pairs, 
## each of which uses 9/10 of the data for training and 1/10 for testing. The accuracy is then the 
## average over the estimators fitted on the 10 different (training, test) dataset pairs.

training Logistic Regression
training data items: 289, correct: 289
training accuracy 100.0%
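
The 10-fold scheme described in the comments above can be sketched with plain index lists. This is a minimal illustration under the assumption that the items are already in random order; the function name `k_fold_indices` is hypothetical, and 289 is the size of the training set reported above.

```python
def k_fold_indices(n_items, k):
    """Yield (train_indices, test_indices) pairs for k roughly equal folds."""
    indices = list(range(n_items))
    for fold in range(k):
        test = indices[fold::k]  # every k-th item, starting at offset `fold`
        test_set = set(test)
        train = [i for i in indices if i not in test_set]
        yield train, test

folds = list(k_fold_indices(289, 10))
for train, test in folds:
    print(len(train), len(test))
```

Each item appears in exactly one test fold, and a model would be trained and scored once per pair before averaging the ten accuracies. With RDDs, `randomSplit` could be one way to produce the folds, which is left here as an assumption rather than a tested recipe.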


## Task g) Test the classifier

Using a different set of files and preparing them as in tasks a)-e) before. Then using the trained model to predict the label for each vector and comparing it to the original label to test the performance of the classifier. 

In [20]:
def testModel(model,f_wcL_RDD,N):
    #<<< like with trainModel, transform the data and evaluate it.
    testData = make_lp_RDD(f_wcL_RDD, N) #### use the make_lp_RDD function defined earlier to generate a test RDD 
    print('Using trained logistic regression model in f) to predict labels') 
    count = testData.count() # total size of the test set
    # calculate correctly classified data points ###
    correct = testData.map(lambda lp: 1 if model.predict(lp.features) == lp.label else 0).sum() 
    accuracy = (correct/count) # ratio of correctly classified points to the size of the test set ### 
    print('test data items: {}, correct:{}'.format(count,correct))
    print('testing accuracy {:.1%}'.format(accuracy))

N = 100 
testModel(model,make_f_tfLn_RDD('hdfs://saltdean/data/spam/stop/part10'),N) # for testing


## Creates a test RDD from .../data/spam/stop/part10 and uses the model trained in f) 
## to predict the label for each vector. The predictions are compared to the actual labels to 
## calculate the accuracy and test the performance of the classifier. This model classifies 
## 219 of the 291 test items correctly (75.3%), which is much lower than the training accuracy above. 
## Cross validation would not change this model, but it would give a more reliable estimate of 
## how well the classifier generalises than the training accuracy does.

Using trained logistic regression model in f) to predict labels
test data items: 291, correct:219
testing accuracy 75.3%
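
Accuracy alone hides which kind of error the classifier makes. As a minimal sketch with made-up labels and predictions (1 = spam, 0 = ham), the counts of the four outcomes can be collected like this; the function name `confusion_counts` is hypothetical.

```python
def confusion_counts(labels, predictions):
    """Count true/false positives and negatives for binary labels (1 = spam)."""
    counts = {'tp': 0, 'tn': 0, 'fp': 0, 'fn': 0}
    for y, p in zip(labels, predictions):
        if y == 1 and p == 1:
            counts['tp'] += 1  # spam recognised as spam
        elif y == 0 and p == 0:
            counts['tn'] += 1  # ham recognised as ham
        elif y == 0 and p == 1:
            counts['fp'] += 1  # ham wrongly flagged as spam
        else:
            counts['fn'] += 1  # spam that slipped through
    return counts

# made-up example data, not results from the experiments
labels      = [1, 0, 0, 1, 0, 0, 1, 0]
predictions = [1, 0, 1, 1, 0, 0, 0, 0]
counts = confusion_counts(labels, predictions)
accuracy = (counts['tp'] + counts['tn']) / len(labels)
print(counts, '{:.1%}'.format(accuracy))
```

The accuracy is always the number of correct items divided by the size of the same data set that was scored.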


## Task h) Run experiments 

Package the whole classifier training and evaluation in one function. Then apply it to the files from different hdfs paths  `/data/extra/spam/lemm`, `/data/extra/spam/stop` and `/data/extra/spam/lemm_stop` in addition to `/data/extra/spam/bare`  and evaluate the accuracy of your classifier. 
Evaluating the use of larger training sets and the effect of different vector sizes.
The combination of the part1-part9 datasets can be achieved by using 'glob' patterns in the filename ('part[1-9]'). 
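
To see which directories a bracket pattern selects, the matching can be tried locally with Python's `fnmatch`. This sketch assumes that the file system's glob syntax treats a range like `[1-9]` the same way `fnmatch` does; the directory names are the part1 to part10 folders mentioned above, and `matching_parts` is a hypothetical helper.

```python
from fnmatch import fnmatch

dir_pattern = 'part[1-{}]'  # the {} is filled with the last part number to include
part_dirs = ['part{}'.format(i) for i in range(1, 11)]  # part1 ... part10

def matching_parts(i):
    """Return the directory names matched by the pattern 'part[1-i]'."""
    pattern = dir_pattern.format(i)
    return [d for d in part_dirs if fnmatch(d, pattern)]

print(matching_parts(3))
print(matching_parts(9))
```

The bracket matches exactly one character, so `part10` is not matched by `part[1-9]` and stays available as a separate test set.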


In [21]:

## Please see additional comments for the results of each experiment in the text box below 

# this function combines tasks f) and g)
def trainTestModel(train_RDD,test_RDD,N):
    #<<< just combine training and testing here  
    model = LogisticRegressionWithLBFGS.train(train_RDD)  # training the Logistic Regression model
    ## The trained Logistic Regression is evaluated on the training dataset to compute the training accuracy 
    correct = train_RDD.map(lambda lp: 1 if model.predict(lp.features) == lp.label else 0).sum()
    count = train_RDD.count() # total size of training set 
    print('training data items: {}, correct: {}'.format(count, correct))
    accuracy = (correct/count) # define accuracy 
    print('training accuracy {:.1%}'.format(accuracy)) # and print training accuracy 
    ## Using the trained Logistic Regression model above to test on the test RDD (unseen data)
    ## This is used to calculate the test accuracy 
    correct = test_RDD.map( lambda lp: 1 if model.predict(lp.features) == lp.label else 0).sum()
    count = test_RDD.count() ## total size of test set 
    print('test data items: {}, correct:{}'.format(count,correct))
    accuracy = (correct/count) # computes test accuracy  ### 
    print('test accuracy {:.1%}'.format(accuracy)) ## prints the test accuracy 
    
    return model

#trainTestModel(train_RDD,test_RDD,100)
#print()
    
# prepare the part directories and the path
dirPattern = 'hdfs://saltdean/data/spam/bare/part[1-{}]' # the {} can be filled by 'dirPattern.format(i)' 
# create the path for the test set
testPath = 'hdfs://saltdean/data/spam/bare/part10'

print('EXPERIMENT 1: Testing different training set sizes')
print('Path = {}, N = {}'.format(dirPattern,N)) # using format to make sure we record the parameters of the experiment
#<<< make the test set, it will be constant for this experiment
#<<< loop over i the number of parts for training (1-9)
f_wcLn_RDD = make_f_tfLn_RDD(testPath)
test_RDD = make_lp_RDD(f_wcLn_RDD, N) #<<< make the test set, it will be constant for this experiment
for i in range(1,10):   #loop over i the number of parts for training (1-9)
    print('=== add part',i)
    trainPath = dirPattern.format(i) # in the loop you can create a path like this
    print(trainPath) #just for testing, remove later
    #<<< create the trainRDD (using the make_f_tfLn_RDD and make_lp_RDD methods)
    f_wcLn_RDD = make_f_tfLn_RDD(trainPath)
    train_RDD = make_lp_RDD(f_wcLn_RDD, N)
    # calling the trainTestModel function and passing train_RDD (this will vary in size at each iteration), test_RDD and N value as inputs 
    trainTestModel(train_RDD,test_RDD,N)

print('')
    
print('\nEXPERIMENT 2: Testing different vector sizes')
#<<< loop over different values for N. 3,10,30,100,300, ... is a good pattern
for N in [3,10,30,100,300,1000,3000,10000]:
    print('=== N = ',N)
    i = 9 # train on all nine parts (part1-part9)
    trainPath = dirPattern.format(i)
    #<<< create the train and test RDD (using your make_f_tfLn_RDD method and make_lp_RDD)
    f_wcLn_RDD = make_f_tfLn_RDD(trainPath)
    train_RDD = make_lp_RDD(f_wcLn_RDD, N)
    f_wcLn_RDD = make_f_tfLn_RDD(testPath)
    test_RDD = make_lp_RDD(f_wcLn_RDD, N)
    # calling the trainTestModel function and passing train_RDD, test_RDD and various N values as inputs 
    trainTestModel(train_RDD,test_RDD,N)  

N = 100 # change to what you feel is a good compromise between computation and accuracy
# the dictionary below helps associate description and paths.
setDict = {'No preprocessing': prefix + 'spam/bare/',
           'Stopwords removed': prefix + 'spam/stop/',
           'Lemmatised': prefix + 'spam/lemm/',
           'Lemmatised and stopwords removed': prefix + 'spam/lemm_stop/'}

print('\nEXPERIMENT 3: Testing differently preprocessed data sets')
print('training on parts 1-9, N = {}'.format(N))
for sp in setDict:
    print('=== ',sp)
    #<<< make the training data (part1-9) and test data (part10) RDDs and evaluate 
    trainPath = setDict[sp] + 'part[1-9]'    
    testPath = setDict[sp] + 'part10' 
    # As in experiments 1 and 2, create the train and test RDDs (using make_f_tfLn_RDD and make_lp_RDD)
    f_wcLn_RDD = make_f_tfLn_RDD(trainPath)
    train_RDD = make_lp_RDD(f_wcLn_RDD, N)
    f_wcLn_RDD = make_f_tfLn_RDD(testPath)
    test_RDD = make_lp_RDD(f_wcLn_RDD, N)
    # calling the trainTestModel function and passing the relevant arguments 
    trainTestModel(train_RDD,test_RDD,N)

print('\n====== Done ======')

 


EXPERIMENT 1: Testing different training set sizes
Path = hdfs://saltdean/data/spam/bare/part[1-{}], N = 100
=== add part 1
hdfs://saltdean/data/spam/bare/part[1-1]
training data items: 289, correct: 289
training accuracy 100.0%
test data items: 291, correct:254
test accuracy 87.3%
=== add part 2
hdfs://saltdean/data/spam/bare/part[1-2]
training data items: 578, correct: 578
training accuracy 100.0%
test data items: 291, correct:266
test accuracy 91.4%
=== add part 3
hdfs://saltdean/data/spam/bare/part[1-3]
training data items: 867, correct: 867
training accuracy 100.0%
test data items: 291, correct:265
test accuracy 91.1%
=== add part 4
hdfs://saltdean/data/spam/bare/part[1-4]
training data items: 1156, correct: 1156
training accuracy 100.0%
test data items: 291, correct:265
test accuracy 91.1%
=== add part 5
hdfs://saltdean/data/spam/bare/part[1-5]
training data items: 1446, correct: 1446
training accuracy 100.0%
test data items: 291, correct:264
test accuracy 90.7%
=== add part 6
hd

### Comments on the results of experiments 1, 2 and 3, including the effect of lemmatisation and stop word removal on classification accuracy

Experiment 1 - As the training set size increases, the training accuracy decreases from 100% to 93% and the test accuracy increases from 76% to a maximum of 89.3% (when training with parts 1-8). This is expected: a larger training set reduces overfitting, so the training accuracy decreases, while the test accuracy increases because the trained model is able to generalise better.
We see that the test accuracy reaches a maximum of 96.2% when combining parts 1-7 for training and parts 8-10 for testing (70/30 train:test ratio) and then decreases slightly when using an 80/20 or 90/10 train:test split. A possible reason is that the test data set contains fewer samples with the 90/10 split, which makes the accuracy estimate noisier and could lower it slightly. 

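Note: Re-running the experiment seems to produce slight changes in the accuracy results. This may be due to the hashing function producing different collisions between runs: Python 3 salts the built-in `hash()` of strings per process unless `PYTHONHASHSEED` is set.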

Experiment 2 - Hashing maps each word onto a position in a vector. Increasing the vector size increases accuracy because the data set is compressed less: as the vector size increases, collisions become less likely and less information is lost in the hashing process.
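
How quickly collisions fade with larger vectors can be estimated with a short calculation. The sketch assumes that the hash spreads words uniformly over the N positions, and the vocabulary size of 1000 distinct words is a made-up figure, not a measurement from the data set.

```python
def expected_occupied_buckets(n_words, N):
    """Expected number of non-empty positions when n_words distinct words
    are hashed uniformly at random into a vector of size N."""
    return N * (1 - (1 - 1 / N) ** n_words)

n_words = 1000  # hypothetical vocabulary size, for illustration only
for N in [3, 10, 30, 100, 300, 1000, 3000, 10000]:
    occupied = expected_occupied_buckets(n_words, N)
    # words per occupied position: values near 1 mean few collisions
    print(N, round(n_words / occupied, 2))
```

For small N many words share each position, while for N well above the vocabulary size most words get a position of their own.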

Experiment 3 - Lemmatisation gives a similar accuracy to no preprocessing (94.2% and 94.8% respectively). Lemmatisation reduces words to their base form so that different words with the same base are not treated separately (and separate frequencies are not calculated). Stop word removal removes common words like 'and', 'the', 'a', 'of', which occur frequently in all documents and are usually assumed to contribute little to classification accuracy. Here, however, stop word removal results in a loss of information and reduced accuracy (86.3%), an effect which is more pronounced than that of lemmatisation. 
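
What stop word removal does to a word count list can be sketched as follows. The four stop words are the examples named above, not the list that was used to build the 'stop' data set, and the counts and the helper name `remove_stop_words` are made up for illustration.

```python
STOP_WORDS = {'and', 'the', 'a', 'of'}  # tiny illustrative list, not the real stop word list

def remove_stop_words(wc_list, stop_words=STOP_WORDS):
    """Drop (word, count) pairs whose word is a stop word."""
    return [(w, c) for (w, c) in wc_list if w not in stop_words]

# hypothetical (word, count) list for a single message
wc_list = [('the', 5), ('offer', 2), ('of', 3), ('free', 2), ('a', 4)]
filtered = remove_stop_words(wc_list)
total_before = sum(c for (_, c) in wc_list)
total_after = sum(c for (_, c) in filtered)
print(filtered)
print(total_before, total_after)
```

In this example most of the word mass is removed, so the remaining words carry much larger normalised frequencies afterwards, and any signal in how often stop words occur is lost.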

## Appendix
This code is only needed if there is an error message "sc undefined" when starting the script. In that case the code below should be run first. All code cells from the beginning then have to be run again, as the new context has no information about what happened before.

In [22]:
# try this in case of "sc undefined" errors

from pyspark import SparkContext

try: 
    sc.stop()
    print('Stopped existing SparkContext')
except Exception as e: 
    print(e)

try: 
    sc = SparkContext(appName='Coursework part 1')
    print('Created new SparkContext')
except Exception as e: 
    print(e)
print('Properties of sc: ',list(sc.getConf().getAll()))

Stopped existing SparkContext
Created new SparkContext
Properties of sc:  [('spark.eventLog.enabled', 'true'), ('spark.executor.logs.rolling.time.interval', 'daily'), ('spark.executor.extraJavaOptions', '-Xss8192k'), ('spark.master', 'spark://10.207.1.85:7077'), ('spark.driver.port', '34450'), ('spark.executor.id', 'driver'), ('spark.executor.logs.rolling.strategy', 'time'), ('spark.driver.memory', '1g'), ('spark.app.id', 'app-20170313040910-0836'), ('spark.executor.memory', '1g'), ('spark.rdd.compress', 'True'), ('spark.driver.host', '10.207.1.85'), ('spark.app.name', 'Coursework part 1'), ('spark.cores.max', '2'), ('spark.serializer.objectStreamReset', '100'), ('spark.submit.deployMode', 'client'), ('spark.driver.extraJavaOptions', '-Xss8192k'), ('spark.eventLog.dir', '/data/sparklog')]
