In [None]:
# !git clone https://github.com/taslimamindia/NERC.git

# Imports

In [86]:
import pandas as pd

import numpy as np

from nltk import word_tokenize, sent_tokenize, download
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

from gensim.models import Word2Vec

import tensorflow as tf

from keras.utils import to_categorical, pad_sequences

# import string

In [None]:
# NLTK resources needed on a fresh environment such as Google Colab:
# 'wordnet' backs WordNetLemmatizer and 'stopwords' backs stopwords.words().
# NOTE(review): Preprocessing.tokenize() also needs the Punkt tokenizer models;
# the resource name depends on the installed NLTK version, so it is not
# downloaded here.
for resource in ('wordnet', 'stopwords'):
    download(resource)

# Class definition for the data.

In [31]:
class Data(object):
    """Holds one dataset split together with the vocabularies shared by all splits.

    ``unique_words`` and ``unique_ner_tags`` are class attributes, so every
    ``Data`` instance reads and extends the same dictionaries. ``MAX_LENGTH``
    and ``VOCAB_SIZE`` are class-level settings read by the vectorization and
    model code.
    """
    # Shared lookup tables: token -> index and NER tag -> index.
    unique_words = {"<PAD>":0}
    unique_ner_tags = {"O":0}
    # Sequence length used when padding, and size of the word vectors.
    MAX_LENGTH = 50
    VOCAB_SIZE = 100

    def __init__(self):
        # Raw columns read from the corpus, one inner list per sentence.
        self.sentences = []
        self.ner_tags = []
        self.chunk_tags = []
        self.pos_tags = []
        # Working copies that the preprocessing / vectorization steps rewrite.
        self.sentences_num = None
        self.ner_tags_num = None
        # Final model inputs and targets.
        self.x, self.y = None, None

    def word2vec(self, vector_size=100):
        """Train a Word2Vec model on ``self.sentences`` and return it.

        The requested size is also stored in ``Data.VOCAB_SIZE``. The original
        code assigned it to a local variable that was never read, which left
        the class-level setting out of sync with the trained model.
        """
        Data.VOCAB_SIZE = vector_size
        return Word2Vec(self.sentences, vector_size=vector_size, window=5, min_count=1, workers=4)

    def word2idx(self, word:str):
        """Return the index registered for ``word``, or None if it is unknown."""
        return Data.unique_words.get(word, None)

    def idx2word(self, index:int):
        """Return the word registered under ``index``, or None if there is none."""
        for word, value in Data.unique_words.items():
            # Compare by value: `is` tests object identity, which is not
            # guaranteed to hold for two equal integers.
            if index == value:
                return word
        return None

    def tag2idx(self, tag):
        """Return the index registered for the NER tag ``tag``, or None."""
        return Data.unique_ner_tags.get(tag, None)

    def idx2tag(self, index):
        """Return the NER tag registered under ``index``, or None."""
        for tag, value in Data.unique_ner_tags.items():
            if index == value:
                return tag
        return None

    def unicity(self):
        """Register every new word and tag of this split in the shared tables.

        New entries receive the next free index in first-seen order, so the
        mapping is the same on every run (iterating a set of strings, as the
        previous version did, is not). Does nothing for a column that is still
        None.
        """
        for sentence in self.sentences_num or []:
            for word in sentence:
                if word not in Data.unique_words:
                    Data.unique_words[word] = len(Data.unique_words)
        for tags in self.ner_tags_num or []:
            for tag in tags:
                if tag not in Data.unique_ner_tags:
                    Data.unique_ner_tags[tag] = len(Data.unique_ner_tags)


# Loading data

