In [1]:
import glob
from transformers import BertTokenizer, BertModel
from pprint import pprint
from belt_nlp.splitting import transform_single_text
import torch
import numpy as np
from tqdm import tqdm, tqdm_gui
import pandas as pd
import matplotlib.pyplot as plt

np.random.seed(29)

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:
def _read_span_labels(labels_path):
    """Read a tab-separated span-label file and return one list of fields per non-blank line."""
    with open(labels_path, 'r') as labels_file:
        content = labels_file.read()
    # Blank lines are filtered explicitly. Unconditionally dropping the last
    # line would lose a real label whenever the file has no trailing newline.
    return [line.split('\t') for line in content.split('\n') if line.strip()]


def _embed_chunks(chunks, attn_mask, embedder, device, overlap):
    """Embed every chunk and stitch the per-token hidden states into one (1, n_tokens, dim) tensor."""
    pieces = []
    for i in range(len(chunks)):
        with torch.no_grad():
            output = embedder(chunks[i].unsqueeze(0).to(device), attn_mask[i].unsqueeze(0).to(device))
        mask = attn_mask[i].tolist()
        # Position 0 is skipped in every chunk; later chunks additionally skip
        # the `overlap` positions that the previous chunk already contributed.
        # NOTE(review): assumes each chunk is laid out as
        # [CLS] + tokens + [SEP] + padding -- TODO confirm against belt_nlp.
        first = 1 if i == 0 else 1 + overlap
        # Stop one position before the first padded position (or before the
        # last position when the chunk has no padding at all).
        last = (mask.index(0) if 0 in mask else len(mask)) - 1
        pieces.append(output.last_hidden_state[:, first:last, :].detach().cpu())
    return torch.cat(pieces, dim=1)


def _sample_none_spans(text, spans, span_edges, max_tries=200):
    """Sample, for each span, a random window with the same number of words that avoids every labelled region."""
    import re  # local import: only this helper needs it

    # Character extent of every whitespace-delimited word, so candidate windows
    # can be compared with the labelled spans in the same unit (characters).
    words = [(m.start(), m.end()) for m in re.finditer(r'\S+', text)]
    none_spans = []
    for span in spans:
        n_words = max(1, len(span.split()))
        last_start = len(words) - n_words
        if last_start < 0:
            # The text has fewer words than the span; nothing can be sampled.
            continue
        for _ in range(max_tries):
            first = int(np.random.randint(0, last_start + 1))
            char_start = words[first][0]
            char_end = words[first + n_words - 1][1]
            overlaps = any(char_start < e and s < char_end for s, e in span_edges)
            if not overlaps:
                none_spans.append(' '.join(text[a:b] for a, b in words[first:first + n_words]))
                break
        # If every try overlapped a labelled region, no negative is emitted for
        # this span rather than labelling an overlapping window as 'NoClass'.
    return none_spans


def _mark_spans(text, span_edges):
    """Return `text` lower-cased, with exactly the labelled character ranges upper-cased."""
    parts, cursor = [], 0
    for start, end in sorted(span_edges):
        start = max(start, cursor)  # merge with any previously emitted overlap
        if end <= start:
            continue
        parts.append(text[cursor:start].lower())
        parts.append(text[start:end].upper())
        cursor = end
    parts.append(text[cursor:].lower())
    return ''.join(parts)


