In [2]:
import pandas as pd
import os
import numpy as np
from tensorflow.keras.callbacks import LambdaCallback
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Flatten, Bidirectional, \
                            Dropout, Embedding, Conv2D, MaxPool2D, Reshape, \
                            TimeDistributed, Activation, BatchNormalization, Input
from tensorflow.keras.optimizers import RMSprop, Adam

# TO RUN ON GPU, UNCOMMENT
# import tensorflow as tf
# config = tf.compat.v1.ConfigProto(device_count = {'GPU':2})
# sess = tf.compat.v1.Session(config=config)
# tf.compat.v1.keras.backend.set_session(sess)

In [3]:
# Read the training CSV into a DataFrame; later cells pull the "text",
# "selected_text" and "sentiment" columns out of it.
train_file_path = "data/train.csv"
df = pd.read_csv(filepath_or_buffer=train_file_path)

In [4]:
# Columns of interest: the full tweet, the span selected from it (the label),
# and the tweet's sentiment class.
texts = df["text"]
selected_texts = df["selected_text"]
sentiments = df["sentiment"]

# One single-entry {tweet: sentiment} dict per row ...
train_list = [{tweet: sentiment} for tweet, sentiment in zip(texts, sentiments)]
# ... and the selected span of every row, in the same order.
label_list = list(selected_texts)

# Per-sentiment tweets and their selected spans.
positive_train_list, positive_label_list = [], []
negative_train_list, negative_label_list = [], []
neutral_train_list, neutral_label_list = [], []

# Any sentiment other than "positive"/"negative" falls through to neutral.
_train_buckets = {
    "positive": (positive_train_list, positive_label_list),
    "negative": (negative_train_list, negative_label_list),
}
_neutral_bucket = (neutral_train_list, neutral_label_list)

for tweet, sentiment, span in zip(texts, sentiments, selected_texts):
    tweet_bucket, span_bucket = _train_buckets.get(sentiment, _neutral_bucket)
    tweet_bucket.append(tweet)
    span_bucket.append(span)

In [5]:
# Build long strings of all tweets and all selected spans, overall and per
# sentiment, for non-deep-learning analysis. Every piece is prefixed with a
# single space, so each resulting string starts with a space.
_text_pieces = {"all": [], "positive": [], "negative": [], "neutral": []}
_span_pieces = {"all": [], "positive": [], "negative": [], "neutral": []}

for entry, span in zip(train_list, label_list):
    spaced_span = " " + str(span)
    _span_pieces["all"].append(spaced_span)
    for tweet, sentiment in entry.items():
        spaced_tweet = " " + str(tweet)
        # anything that is not explicitly positive/negative counts as neutral
        group = sentiment if sentiment in ("positive", "negative") else "neutral"
        _text_pieces["all"].append(spaced_tweet)
        _text_pieces[group].append(spaced_tweet)
        _span_pieces[group].append(spaced_span)

all_text = "".join(_text_pieces["all"])
all_selected_text = "".join(_span_pieces["all"])
positive_text = "".join(_text_pieces["positive"])
positive_selected_text = "".join(_span_pieces["positive"])
negative_text = "".join(_text_pieces["negative"])
negative_selected_text = "".join(_span_pieces["negative"])
neutral_text = "".join(_text_pieces["neutral"])
neutral_selected_text = "".join(_span_pieces["neutral"])

# Character vocabulary: every distinct character seen in the training tweets,
# sorted, with lookup tables in both directions.
chars = sorted(set(all_text))
char_indices = {symbol: position for position, symbol in enumerate(chars)}
indices_char = dict(enumerate(chars))

In [6]:
# Fixed tweet width in characters (the dataset seems to top out around 140).
# Shorter tweets are left-padded with spaces up to this width; a tweet that is
# already longer is kept unchanged.
seqlen = 160
modified_positive_train_list = [
    str(tweet).rjust(seqlen) for tweet in positive_train_list
]

