In [1]:
import nltk
from nltk.corpus import PlaintextCorpusReader
from nltk.corpus import stopwords
nltk.download('punkt')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /home/tony/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/tony/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

The product review corpus contains reviews scored as positive and negative opinions. Pre-process
your text, prepare the review examples for training and evaluation. Implement, train and evaluate a
neural network that can classify an input review to either a positive or a negative class. You are
free to choose any neural network/deep learning technique taught in the Chapter “Deep Learning
for NLP”, e.g., multi-layer perceptron, LSTM, bi-directional LSTM, etc. You should design
appropriate experiments to evaluate your classifier’s classification accuracy based on 5-fold cross
validation (CV).

In [2]:
# Set up useful dictionary mappings
fileNames = {} # Dictionary to fetch file name based on docID (docID : fileName)
docTerms = {} # Dictionary to fetch terms of a doc based on fileName (fileName : [terms_list]) where terms_list includes duplicates
review_data = [] # Stores review samples

# Set up PlaintextCorpusReader object to read all txt files in product_reviews folder
data = None
path = "product_reviews"
data = PlaintextCorpusReader(path, '.*.txt')

# Initialise dictionaries
documents = data.fileids()
documents.remove("README.txt")
for i in range(0,len(documents)):
  docID = i+1
  file_name = documents[i]
  fileNames.update({docID : file_name})


Pre-processing the text

In [3]:
from nltk.stem import snowball
import regex as re
import string

def filter_words(words: list) -> list:
    """
    Returns a list of words that are:
    - lowercase
    - not stopwords
    - length > 1
    - have no punctation in each word
    - stemmed
    """
    final_words = words
    # Use only lowercase characters
    final_words = [word.lower() for word in words]
    
    # Remove stopwords
    stop_words = stopwords.words('english')
    final_words = [word for word in final_words if word not in stop_words]
    
    # Remove punctation in between words
    final_words = [word.translate(str.maketrans('', '', string.punctuation)) for word in final_words]
    
    # Remove anything that isn't alphabetic
    final_words = [word for word in final_words if word.isalpha() == True]
    
    # Remove strings with length 1
    final_words = [word for word in final_words if len(word) > 1]

    
#     # Stemming using Snowball Stemmer (Porter2)
#     stemmer = snowball.SnowballStemmer('english')
#     final_words = [stemmer.stem(word) for word in final_words]
    return final_words

def filter_review(review: str) -> str:
    """
    Returns a string review that:
    - does not include the rating in the review
    - is lexically processed
    """
    # The comment of a review appears after the ## tag in each document
    try:
        splitting_index = review.index('##')
    except: # Some reviews aren't tagged with ## before the the review's comment so look directly after the rating tag
        splitting_index = review.rfind(']')
    comment = review[splitting_index+2:]
    return " ".join(filter_words(comment.split()))
    
    
def preprocess_document(document: str):
    raw_text = data.raw(document)
    lines = raw_text.splitlines()

    for line in lines:
        res = re.search('\[(?:\+|\-)\d\]',line) # Search for any tags in the form [+n] or [-n] in the review
        if res is not None: 
            review = filter_review(line)
            # Generate tagged data based on the rating tag (class 1 = positive, class 0 = negative)
            if '+' in res.group():
                review_data.append([review,1])
            else:
                review_data.append([review,0])

# Pre-process each document
for doc in documents:
    preprocess_document(doc)
    

Preparing the review examples for training and evaluation

In [4]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.utils import resample
import numpy as np

good_reviews = [review for review in review_data if review[1]==1]
bad_reviews = [review for review in review_data if review[1]==0]

reviews = [review for review,_ in review_data]
labels = [label for _,label in review_data]

print("Sample number:",len(reviews))
print("Positive:",len(good_reviews))
print("Negative:",len(bad_reviews))



Sample number: 2090
Positive: 1344
Negative: 746


In [18]:
import scipy
import tensorflow as tf
from keras.preprocessing.text import Tokenizer
from keras.models import Sequential
from keras import layers
from keras import losses
from keras.preprocessing.sequence import pad_sequences
from collections import Counter

def count_words(data):
    total_counter = dict()
    for sent in data:
        words = sent.split()
        for word in words:
            if word in total_counter:
                total_counter[word] += 1
            else:
                total_counter[word] = 1       
    return len(total_counter)
        

def model_and_evaluate(training_data,test_data,EPOCHS=10,BATCH_SIZE=30,VOCAB_SIZE=1000,VERBOSE=False):
    
    model = None
    train_reviews = [review for review,_ in training_data]
    train_labels = np.array([label for _,label in training_data])
    test_reviews = [review for review,_ in test_data]
    test_labels = np.array([label for _,label in test_data])
        
    num_words = count_words(train_reviews)
    max_length = 20
    tokenizer = Tokenizer(num_words=num_words)
    tokenizer.fit_on_texts(train_reviews)

    train_seq = tokenizer.texts_to_sequences(train_reviews)
    train_padded = pad_sequences(train_seq,maxlen=max_length,padding="post",truncating="post")
    test_seq = tokenizer.texts_to_sequences(test_reviews)
    test_padded = pad_sequences(test_seq,maxlen=max_length,padding="post",truncating="post")

    model = Sequential([
        layers.Embedding(num_words,32,input_length=max_length),
        layers.Bidirectional(layers.LSTM(32, recurrent_dropout=0.2)),
        layers.Dense(1,activation="sigmoid")
    ])
#         model = Sequential()
#         model.add(layers.Embedding(num_words, 32, input_length=max_length))
#         model.add(layers.Conv1D(filters=32, kernel_size=3, padding='same', activation='relu'))
#         model.add(layers.MaxPooling1D(pool_size=2))
#         model.add(layers.Bidirectional(layers.LSTM(100, dropout=0.2, recurrent_dropout=0.2))),
#         model.add(layers.Dense(1, activation='sigmoid'))

    model.compile(loss="binary_crossentropy",optimizer='adam',metrics=["accuracy"])

    history = model.fit(
        train_padded,
        train_labels,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        verbose=VERBOSE)

    loss, accuracy = model.evaluate(test_padded,test_labels)
    return accuracy

In [19]:
import random
random.shuffle(review_data)

index = int(len(review_data)*0.8)

training_data = review_data[:index]
test_data = review_data[index:]

print(model_and_evaluate(training_data,test_data,VERBOSE=True,BATCH_SIZE=64,EPOCHS=8))




Epoch 1/8
Epoch 2/8
Epoch 3/8
Epoch 4/8
Epoch 5/8
Epoch 6/8
Epoch 7/8
Epoch 8/8
0.7320574162679426


Designing and implementing experiments to evaluate classification accuracy based on 5-fold cross validation.

In [17]:
import random
random.shuffle(review_data)

k = 5

groups = np.array_split(review_data, k) # Split dataset into 5 groups

reviews = [review for review,_ in review_data]
labels = [label for _,label in review_data]

# 5-fold cross validation
accuracies = []
for i in range (0,k):
    test_data = groups[i]
    other_groups = list(range(0,k))
    other_groups.remove(i)
    training_data = [review for j in other_groups for review in groups[j]]
    accuracy = model_and_evaluate(training_data,test_data,BATCH_SIZE=64,EPOCHS=8)
    accuracies.append(accuracy)
    
print(np.mean(accuracies))

0.7320574162679425
