# LSTM Model

In [None]:
import tarfile

#Load data
filename = '/content/moral_stories_datasets.tar.xz'

# Extract the archive into the current working directory. The context manager
# closes the archive even if extraction raises part-way through.
with tarfile.open(filename) as moral_stories:
  moral_stories.extractall()

In [None]:
import pandas as pd
import json

def json_to_dataframe(filename, verbose=False):
  """Load a JSON Lines file into a DataFrame.

  Each non-blank line of the file is parsed as one JSON object. For every
  object, the values stored under 'situation', 'label' and either
  'immoral_action' or 'moral_action' are collected; both action keys feed the
  same 'action' column.

  Args:
    filename: Path of the JSON Lines file to read.
    verbose: When True, print the number of records and every parsed record.

  Returns:
    A pandas DataFrame with the columns 'situation', 'action' and 'label'.

  Raises:
    json.JSONDecodeError: If a non-blank line is not valid JSON.
    ValueError: Raised by pandas when the collected columns end up with
      different lengths (e.g. a record lacks one of the keys or carries both
      action keys).
  """
  situation = []
  action = []
  label = []

  # JSON text is UTF-8; do not depend on the platform default encoding.
  with open(filename, 'r', encoding='utf-8') as json_file:
    # Blank lines (such as an empty line at the end of the file) are not
    # records and would make json.loads fail, so drop them up front.
    json_list = [line for line in json_file if line.strip()]

  if verbose:
    print(len(json_list))

  for json_str in json_list:
    json_dict = json.loads(json_str)
    if verbose:
      print(f'json dict: {json_dict}')

    if 'situation' in json_dict:
      situation.append(json_dict['situation'])

    # Either action key contributes to the single 'action' column.
    for action_key in ('immoral_action', 'moral_action'):
      if action_key in json_dict:
        action.append(json_dict[action_key])

    if 'label' in json_dict:
      label.append(json_dict['label'])

  return pd.DataFrame({'situation': situation, 'action': action, 'label': label})

In [None]:
import os
from sklearn.model_selection import train_test_split
#from sklearn.preprocessing import StandardScaler

# Directory (relative to the working directory) that holds the
# 'action+context' / 'lexical_bias' classification files.
data_dir = os.path.join(
    'moral_stories_datasets',
    'classification',
    'action+context',
    'lexical_bias',
)

# Resolve the two split files, then parse each one into a DataFrame.
train_path = os.path.join(data_dir, 'train.jsonl')
train_df = json_to_dataframe(train_path)

test_path = os.path.join(data_dir, 'test.jsonl')
test_df = json_to_dataframe(test_path)

In [None]:
train_df

In [None]:
test_df

In [None]:
#Concatenate text in 'situation' and 'action' columns

num_train_rows = len(train_df['situation'])
num_test_rows = len(test_df['situation'])

# Build the combined text for every row and assign it as a whole column.
# Writing through `df[col].iloc[i] = ...` is chained assignment: pandas warns
# about it and, with Copy-on-Write enabled, the write never reaches the
# DataFrame, leaving the column empty. A single column assignment avoids that
# and also replaces the per-row Python loop.
for _df in (train_df, test_df):
  _df['situation + action'] = [
      f'<start>{situation}<sep>{action}<end>'
      for situation, action in zip(_df['situation'], _df['action'])
  ]

In [None]:
train_df

In [None]:
test_df

In [None]:
#Generate dictionary for tokenization

import nltk
import pickle
import argparse
from collections import Counter
from vist import VIST

class Vocabulary:
    """Two-way mapping between words and consecutive integer indices.

    Attributes are plain dictionaries so the object can be pickled as-is:
    `word2idx` maps word -> index, `idx2word` maps index -> word, and `idx`
    is the index that the next new word will receive.
    """

    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0

    def add_word(self, word):
        """Register `word` under the next free index; known words are ignored."""
        if word in self.word2idx:
            return
        next_index = self.idx
        self.word2idx[word] = next_index
        self.idx2word[next_index] = word
        self.idx = next_index + 1

    def __call__(self, word):
        """Return the index of `word`, or the index of '<unk>' if it is unknown.

        Raises KeyError when the word is unknown and '<unk>' was never added.
        """
        try:
            return self.word2idx[word]
        except KeyError:
            return self.word2idx['<unk>']

    def __len__(self):
        """Return the number of distinct words stored."""
        return len(self.word2idx)

def build_vocab(textList, threshold):
    """Build a Vocabulary from a collection of texts.

    Every string in `textList` is lower-cased and tokenized with
    `nltk.tokenize.word_tokenize`. Tokens occurring at least `threshold`
    times are added to the vocabulary after the four special tokens
    '<pad>', '<start>', '<end>' and '<unk>' (indices 0-3, in that order).

    Args:
        textList: Iterable of texts. Entries that are not strings are skipped.
        threshold: Minimum number of occurrences for a token to be kept.

    Returns:
        The populated Vocabulary instance.

    Note:
        Errors raised by the tokenizer itself are propagated rather than
        swallowed, so a tokenizer failure cannot silently produce a
        vocabulary that contains only the special tokens.
    """
    # Materialize once so the total is known for the progress message even
    # when a generator is passed in.
    texts = list(textList)
    total = len(texts)

    counter = Counter()
    for position, each_text in enumerate(texts, start=1):
        # Only strings can be lower-cased and tokenized; anything else is
        # skipped instead of being hidden behind a blanket `except`.
        if isinstance(each_text, str):
            counter.update(nltk.tokenize.word_tokenize(each_text.lower()))

        # Progress report. This used to sit outside the loop and referenced
        # names that were not defined at that point.
        if position % 1000 == 0:
            print("[%d/%d] Tokenized the story captions." % (position, total))

    words = [word for word, cnt in counter.items() if cnt >= threshold]

    vocab = Vocabulary()
    vocab.add_word('<pad>')
    vocab.add_word('<start>')
    vocab.add_word('<end>')
    vocab.add_word('<unk>')

    for word in words:
        vocab.add_word(word)

    return vocab

