In [1]:
import threading

# Helper thread to avoid the Spark StreamingContext from blocking Jupyter
        
class StreamingThread(threading.Thread):
    def __init__(self, ssc):

        super().__init__()
        self.ssc = ssc
    def run(self):
        self.ssc.start()
        self.ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [2]:
sc

In [3]:
spark

In [4]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [5]:
import pandas as pd

In [7]:
df =pd.read_csv("reviews.csv", sep=',')
df = df[['review_id', 'app_id', 'review_text', 'label']]
df.to_csv('reviews.csv', sep='\t', index=False)
df.shape

(2088, 4)

In [8]:
df.groupby('label').count()

Unnamed: 0_level_0,review_id,app_id,review_text
label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0.0,71,71,71
1.0,394,394,394


In [9]:
df.label.mean()

0.8473118279569892

# build model

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, HashingTF
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import expr
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.sql.functions import split
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

file_location="reviews.csv"
#text_df = spark.read.text(file_location)

In [11]:
# create a SparkSession 
spark = SparkSession.builder \
    .appName("assignment 3") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.cores", "2") \
    .getOrCreate()

In [49]:
#text_df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine", "true").csv(file_location)

# read csv

text_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiline", "true") \
    .option("delimiter", "\t") \
    .csv(file_location)

text_df = text_df.select(col('review_text'), col('label'))
text_df = text_df.dropna()
text_df.show()

+--------------------+-----+
|         review_text|label|
+--------------------+-----+
|i can confirm tha...|  1.0|
|Really good game,...|  1.0|
|Its not finished ...|  1.0|
|Hey. It's really ...|  1.0|
|          Fun so far|  1.0|
|I'd rather play W...|  0.0|
|I have been playi...|  1.0|
|Nice game!\nLoads...|  1.0|
|All hail NA serve...|  1.0|
|enjoying it so fa...|  1.0|
|This game came as...|  1.0|
|        Great fun \n|  1.0|
|Fantastic consept...|  1.0|
|You know the game...|  1.0|
|EDIT: My issues h...|  1.0|
|AMAZIIIIIIIIIIING...|  1.0|
|In its current st...|  0.0|
|Играю в Калибр го...|  1.0|
|Secret Word: Prou...|  1.0|
|Fated Word: Death...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [13]:
# Get the number of rows and columns in the DataFrame
num_rows = text_df.count()
num_cols = len(text_df.columns)

# Print the shape of the DataFrame
print("Shape of the DataFrame: (%d, %d)" % (num_rows, num_cols))

Shape of the DataFrame: (433, 2)


In [14]:
# Remove duplicate rows
text_df = text_df.dropDuplicates()
# Remove rows with missing values
text_df = text_df.na.drop()

In [15]:
text_df.select("label").distinct().show()

+-----+
|label|
+-----+
|  0.0|
|  1.0|
+-----+



In [16]:
# Remove rows with unexpected labels
text_df = text_df.filter((col("label") == 1.0) | (col("label") == 0.0))
print("Number of rows after filtering: ", text_df.count())

Number of rows after filtering:  433


In [17]:
train_data, test_data = text_df.limit(10000).randomSplit([0.8, 0.2], seed=7)
train_data.show(5)

+--------------------+-----+
|         review_text|label|
+--------------------+-----+
| Similar to Battl...|  1.0|
|"(BTW I have a bi...|  0.0|
|"Could be an inte...|  1.0|
|"Fun action rogue...|  1.0|
|"I buy a few new ...|  1.0|
+--------------------+-----+
only showing top 5 rows



In [18]:
# preprocess the data
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", locale="en_US")
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")
string_indexer = StringIndexer(inputCol="label", outputCol="label_index")

# create model
lr = LogisticRegression(featuresCol="features", labelCol="label_index")

In [19]:
# define params grid
param_grid = ParamGridBuilder() \
   .addGrid(count_vectorizer.vocabSize, [1000, 5000]) \
   .addGrid(lr.regParam, [0.01, 0.1]) \
   .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
   .build()

# define the evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction")

In [34]:
# create pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer, idf, string_indexer, lr])

# define the cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# fit pipeline to the training data
cv_model = cv.fit(train_data)

# make predictions on the test data
predictions = cv_model.transform(test_data)

In [38]:
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction")

# Calculate accuracy
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

# Calculate F1 score
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

# Calculate recall
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})

# Calculate precision
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})

print("Accuracy: {:.4f}".format(accuracy))
print("F1 score: {:.4f}".format(f1_score))
print("Recall: {:.4f}".format(recall))
print("Precision: {:.4f}".format(precision))


