In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 56.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=acb08f9f38acd8c08a5bc37381c4202b4e68636508ec543bdffc76ea9879f5bb
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("yelp-NLP").getOrCreate()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-online/v2/module_17/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [None]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [None]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [None]:
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [None]:
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [None]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [None]:
data_prep_pipeline

Pipeline_dee192fa70c8

In [None]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [None]:
cleaned.show()

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|positive|Wow... Loved this...|    24|  1.0|[wow..., loved, t...|[wow..., loved, p...|(262144,[177414,2...|(262144,[177414,2...|(262145,[177414,2...|
|negative|  Crust is not good.|    18|  0.0|[crust, is, not, ...|      [crust, good.]|(262144,[49815,23...|(262144,[49815,23...|(262145,[49815,23...|
|negative|Not tasty and the...|    41|  0.0|[not, tasty, and,...|[tasty, texture, ...|(262144,[109649,1...|(262144,[109649,1...|(262145,[109649,1...|
|positive|Stopped by during...|    87|  1.0|[stopped, by, dur...|[stopped, late, m...|(262144,[53101

In [None]:
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [None]:
from pyspark.ml.classification import NaiveBayes
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [None]:
test_results = predictor.transform(testing)
test_results.show()

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
acc_eval = BinaryClassificationEvaluator(labelCol='label' , rawPredictionCol='prediction')
acc = acc_eval.evaluate(test_results)
print("Accuracy of the model at predicting reviews was: %f" % acc)

Accuracy of the model at predicting reviews was: 0.700298