def process_text(text_path, tokenizer, embedder):
    """Tokenize and embed one article and collect embeddings for its labelled and sampled spans.

    The label file path is derived from `text_path` by replacing '.txt' with
    '-labels-subtask-3.txt' and '-articles-subtask-3' with
    '-labels-subtask-3-spans'. Each non-blank line of that file is split on
    tabs; field 1 is used as the class name and fields 2 and 3 as the start
    and end character offsets into the article text.

    Args:
        text_path: Path of the article text file.
        tokenizer: Callable tokenizer; called with `add_special_tokens=False`
            and indexed with 'input_ids'.
        embedder: Model called as `embedder(input_ids, attention_mask)` whose
            output exposes `last_hidden_state`. It is moved to CUDA device 0
            when available, otherwise to the CPU.

    Returns:
        dict with keys:
            'tokens', 'num_tokens': tokenizer output for the whole text and
                its token count.
            'chunks', 'attn_mask': the two values returned by
                `transform_single_text`.
            'spans': labelled span texts followed by the sampled 'NoClass'
                span texts.
            'span_embeddings': one (1, span_tokens, dim) tensor per span whose
                token ids were found in the text. 'span_embeddingss' is the
                same list, kept for callers using the original key.
            'classes': class name for each entry of 'span_embeddings', in the
                same order.
            'text': the article lower-cased with labelled ranges upper-cased.
    """
    chunk_size, stride = 510, 300

    with open(text_path, 'r') as text_file:
        text = text_file.read()

    labels_path = text_path.replace('.txt', '-labels-subtask-3.txt').replace('-articles-subtask-3', '-labels-subtask-3-spans')
    labels = _read_span_labels(labels_path)
    span_edges = [(int(l[2]), int(l[3])) for l in labels]

    res = dict()

    tokens = tokenizer(text, add_special_tokens=False, truncation=False, return_tensors='pt')
    res['tokens'] = tokens
    res['num_tokens'] = len(tokens['input_ids'][0])
    input_ids, attention_mask = transform_single_text(
        text,
        tokenizer,
        chunk_size=chunk_size,
        stride=stride,
        minimal_chunk_length=20,
        maximal_text_length=None
    )
    res['chunks'] = input_ids
    res['attn_mask'] = attention_mask

    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    embedder.to(device)
    # Consecutive chunks share chunk_size - stride tokens; those are dropped
    # from every chunk after the first so each token is embedded exactly once.
    embeddings = _embed_chunks(input_ids, attention_mask, embedder, device, overlap=chunk_size - stride)

    # SPANS
    gold_spans = [text[start:end] for start, end in span_edges]

    # NONE CLASS
    none_spans = _sample_none_spans(text, gold_spans, span_edges)

    res['spans'] = gold_spans + none_spans
    span_classes = [l[1] for l in labels] + ['NoClass'] * len(none_spans)

    # SPAN EMBEDDINGS
    token_ids = tokens['input_ids'][0].tolist()
    n_positions = min(len(token_ids), embeddings.shape[1])
    span_embeddings, classes = [], []
    for span, span_class in zip(res['spans'], span_classes):
        span_ids = list(tokenizer(span, add_special_tokens=False, truncation=False)['input_ids'])
        width = len(span_ids)
        if width == 0:
            # An empty id list would trivially "match" at position 0.
            continue
        # NOTE: the first occurrence of the span's token ids is used, which is
        # not necessarily the labelled character position.
        for i in range(n_positions - width + 1):
            if token_ids[i:i + width] == span_ids:
                span_embeddings.append(embeddings[:, i:i + width, :])
                # Recorded together with the embedding so both lists stay aligned.
                classes.append(span_class)
                break

    res['span_embeddings'] = span_embeddings
    res['span_embeddingss'] = span_embeddings  # original key, kept for compatibility
    res['classes'] = classes

    res['text'] = _mark_spans(text, span_edges)

    return res

In [3]:
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

# Fit a dense one-hot encoder on the 'class' column of the training CSV.
class_column = pd.read_csv('train.csv')['class']
data = class_column.values.reshape(-1, 1)
onehot_encoder = OneHotEncoder(sparse_output=False)
onehot_encoder.fit(data)

# Load the tokenizer and the encoder (without its pooling layer) from the
# same SpanBERT checkpoint.
spanbert_checkpoint = 'SpanBERT/spanbert-base-cased'
tokenizer = BertTokenizer.from_pretrained(spanbert_checkpoint)
embedder = BertModel.from_pretrained(spanbert_checkpoint, add_pooling_layer=False)

