In [1]:
import pandas as pd
import numpy as np
import nltk
import re
import string
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, Dense, Dropout, Conv2D, GlobalMaxPool2D, Layer
from tensorflow.keras import Model, Input
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Embedding
from tensorflow_addons.metrics import F1Score
import warnings
warnings.filterwarnings('ignore')


In [2]:
# Load the labelled training set. Later cells read its `text`, `aspect`
# and `label` columns.
# NOTE(review): this path is relative to the working directory while the test
# set and GloVe vectors are read from '../input/...' - confirm the layout.
train = pd.read_csv('data/train.csv')
train.head(10)

In [2]:
def preprocess(text):
    """Normalise a raw sentence or aspect string.

    Steps, in order: drop newlines and tabs, lowercase, expand English
    contractions, apply the digit rule, replace a fixed set of punctuation
    characters with spaces, and collapse runs of spaces.

    Args:
        text: the raw string.

    Returns:
        The cleaned string. Leading/trailing single spaces are not stripped.
    """
    # Newlines and tabs are removed outright (not replaced by a space).
    text = text.replace("\n", "").replace("\t", "")
    text = text.lower()

    # (pattern, replacement) pairs; order matters - the two specific
    # contractions must run before the generic "n't" rule.
    substitutions = (
        # specific
        (r"won\'t", "will not"),
        (r"can\'t", "can not"),
        # general
        (r"n\'t", " not"),
        (r"\'re", " are"),
        (r"\'s", " is"),
        (r"\'d", " would"),
        (r"\'ll", " will"),
        (r"\'t", " not"),
        (r"\'ve", " have"),
        (r"\'m", " am"),
        # digits: removes a digit together with the word character after it,
        # so a lone digit survives and "3rd" becomes "d".
        (r'\d+?\w', ''),
        # selected punctuation becomes a space
        (r"[!?,:'\"*)@#%(&$_.^-]", ' '),
        # squeeze repeated spaces
        (' +', ' '),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)

    return text

In [4]:
# Normalise the sentence and aspect columns with `preprocess`; the results go
# into new columns so the raw `text` / `aspect` values stay available.
train['clean_text'] = train['text'].apply(preprocess)
train['clean_aspect'] = train['aspect'].apply(preprocess)

In [3]:
def get_pw(k, m, i, n, mode="lf"):
    """Position weight of word ``i`` relative to a target starting at ``k``.

    Both ``i`` and ``k`` are 0-based on entry and shifted to 1-based
    positions internally.

    Args:
        k: 0-based index of the first target word.
        m: target length in words.
        i: 0-based index of the word being weighted.
        n: sentence length in words.
        mode: accepted but not used by this function.

    Returns:
        1 for the target position itself, otherwise a weight that decays
        linearly with distance (scale constant 30), rounded to 3 decimals.
        Non-positive weights, and positions beyond ``n``, yield 0.
    """
    scale = 30.

    word_pos = i + 1
    target_pos = k + 1
    target_end = target_pos + m

    if word_pos == target_pos:
        weight = 1
    elif word_pos < target_end:
        # words before (or inside) the target span
        weight = 1 - ((target_end - word_pos) / scale)
    elif word_pos <= n:
        # words after the target, still inside the sentence
        weight = 1 - ((word_pos - target_pos) / scale)
    else:
        weight = 0

    if weight > 0:
        return round(weight, ndigits=3)
    return 0

In [4]:
def loop_pw(sentence, target):
    """Compute one position weight per space-separated token of ``sentence``.

    The target position is located using only the FIRST word of ``target``:

    * if that word occurs as an exact token, the last exact occurrence is used;
    * otherwise the first token that contains it as a substring is used.

    Args:
        sentence: cleaned sentence; tokens are obtained with ``split(" ")``.
        target: cleaned aspect phrase.

    Returns:
        A list with ``len(sentence.split(" "))`` weights from ``get_pw``.

    Raises:
        ValueError: if no token of ``sentence`` contains the first word of
            ``target`` (previously this surfaced as an UnboundLocalError).
    """
    text = sentence.split(" ")
    first_target_word = target.split(" ")[0]

    # NOTE(review): the target length passed to get_pw is always 1 because
    # only the first aspect word is considered, even for multi-word aspects.
    m = 1
    n = len(text)

    k = None
    if first_target_word in text:
        # Exact token match; no early exit, so the last occurrence wins.
        for i, word in enumerate(text):
            if word == first_target_word:
                k = i
    else:
        # Fall back to the first token that merely contains the word.
        for i, word in enumerate(text):
            if first_target_word in word:
                k = i
                break

    if k is None:
        raise ValueError(
            "aspect word %r not found in sentence %r" % (first_target_word, sentence)
        )

    return [get_pw(k, m, idx, n) for idx in range(n)]

