<a href="https://colab.research.google.com/github/minh-chaudang/IntroAI/blob/main/BiCNN_MI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

import numpy as np
from nltk.tokenize.punkt import PunktLanguageVars

Mounted at /content/drive


In [2]:
from tensorflow import keras
import tensorflow as tf
import copy

In [3]:
def load_msrp_corpus(path):
    X = []
    y = []
    with open(path, encoding="utf_8_sig") as f:
        # next(f)  # skip header line
        for line in f:
            cols = line.strip().split("\t")  # Quality	#1 ID	#2 ID	#1 String	#2 String
            y.append(cols[0])
            X.append((cols[3], cols[4]))
    return X, np.array(y, dtype=np.int32)

In [4]:
class Tokenizer(PunktLanguageVars):

    def __init__(self):
        super(Tokenizer, self).__init__()
        pass

    def tokenize(self, document):
        return self.word_tokenize(document)


class Preprocessor:
    def __init__(self,
                 embed_file,
                 max_document_length,
                 unknown="<UNK>",
                 pad="<PAD>",
                 tokenizer=None):
        self._maxlen = max_document_length
        vocabulary, embeddings = self._load_embeddings(embed_file)
        embed_size = embeddings.shape[1]
        self._unknown = unknown
        self._pad = pad
        self._vocabulary = vocabulary
        self._embeddings = embeddings
        self._new_embeddings = []
        self._embed_size = embed_size
        if unknown not in vocabulary:
            self._add_vocabulary(unknown, random=False)
        if pad not in vocabulary:
            self._add_vocabulary(pad, random=False)
        if tokenizer:
            self._tokenizer = tokenizer
        else:
            self._tokenizer = Tokenizer()

    @staticmethod
    def _load_embeddings(path):
        vocabulary = {}
        embeddings = []
        with open(path) as f:
            for line in f:
                cols = line.strip().split(" ")
                word = cols[0]
                if word not in vocabulary:
                    vocabulary[word] = len(vocabulary)
                    embeddings.append(np.array(cols[1:], dtype=np.float32))
        return vocabulary, np.array(embeddings)

    def _add_vocabulary(self, word, random=True):
        # if word in self._vocabulary:
        #     return
        self._vocabulary[word] = len(self._vocabulary)
        if random:
            word_vector = np.random.uniform(-1, 1, self._embed_size)  # generate a random embedding for an unknown word
        else:
            word_vector = np.zeros(self._embed_size, dtype=np.float32)
        self._new_embeddings.append(word_vector)

    def fit(self, raw_documents):
        for document in raw_documents:
            self._fit_each(document)
        return self

    def _fit_each(self, raw_document):
        for token in self._tokenizer.tokenize(raw_document.lower()):
            if token not in self._vocabulary:
                self._add_vocabulary(token, random=True)
        return self

    def transform(self, raw_documents):
        samples = []
        for document in raw_documents:
            samples.append(self._transform_each(document))
        return np.array(samples, dtype=np.int32)

    def _transform_each(self, raw_document):
        tokens = self._tokenizer.tokenize(raw_document.lower())
        if len(tokens) > self._maxlen:
            print("Token length exceeds max_document_length")
            raise
        word_ids = np.full(self._maxlen, self._vocabulary[self._pad], dtype=np.int32)
        for i, token in enumerate(tokens):
            if token in self._vocabulary:
                word_ids[i] = self._vocabulary[token]
            else:
                word_ids[i] = self._vocabulary[self._unknown]
        return word_ids

    def fit_transform(self, raw_documents):
        return self.fit(raw_documents).transform(raw_documents)

    def _fit_transform_each(self, raw_document):
        return self._fit_each(raw_document)._transform_each(raw_document)

    def get_embeddings(self):
        if len(self._new_embeddings) > 0:
            self._embeddings = np.r_[self._embeddings, self._new_embeddings]
            self._new_embeddings = []
        return self._embeddings

