In [9]:
# Setup Spark Session
# Spark Session Name: Assingment07
from pyspark.sql import SparkSession

# Get Imports Needed
from pyspark.sql.functions import col, udf

# Get Datatypes needed for DataFrame manipulation
from pyspark.sql.types import IntegerType, StringType

# Setup Spark Session
sc = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("Assignment07") \
        .getOrCreate()

# Print Spark Version being run
print("Spark V: ", sc.version)

Spark V:  3.3.2


In [10]:
# Get the Sentiment Value (0/1) from the line and return as an Integer
def GetSentiment(line):
    res = line.strip()[-1]
    return int(res)
get_sentiment = udf(lambda q : GetSentiment(q), IntegerType())

# Get the Sentence by removing the last charater and then cleaning it up
# Returing the sentence as lower case string
def GetSentence(line):
    res = line.strip()[:-1].strip()
    return res.lower()
get_sentence = udf(lambda q : GetSentence(q), StringType())

In [11]:
# Read the three (3) dataset files into dataframes
df1 = sc.read.text("datasets/amazon_cells_labelled.txt")
df2 = sc.read.text("datasets/imdb_labelled.txt")
df3 = sc.read.text("datasets/yelp_labelled.txt")

# Concatenate the dataframe df1, df2, and df3 with eachoather to form a "df"
df = df1.union(df2).union(df3)

# Cleanup Data and set two columns (sentence & sentiment) int eh dataframe
df = df.withColumn("label", get_sentiment(col("value")))
df = df.withColumn("sentence", get_sentence(col("value")))

# remove the inital "value" column from the dataframe
df = df.drop("value")
df.printSchema()

root
 |-- label: integer (nullable = true)
 |-- sentence: string (nullable = true)



In [12]:
# Setup MLlib Features
from pyspark.ml.feature import IDF, StopWordsRemover, Tokenizer, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Setting up Transformations
# Use the Tokenizer to conver the sentence into a word ARRAY / LIST
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

# The Count Vectorizer converts the words from the Tokenizer to Vectors
countVectorizer = CountVectorizer(inputCol="words", outputCol="rawFeatures")

# IDF is the Inverse Document Frequency - It essentaill sets the imporatnce of the word
# IDF is an Estimator which is fit on a dataset and produces an IDF Model
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Setting up the Pipeline for Transformation
# This is useful so I can re-use it on my own example data later on in the example
pipeline = Pipeline(stages=[tokenizer, countVectorizer, idf])

# Let the Pipeline learn from the main datasets previoulsy created
pipelineModel = pipeline.fit(df)




# The below is just an example of what happens to the dataframe -> This will not be used later on
# This just helps understand how the columns get added to teh dataframe
print("This is Dataframe: 'df'")
df.show(2)

dfWords = tokenizer.transform(df)
print("This is Dataframe: 'dfWords'")
dfWords.show(2)

model = countVectorizer.fit(dfWords)
dfRawFeatures = model.transform(dfWords)
print("This is Dataframe: 'dfRawFeatures'")
dfRawFeatures.show(2)

model = idf.fit(dfRawFeatures)
dfFeatures = model.transform(dfRawFeatures)
print("This is Dataframe: 'dfFeatures'")
dfFeatures.show(2)


This is Dataframe: 'df'
+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|    0|so there is no wa...|
|    1|good case, excell...|
+-----+--------------------+
only showing top 2 rows

This is Dataframe: 'dfWords'
+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|so there is no wa...|[so, there, is, n...|
|    1|good case, excell...|[good, case,, exc...|
+-----+--------------------+--------------------+
only showing top 2 rows

This is Dataframe: 'dfRawFeatures'
+-----+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|    0|so there is no wa...|[so, there, is, n...|(7354,[0,2,3,4,5,...|
|    1|good case, excell...|[good, case,, exc...|(7354,[23,105,797...|
+-----+--------------------+-----

In [13]:
# Split Dataframe for Trainign and Testing
# I opted for a 80 / 20 Split
# Using the Random Split I get two arrays each with their respective number of items
seed = 77
dfTrain, dfTest = df.randomSplit([0.8, 0.2], seed)

print("Data Split into Training:", dfTrain.count(), "and Test:", dfTest.count())

Data Split into Training: 2372 and Test: 628


In [14]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np

# Using LR for Learing and Best fit
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

# Building the Parameter for the Cross Validator
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, np.linspace(0.01, 0.3, 10)) \
    .addGrid(lr.elasticNetParam, np.linspace(0.3, 0.8, 6)) \
    .build()

# Build and Learning Setup for Cross Validation
crossval_lr = CrossValidator(
                  estimator = lr,
                  estimatorParamMaps = paramGrid_lr,
                  evaluator = BinaryClassificationEvaluator()) 

# Let the LR learn from the Training Dataframe
# Note: I initally pass the dataframe to our Pipeline Model for Transformation
cvModel_lr = crossval_lr.fit(pipelineModel.transform(dfTrain))

# Transform the Dataframe on Our LR Learning Function
predictions_lr = cvModel_lr.transform(pipelineModel.transform(dfTest))

# Show the Results of the Prediction vs. Labels (supplied in the intial data)
predictions_lr.groupBy("label", "prediction").count().show()

# Use the Binary Classigicaiton Evaluator (Simple since we only have 1 / 0) to 
# show and compare "labels" to the "predictions"
my_eval_lr = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
print("Estimated Prediction:", my_eval_lr.evaluate(predictions_lr))

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   63|
|    0|       0.0|  248|
|    1|       1.0|  248|
|    0|       1.0|   69|
+-----+----------+-----+

Estimated Prediction: 0.7898810187955816


In [15]:
# Testing a totally New Dataset created by Richard Hoehn as test to see how well the model works
# This was just for fun to see how it might work using the LR Model later on when gettting more
# data to test with.
dfRichard = sc.createDataFrame(
    [
        (0, "Oh this is really bad i hate coming to work"),
        (1, "I love this class!"),
        (1, "I would like to do more of this stuff"),
        (0, "Waste of my time"),
        (1, "Going to school is great")
    ], [
        "label", "sentence"
    ]
)

# We will use Richard's made up example to see how well it did
predictions_lr = cvModel_lr.transform(pipelineModel.transform(dfRichard))
predictions_lr.groupBy('label','prediction').count().show()
predictions_lr.select("sentence", "label", "prediction").show(10, truncate=False)

my_eval_lr = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
print("Estimated Prediction:", my_eval_lr.evaluate(predictions_lr))

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    0|       0.0|    2|
|    1|       1.0|    2|
|    1|       0.0|    1|
+-----+----------+-----+

+-------------------------------------------+-----+----------+
|sentence                                   |label|prediction|
+-------------------------------------------+-----+----------+
|Oh this is really bad i hate coming to work|0    |0.0       |
|I love this class!                         |1    |1.0       |
|I would like to do more of this stuff      |1    |0.0       |
|Waste of my time                           |0    |0.0       |
|Going to school is great                   |1    |1.0       |
+-------------------------------------------+-----+----------+

Estimated Prediction: 0.8333333333333333