In [32]:
class Loading():
    """Reads a CoNLL-style file (one ``token pos chunk ner`` line per token).

    Sentences are separated by blank lines or ``-DOCSTART-`` marker lines and
    are appended to the lists of the ``data`` object given to the constructor.
    """

    def __init__(self, data: "Data", file):
        self.data = data
        self.load_sentences(file)

    def _store_sentence(self, tokens, pos_tags, chunk_tags, ner_tags):
        """Append one finished sentence to the data object; skip empty ones."""
        if tokens:
            self.data.sentences.append(tokens)
            self.data.pos_tags.append(pos_tags)
            self.data.chunk_tags.append(chunk_tags)
            self.data.ner_tags.append(ner_tags)

    def load_sentences(self, filepath):
        """Parse ``filepath`` and fill sentences, pos_tags, chunk_tags and ner_tags.

        Raises:
            ValueError: if a non-blank line has fewer than four fields.
        """
        tokens, pos_tags, chunk_tags, ner_tags = [], [], [], []
        with open(filepath, 'r') as f:
            for line_number, line in enumerate(f, start=1):
                fields = line.split()
                # A blank (or whitespace-only) line, or a document marker in
                # any of its spellings, closes the current sentence.
                if not fields or fields[0] == '-DOCSTART-':
                    self._store_sentence(tokens, pos_tags, chunk_tags, ner_tags)
                    tokens, pos_tags, chunk_tags, ner_tags = [], [], [], []
                    continue
                if len(fields) < 4:
                    raise ValueError(
                        "%s:%d: expected 4 fields, got %d" % (filepath, line_number, len(fields))
                    )
                tokens.append(fields[0])
                pos_tags.append(fields[1])
                chunk_tags.append(fields[2])
                ner_tags.append(fields[3])
        # The file may end without a trailing blank line; keep the last sentence.
        self._store_sentence(tokens, pos_tags, chunk_tags, ner_tags)

# Preprocessing

