In [1]:
### Tutorial on Natural Language Processing Techniques

#### For more information please visit:
##### https://spark.apache.org/docs/2.1.0/ml-features.html#feature-extractors

In [2]:
# Import the necessary libraries
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import (Tokenizer, RegexTokenizer, StopWordsRemover, NGram, IDF,
                            HashingTF, CountVectorizer, StringIndexer, VectorAssembler)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, length, size
from pyspark.sql.types import IntegerType

In [3]:
# Start (or reuse) the SparkSession named "NLP" that the cells below rely on
spark = (
    SparkSession.builder
    .appName("NLP")
    .getOrCreate()
)

## Tutorial on NLP techniques

### 1. Tokenizer
#### Tokenizer splits the sentence into words on whitespace

In [4]:
# Build a small (id, sentence) DataFrame for the tokenizer examples.
# Row 2 is comma-separated and contains no whitespace.
sentenceDataFrame = spark.createDataFrame(
    data=[
        (0, "Hi I heard about Spark"),
        (1, "I wish Java could use case classes"),
        (2, "Logistic,regression,models,are,neat"),
    ],
    schema=["id", "sentence"],
)

sentenceDataFrame.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|Hi I heard about ...|
|  1|I wish Java could...|
|  2|Logistic,regressi...|
+---+--------------------+



In [5]:
# Tokenizer reads the 'sentence' column and writes the list of tokens to a new 'words' column
token = Tokenizer(
    inputCol='sentence',
    outputCol='words',
)
tokenized = token.transform(sentenceDataFrame)
tokenized.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



In [6]:
# Add a column holding the number of tokens per row; it can be treated as an input feature.
def countTokens(words):
    """Return a Column with the number of elements in the array column `words`.

    Uses Spark's built-in `size` function rather than a Python UDF, so the
    count is computed by Spark itself instead of shipping every row through
    the Python worker.

    NOTE(review): for a null array `size` yields -1 or null depending on the
    Spark configuration, whereas `len(None)` in a Python UDF would raise.
    """
    return size(words)


tokenized.withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+---+-----------------------------------+------------------------------------------+------+
|id |sentence                           |words                                     |tokens|
+---+-----------------------------------+------------------------------------------+------+
|0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|1  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|2  |Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+---+-----------------------------------+------------------------------------------+------+



In [8]:
# The plain Tokenizer kept the comma-separated sentence as one single token.
# A RegexTokenizer with a non-word-character pattern splits that row as well.
regex_token = RegexTokenizer(
    inputCol='sentence',
    outputCol='words',
    pattern="\\W",
)
regex_tokenized = regex_token.transform(sentenceDataFrame)
regex_tokenized.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic, regres...|
+---+--------------------+--------------------+



In [9]:
# Token counts after regex tokenization
(
    regex_tokenized
    .withColumn("tokens", countTokens(col("words")))
    .show(truncate=False)
)

+---+-----------------------------------+------------------------------------------+------+
|id |sentence                           |words                                     |tokens|
+---+-----------------------------------+------------------------------------------+------+
|0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|1  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|2  |Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |5     |
+---+-----------------------------------+------------------------------------------+------+



### Stop Words Remover
#### Removes stop words: common words such as "is", "are", "and" and "the" that carry little meaning in the bigger context


In [10]:
# Input: rows of (id, already-tokenized words) in the "raw" column
sentenceData = spark.createDataFrame(
    data=[
        (0, ["I", "saw", "the", "red", "balloon"]),
        (1, ["Mary", "had", "a", "little", "lamb"]),
    ],
    schema=["id", "raw"],
)

# StopWordsRemover copies "raw" to "filtered" with the stop words dropped
remover = StopWordsRemover(
    inputCol="raw",
    outputCol="filtered",
)
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



### N-grams
#### N-grams are used for language modeling. An n-gram is a sequence of N consecutive tokens; in this example, NGram turns each list of words into a list of such sequences
#### For example, a bigram model (N = 2) groups the data in pairs: each word is paired with the word that follows it, and these pairs serve as input training data

