In [0]:
!pip install -q keras
!pip install -U -q PyDrive
import pandas as pd
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Embedding, GlobalMaxPooling1D, CuDNNGRU, Dropout, BatchNormalization, Activation
from sklearn.model_selection import train_test_split
from keras.preprocessing import text, sequence
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.models import load_model
import keras.optimizers
import os
from google.colab import files
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

In [0]:
# Local file names used throughout the notebook.
model_name = 'rnn_model_twitter.h5'   # full model written after training
filepath = 'weights_best.hdf5'        # checkpoint file (downloaded and/or overwritten during training)

# Switch: True pulls previously saved weights from Google Drive so that the
# model cell can load them before training; False skips the download.
use_existing_weights = True

if use_existing_weights:
  # Sign in through Colab, then hand the application-default credentials
  # to PyDrive to obtain a Drive client.
  auth.authenticate_user()
  gauth = GoogleAuth()
  gauth.credentials = GoogleCredentials.get_application_default()
  drive = GoogleDrive(gauth)

  # Copy the Drive file with this id into the local checkpoint path.
  weight_id = '1cM_aXUHcVylDRu9n57L-6Q0L8XmdCvJU'
  weight_drive_file = drive.CreateFile({'id': weight_id})
  weight_drive_file.GetContentFile(filepath)

In [0]:
!mkdir data
!wget http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip && unzip trainingandtestdata.zip -d data/
!wget http://nlp.stanford.edu/data/glove.twitter.27B.zip && unzip glove.twitter.27B.zip -d data/

In [0]:
#load data to the working environment
# The download cell unpacks both archives into ./data (relative to the
# notebook's working directory), so read from that same relative directory.
# Building the path from '~' only works when the working directory happens
# to be the home directory.
data_dir = 'data'
data_path = os.path.join(data_dir, 'training.1600000.processed.noemoticon.csv')
embedding_path = os.path.join(data_dir, 'glove.twitter.27B.200d.txt')
dim_size = 200  # number of components per vector in the GloVe file named above

csv_options = dict(encoding='latin1', header=None,
                   names=['sentiment', 'id', 'date', 'flag', 'user', 'text'])
try:
  # Spelling used by pandas >= 1.3 (error_bad_lines was removed in pandas 2.0).
  dataset = pd.read_csv(data_path, on_bad_lines='skip', **csv_options)
except TypeError:
  # Older pandas rejects the unknown keyword; fall back to the legacy flag.
  dataset = pd.read_csv(data_path, error_bad_lines=False, **csv_options)

# Drop rows with any missing field.
dataset.dropna(axis=0, inplace=True)
# dataset = dataset.sample(frac=0.001)

In [0]:
#preprocess dataset
# Replace URLs, mentions, emoticons, numbers, hashtags, repeated punctuation
# and elongated words with placeholder tokens.
eyes = r"[8:=;]"
nose = r"['`\-]?"

# (pattern, replacement) pairs; they are applied top to bottom, and the order
# matters (e.g. '<3' must become '<heart>' before digits become '<number>').
replacements = [
  (r'https?:\/\/\S+\b|www\.(\w+\.)+\S*', '<url>'),
  (r'@\w+', '<user>'),
  (r'{}{}[)dD]+|[)dD]+{}{}'.format(eyes, nose, nose, eyes), '<smile>'),
  (r'{}{}p+'.format(eyes, nose), '<lolface>'),
  (r'{}{}\(+|\)+{}{}'.format(eyes, nose, nose, eyes), '<sadface>'),
  (r'{}{}[\/|l*]'.format(eyes, nose), '<neutralface>'),
  (r'/', ' / '),
  (r'<3', '<heart>'),
  (r'[-+]?[.\d]*[\d]+[:,.\d]*', '<number>'),
  (r'#\S+', '<hashtag>'),
  (r'([!?.]){2,}', r'\1 <repeat>'),
  (r'\b(\S*?)(.)\2{2,}\b', r'\1\2 <elong>'),
]

cleaned_text = dataset['text']
for pattern, replacement in replacements:
  # regex=True is stated explicitly: pandas 2.0 changed the default of
  # Series.str.replace to literal matching, which would silently turn every
  # pattern above into a no-op.
  cleaned_text = cleaned_text.str.replace(pattern, replacement, regex=True)
dataset['text'] = cleaned_text

#source: https://gist.github.com/tokestermw/cb87a97113da12acb388

In [0]:
#prep the dataset

#replacing positive labels from 4 to 1
dataset['sentiment'] = dataset['sentiment'].replace(4, 1)

#filter these things from the text
# '<' and '>' are not in the filter list, so placeholder tokens such as <url>
# keep their brackets. The backslash is written as '\\' because '\]' is an
# invalid escape sequence; the resulting string is unchanged.
token = text.Tokenizer(filters='!"#$%&()*+,-./:;=?@[\\]^_`{|}~\t\n')

# Padding length: the longest text measured in characters. A text never has
# more tokens than characters, so no sequence is truncated by this length.
max_len = dataset['text'].str.len().max()

#learn the vocabulary from all the text
token.fit_on_texts(list(dataset['text']))
vocab_size = len(token.word_index) + 1  # +1: index 0 is left for padding

# A fixed seed keeps the train/test partition identical between sessions.
# Without it, weights restored from an earlier session would be evaluated on
# rows they were trained on.
x_train, x_test, y_train, y_test = train_test_split(dataset['text'],
                                                    dataset['sentiment'],
                                                    test_size=0.2,
                                                    random_state=42)

