In [1]:
from collections import defaultdict
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
import os
from sklearn.model_selection import train_test_split
from google.colab import drive
drive.mount('/content/drive/')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive/


In [0]:
def create_dict():
    dct = defaultdict()
    dct['B-PER'] = defaultdict(int)
    dct['I-PER'] = defaultdict(int)
    dct['B-ORG'] = defaultdict(int)
    dct['I-ORG'] = defaultdict(int)
    dct['B-LOC'] = defaultdict(int)
    dct['I-LOC'] = defaultdict(int)
    dct['O'] = defaultdict(int)
    return dct

In [0]:
def remove_noisy_label(label, prev_label, next_label):
    if label in ['B-PER', 'I-PER', 'B-LOC', 'I-LOC', 'B-ORG', 'I-ORG', 'O']:
        return label
    else:
        if (prev_label is None) or (next_label is None):
            return 'O'
        if label.endswith('MISC'):
            return 'O'
        if prev_label.startswith('B'):
            if prev_label == 'B-PER':
                return 'I-PER'
            elif prev_label == 'B-LOC':
                return 'I-LOC'
            else:
                return 'I-ORG'
        elif next_label.startswith('I'):
            if prev_label == 'O':
                if next_label == 'I-PER':
                    return 'B-PER'
                elif next_label == 'I-LOC':
                    return 'B-LOC'
                else:
                    return 'B-ORG'
            elif prev_label.startswith('I'):
                return prev_label
        else:
            return 'O'

In [0]:
def load_data(data_path, data_format='1', test_size=0.15):
    """
    Load data from file and split training set and test set
    :param data_path: path to data folder
    :param test_size: the ratio of test set to total dataset,
        0 < test_size < 1, default = 0.15
    :param data_format: form of a output data point:
        '1': ('word', 'ner_label')
        '2.1': ('word', 'pos-tagging label')
        '2.2': ('word', 'chucking label', 'ner label')
        '3': ('word', 'pos-tagging label', 'chucking label', 'ner label')
        default = '1'
    :return: training_set, test_set
    """
    # Check input
    if data_format not in {'1', '2.1', '2.2', '3'}:
        raise Exception("{} not is a data_format. The value of data_format should in ('1', '2.1', '2.2', '3')"
                        .format(data_format))
    if not os.path.exists(data_path):
        raise Exception("{} does not exist" .format(data_path))
    if test_size <= 0 or test_size >= 1:
        raise Exception("Test_size should be between 0 and 1. The value of test_size is: {}" .format(test_size))

    # load data
    data = []
    for file_name in os.listdir(data_path):
        file_path = data_path + '/' + file_name
        with open(file_path, encoding='utf-8') as f:
            sentence = []
            all_data = f.readlines()
            for i in range(len(all_data)):
                line = all_data[i]
                label = line.split()
                if len(label) > 4:
                    # prev_label = all_data[i - 1].split()[3] if len(sentence) != 0 else None
                    # next_label = all_data[i + 1].split()[3] if len(all_data[i + 1].split()) > 4 else None
                    # label[3] = remove_noisy_label(label[3], prev_label, next_label)
                    if label[3] not in ['B-PER', 'I-PER', 'B-LOC', 'I-LOC', 'B-ORG', 'I-ORG', 'O']:
                        continue
                    # all_data[i] = label[0] + '	' + label[1] + '	' + label[2] + '	' + label[3] + '	' + label[4]
                    if data_format == '1':
                        sentence.append((label[0], label[3]))
                    elif data_format == '2.1':
                        sentence.append((label[0], label[1], label[3]))
                    elif data_format == '2.2':
                        sentence.append((label[0], label[2], label[3]))
                    else:
                        sentence.append((label[0], label[1], label[2], label[3]))
                else:
                    data.append(sentence)
                    sentence = []
        f.close()

    # split training set and test set
    training_set, test_set = train_test_split(data, test_size=test_size, shuffle=True)
    return training_set, test_set

