In [17]:
import os
import json
import pandas as pd
import pickle
import random

from constants import *

In [12]:
# Load the BioRel training split.
# JSON text is UTF-8 by specification, so decode it explicitly instead of relying on the
# platform's locale encoding (which is not UTF-8 everywhere). The "start"/"length" fields
# are later used as offsets into the decoded sentence string, so a wrong decode would
# also shift the entity markers.
with open(os.path.join(ROOT_PATH, 'data', 'BioRel', 'train.json'), encoding='utf-8') as f:
    train_json = json.load(f)

In [13]:
# Tally how many training samples carry each relation label.
# Keys appear in the order the labels are first encountered in train_json.
diff_relations = {}

for sample in train_json:
    relation = sample["relation"]
    diff_relations[relation] = diff_relations.get(relation, 0) + 1

In [14]:
# Map every relation label to an integer id.
# "NA" is pinned to id 0; all other labels receive consecutive ids (1, 2, ...)
# in the order they appear in diff_relations.
relation2id = {"NA": 0}

for rel in diff_relations:
    if rel == "NA":
        continue
    # The mapping already holds ids 0..len-1, so its current size is the next free id.
    relation2id[rel] = len(relation2id)

# One past the largest id that was handed out.
cnt = len(relation2id)

In [24]:
# Persist the label -> id mapping in the current working directory.
relation2id_path = "relation2id.pkl"

with open(relation2id_path, "wb") as pickle_file:
    pickle.dump(relation2id, pickle_file)

In [5]:
# Build the full training set: each sentence gets its head mention wrapped in
# <e1>...</e1> and its tail mention wrapped in <e2>...</e2>, and each relation
# label is converted to its integer id.
train_sentences_full = []
train_labels_full = []

for sample in train_json:
    sentence = sample["sentence"]
    head, tail = sample["head"], sample["tail"]

    # (start offset, length, tagged replacement text) for both mentions, ordered by
    # where they occur in the sentence. Splicing left-to-right only works when the
    # earlier mention is handled first; assuming "head always precedes tail" garbles
    # the sentence whenever the tail comes first. sorted() is stable, so the head
    # still goes first when both mentions start at the same offset.
    first, second = sorted(
        (
            (head["start"], head["length"], "<e1>" + head["word"] + "</e1>"),
            (tail["start"], tail["length"], "<e2>" + tail["word"] + "</e2>"),
        ),
        key=lambda span: span[0],
    )

    s_indexed = (
        sentence[:first[0]]
        + first[2]
        + sentence[first[0] + first[1]:second[0]]
        + second[2]
        + sentence[second[0] + second[1]:]
    )

    train_sentences_full.append(s_indexed)
    train_labels_full.append(relation2id[sample["relation"]])

In [6]:
# Load the BioRel test split. JSON text is UTF-8 by specification, so decode it
# explicitly rather than with the platform's locale encoding.
with open(os.path.join(ROOT_PATH, 'data', 'BioRel', 'test.json'), encoding='utf-8') as f:
    test_json = json.load(f)

# Build the full test set: head mention wrapped in <e1>...</e1>, tail mention wrapped
# in <e2>...</e2>, relation label converted to its integer id.
test_sentences_full = []
test_labels_full = []

for sample in test_json:
    sentence = sample["sentence"]
    head, tail = sample["head"], sample["tail"]

    # (start offset, length, tagged replacement text) for both mentions, ordered by
    # where they occur in the sentence. Splicing left-to-right only works when the
    # earlier mention is handled first; assuming "head always precedes tail" garbles
    # the sentence whenever the tail comes first. sorted() is stable, so the head
    # still goes first when both mentions start at the same offset.
    first, second = sorted(
        (
            (head["start"], head["length"], "<e1>" + head["word"] + "</e1>"),
            (tail["start"], tail["length"], "<e2>" + tail["word"] + "</e2>"),
        ),
        key=lambda span: span[0],
    )

    s_indexed = (
        sentence[:first[0]]
        + first[2]
        + sentence[first[0] + first[1]:second[0]]
        + second[2]
        + sentence[second[0] + second[1]:]
    )

    test_sentences_full.append(s_indexed)
    # Raises KeyError if the label has no entry in relation2id.
    test_labels_full.append(relation2id[sample["relation"]])

In [7]:
# Draw a random subset of each split without replacement.
NUM_TRAIN = 10000
NUM_TEST = 1000
SAMPLE_SEED = 42  # fixed so the sampled subsets are the same on every run

# Dedicated generator: reproducible draws without touching the global `random` state.
_subset_rng = random.Random(SAMPLE_SEED)

# Clamp the requested sizes: random.sample raises ValueError when asked for more
# items than the population holds.
kept_train = _subset_rng.sample(
    range(len(train_sentences_full)),
    min(NUM_TRAIN, len(train_sentences_full)),
)
kept_test = _subset_rng.sample(
    range(len(test_sentences_full)),
    min(NUM_TEST, len(test_sentences_full)),
)

# Sentences and labels are picked with the same indices so they stay aligned.
train_sentences = [train_sentences_full[idx] for idx in kept_train]
train_labels = [train_labels_full[idx] for idx in kept_train]
test_sentences = [test_sentences_full[idx] for idx in kept_test]
test_labels = [test_labels_full[idx] for idx in kept_test]

