In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!pip install pyspark

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 69kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 43.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=22973744a84699d227c9b6e596af1a60de6ed7438023907bc9a7c240ac436116
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Succ

In [2]:
from pyspark import SparkFiles
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import pandas as pd
import json
import random as rd

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "14g") \
    .appName("CloudETLProject") \
    .getOrCreate()

In [None]:
# Define file locations. Please update with the correct file locations after downloading the proper files from this Google Drive link:
# https://drive.google.com/drive/folders/1XARu2a6orQTGdpzL36-Rw20iuNPSM-yD?usp=sharing

yelp_review_file = "/content/drive/My Drive/Bootcamp Final Project/yelp_reviews.csv"
keywords_file = "/content/drive/My Drive/Bootcamp Final Project/keywords.csv"
labels_file = "/content/drive/My Drive/Bootcamp Final Project/labels.csv"
response_file = "/content/drive/My Drive/Bootcamp Final Project/responses.csv"

In [54]:
# Preview data
reviews_df = pd.read_csv(yelp_review_file)
reviews_df.head(10)

Unnamed: 0,review_id,user_id,business_id,stars,useful,funny,cool,text,date
0,xQY8N_XvtGbearJ5X4QryQ,OwjRMXRC0KyPrIlcjaXeFQ,-MhfebM0QIsKt87iDN-FNw,2,5,0,0,"As someone who has worked with many museums, I...",2015-04-15 05:21:16
1,UmFMZ8PyXZTY2QcwzsfQYA,nIJD_7ZXHq-FX8byPMOkMQ,lbrU8StCq3yDfr-QMnGrmQ,1,1,1,0,I am actually horrified this place is still in...,2013-12-07 03:16:52
2,LG2ZaYiOgpr2DK_90pYjNw,V34qejxNsCbcgD8C0HVk-Q,HQl28KMwrEKHqhFrrDqVNQ,5,1,0,0,I love Deagan's. I do. I really do. The atmosp...,2015-12-05 03:18:11
3,i6g_oA9Yf9Y31qt0wibXpw,ofKDkJKXSKZXu5xJNGiiBQ,5JxlZaqCnk1MnbgRirs40Q,1,0,0,0,"Dismal, lukewarm, defrosted-tasting ""TexMex"" g...",2011-05-27 05:30:52
4,6TdNDKywdbjoTkizeMce8A,UgMW8bLE0QMJDCkQ1Ax5Mg,IS4cv902ykd8wj1TR0N3-A,4,0,0,0,"Oh happy day, finally have a Canes near my cas...",2017-01-14 21:56:57
5,L2O_INwlrRuoX05KSjc4eg,5vD2kmE25YBrbayKhykNxQ,nlxHRv1zXGT0c0K51q3jDg,5,2,0,0,This is definitely my favorite fast food sub s...,2013-05-07 07:25:25
6,ZayJ1zWyWgY9S_TRLT_y9Q,aq_ZxGHiri48TUXJlpRkCQ,Pthe4qk5xh4n-ef-9bvMSg,5,1,0,0,"Really good place with simple decor, amazing f...",2015-11-05 23:11:05
7,lpFIJYpsvDxyph-kPzZ6aA,dsd-KNYKMpx6ma_sRWCSkQ,FNCJpSn0tL9iqoY3JC73qw,5,0,0,0,"Awesome office and staff, very professional an...",2017-07-18 18:31:54
8,JA-xnyHytKiOIHl_ztnK9Q,P6apihD4ASf1vpPxHODxAQ,e_BiI4ej1CW1F0EyVLr-FQ,5,0,0,0,Most delicious authentic Italian I've had in t...,2015-02-16 06:48:47
9,z4BCgTkfNtCu4XY5Lp97ww,jOERvhmK6_lo_XGUBPws_w,Ws8V970-mQt2X9CwCuT5zw,4,3,0,1,I have been here twice. Very nice and laid bac...,2009-10-13 04:16:41


In [5]:
# Load data into a DataFrame
url = yelp_review_file
spark.sparkContext.addFile(url)

yelpSchema = StructType([
    StructField("review_id",StringType(), True),
    StructField("user_id",StringType(), True),
    StructField("business_id",StringType(), True),
    StructField("stars", IntegerType(), True),
    StructField("useful", StringType(), True),
    StructField("funny", StringType(), True),
    StructField("cool", StringType(), True),
    StructField("text", StringType(), True),
    StructField("date", StringType(), True)
])

