In [1]:
#Loading data

# Going to the work directory 
%cd ~/notebook/work/

# and cloning the project from github
!git clone https://github.com/sathu95/Reuters_text_classification_pyspark.git

/gpfs/global_fs01/sym_shared/YPProdSpark/user/se1e-c1219d6287d998-c60095ba54e5/notebook/work
fatal: destination path 'Reuters_text_classification_pyspark' already exists and is not an empty directory.


In [2]:
# Now we have the project directory
%cd ~/notebook/work/Reuters_text_classification_pyspark/
# we can also pull the latest files from git
!git pull
%cd ./Reuters/
!ls #checking all the files 

/gpfs/global_fs01/sym_shared/YPProdSpark/user/se1e-c1219d6287d998-c60095ba54e5/notebook/work/Reuters_text_classification_pyspark
Already up-to-date.
/gpfs/global_fs01/sym_shared/YPProdSpark/user/se1e-c1219d6287d998-c60095ba54e5/notebook/work/Reuters_text_classification_pyspark/Reuters
bop	 coffee  dlr   jobs	     nat-gas   ship	veg-oil
carcass  corn	 gnp   livestock     oilseed   soybean	yen
cocoa	 cpi	 gold  money-supply  reserves  sugar


In [3]:
#loading the necessary libraries

from pyspark.ml import *
from pyspark.sql import *
from pyspark.sql.types import Row
from pyspark.sql import SQLContext


sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()

#we get the absolute path for the working directory
import os.path
%cd ~
p = os.path.abspath('./notebook/work/Reuters_text_classification_pyspark/Reuters/')
print("p: ", p) 

# Creating a dictionary for class labels (from the directory names)
topicLabels = {}
import os
label = 0
# funtion to iterate through the working directories
for root, dirs, files in os.walk(p, topdown=False):
    for name in dirs:
        topicLabels[name] = label # assigning a label number to each
        label = label + 1 # and increment the label number by 1

print(topicLabels)

#checking the time using all the topics
%time news_RDD = sc.wholeTextFiles(p + '/*') 

#To have a look at the total number of documents
print ('Number of documents read is:',news_RDD.count())
%time news_RDD.take(1)

/gpfs/global_fs01/sym_shared/YPProdSpark/user/se1e-c1219d6287d998-c60095ba54e5
('p: ', '/gpfs/global_fs01/sym_shared/YPProdSpark/user/se1e-c1219d6287d998-c60095ba54e5/notebook/work/Reuters_text_classification_pyspark/Reuters')
{'cpi': 11, 'gnp': 2, 'jobs': 9, 'livestock': 6, 'gold': 3, 'yen': 4, 'bop': 8, 'corn': 15, 'carcass': 1, 'money-supply': 19, 'coffee': 5, 'sugar': 7, 'oilseed': 16, 'dlr': 10, 'veg-oil': 13, 'reserves': 0, 'soybean': 17, 'nat-gas': 12, 'ship': 14, 'cocoa': 18}
CPU times: user 2.68 ms, sys: 3.78 ms, total: 6.45 ms
Wall time: 369 ms
('Number of documents read is:', 1913)
CPU times: user 4.68 ms, sys: 1.69 ms, total: 6.37 ms
Wall time: 1.16 s


[(u'file:/gpfs/global_fs01/sym_shared/YPProdSpark/user/se1e-c1219d6287d998-c60095ba54e5/notebook/work/Reuters_text_classification_pyspark/Reuters/reserves/0004940',
  u"\n\r\nITALIAN NET RESERVES RISE IN FEBRUARY\r\n\n    ROME, March 19 - Italy's net official reserves rose to\r\n66,172 billion lire in February 1987 from a previously reported\r\n62,174 billion in January, the Bank of Italy said.\r\n    Gold holdings at end-February totalled 35,203 billion lire,\r\nunchanged on January.\r\n    Convertible currencies totalled 18,467 billion lire, up\r\nfrom 14,899 billion in January, while European Currency Unit\r\n(ECU) holdings were 10,156 billion lire against 10,133 billion.\r\n\r")]

In [4]:
import re

# new function to remove the headers using regular expressions
def removeHeader(ft): 
    fn,text = ft # unpacking the filename and text content 
    matchObj = re.match(r'.+^Lines:(.*)', text,re.DOTALL|re.MULTILINE) 
    if(matchObj): # only if the pattern has matched 
        text = matchObj.group(1) # can we replace the text, 
    return (fn,text)

