<a href="https://colab.research.google.com/github/jafinn14/Pandas-Challenge/blob/master/naive_review.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [0]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [0]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [0]:
from sklearn.datasets import fetch_20newsgroups
twenty_train = fetch_20newsgroups(subset='train', shuffle=True)

Downloading 20news dataset. This may take a few minutes.
Downloading dataset from https://ndownloader.figshare.com/files/5975967 (14 MB)
Exception ignored in: <bound method JavaModelWrapper.__del__ of <pyspark.mllib.evaluation.BinaryClassificationMetrics object at 0x7fda25298128>>
Traceback (most recent call last):
  File "/content/spark-2.4.5-bin-hadoop2.7/python/pyspark/mllib/common.py", line 142, in __del__
    self._sc._gateway.detach(self._java_model)
AttributeError: 'BinaryClassificationMetrics' object has no attribute '_sc'
Exception ignored in: <bound method JavaModelWrapper.__del__ of <pyspark.mllib.evaluation.BinaryClassificationMetrics object at 0x7fda0f6b04e0>>
Traceback (most recent call last):
  File "/content/spark-2.4.5-bin-hadoop2.7/python/pyspark/mllib/common.py", line 142, in __del__
    self._sc._gateway.detach(self._java_model)
AttributeError: 'BinaryClassificationMetrics' object has no attribute '_sc'
Exception ignored in: <bound method JavaModelWrapper.__del__ of

In [0]:
# twenty_train

In [0]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



### Feature Transformations


In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [0]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [0]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [0]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[33933,69...|
|  1.0|(262145,[150903,1...|
|  1.0|(262145,[63367,11...|
|  0.0|(262145,[6286,272...|
|  0.0|(262145,[6979,911...|
|  1.0|(262145,[24661,34...|
|  1.0|(262145,[101702,2...|
|  1.0|(262145,[3645,855...|
|  0.0|(262145,[53777,13...|
|  0.0|(262145,[138356,2...|
|  0.0|(262145,[24113,20...|
|  1.0|(262145,[172477,1...|
|  1.0|(262145,[36200,40...|
|  0.0|(262145,[18098,83...|
|  1.0|(262145,[89493,95...|
|  1.0|(262145,[86431,10...|
|  0.0|(262145,[31704,21...|
|  1.0|(262145,[27707,65...|
|  1.0|(262145,[12329,61...|
|  1.0|(262145,[8287,208...|
+-----+--------------------+
only showing top 20 rows



In [0]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [0]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|!....THE OWNERS R...|   120|  1.0|[!....the, owners...|[!....the, owners...|(262144,[14,12946...|(262144,[14,12946...|(262145,[14,12946...|[-1282.7673822704...|[0.99661912345368...|       0.0|
|negative|              #NAME?|     6|  1.0|            [#name?]|            [#name?]|(262144,[75443],[...|(262144,[75443],[...|(262145,[75443,26...|[-70.495133606766...|[0.95144457540563.

In [0]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
model = nb.fit(training)

In [0]:
# Make predictions.
predictions = model.transform(testing)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy of model at predicting reviews was: %f" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  1.0|(262145,[14,12946...|
|       0.0|  1.0|(262145,[75443,26...|
|       1.0|  1.0|(262145,[98627,12...|
|       1.0|  1.0|(262145,[146876,2...|
|       1.0|  1.0|(262145,[17519,98...|
+----------+-----+--------------------+
only showing top 5 rows

Accuracy of model at predicting reviews was: 0.736667
Test Error = 0.263333


In [0]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.736324


In [0]:
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
# print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training.select('features','label'))


In [0]:
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # Specify multiple Params.

# You can combine paramMaps, which are python dictionaries.
paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

Model 1 was fit using parameters: 
{Param(parent='LogisticRegression_ddc77a3482d5', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2, Param(parent='LogisticRegression_ddc77a3482d5', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0, Param(parent='LogisticRegression_ddc77a3482d5', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto', Param(parent='LogisticRegression_ddc77a3482d5', name='featuresCol', doc='features column name'): 'features', Param(parent='LogisticRegression_ddc77a3482d5', name='fitIntercept', doc='whether to fit an intercept term'): True, Param(parent='LogisticRegression_ddc77a3482d5', name='labelCol', doc='label column name'): 'label', Param(parent='LogisticRegression_ddc77a3482d5', name='maxIter', doc='maximum nu

In [0]:
prediction = model1.transform(testing.select('features','label'))
prediction
# result = prediction.select("features", "label", "probability", "prediction") \
#     .collect()

# for row in result:
#     print("features=%s, label=%s -> prob=%s, prediction=%s"
#           % (row.features, row.label, row.myProbability, row.prediction))

DataFrame[features: vector, label: double, rawPrediction: vector, probability: vector, prediction: double]

In [0]:
# metrics = BinaryClassificationMetrics(prediction.select('features','label').collect())

In [0]:
# from pyspark.mllib.evaluation import BinaryClassificationMetrics
# metrics = BinaryClassificationMetrics(test_results.select('prediction','label'))


In [0]:
# predictionAndLabels = testing.map(lambda lp: (float(model.predict(lp.features)), lp.label))

In [0]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils

In [0]:
# Run training algorithm to build the model\
train = training.rdd.map(list)
test = testing.rdd.map(list)

model = LogisticRegressionWithLBFGS.train(train)

# Compute raw scores on the test set
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))

# Instantiate metrics object
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

AttributeError: ignored