df = spark.read.option('header', 'true').csv(SparkFiles.get("yelp_reviews.csv"), schema=yelpSchema, sep=',')
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- useful: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- cool: string (nullable = true)
 |-- text: string (nullable = true)
 |-- date: string (nullable = true)



In [6]:
# Cleaning dataframe and preview
df = df.filter(df.text.isNotNull())
df = df.filter(df.useful.isNotNull())
df = df.filter(df.date.isNotNull())
df.show()

+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|           review_id|             user_id|         business_id|stars|              useful|               funny|                cool|                text|                date|
+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|UmFMZ8PyXZTY2Qcwz...|nIJD_7ZXHq-FX8byP...|lbrU8StCq3yDfr-QM...|    1|                   1|                   1|                   0|"I am actually ho...| paid and left. A...|
|LG2ZaYiOgpr2DK_90...|V34qejxNsCbcgD8C0...|HQl28KMwrEKHqhFrr...|    5|                   1|                   0|                   0|I love Deagan's. ...| 2015-12-05 03:18:11|
|6TdNDKywdbjoTkize...|UgMW8bLE0QMJDCkQ1...|IS4cv902ykd8wj1TR...|    4|                   0|                   0|        

In [7]:
# Add text length column
df = df.withColumn('review_length', length(df['text']))

In [8]:
# Clean data to ensure star value is present
df = df.filter("stars >= 1 and stars <= 5")

# Show star ratings
df.select('stars').distinct().show()

+-----+
|stars|
+-----+
|    1|
|    3|
|    5|
|    4|
|    2|
+-----+



In [9]:
# Sample the data
sampled_df = df.sample(fraction=0.4, seed=42)

# Count of sampled data
sampled_df.count()

64626

In [10]:
# Clean and preview dataset for model training
ready_df = sampled_df.select(["text", "stars", "date", "review_length"])
ready_df.show()

+--------------------+-----+-------------------+-------------+
|                text|stars|               date|review_length|
+--------------------+-----+-------------------+-------------+
|This is definitel...|    5|2013-05-07 07:25:25|          523|
|We purchased new ...|    5|2015-07-03 21:48:51|          447|
|Tried to have my ...|    1|2017-06-28 00:39:18|          541|
|Always a fun outi...|    4|2018-04-15 11:13:11|          146|
|Just... not good....|    1|2016-04-11 16:42:13|          988|
|A margarita in cl...|    1|2015-01-18 16:12:27|          132|
|Food was piping h...|    3|2015-07-03 21:35:49|          292|
|The tables and fl...|    1|2012-12-02 06:50:10|          198|
|Worst pedicure ev...|    1|2016-07-03 17:50:57|          298|
|I had an oil chan...|    1|2017-11-06 20:58:23|          466|
|This was my first...|    5|2017-01-14 21:03:01|          197|
|I went into this ...|    1|2018-08-04 22:27:08|          428|
|Some cretin at a ...|    5|2005-05-11 05:05:13|       

In [11]:
# Split training and testing data
training, testing = ready_df.randomSplit([0.7, 0.3], seed = 42)

In [12]:
# Build model
def build_trigrams(inputCol=["text","stars"], n=3):
    tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=2**14,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "stars", outputCol = "label")]
    selector = [ChiSqSelector(numTopFeatures=2**15,featuresCol='rawFeatures', outputCol="features")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx + lr)

In [13]:
# Run dataset through model
trigram_pipelineFit = build_trigrams().fit(training)
test_results = trigram_pipelineFit.transform(testing)

In [14]:
# Calculate model accuracy
predictions = test_results.select(col("label").cast("Float"),col("prediction"))
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Model Accuracy = %g" % accuracy)

Model Accuracy = 0.621522


In [34]:
# Input custom review
input_review = "I loved this restaurant! Our servers were great, and very responsive to our needs. We will be recommending the experience to our friends."

# Load review as Dataframe
customer_review_df = spark.createDataFrame([(input_review, 3)],["text","stars"])

In [35]:
# Run review through model
prediction_score = trigram_pipelineFit.transform(customer_review_df)

In [36]:
# Show scores
prediction_score.select(["rawPrediction","probability","prediction"]).show(truncate=False)

+-----------------------------------------------------------------------------------------------+----------------------------------------+----------+
|rawPrediction                                                                                  |probability                             |prediction|
+-----------------------------------------------------------------------------------------------+----------------------------------------+----------+
|[1192.3243233855812,577.4675053796178,-705.3900912694168,-150.69450046664636,-913.707237029136]|[1.0,9.355710602053846E-268,0.0,0.0,0.0]|0.0       |
+-----------------------------------------------------------------------------------------------+----------------------------------------+----------+