In [57]:
class Preprocessing():
    """Text clean-up steps applied to a data object before vectorization.

    With ``text=None`` the working columns (``sentences_num``,
    ``ner_tags_num``) start out as the loaded ``sentences`` / ``ner_tags``.
    With a raw ``text``, call :meth:`tokenize` first to build the sentences.
    """

    def __init__(self, data:"Data", text=None, lang="english"):
        self.data = data
        self.text = text
        self.lang = lang
        if text is None:
            self.data.sentences_num = self.data.sentences
            self.data.ner_tags_num = self.data.ner_tags

    def tokenize(self):
        """Split ``self.text`` into tokenized sentences without stop words.

        Does nothing when the object was created without a text.
        """
        if self.text is not None:
            # Build the stop-word set once instead of once per token.
            stop_words = set(stopwords.words(self.lang))
            sentences = [word_tokenize(sentence, language=self.lang) for sentence in sent_tokenize(self.text, language=self.lang)]
            self.data.sentences = [[token for token in sentence if token not in stop_words] for sentence in sentences]
            self.data.sentences_num = self.data.sentences

    def lowercasing(self):
        """Lower-case every token of the working sentences."""
        self.data.sentences_num = [[word.lower() for word in sentence] for sentence in self.data.sentences_num]

    def lemmatize(self):
        """Replace every token of the working sentences by its WordNet lemma."""
        lemmatizer = WordNetLemmatizer()
        self.data.sentences_num = [[lemmatizer.lemmatize(word) for word in sentence] for sentence in self.data.sentences_num]

    def remove_stopword(self):
        """Drop stop words and punctuation tokens together with their NER tags.

        Tokens are paired with ``ner_tags_num`` (the tags that belong to the
        current working sentences) when it is set, so calling this method more
        than once keeps tokens and tags aligned. ``ner_tags`` is used only as
        a fallback when ``ner_tags_num`` is still None.
        """
        punctuation = ['!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '=', '>', '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~']
        # One set built up front: membership tests are O(1) and the stop-word
        # list is no longer rebuilt for every token.
        excluded = set(stopwords.words(self.lang)) | set(punctuation)
        tag_rows = self.data.ner_tags_num if self.data.ner_tags_num is not None else self.data.ner_tags
        kept_tokens, kept_tags = [], []
        for i, tokens in enumerate(self.data.sentences_num):
            pairs = [(tokens[j], tag_rows[i][j]) for j in range(len(tokens)) if tokens[j] not in excluded]
            kept_tokens.append([token for token, _ in pairs])
            kept_tags.append([tag for _, tag in pairs])
        self.data.sentences_num = kept_tokens
        self.data.ner_tags_num = kept_tags

# Vectorization

In [58]:
class Vectorization():
    """Turns the working sentences and tags of a data object into padded arrays.

    ``vectorized_x`` fills ``data.x`` with word vectors and ``vectorized_y``
    fills ``data.y`` with one-hot tags, both padded to ``data.MAX_LENGTH``.
    """

    def __init__(self, data:"Data"):
        self.data = data

    def word2vec(self, min_count=1, window=5, model=None):
        """Replace every token of ``data.sentences_num`` by its word vector.

        Args:
            min_count, window: passed to ``Word2Vec`` when a model is trained.
            model: optional, already trained Word2Vec model. When omitted a
                new model is trained on this object's sentences (the previous
                behaviour). Passing the model trained on the training split
                keeps the vectors of every split in the same embedding space.

        Returns:
            The Word2Vec model that produced the vectors.
        """
        if model is None:
            model = Word2Vec(self.data.sentences_num, min_count=min_count, vector_size=Data.VOCAB_SIZE, window=window)
        # Words the model does not know (other corpus, or below min_count)
        # are mapped to a zero vector instead of raising KeyError.
        unknown = np.zeros((model.wv.vector_size,), dtype="float32")
        self.data.sentences_num = [
            [model.wv[word] if word in model.wv else unknown for word in sentence]
            for sentence in self.data.sentences_num
        ]
        return model

    def padding_x(self, value=None, dtype="float32"):
        """Pad ``data.sentences_num`` to ``data.MAX_LENGTH`` and store it in ``data.x``.

        ``value`` defaults to a zero vector of size ``Data.VOCAB_SIZE``. It is
        built at call time: a default written in the signature is evaluated
        once, when the class is defined, and would ignore later changes.
        """
        if value is None:
            value = np.zeros((Data.VOCAB_SIZE,), dtype="float32")
        self.data.x = pad_sequences(
            sequences=self.data.sentences_num,
            maxlen=self.data.MAX_LENGTH,
            dtype=dtype,
            padding="post",
            value=value
        )

    def vectorized_x(self, model=None):
        """Vectorize then pad the sentences; returns the Word2Vec model used."""
        model = self.word2vec(model=model)
        self.padding_x()
        return model

    def tag2num(self):
        """Replace every tag of ``data.ner_tags_num`` by its one-hot vector.

        Raises:
            KeyError: if a tag is missing from ``Data.unique_ner_tags``.
        """
        num_classes = len(Data.unique_ner_tags)
        self.data.ner_tags_num = [
            [to_categorical(Data.unique_ner_tags[tag], num_classes=num_classes) for tag in tags]
            for tags in self.data.ner_tags_num
        ]

    def padding_y(self, value=None):
        """Pad ``data.ner_tags_num`` to ``data.MAX_LENGTH`` and store it in ``data.y``.

        ``value`` defaults to the one-hot vector of the "O" tag, sized from the
        current ``Data.unique_ner_tags``. It is built at call time so that it
        matches the tag set known when padding happens rather than a count
        captured when the class was defined.
        """
        if value is None:
            value = to_categorical(Data.unique_ner_tags.get("O"), num_classes=len(Data.unique_ner_tags))
        self.data.y = pad_sequences(
            sequences=self.data.ner_tags_num,
            maxlen=self.data.MAX_LENGTH,
            padding="post",
            dtype="float32",
            value=value
        )

    def vectorized_y(self):
        """One-hot encode then pad the tags."""
        self.tag2num()
        self.padding_y()

def load_dataset(path: str, base_dir: str = "../Data/conll2003_english/"):
    """Load one corpus file into a fresh ``Data`` object and return it.

    Args:
        path: file name inside ``base_dir`` (for example "train.txt").
        base_dir: directory that holds the corpus files. Override it when the
            data lives elsewhere, e.g. "/content/NERC/Data/conll2003_english/"
            on Google Colab.
    """
    import os

    data = Data()
    Loading(data=data, file=os.path.join(base_dir, path))
    return data

# Main

### New input text

In [None]:
# test_text = Data()

# preprocessing = Preprocessing(data = test_text, text = "Obama is the president of the United States. I am from Guinea, nice to meet you.")
# preprocessing.tokenize()
# preprocessing.lowercasing()
# preprocessing.lemmatize()
# print(test_text.sentences)

# vector = Vectorization(test_text)
# vector.vectorized_x()
# print(test_text.x.shape)

### Parameters

In [78]:
# Snapshots of the shared vocabularies: both lengths are computed when this
# cell runs and are not refreshed when Data.unique_words / Data.unique_ner_tags
# grow afterwards.
NUM_WORDS = len(Data.unique_words)
NUM_CLASSES = len(Data.unique_ner_tags)
# Hyperparameters
EMBEDDING_DIM = 100  # last dimension of the CNN input shape
NUM_FILTERS = 256  # not referenced by the code visible in this notebook
KERNEL_SIZE = 3  # kernel size of both Conv1D layers
DROPOUT_RATE = 0.5  # rate of the Dropout layers in the CNN
BATCH_SIZE = 32  # batch size for fit() and predict()
EPOCHS = 10  # number of epochs passed to fit()

## Evaluation

In [81]:
def evaluation(test:"Data", y_predict):
  """Print and return token-level counts restricted to entity tokens.

  A position counts as an entity token when the arg-max of its row in
  ``test.y`` is not 0 (index 0 is the "O" tag in ``Data.unique_ner_tags``).

  Args:
    test: object whose ``y`` is a 3-D array (sentences, positions, classes).
    y_predict: scores covering at least the first two dimensions of ``test.y``.

  Returns:
    dict with ``true`` (entity tokens predicted correctly), ``false``
    (entity tokens predicted wrongly), ``total`` (entity tokens) and
    ``predicted_O`` (positions predicted as class 0).
  """
  y_true = np.asarray(test.y)
  x, y = y_true.shape[0], y_true.shape[1]
  real_tags = np.argmax(y_true, axis=-1)
  # Only the positions that exist in test.y are compared.
  predict_tags = np.argmax(np.asarray(y_predict)[:x, :y], axis=-1)

  predict = int(np.count_nonzero(predict_tags == 0))
  entity_mask = real_tags != 0
  total = int(np.count_nonzero(entity_mask))
  true = int(np.count_nonzero(predict_tags[entity_mask] == real_tags[entity_mask]))
  false = total - true
  # Without any entity token the ratios are reported as 0.0 instead of
  # raising ZeroDivisionError.
  true_ratio = round(true/total, 3) if total else 0.0
  false_ratio = round(false/total, 3) if total else 0.0

  print("----------------------- Evaluation -------------------------")
  print(test.y.shape)
  print(predict, x*y)
  print(true, false, total, true_ratio, false_ratio, end="\n\n")
  return {"true": true, "false": false, "total": total, "predicted_O": predict}

In [82]:
def checkDataset(train, test, valid):
    """Print the shapes of ``x`` and ``y`` for the train, test and valid splits."""
    splits = (("train", train), ("test", test), ("valid", valid))
    last_position = len(splits) - 1
    for position, (label, split) in enumerate(splits):
        print("X_" + label, split.x.shape)
        if position < last_position:
            # A blank line separates consecutive splits.
            print("y_" + label, split.y.shape, "\n")
        else:
            print("y_" + label, split.y.shape)

def main():
    """Load, preprocess and vectorize the three corpus splits.

    Every split is preprocessed before any split is vectorized, so the shared
    tag table is complete when the tags are one-hot encoded.

    ``preprocess_lstm`` and ``vectorize`` are defined in a later cell of this
    notebook; that cell has to be executed before ``main`` is called.

    NOTE(review): ``vectorize`` is called once per split and each call trains
    its own Word2Vec model, so the three splits do not share one embedding
    space.

    Returns:
        The (train, test, valid) data objects.
    """
    splits = [load_dataset(file_name) for file_name in ("train.txt", "test.txt", "valid.txt")]
    for split in splits:
        preprocess_lstm(split)
    for split in splits:
        vectorize(split)
    train, test, valid = splits
    checkDataset(train, test, valid)
    return train, test, valid

## CNN model

In [83]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Conv1D, MaxPooling1D

class Model_CNN:
  """Two Conv1D layers followed by a per-position softmax over the NER tags."""

  def __init__(self, num_classes=None):
    """Build the network.

    Args:
      num_classes: width of the output layer. Defaults to the number of tags
        currently registered in ``Data.unique_ner_tags``, read when the model
        is built. The module-level ``NUM_CLASSES`` is only a snapshot taken
        when the parameter cell ran and may predate the data loading.
    """
    if num_classes is None:
      num_classes = len(Data.unique_ner_tags)
    # Define the model architecture
    self.model = Sequential()
    # The input width follows Data.VOCAB_SIZE, the size Vectorization uses
    # for the word vectors.
    self.model.add(Conv1D(64, KERNEL_SIZE, activation='relu', input_shape=(Data.MAX_LENGTH, Data.VOCAB_SIZE), padding='same'))
    self.model.add(Dropout(DROPOUT_RATE))
    self.model.add(Conv1D(32, KERNEL_SIZE, activation='relu', padding='same'))
    self.model.add(Dropout(DROPOUT_RATE))
    self.model.add(Dense(num_classes, activation='softmax'))

  def summary(self):
    """Print the Keras model summary."""
    self.model.summary()

  def trainning(self, train:Data, valid:Data=None):
    """Compile the model and fit it on ``train``; validate on ``valid`` if given."""
    cat_accuracy = tf.keras.metrics.CategoricalAccuracy()
    recall = tf.keras.metrics.Recall()
    self.model.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.CategoricalCrossentropy(), metrics=[cat_accuracy, recall])
    fit_kwargs = dict(batch_size=BATCH_SIZE, epochs=EPOCHS)
    if valid is not None:
      fit_kwargs["validation_data"] = (valid.x, valid.y)
    self.model.fit(train.x, train.y, **fit_kwargs)

  def testing(self, test:Data):
    """Return the result of ``model.evaluate`` on the test split."""
    return self.model.evaluate(test.x, test.y)

  def predicting(self, test:Data):
    """Return the model predictions for ``test.x``."""
    return self.model.predict(test.x, batch_size=BATCH_SIZE)