In [7]:
# One position-weight vector per training row, in row order.
positionW = [
    loop_pw(train['clean_text'][row], train['clean_aspect'][row])
    for row in range(0, len(train))
]

positionW[0]

In [8]:
# Bring every weight vector to exactly max_seq_length entries:
# short ones are zero-padded on the right, long ones are cut off.
max_seq_length = 46
positionW = [
    np.pad(weights, (0, max_seq_length - len(weights)), 'constant')
    if len(weights) < max_seq_length
    else weights[:max_seq_length]
    for weights in positionW
]

positionW[:3]


In [9]:
# Turn the list of equal-length vectors into one 2-D array (one row per sample).
positionW = np.stack(positionW, axis=0)
positionW.shape

### Preprocess the text

**seq len = 46 (99th percentile) |**
**max aspect len = 8**

In [10]:
def tokenize_sent(text):
    """Fit a new Keras ``Tokenizer`` on ``text`` and encode the same texts.

    Args:
        text: list of strings to build the vocabulary from.

    Returns:
        A ``(tokenizer, sequences)`` pair: the fitted tokenizer and the
        integer sequences it produces for ``text``.
    """
    fitted = Tokenizer()
    fitted.fit_on_texts(text)
    sequences = fitted.texts_to_sequences(text)

    return fitted, sequences

In [11]:
# Fit the vocabulary on the cleaned sentences; the encoded sequences returned
# alongside the tokenizer are discarded here and recomputed after the split.
# NOTE(review): this uses every row of `train`, including the rows that are
# later held out for validation.
tokenizer, _ = tokenize_sent(train.clean_text.to_list())

In [12]:
# word -> integer index mapping learned by the tokenizer
word2idx = tokenizer.word_index

# number of distinct words in that mapping
vocab_size = len(word2idx)
vocab_size

### Embedding using GloVe

In [13]:
path_to_glove = '../input/glove6b/glove.6B.300d.txt'

# word -> float32 vector, one entry per line of the GloVe text file
embeddings_index = {}
# Read explicitly as UTF-8: without an encoding argument open() falls back to
# the platform default, which can fail or mis-decode non-ASCII tokens.
with open(path_to_glove, encoding="utf-8") as f:
    for line in f:
        # first whitespace-separated field is the word, the rest the vector
        word, coefs = line.split(maxsplit=1)
        coefs = np.fromstring(coefs, "f", sep=" ")
        embeddings_index[word] = coefs

print("Found %s word vectors." % len(embeddings_index))

In [14]:
# One row per tokenizer index plus one extra row for index 0.
num_tokens = vocab_size + 1
embedding_dim = 300
hits = 0
misses = 0

# Build the embedding matrix row by row.
# Rows that this loop never writes (e.g. index 0, if word indices start at 1
# as in the Keras Tokenizer convention) stay all-zero.
embedding_matrix = np.zeros((num_tokens, embedding_dim))
for word, i in word2idx.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is None:
        # Word missing from GloVe: draw a small random vector instead.
        # No seed is set, so these rows differ from run to run.
        embedding_matrix[i] = tf.random.uniform(shape=(embedding_dim, ), minval=-0.25, maxval=0.25).numpy()
        misses += 1
        continue
    # Word present in GloVe: copy its pretrained vector.
    embedding_matrix[i] = embedding_vector
    hits += 1
print("Converted %d words (%d misses)" % (hits, misses))

In [17]:
# Hyper-parameters
# hidden_dims: units per LSTM direction (passed to BiLSTM and CPT below)
hidden_dims = 50
# filter_nums: number of convolution filters (passed to CnnLayer below)
filter_nums = 50

### Embedding Layer

In [18]:
# Frozen embedding layer initialised from `embedding_matrix`.
# mask_zero=True treats index 0 as padding; trainable=False keeps the vectors
# fixed during training. The same layer instance is reused for both the
# sentence and the target input in the model cell.
embedding_layer = Embedding(
    num_tokens,
    embedding_dim,
    embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
    mask_zero=True,
    trainable=False,
)