In [37]:
# Calculate final star ratining
score = []
stars = []
for row in prediction_score.rdd.collect():
    score.append(row)
prediction_index = score[0]["prediction"]
star_list = training.groupBy("stars").count().orderBy(desc("count"))
for row in star_list.rdd.collect():
    stars.append(row)

prediction = stars[int(prediction_index)]["stars"]

In [38]:
# Show prediction
print("The star rating for this review is " + str(prediction))

The star rating for this review is 5


In [39]:
# Load files for response generation
keywords_df = pd.read_csv(keywords_file)
labels_df = pd.read_csv(labels_file)
responses_df = pd.read_csv(response_file)

In [40]:
# Create list with response data

# Categories
keywords = categories_df.Item.to_list()
categories = categories_df.Category.to_list()

# Labels
labels_list = labels_df.labels.to_list()
label_list = labels_df.label.to_list()

# Responses
response_category = responses_df.category.to_list()
five_star_responses = responses_df.five_star_responses.to_list()
four_star_responses = responses_df.four_star_responses.to_list()
three_star_responses = responses_df.three_star_responses.to_list()
two_star_responses = responses_df.two_star_responses.to_list()
one_star_responses = responses_df.one_star_responses.to_list()

In [41]:
# Clean responses for manipulation
five_star_responses = [str(w) for w in five_star_responses]
four_star_responses = [str(w) for w in four_star_responses]
three_star_responses = [str(w) for w in three_star_responses]
two_star_responses = [str(w) for w in two_star_responses]
one_star_responses = [str(w) for w in one_star_responses]

# Clean empty response spots
five_star_responses = [w.replace("nan","") for w in five_star_responses]
four_star_responses = [w.replace("nan","") for w in four_star_responses]
three_star_responses = [w.replace("nan","") for w in three_star_responses]
two_star_responses = [w.replace("nan","") for w in two_star_responses]
one_star_responses = [w.replace("nan","") for w in one_star_responses]

In [42]:
# Clean and format review to generate correct response
punctuation = '''!()-[]{};:'"\,<>./?@#$%^&*_~'''
for punc in input_review:
  if punc in punctuation:
    input_review = input_review.replace(punc,"")

In [43]:
# Turn review into list to analyze by word
custom_review = input_review
custom_review_list = list(custom_review.split(" "))

In [44]:
# Check is review needs a generic response or category specific response
for word in custom_review_list:
  if word in keywords:
    list_number = keywords.index(word)
    wildcard = ""
    wildcard = word
    review_category = categories[list_number]
    response_category_no = response_category.index(review_category)
    break
  else:
    generic_index_numbs = []
    generic_index = -1
    for word in response_category:
      if word == "generic":
        generic_index = response_category.index(word, generic_index + 1)
        generic_index_numbs.append(generic_index)
    response_category_no = rd.choice(generic_index_numbs)

In [45]:
# Select initial response template
if prediction == 5:
  response = five_star_responses[response_category_no]
elif prediction == 4:
  response = four_star_responses[response_category_no]
elif prediction == 3:
  response = three_star_responses[response_category_no]
elif prediction == 2:
  response = two_star_responses[response_category_no]
else:
  response = one_star_responses[response_category_no]

In [46]:
# Check if reponse is missing and replace with generic response
if response == "":
  response_category_no = rd.choice(generic_index_numbs)
  if prediction == 5:
    response = five_star_responses[response_category_no]
  elif prediction == 4:
    response = four_star_responses[response_category_no]
  elif prediction == 3:
    response = three_star_responses[response_category_no]
  elif prediction == 2:
    response = two_star_responses[response_category_no]
  else:
    response = one_star_responses[response_category_no]

In [47]:
# Break response down to find wildcard words
response_word_list = list(response.split(" "))
brackets = ["{","}"]
wildcard_count = 0
for word in response_word_list:
  if brackets[0] in word:
    wildcard_count = wildcard_count + 1

In [48]:
# Find and replace wildcard words to complete response
for x in range(wildcard_count):
  open_bracket = response.index(brackets[0]) + 1
  close_bracket = response.index(brackets[1])
  wildcard_word = response[open_bracket:close_bracket]
  if wildcard_word in categories:
    response = response.replace(brackets[0]+wildcard_word+brackets[1], wildcard)
  else:
    label_no = labels_list.index(wildcard_word)
    wildcard = label_list[label_no]
    response = response.replace(brackets[0]+wildcard_word+brackets[1], wildcard)

In [49]:
# Display finished response
print(response)

We’re glad you enjoyed your meal! We hope to see you at Panda Mick's again soon.