In [84]:
def main_cnn(param:dict):
  """Train and evaluate the CNN once per requested sequence length.

  Args:
    param: settings dictionary; ``param["max_length"]`` is an iterable of
      values for ``Data.MAX_LENGTH``. Nothing runs when the key is missing,
      0, None or empty.
  """
  for max_length in param.get("max_length") or ():
    # The padding length is a class-level setting read by the vectorization
    # and by the model input shape, so it is set before the data is rebuilt.
    Data.MAX_LENGTH = max_length
    train, test, valid = main()
    model_cnn = Model_CNN()
    model_cnn.trainning(train, valid)
    model_cnn.testing(test)
    y_predict_cnn = model_cnn.predicting(test)
    evaluation(test, y_predict_cnn)

In [85]:
# Run the CNN experiment once, with Data.MAX_LENGTH set to 50.
main_cnn({"max_length":[50]})

X_train (14041, 50, 100)
y_train (14041, 50, 9) 

X_test (3453, 50, 100)
y_test (3453, 50, 9) 

X_valid (3250, 50, 100)
y_valid (3250, 50, 9)
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
----------------------- Evaluation -------------------------
(3453, 50, 9)
171968 172650
307 7701 8008 0.038 0.962



In [None]:
# # from sklearn.feature_extraction.text import TfidfVectorizer
# from keras.models import Model
# from keras.layers import Dense, Conv1D
# from tf2crf import CRF, ModelWithCRFLoss
# from keras import Input

