<a href="https://colab.research.google.com/github/pkrosoff/wine_machine_learning/blob/justans_branch/wine_reviews.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.0.1'
# spark_version = 'spark-3.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu bionic-security InRelease
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:11 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Hit:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Reading package lists... Done


In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

In [3]:
# Read in data from Google Drive
from pyspark import SparkFiles
url ="/content/drive/MyDrive/winemag-data_first150k.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("winemag-data_first150k.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+---+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|_c0|country|         description|         designation|points|price|          province|            region_1|         region_2|           variety|              winery|
+---+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|  0|     US|This tremendous 1...|   Martha's Vineyard|    96|235.0|        California|         Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|
|  1|  Spain|Ripe aromas of fi...|Carodorum Selecci...|    96|110.0|    Northern Spain|                Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|
|  2|     US|Mac Watson honors...|Special Selected ...|    96| 90.0|        California|      Knights Valley|           Sonoma|   Sauvignon Blanc|            Macauley

In [4]:
# wine_df['description'].str.encode('ascii', 'ignore').str.decode('ascii')
from pyspark.sql.functions import regexp_replace, col

df = df.withColumn("description", regexp_replace(col("description"), "[^a-zA-Z\d\s:]", ""));

# [^a-zA-Z0-9_]

In [5]:
wine_scores = df.drop('_c0','country', 'designation', 'price', 'province', 'region_1', 'region_2', 'variety', 'winery')

wine_scores.show()

+--------------------+------+
|         description|points|
+--------------------+------+
|This tremendous 1...|    96|
|Ripe aromas of fi...|    96|
|Mac Watson honors...|    96|
|This spent 20 mon...|    96|
|This is the top w...|    95|
|Deep dense and pu...|    95|
|Slightly gritty b...|    95|
|Lush cedary black...|    95|
|This renamed vine...|    95|
|The producer sour...|    95|
|Elegance complexi...|    95|
|From 18yearold vi...|    95|
|A standout even i...|    95|
|This wine is in p...|    95|
|With its sophisti...|    95|
|First made in 200...|    95|
|This blockbuster ...|    95|
|Nicely oaked blac...|    95|
|Coming from a sev...|    95|
|This fresh and li...|    95|
+--------------------+------+
only showing top 20 rows



In [6]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
wine_df = wine_scores.withColumn('length', length(df['description']))
wine_df.show()

+--------------------+------+------+
|         description|points|length|
+--------------------+------+------+
|This tremendous 1...|    96|   345|
|Ripe aromas of fi...|    96|   309|
|Mac Watson honors...|    96|   273|
|This spent 20 mon...|    96|   370|
|This is the top w...|    95|   364|
|Deep dense and pu...|    95|   305|
|Slightly gritty b...|    95|   309|
|Lush cedary black...|    95|   347|
|This renamed vine...|    95|   289|
|The producer sour...|    95|   300|
|Elegance complexi...|    95|   365|
|From 18yearold vi...|    95|   250|
|A standout even i...|    95|   282|
|This wine is in p...|    95|   297|
|With its sophisti...|    95|   412|
|First made in 200...|    95|   271|
|This blockbuster ...|    95|   268|
|Nicely oaked blac...|    95|   329|
|Coming from a sev...|    95|   361|
|This fresh and li...|    95|   253|
+--------------------+------+------+
only showing top 20 rows



In [7]:
from pyspark.sql.functions import isnan, when, count, col, isnull

wine_df.select([count(when(isnull(c), c)).alias(c) for c in wine_df.columns]).show()

+-----------+------+------+
|description|points|length|
+-----------+------+------+
|          1|     6|     1|
+-----------+------+------+



In [8]:
wine_df = wine_df.na.drop()

In [9]:
wine_df.select([count(when(isnull(c), c)).alias(c) for c in wine_df.columns]).show()

+-----------+------+------+
|description|points|length|
+-----------+------+------+
|          0|     0|     0|
+-----------+------+------+



In [25]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
wine_points = StringIndexer(inputCol='points',outputCol='label')
tokenizer = Tokenizer(inputCol="description", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [26]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [27]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[wine_points, tokenizer, stopremove, hashingTF, idf, clean_up])

In [28]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(wine_df)
cleaned = cleaner.transform(wine_df)

In [29]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 16.0|(262145,[2701,471...|
| 16.0|(262145,[11481,33...|
| 16.0|(262145,[10077,12...|
| 16.0|(262145,[1546,153...|
| 13.0|(262145,[1546,181...|
| 13.0|(262145,[8408,104...|
| 13.0|(262145,[5561,114...|
| 13.0|(262145,[4235,121...|
| 13.0|(262145,[2306,316...|
| 13.0|(262145,[3354,584...|
| 13.0|(262145,[3848,939...|
| 13.0|(262145,[21534,32...|
| 13.0|(262145,[4176,894...|
| 13.0|(262145,[8297,120...|
| 13.0|(262145,[2306,584...|
| 13.0|(262145,[1546,439...|
| 13.0|(262145,[5847,189...|
| 13.0|(262145,[2701,104...|
| 13.0|(262145,[1546,124...|
| 13.0|(262145,[3572,311...|
+-----+--------------------+
only showing top 20 rows



In [30]:
from pyspark.ml.classification import NaiveBayes

# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [31]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|         description|points|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| Big lively and v...|    91|   288|  7.0|[, big, lively, a...|[, big, lively, i...|(262144,[18911,45...|(262144,[18911,45...|(262145,[18911,45...|[-1080.4835129746...|[6.50174364162678...|       2.0|
| The wine is easy...|    88|   149|  1.0|[, the, wine, is,...|[, wine, easy, fr...|(262144,[62807,64...|(262144,[62807,64...|(262145,[62807,64...|[-620.11420196923...|[0.99665242104747...|       

In [32]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.354097
