<a href="https://colab.research.google.com/github/melodyrous/MIASHS2/blob/main/donnees_massives/spark/Spark_-_machine_learning_pipeline_SPAM_detector.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Importing Spark

In [23]:
!pip install pyspark

import pyspark

from pyspark import SparkContext
from pyspark.sql import SparkSession

SparkContext.setSystemProperty('spark.executor.memory', '8g')
SparkContext.setSystemProperty('spark.driver.memory', '45G')

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("Python Spark").getOrCreate()



# Loading and preparing the data

In [26]:
from pyspark.sql.functions import monotonically_increasing_id

# loading and constructing headers
# load : data/spam.csv
df_spam = spark.read\
    .option("delimiter", ",")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv('spam.csv')
df_spam = df_spam.withColumn("id", monotonically_increasing_id())
df_spam.show(5)

+----+--------------------+----+----+----+---+
|  v1|                  v2| _c2| _c3| _c4| id|
+----+--------------------+----+----+----+---+
| ham|Go until jurong p...|null|null|null|  0|
| ham|Ok lar... Joking ...|null|null|null|  1|
|spam|Free entry in 2 a...|null|null|null|  2|
| ham|U dun say so earl...|null|null|null|  3|
| ham|Nah I don't think...|null|null|null|  4|
+----+--------------------+----+----+----+---+
only showing top 5 rows



In [27]:
df_spam.printSchema()

root
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- id: long (nullable = false)



In [None]:
df_spam.na.drop()

In [28]:
df_spam.columns

['v1', 'v2', '_c2', '_c3', '_c4', 'id']

In [36]:
# creating the labels as 0 and 1 instead of strings
# and clean sms txt
from pyspark.sql.functions import udf, col, upper

# 0 if not SPAM and 1 if SPAM
@udf
def categorize(x):
  v = None
  if x == "spam":
    v = 1
  else:
    v = 0
  return v

remove_quotes = udf(lambda x: x.replace('"',''))

df_spam = df_spam.withColumn("label", categorize(col("v1")).cast('float'))

remove_quotes = udf(lambda x: x.replace('"', '').replace("'", "") if x is not None else "")

df_spam = df_spam.withColumn("sms", remove_quotes(df_spam["v2"]))
# clean text messages

In [37]:
df_spam.show()

+----+--------------------+----+----+----+---+-----+--------------------+
|  v1|                  v2| _c2| _c3| _c4| id|label|                 sms|
+----+--------------------+----+----+----+---+-----+--------------------+
| ham|Go until jurong p...|null|null|null|  0|  0.0|Go until jurong p...|
| ham|Ok lar... Joking ...|null|null|null|  1|  0.0|Ok lar... Joking ...|
|spam|Free entry in 2 a...|null|null|null|  2|  1.0|Free entry in 2 a...|
| ham|U dun say so earl...|null|null|null|  3|  0.0|U dun say so earl...|
| ham|Nah I don't think...|null|null|null|  4|  0.0|Nah I dont think ...|
|spam|FreeMsg Hey there...|null|null|null|  5|  1.0|FreeMsg Hey there...|
| ham|Even my brother i...|null|null|null|  6|  0.0|Even my brother i...|
| ham|As per your reque...|null|null|null|  7|  0.0|As per your reque...|
|spam|WINNER!! As a val...|null|null|null|  8|  1.0|WINNER!! As a val...|
|spam|Had your mobile 1...|null|null|null|  9|  1.0|Had your mobile 1...|
| ham|I'm gonna be home...|null|null|n

# Train test split

In [51]:
print(df_spam.count())
test_sample = df_spam.sample(fraction=.2)
print(sample.count())
train_sample = df_spam.join(sample, on="id", how='anti')
print(train_sample.count())


5574
1111
4463


# Constructing pipeline

In [72]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# tokenizer

tokenizer = Tokenizer(outputCol="words")
tokenizer.setInputCol("sms")
# df_spam = tokenizer.transform(df_spam)

# df_spam.show()
# # term frequency
cv = CountVectorizer()
cv.setInputCol("words")
cv.setOutputCol("vectors")
# model = cv.fit(df_spam)
# df_spam = model.transform(df_spam)
# df_spam.show()


# logistic regression
lr = LogisticRegression(featuresCol='vectors', labelCol='label', predictionCol='prediction')
# lr_model = lr.fit(df_spam)
# df_spam = lr_model.transform(df_spam)
df_spam.show()
# in a pipeline


+----+--------------------+----+----+----+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  v1|                  v2| _c2| _c3| _c4| id|label|                 sms|               words|             vectors|       rawPrediction|         probability|prediction|
+----+--------------------+----+----+----+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| ham|Go until jurong p...|null|null|null|  0|  0.0|Go until jurong p...|[go, until, juron...|(13343,[8,44,52,6...|[29.4945498874911...|[0.99999999999984...|       0.0|
| ham|Ok lar... Joking ...|null|null|null|  1|  0.0|Ok lar... Joking ...|[ok, lar..., joki...|(13343,[5,75,401,...|[31.4324390819105...|[0.99999999999997...|       0.0|
|spam|Free entry in 2 a...|null|null|null|  2|  1.0|Free entry in 2 a...|[free, entry, in,...|(13343,[0,3,8,22,...|[-23.132843274511...|[8.98532551535158..

In [64]:
tokenizer = Tokenizer(outputCol="words", inputCol="sms")
cv = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol="vectors")
lr = LogisticRegression(featuresCol=cv.getOutputCol(), labelCol='label', predictionCol='prediction')
pipeline = Pipeline(stages=[tokenizer, cv, lr])

In [73]:
tokenizer = Tokenizer(outputCol="words", inputCol="sms")
cv = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="vectors")
lr = LogisticRegression(featuresCol=cv.getOutputCol(), labelCol='label', predictionCol='prediction')
pipeline = Pipeline(stages=[tokenizer, cv, lr])

# Fitting the model

In [74]:
model = pipeline.fit(train_sample)

# Evaluation of the model

In [75]:
test_sample = model.transform(test_sample)

IllegalArgumentException: ignored

In [69]:
test_sample.show()

+----+--------------------+----+----+----+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  v1|                  v2| _c2| _c3| _c4| id|label|                 sms|               words|             vectors|       rawPrediction|         probability|prediction|
+----+--------------------+----+----+----+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| ham|So �_ pay first l...|null|null|null| 22|  0.0|So �_ pay first l...|[so, �_, pay, fir...|(11679,[8,25,41,6...|[22.4166146658587...|[0.99999999981609...|       0.0|
| ham|I see the letter ...|null|null|null| 37|  0.0|I see the letter ...|[i, see, the, let...|(11679,[1,4,9,16,...|[19.5155961878014...|[0.99999999665432...|       0.0|
| ham|Hello! How's you ...|null|null|null| 39|  0.0|Hello! Hows you a...|[hello!, hows, yo...|(11679,[0,1,2,6,1...|[48.9622260932013...|           [1.0,0.0

In [71]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
evaluator.setRawPredictionCol("prediction")
evaluator.setLabelCol("label")

evaluator.evaluate(test_sample)

evaluator.evaluate(test_sample, {evaluator.metricName: "areaUnderPR"})

0.9729843478332185