# This dataset consists of reviews of fine foods from amazon. The data span a period of more than 10 years, including all ~500,000 reviews up to October 2012. Reviews include product and user information, ratings, and a plain text review. It also includes reviews from all other Amazon categories.

In [6]:
import dask.bag as bag
import os

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
raw_text = bag.read_text("/content/drive/MyDrive/foods.txt",encoding='cp1252')

In [9]:
from dask.delayed import delayed

In [10]:
def get_next_buffer_part(file,start_index,span_index=0,blocksize=1000):
    file.seek(start_index)
    buffer = file.read(blocksize + span_index).decode('cp1252')
    delimeter_position = buffer.find('\n\n')
    if delimeter_position == -1:
        return get_next_buffer_part(file,start_index,span_index+blocksize)
    else:
        file.seek(start_index)
        return start_index,delimeter_position

In [11]:
with open("/content/drive/MyDrive/foods.txt","rb") as file_handle:
    size = file_handle.seek(0,2) - 1       #Get the total size of the file in bytes
    more_data = True                     
    output = list()
    current_position = next_position = 0
    while more_data:
        if current_position >= size:
            more_data = False
        else:
            current_position,next_position = get_next_buffer_part(file_handle,current_position,0)
            output.append((current_position,next_position))
            current_position = current_position + next_position + 2

In [12]:
def get_dict_item(filename,start_index,delimeter_position,encoding='cp1252'):
    with open(filename,"rb") as file_handle:
        file_handle.seek(start_index)
        text = file_handle.read(delimeter_position).decode(encoding)
        elements = text.strip().split("\n")
        key_value_pairs = [(element.split(": ")[0], element.split(": ")[1])
                          if len(element.split(": ")) > 1
                          else ("unknown",element)
                          for element in elements]
        return dict(key_value_pairs)

In [13]:
reviews = bag.from_sequence(output).map(lambda x: get_dict_item("/content/drive/MyDrive/foods.txt",x[0],x[1]))

In [14]:
reviews.take(2)

({'product/productId': 'B001E4KFG0',
  'review/helpfulness': '1/1',
  'review/profileName': 'delmartian',
  'review/score': '5.0',
  'review/summary': 'Good Quality Dog Food',
  'review/text': 'I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.',
  'review/time': '1303862400',
  'review/userId': 'A3SGXH7AUHU8GW'},
 {'product/productId': 'B00813GRG4',
  'review/helpfulness': '0/0',
  'review/profileName': 'dll pa',
  'review/score': '1.0',
  'review/summary': 'Not as Advertised',
  'review/text': 'Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as "Jumbo".',
  'review/time': '1346976000',
  'review/userId': 'A1D87F6ZCVE5NK'})

In [15]:
def fetch_scores(element):
    numeric_score = float(element['review/score'])
    return numeric_score

In [16]:
review_scores = reviews.map(fetch_scores)

In [17]:
def tag_reviews(element):
    if float(element['review/score']) > 3:
        element['review/score'] = 'pos'
    else:
        element['review/score'] = 'neg'
    return element

In [18]:
reviews = reviews.map(tag_reviews)

In [19]:
import pandas as pd
import numpy as np
from nltk.tokenize import word_tokenize
import re
import nltk
from nltk.corpus import stopwords
from spacy.lang.en.stop_words import STOP_WORDS
from itertools import filterfalse
from nltk import pos_tag
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet as wn
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA

In [20]:
def text_tokenization(x):
    x['review/text'] = word_tokenize(x['review/text'])
    return x

In [21]:
tokenized_reviews = reviews.map(text_tokenization)

In [22]:
def normalize_tokens(review):
    review['review/text'] =  [x.lower() for x in review['review/text']]
    return review

In [23]:
normalized_reviews = tokenized_reviews.map(normalize_tokens)

In [24]:
def contracted_word_expansion(token):
    if token in contractions_dict.keys():
        return contractions_dict[token]
    else:
        return token

In [25]:
def contractions_expansion(review):
    review['review/text'] = list(map(contracted_word_expansion,review['review/text']))
    return review

In [26]:
contracted_reviews = normalized_reviews.map(contractions_expansion)

In [27]:
regex = r'^@[a-zA-z0-9]|^#[a-zA-Z0-9]|\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*|\W+|\d+|<("[^"]*"|\'[^\']*\'|[^\'">])*>|_+|[^\u0000-\u007f]+'

In [28]:
def waste_word_or_not(token):
    return re.search(regex,token)

In [29]:
def filter_waste_words(review):
    review['review/text'] = list(filterfalse(waste_word_or_not,review['review/text']))
    return review

In [30]:
filtered_reviews = contracted_reviews.map(filter_waste_words)

In [31]:
def split(review):
    review['review/text'] = list(map(lambda x: re.split(regex,x)[0],review['review/text']))
    return review

In [32]:
filtered_reviews = filtered_reviews.map(split)

In [33]:
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
[nltk_data] Downloading package omw to /root/nltk_data...
[nltk_data]   Unzipping corpora/omw.zip.