In [11]:
# Input: rows of (id, list of words) to be turned into n-grams
wordDataFrame = spark.createDataFrame(
    data=[
        (0, ["Hi", "I", "heard", "about", "Spark"]),
        (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
        (2, ["Logistic", "regression", "models", "are", "neat"]),
    ],
    schema=["id", "words"],
)

wordDataFrame.show(truncate=False)


+---+------------------------------------------+
|id |words                                     |
+---+------------------------------------------+
|0  |[Hi, I, heard, about, Spark]              |
|1  |[I, wish, Java, could, use, case, classes]|
|2  |[Logistic, regression, models, are, neat] |
+---+------------------------------------------+



In [12]:
# Bigram transformer (n = 2): each pair of adjacent words becomes one entry in "ngrams"
ngrams = NGram(
    n=2,
    inputCol="words",
    outputCol="ngrams",
)
ngrams.transform(wordDataFrame).show(truncate=False)

+---+------------------------------------------+------------------------------------------------------------------+
|id |words                                     |ngrams                                                            |
+---+------------------------------------------+------------------------------------------------------------------+
|0  |[Hi, I, heard, about, Spark]              |[Hi I, I heard, heard about, about Spark]                         |
|1  |[I, wish, Java, could, use, case, classes]|[I wish, wish Java, Java could, could use, use case, case classes]|
|2  |[Logistic, regression, models, are, neat] |[Logistic regression, regression models, models are, are neat]    |
+---+------------------------------------------+------------------------------------------------------------------+



### TF-IDF
#### Feature extraction technique that converts words into vectors, weighting each term by how often it occurs in a sentence (term frequency) and by how rare it is across the whole dataset (inverse document frequency)

In [14]:
# Create an input data frame of (label, sentence) rows

sentenceData = spark.createDataFrame(
    data=[
        (0.0, "Hi I heard about Spark"),
        (0.0, "I wish Java could use case classes"),
        (1.0, "Logistic regression models are neat"),
    ],
    schema=["label", "sentence"],
)

sentenceData.show()

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|  0.0|Hi I heard about ...|
|  0.0|I wish Java could...|
|  1.0|Logistic regressi...|
+-----+--------------------+



In [15]:
# Split every sentence into a list of words stored in the "words" column

tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)
wordsData = tokenizer.transform(sentenceData)
wordsData.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|
|  0.0|I wish Java could...|[i, wish, java, c...|
|  1.0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+



In [16]:
# Calculate the term frequencies (TF) by hashing each word into one of 20 buckets.
# The IDF weighting is applied separately, in the following cell.
tf = HashingTF(
    inputCol="words",
    outputCol="tf_words",
    numFeatures=20,
)
featurizedData = tf.transform(wordsData)
featurizedData.show(truncate=False)

+-----+-----------------------------------+------------------------------------------+-----------------------------------------+
|label|sentence                           |words                                     |tf_words                                 |
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+
|0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |
|1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+



In [17]:
# Fit an IDF model on the term-frequency vectors, then rescale them into the "features" column
idf = IDF(
    inputCol="tf_words",
    outputCol="features",
)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)


In [18]:
# Show each label next to its TF-IDF feature vector
(
    rescaledData
    .select("label", "features")
    .show(truncate=False)
)

+-----+----------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                              |
+-----+----------------------------------------------------------------------------------------------------------------------+
|0.0  |(20,[0,5,9,17],[0.6931471805599453,0.6931471805599453,0.28768207245178085,1.3862943611198906])                        |
|0.0  |(20,[2,7,9,13,15],[0.6931471805599453,0.6931471805599453,0.8630462173553426,0.28768207245178085,0.28768207245178085]) |
|1.0  |(20,[4,6,13,15,18],[0.6931471805599453,0.6931471805599453,0.28768207245178085,0.28768207245178085,0.6931471805599453])|
+-----+----------------------------------------------------------------------------------------------------------------------+



### CountVectorizer
#### Bag of words model based on count of words in the dataset

In [19]:
# Input data: every row is an id plus a bag of words.
df = spark.createDataFrame(
    data=[
        (0, ["a", "b", "c"]),
        (1, ["a", "b", "b", "c", "a"]),
    ],
    schema=["id", "words"],
)

# Learn a CountVectorizerModel from the corpus: vocabulary capped at 3 terms,
# and a term must appear in at least 2 documents to be kept.
cv = CountVectorizer(
    inputCol="words",
    outputCol="features",
    vocabSize=3,
    minDF=2.0,
)

model = cv.fit(df)

# Replace each bag of words by its vector of per-term counts
result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



## WORK ON SPAM HAM DATA

In [20]:
# Load the tab-separated SMS spam collection. No header option is set,
# so the columns come back with the default names _c0 and _c1.
data = spark.read.csv(
    "smsspamcollection/SMSSpamCollection",
    sep='\t',
    inferSchema=True,
)

In [21]:
# Preview the first 20 rows of the raw data (label in _c0, message text in _c1)
data.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [23]:
# The dataset consists of messages labelled spam or ham.
# Give the two columns descriptive names: "label" for the tag, "text" for the message.
data = (
    data
    .withColumnRenamed("_c0", "label")
    .withColumnRenamed("_c1", "text")
)
data.show()

+-----+--------------------+
|label|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



In [24]:
# Add the character length of every message as a new feature column "length"
data = data.withColumn(
    "length",
    length(col("text")),
)
data.show()

+-----+--------------------+------+
|label|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [26]:
# Feature transformation stages (applied later through a Pipeline):
#   text -> words_list -> truncated_words -> count_words -> tf_idf,
#   label -> idx, and finally tf_idf + length -> feature_vector.