class MsrpCorpusPreprocessor(Preprocessor):

    def __init__(self, embed_file):
        super(MsrpCorpusPreprocessor, self).__init__(
            embed_file=embed_file,
            max_document_length=48,
            unknown="*UNKNOWN*",
            pad="*PAD*",
        )
    # override
    def transform(self, X_raw):
        transform_each = super(MsrpCorpusPreprocessor, self)._transform_each
        X = []
        for X_raw_each in X_raw:
            X.append((transform_each(X_raw_each[0]), transform_each(X_raw_each[1])))
        return np.array(X)

    # override
    def fit_transform(self, X_raw):
        fit_transform_each = super(MsrpCorpusPreprocessor, self)._fit_transform_each
        X = []
        for X_raw_each in X_raw:
            X.append((fit_transform_each(X_raw_each[0]), fit_transform_each(X_raw_each[1])))
        return np.array(X)

    def embed_each(self, sentence):
        indices = self._transform_each(sentence)
        result = []
        for index in indices:
          result.append(self.embeddings[index])
        return np.array(result)

    def embed(self, sentence_pairs):
      result = []
      for pair in sentence_pairs:
        result.append([self.embed_each(pair[0]), self.embed_each(pair[1])])

      return np.array(result)

    @property
    def max_sentence_length(self):
        return self._maxlen

    @property
    def embeddings(self):
        return self.get_embeddings()

In [5]:
processor = MsrpCorpusPreprocessor("/content/drive/MyDrive/paraphraseIden/Bi-CNN-MI/sample/embeddings-original.EMBEDDING_SIZE=25.txt")

In [6]:
X_raw,y = load_msrp_corpus("/content/drive/MyDrive/paraphraseIden/Bi-CNN-MI/sample/msr_paraphrase_test-small.txt")
X_train = processor.fit_transform(X_raw)

In [8]:
class Embed(keras.layers.Layer):
  def __init__(self, processor):
    super().__init__(trainable = False)
    self.processor = processor

  def call(self, X):
    result = []
    for S in X: result.append(self.embed_each(S))
    return np.array(result)

  def embed_each(self, wordIds):
    result = []
    for id in wordIds: result.append(processor.embeddings[id])
    return np.array(result)

In [9]:
def Conv2D(filters, window_size):
  return keras.layers.Conv2D(filters = filters, kernel_size = (1, window_size), padding = 'same')

In [10]:
class Similarity(keras.layers.Layer):
  def __init__(self, beta):
    self.beta = beta
    super().__init__(trainable = False)
  def call(self, inputs):
    x0, x1 = inputs
    temp0 = np.transpose(x0, [0,1,3,2])
    temp1 = np.transpose(x1, [0,1,3,2])
    nums, channels, words, embed_size = temp0.shape

    res = []

    for index in range(nums):
      for channel in range(channels):
        res.append(self.sentence_diff(temp0[index][channel], temp1[index][channel]))  
    
    return np.array(res).reshape(nums, channels, words, words)

  
  def word_diff(self, word0, word1):
    diff = word0 - word1
    distance = np.sum(diff * diff)
    return np.exp(- distance / (2 * self.beta))

  def sentence_diff(self, sen1, sen2):
    res = np.array([[0]*sen2.shape[0] for i in range(sen1.shape[0])])
    for i in range(sen1.shape[0]):
      for j in range(sen2.shape[0]):
        res[i][j] = (self.word_diff(sen1[i], sen2[j]))
    return res