# # Build CNN model
# # model = Sequential()
# inputs = Input(shape=(MAX_LENGTH, EMBEDDING_DIM))
# outputs = Conv1D(64, KERNEL_SIZE, activation='relu', padding='same')(inputs)
# # model.add(MaxPooling1D(2, padding='same'))
# # outputs = Dropout(DROPOUT_RATE)(inputs)
# outputs = Conv1D(32, KERNEL_SIZE, activation='relu', padding='same')(inputs)
# # model.add(MaxPooling1D(2))
# # model.add(Dropout(DROPOUT_RATE))
# # model.add(Dense(HIDDEN_DIM, activation='relu'))
# # outputs = Dropout(DROPOUT_RATE)(outputs)
# outputs = Dense(NUM_CLASSES, activation='relu')(outputs)
# # outputs.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# # outputs.summary()
# crf = CRF(units=9)
# # cnn_model.add(crf)
# output = crf(outputs)
# cnn_crf_model = Model(inputs, output)
# cnn_crf_model.summary()
# # cnn_crf_model = ModelWithCRFLoss(base_model, sparse_target=True)
# # cnn_crf_model.summary()

In [None]:
# cnn_crf_model.compile(optimizer='adam')
# cnn_crf_model.fit(train.x, train.y, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_data=(valid.x, valid.y))

In [None]:
# # Evaluation
# loss, accuracy = cnn_crf_model.evaluate(test.x, test.y, batch_size=BATCH_SIZE)

# print('Test Loss:', loss)
# print('Test Accuracy:', accuracy)

## Model LSTM