#encode
x_train_enc = token.texts_to_sequences(x_train)
x_test_enc = token.texts_to_sequences(x_test)

#add zero padding
x_train_enc_pad = sequence.pad_sequences(x_train_enc, maxlen=max_len)
x_test_enc_pad = sequence.pad_sequences(x_test_enc, maxlen=max_len)

In [0]:
#create embedding dictionary
# Each line of the embedding file is "<word> <v1> <v2> ... <v_dim_size>".
embeddings_index = dict()
# Explicit UTF-8 so decoding does not depend on the machine's locale; the
# with-block closes the file even if a line fails to parse.
with open(embedding_path, encoding='utf-8') as f:
  for line in f:
    # Split on a single space only: a bare split() also breaks on other
    # Unicode whitespace, which would shift the columns if a token is itself
    # such a character.
    values = line.rstrip().split(' ')
    word = values[0]
    coefs = np.asarray(values[1:], dtype='float32')
    if coefs.shape[0] != dim_size:
      # Skip blank or malformed rows rather than failing later when the
      # vector is copied into the fixed-width matrix.
      continue
    embeddings_index[word] = coefs

#map the vocabulary to its word embedding
# Row i holds the vector of the word with tokenizer index i; words without a
# pretrained vector (and row 0) stay all-zero.
embedding_matrix = np.zeros((vocab_size, dim_size))
for word, i in token.word_index.items():
  embedding_vector = embeddings_index.get(word)
  if embedding_vector is not None:
    embedding_matrix[i] = embedding_vector


In [0]:
#deep learning
# Stays False unless training, evaluation and saving below all complete.
finished_training = False

# Frozen pretrained embeddings, then two GRU stages (each followed by batch
# normalisation, ReLU and dropout), a max over the time axis and a single
# sigmoid unit.
model = Sequential([
  Embedding(vocab_size, dim_size, input_length=max_len, weights=[embedding_matrix], trainable=False),

  CuDNNGRU(32, return_sequences=True),
  BatchNormalization(),
  Activation('relu'),
  Dropout(0.3),

  CuDNNGRU(16, return_sequences=True),
  BatchNormalization(),
  Activation('relu'),
  Dropout(0.3),
  GlobalMaxPooling1D(),

  Dense(1, activation='sigmoid'),
])

# Optionally start from the checkpoint file instead of fresh weights.
if use_existing_weights:
  model.load_weights(filepath)

opt = keras.optimizers.Adam(lr=0.01)
model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

# Both callbacks watch the training accuracy: stop after 10 epochs without
# improvement, and overwrite the checkpoint file only when it improves.
earlystop = EarlyStopping(monitor='acc', patience=10, verbose=1, mode='auto')
checkpoint = ModelCheckpoint(filepath, monitor='acc', verbose=1, save_best_only=True, mode='auto')
callback_lists = [earlystop, checkpoint]

model.fit(x_train_enc_pad, y_train, epochs=100, batch_size=3000, callbacks=callback_lists)

# Report the second metric (accuracy) on the held-out split as a percentage.
scores = model.evaluate(x_test_enc_pad, y_test)
print("\n%s: %.2f%%" % (model.metrics_names[1], scores[1]*100))

model.save(model_name)
finished_training = True

In [0]:
# Upload the checkpoint file (and, after a completed run, the full model) to
# Google Drive.

# The configuration cell only creates `drive` when use_existing_weights is
# True; build the client here when it is missing so this cell does not fail
# with a NameError.
if 'drive' not in globals():
  auth.authenticate_user()
  gauth = GoogleAuth()
  gauth.credentials = GoogleCredentials.get_application_default()
  drive = GoogleDrive(gauth)

uploaded = drive.CreateFile({'title': filepath})
uploaded.SetContentFile(filepath)
uploaded.Upload()
print('Uploaded file with ID {}'.format(uploaded.get('id')))
print('Done uploading weights')

# The full model file is only written at the end of the training cell, so
# upload it only when that cell ran to completion.
if finished_training:
  uploaded_model = drive.CreateFile({'title': model_name})
  uploaded_model.SetContentFile(model_name)
  uploaded_model.Upload()
  print('Uploaded file with ID {}'.format(uploaded_model.get('id')))
  print('Done uploading model')

In [0]:
#using the model
# Interactive loop: read a sentence, encode and pad it the same way as the
# training data, and print the model's score with a Positive/Negative label.
# NOTE(review): the sentence is tokenised as typed; it is not passed through
# the regex normalisation that was applied to the training text.

done = False
while not done:
  sentence = input('Input sentence: ')

  sentence_encoding = token.texts_to_sequences([sentence])
  padded_sentence = sequence.pad_sequences(sentence_encoding, maxlen=max_len)

  prediction = model.predict(np.array(padded_sentence))
  # The final layer is a single sigmoid unit, so the score is a float between
  # 0 and 1 and is practically never exactly 0 or 1. Comparing with == would
  # print nothing for almost every input; use 0.5 as the decision threshold.
  score = float(prediction[0][0])
  print(prediction[0])
  if score >= 0.5:
    print('Positive')
  else:
    print('Negative')

  finish = input('Do you still want to input another text? [y/n]')
  # Accept 'n' / 'N', ignoring surrounding whitespace.
  if finish.strip().lower() == 'n':
    done = True
    
