This notebook builds a spam filter using several NLP tools and a Naive Bayes classifier.  
The dataset for Spam Detection is from: https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection

## 1. Import libraries and create a Spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import length
from pyspark.ml.feature import (Tokenizer,
                                StopWordsRemover,
                                CountVectorizer,
                                IDF,
                                StringIndexer,
                                VectorAssembler)
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [2]:
spark = SparkSession.builder.appName('NLP').getOrCreate()

## 2. Explore dataset

In [3]:
df = spark.read.csv('smsspamcollection/SMSSpamCollection', sep='\t', inferSchema=True)
df.show(3)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
+----+--------------------+
only showing top 3 rows



#### Change column names

In [4]:
df = df.withColumnRenamed('_c0', 'class').withColumnRenamed('_c1', 'text')
df.show(10)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
+-----+--------------------+
only showing top 10 rows



#### Add a new feature column: the length of each text

In [5]:
df = df.withColumn('length', length(df['text']))
df.show(3)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
+-----+--------------------+------+
only showing top 3 rows



#### Check mean length of text for each class (spam or ham)

In [6]:
df.groupby('class').mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## 3. Feature Transformations

### Transform categorical data to numeric data

In [7]:
ham_spam_to_num = StringIndexer(inputCol='class', outputCol='label')

### Tokenizer  
Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple Tokenizer class provides this functionality.
RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter “pattern” (regex, default: "\\s+") is used as delimiters to split the input text. Alternatively, users can set parameter “gaps” to false indicating the regex “pattern” denotes “tokens” rather than splitting gaps, and find all matching occurrences as the tokenization result.
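
The two `RegexTokenizer` modes described above can be imitated in plain Python with the `re` module. This is only a sketch of the behaviour, not Spark's implementation: the helper name `regex_tokenize` and the sample message are made up for illustration, and the lowercasing step assumes the default behaviour of Spark's tokenizers.

```python
import re

def regex_tokenize(text, pattern=r"\s+", gaps=True):
    """Hypothetical helper imitating the two RegexTokenizer modes."""
    text = text.lower()
    if gaps:
        # The pattern describes the delimiters between tokens.
        tokens = re.split(pattern, text)
    else:
        # The pattern describes the tokens themselves.
        tokens = re.findall(pattern, text)
    # Drop empty strings produced by leading or trailing delimiters.
    return [t for t in tokens if t]

sample = "WINNER!! Call now, it's free"  # made-up message
split_on_gaps = regex_tokenize(sample)
match_tokens = regex_tokenize(sample, pattern=r"\w+", gaps=False)
print(split_on_gaps)
print(match_tokens)
```

Splitting on whitespace keeps punctuation attached to the words, while matching `\w+` strips it but also breaks contractions apart. Which is preferable depends on the corpus.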

In [8]:
# Split sentence to a list of words
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')

### Stop Words Removal  
Stop words are words which should be excluded from the input, typically because the words appear frequently and don’t carry as much meaning.

StopWordsRemover takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stopwords is specified by the stopWords parameter. Default stop words for some languages are accessible by calling StopWordsRemover.loadDefaultStopWords(language), for which available options are “danish”, “dutch”, “english”, “finnish”, “french”, “german”, “hungarian”, “italian”, “norwegian”, “portuguese”, “russian”, “spanish”, “swedish” and “turkish”. A boolean parameter caseSensitive indicates if the matches should be case sensitive (false by default).  
```
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+
```

In [9]:
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')

### CountVectorizer  
CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.

During the fitting process, CountVectorizer will select the top vocabSize words ordered by term frequency across the corpus. An optional parameter minDF also affects the fitting process by specifying the minimum number (or fraction if < 1.0) of documents a term must appear in to be included in the vocabulary. Another optional binary toggle parameter controls the output vector. If set to true all nonzero counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts.  
```
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with an ID.
df = spark.createDataFrame([
    (0, "d b c a d".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

print('original df')
df.show()

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", 
                     vocabSize=4, # maximum unique words in the corpus
                     minDF=1.0)   # Ex. minDF = 2 means "ignore terms that appear in less than 2 documents". 

model = cv.fit(df)

result = model.transform(df)
print('df with count vectorizer')
result.show(truncate=False)

original df
+---+---------------+
| id|          words|
+---+---------------+
|  0|[d, b, c, a, d]|
|  1|[a, b, b, c, a]|
+---+---------------+

df with count vectorizer
+---+---------------+-------------------------------+
|id |words          |features                       |
+---+---------------+-------------------------------+
|0  |[d, b, c, a, d]|(4,[0,1,2,3],[1.0,1.0,2.0,1.0])|
|1  |[a, b, b, c, a]|(4,[0,1,3],[2.0,2.0,1.0])      |
+---+---------------+-------------------------------+
```
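
The effect of `vocabSize`, `minDF` and `binary` can also be traced by hand. The following is a plain-Python sketch of the selection rule as described in the text, not Spark's code; the helper names `fit_vocabulary` and `count_vector` are hypothetical, and ties between equally frequent terms are broken alphabetically here, which may differ from the order Spark assigns.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size, min_df=1.0):
    """Hypothetical helper: pick the vocabulary the way the text describes."""
    term_freq = Counter()   # total occurrences of each term in the corpus
    doc_freq = Counter()    # number of documents containing each term
    for doc in docs:
        term_freq.update(doc)
        doc_freq.update(set(doc))
    # minDF >= 1 is an absolute document count, minDF < 1 a fraction of the corpus.
    threshold = min_df if min_df >= 1.0 else min_df * len(docs)
    kept = [t for t in term_freq if doc_freq[t] >= threshold]
    # Keep the vocab_size most frequent terms (ties broken alphabetically here).
    kept.sort(key=lambda t: (-term_freq[t], t))
    return kept[:vocab_size]

def count_vector(doc, vocabulary, binary=False):
    """Dense count vector for one document over the fitted vocabulary."""
    counts = Counter(doc)
    return [float(min(counts[t], 1)) if binary else float(counts[t])
            for t in vocabulary]

docs = ["d b c a d".split(" "), "a b b c a".split(" ")]
vocab_all = fit_vocabulary(docs, vocab_size=4, min_df=1.0)
vocab_shared = fit_vocabulary(docs, vocab_size=4, min_df=2.0)
print(vocab_all, count_vector(docs[0], vocab_all))
print(vocab_shared, count_vector(docs[0], vocab_shared, binary=True))
```

With `min_df=2.0` the term `d` is dropped because it occurs in only one document, and with `binary=True` every nonzero count collapses to 1.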

In [10]:
count_vec = CountVectorizer(inputCol='stop_tokens', outputCol='c_vec')

<h3 id="tf-idf">TF-IDF</h3>

<p><a href="http://en.wikipedia.org/wiki/Tf%E2%80%93idf">Term frequency-inverse document frequency (TF-IDF)</a> 
is a feature vectorization method widely used in text mining to reflect the importance of a term 
to a document in the corpus. Denote a term by <code>$t$</code>, a document by <code>$d$</code>, and the corpus by <code>$D$</code>.
Term frequency <code>$TF(t, d)$</code> is the number of times that term <code>$t$</code> appears in document <code>$d$</code>, while 
document frequency <code>$DF(t, D)$</code> is the number of documents that contain term <code>$t$</code>. If we only use 
term frequency to measure the importance, it is very easy to over-emphasize terms that appear very 
often but carry little information about the document, e.g. &#8220;a&#8221;, &#8220;the&#8221;, and &#8220;of&#8221;. If a term appears 
very often across the corpus, it means it doesn&#8217;t carry special information about a particular document.
Inverse document frequency is a numerical measure of how much information a term provides:

$$ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1} $$

where |D| is the total number of documents in the corpus. Since logarithm is used, if a term 
appears in all documents, its IDF value becomes 0. Note that a smoothing term is applied to avoid 
dividing by zero for terms outside the corpus. The TF-IDF measure is simply the product of TF and IDF:
$$ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). $$ 
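
The two formulas can be checked by hand on a tiny corpus. The sketch below evaluates them directly in plain Python on a toy two-document corpus; the function names `idf` and `tf_idf` are chosen here for illustration and are not Spark APIs.

```python
import math

# Toy corpus: two tokenized documents.
docs = ["d b c a d".split(" "), "a b b c a".split(" ")]

def idf(term, corpus):
    """Smoothed IDF: log((|D| + 1) / (DF(t, D) + 1))."""
    df = sum(1 for doc in corpus if term in doc)
    return math.log((len(corpus) + 1) / (df + 1))

def tf_idf(term, doc, corpus):
    """TF-IDF as the product of the raw term count and the smoothed IDF."""
    return doc.count(term) * idf(term, corpus)

for term in "abcd":
    print(term, [round(tf_idf(term, doc, docs), 4) for doc in docs])
```

Terms `a`, `b` and `c` occur in both documents, so their IDF is log(3/3) = 0 and they contribute nothing. Only `d`, which occurs twice in the first document and never in the second, gets a nonzero weight of 2 · log(3/2) ≈ 0.811.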

```
# Example
from pyspark.ml.feature import CountVectorizer, IDF

# Input data: Each row is a bag of words with an ID.
df = spark.createDataFrame([
    (0, "d b c a d".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

print('original df')
df.show()

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="cv_features", 
                     vocabSize=4, # maximum unique words in the corpus
                     minDF=1.0)   # Ex. minDF = 2 means "ignore terms that appear in less than 2 documents". 
model = cv.fit(df)
result = model.transform(df)
print('df with count vectorizer')
result.show(truncate=False)

idf = IDF(inputCol="cv_features", outputCol="idf_features")
idfModel = idf.fit(result)
rescaledData = idfModel.transform(result)
print('df with IDF')
rescaledData.show(truncate=False)

original df
+---+---------------+
| id|          words|
+---+---------------+
|  0|[d, b, c, a, d]|
|  1|[a, b, b, c, a]|
+---+---------------+

df with count vectorizer
+---+---------------+-------------------------------+
|id |words          |cv_features                    |
+---+---------------+-------------------------------+
|0  |[d, b, c, a, d]|(4,[0,1,2,3],[1.0,1.0,2.0,1.0])|
|1  |[a, b, b, c, a]|(4,[0,1,3],[2.0,2.0,1.0])      |
+---+---------------+-------------------------------+

df with IDF
+---+---------------+-------------------------------+----------------------------------------------+
|id |words          |cv_features                    |idf_features                                  |
+---+---------------+-------------------------------+----------------------------------------------+
|0  |[d, b, c, a, d]|(4,[0,1,2,3],[1.0,1.0,2.0,1.0])|(4,[0,1,2,3],[0.0,0.0,0.8109302162163288,0.0])|
|1  |[a, b, b, c, a]|(4,[0,1,3],[2.0,2.0,1.0])      |(4,[0,1,3],[0.0,0.0,0.0])                     |
+---+---------------+-------------------------------+----------------------------------------------+
```

In [11]:
idf = IDF(inputCol="c_vec", outputCol="tf_idf")

### Assemble the feature columns into a single vector for Spark ML

In [12]:
clean_up = VectorAssembler(inputCols=['tf_idf', 'length'], outputCol='features')

## 4. The Model

We'll use a Naive Bayes classifier (other classification models, such as random forest or decision tree, could also be tried).

In [13]:
nb = NaiveBayes(featuresCol='features',
                labelCol='label',
                predictionCol='prediction',
                probabilityCol='probability') 
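
To make the classifier less of a black box, here is a minimal plain-Python sketch of multinomial Naive Bayes with additive smoothing on word counts. It is not Spark's implementation: the function names `train_nb` and `predict_nb` and the toy messages are hypothetical, and `smoothing=1.0` is chosen on the assumption that it matches the default of Spark's `NaiveBayes`.

```python
import math
from collections import Counter, defaultdict

def train_nb(docs, labels, smoothing=1.0):
    """Fit log priors and smoothed log likelihoods from tokenized documents."""
    vocab = sorted({t for doc in docs for t in doc})
    class_docs = Counter(labels)
    term_counts = defaultdict(Counter)
    for doc, label in zip(docs, labels):
        term_counts[label].update(doc)
    model = {}
    for label, n_docs in class_docs.items():
        # Denominator: all term occurrences in the class plus smoothing mass.
        total = sum(term_counts[label].values()) + smoothing * len(vocab)
        log_prior = math.log(n_docs / len(docs))
        log_like = {t: math.log((term_counts[label][t] + smoothing) / total)
                    for t in vocab}
        model[label] = (log_prior, log_like)
    return model

def predict_nb(model, doc):
    """Return the label with the highest posterior log score."""
    scores = {
        label: log_prior + sum(log_like[t] for t in doc if t in log_like)
        for label, (log_prior, log_like) in model.items()
    }
    return max(scores, key=scores.get)

# Made-up training messages, already tokenized.
train_docs = [["free", "prize", "call"], ["call", "me", "later"],
              ["win", "free", "cash"], ["see", "you", "later"]]
train_labels = ["spam", "ham", "spam", "ham"]
model = train_nb(train_docs, train_labels)
print(predict_nb(model, ["free", "cash", "prize"]))
print(predict_nb(model, ["see", "you", "later"]))
```

Working in log space avoids underflow when many small probabilities are multiplied, which is also why the `rawPrediction` values Spark reports are large negative numbers.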

### Pipeline

In [14]:
data_prep_pipe = Pipeline(stages=[ham_spam_to_num, 
                                  tokenizer, 
                                  stopremove,
                                  count_vec,
                                  idf,
                                  clean_up])

In [15]:
%%time
cleaner = data_prep_pipe.fit(df)

CPU times: user 42.5 ms, sys: 2.74 ms, total: 45.3 ms
Wall time: 7.07 s


In [16]:
clean_data = cleaner.transform(df)
clean_data.show(3)

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|class|                text|length|label|          token_text|         stop_tokens|               c_vec|              tf_idf|            features|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|
|  ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|(13424,[0,24,297,...|
| spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------

### Training 

In [17]:
# Select only 2 columns for ML in Spark
clean_data = clean_data.select(['label','features'])

In [18]:
clean_data.show(3)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[7,11,31,6...|
|  0.0|(13424,[0,24,297,...|
|  1.0|(13424,[2,13,19,3...|
+-----+--------------------+
only showing top 3 rows



In [19]:
# Train, test split
(training, testing) = clean_data.randomSplit([0.8, 0.2])  # 80% and 20%

In [20]:
spam_predictor = nb.fit(training)

In [21]:
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)



### Testing

In [22]:
test_results = spam_predictor.transform(testing)

In [23]:
test_results.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,2,13,...|[-606.35872207312...|[1.0,1.4606590065...|       0.0|
|  0.0|(13424,[0,1,12,33...|[-445.26743833842...|[1.0,2.4606440248...|       0.0|
|  0.0|(13424,[0,1,14,18...|[-1368.9459678229...|[1.0,9.4011337917...|       0.0|
|  0.0|(13424,[0,1,15,20...|[-669.20818569207...|[1.0,3.9110883342...|       0.0|
|  0.0|(13424,[0,1,20,27...|[-972.56817871929...|[1.0,5.6114360786...|       0.0|
|  0.0|(13424,[0,1,27,88...|[-1531.1264630880...|[0.04422736586211...|       1.0|
|  0.0|(13424,[0,1,46,17...|[-1144.5161799816...|[2.56736897118619...|       1.0|
|  0.0|(13424,[0,1,874,1...|[-95.709748556200...|[0.99999998670071...|       0.0|
|  0.0|(13424,[0,2,3,6,9...|[-3319.6602710011...|[1.0,7.4049278562...|       0.0|
|  0.0|(13424,[0

### Evaluation

In [24]:
# Note: MulticlassClassificationEvaluator defaults to metricName='f1' (weighted F1), not accuracy
f1_eval = MulticlassClassificationEvaluator()
f1 = f1_eval.evaluate(test_results)
print("F1 score of model at predicting spam was: {}".format(f1))

F1 score of model at predicting spam was: 0.931382333192883


Not bad, considering we're using straight math on text data.  
Engineering additional features might give a better result.
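
One caveat about the metric: `MulticlassClassificationEvaluator` uses `metricName="f1"` (a support-weighted F1) unless told otherwise, and passing `metricName="accuracy"` should switch it to plain accuracy. The two numbers are related but not identical, as the following plain-Python sketch shows. The helper names and the toy labels are made up for illustration and are not taken from the run above.

```python
from collections import Counter

def accuracy(labels, preds):
    """Fraction of predictions that match the true label."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    """Per-class F1, averaged with each class's share of the labels as weight."""
    support = Counter(labels)
    total = 0.0
    for cls, n in support.items():
        tp = sum(l == cls and p == cls for l, p in zip(labels, preds))
        fp = sum(l != cls and p == cls for l, p in zip(labels, preds))
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        total += f1 * n / len(labels)
    return total

# Toy labels (0.0 = ham, 1.0 = spam), made up for illustration.
labels = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0]
preds = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0]
print(accuracy(labels, preds), weighted_f1(labels, preds))
```

On an imbalanced dataset like this one, where ham far outnumbers spam, it is worth reporting both metrics, and ideally per-class precision and recall as well.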