In [64]:
def preprocess_lstm(data:Data):
    """Lower-case, lemmatize and filter ``data``, then register its vocabulary.

    The steps run in this order: lowercasing, lemmatization, stop-word and
    punctuation removal, and finally ``data.unicity()``.
    """
    pipeline = Preprocessing(data=data)
    for step in (pipeline.lowercasing, pipeline.lemmatize, pipeline.remove_stopword):
        step()
    data.unicity()

def vectorize(data:Data):
    """Fill ``data.x`` and ``data.y`` using a ``Vectorization`` helper (x first, then y)."""
    vectorizer = Vectorization(data=data)
    for build in (vectorizer.vectorized_x, vectorizer.vectorized_y):
        build()

In [65]:
from keras.models import Sequential
from keras.layers import LSTM, Dense

class Model_LSTM:
  """Four stacked LSTM layers followed by a per-position softmax over the NER tags."""

  def __init__(self, num_classes=None):
    """Build the network.

    Args:
      num_classes: width of the output layer. Defaults to the number of tags
        currently registered in ``Data.unique_ner_tags`` instead of the
        previously hard-coded 9, so the model follows the loaded tag set.
    """
    if num_classes is None:
      num_classes = len(Data.unique_ner_tags)
    # Define the model architecture
    self.model_LSTM = Sequential()
    self.model_LSTM.add(LSTM(256, input_shape=(Data.MAX_LENGTH, Data.VOCAB_SIZE), return_sequences=True, dropout=0.5))
    # Each following layer halves the number of units.
    for units in (128, 64, 32):
      self.model_LSTM.add(LSTM(units, return_sequences=True, dropout=0.5))
    self.model_LSTM.add(Dense(num_classes, activation='softmax'))

  def summary(self):
    """Print the Keras model summary."""
    self.model_LSTM.summary()

  def trainning(self, train:Data, valid:Data=None):
    """Compile the model and fit it on ``train``; validate on ``valid`` if given."""
    cat_accuracy = tf.keras.metrics.CategoricalAccuracy()
    recall = tf.keras.metrics.Recall()
    self.model_LSTM.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.CategoricalCrossentropy(), metrics=[cat_accuracy, recall])
    fit_kwargs = dict(batch_size=BATCH_SIZE, epochs=EPOCHS)
    if valid is not None:
      fit_kwargs["validation_data"] = (valid.x, valid.y)
    self.model_LSTM.fit(train.x, train.y, **fit_kwargs)

  def testing(self, test:Data):
    """Return the result of ``model.evaluate`` on the test split."""
    return self.model_LSTM.evaluate(test.x, test.y)

  def predicting(self, test:Data):
    """Return the model predictions for ``test.x``."""
    return self.model_LSTM.predict(test.x, batch_size=BATCH_SIZE)

In [66]:
def main_lstm(param:dict):
    """Train and evaluate the LSTM once per requested sequence length.

    Args:
        param: settings dictionary; ``param["max_length"]`` is an iterable of
            values for ``Data.MAX_LENGTH``. Nothing runs when the key is
            missing, 0, None or empty.
    """
    for max_length in param.get("max_length") or ():
        # The padding length is a class-level setting read by the
        # vectorization and by the model input shape, so it is set before the
        # data is rebuilt.
        Data.MAX_LENGTH = max_length
        train, test, valid = main()
        model_lstm = Model_LSTM()
        model_lstm.trainning(train, valid)
        model_lstm.testing(test)
        y_predict_lstm = model_lstm.predicting(test)
        evaluation(test, y_predict_lstm)

In [68]:
# Run the LSTM experiment once, with Data.MAX_LENGTH set to 50.
main_lstm({"max_length":[50]})

## Tools

In [None]:
# !pip install tensorflow-addons

In [None]:
# model_LSTM.save("../Data/model_lstm.keras")

In [None]:
# model_LSTM = tf.keras.models.load_model("../Data/model_lstm.keras")

In [None]:
# # np.quantile(sort([1, 2, 3, 8, 7]), 0.50)
# dico = {}
# for tags in test.sentences + train.sentences + valid.sentences:
#   if dico.get(len(tags), None) == None:
#     dico[len(tags)] = 1
#   dico[len(tags)] += 1
# sorted(list(dico.items()), key= lambda x: x[1])

In [None]:
# entities = dict(zip(Data.unique_ner_tags.keys(), [0 for i in range(len(Data.unique_ner_tags))]))
# for tags in test.ner_tags:
#     for tag in tags:
#         entities[tag] += 1
# is_entities = 0
# is_not_entities = 0
# for tag, nbr in entities.items():
#     if tag != 'O': is_entities += nbr
#     else: is_not_entities += nbr
# print(entities)
# print(is_entities, is_not_entities)