In [14]:
class BiCNN(keras.Model):
  def __init__(self, processor, channels, filter_width, k_top, beta, pool_size):
    # retain parameters
    self.processor = processor
    self.filter_width = filter_width
    self.channels = channels
    self.k_top = k_top
    self.beta = beta
    self.pool_size = pool_size
    super(BiCNN, self).__init__()

    self.embed = Embed(processor)
    self.conv1l = Conv2D(channels[0], filter_width[0])
    self.similarity = Similarity(self.beta)

  def call(self, x):

    Sl, Sr = x[:, 0], x[:, 1]
    El = self.to4d(self.embed(Sl).transpose([0,2,1]))
    Er = self.to4d(self.embed(Sr).transpose([0,2,1]))
    
    El_transposed = np.transpose(El, [0, 3, 1, 2])
    Er_transposed = np.transpose(Er, [0, 3, 1, 2])
    Fu = self.similarity([El_transposed, Er_transposed])

    print("Fu", type(Fu), Fu.shape)

    C1l = self.conv1l(El)
    C1r = self.conv1l(Er)


    C1l_transposed = np.transpose(C1l, [0, 3, 1, 2])
    C1r_transposed = np.transpose(C1r, [0, 3, 1, 2])

    A1l = self.folding(C1l_transposed)
    A1r = self.folding(C1r_transposed)
    B1l = np.tanh(A1l)
    B1r = np.tanh(A1r)


    # print(B1l.shape)

    return Fu
  
  @staticmethod
  def to4d(x):
    n, h, w = x.shape
    return x.reshape((n, h, w, 1))

  @staticmethod
  def add_row(x):
    n, c, w, h = x.shape
    return np.concatenate([np.zeros((n, c, 1, h), dtype=x.dtype), x], axis=2)

  @staticmethod
  def folding(x):
    x_odd = x[:, :, 1::2]  # extract odd rows
    x_even = x[:, :, ::2]  # extract even rows
    d = x_odd.shape[2] - x_even.shape[2]
    if d == -1:
      x_odd = BiCNN.add_row(x_odd)
    elif d == 1:
      x_even = BiCNN.add_row(x_even)
    return (x_odd + x_even) / 2
  
  def word_diff(self, word1, word2):
    return np.exp(-(np.linalg.norm(word1 - word2)**2)/(2*self.beta))
  
  def F(self, El, Er):

    w1, w2 = El.shape[0], El.shape[0]
    result = [[0]*w2 for i in range(w1)]
    for i in range(w1):
      for j in range(w2):
        result[i][j] = self.word_diff(El[i], Er[j])
    return np.array(result)
  

In [None]:
bicnn = BiCNN(processor = processor, channels=[3, 5], filter_width=[6, 14], k_top=4, beta=2, pool_size=[(10, 10), (10, 10), (6, 6), (2, 2)])


In [83]:
from keras import layers

In [111]:
model = keras.Sequential(
    [
        keras.Input(shape=(1,48,48)),
        layers.Flatten(),
        layers.Dense(1, activation="softmax"),
    ]
)

model.summary()

Model: "sequential_14"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten_17 (Flatten)        (None, 2304)              0         
                                                                 
 dense_22 (Dense)            (None, 1)                 2305      
                                                                 
Total params: 2,305
Trainable params: 2,305
Non-trainable params: 0
_________________________________________________________________


In [115]:
X_raw,y = load_msrp_corpus("/content/drive/MyDrive/paraphraseIden/MSRP/msr_paraphrase_train.txt")
X_train = processor.fit_transform(X_raw)
matrices = bicnn(X_train)

Fu <class 'numpy.ndarray'> (4076, 1, 48, 48)
(4076, 3, 13, 48)


In [116]:
print(matrices.shape, y.shape)

(4076, 1, 48, 48) (4076,)


In [122]:
model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

model.fit(matrices, y, epochs=5, validation_split=0.1)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7f9b8e4862d0>

In [118]:
X_test_raw, y_test = load_msrp_corpus("/content/drive/MyDrive/paraphraseIden/MSRP/msr_paraphrase_test.txt")
X_test = bicnn(processor.fit_transform(X_test_raw))


Fu <class 'numpy.ndarray'> (1725, 1, 48, 48)
(1725, 3, 13, 48)
Test loss: 0.0
Test accuracy: 0.6649275422096252


In [123]:
score = model.evaluate(X_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

Test loss: 0.0
Test accuracy: 0.6649275422096252