True

In [34]:
en_stop_words = list(set(stopwords.words('english')).union(set(STOP_WORDS)))

In [35]:
def is_stopword(token):
    return not(token in en_stop_words or re.search(r'\b\w\b|[^\u0000-\u007f]+|_+|\W+',token))

In [36]:
def stopwords_removal(review):
    review['review/text'] = list(filter(is_stopword,review['review/text']))
    return review

In [37]:
without_stopwords_reviews = filtered_reviews.map(stopwords_removal)

In [38]:
def get_wnet_pos_tag(treebank_tag):
    wn.ensure_loaded()
    if treebank_tag[1].startswith('J'):
        return (treebank_tag[0],wn.ADJ)
    elif treebank_tag[1].startswith('V'):
        return (treebank_tag[0],wn.VERB)
    elif treebank_tag[1].startswith('N'):
        return (treebank_tag[0],wn.NOUN)
    elif treebank_tag[1].startswith('R'):
        return (treebank_tag[0],wn.ADV)
    else:
        return (treebank_tag[0],wn.NOUN)

In [65]:
def get_pos_tag(review):
    wn.ensure_loaded()
    review['review/text'] = list(map(get_wnet_pos_tag,pos_tag(review['review/text'])))
    return review

In [40]:
tagged_reviews = without_stopwords_reviews.map(get_pos_tag)

In [41]:
lemmatizer = WordNetLemmatizer()
wn.ensure_loaded()

In [42]:
def token_lemmatization(token_pos_tuple):
    wn.ensure_loaded()
    if token_pos_tuple == None:
        return ""
    else:
        return lemmatizer.lemmatize(word=token_pos_tuple[0],pos=token_pos_tuple[1])

In [43]:
def lemmatization(review):
    wn.ensure_loaded()
    if len(review['review/text']) > 0:
        review['review/text'] = list(map(token_lemmatization,review['review/text']))
    else:
        review['review/text'] = [""]
    return review

In [44]:
lemmatized_reviews = tagged_reviews.map(lemmatization)

In [45]:
def extract_tokens(review):
    return review['review/text']

In [46]:
extracted_tokens = lemmatized_reviews.map(extract_tokens)

In [47]:
unique_tokens = extracted_tokens.flatten().distinct()

In [48]:
from dask.diagnostics import ProgressBar

In [49]:
nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [50]:
!pip install contractions

Collecting contractions
  Downloading https://files.pythonhosted.org/packages/0a/04/d5e0bb9f2cef5d15616ebf68087a725c5dbdd71bd422bcfb35d709f98ce7/contractions-0.0.48-py2.py3-none-any.whl
Collecting textsearch>=0.0.21
  Downloading https://files.pythonhosted.org/packages/d3/fe/021d7d76961b5ceb9f8d022c4138461d83beff36c3938dc424586085e559/textsearch-0.0.21-py2.py3-none-any.whl
Collecting pyahocorasick
[?25l  Downloading https://files.pythonhosted.org/packages/7f/c2/eae730037ae1cbbfaa229d27030d1d5e34a1e41114b21447d1202ae9c220/pyahocorasick-1.4.2.tar.gz (321kB)
[K     |████████████████████████████████| 327kB 15.2MB/s 
[?25hCollecting anyascii
[?25l  Downloading https://files.pythonhosted.org/packages/09/c7/61370d9e3c349478e89a5554c1e5d9658e1e3116cc4f2528f568909ebdf1/anyascii-0.1.7-py3-none-any.whl (260kB)
[K     |████████████████████████████████| 266kB 13.6MB/s 
[?25hBuilding wheels for collected packages: pyahocorasick
  Building wheel for pyahocorasick (setup.py) ... [?25l[?25hdone

In [51]:
from contractions import contractions_dict

In [52]:
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


True

In [53]:
with ProgressBar():
    number_of_tokens = unique_tokens.count().compute()

[########################################] | 100% Completed | 19min 21.3s


In [54]:
number_of_tokens

90271

In [55]:
unique_tokens

dask.bag<distinct-aggregate, npartitions=1>

In [56]:
with ProgressBar():
    tokens_index = list(unique_tokens)

[########################################] | 100% Completed | 19min 12.9s


In [57]:
from collections import Counter
from collections import OrderedDict

In [58]:
def compute_tf(review):
    D = dict(Counter(review))
    non_included = set(tokens_index).difference(set(D.keys()))
    D_prime = dict(zip(non_included,list(np.zeros(len(non_included)))))
    D_prime.update(D)
    full_D = dict(OrderedDict(sorted(D_prime.items())))
    print(full_D)
    return np.array(full_D.values())

In [59]:
tf_vectors = extracted_tokens.map(compute_tf)

In [60]:
def stacker(partition):
    return dask_array.concatenate([element for element in partition])

In [68]:
corpus = list()
for text in reviews['review/text']:
    corpus.append(text)

TypeError: ignored