## TF-IDF CNN-Softmax

In [87]:
from sklearn.feature_extraction.text import TfidfVectorizer
from keras.models import Sequential
from keras.layers import Dense, Dropout, Conv1D, MaxPooling1D
# from keras_contrib.layers import CRF
# from keras_contrib.utils import save_load_utils

In [89]:
def load_tf_idf():
    """Load the data for the TF-IDF experiment.

    Only the training file is read; the test and valid slots of the returned
    triple are None because their loading is currently disabled.
    """
    train_split = load_dataset("train.txt")
    return (train_split, None, None)

def preprocess_tfidf(data:Data):
    """Apply the text clean-up to ``data`` and register its vocabulary.

    Runs lowercasing, lemmatization and stop-word/punctuation removal through
    a ``Preprocessing`` helper, then calls ``data.unicity()``.
    """
    cleaner = Preprocessing(data=data)
    cleaner.lowercasing()
    cleaner.lemmatize()
    cleaner.remove_stopword()
    data.unicity()
    # Earlier experiment, kept for reference:
    # sentences = [" ".join(sentence) for sentence in data.sentences_num]
    # vectorizer = TfidfVectorizer(max_features=Data.MAX_LENGTH)
    # data.x = vectorizer.fit_transform(sentences).toarray()
    # y = []
    # [[y.append(to_categorical(Data.unique_ner_tags[tag], num_classes=NUM_CLASSES)) for tag in tags] for tags in data.ner_tags]
    # data.y = np.array(y, dtype="float32")

def vectorize_tf_idf(data:Data):
    """Pad tokens with "<PAD>" and tags with "O", printing ``data.x`` before and after."""
    padder = Vectorization(data=data)
    print(data.x)
    padder.padding_x(value="<PAD>", dtype="str")
    print(data.x)
    padder.padding_y(value="O")

def formalize_tfidf(data:Data):
    """Build one "token__tag token__tag ..." string per sentence.

    Side effect: ``data.x`` and ``data.y`` are pointed at the working
    sentences and tags.

    NOTE(review): the NER tag is part of every feature string, so the label
    is visible in the input of whatever is fitted on these documents.
    """
    data.x = data.sentences_num
    data.y = data.ner_tags_num
    documents = []
    for row in range(len(data.x)):
        tokens = data.x[row]
        tagged = ["".join([tokens[col], "__", data.y[row][col]]) for col in range(len(tokens))]
        documents.append(" ".join(tagged))
    return documents

# NOTE(review): this function is named `tf`, the same name the import cell
# binds to TensorFlow (`import tensorflow as tf`). Once this cell has run,
# `tf.keras...` lookups in the model classes resolve to this function instead.
def tf(train:Data, test:Data, valid:Data):
    """Fit a TF-IDF vectorizer on the training documents and store the matrix.

    ``train.x`` becomes the dense TF-IDF matrix and its shape is printed.
    ``test`` and ``valid`` are accepted but not used; their transformation is
    currently disabled.
    """
    tfidf_vectorizer = TfidfVectorizer()
    train_documents = formalize_tfidf(train)
    tfidf_matrix = tfidf_vectorizer.fit_transform(train_documents)
    train.x = tfidf_matrix.toarray()
    print(train.x.shape)

In [90]:
class TF_IDF:
  """TF-IDF experiment: loads and preprocesses the training split, then vectorizes it.

  Preprocessing and padding of the test and valid splits are currently
  disabled; ``training``, ``testing`` and ``evaluation`` are placeholders.
  """

  def __init__(self):
    loaded_splits = load_tf_idf()
    self.train, self.test, self.valid = loaded_splits
    # Preprocessing (training split only)
    preprocess_tfidf(self.train)
    # Vectorization
    tf(self.train, self.test, self.valid)

  def training(self):
    """Placeholder: not implemented yet."""

  def testing(self):
    """Placeholder: not implemented yet."""

  def evaluation(self):
    """Placeholder: not implemented yet."""
# Build the TF-IDF experiment; the constructor loads, preprocesses and
# vectorizes the training split and prints the shape of the resulting matrix.
tfidf = TF_IDF()

(14041, 19159)