In [7]:
# Turn every selected span into a 0/1 mask over the padded tweet: 1 for the
# characters of the first occurrence of the span, 0 everywhere else. Each mask
# is exactly `seqlen` entries long so the masks stack into a 2-D array.
modified_positive_label_list = []
for row, (selected, original) in enumerate(
        zip(positive_label_list, modified_positive_train_list)):
    selected = str(selected)
    start = original.find(selected)
    if start < 0:
        # The previous character-by-character scan never terminated in this
        # case; fail loudly instead so the offending row can be inspected.
        raise ValueError(
            "selected text %r not found in positive tweet #%d: %r"
            % (selected, row, original)
        )
    modified_label = [0] * start + [1] * len(selected)
    # Pad with zeros up to seqlen; the slice also keeps the mask at seqlen
    # entries should a padded tweet ever be longer than that (the old
    # `!= 160` pad loop would spin forever there).
    modified_label += [0] * (seqlen - len(modified_label))
    modified_positive_label_list.append(modified_label[:seqlen])

In [8]:
# One-hot encode the padded positive tweets as (tweet, position, character).
# The character axis has one more slot than the vocabulary.
x_positive = np.zeros(
    (len(modified_positive_train_list), seqlen, len(chars) + 1),
    dtype=np.float32,
)
for row, tweet in enumerate(modified_positive_train_list):
    # vocabulary index of every character in this tweet, in order
    codes = np.fromiter(
        (char_indices[symbol] for symbol in tweet),
        dtype=np.intp,
        count=len(tweet),
    )
    x_positive[row, np.arange(len(tweet)), codes] = 1

# The targets are the per-character 0/1 masks, stacked into one array.
y_positive = np.asarray(modified_positive_label_list)

In [9]:
print(y_positive.shape)
# Append a trailing channel axis so Conv2D can consume the one-hot "image".
# The target shape is derived from the data rather than hard-coded, so the
# cell keeps working when the number of tweets or the vocabulary changes,
# and re-running it is harmless.
x_positive = np.reshape(
    x_positive, (x_positive.shape[0], seqlen, len(chars) + 1, 1)
)
print(x_positive.shape)

(8582, 160)
(8582, 160, 102, 1)


In [12]:
# Convolutional network that maps a one-hot encoded tweet to one sigmoid
# output per character position (is this character part of the selection?).
model = Sequential()

# The input shape is taken from the prepared training tensor, which must
# already carry its trailing channel axis: (positions, characters, 1).
model.add(Input(shape=x_positive.shape[1:], name='model_input'))

# Five stride-2 convolutions, each followed by batch normalisation.
for n_filters in (8, 16, 32, 16, 8):
    model.add(Conv2D(filters=n_filters, kernel_size=3, strides=2,
                     padding='same', activation='relu'))
    model.add(BatchNormalization())

model.add(Flatten())
# One output unit per entry of a label mask.
model.add(Dense(y_positive.shape[1], activation='sigmoid'))

# No explicit model.build() call: the Input layer above already fixes the
# input shape, and the earlier call passed a 3-D shape (None, 160, 102) that
# contradicted the 4-D input declared here.
model.summary()

model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate=0.001),
    metrics=['binary_crossentropy', 'accuracy'])

# The last 10% of the samples are held out for validation by Keras.
model.fit(x_positive, y_positive,
          batch_size=2,
          epochs=50,
          validation_split=0.1)
