## DATA301 Project

[BeerAdvocate Dataset](https://cseweb.ucsd.edu/~jmcauley/datasets.html#multi_aspect)

The BeerAdvocate dataset is one of the multi-aspect review datasets provided on the website linked above. These datasets consist of reviews that rate several aspects of a product. Among them, the most extensive are the beer review datasets sourced from RateBeer and BeerAdvocate, which cover sensory aspects such as taste, appearance, texture, and aroma.

## Loading the BeerAdvocate Data
We will use a dataset called BeerAdvocate provided by the University of California San Diego (UCSD) research lab.


In [None]:
import urllib.request
filename = 'sample_data/beeradvocate.json.gz'
urllib.request.urlretrieve('https://datarepo.eng.ucsd.edu/mcauley_group/data/beer/beeradvocate.json.gz', filename) # The URL to the BeerAdvocate dataset
print(f"File saved as {filename}")

File saved as sample_data/beeradvocate.json.gz


In [None]:
!pip install ijson # Needed so that we can run: import ijson

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ijson
  Downloading ijson-3.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (112 kB)
Installing collected packages: ijson
Successfully installed ijson-3.2.2


In [None]:
# Importing Python functions and modules. Sourced from: https://colab.research.google.com/drive/1Zv6MARGQcrBbLHyjPVVMZVnRWsRnVMpV#scrollTo=feWoOrmt4Tja

import os
import json
import gzip
import pandas as pd
from urllib.request import urlopen

In [None]:
### DONT NEED THIS MAYBE
### Sourced from: https://colab.research.google.com/drive/1Zv6MARGQcrBbLHyjPVVMZVnRWsRnVMpV#scrollTo=feWoOrmt4Tja

#!wget https://datarepo.eng.ucsd.edu/sharknado/data/beer/beeradvocate.json.gz

In [None]:
# Unzips the "beeradvocate.json.gz" file. Sourced from: Stack Overflow
# The decompressed content is written to a plain JSON file

import gzip
import shutil
with gzip.open('sample_data/beeradvocate.json.gz', 'rb') as f_in:
    with open('beeradvocate_reviews.json', 'wb') as f_out:

        shutil.copyfileobj(f_in, f_out)
        print(f_in)

<gzip _io.BufferedReader name='sample_data/beeradvocate.json.gz' 0x7f5ee8386aa0>


In [None]:
# Shows a sample of the first and last 10 extracted user reviews
!head -10 beeradvocate_reviews.json
!tail -10 beeradvocate_reviews.json

{'beer/name': 'Sausa Weizen', 'beer/beerId': '47986', 'beer/brewerId': '10325', 'beer/ABV': '5.00', 'beer/style': 'Hefeweizen', 'review/appearance': '2.5', 'review/aroma': '2', 'review/palate': '1.5', 'review/taste': '1.5', 'review/overall': '1.5', 'review/time': '1234817823', 'review/profileName': 'stcules', 'review/text': 'A lot of foam. But a lot.\tIn the smell some banana, and then lactic and tart. Not a good start.\tQuite dark orange in color, with a lively carbonation (now visible, under the foam).\tAgain tending to lactic sourness.\tSame for the taste. With some yeast and banana.'}
{'beer/name': 'Red Moon', 'beer/beerId': '48213', 'beer/brewerId': '10325', 'beer/ABV': '6.20', 'beer/style': 'English Strong Ale', 'review/appearance': '3', 'review/aroma': '2.5', 'review/palate': '3', 'review/taste': '3', 'review/overall': '3', 'review/time': '1235915097', 'review/profileName': 'stcules', 'review/text': 'Dark red color, light beige foam, average.\tIn the smell malt and caramel, not 

## Custom Extraction
- Custom extraction of the user reviews on the aspects of a beer, including the text reviews
- This runs sequentially; it could be done in parallel, but the effort is not worth it
- A single JSON file is a poor format for random-access storage and for splitting datasets, so this code rewrites the data in a JSON-lines style (one object per line, wrapped in a JSON array) and filters out the data we don't need for this project
- Sourced from: DATA301 Sample Project provided

In [None]:
import json, ijson

def float_converter(string):
  """Converts a value extracted from the BeerAdvocate JSON file from a string into a float; non-numeric strings are returned unchanged"""
  try:
    converter = float(string)
  except ValueError:
    converter = string
  return converter



def jsonline_converter(file_entry, file_output, aspect_rates):
  """Keeps only the fields listed in aspect_rates, which are used in the later algorithms and analysis"""

  with open(file_entry) as read_file, open(file_output, 'w') as output: # Opens two files: one for reading and one for writing
    output.write('[\n') # Opens the JSON array that will hold the selected records
    first = True # Tracks whether a separator is needed before the next record

    for line in read_file: # Loops through the BeerAdvocate file line by line
      # Replaces single quotes with double quotes so that the line becomes valid JSON.
      # Lines whose values contain an apostrophe or a double quote still fail to parse and are skipped below.
      line = line.replace("'", '"')
      try:
          data = json.loads(line) # Parses the JSON string and returns a Python dictionary
          selected_data = dict() # Creates a new empty dictionary
          for i in aspect_rates: # Loops through all the fields that will be used in the project
              value = data.get(i) # Gets the value associated with the field name
              if value is not None:
                  selected_data[i] = float_converter(value) # Converts numeric strings into floats
          if not first:
              output.write(',\n') # Separates this record from the previous one
          first = False
          output.write(json.dumps(selected_data)) # Writes the selected fields to the output file as a JSON string
      except json.JSONDecodeError:
          pass # Ignores lines that cannot be parsed as JSON
    output.write('\n]\n') # Closes the JSON array once the loop completes


aspect_rates = ['beer/name', 'beer/beerId', 'review/appearance', 'review/aroma', 'review/palate', 'review/taste', 'review/overall', 'review/text']
jsonline_converter('beeradvocate_reviews.json', 'beeradvocate_reviews.jsonl', aspect_rates)
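
The quote replacement in `jsonline_converter` cannot handle review texts that themselves contain quote characters, so those lines are dropped. As a hedged alternative, the sketch below assumes each raw line is a Python dictionary literal (as the `head` output suggests) and parses it with `ast.literal_eval`. The helper name `parse_review_line` and the sample line are made up for illustration; this is not the method used in this notebook.

```python
import ast
import json

def parse_review_line(line, fields):
    """Hypothetical helper: parse one Python-dict-style line and keep the selected fields."""
    try:
        data = ast.literal_eval(line)  # evaluates literals only, never arbitrary code
    except (ValueError, SyntaxError):
        return None  # the line is not a valid Python literal
    if not isinstance(data, dict):
        return None
    return {key: data[key] for key in fields if key in data}

# Made-up sample line with an apostrophe inside the review text
sample = "{'beer/name': 'Red Moon', 'review/overall': '3', 'review/text': \"It's not bad.\"}"
record = parse_review_line(sample, ['beer/name', 'review/text'])
print(json.dumps(record))
```

With this approach the apostrophe in the sample text survives, whereas the quote replacement would have made the same line unparseable.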

In [None]:
# Shows a sample of the first and last 10 extracted user reviews
!head -10 beeradvocate_reviews.jsonl
!tail -10 beeradvocate_reviews.jsonl

[
{"beer/name": "Sausa Weizen", "beer/beerId": 47986.0, "review/appearance": 2.5, "review/aroma": 2.0, "review/palate": 1.5, "review/taste": 1.5, "review/overall": 1.5, "review/text": "A lot of foam. But a lot.\tIn the smell some banana, and then lactic and tart. Not a good start.\tQuite dark orange in color, with a lively carbonation (now visible, under the foam).\tAgain tending to lactic sourness.\tSame for the taste. With some yeast and banana."},
{"beer/name": "Red Moon", "beer/beerId": 48213.0, "review/appearance": 3.0, "review/aroma": 2.5, "review/palate": 3.0, "review/taste": 3.0, "review/overall": 3.0, "review/text": "Dark red color, light beige foam, average.\tIn the smell malt and caramel, not really light.\tAgain malt and caramel in the taste, not bad in the end.\tMaybe a note of honey in teh back, and a light fruitiness.\tAverage body.\tIn the aftertaste a light bitterness, with the malt and red fruit.\tNothing exceptional, but not bad, drinkable beer."},
{"beer/name": "Bla

## Set up a local Spark cluster

Sourced from: [Sample Project Code](https://colab.research.google.com/drive/1EWlIT5ecnwPSpZdE-9Z-vx65cL_bFFPR#scrollTo=cFHRh3GE_Xet)

In [None]:
%env PYTHONHASHSEED 3
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

env: PYTHONHASHSEED=3
  Preparing metadata (setup.py) ... done
  Building wheel for pyspark (setup.py) ... done


In [None]:
from math import sqrt
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
spark = SparkSession.builder.master("local[*]").appName('SparkExample').config(
    "spark.executor.memory", "1g").config("spark.ui.port", "4050"
        ).getOrCreate()
sc = spark.sparkContext

## Loading the data (BeerAdvocate) into a Resilient Distributed Dataset (RDD)

Sourced from: [Sample Project Code](https://colab.research.google.com/drive/1EWlIT5ecnwPSpZdE-9Z-vx65cL_bFFPR#scrollTo=cFHRh3GE_Xet) and [Apache Spark - JSON Files](https://spark.apache.org/docs/latest/sql-data-sources-json.html)

In [None]:
user_reviews_df = spark.read.json("beeradvocate_reviews.jsonl", multiLine=True) # Reads the '.jsonl' file into a DataFrame; 'multiLine=True' is needed because the file is one JSON array spanning many lines
user_reviews_df.printSchema() # Prints the schema of the DataFrame
print(user_reviews_df.count()) # Counts the number of rows in the DataFrame
print(user_reviews_df.rdd.take(1)) # Retrieves the first row of the DataFrame
# use .drop() function

user_reviews_df_ids = user_reviews_df.rdd.zipWithUniqueId() # Converts the DataFrame into an RDD and pairs each row with a unique identifier
print(user_reviews_df_ids.take(5)) # The unique identifiers are generated from the partition index and the position of each element within its partition


root
 |-- beer/beerId: double (nullable = true)
 |-- beer/name: string (nullable = true)
 |-- review/appearance: double (nullable = true)
 |-- review/aroma: double (nullable = true)
 |-- review/overall: double (nullable = true)
 |-- review/palate: double (nullable = true)
 |-- review/taste: double (nullable = true)
 |-- review/text: string (nullable = true)

589528
[Row(beer/beerId=47986.0, beer/name='Sausa Weizen', review/appearance=2.5, review/aroma=2.0, review/overall=1.5, review/palate=1.5, review/taste=1.5, review/text='A lot of foam. But a lot.\tIn the smell some banana, and then lactic and tart. Not a good start.\tQuite dark orange in color, with a lively carbonation (now visible, under the foam).\tAgain tending to lactic sourness.\tSame for the taste. With some yeast and banana.')]
[(Row(beer/beerId=47986.0, beer/name='Sausa Weizen', review/appearance=2.5, review/aroma=2.0, review/overall=1.5, review/palate=1.5, review/taste=1.5, review/text='A lot of foam. But a lot.\tIn the s
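
To illustrate how the unique identifiers depend on partitions and positions: Spark documents `zipWithUniqueId` as giving the items of the k-th partition the ids k, n+k, 2n+k, and so on, where n is the number of partitions. The plain-Python sketch below imitates that scheme on made-up partitions. `unique_ids` is a hypothetical helper written for this illustration, not a Spark function.

```python
def unique_ids(partitions):
    """Imitates the id scheme: item i of partition k gets the id i * n + k."""
    n = len(partitions)  # number of partitions
    result = []
    for k, partition in enumerate(partitions):
        for i, item in enumerate(partition):
            result.append((item, i * n + k))
    return result

# Three made-up partitions of different sizes
partitions = [['a', 'b', 'c'], ['d'], ['e', 'f']]
pairs = unique_ids(partitions)
print(pairs)
```

The ids are unique but not consecutive, which is why the keys seen in later outputs can have gaps.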

## PySpark Sampling
This is one way to draw a random sample of records from our BeerAdvocate dataset, which contains 589528 documents. I used this approach because I had been stuck on the IDF part of the TF-IDF algorithm for 5 days without making any progress.

My lecturer/tutor/fellow classmates advised me that the BeerAdvocate dataset is very large. By taking a sample fraction of the original dataset, I was able to get my IDF algorithm working.

Sourced from: [PySpark Random Sample with Example](https://sparkbyexamples.com/pyspark/pyspark-sampling-example/#:~:text=PySpark%20sampling%20%28pyspark.sql.DataFrame.sample%20%28%29%29%20is%20a%20mechanism%20to,is%20the%20syntax%20of%20the%20sample%20%28%29%20function.)
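
As a rough illustration of what sampling without replacement at a given fraction means, the sketch below keeps each element independently with probability `fraction`, so the sample size is only approximately `fraction * N`. It is plain Python rather than Spark, and `bernoulli_sample` is a hypothetical helper; it will not reproduce the exact rows Spark selects for the same seed.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keeps each item independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    return [item for item in items if rng.random() < fraction]

population = list(range(100_000))
sample = bernoulli_sample(population, 0.02, seed=42)
print(len(sample))  # close to 2,000, but usually not exactly 2,000
```

This is why a 2% sample of 589528 documents does not have to contain exactly 2% of them.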



In [None]:
### Reduce the RDD to roughly 12,000 elements using sample()
### The first argument, False, means sampling without replacement.
### The second argument, 0.02 in this case, is the sampling fraction,
### which means the resulting RDD will have approximately 2% of the 589528 original elements
sampled_rdd = user_reviews_df_ids.sample(False, 0.02, seed = 42)

### Take a look at the sampled RDD
print(sampled_rdd.count())   ### Check the total count of the sampled RDD
print(sampled_rdd.take(10))  ### Print the first 10 elements of the sampled RDD

11699
[(Row(beer/beerId=58046.0, beer/name='Rauch Ür Bock', review/appearance=3.5, review/aroma=4.5, review/overall=4.5, review/palate=4.0, review/taste=4.5, review/text='Big thanks to N2168 for knocking this off my wants. Poured into a stone nonic and shared with my buddy Ryan. \t\tThis was quite delicious. The pour made it seem a little lacking in carbonation, and while indeed it was light in carbonation I think it would have been awkward if it had been heavier considering the taste profile. This beer does such a good job of hitting the sweet and savory side of smoke. Nice big campfire smoke smell and taste without any roughness to it, with a nice smoked bacon savory sweetness. This is easy to drink and just downright delicious, would love to have a keg of this by my desk.'), 10), (Row(beer/beerId=10788.0, beer/name='Pilot Rock Porter', review/appearance=4.5, review/aroma=4.5, review/overall=4.5, review/palate=4.5, review/taste=5.0, review/text='Comes out of the tap an opaque deep sa

In [None]:
# download a set of stop words that we can ignore because they are not interesting
# Sourced from: DATA301 Sample Project Code
!pip install nltk

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


The code below imports the Natural Language Toolkit (nltk) library and downloads the English stopwords corpus. It then prints the set of English stopwords.

Sourced from: [geeksforgeeks.org](https://www.geeksforgeeks.org/removing-stop-words-nltk-python/) and [DATA301 Sample Project Code](https://colab.research.google.com/drive/1EWlIT5ecnwPSpZdE-9Z-vx65cL_bFFPR#scrollTo=cFHRh3GE_Xet)

In [None]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords

STOP_WORDS = set(stopwords.words('english'))
print(STOP_WORDS)


{'am', 'didn', 'the', 'in', 'myself', 'are', 're', 'weren', "won't", 'an', "that'll", 'its', 'will', 'doing', 'we', 'more', 'against', 'they', 'needn', 'having', 'again', "hasn't", 'were', 'now', 'himself', 'itself', "weren't", 'only', 'what', 'which', 'him', 'up', 'then', 'doesn', 'than', 'or', 'to', 'hadn', "didn't", 'this', 'o', "you'd", 'shan', "couldn't", 'll', 'can', 'below', "aren't", 'yourself', 'with', "don't", 'my', "it's", 'that', 'once', 'other', "mustn't", 'not', 'through', 've', 'don', 'down', 'because', 'them', 'aren', 's', "needn't", 'mightn', "shouldn't", 'did', 'before', 'about', 'couldn', 'of', 'at', 'here', "you're", 'ourselves', "you'll", 'our', "isn't", 'their', 'do', 'theirs', 'just', "doesn't", "you've", 'by', 'your', 'themselves', 'being', 'mustn', 'd', 'm', "mightn't", 'i', 'yours', 'any', 'most', 'off', 'wouldn', 'own', 'from', 'when', 'haven', 'very', "she's", 'ours', 'ain', 'herself', 'these', 'same', 'does', 'such', 'had', 'further', 'while', 'on', 'nor', 

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


### Cleaning the dataset stored in JSON-line

The section below performs the word count within each document. We treat a single review text as one document. Because we have so many documents, we do not parallelize the word count inside a document; it is much more efficient to parallelize across documents and count the words of each one sequentially.

Sourced from: [re — Regular expression operations](https://docs.python.org/3/library/re.html) and [DATA301 Sample Project](https://colab.research.google.com/drive/1EWlIT5ecnwPSpZdE-9Z-vx65cL_bFFPR#scrollTo=CkBFZdu-_BQ6)

In [None]:
### Perform the word count within each document first (a single review text is one document; with so many documents, it is much more efficient to parallelize across documents than within one)

import re

### Takes a string and returns a new string with every non-letter character removed
### re.sub() replaces each character matched by the pattern with an empty string
def remove_nonletters(word):
  """Removes every non-letter character from a word"""
  return re.sub(r'[^a-zA-Z]', '', word)

### Splits a line into lowercase tokens, strips the non-letters, and pairs each non-empty token with a count of 1
### (stop words are not removed here; that happens in wc())
def split_remove_nonletters(line):
  """Splits a line on spaces and returns a list of (token, 1) pairs for the non-empty cleaned tokens"""
  result = []
  for word in line.split(" "):
    removed_token = remove_nonletters(word.lower())
    if removed_token != '':
      result.append((removed_token, 1))
  return result

### Parses review_str and generates a dictionary of word: count entries
### Stop words and tokens that are empty after cleaning are skipped
def wc(review_str):
  """Takes a string, counts each significant word in it, and returns a dictionary mapping each word to its count"""
  result = {}
  for word in review_str.split():
    removed_token = remove_nonletters(word.lower())
    if removed_token != '' and removed_token not in STOP_WORDS:
      if removed_token not in result:
        result[removed_token] = 0
      result[removed_token] += 1
  return result
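
A small worked example may make the cleaning rules concrete. The sketch below repeats the same steps as `wc()` (lowercase, strip non-letters, drop stop words) on a made-up sentence. `DEMO_STOP_WORDS` is a tiny stand-in list assumed for illustration, not the NLTK list, and `demo_wc` is a hypothetical name.

```python
import re

# Tiny stand-in stop list for illustration only
DEMO_STOP_WORDS = {'a', 'of', 'the', 'but', "don't"}

def demo_wc(text):
    """Counts cleaned, non-stop-word tokens in a single review string."""
    counts = {}
    for word in text.split():
        token = re.sub(r'[^a-zA-Z]', '', word.lower())  # keep letters only
        if token != '' and token not in DEMO_STOP_WORDS:
            counts[token] = counts.get(token, 0) + 1
    return counts

demo_counts = demo_wc("A lot of foam. But a lot. Don't like the high-quality 5% head!")
print(demo_counts)
```

Two side effects are worth noticing: hyphenated words are fused ("high-quality" becomes "highquality"), and because apostrophes are stripped before the stop-word check, "Don't" becomes "dont" and no longer matches the stop-list entry "don't".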



In [None]:
### Applying word count of the significant words (that were meaningful) in the reviews text of each individuals on a beer to ensure it works
### per User Document Count (UDC)
per_UDC = sampled_rdd.map(lambda x: (x[1], wc(x[0]['review/text'])))
print(per_UDC.take(1))


[(10, {'big': 2, 'thanks': 1, 'n': 1, 'knocking': 1, 'wants': 1, 'poured': 1, 'stone': 1, 'nonic': 1, 'shared': 1, 'buddy': 1, 'ryan': 1, 'quite': 1, 'delicious': 2, 'pour': 1, 'made': 1, 'seem': 1, 'little': 1, 'lacking': 1, 'carbonation': 2, 'indeed': 1, 'light': 1, 'think': 1, 'would': 2, 'awkward': 1, 'heavier': 1, 'considering': 1, 'taste': 2, 'profile': 1, 'beer': 1, 'good': 1, 'job': 1, 'hitting': 1, 'sweet': 1, 'savory': 2, 'side': 1, 'smoke': 2, 'nice': 2, 'campfire': 1, 'smell': 1, 'without': 1, 'roughness': 1, 'smoked': 1, 'bacon': 1, 'sweetness': 1, 'easy': 1, 'drink': 1, 'downright': 1, 'love': 1, 'keg': 1, 'desk': 1})]


In [None]:
### Practising grabbing the variables that will be used in the TF-IDF algorithm by putting the 'review/text' word counts and 'review/overall' in a tuple.
### Applies the word count of the significant (meaningful) words in each individual's text review of a beer
### per User Document Count (UDC)

#per_UDC = user_reviews_df_ids.map(lambda x: (x[1], (x[0]['review/overall'], wc(x[0]['review/text'])))) ## This was the initial test using the original RDD that contains the 589528 documents

per_UDC_rate = sampled_rdd.map(lambda x: (x[1], (wc(x[0]['review/text']), x[0]['review/overall']))) # This can be used instead of joining the User Text Reviews and Overall Ratings
print(per_UDC.take(5))

### NOT sure if working
#per_UDC_ratings = per_UDC.filter(lambda x: overall_splitter(x[0]['review/overall']))


[(10, {'big': 2, 'thanks': 1, 'n': 1, 'knocking': 1, 'wants': 1, 'poured': 1, 'stone': 1, 'nonic': 1, 'shared': 1, 'buddy': 1, 'ryan': 1, 'quite': 1, 'delicious': 2, 'pour': 1, 'made': 1, 'seem': 1, 'little': 1, 'lacking': 1, 'carbonation': 2, 'indeed': 1, 'light': 1, 'think': 1, 'would': 2, 'awkward': 1, 'heavier': 1, 'considering': 1, 'taste': 2, 'profile': 1, 'beer': 1, 'good': 1, 'job': 1, 'hitting': 1, 'sweet': 1, 'savory': 2, 'side': 1, 'smoke': 2, 'nice': 2, 'campfire': 1, 'smell': 1, 'without': 1, 'roughness': 1, 'smoked': 1, 'bacon': 1, 'sweetness': 1, 'easy': 1, 'drink': 1, 'downright': 1, 'love': 1, 'keg': 1, 'desk': 1}), (91, {'comes': 1, 'tap': 1, 'opaque': 1, 'deep': 1, 'sable': 1, 'minimal': 1, 'head': 1, 'latter': 1, 'may': 1, 'artifact': 1, 'te': 1, 'pour': 1, 'smoky': 1, 'aroma': 1, 'tones': 1, 'molasses': 2, 'highquality': 1, 'coffee': 2, 'absolutely': 1, 'delicious': 1, 'flavor': 1, 'roast': 1, 'chocolate': 1, 'components': 1, 'slightly': 1, 'sweet': 1, 'superb': 1,

In [None]:
### This function splits a user's overall review rating of a beer into three categories: negative (0-6), neutral (7-13), and positive (14-20)
### Note: the 'review/overall' values in the outputs of this notebook range from 1.0 to 5.0, so every review is labelled 'negative' by these thresholds

def overall_splitter(ratings):
  """Splits the overall review rating made by a user into three categories: negative (score from 0-6), neutral (score from 7-13), and positive (score from 14-20)"""

  if ratings <= 6:
    return f'negative {ratings}'
  elif ratings > 6 and ratings <= 13:
    return f'neutral {ratings}'
  elif ratings > 13 and ratings <= 20:
    return f'positive {ratings}'
  else:
    return f'Exceeded the overall rating range of 20 by {ratings - 20}'
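
The thresholds in `overall_splitter` are written for a 0-20 scale, while the 'review/overall' values shown in this notebook's outputs run from 1.0 to 5.0. One possible adjustment, sketched below purely as an assumption, is to rescale the rating by 4 before applying the same cut-offs. `label_five_point` and its `scale` parameter are hypothetical and are not used anywhere else in this notebook.

```python
def label_five_point(rating, scale=4):
    """Hypothetical helper: rescale a 0-5 rating to 0-20, then apply the 6 / 13 cut-offs."""
    scaled = rating * scale
    if scaled < 0 or scaled > 20:
        raise ValueError(f'rating {rating} is outside the 0-5 range')
    if scaled <= 6:
        return 'negative'
    if scaled <= 13:
        return 'neutral'
    return 'positive'

for demo_rating in (1.5, 3.0, 4.5):
    print(demo_rating, label_five_point(demo_rating))
```

Under this assumption, ratings up to 1.5 count as negative, 2.0 to 3.0 as neutral, and 3.5 and above as positive.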


In [None]:
### Displaying the result to make sure we have everything we need before starting the TF-IDF algorithm
per_UDC_ratings = user_reviews_df_ids.map(lambda x: (x[1], (overall_splitter(x[0]['review/overall']), wc(x[0]['review/text'])))) # This uses the full 'user_reviews_df_ids' RDD rather than 'sampled_rdd'
print(per_UDC_ratings.take(1))


[(0, ('negative 1.5', {'lot': 2, 'foam': 2, 'smell': 1, 'banana': 2, 'lactic': 2, 'tart': 1, 'good': 1, 'start': 1, 'quite': 1, 'dark': 1, 'orange': 1, 'color': 1, 'lively': 1, 'carbonation': 1, 'visible': 1, 'tending': 1, 'sourness': 1, 'taste': 1, 'yeast': 1}))]


## Computing Term Frequency - Inverse Document Frequency (TF-IDF) on the BeerAdvocate RDD

This section computes TF and IDF together to determine the most relevant terms (words) across the large collection of documents from the BeerAdvocate dataset.
The higher the TF-IDF score of a term, the more important that term is across the text reviews written by users.

This will help identify the most likeable beer, because we associate the results obtained here with the 'review/overall' ratings. It also helps us understand whether the 'review/overall' rating given by each user relates to any of the words retrieved by the TF-IDF algorithm.

Sourced from [DATA301 Sample Project Code](https://colab.research.google.com/drive/1EWlIT5ecnwPSpZdE-9Z-vx65cL_bFFPR#scrollTo=UJDipG7SPIRo) and [DATA301 - Lab2](https://colab.research.google.com/drive/1Li4ev2Hip6oLPMxqM5gF9Cct90_yQJ0s)

In [None]:
### A higher IDF value means that a term appears in fewer documents, so it can be considered an informative term (word) in a 'review/text'
### A low IDF value means that the term appears in a large number of documents.
### User Document Counts (UDC)
import math

def idf(user_count):
  """Measures the importance of a term (word) in a collection of documents (per user reviews)"""
  ### NOTE: this counts the full DataFrame (589528 rows), even when user_count is the 2% sample;
  ### user_count.count() would keep N consistent with the document frequencies computed below
  n_reviews = user_reviews_df.count()
  #print(n_reviews)

  combined_per_UCD = user_count.flatMap(lambda x: [(word, 1) for word in x[1].keys()]) # Emits one (word, 1) pair for each distinct word in each document
  print(combined_per_UCD.take(3))

  overall_UCD = combined_per_UCD.reduceByKey(lambda a, b: a + b) # Sums the pairs with the same key, giving the number of documents that contain each word
  print(overall_UCD.take(3))

  ### IDF_i = log2(N / n_i), where N = number of documents in the collection and n_i = number of documents that contain word i
  idf_UCD = overall_UCD.map(lambda x: (x[0], math.log(n_reviews/x[1],2))) # math.log(N / n_i, 2) is the base-2 logarithm
  #print(idf_UCD.take(3))

  return idf_UCD


word_idf = idf(per_UDC)
print(word_idf.take(3))

[('big', 1), ('thanks', 1), ('n', 1)]
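
To see the IDF definition at work without Spark, the sketch below applies the same log2(N / n) rule to four made-up documents. The data and the name `toy_idf` are assumptions for illustration only; here N is taken from the same collection that the document frequencies come from.

```python
import math

# Made-up word counts for four tiny "reviews", keyed by document id
docs = {
    0: {'hops': 2, 'malt': 1},
    1: {'hops': 1, 'sweet': 3},
    2: {'hops': 1, 'malt': 2},
    3: {'hops': 4},
}

def toy_idf(doc_counts):
    """Returns log2(N / n_w), where n_w is the number of documents containing word w."""
    n_docs = len(doc_counts)
    doc_freq = {}
    for counts in doc_counts.values():
        for word in counts:  # each document contributes at most 1 per word
            doc_freq[word] = doc_freq.get(word, 0) + 1
    return {word: math.log(n_docs / df, 2) for word, df in doc_freq.items()}

idf_values = toy_idf(docs)
print(idf_values)
```

A word found in every document ("hops") gets an IDF of 0, while a word found in a single document ("sweet") gets the largest value.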


In [None]:
### The code below doesn't use a join because when I first tried one, I waited for more than 10 minutes and still didn't get the result I wanted.
### The joined RDD should roughly look like this: (0, ({'lot': 2, 'foam': 2, 'smell': 1, ... }, 1.5))
### That is why I took a different approach: mapping over the RDD and grabbing only what I need, namely the key position, 'review/text', and 'review/overall'
### This creates a new RDD called per User Document Count Ratings (UDCR), which holds the key position, the 'review/text' word counts, and 'review/overall'
### The 'sampled_rdd' is used here again because the original BeerAdvocate dataset is too large and took too long when I tried it.
### The 'sampled_rdd' is just a subset sampled from the original one.

per_UDCR = sampled_rdd.map(lambda x: (x[1], (wc(x[0]['review/text']), x[0]['review/overall']))) # An alternative to joining the User Text Reviews and Overall Ratings
  #return per_UDC

def tf(UD_counts_ratings):
  """Computes the Term Frequency (TF) of each word per 'review/overall' rating: the word's total count for that rating divided by the largest word count for that rating."""

  ### The per User Document Count Ratings (UDCR) RDD holds the key position, the 'review/text' word counts, and 'review/overall'
  ### Pairing each word with the 'review/overall' rating of its document to form the key (word, rating)
  ### Then looping over each document in the sampled RDD
  counting_rating_word = UD_counts_ratings.flatMap(lambda kv: [((word, kv[1][1]), kv[1][0][word]) for word in kv[1][0].keys()])
  print(counting_rating_word.take(3))

  ### total Word Rating Count (WRC)
  total_WRC = counting_rating_word.reduceByKey(lambda a, b: a + b)
  print(total_WRC.take(3))

  ### Dropping the word from each ((word, rating), count) pair and keeping (rating, count)
  ### Which should give us the 'total Word Count By Ratings' (WCBR)
  total_WCBR = total_WRC.map(lambda kv: (kv[0][1], kv[1]))
  print(total_WCBR.take(3))

  ### Getting the maximum Per User Ratings (PUR)
  ### reduceByKey groups the key-value pairs by key and reduces the values associated with each key
  ### The 'max()' function keeps the larger value between 'a' and 'b'
  ### 'collectAsMap()' converts the key-value pairs of the RDD into a Python dictionary
  ### We should get something like: {unique rating from 'review/overall': maximum word count associated with that rating}
  max_PUR = total_WCBR.reduceByKey(lambda a, b: max(a, b)).collectAsMap()
  print(max_PUR)

  ### User Document Count Ratings frequency
  UDCR_frequency = total_WRC.map(lambda kv: (kv[0], kv[1] / max_PUR[kv[0][1]]))
  return UDCR_frequency

# getting per User Document Ratings (UDR) Term Frequencies (TFs)
per_UDR_tfs = tf(per_UDCR)
print(per_UDR_tfs.take(3))


[(('big', 4.5), 2), (('thanks', 4.5), 1), (('n', 4.5), 1)]
[(('big', 4.5), 438), (('thanks', 4.5), 190), (('n', 4.5), 9)]
[(4.5, 438), (4.5, 190), (4.5, 9)]
{4.5: 2434, 4.0: 4343, 3.5: 2161, 5.0: 714, 1.0: 92, 2.5: 332, 1.5: 107, 2.0: 232, 3.0: 1035}
[(('big', 4.5), 0.1799506984387839), (('thanks', 4.5), 0.0780608052588332), (('n', 4.5), 0.0036976170912078883)]
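
The normalisation used in `tf()` can be checked by hand on a tiny example. The sketch below follows the same steps in plain Python: sum the counts per (word, rating), find the largest total within each rating, and divide. The three documents and the name `toy_tf` are made up for illustration.

```python
# Made-up documents: (word counts, 'review/overall' rating)
rated_docs = [
    ({'hops': 2, 'malt': 1}, 4.5),
    ({'hops': 1, 'sweet': 3}, 4.5),
    ({'thin': 2, 'hops': 1}, 2.0),
]

def toy_tf(docs):
    """Returns {(word, rating): total count / largest total count for that rating}."""
    totals = {}
    for counts, rating in docs:
        for word, count in counts.items():
            key = (word, rating)
            totals[key] = totals.get(key, 0) + count
    max_per_rating = {}
    for (_, rating), total in totals.items():
        max_per_rating[rating] = max(max_per_rating.get(rating, 0), total)
    return {key: total / max_per_rating[key[1]] for key, total in totals.items()}

tf_values = toy_tf(rated_docs)
print(tf_values)
```

Within each rating, the most frequent word gets a TF of 1.0 and every other word is expressed as a fraction of it.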


In [None]:
### Forgot whether I need this
# per_UDCR = sampled_rdd.map(lambda x: (x[1], (wc(x[0]['review/text']), x[0]['review/overall'])))
# print(per_UDCR.take(1))

In [None]:
def tfidfi(tfs, idfs):
  """Identifies the most important terms (words) within a collection of documents from the BeerAdvocate dataset"""

  ### Getting the Term Frequency per word along with the 'review/overall' rating made by a user
  word_tfs = tfs.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
  print(word_tfs.take(3))

  ### Joining together the Term Frequency and Inverse Document Frequency
  ### Joining documents with the same term
  tfidf_join = word_tfs.join(idfs)
  print(tfidf_join.take(30)) # A term is repeated in the output because it is joined once for every 'review/overall' rating it occurs with

  ### Multiplying values: TF * IDF
  tfidf_result = tfidf_join.map(lambda kv: (kv[1][0][0], (kv[1][0][1] * kv[1][1], kv[0])))

  return tfidf_result


### Rate By (RB) Term Frequency Inverse Document Frequency (tfidfi)
RB_tfidfi = tfidfi(per_UDR_tfs, word_idf)
print(RB_tfidfi.take(5))

[('big', (4.5, 0.1799506984387839)), ('thanks', (4.5, 0.0780608052588332)), ('n', (4.5, 0.0036976170912078883))]
[('big', ((4.5, 0.1799506984387839), 8.800694349462107)), ('big', ((4.0, 0.14598204006447157), 8.800694349462107)), ('big', ((3.0, 0.12657004830917876), 8.800694349462107)), ('big', ((3.5, 0.13142063859324388), 8.800694349462107)), ('big', ((2.5, 0.09036144578313253), 8.800694349462107)), ('big', ((5.0, 0.15126050420168066), 8.800694349462107)), ('big', ((2.0, 0.11206896551724138), 8.800694349462107)), ('big', ((1.0, 0.06521739130434782), 8.800694349462107)), ('big', ((1.5, 0.06542056074766354), 8.800694349462107)), ('n', ((4.5, 0.0036976170912078883), 14.262310215361282)), ('n', ((4.0, 0.0018420446695832373), 14.262310215361282)), ('n', ((2.5, 0.006024096385542169), 14.262310215361282)), ('n', ((3.5, 0.002776492364645997), 14.262310215361282)), ('n', ((3.0, 0.004830917874396135), 14.262310215361282)), ('n', ((5.0, 0.0014005602240896359), 14.262310215361282)), ('wants', ((4.

In [None]:
### Computing half of the answer to our proposed question of:
### “what is the relationship between a customer’s text reviews and a beer’s features to the overall impression of the most popular beer.”
### The output will be showing...

### Outputting the Term Frequency-Inverse Document Frequency (TFIDF) Count By Ratings Word (CBRW)
### Applying the 'overall_splitter' function which splits the 'review/overall' into three categories:
### negative (0-6), neutral (7-13), and positive (14-20)

TFIDF_CBRW = RB_tfidfi.groupByKey().map(lambda kv: (overall_splitter(kv[0]), sorted(kv[1], reverse=True)[:20]))
#print(TFIDF_CBRW.take(5))

# Sort the collected results in descending order of their rating label
descending_ratings = sorted(TFIDF_CBRW.collect(), key=lambda x: x[0], reverse = True)

for grouped_ratings in descending_ratings:
  print(grouped_ratings)


('negative 5.0', [(6.592480285889554, 'beer'), (4.372868546280418, 'head'), (4.09039283013185, 'nice'), (3.664268017628552, 'taste'), (3.2293434681430715, 'one'), (3.108249069321355, 'sweet'), (3.0053510000336128, 'light'), (2.902204831205447, 'great'), (2.8391113606041167, 'malt'), (2.789696501799021, 'hops'), (2.736412252498564, 'good'), (2.6144216971716774, 'dark'), (2.5772544751443536, 'flavor'), (2.560485733863881, 'like'), (2.5232509431470267, 'glass'), (2.382921432050751, 'smooth'), (2.244653041171659, 'aroma'), (2.2247601067818925, 'well'), (2.1450239463194665, 'chocolate'), (2.0453324704076707, 'hop')])
('negative 4.5', [(6.592480285889554, 'beer'), (5.693779764901281, 'nice'), (5.664901801606627, 'head'), (4.014617542682194, 'hops'), (3.9906777906601616, 'malt'), (3.9902599212990797, 'light'), (3.8614478124758422, 'taste'), (3.831645230004248, 'good'), (3.6995834977321, 'sweet'), (3.468105660678755, 'dark'), (3.33458086341894, 'well'), (3.2192053674357863, 'carbonation'), (3.

## Computing the Cosine Similarity to the BeerAdvocate RDD

In this section, the first step is to extract all the beer aspects: appearance, aroma, palate, and taste. Each aspect is then averaged per 'review/overall' rating.



Sourced from: [DATA301 - Lab 4](https://colab.research.google.com/drive/19qEMeBYEibyTZHKkHPmk3EapYO5_xYFP)

In [None]:
### Sourced from DATA301 - Lab 4
### Cosine Similarity formula

def cosine_similarity(u, v):
  """Computes cosine similarity between two lists of integers"""
  if u == v:
    return 1.0

  # Indicators
  numerator = 0
  denominator_u = 0
  denominator_v = 0

  # loops through both vectors and does the calculations
  for i, j in zip(u, v):
    numerator += i * j
    denominator_u += i ** 2
    denominator_v += j ** 2

  # Applies the cosine similarity formula
  similarity = numerator / (sqrt(denominator_u) * sqrt(denominator_v))

  return similarity

### TEST CASE
print(cosine_similarity([1,2,-1],[2,1,1]))
print(cosine_similarity([1,2,1],[1,2,1]))

0.5000000000000001
1.0
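
One edge case the function above does not cover is a vector made only of zeros, where the denominator becomes zero and the division fails. The sketch below is a hedged variant that returns 0.0 in that case. Both the name `safe_cosine_similarity` and the choice of 0.0 as the fallback are assumptions, not part of Lab 4.

```python
from math import sqrt

def safe_cosine_similarity(u, v):
    """Hypothetical variant: cosine similarity that returns 0.0 for zero-length vectors."""
    dot = sum(i * j for i, j in zip(u, v))
    norm_u = sqrt(sum(i ** 2 for i in u))
    norm_v = sqrt(sum(j ** 2 for j in v))
    if norm_u == 0 or norm_v == 0:
        return 0.0  # assumption: an all-zero vector is treated as similar to nothing
    return dot / (norm_u * norm_v)

print(safe_cosine_similarity([1, 2, -1], [2, 1, 1]))
print(safe_cosine_similarity([0, 0, 0], [1, 2, 3]))
```

The rating vectors in this project are unlikely to be all zeros, so this matters mainly if the function is reused on other data.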


This computes the mean value of each beer aspect rated by users.
It takes the appearance, aroma, palate, and taste ratings and averages each of them per 'review/overall' rating.

This will be used later in the cosine similarity, where the lowest 'review/overall' rating is compared with the first 5 lowest 'review/overall' ratings.

Sourced from: [Average of List in Python](https://sparkbyexamples.com/python/average-of-list-in-python/)

In [None]:

### Hoping to get something like this:
### [('the review/overall', [(mean of appearance, mean of aroma, mean of palate, mean of taste)])] -> [('negative 5.0', [(4, 4, 4, 4)]), ('negative 4.5', [(3, 3, 3, 3)]), .......]

import statistics
import math

def mean_value_computation(rates):
  """Calculates the mean value of the ratings users gave for appearance, aroma, palate, and taste"""
  appearance_rates, aroma_rates, palate_rates, taste_rates = zip(*rates) ### unpacks the list of tuples into one tuple per beer aspect

  ### Compute mean of the given lists of beer aspects values
  mean_appearance_rate = statistics.mean(appearance_rates)
  mean_aroma_rate = statistics.mean(aroma_rates)
  mean_palate_rate = statistics.mean(palate_rates)
  mean_taste_rate = statistics.mean(taste_rates)


  return (math.ceil(mean_appearance_rate), math.ceil(mean_aroma_rate), math.ceil(mean_palate_rate), math.ceil(mean_taste_rate))

In [None]:
### Create a new RDD that pairs each 'review/overall' rating with the beer aspect ratings given by users in the BeerAdvocate dataset

### Negative Overall Ratings (NOR) RDD
NOR_rdd = sampled_rdd.filter(lambda x : overall_splitter(x[0]["review/overall"]).split()[0] == 'negative') ### keep only the reviews whose 'review/overall' is labelled negative
beer_aspect_rates = NOR_rdd.map(lambda x : (x[0]['review/overall'], (x[0]['review/appearance'], x[0]['review/aroma'], x[0]['review/palate'], x[0]['review/taste']))) ### keep only the variables we need

grouped = beer_aspect_rates.groupByKey() ### group by the key (the 'review/overall' rating)
mean_rates_rdd = grouped.mapValues(lambda x: mean_value_computation(x)) ### compute the mean aspect ratings for each group

### Mean Rates RDD (MRR) in ascending order
MRR_ascending = mean_rates_rdd.sortByKey()

print(MRR_ascending.collect())

# print(mean_rates_rdd.takeOrdered(5, lambda x : x[0])) ### Prints it in ascending order
# print(mean_rates_rdd.takeOrdered(5, lambda x : -x[0])) ### Prints it in descending order

[(1.0, (3, 2, 2, 2)), (1.5, (3, 3, 2, 2)), (2.0, (3, 3, 3, 3)), (2.5, (4, 3, 3, 3)), (3.0, (4, 4, 4, 4)), (3.5, (4, 4, 4, 4)), (4.0, (4, 4, 4, 4)), (4.5, (5, 5, 5, 5)), (5.0, (5, 5, 5, 5))]
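
For readers without a Spark session, the `groupByKey` / `mapValues` / `sortByKey` sequence can be sketched in plain Python. The records below are made up, and `group_and_average` is a hypothetical helper that is not part of the notebook; it only mirrors the logic of the cell above.

```python
import math
import statistics
from collections import defaultdict

def group_and_average(records):
    """Group (overall, aspects) pairs by overall rating, then average each aspect and round up."""
    grouped = defaultdict(list)
    for overall, aspects in records:
        grouped[overall].append(aspects)   # plays the role of groupByKey

    result = []
    for overall in sorted(grouped):        # plays the role of sortByKey
        means = (statistics.mean(column) for column in zip(*grouped[overall]))
        result.append((overall, tuple(math.ceil(m) for m in means)))
    return result

# Hypothetical (overall, (appearance, aroma, palate, taste)) records
records = [
    (1.5, (3.0, 3.0, 2.0, 1.5)),
    (1.0, (2.0, 1.5, 1.0, 1.0)),
    (1.0, (3.0, 2.0, 2.0, 1.5)),
]

print(group_and_average(records))
# [(1.0, (3, 2, 2, 2)), (1.5, (3, 3, 2, 2))]
```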


In [None]:
def cos_sim(user_rates):
  """Computes the cosine similarity between the mean aspect rates (appearance, aroma, palate, taste) of every pair of 'review/overall' groups"""

  ### Collection of Aspect Rates
  Collection_AR = user_rates.collect()

  ### Compute the cosine similarity for all possible pairs
  ### Because the input is sorted in ascending order, the first results compare the lowest 'review/overall' with each group in turn
  ### Loops through this RDD:                                                                   index:  (x[0], x[1]     )
  ### [(10, [3.5, 4.5, 4.0, 4.5]), (91, [4.5, 4.5, 4.5, 5.0]), (115, [4.0, 4.0, 4.0, 4.5]), ......] -> [(i, [v, v, v, v]), (i, [v, v, v, v]), ......]

  cosine_similarities = user_rates.flatMap(lambda x : [((i, x[0]), cosine_similarity(x[1], v)) for i, v in Collection_AR])
  #print(cosine_similarities.take(5))

  return cosine_similarities

  #print(cosine_similarities.takeOrdered(5, lambda x : -x[1])) ### to check it in descending order - takes about 6 minutes to output the result

### Cosine Similarity (CS) Per User Aspect Rates (PUAR)
cos_sim_PUAR = cos_sim(MRR_ascending)
print(cos_sim_PUAR.take(5))

[((1.0, 1.0), 1.0), ((1.5, 1.0), 0.9843091327750998), ((2.0, 1.0), 0.9819805060619657), ((2.5, 1.0), 0.9983374884595828), ((3.0, 1.0), 0.9819805060619657)]


In [None]:
### Outputting the result
results = cos_sim_PUAR.take(5) ## Take only the first 5 pairs: the lowest 'review/overall' group against the five lowest groups
for (id1, id2), similarity in results: ## Loop through the sampled cosine similarity results
    print(f"Cosine similarity between IDs {id1} and {id2}: {similarity}")

Cosine similarity between IDs 1.0 and 1.0: 1.0
Cosine similarity between IDs 1.5 and 1.0: 0.9843091327750998
Cosine similarity between IDs 2.0 and 1.0: 0.9819805060619657
Cosine similarity between IDs 2.5 and 1.0: 0.9983374884595828
Cosine similarity between IDs 3.0 and 1.0: 0.9819805060619657
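
These five values can be reproduced without Spark from the mean aspect vectors printed in the `MRR_ascending` output. The sketch below copies the first five vectors from that output; the helper `cosine` and the variable names are illustrative only.

```python
import math

# Mean aspect vectors copied from the first five entries of the MRR_ascending output
mean_rates = [
    (1.0, (3, 2, 2, 2)),
    (1.5, (3, 3, 2, 2)),
    (2.0, (3, 3, 3, 3)),
    (2.5, (4, 3, 3, 3)),
    (3.0, (4, 4, 4, 4)),
]

def cosine(u, v):
    """Plain cosine similarity: dot product over the product of the norms."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# Compare every group with the lowest 'review/overall' group
lowest_key, lowest_vec = mean_rates[0]
pairs = [((key, lowest_key), cosine(vec, lowest_vec)) for key, vec in mean_rates]

for (key, lowest), similarity in pairs:
    print(f"{key} vs {lowest}: {similarity:.4f}")
```

Note that the 2.0 and 3.0 groups receive the same score: (3, 3, 3, 3) and (4, 4, 4, 4) point in the same direction, and cosine similarity ignores vector magnitude.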


In [None]:
### From the TF-IDF result

#print(TFIDF_CBRW.take(5))

# Sort the RDD in ascending order
ascending_ratings = sorted(TFIDF_CBRW.collect(), key=lambda x: x[0], reverse = False)

### Original
# for grouped_ratings in ascending_ratings[:5]:
#   print(grouped_ratings)

### Testing
for overall_rates, tfidf_words in ascending_ratings[:5]:
  print(f"Overall rates: {overall_rates} and importance of a term word {tfidf_words}.")


Overall rates: negative 1.0 and importance of a term word [(6.592480285889554, 'beer'), (3.815719051268565, 'taste'), (3.7772454834272953, 'like'), (3.195599097318654, 'head'), (2.967929222867188, 'one'), (2.267464317082571, 'smell'), (2.2321969841992684, 'bottle'), (1.9514493181539343, 'ever'), (1.8376782456906064, 'color'), (1.773987653743334, 'white'), (1.758776751257147, 'drain'), (1.7561489477645496, 'bad'), (1.695526336327957, 'sweet'), (1.6610970649808383, 'carbonation'), (1.6326998105338009, 'corn'), (1.600676076104859, 'light'), (1.5978493094029191, 'yellow'), (1.5665880012244648, 'pours'), (1.557797440248031, 'beers'), (1.5377873793797159, 'thin')].
Overall rates: negative 1.5 and importance of a term word [(6.592480285889554, 'beer'), (3.4079369595396733, 'light'), (3.022379706996708, 'head'), (2.8241087726559218, 'like'), (2.6936346217922345, 'one'), (2.537981354502738, 'taste'), (2.483675628389305, 'corn'), (2.2454754996633604, 'much'), (2.2013125326391845, 'flavor'), (1.8

In [None]:
### TESTING
### Join the TF-IDF result with the mean value computation of aspect rate beers result

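tfidf_rdd = TFIDF_CBRW.map(lambda x : (float(x[0].split()[1]), x[1])) ### turn keys like 'negative 2.5' into the float 2.5 so they match the keys of mean_rates_rdd

tfidf_rate_beer = mean_rates_rdd.join(tfidf_rdd) ### (rating, (mean aspect rates, TF-IDF words))

#print(tfidf_rate_beer.take(5))

rates_tfidf = tfidf_rate_beer.map(lambda x: (overall_splitter(x[0]), x[1][0], x[1][1])) ### flatten to (rating label, mean aspect rates, TF-IDF words)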

print(rates_tfidf.take(5))

print(rates_tfidf.takeOrdered(5, lambda x : -float(x[0].split()[1])))


[('negative 2.5', (4, 3, 3, 3), [(6.592480285889554, 'beer'), (5.295454215002502, 'head'), (4.927733559958336, 'taste'), (4.346107755027792, 'like'), (3.8230720050628393, 'light'), (3.455713043526484, 'malt'), (3.4384085560911415, 'sweet'), (3.2590780144843503, 'flavor'), (3.1464018344435565, 'aroma'), (3.055416842232575, 'color'), (3.026924530145651, 'little'), (3.0128989480922983, 'carbonation'), (2.978026589106171, 'hops'), (2.977219370618315, 'bit'), (2.878533764347092, 'one'), (2.6304324905878382, 'finish'), (2.603095868320266, 'smell'), (2.5855260247474137, 'mouthfeel'), (2.5454008728007804, 'much'), (2.413244979133867, 'white')]), ('negative 3.0', (4, 4, 4, 4), [(6.528784824190137, 'beer'), (5.879902339066324, 'head'), (5.382003726820735, 'taste'), (5.379626685406383, 'light'), (4.195975713156925, 'malt'), (3.9596534238285215, 'sweet'), (3.87836703260751, 'bit'), (3.639789273235698, 'hops'), (3.6043045129856135, 'little'), (3.4832702089901217, 'carbonation'), (3.332008518000104,
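
The key conversion and join used in the cell above can be sketched in plain Python. Everything below is a hypothetical, shortened stand-in for the RDD contents (the TF-IDF weights are rounded placeholders), and `parse_rating` is an illustrative helper, not a function from the notebook. As with an inner join, labels whose rating has no matching mean vector are dropped.

```python
# Hypothetical stand-in for mean_rates_rdd: rating -> mean aspect rates
mean_rates = {2.5: (4, 3, 3, 3), 3.0: (4, 4, 4, 4)}

# Hypothetical stand-in for TFIDF_CBRW: (label, [(weight, word), ...])
tfidf = [
    ("negative 2.5", [(6.59, "beer"), (5.30, "head")]),
    ("negative 3.0", [(6.53, "beer"), (5.88, "head")]),
    ("positive 4.0", [(6.10, "beer"), (4.20, "hops")]),
]

def parse_rating(label):
    """Extract the numeric rating from a label such as 'negative 2.5'."""
    return float(label.split()[1])

# Inner join on the numeric rating: keep only labels present in both collections
joined = [
    (label, mean_rates[parse_rating(label)], words)
    for label, words in tfidf
    if parse_rating(label) in mean_rates
]

for label, rates, words in joined:
    print(label, rates, words[0])
```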