# NLP - code along

In [1]:
import findspark
findspark.init("/home/rodolfo/spark-3.3.1-bin-hadoop3")
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("nlp").getOrCreate()

23/01/02 19:53:04 WARN Utils: Your hostname, rodolfo-300E5M-300E5L resolves to a loopback address: 127.0.1.1; using 192.168.15.11 instead (on interface wlp3s0)
23/01/02 19:53:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/02 19:53:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/02 19:53:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/01/02 19:53:06 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
df = spark.read.csv("../../data/smsspamcollection/SMSSpamCollection", inferSchema=True, sep="\t")

                                                                                

In [4]:
df.show(5)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
+----+--------------------+
only showing top 5 rows



In [6]:
df = (
    df
    .withColumnRenamed("_c0", "class")
    .withColumnRenamed("_c1", "text")
)

In [7]:
df.show(5)

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
+-----+--------------------+
only showing top 5 rows



In [8]:
from pyspark.sql.functions import length

In [9]:
df = (
    df
    .withColumn("length", length(df["text"]))
)

In [10]:
df.show(5)

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
+-----+--------------------+------+
only showing top 5 rows



In [11]:
df.groupBy("class").mean().show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [12]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

In [22]:
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stop_remove = StopWordsRemover(inputCol="token_text", outputCol="stop_token")
count_vec = CountVectorizer(inputCol="stop_token", outputCol="c_vec")
idf = IDF(inputCol="c_vec", outputCol="tf_idf")
ham_spam_numeric = StringIndexer(inputCol="class", outputCol="label")

In [14]:
from pyspark.ml.feature import VectorAssembler

In [29]:
clean_up = VectorAssembler(
    inputCols=["tf_idf", "length"],
    outputCol="features"
)

In [30]:
from pyspark.ml.classification import NaiveBayes

In [31]:
nb = NaiveBayes()

In [32]:
from pyspark.ml import Pipeline

In [33]:
pipeline = Pipeline(stages=[
    ham_spam_numeric,
    tokenizer,
    stop_remove,
    count_vec,
    idf,
    clean_up
]).fit(df)

In [34]:
df_preprocessed = pipeline.transform(df)

In [35]:
df_preprocessed.show(5)

+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|class|                text|length|label|          token_text|          stop_token|               c_vec|              tf_idf|            features|
+-----+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|  ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|
|  ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|(13424,[0,24,297,...|
| spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|
|  ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,[0,70,8

In [37]:
df_select = df_preprocessed.select("label", "features")

In [38]:
train, test = df_select.randomSplit([0.7, 0.3])

In [39]:
spam_detector = nb.fit(train)

23/01/02 20:03:39 WARN DAGScheduler: Broadcasting large task binary with size 1144.4 KiB


[Stage 35:>                                                         (0 + 1) / 1]

23/01/02 20:03:40 WARN DAGScheduler: Broadcasting large task binary with size 1134.1 KiB


                                                                                

In [40]:
test_results = spam_detector.transform(test)

In [41]:
test_results.show(5)

23/01/02 20:04:10 WARN DAGScheduler: Broadcasting large task binary with size 1366.7 KiB
+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,5,15,...|[-997.97881342495...|[1.0,7.9012473170...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[-805.87438683763...|[1.0,3.6147041721...|       0.0|
|  0.0|(13424,[0,1,7,8,1...|[-879.05311206593...|[1.0,2.1161967044...|       0.0|
|  0.0|(13424,[0,1,14,18...|[-1385.2679373540...|[1.0,2.1939350822...|       0.0|
|  0.0|(13424,[0,1,18,20...|[-853.37406398631...|[1.0,2.7522191456...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



                                                                                

In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [43]:
acc_eval = MulticlassClassificationEvaluator()

In [44]:
acc = acc_eval.evaluate(test_results)

23/01/02 20:04:39 WARN DAGScheduler: Broadcasting large task binary with size 1371.2 KiB


                                                                                

In [45]:
print(f"Acc: {acc:.4f}")

Acc: 0.9217
