In [None]:
import nltk
# Fetch the NLTK English stop-word corpus used by the StopWordsRemover stage.
# nltk.download reports failure by returning False rather than raising, so check
# it here instead of failing later with a LookupError from stopwords.words().
if not nltk.download('stopwords', quiet=True):
    raise RuntimeError("Failed to download the NLTK 'stopwords' corpus")

In [None]:
from pyspark.sql import SparkSession
# Resource-capped Spark session whose S3A filesystem points at the MinIO service.
# S3 credentials are resolved by the AWS SDK's DefaultAWSCredentialsProviderChain
# rather than being hard-coded here.
SPARK_CONF = {
    "spark.driver.memory": "512m",
    "spark.driver.cores": "1",
    "spark.executor.memory": "512m",
    "spark.executor.cores": "1",
    "spark.sql.shuffle.partitions": "2",
    "spark.hadoop.fs.s3a.endpoint": "http://minio:9000/",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
}

spark_builder = SparkSession.builder.appName("sms-spam-classifier")
for conf_key, conf_value in SPARK_CONF.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()

In [None]:
# Load the SMS dataset from the MinIO bucket over S3A and preview the first rows.
# NOTE(review): the bucket is named "delta"; if it holds a Delta Lake table, reading
# it as plain parquet also picks up data files from removed table versions --
# confirm the storage format and switch to format("delta") if so.
df = spark.read.format("parquet").load("s3a://delta/")
df.show(n=20)

In [None]:
import mlflow
from datetime import datetime

# Import the autolog submodule explicitly instead of relying on `import mlflow`
# to expose mlflow.pyspark.ml as an attribute.
import mlflow.pyspark.ml

MLFLOW_TRACKING_URI = "http://mlflow:5000"
# One fresh experiment per notebook execution, timestamped to keep names unique.
EXPERIMENT_NAME = "sms-classifier-" + datetime.now().strftime("%Y-%m-%d_%H:%M:%S")

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.pyspark.ml.autolog()
# create_experiment only registers the experiment; it does not make it active.
# Pass its id to start_run so the run (and autologged params/models) land in it
# rather than in the default experiment.
experiment_id = mlflow.create_experiment(EXPERIMENT_NAME)
mlflow.start_run(experiment_id=experiment_id)

In [None]:
from pyspark.ml.feature import (
    CountVectorizer, StringIndexer, VectorAssembler, Tokenizer, RegexTokenizer, StopWordsRemover)
from nltk.corpus import stopwords

# Featurization stages, executed in list order by the Pipeline built in the next cell:
# 'sms' -> 'tokens' -> 'real_tokens' -> 'token_features' -> 'features',
# plus 'label' -> 'label_num' for the classifier target.

# Split each message on runs of non-word characters.
regexTokenizer = RegexTokenizer(inputCol="sms", outputCol="tokens", pattern="\\W+")

# NLTK English stop words extended with common SMS shorthand / filler tokens.
STOPWORDS = [*stopwords.words('english'), 'u', 'ü', 'ur', '4', '2', 'im', 'dont', 'doin', 'ure']
remover = StopWordsRemover(stopWords=STOPWORDS, inputCol="tokens", outputCol="real_tokens")

# Term-count vectors; minDF=2.0 keeps only tokens occurring in at least two messages.
cv = CountVectorizer(inputCol="real_tokens", outputCol="token_features", minDF=2.0)

# Map the string label to a numeric index for the classifier.
indexer = StringIndexer(inputCol="label", outputCol="label_num")

# Assemble the final 'features' column expected by the classifier.
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")

stages = [regexTokenizer, remover, cv, indexer, vecAssembler]
print("\n".join(str(each_stage) for each_stage in stages))

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
# Keep the fitted PipelineModel (learned vocabulary + label index) so the exact same
# featurization can be applied to new raw messages later, instead of discarding it
# after a single transform.
# NOTE(review): the pipeline is fitted on the full dataset before the train/test split
# in the next cell, so the CountVectorizer vocabulary (and its minDF filter) is
# influenced by test messages. Fitting on the training split only would avoid this leakage.
feature_model = pipeline.fit(df)
data = feature_model.transform(df)

In [None]:
from pyspark.sql import functions as F

# Reproducible 75/25 train/test split.
train, test = data.randomSplit([0.75, 0.25], seed=42)


def _spam_ham_counts(split_df):
    """Return ``(spam, ham)`` row counts of ``split_df`` using a single Spark job.

    Rows are classified by the boolean ``is_spam`` column. Rows where it is null fall
    into neither count, matching ``where('is_spam')`` / ``where('not is_spam')``.
    """
    row = split_df.agg(
        F.count(F.when(F.col('is_spam'), 1)).alias('spam'),
        F.count(F.when(~F.col('is_spam'), 1)).alias('ham'),
    ).first()
    return row['spam'], row['ham']


def _ratio(numerator, denominator):
    """Return ``numerator / denominator``, or NaN when the denominator is zero."""
    return numerator / denominator if denominator else float('nan')


# One aggregation per split instead of four separate count() actions, each of which
# would re-run the whole (uncached) featurization pipeline.
train_s, train_ns = _spam_ham_counts(train)
test_s, test_ns = _spam_ham_counts(test)
print(f'''
      Train spam:  {train_s}
      Train ham:   {train_ns}
      Train ratio: {_ratio(train_s, train_ns)} (spam/ham)
      
      Test spam:  {test_s}
      Test ham:   {test_ns}
      Test ratio: {_ratio(test_s, test_ns)} (spam/ham)
      ''')

In [None]:
from pyspark.ml.classification import NaiveBayes
# Multinomial Naive Bayes over the term-count 'features' vector, with additive
# smoothing of 1.0, trained against the indexed label column.
nb = NaiveBayes(
    labelCol="label_num",
    featuresCol="features",
    modelType="multinomial",
    smoothing=1.0,
)
model = nb.fit(train)

In [None]:
# Score the held-out split and preview true label vs. predicted class and probabilities.
PREVIEW_COLUMNS = ["label_num", "prediction", "probability"]
predictions = model.transform(test)
predictions.select(*PREVIEW_COLUMNS).show()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Rank test messages by the class-probability vector (the evaluator uses element [1],
# i.e. P(label_num == 1)). The hard 0/1 'prediction' column gives only a single
# operating point, so the ROC curve and its area would be degenerate.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="probability",
    labelCol="label_num",
    metricName="areaUnderROC",
)
# This is ROC AUC, not accuracy; name it accordingly.
auc = evaluator.evaluate(predictions)
print("Test Area Under ROC: ", auc)

In [None]:
# Log the fitted classifier as an MLflow Spark-flavor model in the active run.
# NOTE(review): only the NaiveBayes stage is logged, not the featurization pipeline,
# so the logged model expects a precomputed 'features' vector rather than raw 'sms'
# text. Consider logging a PipelineModel that includes both.
NB_ARTIFACT_PATH = "naive-bayes"
model_info = mlflow.spark.log_model(model, NB_ARTIFACT_PATH)
print(model_info)

In [None]:
# Close the MLflow run opened earlier with mlflow.start_run(), marking it finished.
mlflow.end_run(status="FINISHED")