ft_RDD = news_RDD.map(removeHeader)

# Removing the path before the last directory name and the file name after (i.e. leaving the directory names, which are the Reuters news topics) 
tt_RDD = ft_RDD.map(lambda ft: (re.split('[/]',ft[0])[-2],ft[1]))


# adding the topic numbers as labels
# adding the labels, but adding a third component to each element. 
# This third element is determined by reading the label from the topicLabels dictionary, using the topic string (first in the RDD elements) as key.

ttl_RDD = tt_RDD.map(lambda ft: (ft[0],ft[1],topicLabels[ft[0]]))

# checking the result of above 
print(ttl_RDD.take(1))
# it should contain [('topic', 'text', 'label')]

# Saving the RDD as a pickle file for later use 
# ttl_RDD.saveAsPickleFile('ttl_RDD_full0.pkl')

[(u'reserves', u"\n\r\nITALIAN NET RESERVES RISE IN FEBRUARY\r\n\n    ROME, March 19 - Italy's net official reserves rose to\r\n66,172 billion lire in February 1987 from a previously reported\r\n62,174 billion in January, the Bank of Italy said.\r\n    Gold holdings at end-February totalled 35,203 billion lire,\r\nunchanged on January.\r\n    Convertible currencies totalled 18,467 billion lire, up\r\nfrom 14,899 billion in January, while European Currency Unit\r\n(ECU) holdings were 10,156 billion lire against 10,133 billion.\r\n\r", 0)]


In [5]:
from pyspark.sql.types import *
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# The schema is represented as a StructField object that comprises three fields, name (a string), dataType (a DataType) and nullable (a bool). 
# Creating 3 fields of names and types according to our data: two strings, one integer for thelabel. 

fields = [] 
fields.append(StructField('topic', StringType(), True))
#now do the same for 'text' instead of topic:
fields.append(StructField('text', StringType(), True))
#and now for 'label' with type IntegerType instead of StringType:
fields.append(StructField('label', IntegerType(), True))

# All the fields above together define the schema 
schema = StructType(fields)

# loading previously saved RDD
ttl_RDD = sc.pickleFile('ttl_RDD_full0.pkl')
# Applying the schema in createDataFrame, to create a DataFrame 'df' from the RDD
df = sqlContext.createDataFrame(ttl_RDD, schema)

#print the schema of DataFrame
df.printSchema()

#displaying the dataframe
import pixiedust
display(df)

In [8]:
# chnaging dataframe variable 
df2 = df

# saving the datframe as a parquet, but once the file exists, this method will throw an error.
#df.write.parquet('df_full0.pqt') 

#Creating the training and test set from the dataframe above by randomsplit<test = 20% and training = 80%>
train_set, test_set = df2.randomSplit([0.8, 0.2])

# caching the dataframe to make sure the sets are stored in memory or disk, rather than re-computed.
train_set.cache()
test_set.cache()
# train_set.write.parquet('train_set0.pqt')
# test_set.write.parquet('test_set0.pqt')

#printing the counts of training ans test set and time the execution with the %time magic
%time print ("Training-set count:",train_set.count())
%time print ("Test-set count:",test_set.count())
train_set

('Training-set count:', 1548)
CPU times: user 4.87 ms, sys: 284 µs, total: 5.15 ms
Wall time: 2.13 s
('Test-set count:', 365)
CPU times: user 2.87 ms, sys: 1.92 ms, total: 4.79 ms
Wall time: 348 ms


DataFrame[topic: string, text: string, label: int]

In [9]:
%%time 
# Timing the execution of whole cell
%cd ~/notebook/work/
# df = spark.read.parquet('df0.pqt')
train_set = spark.read.parquet('train_set0.pqt')
test_set = spark.read.parquet('test_set0.pqt')
print("+++ all DataFrames loaded +++")

# test and training sets are cached, equvalent to cache(), to increase the performance
train_set.persist(StorageLevel.MEMORY_AND_DISK)
test_set.persist(StorageLevel.MEMORY_AND_DISK)