embedding_layer -> (None, seq_len, 300)

### Bi-LSTM

In [19]:
from tensorflow.keras.layers import Bidirectional, LSTM


def BiLSTM(hidden_nums, embed_out):
    """Run a new bidirectional LSTM over an embedded sequence.

    A fresh layer is created on every call, so separate calls do not share
    weights.

    Args:
        hidden_nums: number of LSTM units per direction.
        embed_out: embedded input sequence tensor.

    Returns:
        The per-timestep outputs of the bidirectional LSTM
        (``return_sequences=True``).
    """
    # Use the argument rather than the notebook-level `hidden_dims` global,
    # so the function honours whatever size the caller asks for.
    lstm_layer = Bidirectional(LSTM(hidden_nums, return_sequences=True))

    return lstm_layer(embed_out)

BiLstm  -> (1, seq_len, hidden_dims*2)

### Context Preserving transformation

1. Target Specific Transformation
2. Adaptive Scaling / Lossless Forwarding

In [20]:
class CPT(Layer):
    """Context-Preserving Transformation with lossless forwarding.

    For every sentence position the layer attends over the target hidden
    states, concatenates the attended target summary with the word's hidden
    state, projects the result back to ``2 * hidden_nums`` with a tanh layer
    (``tst``), and finally adds the original hidden state (``lf_layer``).

    Args:
        hidden_nums: units per LSTM direction; inputs are expected to have a
            last dimension of ``2 * hidden_nums``.
        **kwargs: standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).
            Accepting them is required for ``from_config`` to work, because
            ``get_config`` includes those base-layer entries.
    """

    def __init__(self, hidden_nums, **kwargs):
        super(CPT, self).__init__(**kwargs)
        self.hidden_nums = hidden_nums

        # Projection from [hidden ; target summary] (4H) back to 2H.
        self.t_weights = {
            'trans_weights': tf.Variable(tf.initializers.RandomUniform(-0.01, 0.01)(shape=[4 * self.hidden_nums, 2*self.hidden_nums]),
                                        trainable=True,
                                        import_scope='TST_weights',
                                        name='trans_W')
        }
        self.t_bias = {
            'trans_bias': tf.Variable(tf.zeros_initializer()(shape=[2 * self.hidden_nums]),
                                        trainable=True,
                                        import_scope='TST_bias',
                                        name='trans_b')
        }

    def tst(self, target_hidden_states, hidden_states):
        """Target-specific transformation of every sentence position.

        Computed for all positions at once with batched matrix products
        instead of looping over timesteps.

        Args:
            target_hidden_states: (batch, target_len, 2 * hidden_nums)
            hidden_states: (batch, seq_len, 2 * hidden_nums)

        Returns:
            Tensor of shape (batch, seq_len, 2 * hidden_nums).
        """
        # (batch, seq_len, target_len): dot product of each word state with
        # each target state.
        scores = tf.matmul(hidden_states, target_hidden_states, transpose_b=True)

        # Normalise over the target words for every sentence position.
        attention = tf.nn.softmax(scores, axis=-1)

        # (batch, seq_len, 2H): attention-weighted sum of target states.
        target_summary = tf.matmul(attention, target_hidden_states)

        # (batch, seq_len, 4H): word state followed by its target summary.
        fused = tf.concat([hidden_states, target_summary], axis=-1)

        # Shared projection applied to every position, then tanh.
        projected = tf.einsum(
            'bsf,fo->bso', fused, self.t_weights['trans_weights'])

        return tf.math.tanh(projected + self.t_bias['trans_bias'])

    def lf_layer(self, target_hidden_states, hidden_states):
        """Lossless forwarding: transformed states plus the original states."""
        hidden_states_ = self.tst(target_hidden_states, hidden_states)

        return hidden_states_ + hidden_states

    def call(self, target_hidden_states, hidden_states):
        """Apply the transformation followed by lossless forwarding.

        Args:
            target_hidden_states: (batch, target_len, 2 * hidden_nums)
            hidden_states: (batch, seq_len, 2 * hidden_nums)

        Returns:
            Tensor with the same shape as ``hidden_states``.
        """
        output = self.lf_layer(target_hidden_states, hidden_states)

        return output

    def get_config(self):
        """Return the base layer config extended with ``hidden_nums``."""
        config = super(CPT, self).get_config()
        config.update({
            'hidden_nums': self.hidden_nums,
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a dict produced by ``get_config``."""
        return cls(**config)

### Convolution Layer

In [21]:
class CnnLayer(tf.keras.layers.Layer):
    """2-D convolution over the hidden-state map followed by global max pooling.

    Args:
        filter_nums: number of convolution filters.
        kernel_size: kernel size forwarded to ``Conv2D``.
        **kwargs: standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).
            Accepting them is required for ``from_config`` to work, because
            ``get_config`` includes those base-layer entries.
    """

    def __init__(self, filter_nums, kernel_size, **kwargs):
        super(CnnLayer, self).__init__(**kwargs)
        self.kernel_size = kernel_size
        self.filter_nums = filter_nums

        self.cnn_layer = Conv2D(self.filter_nums, self.kernel_size, activation='relu')
        self.pool_layer = GlobalMaxPool2D()

    def call(self, hidden_states):
        """Convolve and pool the hidden states.

        Args:
            hidden_states: rank-3 tensor; a trailing channel axis of size 1
                is appended before the convolution.

        Returns:
            ``(outputs, features)``: the globally max-pooled features and the
            un-pooled convolution feature map.
        """
        hs = tf.expand_dims(hidden_states, axis=-1)
        features = self.cnn_layer(hs)
        outputs = self.pool_layer(features)

        return outputs, features

    def get_config(self):
        """Return the base layer config extended with the constructor args."""
        config = super(CnnLayer, self).get_config()
        config.update({
            'filter_nums': self.filter_nums,
            'kernel_size': self.kernel_size
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a dict produced by ``get_config``."""
        return cls(**config)

### Positional relevance

In [22]:
def position_embedding(hs, pw):
    """Scale every hidden state by the position weight of its word.

    Args:
        hs: (batch_size, sentence_length, 2 * hidden_nums)
        pw: (batch_size, sentence_length)

    Returns:
        ``hs`` multiplied element-wise by ``pw`` broadcast over the last axis.
    """
    # add a trailing axis so the weights broadcast across the feature axis
    per_word_scale = tf.expand_dims(pw, axis=-1)
    return hs * per_word_scale

### Model

In [23]:

# sentence input: token ids -> shared frozen embedding -> BiLSTM -> dropout

sentence_input = Input(shape=(None, ), name='sentence_input')
sentence_embed = embedding_layer(sentence_input)
sentence_hidden_states = BiLSTM(hidden_dims, sentence_embed)
sentence_hidden_states = tf.keras.layers.Dropout(0.3)(sentence_hidden_states)

# target input: same embedding layer, but its own BiLSTM (BiLSTM() builds a
# new layer on each call)
target_input = Input(shape=(None, ), name='target_input')
target_embed = embedding_layer(target_input)
target_hidden_states = BiLSTM(hidden_dims, target_embed)
target_hidden_states = tf.keras.layers.Dropout(0.3)(target_hidden_states)

# CPT 1: first target-aware transformation of the sentence states
cpt_out_1 = CPT(hidden_dims)(target_hidden_states, sentence_hidden_states)

# position weighting: third model input, one weight per sentence position
pw = Input(shape=(None,), name='pw')
modified_hidden_states_1 = position_embedding(cpt_out_1, pw)

# CPT 2: a second, separately parameterised CPT layer on the weighted states
cpt_out_2 = CPT(hidden_dims)(target_hidden_states, modified_hidden_states_1)

# position weighting, applied again with the same weights
modified_hidden_states_2 = position_embedding(cpt_out_2, pw)

# CNN layer: kernel size 3; only the pooled output feeds the classifier,
# cnn_features is not used further in this cell
cnn_output, cnn_features = CnnLayer(filter_nums, 3)(modified_hidden_states_2)

# Dropout
drp_out = tf.keras.layers.Dropout(0.3)(cnn_output)

# output layer: softmax over 3 classes
output_layer = Dense(3, activation='softmax')(drp_out)

# input order here is the order the data pipeline must supply:
# (sentence ids, target ids, position weights)
model = Model(inputs=[sentence_input, target_input, pw], outputs=output_layer)

In [24]:
# Categorical cross-entropy with label smoothing 0.2 (expects one-hot labels),
# Adam with default settings, and accuracy plus a 3-class F1 metric.
model.compile(loss = tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.2),
             optimizer = tf.keras.optimizers.Adam(),
             metrics=['accuracy', F1Score(3)])