class CustomDataset(Dataset):
    """Dataset yielding, per article file, span representations and one-hot class labels.

    Each item is produced on the fly by calling `process_text` on
    `files[idx]`; nothing is cached.

    Args:
        files: Indexable sequence of article paths accepted by `process_text`.
        onehot_encoder: Fitted encoder exposing `transform` and `categories_`,
            used to turn class names into one-hot rows.
            NOTE(review): the 'NoClass' label produced by `process_text` must
            be known to the encoder -- TODO confirm it is part of the fit data.
        tokenizer: Tokenizer forwarded to `process_text`.
        embedder: Embedding model forwarded to `process_text`.
        padding_size: Token length every span is padded or truncated to when
            `method` is not one of the pooling methods below.
        method: One of 'endpoint', 'diff-sum', 'coherent', 'maxpool',
            'avgpool'; any other value returns the padded token embeddings.
    """

    def __init__(self, files, onehot_encoder, tokenizer, embedder, padding_size=256, method='endpoint'):
        self.files = files
        self.ohe = onehot_encoder
        self.method = method
        self.tokenizer = tokenizer
        self.embedder = embedder
        self.padding_size = padding_size

    def __len__(self):
        """Return the number of article files."""
        # The original read `self.data`, which is never assigned.
        return len(self.files)

    def _encode_labels(self, classes):
        """One-hot encode a list of class names into a float tensor of shape (n_spans, n_classes)."""
        if len(classes) == 0:
            return torch.zeros((0, len(self.ohe.categories_[0])), dtype=torch.float32)
        encoded = self.ohe.transform(np.array(classes, dtype=object).reshape(-1, 1))
        if hasattr(encoded, 'toarray'):
            # The encoder was configured for sparse output; densify it.
            encoded = encoded.toarray()
        return torch.tensor(np.asarray(encoded), dtype=torch.float32)

    def __getitem__(self, idx):
        """Return `(embeddings, labels)` for the article at `idx`.

        `embeddings` is a list with one tensor per span; `labels` is the
        one-hot encoding of the span classes.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        r = process_text(self.files[idx], self.tokenizer, self.embedder)

        # Accept both spellings of the result key.
        raw_embeddings = r['span_embeddings'] if 'span_embeddings' in r else r['span_embeddingss']

        if self.method == 'endpoint':
            # Concatenate the first and last token vectors of the span.
            embeddings = [torch.cat((emb[:, 0, :], emb[:, -1, :]), dim=1) for emb in raw_embeddings]
        elif self.method == 'diff-sum':
            # Concatenate the sum and the difference of the endpoint vectors.
            embeddings = [torch.cat(
                (
                    emb[:, 0, :] + emb[:, -1, :],
                    emb[:, 0, :] - emb[:, -1, :]
                ),
                dim=1
            ) for emb in raw_embeddings]
        elif self.method == 'coherent':
            # First 360 dims of the first token, dims 360-720 of the last token,
            # plus the dot product of the remaining dims of both endpoints.
            embeddings = [torch.cat(
                (
                    emb[:, 0, :360],
                    emb[:, -1, 360:720],
                    torch.dot(
                        emb[:, 0, :].squeeze()[720:],
                        emb[:, -1, :].squeeze()[720:]
                    ).unsqueeze(0).unsqueeze(0)
                ),
                dim=1
            ) for emb in raw_embeddings]
        elif self.method == 'maxpool':
            embeddings = [torch.max(emb, dim=1)[0] for emb in raw_embeddings]
        elif self.method == 'avgpool':
            embeddings = [torch.mean(emb, dim=1) for emb in raw_embeddings]
        else:
            # Truncate spans longer than padding_size (a negative padding
            # length would raise), then zero-pad up to padding_size.
            clipped = [emb[:, :self.padding_size, :] for emb in raw_embeddings]
            embeddings = [
                torch.cat(
                    (emb, torch.zeros(1, self.padding_size - emb.shape[1], emb.shape[-1])),
                    dim=1
                )
                for emb in clipped
            ]

        # Class names are strings, so they are one-hot encoded before being
        # turned into a tensor.
        return embeddings, self._encode_labels(r['classes'])

  return self.fget.__get__(instance, owner)()


In [None]:
def collate_fn(batch):
    """Placeholder collate function: not implemented yet, ignores `batch` and returns None."""
    return None

In [None]:
train_csv_path = 'train.csv'
test_csv_path = 'dev.csv'

# CustomDataset indexes `files[idx]` and hands the entry to process_text as an
# article path, so it needs a list of article files rather than a CSV path.
# NOTE(review): these patterns are inferred from the '-articles-subtask-3'
# rewriting in process_text -- TODO confirm the real directory layout.
train_files = sorted(glob.glob('train-articles-subtask-3/*.txt'))
test_files = sorted(glob.glob('dev-articles-subtask-3/*.txt'))
if not train_files or not test_files:
    raise FileNotFoundError('No article files matched; adjust the glob patterns above.')

# The tokenizer and embedder are required constructor arguments.
train_dataset = CustomDataset(train_files, onehot_encoder, tokenizer, embedder)
test_dataset = CustomDataset(test_files, onehot_encoder, tokenizer, embedder)

train_batch_size = 32
test_batch_size = 32

# NOTE(review): every item holds a variable number of spans, so default
# batching will likely need a real collate_fn once the stub is implemented.
train_data_loader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False)