In [0]:
def create_transition_matrix_bigram(dataset, data_path):
    transition_matrix = create_dict()
    transition_matrix['start'] = defaultdict(int)
    for sentence in dataset:
        for i in range(len(sentence)):
            ner = sentence[i][1]
            if i == 0:
                transition_matrix['start'][ner] += 1
            elif i == len(sentence) - 1:
                prev_ner = sentence[i - 1][1]
                transition_matrix[prev_ner][ner] += 1
                transition_matrix[ner]['stop'] += 1
            else:
                prev_ner = sentence[i - 1][1]
                transition_matrix[prev_ner][ner] += 1
    freq_matrix = defaultdict()
    with open(data_path, 'w') as f:
        for prev_ner in transition_matrix.keys():
            curr_ner = transition_matrix[prev_ner]
            total_label = sum(curr_ner.values())
            freq_bigram = {}
            for ner in curr_ner.keys():
                freq_bigram[ner] = curr_ner[ner] / total_label
                f.write((prev_ner + '<fff>' + ner + '<fff>' + str(freq_bigram[ner])))
                f.write('\n')
            freq_matrix[prev_ner] = freq_bigram
    f.close()

In [0]:
def create_transition_matrix_trigram(dataset, data_path):
    transition_matrix = defaultdict()
    transition_matrix['start'] = create_dict()
    transition_matrix['start']['start'] = defaultdict(int)
    transition_matrix['B-PER'] = create_dict()
    transition_matrix['I-PER'] = create_dict()
    transition_matrix['B-ORG'] = create_dict()
    transition_matrix['I-ORG'] = create_dict()
    transition_matrix['B-LOC'] = create_dict()
    transition_matrix['I-LOC'] = create_dict()
    transition_matrix['O'] = create_dict()
    for sentence in dataset:
        for i in range(len(sentence)):
            ner = sentence[i][1]
            if i == 0:
                transition_matrix['start']['start'][ner] += 1
            elif i == 1:
                prev1_ner = sentence[i - 1][1]
                transition_matrix['start'][prev1_ner][ner] += 1
            elif i == len(sentence) - 1:
                prev2_ner = sentence[i - 2][1]
                prev1_ner = sentence[i - 1][1]
                transition_matrix[prev2_ner][prev1_ner][ner] += 1
                transition_matrix[prev1_ner][ner]['stop'] += 1
            else:
                prev2_ner = sentence[i - 2][1]
                prev1_ner = sentence[i - 1][1]
                transition_matrix[prev2_ner][prev1_ner][ner] += 1
    freq_matrix = defaultdict()
    with open(data_path, 'w') as f:
        for prev2_ner in transition_matrix.keys():
            trigram = transition_matrix[prev2_ner]
            freq_trigram = {}
            for prev1_ner in trigram.keys():
                bigram = trigram[prev1_ner]
                freq_bigram = {}
                total_label = sum(bigram.values())
                for curr_ner in bigram.keys():
                    freq_bigram[curr_ner] = bigram[curr_ner] / total_label
                    f.write((prev2_ner + '<fff>' + prev1_ner + '<fff>' + curr_ner + '<fff>' + str(freq_bigram[curr_ner])))
                    f.write('\n')
                freq_trigram[prev1_ner] = freq_bigram
            freq_matrix[prev2_ner] = freq_trigram
    f.close()

In [0]:
def create_emission_probability(dataset, data_path):
    emission = create_dict()
    vocab_size = defaultdict()
    for sentence in dataset:
        for i in range(len(sentence)):
            word = sentence[i][0]
            ner = sentence[i][1]
            emission[ner][word] += 1
    emission_probability = defaultdict()
    with open(data_path, 'w', encoding='utf-8') as f:
        for ner in emission.keys():
            words = emission[ner]
            total_label = sum(words.values())
            vocab_size[ner] = len(words)
            emission_bigram = {}
            for word in words.keys():
                emission_bigram[word] = (words[word] + 1) / (total_label + vocab_size[ner])
                f.write((word + '<fff>' + ner + '<fff>' + str(emission_bigram[word])))
                f.write('\n')
            emission_probability[ner] = emission_bigram
    f.close()

