In [1]:
# Load the tab-separated SMS spam collection file into a DataFrame,
# asking Spark to infer the column types from the data.
data = spark.read.csv(
    path='/FileStore/tables/SMSSpamCollection',
    sep='\t',
    inferSchema=True,
)

In [2]:
# Replace the positional column names (_c0, _c1) with descriptive ones:
# the first column becomes the class label, the second the message text.
data = data.withColumnRenamed('_c0', 'class')
data = data.withColumnRenamed('_c1', 'text')

In [3]:
# Preview the first 10 rows of the renamed DataFrame
data.show(n=10)

In [4]:
# Importing libraries
from pyspark.sql.functions import length

In [5]:
# Add a 'length' column holding the character length of each message text
data = data.withColumn(
    colName='length',
    col=length(data['text']),
)

In [6]:
# Preview the first 10 rows, now including the 'length' column
data.show(n=10)

In [7]:
# Mean of every numeric column per class (here: the message length).
# Original author's observation: spam messages average longer than ham
# ones -- check against the table printed below.
(
    data
    .groupBy('class')
    .mean()
    .show()
)

In [8]:
# Import NLP tools
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

In [9]:
# Feature-engineering stages. Each stage reads the column produced by the
# previous one: text -> tokens -> sw_tokens -> count_tokens -> tf_idf.

# Split the raw message text into tokens
tokenizer = Tokenizer(outputCol='tokens', inputCol='text')

# Drop stop words from the token list
stopwords = StopWordsRemover(outputCol='sw_tokens', inputCol='tokens')

# Turn the remaining tokens into term-count vectors
cv = CountVectorizer(outputCol='count_tokens', inputCol='sw_tokens')

# Re-weight the term counts by inverse document frequency
idf = IDF(outputCol='tf_idf', inputCol='count_tokens')

# Encode the string class into a numeric 'label' column
ham_spam_to_numeric = StringIndexer(outputCol='label', inputCol='class')

In [10]:
# Importing assembler
from pyspark.ml.feature import VectorAssembler

In [11]:
# Combine the TF-IDF vector and the message length into one 'features' vector
cleanup = VectorAssembler(
    outputCol='features',
    inputCols=['tf_idf', 'length'],
)

In [12]:
# Importing NaiveBayes model
from pyspark.ml.classification import NaiveBayes

In [13]:
# Naive Bayes classifier. The column names below are the estimator's
# defaults, spelled out so the link to the 'features' and 'label' columns
# built by the preprocessing stages is visible.
nb = NaiveBayes(featuresCol='features', labelCol='label')

In [14]:
# Importing pipeline
from pyspark.ml import Pipeline

In [15]:
# Preprocessing stages, listed in execution order
preprocessing_stages = [
    ham_spam_to_numeric,  # class  -> label
    tokenizer,            # text   -> tokens
    stopwords,            # tokens -> sw_tokens
    cv,                   # sw_tokens -> count_tokens
    idf,                  # count_tokens -> tf_idf
    cleanup,              # tf_idf + length -> features
]
pipeline = Pipeline(stages=preprocessing_stages)

In [16]:
# Fit the preprocessing pipeline on the data, then apply it to the same data.
# NOTE(review): the pipeline is fitted on the full dataset before the
# train/test split further down, so the fitted stages (vocabulary, IDF
# weights, label index) have seen the rows that later become the test set.
# Consider splitting the raw data first and fitting on the training part only.
pipeline_model = pipeline.fit(data)
data_processed = pipeline_model.transform(data)

In [17]:
# Inspect the first 5 rows of the pipeline output
data_processed.show(n=5)

In [18]:
# Keep only the two columns the classifier consumes
data_processed = data_processed.select('label', 'features')

In [19]:
# Preview the first 5 rows of the model-ready (label, features) DataFrame
data_processed.show(n=5)

In [20]:
# Split the processed data roughly 80/20 into training and test sets.
# A fixed seed is passed so the split (and therefore the trained model and
# the reported metric) does not change from one run to the next; without it
# randomSplit draws a fresh random seed on every execution.
(train_set, test_set) = data_processed.randomSplit([0.8, 0.2], seed=42)

In [21]:
# Fit the Naive Bayes classifier on the training split
spam_model = nb.fit(dataset=train_set)

In [22]:
# Score the held-out test split with the trained model
spam_ham_preds = spam_model.transform(dataset=test_set)

In [23]:
# Preview the first 5 scored test rows
spam_ham_preds.show(n=5)

In [24]:
# Importing evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [25]:
# Evaluate the predictions on the test split.
# MulticlassClassificationEvaluator defaults to metricName='f1', so the metric
# must be requested explicitly for the value stored in `accuracy` (and printed
# as accuracy below) to really be accuracy rather than the F1 score.
accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
).evaluate(spam_ham_preds)

In [26]:
# Report the score stored in `accuracy`
print(
    "Spam detector's accuracy: {}".format(accuracy)
)