def main(args, texts=None):
    """Build the vocabulary and pickle it to `args.vocab_path`.

    Args:
        args: Namespace providing `threshold` (minimum word count) and
            `vocab_path` (destination of the pickled vocabulary).
        texts: Optional iterable of texts to build the vocabulary from. When
            omitted, the module-level name `text_list` is used, so that name
            must be defined before calling `main(args)`.

    Raises:
        NameError: If `texts` is omitted and `text_list` is not defined.
    """
    import os  # local import so this cell does not depend on earlier cells

    if texts is None:
        # Give your text_list here
        texts = text_list

    vocab = build_vocab(texts,
                        threshold=args.threshold)
    vocab_path = args.vocab_path

    # Create the destination directory first; otherwise open() fails with
    # FileNotFoundError when it does not exist yet.
    vocab_dir = os.path.dirname(vocab_path)
    if vocab_dir:
        os.makedirs(vocab_dir, exist_ok=True)

    with open(vocab_path, 'wb') as f:
        pickle.dump(vocab, f)
    print("Total vocabulary size: %d" %len(vocab))
    print("Saved the vocabulary wrapper to '%s'" %vocab_path)


# Entry point: read the vocabulary options from the command line and run main().
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
#     parser.add_argument('--sis_path', type=str,
#                         default='./data/sis/train.story-in-sequence.json',
#                         help='path for train sis file')
    parser.add_argument('--vocab_path', type=str, default='./models/vocab.pkl',
                        help='path for saving vocabulary wrapper')
    parser.add_argument('--threshold', type=int, default=10,
                        help='minimum word count threshold')
    # Inside a notebook kernel sys.argv carries the launcher's own options
    # (e.g. `-f <connection file>`), which parse_args() rejects by exiting.
    # parse_known_args() keeps the options defined above and ignores the rest.
    args, _unknown_args = parser.parse_known_args()
    main(args)

ModuleNotFoundError: ignored

In [None]:
#Tokenize concatenated text


In [None]:
#Generate word embedding


## EXAMPLES FROM *Text classification with Transformer*

Implement a Transformer block as a layer

In [None]:
# class TransformerBlock(layers.Layer):
#     def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
#         super(TransformerBlock, self).__init__()
#         self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
#         self.ffn = keras.Sequential(
#             [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
#         )
#         self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
#         self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
#         self.dropout1 = layers.Dropout(rate)
#         self.dropout2 = layers.Dropout(rate)
        
#     def call(self, inputs, training):
#         attn_output = self.att(inputs, inputs)
#         attn_output = self.dropout1(attn_output, training=training)
#         out1 = self.layernorm1(inputs + attn_output)
#         ffn_output = self.ffn(out1)
#         ffn_output = self.dropout2(ffn_output, training=training)
#         return self.layernorm2(out1 + ffn_output)

Implement embedding layer

In [None]:
# class TokenAndPositionEmbedding(layers.Layer):
#     def __init__(self, maxlen, vocab_size, embed_dim):
#         super(TokenAndPositionEmbedding, self).__init__()
#         self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
#         self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

#     def call(self, x):
#         maxlen = tf.shape(x)[-1]
#         positions = tf.range(start=0, limit=maxlen, delta=1)
#         positions = self.pos_emb(positions)
#         x = self.token_emb(x)
#         return x + positions

Download and prepare dataset

In [None]:
# vocab_size = 20000  # Only consider the top 20k words
# maxlen = 200  # Only consider the first 200 words of each movie review
# (x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=vocab_size)
# print(len(x_train), "Training sequences")
# print(len(x_val), "Validation sequences")
# x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
# x_val = keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)

Create classifier model using transformer layer

In [None]:
# embed_dim = 32  # Embedding size for each token
# num_heads = 2  # Number of attention heads
# ff_dim = 32  # Hidden layer size in feed forward network inside transformer

# inputs = layers.Input(shape=(maxlen,))
# embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
# x = embedding_layer(inputs)
# transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
# x = transformer_block(x)
# x = layers.GlobalAveragePooling1D()(x)
# x = layers.Dropout(0.1)(x)
# x = layers.Dense(20, activation="relu")(x)
# x = layers.Dropout(0.1)(x)
# outputs = layers.Dense(2, activation="softmax")(x)

# model = keras.Model(inputs=inputs, outputs=outputs)

Train and Evaluate

In [None]:
# model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])
# history = model.fit(
#     x_train, y_train, batch_size=32, epochs=2, validation_data=(x_val, y_val)
# )

In [None]:
# #Function to read .json files
# def json_file_load(filename, verbose=True):
#     #json_data = []
#     with open(filename, 'r') as json_file:
#         json_list = list(json_file)
#         if verbose:
#           print(len(json_list))
#         for json_str in json_list:
#             result = json.loads(json_str)
#             #json_data.append(result)
#             if verbose:
#               print(f"result: {result}")
#     #return json_data