model.save("three_cnn_positive_model.h5")

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_10 (Conv2D)           (None, 80, 51, 8)         80        
_________________________________________________________________
batch_normalization_10 (Batc (None, 80, 51, 8)         32        
_________________________________________________________________
conv2d_11 (Conv2D)           (None, 40, 26, 16)        1168      
_________________________________________________________________
batch_normalization_11 (Batc (None, 40, 26, 16)        64        
_________________________________________________________________
conv2d_12 (Conv2D)           (None, 20, 13, 32)        4640      
_________________________________________________________________
batch_normalization_12 (Batc (None, 20, 13, 32)        128       
_________________________________________________________________
conv2d_13 (Conv2D)           (None, 10, 7, 16)        

Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [13]:
# Read the test CSV. Note that `texts` and `sentiments` are rebound here and
# refer to the test columns from this point on.
test_file_path = "data/test.csv"
df_test = pd.read_csv(test_file_path)
texts = df_test["text"]
sentiments = df_test["sentiment"]

# One single-entry {tweet: sentiment} dict per test row.
test_list = [{tweet: sentiment} for tweet, sentiment in zip(texts, sentiments)]

# Test tweets grouped by sentiment; anything that is neither "positive" nor
# "negative" is treated as neutral.
positive_test_list = [
    tweet for tweet, sentiment in zip(texts, sentiments)
    if sentiment == "positive"
]
negative_test_list = [
    tweet for tweet, sentiment in zip(texts, sentiments)
    if sentiment == "negative"
]
neutral_test_list = [
    tweet for tweet, sentiment in zip(texts, sentiments)
    if sentiment not in ("positive", "negative")
]

In [14]:
# Build long strings of the test tweets, overall and per sentiment. Every
# tweet is prefixed with a single space.
#
# Each entry of test_list is a single-entry {tweet: sentiment} dict, so the
# tweet is the key and the sentiment is the value. (Iterating over
# zip(test_list) produced 1-tuples wrapping each dict, so no sentiment ever
# matched and the strings were filled with tuple/dict reprs.)
#
# NOTE: this rebinds all_text / positive_text / negative_text / neutral_text,
# which previously held the training text.
_test_pieces = {"all": [], "positive": [], "negative": [], "neutral": []}
for item in test_list:
    for tweet, sentiment in item.items():
        spaced_tweet = " " + str(tweet)
        # anything that is not explicitly positive/negative counts as neutral
        group = sentiment if sentiment in ("positive", "negative") else "neutral"
        _test_pieces["all"].append(spaced_tweet)
        _test_pieces[group].append(spaced_tweet)

all_text = "".join(_test_pieces["all"])
positive_text = "".join(_test_pieces["positive"])
negative_text = "".join(_test_pieces["negative"])
neutral_text = "".join(_test_pieces["neutral"])


In [15]:
# Fixed tweet width in characters (the dataset seems to top out around 140).
# Shorter test tweets are left-padded with spaces up to this width; a tweet
# that is already longer is kept unchanged.
seqlen = 160
modified_positive_test_list = [
    str(tweet).rjust(seqlen) for tweet in positive_test_list
]

In [16]:
# One-hot encode the padded positive test tweets as
# (tweet, position, character). The character axis has one more slot than the
# vocabulary.
x_positive_test = np.zeros(
    (len(modified_positive_test_list), seqlen, len(chars) + 1),
    dtype=np.float32,
)
for i, tweet in enumerate(modified_positive_test_list):
    # Positions past seqlen do not fit into the tensor and are dropped.
    for t, char in enumerate(tweet[:seqlen]):
        index = char_indices.get(char)
        if index is None:
            # Character missing from char_indices: leave the position all-zero.
            continue
        x_positive_test[i, t, index] = 1

In [19]:
def _predict_single(sample):
    """Run the model on one encoded tweet, reshaped to a batch of one."""
    return model.predict(np.reshape(sample, [1, 160, 102, 1]))


# One training example as a sanity check, then four consecutive test tweets.
result_train = _predict_single(x_positive[600])
result_test1, result_test2, result_test3, result_test4 = (
    _predict_single(x_positive_test[position]) for position in range(20, 24)
)

In [20]:
def _row_or_blank(rows, position):
    """Return rows[position], or an empty string when rows is too short."""
    return rows[position] if position < len(rows) else ""


def _chars_above(tweet, scores, threshold):
    """Keep the characters of tweet whose score is at least threshold."""
    return "".join(
        tweet[pos] for pos in range(len(tweet)) if scores[pos] >= threshold
    )


# Training example: padded tweet, predicted selection (cut-off 0.5), and the
# true selected text.
print("Test for training data:\n")
train_str = _row_or_blank(modified_positive_train_list, 600)
print(train_str)
train_label_str = _chars_above(train_str, result_train[0], 0.5)
print(train_label_str)
print(positive_label_list[600])

# Test examples: padded tweet followed by the predicted selection, using a
# lower cut-off of 0.1.
print("\nTest for testing data:\n")
_test_predictions = (result_test1, result_test2, result_test3, result_test4)
for position, prediction in zip(range(20, 24), _test_predictions):
    test_str = _row_or_blank(modified_positive_test_list, position)
    print(test_str)
    test_label_str = _chars_above(test_str, prediction[0], 0.1)
    print(test_label_str + "\n")

Test for training data:

                                                                                                                    Had a good time at Flap-a-taco with , , and 
a 
good

Test for testing data:

                                                                       Yay for Block Party!  You`re the BOMB! Blockheads <3 Dave!   Thanks for supporting NKOTB!
k

                                                                           Happy Birthday Snickers!!!! ? I hope you have the best day ever! Let`s go shopping!!!
phat

                                                                                                                                   Thank you!  I`m working on `s
   Thank you!  I`m working on `s

                                                                                                                                            Happy Mothers Day!!!
  Happy Mothers Day!!!