In [25]:
# Print the layer-by-layer summary of the assembled model.
model.summary()

In [26]:
# Validation split: the first 95% of rows (in file order) are used for
# training and the remaining rows for validation.
cutoff = 0.95
split = int(len(train) * cutoff)

# texts
train_text, val_text = train.clean_text[:split].to_list(), train.clean_text[split:].to_list()

# aspects
train_aspect, val_aspect = train.clean_aspect[:split].to_list(), train.clean_aspect[split:].to_list()

# position weights
train_pw, val_pw = positionW[:split], positionW[split:]

# labels (kept as pandas Series)
train_labels, val_labels = train.label[:split], train.label[split:]

# sizes of every piece, training parts first
tuple(
    len(part)
    for part in (train_text, train_aspect, train_pw, train_labels,
                 val_text, val_aspect, val_pw, val_labels)
)

In [27]:
# Encode sentences as integer sequences with the fitted tokenizer.
train_p_text = tokenizer.texts_to_sequences(train_text)
val_p_text = tokenizer.texts_to_sequences(val_text)

# Aspects are encoded with the same tokenizer, i.e. the same vocabulary.
train_p_target = tokenizer.texts_to_sequences(train_aspect)
val_p_target = tokenizer.texts_to_sequences(val_aspect)

In [28]:
# Pad (with 0) or truncate every sequence at the end to a fixed length:
# seq_len for sentences, aspect_len for aspects.
seq_len = 46
aspect_len = 8