In [8]:
# Serialise the sampled sentences and label ids, then write each payload to its own
# JSON file in the current working directory.
json_train_sentences, json_train_labels = (
    json.dumps(part) for part in (train_sentences, train_labels)
)
json_test_sentences, json_test_labels = (
    json.dumps(part) for part in (test_sentences, test_labels)
)

serialized_outputs = (
    ("train_sentence.json", json_train_sentences),
    ("train_label_id.json", json_train_labels),
    ("test_sentence.json", json_test_sentences),
    ("test_label_id.json", json_test_labels),
)

for file_name, payload in serialized_outputs:
    with open(file_name, "w") as f:
        f.write(payload)

In [9]:
import torch
from torch import nn
from transformers import BertPreTrainedModel, BertModel, BertForSequenceClassification
from torch.nn import CrossEntropyLoss, MSELoss
import math


class BertForSequenceClassificationUserDefined(BertPreTrainedModel):
    """BERT encoder with a two-layer linear head over a pair of token positions.

    For every example, the encoder's hidden states at ``e1_pos`` and ``e2_pos`` are
    concatenated (``2 * hidden_size`` values), passed through dropout, then through
    ``classifier`` (-> ``hidden_size``) and ``classifier_2`` (-> ``num_labels``).

    NOTE(review): there is no non-linearity between ``classifier`` and
    ``classifier_2`` -- confirm this is intended.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(2 * config.hidden_size, config.hidden_size)
        self.classifier_2 = nn.Linear(config.hidden_size, self.config.num_labels)
        self.init_weights()
        # Concatenated position representation from the most recent forward pass.
        # The attribute's spelling is kept as-is because it is readable from outside.
        self.output_emebedding = None

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None,
                head_mask=None, inputs_embeds=None, labels=None, e1_pos=None, e2_pos=None, w=None):
        """Encode the batch, classify the two selected positions, optionally compute a loss.

        Args:
            input_ids, attention_mask, token_type_ids, position_ids, head_mask,
            inputs_embeds: forwarded unchanged to the BERT encoder.
            labels: optional targets. When given, a loss is computed and prepended
                to the returned tuple.
            e1_pos, e2_pos: per-example token indices whose hidden states are
                concatenated. Each entry must support ``.item()``. Required.
            w: optional per-example weights for the classification loss. Example
                ``i`` is scaled by ``exp(w[i] - 1)``. When omitted, every example
                gets weight 1, which yields the plain mean cross-entropy over the
                batch. Ignored when ``num_labels == 1``.

        Returns:
            Tuple ``(logits, *extra encoder outputs)`` when ``labels`` is None,
            otherwise ``(loss, logits, *extra encoder outputs, position_embedding)``.

        Raises:
            ValueError: if ``e1_pos`` or ``e2_pos`` is missing.
        """
        # Both default to None in the signature but the head cannot run without them;
        # fail early with a clear message instead of a TypeError from len(None).
        if e1_pos is None or e2_pos is None:
            raise ValueError("e1_pos and e2_pos must both be provided")

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
        )  # sequence_output, pooled_output, (hidden_states), (attentions)

        sequence_output = outputs[0]

        # For each example, pick the hidden states at its two positions and join them.
        e_pos_outputs = []
        for i in range(len(e1_pos)):
            e1_pos_output_i = sequence_output[i, e1_pos[i].item(), :]
            e2_pos_output_i = sequence_output[i, e2_pos[i].item(), :]
            e_pos_outputs.append(torch.cat((e1_pos_output_i, e2_pos_output_i), dim=0))
        e_pos_output = torch.stack(e_pos_outputs)
        # Kept before dropout is applied: this is the raw e1 & e2 concatenation.
        self.output_emebedding = e_pos_output

        e_pos_output = self.dropout(e_pos_output)
        hidden = self.classifier(e_pos_output)
        logits = self.classifier_2(hidden)

        outputs = (logits,) + outputs[2:]  # add hidden states and attention if they are here

        if labels is not None:
            if self.num_labels == 1:
                # Single output unit: treated as regression.
                loss_fct = MSELoss()
                loss = loss_fct(logits.view(-1), labels.view(-1))
            else:
                if w is None:
                    # No weights supplied: w == 1 gives exp(0) == 1 for every example.
                    w = [1.0] * logits.size(0)
                loss_fct = CrossEntropyLoss()
                loss = 0
                for i in range(len(w)):
                    # math.exp returns a plain Python float, so the weight acts as a
                    # constant scale factor on this example's loss.
                    example_weight = math.exp(w[i] - 1)
                    loss += example_weight * loss_fct(logits[i].view(-1, self.num_labels), labels[i].view(-1))
                loss = loss / len(w)
            outputs = (loss, ) + outputs + (self.output_emebedding,)

        return outputs  # (loss), logits, (hidden_states), (attentions), (self.output_emebedding)


# f_theta1
class RelationClassification(BertForSequenceClassificationUserDefined):
    """Named variant of the parent model for the f_theta1 role.

    Adds no state or behavior of its own; construction and forward pass are
    inherited unchanged.
    """


# g_theta2
class LabelGeneration(BertForSequenceClassificationUserDefined):
    """Named variant of the parent model for the g_theta2 role.

    Adds no state or behavior of its own; construction and forward pass are
    inherited unchanged.
    """