In [96]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

from pyspark.ml.feature import Tokenizer, RegexTokenizer, NGram, HashingTF, ChiSqSelector, VectorAssembler

from pyspark.ml.classification import LogisticRegression

In [2]:
spark = SparkSession.builder.appName("Tokenizer Application").master("local[2]").getOrCreate()
sc = spark.sparkContext

25/02/05 11:32:05 WARN Utils: Your hostname, Globals-MacBook-Pro.local resolves to a loopback address: 127.94.0.1; using 172.20.10.2 instead (on interface en7)
25/02/05 11:32:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/05 11:32:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [55]:
df_pos = spark.read.text("aclImdb/train/pos/100*.txt").withColumnRenamed("value", "raw")
df_pos = df_pos.withColumn("sentiment", lit(1))
df_neg = spark.read.text("aclImdb/train/neg/100*.txt").withColumnRenamed("value", "raw")
df_neg = df_pos.withColumn("sentiment", lit(0))

df = df_pos.union(df_neg)

                                                                                

In [58]:
df.show()

+--------------------+---------+
|                 raw|sentiment|
+--------------------+---------+
|Titanic directed ...|        1|
|Back in 1997, do ...|        1|
|After a brief pro...|        1|
|James Cameron's '...|        1|
|ELEPHANT WALK may...|        1|
|When it comes to ...|        1|
|I avoided watchin...|        1|
|Every once in a w...|        1|
|Titanic has to be...|        1|
|The anime that go...|        1|
|THE NIGHT LISTENE...|        1|
|Previously, I wro...|        1|
|I find it so amaz...|        1|
|Homelessness (or ...|        1|
|What's inexplicab...|        1|
|Scott Bartlett's ...|        1|
|In this "critical...|        1|
|Titanic is a long...|        1|
|I think James Cam...|        1|
|I admit to being ...|        1|
+--------------------+---------+
only showing top 20 rows



In [64]:
tokenizer = Tokenizer(inputCol="raw",outputCol="bow")
reg_tokenizer = RegexTokenizer(inputCol="raw", outputCol="bow", pattern="\\W+")

In [75]:
df = tokenizer.transform(df) 
df.show(3)

+--------------------+---------+--------------------+
|                 raw|sentiment|                 bow|
+--------------------+---------+--------------------+
|Titanic directed ...|        1|[titanic, directe...|
|Back in 1997, do ...|        1|[back, in, 1997,,...|
|After a brief pro...|        1|[after, a, brief,...|
+--------------------+---------+--------------------+
only showing top 3 rows



In [69]:
x_unigram = NGram(n=1, inputCol="bow", outputCol="unigram")
x_bigram = NGram(n=2, inputCol="bow", outputCol="bigram")

In [76]:
df = x_unigram.transform(df)
df = x_bigram.transform(df)

In [78]:
df.show(3)

+--------------------+---------+--------------------+--------------------+--------------------+
|                 raw|sentiment|                 bow|             unigram|              bigram|
+--------------------+---------+--------------------+--------------------+--------------------+
|Titanic directed ...|        1|[titanic, directe...|[titanic, directe...|[titanic directed...|
|Back in 1997, do ...|        1|[back, in, 1997,,...|[back, in, 1997,,...|[back in, in 1997...|
|After a brief pro...|        1|[after, a, brief,...|[after, a, brief,...|[after a, a brief...|
+--------------------+---------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [81]:
x_unigram_tf = HashingTF(inputCol="unigram",binary=True, outputCol="unigramTF")
x_bigram_tf = HashingTF(inputCol="bigram",binary=True, outputCol="bigramTF")

In [83]:
df = x_unigram_tf.transform(df)
df = x_bigram_tf.transform(df)

In [85]:
df.show(3)

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 raw|sentiment|                 bow|             unigram|              bigram|           unigramTF|            bigramTF|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Titanic directed ...|        1|[titanic, directe...|[titanic, directe...|[titanic directed...|(262144,[1141,157...|(262144,[312,571,...|
|Back in 1997, do ...|        1|[back, in, 1997,,...|[back, in, 1997,,...|[back in, in 1997...|(262144,[991,1603...|(262144,[207,306,...|
|After a brief pro...|        1|[after, a, brief,...|[after, a, brief,...|[after a, a brief...|(262144,[896,1797...|(262144,[106,236,...|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [87]:
uni10K = ChiSqSelector(numTopFeatures=10_000, outputCol="uni10K", featuresCol="unigramTF", labelCol="sentiment")
bi1K = ChiSqSelector(numTopFeatures=1_000, outputCol="bi1K", featuresCol="bigramTF", labelCol="sentiment")

In [89]:
df = uni10K.fit(df).transform(df)
df = bi1K.fit(df).transform(df)

                                                                                

In [91]:
df.show(3)

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                 raw|sentiment|                 bow|             unigram|              bigram|           unigramTF|            bigramTF|              uni10K|                bi1K|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Titanic directed ...|        1|[titanic, directe...|[titanic, directe...|[titanic directed...|(262144,[1141,157...|(262144,[312,571,...|(10000,[1141,1578...|(1000,[312,571,64...|
|Back in 1997, do ...|        1|[back, in, 1997,,...|[back, in, 1997,,...|[back in, in 1997...|(262144,[991,1603...|(262144,[207,306,...|(10000,[991,1603,...|(1000,[207,306,38...|
|After a brief pro...|        1|[after, a, brief,...|[after, a, brief,...|[after a, a brief...|(2621

In [93]:
assembler = VectorAssembler(inputCols=["uni10K","bi1K"], outputCol="features")

In [97]:
df = assembler.transform(df)

In [99]:
lr = LogisticRegression(labelCol="sentiment",predictionCol="sentiment_pred")

In [102]:
lr.fit(df).transform(df).show()

                                                                                

+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----------+--------------+
|                 raw|sentiment|                 bow|             unigram|              bigram|           unigramTF|            bigramTF|              uni10K|                bi1K|            features|rawPrediction|probability|sentiment_pred|
+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----------+--------------+
|Titanic directed ...|        1|[titanic, directe...|[titanic, directe...|[titanic directed...|(262144,[1141,157...|(262144,[312,571,...|(10000,[1141,1578...|(1000,[312,571,64...|(11000,[1141,1578...|   [-0.0,0.0]|  [0.5,0.5]|           0.0|
|Back in 1997, do ...|        1|

In [88]:
from pyspark.ml.pipeline import Pipeline

In [None]:
ppl = Pipeline(stages=[tokenizer,x_unigram,x_bigram,x_unigram_tf,x_bigram_tf,uni10K,bi1K,assembler,lr])