/gpfs/global_fs01/sym_shared/YPProdSpark/user/se1e-c1219d6287d998-c60095ba54e5/notebook/work
+++ all DataFrames loaded +++
CPU times: user 10 ms, sys: 2.36 ms, total: 12.4 ms
Wall time: 700 ms


In [10]:
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer

##Constructing a pipeline
#By spliting each sentence by white spaces into words using Tokenizer. 

tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")

#Removing stopwords
remover= StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(False)

#using HashingTF to hash the sentence into a feature vector by bag of words 
hashingTF = HashingTF().setNumFeatures(1000).setInputCol("filtered").setOutputCol("rawFeatures")

#using IDF to rescale the feature vectors; this generally improves performance when using text as features.
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)

#The feature vectors could then be passed through a learning algorithm, in this case Naive Bayes as cf
cf = NaiveBayes()

#connecting all the steps above to create one pipeline:
pipeline=Pipeline(stages=[tokenizer,remover,hashingTF,idf, cf])
print ("Pipeline:",pipeline.explainParams()) #Shows all the parameters used by above methods

('Pipeline:', 'stages: a list of pipeline stages (current: [Tokenizer_432ab6f3a08327391590, StopWordsRemover_4602814c5d94625320b1, HashingTF_453988508c99acf4efb9, IDF_4d4f87e50de68d724325, NaiveBayes_494db7e965e967464e98])')


In [11]:
#getting information for each parameter  using the .explainParams()
print ("Tokenizer:",tokenizer.explainParams())
print("\n\n")
print ("Remover:",remover.explainParams())
print("\n\n")
print ("HashingTF:",hashingTF.explainParams())
print ("\n\n")
print ("IDF:",idf.explainParams())
print ("\n\n")
print ("classifier:",cf.explainParams())

('Tokenizer:', u'inputCol: input column name. (current: text)\noutputCol: output column name. (default: Tokenizer_432ab6f3a08327391590__output, current: words)')



('Remover:', u"caseSensitive: whether to do a case sensitive comparison over the stop words (default: False, current: False)\ninputCol: input column name. (current: words)\noutputCol: output column name. (default: StopWordsRemover_4602814c5d94625320b1__output, current: filtered)\nstopWords: The words to be filtered out (default: [u'i', u'me', u'my', u'myself', u'we', u'our', u'ours', u'ourselves', u'you', u'your', u'yours', u'yourself', u'yourselves', u'he', u'him', u'his', u'himself', u'she', u'her', u'hers', u'herself', u'it', u'its', u'itself', u'they', u'them', u'their', u'theirs', u'themselves', u'what', u'which', u'who', u'whom', u'this', u'that', u'these', u'those', u'am', u'is', u'are', u'was', u'were', u'be', u'been', u'being', u'have', u'has', u'had', u'having', u'do', u'does', u'did', u'doing', u'a', u'an', u'the

In [12]:
# Fitting ML pipeline to the training data 
# and obtaining a trained pipeline model that can be used for prediction.
%time model=pipeline.fit(train_set)

CPU times: user 32.1 ms, sys: 10.4 ms, total: 42.5 ms
Wall time: 4.45 s


In [13]:
#Checking the performance of the fitted pipeline model by displaying the predicted labels

#using the .transform() on the test set to make predictions on the test set
test_predictions = model.transform(test_set)
train_predictions = model.transform(train_set)

#Showing the predicted labels of topic "yen" along with true labels and raw texts.
test_predictions.select("topic","prediction","label").filter(test_predictions.topic.like("%yen%")).show(5)
# Similarly for "corn"
test_predictions.select("topic","prediction","label").filter(test_predictions.topic.like("%corn%")).show(5)

+-----+----------+-----+
|topic|prediction|label|
+-----+----------+-----+
|  yen|      10.0|    4|
|  yen|      10.0|    4|
|  yen|      10.0|    4|
|  yen|      10.0|    4|
|  yen|       4.0|    4|
+-----+----------+-----+
only showing top 5 rows

+-----+----------+-----+
|topic|prediction|label|
+-----+----------+-----+
| corn|      15.0|   15|
| corn|      14.0|   15|
| corn|      15.0|   15|
| corn|      17.0|   15|
| corn|      15.0|   15|
+-----+----------+-----+
only showing top 5 rows



In [14]:
#Using the accuracy
evaluator = MulticlassClassificationEvaluator().setMetricName("accuracy")

print ("Accuracy - training:",evaluator.evaluate(train_predictions))
print ("Accuracy - testing:",evaluator.evaluate(test_predictions))

('Accuracy - training:', 0.8276762402088773)
('Accuracy - testing:', 0.6325459317585301)


In [15]:
#We use a ParamGridBuilder to construct a grid of parameters to search over.

#With 3 values for hashingTF.numFeatures and 3 values for idf,
# this grid will have 3 x 3 = 9 parameter settings for the Validator to choose from.

paramGrid = ParamGridBuilder()\
    .addGrid(hashingTF.numFeatures,[100,1000,10000])\
    .addGrid(idf.minDocFreq,[0,10,100])\
    .build()

In [16]:
from pyspark.ml.tuning import TrainValidationSplit 
tvs = TrainValidationSplit().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid)