train_p_text, val_p_text = (
    pad_sequences(sequences, maxlen=seq_len, padding="post", truncating='post', value=0)
    for sequences in (train_p_text, val_p_text)
)

train_p_target, val_p_target = (
    pad_sequences(sequences, maxlen=aspect_len, padding="post", truncating='post', value=0)
    for sequences in (train_p_target, val_p_target)
)

In [29]:
# Sanity check: shapes of the padded sentence and aspect arrays.
train_p_text.shape, train_p_target.shape

In [30]:
from sklearn.preprocessing import OneHotEncoder
# One-hot encode the labels for the categorical cross-entropy loss.
# The encoder is created with default arguments and the result densified with
# .toarray(): the `sparse=` keyword was renamed to `sparse_output=` in newer
# scikit-learn releases, so passing either name breaks on some versions.
ohe = OneHotEncoder()
# Categories are learned from the training labels only ...
train_ohe_labels = ohe.fit_transform(train_labels.to_numpy().reshape(-1, 1)).toarray()
# ... and reused unchanged for the validation labels.
val_ohe_labels = ohe.transform(val_labels.to_numpy().reshape(-1, 1)).toarray()

In [31]:
def make_dataset(text_ids, target_ids, position_weights, one_hot_labels, batch_size=64):
    """Build a batched, prefetched ``tf.data`` pipeline for the model.

    Args:
        text_ids: padded sentence token ids.
        target_ids: padded aspect token ids.
        position_weights: per-token position weights.
        one_hot_labels: one-hot encoded labels.
        batch_size: number of samples per batch.

    Returns:
        A dataset yielding ``((text_ids, target_ids, position_weights), labels)``
        batches, in the original row order (no shuffling).
    """
    features = (text_ids, target_ids, position_weights)
    dataset = tf.data.Dataset.from_tensor_slices((features, one_hot_labels))
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)


# `train_labels` / `val_labels` are deliberately NOT rebound here: they stay
# pandas Series so the one-hot encoding cell can be re-run safely.

# train dataset
train_data = make_dataset(train_p_text, train_p_target, train_pw, train_ohe_labels)

# Val dataset
val_data = make_dataset(val_p_text, val_p_target, val_pw, val_ohe_labels)

In [32]:
# Display both dataset objects to inspect their element structure.
train_data, val_data

In [34]:
# Early stopping on validation accuracy: an epoch counts as an improvement only
# if it gains at least 0.005; training stops after 3 epochs without one.
# NOTE(review): baseline=0.69 also makes the callback measure improvement
# against 0.69 from the start - confirm this threshold is intended.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', min_delta=0.005, patience=3, mode='max', baseline=0.69)


