In [None]:
import string
import re
from os import listdir
from numpy import array
from keras.preprocessing.text import Tokenizer
from keras_preprocessing.sequence import pad_sequences
from keras.utils.vis_utils import plot_model
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Embedding
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D

In [None]:
# load doc into memory
def load_doc(filename):
  """Return the entire contents of a text file as a single string.

  Args:
    filename: path of the file to read; it is opened in text mode with
      the platform's default encoding.

  Returns:
    The full text of the file.

  Raises:
    OSError: if the file cannot be opened (for example, it does not exist).
  """
  # the context manager closes the handle even when read() raises
  with open(filename, 'r') as file:
    return file.read()

In [None]:
# turn a doc into clean tokens
def clean_doc(doc, vocab):
  """Strip punctuation from a document and keep only in-vocabulary words.

  Args:
    doc: raw text; it is split on whitespace.
    vocab: container supporting `in`; a word is kept only if it is a member.

  Returns:
    The surviving words joined by single spaces, in their original order.
  """
  # character class matching every ASCII punctuation mark
  punct_pattern = re.compile('[%s]' % re.escape(string.punctuation))
  kept = []
  for raw_word in doc.split():
    # drop punctuation first, then test membership on the stripped form
    word = punct_pattern.sub('', raw_word)
    if word in vocab:
      kept.append(word)
  return ' '.join(kept)

In [None]:
# read the vocabulary file and keep its unique whitespace-separated entries
vocab_filename = 'vocab.txt'
vocab = set(load_doc(vocab_filename).split())

In [None]:
# clean every training file in a directory
def process_train(directory, vocab):
  """Load and clean the files in `directory` whose names do not start with 'cv9'.

  Args:
    directory: folder whose entries are listed with `listdir`.
    vocab: vocabulary passed through to `clean_doc`.

  Returns:
    A list with one cleaned string per selected file.
  """
  return [
    clean_doc(load_doc(directory + '/' + name), vocab)
    for name in listdir(directory)
    if not name.startswith('cv9')
  ]

def process_test(directory, vocab):
  """Load and clean the files in `directory` whose names start with 'cv9'.

  Args:
    directory: folder whose entries are listed with `listdir`.
    vocab: vocabulary passed through to `clean_doc`.

  Returns:
    A list with one cleaned string per selected file.
  """
  return [
    clean_doc(load_doc(directory + '/' + name), vocab)
    for name in listdir(directory)
    if name.startswith('cv9')
  ]

In [None]:
# load and clean one split (train or test) of a directory
def process_docs(directory, vocab, is_train):
  """Load and clean either the training or the held-out files of a directory.

  Files whose names start with 'cv9' form the held-out split; all other
  files form the training split.

  Args:
    directory: folder whose entries are listed with `listdir`.
    vocab: vocabulary passed through to `clean_doc`.
    is_train: truthy to select the training files, falsy for the 'cv9' files.

  Returns:
    A list with one cleaned string per selected file.
  """
  # a file is kept exactly when its 'cv9' status matches the requested split
  wants_held_out = not is_train
  return [
    clean_doc(load_doc(directory + '/' + name), vocab)
    for name in listdir(directory)
    if name.startswith('cv9') == wants_held_out
  ]

In [None]:
# load and clean a dataset
def load_clean_dataset(vocab, is_train,
                       data_dir='drive/MyDrive/review_polarity/txt_sentoken'):
  """Load the negative and positive reviews of one split, with labels.

  Args:
    vocab: vocabulary passed through to `process_docs`.
    is_train: truthy for the training split, falsy for the held-out split.
    data_dir: folder containing the `neg` and `pos` sub-folders. It defaults
      to the location this notebook originally hard-coded, so existing
      two-argument calls behave as before.

  Returns:
    A `(docs, labels)` pair: `docs` lists the negative documents followed by
    the positive ones, and `labels` is a numpy array holding 0 for each
    negative document and 1 for each positive one, in the same order.
  """
  neg = process_docs(data_dir + '/neg', vocab, is_train)
  pos = process_docs(data_dir + '/pos', vocab, is_train)
  docs = neg + pos
  # labels follow the concatenation order: all negatives, then all positives
  labels = array([0] * len(neg) + [1] * len(pos))
  return docs, labels

In [None]:
# build a tokenizer from a corpus
def create_tokenizer(lines):
  """Create a Keras `Tokenizer` with default settings and fit it on `lines`.

  Args:
    lines: texts handed to `Tokenizer.fit_on_texts`.

  Returns:
    The fitted tokenizer.
  """
  fitted = Tokenizer()
  fitted.fit_on_texts(lines)
  return fitted

In [None]:
# integer encode and pad documents
def encode_docs(tokenizer, max_length, docs):
  """Convert documents to integer sequences padded to a common length.

  Args:
    tokenizer: object providing `texts_to_sequences`.
    max_length: value passed as `maxlen` to `pad_sequences`.
    docs: texts to encode.

  Returns:
    The result of `pad_sequences`, with padding added at the end of each
    sequence (`padding='post'`).
  """
  return pad_sequences(
    tokenizer.texts_to_sequences(docs),
    maxlen=max_length,
    padding='post',
  )