In [0]:
def training():
    path = '/content/drive/My Drive/Colab Notebooks/NER/data/NER2016-training_data'
    train, test = load_data(path)

    emission_path = '/content/drive/My Drive/Colab Notebooks/NER/data/bigram_data'
    create_emission_probability(train, emission_path + '/emission_probability_train.txt')
    create_transition_matrix_bigram(train, emission_path + '/transition_matrix_train.txt')

    emission_path = '/content/drive/My Drive/Colab Notebooks/NER/data/trigram_data'
    create_emission_probability(train, emission_path + '/emission_probability_train.txt')
    create_transition_matrix_trigram(train, emission_path + '/transition_matrix_train.txt')
    return test

In [0]:
class HMM_NER:
    def __init__(self):
        self.trigram = defaultdict()
        self.trigram['start'] = create_dict()
        self.trigram['start']['start'] = defaultdict(int)
        self.trigram['B-PER'] = create_dict()
        self.trigram['I-PER'] = create_dict()
        self.trigram['B-ORG'] = create_dict()
        self.trigram['I-ORG'] = create_dict()
        self.trigram['B-LOC'] = create_dict()
        self.trigram['I-LOC'] = create_dict()
        self.trigram['O'] = create_dict()

        self.bigram = create_dict()
        self.bigram['start'] = defaultdict(int)

        self.emission = create_dict()
        self.states = {'B-PER': 0,
                       'I-PER': 1,
                       'B-LOC': 2,
                       'I-LOC': 3,
                       'B-ORG': 4,
                       'I-ORG': 5,
                       'O': 6}
        self.eval_states1 = ['B-PER', 'I-PER', 'B-LOC', 'I-LOC', 'B-ORG', 'I-ORG']
        self.eval_states = [0, 1, 2, 3, 4, 5]

    def load_test_data(self, test_set):
        x = []
        y_word = []
        for sentence in test_set:
            if len(sentence) <= 0:
                continue
            xi = []
            yi = []
            for i in range(len(sentence)):
                xi.append(sentence[i][0])
                yi.append(self.states[sentence[i][1]])
            x.append(xi)
            y_word.extend(yi)
        return x, y_word

    def load_trigram(self, trigram_path):
        for u in self.states.keys():
            for v in self.states.keys():
                for w in self.states.keys():
                    self.trigram[u][v][w] = 0
                    self.trigram['start']['start'][w] = 0
                    self.trigram['start'][v][w] = 0
        with open(trigram_path) as f:
            for line in f.readlines():
                line = line.strip()
                y1, y2, y3, probability = line.split('<fff>')
                self.trigram[y1][y2][y3] = probability

    def load_bigram(self, bigram_path):
        self.bigram['start']['start'] = 0
        for u in self.states.keys():
            for v in self.states.keys():
                self.bigram[u][v] = 0
            self.bigram['start'][u] = 0
        with open(bigram_path) as f:
            for line in f.readlines():
                line = line.strip()
                y1, y2, probability = line.split('<fff>')
                self.bigram[y1][y2] = probability

    def load_emission(self, emission_path):
        with open(emission_path, encoding='utf-8') as f:
            for line in f.readlines():
                line = line.strip()
                observation, state, probability = line.split('<fff>')
                # self.emission[state] = defaultdict()
                # self.emission[state][observation] = 0
                self.emission[state][observation] = probability

    def bigram_transition_probability(self, y1, y2):
        return float(self.bigram[y1][y2])

    def trigram_transition_probability(self, y1, y2, y3):
        return float(self.trigram[y1][y2][y3])

    def state_observation_likelihood(self, observation, state):
        return float(self.emission[state][observation])

    def rare_word_observation(self, state):
        # print(1 / len(self.emission[state].keys()))
        return 1 / len(self.emission[state].keys())

    def bigram_decode(self, sentence):
        viterbi = [defaultdict() for _ in range(len(sentence))]
        back_point = [defaultdict() for _ in range(len(sentence) + 1)]
        tag_seq = [0 for _ in range(len(sentence) + 1)]
        word = sentence[0]
        for state in self.states.keys():
            # print(self.bigram_transition_probability('start', state))
            # print(self.state_observation_likelihood(sentence[0], state))
            # print('----------------')
            state_vocab = [key for key in self.emission[state].keys()]
            if word in state_vocab:
                viterbi[0][state] = self.bigram_transition_probability('start', state) \
                                    * self.state_observation_likelihood(word, state)
            else:
                viterbi[0][state] = self.bigram_transition_probability('start', state) \
                                    * self.rare_word_observation(state)
            # print(state + ': ' + str(self.bigram['start'][state]))
            back_point[0][state] = 0
            # tag_seq[0] = 0
        for i in range(1, len(sentence)):
            word = sentence[i]
            for v in self.states.keys():
                max_score = 0
                tag = None
                for u in self.states.keys():
                    # print(self.emission[v].keys())
                    if word in self.emission[v].keys():
                        # print(1)
                        score = viterbi[i - 1][u] \
                                * self.bigram_transition_probability(u, v) \
                                * self.state_observation_likelihood(word, v)
                    else:
                        score = viterbi[i - 1][u] \
                                * self.bigram_transition_probability(u, v) \
                                * self.rare_word_observation(v)
                    if score > max_score:
                        max_score = score
                        tag = self.states[u]
                viterbi[i][v] = max_score
                back_point[i][v] = tag
                # tag_seq[i] = tag

        max_score = 0
        tag = None
        for u in self.states.keys():
            score = viterbi[len(sentence) - 1][u] \
                    * self.bigram_transition_probability(u, 'stop')
            if score > max_score:
                max_score = score
                tag = self.states[u]
            if tag is None:
                print(viterbi[len(sentence) - 1][u] \
                     * self.bigram_transition_probability(u, 'stop'))
        best_prob = max_score
        tag_seq[len(sentence)] = tag
        back_point[len(sentence)]['stop'] = tag

        for k in range(len(sentence) - 1, 0, -1):
            # print(tag_seq[k + 1])
            ner = (list(self.states.keys()))[list(self.states.values()).index(tag_seq[k + 1])]
            tag_seq[k] = back_point[k][ner]
        # print(tag_seq)

        return best_prob, tag_seq[1:]

    def trigram_decode(self, sentence):
        viterbi = [defaultdict() for _ in range(len(sentence))]
        back_point = [defaultdict() for _ in range(len(sentence) + 1)]
        tag_seq = [0 for _ in range(len(sentence) + 1)]
        word = sentence[0]
        for state in self.states.keys():
            state_vocab = [key for key in self.emission[state].keys()]
            if word in state_vocab:
                viterbi[0][state] = self.trigram_transition_probability('start', 'start', state) \
                                    * self.state_observation_likelihood(word, state)
            else:
                viterbi[0][state] = self.trigram_transition_probability('start', 'start', state) \
                                    * self.rare_word_observation(state)
            # print(state + ': ' + str(self.bigram['start'][state]))
            back_point[0][state] = [-1, -1]
            # tag_seq[0] = 0
        for i in range(1, len(sentence)):
            word = sentence[i]
            for w in self.states.keys():
                max_score = 0
                tag = None
                for v in self.states.keys():
                    for u in self.states.keys():
                        # print(self.emission[v].keys())
                        if word in self.emission[v].keys():
                            # print(1)
                            score = viterbi[i - 1][v] \
                                    * self.trigram_transition_probability(u, v, w) \
                                    * self.state_observation_likelihood(word, w)
                        else:
                            score = viterbi[i - 1][v] \
                                    * self.trigram_transition_probability(u, v, w) \
                                    * self.rare_word_observation(w)
                        if score > max_score:
                            max_score = score
                            tag = [self.states[u], self.states[v]]
                viterbi[i][w] = max_score
                back_point[i][w] = tag
                # tag_seq[i] = tag

        max_score = 0
        tag = None
        for v in self.states.keys():
            for u in self.states.keys():
                score = viterbi[len(sentence) - 1][v] \
                        * self.trigram_transition_probability(u, v, 'stop')
                if score > max_score:
                    max_score = score
                    tag = [self.states[u], self.states[v]]
        best_prob = max_score
        tag_seq[len(sentence)] = tag[1]
        back_point[len(sentence)]['stop'] = tag

        for k in range(len(sentence) - 1, -1, -1):
            ner = (list(self.states.keys()))[list(self.states.values()).index(tag_seq[k + 1])]
            tag_seq[k] = back_point[k][ner][1]

        return best_prob, tag_seq[1:]

    def evaluate(self, sentences, labels, method='bigram'):
        assert method in ['bigram', 'trigram']
        y_predict = []
        for sentence in sentences:
            if method == 'bigram':
                prob, state_seq = self.bigram_decode(sentence)
                y_predict.extend(state_seq)
            if method == 'trigram':
                prob, state_seq = self.trigram_decode(sentence)
                y_predict.extend(state_seq)
        acc = accuracy_score(labels, y_predict)
        print('accuracy: ', acc)
        prec, rec, f1, support = precision_recall_fscore_support(labels, y_predict, labels=self.eval_states)
        all_prec = precision_score(labels, y_predict, labels=self.eval_states, average='micro')
        all_rec = recall_score(labels, y_predict, labels=self.eval_states, average='micro')
        all_f1 = f1_score(labels, y_predict, labels=self.eval_states, average='micro')
        print('labels:    {}'.format(self.eval_states1))
        print('precision: {}'.format([str(round(p * 100, 2)) + '%' for p in prec]))
        print('recall:    {}'.format([str(round(r * 100, 2)) + '%' for r in rec]))
        print('f1-score:  {}'.format([str(round(f * 100, 2)) + '%' for f in f1]))
        print('support:   {}'.format(support))
        print('average precision: ', all_prec)
        print('average recall: ', all_rec)
        print('average f1-score: ', all_f1)

    def predict(self, sentence, method='bigram'):
        assert method in ['bigram', 'trigram']
        y_predict = []
        if method == 'bigram':
            prob, state_seq = self.bigram_decode(sentence)
            y_predict.extend(state_seq)
        if method == 'trigram':
            prob, state_seq = self.trigram_decode(sentence)
            y_predict.extend(state_seq)
        return y_predict