Accuracy: 0.8352
F1 score: 0.8076
Recall: 0.8352
Precision: 0.8069


In [39]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Convert the predictions and labels to an RDD
predictionAndLabels = predictions.select("prediction", "label_index").rdd.map(lambda r: (r[0], r[1]))

# Instantiate a MulticlassMetrics object
metrics = MulticlassMetrics(predictionAndLabels)

# Get the confusion matrix as a NumPy array
confusion_matrix = metrics.confusionMatrix().toArray()

# Print the confusion matrix
print("Confusion matrix:")
print(confusion_matrix)

Confusion matrix:
[[72.  3.]
 [12.  4.]]


In [23]:
# Get the best model from the cross-validation process
best_model = cv_model.bestModel

# Save the my_model
best_model.save("my_logistic_regression")


NameError: name 'cv_model' is not defined

In [34]:
from pyspark.ml import PipelineModel
globals()['models_loaded'] = False
globals()['my_model'] = None

# Toy predict function that returns a random probability. Normally you'd use your loaded globals()['my_model'] here
# def predict(df):
#     return random.random()

# predict_udf = udf(predict, StringType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df = df.withColumn("label", col("label").cast("float"))
    df.show()
    
#     # Utilize our predict function
#     df_withpreds = df.withColumn("pred", predict_udf(
#         struct([df[x] for x in df.columns])
#     ))
#     df_withpreds.show()
    
    # Normally, you wouldn't use a UDF (User Defined Function) Python function to predict as we did here (you can)
    # but an MLlib model you've built and saved with Spark
    # In this case, you need to prevent loading your model in every call to "process" as follows:
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = PipelineModel.load('my_logistic_regression')
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model (uncomment below):
    
    df_result = globals()['my_model'].transform(df)
    df_result.select('label', 'review_text', 'prediction', 'label_index').show()

In [35]:
ssc = StreamingContext(sc, 10)

In [36]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [37]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2311190|  1.0|138696311|SO EXITED FOR GAR...|
|1375900|  1.0|138695647|               great|
|1742020|  1.0|138698127|                 The|
+-------+-----+---------+--------------------+

+-----+--------------------+----------+-----------+
|label|         review_text|prediction|label_index|
+-----+--------------------+----------+-----------+
|  1.0|SO EXITED FOR GAR...|       0.0|        0.0|
|  1.0|               great|       0.0|        0.0|
|  1.0|                 The|       0.0|        0.0|
+-----+--------------------+----------+-----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1557990|  1.0|138694719|i swear. i made s...|
|1940340|  0.0|138697687|- Restart the run...|
|1940340|  1.0|138694794|Morrer na primeir...|
+-------+-----+--------

+-----+--------------------+----------+-----------+
|label|         review_text|prediction|label_index|
+-----+--------------------+----------+-----------+
|  1.0|runs fine for a v...|       0.0|        0.0|
+-----+--------------------+----------+-----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2016280|  1.0|138698281|Well this is rath...|
|1934780|  0.0|138699254|3rd person shoote...|
|1934780|  1.0|138699202|Good Game! But fo...|
|1934780|  0.0|138699092|The game is okay....|
+-------+-----+---------+--------------------+

+-----+--------------------+----------+-----------+
|label|         review_text|prediction|label_index|
+-----+--------------------+----------+-----------+
|  1.0|Well this is rath...|       0.0|        0.0|
|  0.0|3rd person shoote...|       0.0|        1.0|
|  1.0|Good Game! But fo...|       0.0|        0.0|
|  0.0|The game is okay....|       0.0|        1.0

+-----+--------------------+----------+-----------+
|label|         review_text|prediction|label_index|
+-----+--------------------+----------+-----------+
|  1.0|This game has ama...|       0.0|        0.0|
|  1.0|Couple bugs aside...|       0.0|        0.0|
|  0.0|Short Pros/Cons:\...|       0.0|        1.0|
+-----+--------------------+----------+-----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2392070|  1.0|138695819|This casual game ...|
+-------+-----+---------+--------------------+

+-----+--------------------+----------+-----------+
|label|         review_text|prediction|label_index|
+-----+--------------------+----------+-----------+
|  1.0|This casual game ...|       1.0|        0.0|
+-----+--------------------+----------+-----------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+-------------------

In [33]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2311190|  1.0|138696311|SO EXITED FOR GAR...|
+-------+-----+---------+--------------------+

+-----+--------------------+----------+-----------+
|label|         review_text|prediction|label_index|
+-----+--------------------+----------+-----------+
|  1.0|SO EXITED FOR GAR...|       0.0|        0.0|
+-----+--------------------+----------+-----------+