In [None]:
# define the model
def define_model(vocab_size, max_length):
  """Build, compile, summarize and plot the CNN sentiment classifier.

  The stack is: a 100-dimensional embedding, a 1D convolution (32 filters,
  kernel size 8, ReLU), max pooling with pool size 2, flatten, a 10-unit
  ReLU dense layer and a single sigmoid output unit.

  Args:
    vocab_size: first argument of the `Embedding` layer (input dimension).
    max_length: length of each input sequence (`input_length`).

  Returns:
    The compiled `Sequential` model. As side effects, the summary is printed
    and a diagram is written to 'model.png'.
  """
  layers = [
    Embedding(vocab_size, 100, input_length=max_length),
    Conv1D(filters=32, kernel_size=8, activation='relu'),
    MaxPooling1D(pool_size=2),
    Flatten(),
    Dense(10, activation='relu'),
    Dense(1, activation='sigmoid'),
  ]
  model = Sequential(layers)
  # binary cross-entropy pairs with the single sigmoid output
  model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
  )
  model.summary()
  plot_model(model, to_file='model.png', show_shapes=True)
  return model

In [None]:
# rebuild the vocabulary set from vocab.txt
vocab = set(load_doc('vocab.txt').split())

In [None]:
# build the cleaned training and held-out splits together with their labels
train_docs, ytrain = load_clean_dataset(vocab, is_train=True)
test_docs, ytest = load_clean_dataset(vocab, is_train=False)

In [None]:
# fit the tokenizer on the training documents only
tokenizer = create_tokenizer(lines=train_docs)

In [None]:
# vocabulary size: number of indexed words plus one extra slot
vocab_size = 1 + len(tokenizer.word_index)
print('Vocabulary size: %d ' % vocab_size)

 Vocabulary size: 25768 


In [None]:
# longest training document, measured in whitespace-separated tokens
max_length = max(len(s.split()) for s in train_docs)
print(' Maximum length: %d ' % max_length)

 Maximum length: 1317 


In [None]:
# integer-encode and pad both splits to the training maximum length
Xtrain = encode_docs(tokenizer, max_length, docs=train_docs)
Xtest = encode_docs(tokenizer, max_length, docs=test_docs)

In [None]:
# build the network
model = define_model(vocab_size, max_length)
# train for 10 epochs in mini-batches of 10 documents
model.fit(x=Xtrain, y=ytrain, batch_size=10, epochs=10)

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_1 (Embedding)     (None, 1317, 100)         2576800   
                                                                 
 conv1d_1 (Conv1D)           (None, 1310, 32)          25632     
                                                                 
 max_pooling1d_1 (MaxPooling  (None, 655, 32)          0         
 1D)                                                             
                                                                 
 flatten_1 (Flatten)         (None, 20960)             0         
                                                                 
 dense_2 (Dense)             (None, 10)                209610    
                                                                 
 dense_3 (Dense)             (None, 1)                 11        
                                                      

<keras.callbacks.History at 0x7f9664654ca0>

In [None]:
# accuracy on the data the model was trained on
_, acc = model.evaluate(x=Xtrain, y=ytrain, verbose=0)
print(' Train Accuracy: %.2f ' % (acc*100))

 Train Accuracy: 100.00 


In [None]:
# accuracy on the held-out reviews, evaluated one document per batch
_, acc = model.evaluate(x=Xtest, y=ytest, batch_size=1, verbose=0)
print(' Test Accuracy: %.2f ' % (acc*100))

 Test Accuracy: 87.50 


In [None]:
# classify a review as negative or positive
def predict_sentiment(review):
  """Predict the sentiment of one review with the notebook's trained model.

  Relies on the notebook-level `vocab`, `tokenizer`, `max_length` and `model`.

  Args:
    review: raw review text.

  Returns:
    A `(confidence, label)` pair. When the model output rounds to 0 the
    label is 'NEGATIVE' and the confidence is one minus the output;
    otherwise the label is 'POSITIVE' and the confidence is the output.
  """
  cleaned = clean_doc(review, vocab)
  # wrap in a list: the encoder works on a batch of documents
  encoded = encode_docs(tokenizer, max_length, [cleaned])
  prob_positive = model.predict(encoded, verbose=0)[0, 0]
  if round(prob_positive) != 0:
    return prob_positive, 'POSITIVE'
  return (1 - prob_positive), 'NEGATIVE'

In [None]:
# try the classifier on a clearly positive sentence
text = 'Everyone will enjoy this film. I love it, recommended!'
percent, sentiment = predict_sentiment(review=text)
print('Review: [%s]\nSentiment: %s (%.3f%%) ' % (text, sentiment, percent*100))

Review: [Everyone will enjoy this film. I love it, recommended!]
Sentiment: NEGATIVE (60.848%) 