# Train for at most 25 epochs, validating on the held-out split each epoch.
history = model.fit(train_data, epochs=25, validation_data=val_data, callbacks=[early_stopping])

In [37]:
# Persist the trained model under this path (relative to the working directory).
model.save('Tnet-LF-300dGLOVE')

In [None]:
import matplotlib.pyplot as plt
def plot_loss_curves(history):
    """Plot training vs. validation curves for loss, accuracy and F1-score.

    Draws three separate figures and returns nothing.

    Args:
        history: TensorFlow model History object (see:
            https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/History).
            Its ``history`` dict must contain ``loss``, ``accuracy``,
            ``f1_score`` and their ``val_``-prefixed counterparts.
    """
    metrics = history.history
    epochs = range(len(metrics['loss']))

    def _per_epoch(values):
        # A metric may log one value per class each epoch (e.g. an F1 metric
        # without averaging); collapse those to their mean so each curve is a
        # single line. Scalar-per-epoch metrics pass through unchanged.
        arr = np.asarray(values, dtype=float)
        if arr.ndim <= 1:
            return arr
        return arr.reshape(arr.shape[0], -1).mean(axis=1)

    # (title, training key, validation key, training label, validation label)
    panels = (
        ('Loss', 'loss', 'val_loss', 'training_loss', 'val_loss'),
        ('Accuracy', 'accuracy', 'val_accuracy', 'training_accuracy', 'val_accuracy'),
        # Previously this panel re-plotted accuracy and read the training F1
        # for the validation curve.
        ('F1-score', 'f1_score', 'val_f1_score', 'train_f1_score', 'val_f1_score'),
    )

    for title, train_key, val_key, train_label, val_label in panels:
        plt.figure()
        plt.plot(epochs, _per_epoch(metrics[train_key]), label=train_label)
        plt.plot(epochs, _per_epoch(metrics[val_key]), label=val_label)
        plt.title(title)
        plt.xlabel('Epochs')
        plt.legend()
    
plot_loss_curves(history)

In [None]:
# Load the unlabelled test set; later cells read its `text` and `aspect` columns.
test = pd.read_csv('../input/tsadata/test.csv')
test.head()

In [None]:
# Apply exactly the same normalisation that was used on the training data.
test['clean_text'] = test['text'].apply(preprocess)
test['clean_aspect'] = test['aspect'].apply(preprocess)

test.head()

In [None]:
seq_len = 46
aspect_len = 8

# Position weights for every test row, in row order.
pw_t = [
    loop_pw(test['clean_text'][row], test['clean_aspect'][row])
    for row in range(len(test))
]

# Zero-pad on the right or truncate so each vector has exactly seq_len entries.
pw_t = [
    np.pad(weights, (0, seq_len - len(weights)), 'constant')
    if len(weights) < seq_len
    else weights[:seq_len]
    for weights in pw_t
]

# One 2-D array, one row per test sample.
pw_t = np.stack(arrays=pw_t, axis=0)
pw_t.shape

In [None]:
from tqdm import tqdm
# Encode the whole test set once and predict in batches; calling
# model.predict() separately for every row is far slower.
test_sentences = pad_sequences(
    tokenizer.texts_to_sequences(test['clean_text'].to_list()),
    maxlen=seq_len, padding="post", truncating='post', value=0)
test_targets = pad_sequences(
    tokenizer.texts_to_sequences(test['clean_aspect'].to_list()),
    maxlen=aspect_len, padding="post", truncating='post', value=0)

# Inputs are passed in the same order the model was built with:
# sentence ids, target ids, position weights.
probs = model.predict([test_sentences, test_targets, pw_t], batch_size=64)

# argmax gives the one-hot column index; translate it back to the original
# label value through the encoder fitted on the training labels, so the
# output is right even when the labels are not exactly 0, 1, 2.
test_label = ohe.categories_[0][np.argmax(probs, axis=1)].tolist()

In [None]:
# Collect the raw (un-cleaned) text and aspect next to the predicted label.
result = {
    'text' : test['text'].to_list(),
    'aspect' : test['aspect'].to_list(),
    'label' : test_label
}

In [None]:
# Convert to a DataFrame and write the predictions without the index column.
# NOTE(review): the output is named 'test.csv' like the input file, but is
# written to the working directory rather than '../input/tsadata/'.
result = pd.DataFrame(result)
result.to_csv('test.csv', index=False)