In [13]:
if __name__ == '__main__':
    hmm = HMM_NER()
    test = training()
    x_test, y_test_word = hmm.load_test_data(test)
    hmm.load_bigram('/content/drive/My Drive/Colab Notebooks/NER/data/bigram_data/transition_matrix_train.txt')
    hmm.load_emission('/content/drive/My Drive/Colab Notebooks/NER/data/bigram_data/emission_probability_train.txt')
    hmm.evaluate(x_test, y_test_word, method='bigram')
    hmm.load_trigram('/content/drive/My Drive/Colab Notebooks/NER/data/trigram_data/transition_matrix_train.txt')
    hmm.evaluate(x_test, y_test_word, method='trigram')
    

accuracy:  0.5788295548153719
labels:    ['B-PER', 'I-PER', 'B-LOC', 'I-LOC', 'B-ORG', 'I-ORG']
precision: ['15.7%', '6.11%', '73.86%', '53.97%', '3.41%', '2.42%']
recall:    ['47.52%', '77.45%', '20.14%', '25.25%', '28.02%', '88.25%']
f1-score:  ['23.61%', '11.33%', '31.64%', '34.4%', '6.08%', '4.72%']
support:   [1111  541  884  404  182  349]
average precision:  0.06381266596926048
average recall:  0.45692883895131087
average f1-score:  0.11198587819947044
accuracy:  0.9575767343576519
labels:    ['B-PER', 'I-PER', 'B-LOC', 'I-LOC', 'B-ORG', 'I-ORG']
precision: ['71.73%', '80.77%', '42.58%', '55.5%', '24.37%', '70.66%']
recall:    ['73.99%', '89.28%', '83.71%', '77.48%', '74.18%', '64.18%']
f1-score:  ['72.84%', '84.81%', '56.45%', '64.67%', '36.68%', '67.27%']
support:   [1111  541  884  404  182  349]
average precision:  0.5525727069351231
average recall:  0.7827715355805244
average f1-score:  0.6478302336671435