# Split the message text into words
token = Tokenizer(
    inputCol="text",
    outputCol="words_list",
)
# Drop the stop words from the word list
stop_word = StopWordsRemover(
    inputCol="words_list",
    outputCol="truncated_words",
)
# Turn the remaining words into a vector of term counts
count_vec = CountVectorizer(
    inputCol="truncated_words",
    outputCol="count_words",
)
# Re-weight the term counts by inverse document frequency
idf = IDF(
    inputCol="count_words",
    outputCol="tf_idf",
)
# Encode the string label as a numeric index
ham_spam_number = StringIndexer(
    inputCol="label",
    outputCol="idx",
)

# Combine the TF-IDF vector and the message length into one feature vector
assembler = VectorAssembler(
    inputCols=["tf_idf", "length"],
    outputCol="feature_vector",
)

In [27]:
# Naive Bayes classifier reading the assembled feature vector and the numeric label "idx"
nb = NaiveBayes(
    featuresCol="feature_vector",
    labelCol="idx",
)

In [28]:
# Chain the feature-engineering stages into a single pipeline, fit it on the data,
# and keep only the numeric label ("idx") and the assembled "feature_vector".
# NOTE(review): the pipeline is fit on the full dataset, before the train/test split
# performed in a later cell, so the vocabulary and IDF weights also see the test rows.
# Consider splitting first and fitting on the training portion only.
pipe = Pipeline(
    stages=[
        token,
        stop_word,
        count_vec,
        idf,
        ham_spam_number,
        assembler,
    ]
)
clean_data_model = pipe.fit(data)
clean_data = (
    clean_data_model
    .transform(data)
    .select("idx", "feature_vector")
)
clean_data.show()

+---+--------------------+
|idx|      feature_vector|
+---+--------------------+
|0.0|(13424,[7,11,31,6...|
|0.0|(13424,[0,24,297,...|
|1.0|(13424,[2,13,19,3...|
|0.0|(13424,[0,70,80,1...|
|0.0|(13424,[36,134,31...|
|1.0|(13424,[10,60,139...|
|0.0|(13424,[10,53,103...|
|0.0|(13424,[125,184,4...|
|1.0|(13424,[1,47,118,...|
|1.0|(13424,[0,1,13,27...|
|0.0|(13424,[18,43,120...|
|1.0|(13424,[8,17,37,8...|
|1.0|(13424,[13,30,47,...|
|0.0|(13424,[39,96,217...|
|0.0|(13424,[552,1697,...|
|1.0|(13424,[30,109,11...|
|0.0|(13424,[82,214,47...|
|0.0|(13424,[0,2,49,13...|
|0.0|(13424,[0,74,105,...|
|1.0|(13424,[4,30,33,5...|
+---+--------------------+
only showing top 20 rows



In [29]:
# Split the prepared data 70% / 30% into training and test sets.
# A fixed seed keeps the split, and therefore the counts and the accuracy
# reported below, repeatable when the notebook is rerun on the same input.
train_data, test_data = clean_data.randomSplit([0.7, 0.3], seed=42)
print("Training set: ", train_data.count())
print("Test set: ", test_data.count())

Training set:  3918
Test set:  1656


In [30]:
# Train the Naive Bayes classifier on the training split
nb_model = nb.fit(train_data)

# Score the held-out test split; this adds the rawPrediction, probability and prediction columns
prediction = nb_model.transform(test_data)
prediction.show()

+---+--------------------+--------------------+--------------------+----------+
|idx|      feature_vector|       rawPrediction|         probability|prediction|
+---+--------------------+--------------------+--------------------+----------+
|0.0|(13424,[0,1,5,15,...|[-1001.4393904975...|[1.0,2.4497858119...|       0.0|
|0.0|(13424,[0,1,14,18...|[-1375.9759744484...|[1.0,4.9113230924...|       0.0|
|0.0|(13424,[0,1,17,19...|[-816.29941348569...|[1.0,8.8265094033...|       0.0|
|0.0|(13424,[0,1,20,27...|[-969.00690500292...|[1.0,1.1048862419...|       0.0|
|0.0|(13424,[0,1,23,63...|[-1326.4899573164...|[1.0,5.2694294115...|       0.0|
|0.0|(13424,[0,1,27,88...|[-1538.9190759224...|[4.06350499365914...|       1.0|
|0.0|(13424,[0,1,72,10...|[-667.70516315369...|[1.0,1.9760128913...|       0.0|
|0.0|(13424,[0,1,498,5...|[-315.47723442747...|[0.99999999999999...|       0.0|
|0.0|(13424,[0,1,874,1...|[-95.154667689581...|[0.99999998903334...|       0.0|
|0.0|(13424,[0,2,3,5,3...|[-512.38502497

In [31]:
# Evaluator that compares the "prediction" column with the true label "idx" using accuracy

evaluator = MulticlassClassificationEvaluator(
    labelCol="idx",
    predictionCol="prediction",
    metricName="accuracy",
)

In [32]:
# Compute the accuracy of the test-set predictions and report it
result = evaluator.evaluate(prediction)
print(
    "Accuracy of this model is: ",
    result,
)

Accuracy of this model is:  0.9082125603864735