# A validator requires an Estimator, a grid of Paramters, and an Evaluator.
%time tvsModel = tvs.fit(train_set)

CPU times: user 827 ms, sys: 210 ms, total: 1.04 s
Wall time: 12.9 s


In [17]:
# calculating the training and test accuracy for the tuned model
print("Training accuracy for tuned model =",evaluator.evaluate(tvsModel.transform(train_set)))
print("Test accuracy for tuned model =",evaluator.evaluate(tvsModel.transform(test_set)))
print ("Test accuracy for default model:",evaluator.evaluate(test_predictions))


#we can see a 3% increase in the training accuracy of the tuned model, where as test accuracy of tuned model is close to that of the deafault 

('Training accuracy for tuned model =', 0.8557441253263708)
('Test accuracy for tuned model =', 0.6351706036745407)
('Test accuracy for default model:', 0.6325459317585301)


In [31]:
#Implementing a parameter grid (using pyspark.ml.tuning.ParamGridBuilder),
#varying at least one feature preprocessing step, one machine learning parameter, and
#the training set size 

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Creating the training and test set from the dataframe above by randomsplit<test = 30% and training = 70%>
train_set1, test_set1 = df2.randomSplit([0.7, 0.3])


# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")

#using HashingTF to hash the sentence into a feature vector by bag of words 
hashingTF = HashingTF().setNumFeatures(1000).setInputCol("filtered").setOutputCol("rawFeatures")

#using IDF to rescale the feature vectors; this generally improves performance when using text as features.
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)

lr = LogisticRegression(maxIter=10)

pipeline1 = Pipeline(stages=[tokenizer, hashingTF, idf, lr])
print ("Pipeline:",pipeline1.explainParams())

# Fitting ML pipeline to the training data 
# and obtaining a trained pipeline model that can be used for prediction.
%time model=pipeline.fit(train_set1)

#using the .transform() on the test set to make predictions on the test set
test_predictions = model.transform(test_set1)
train_predictions = model.transform(train_set1)

evaluator = MulticlassClassificationEvaluator()

# A validator requires an Estimator, a grid of Paramters, and an Evaluator.
%time tvsModel1 = tvs.fit(train_set1)

# calculating the training and test accuracy for the tuned model
print("Training accuracy for tuned model =",evaluator.evaluate(tvsModel.transform(train_set1)))
print("Test accuracy for tuned model =",evaluator.evaluate(tvsModel.transform(test_set1)))
print ("Test accuracy for default model:",evaluator.evaluate(test_predictions))

('Pipeline:', 'stages: a list of pipeline stages (current: [Tokenizer_45d4b83feba3ebf98581, HashingTF_406b9043ac47d7529aeb, IDF_4f10b7b0f10a4d6e39f5, LogisticRegression_42a19c49d14bfe0c4425])')
CPU times: user 28.7 ms, sys: 6.37 ms, total: 35.1 ms
Wall time: 861 ms
CPU times: user 751 ms, sys: 261 ms, total: 1.01 s
Wall time: 11.5 s
('Training accuracy for tuned model =', 0.8191031197158156)
('Test accuracy for tuned model =', 0.7931578537565427)
('Test accuracy for default model:', 0.610901237935405)


In [None]:
#When the stopwords were not removed and Logistic regression was applied with training and test set split as 70% and 30% respectively, 
#the training accuracy for tuned model gave 82% of accuracy and test accuracy came upto 79%,
#which is better than the previous model which gave 63%. 