In [24]:
from datasets import load_dataset
import json
import os
import random
import nltk
from tqdm import tqdm
nltk.download('words')
nltk.download('stopwords')
from nltk.corpus import words, stopwords
from nltk.tokenize import word_tokenize
import numpy as np
import string

[nltk_data] Downloading package words to /Users/mamooler/nltk_data...
[nltk_data]   Package words is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/mamooler/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [25]:
# Root folder under which one sub-folder per dataset is created.
output_data_dir = "/Users/mamooler/Desktop/incontext_ie/GPT-NER/data"

# Dataset to convert; "conll2003" is the alternative noted by the author.
dataset_name = "bc5disease"

# Entity type to extract. Alternatives noted by the author for CoNLL:
# "PER", "ORG", "LOC", "MISC".
# NOTE(review): `entity_type2def` below has no entry for those four types, so
# looking up their definition would raise KeyError -- confirm before switching.
entity_type = "DISEASE"

# Natural-language definition of each supported entity type.
entity_type2def = {
    "CHEMICAL": (
        "are substances with a distinct molecular composition that are produced by or used in a chemical process. "
        "They can be elements or compounds, and they may exist in various forms—solid, liquid, or gas"
    ),
    "DISEASE": (
        "are abnormal conditions or disorders of a structure or function in a living organism, "
        "often associated with specific signs and symptoms."
    ),
    "GENE": (
        "are specific sequence of nucleotides within a DNA molecule that encode information for the synthesis of a functional product, "
        "such as a protein or functional RNA. "
        "A gene is typically mentioned with reference to its associated traits, functions, or the role it plays in a biological process"
    ),
}

# Task phrase -> CoNLL entity tag (not referenced by the other visible cells).
conll_task2entity_type = {
    "person entities": 'PER',
    "organization entities": 'ORG',
    "location entities": 'LOC',
    "miscellaneous entities": 'MISC',
}

# Create the per-dataset output folder if it does not exist yet.
dataset_output_dir = f"{output_data_dir}/{dataset_name}"
os.makedirs(dataset_output_dir, exist_ok=True)

In [26]:
# Choose the positional arguments for `load_dataset` from the dataset name;
# the three splits are then fetched with the same arguments.
if "chemprot" in dataset_name:
    hub_args = ("bigbio/chemprot", "chemprot_full_source")
elif "conll" in dataset_name:
    hub_args = (dataset_name,)
else:
    hub_args = ("bigbio/blurb", dataset_name)

dataset_train = load_dataset(*hub_args, split="train")
dataset_val = load_dataset(*hub_args, split="validation")
dataset_test = load_dataset(*hub_args, split="test")

In [27]:
def bio_to_token_spans(tags, entity_type, dataset_name):
    """
    Convert a BIO tag sequence into a list of inclusive (start, end) token spans.

    For the blurb benchmark datasets the tags are {0:O, 1:B, 2:I}. When
    "conll" appears in `dataset_name`, the B/I ids of `entity_type` are taken
    from the CoNLL tag table below and every other tag id is treated as
    "outside" for that entity type.

    Args:
        tags: iterable of integer tag ids, one per token.
        entity_type: entity label; only used for CoNLL datasets ('PER', 'ORG',
            'LOC' or 'MISC').
        dataset_name: dataset identifier used to select the tag scheme.

    Returns:
        List of (start_token, end_token) tuples, both indices inclusive, in
        order of appearance.

    Raises:
        KeyError: for a CoNLL dataset when `entity_type` is not in the tag table.
    """
    conll_tags = {'O': 0, 'B-PER': 1, 'I-PER': 2, 'B-ORG': 3, 'I-ORG': 4, 'B-LOC': 5, 'I-LOC': 6, 'B-MISC': 7, 'I-MISC': 8}
    if "conll" in dataset_name:
        begin_tag = conll_tags['B-' + entity_type]
        inside_tag = conll_tags['I-' + entity_type]
    else:
        begin_tag, inside_tag = 1, 2

    spans = []
    start = -1  # index of the currently open entity's first token, -1 if none
    for i, tag in enumerate(tags):
        if tag == begin_tag:
            # A new entity starting right after another one closes the previous
            # entity instead of silently overwriting its start.
            if start != -1:
                spans.append((start, i - 1))
            start = i
        elif tag == inside_tag:
            # Continues the open entity. An I tag with no open entity is
            # ignored, as it was before.
            continue
        elif start != -1:
            # Any other tag ends the open entity: O, or for CoNLL a B-/I- tag
            # that belongs to a different entity type.
            spans.append((start, i - 1))
            start = -1

    # An entity that runs up to the last token has no following tag to close it.
    if start != -1:
        spans.append((start, i))
    return spans

In [28]:
def token_span_to_char_span(tokens, token_span):
    """
    Map an inclusive token span to an inclusive character span.

    Character positions refer to the text obtained by joining `tokens` with a
    single space.

    Args:
        tokens: list of token strings.
        token_span: pair whose first two items are the inclusive start and end
            token indices.

    Returns:
        Tuple (start_char, end_char), both inclusive.
    """
    first_tok = token_span[0]
    last_tok = token_span[1]

    prefix_text = " ".join(tokens[:first_tok])
    mention_text = " ".join(tokens[first_tok:last_tok + 1])

    # One separating space precedes the mention unless it begins at token 0.
    leading_space = min(first_tok, 1)
    start_char = len(prefix_text) + leading_space
    end_char = start_char + len(mention_text) - 1
    return (start_char, end_char)

### test the token_span_to_char_span function
# token_span_test = [0,0]
# text_test = "start with bc5 dataset"
# tokens_test = text_test.split()
# char_span_test = token_span_to_char_span(tokens_test, token_span_test)
# start = char_span_test[0]
# end = char_span_test[1]
# print(text_test[start:end+1])

In [29]:
def offsets_to_char_span(offsets):
    """
    Turn (start, end) offset pairs into character spans with the end lowered by one.

    Args:
        offsets: iterable of two-item (start, end) pairs.

    Returns:
        List of (start, end - 1) tuples in the input order.
    """
    char_spans = []
    for begin, stop in offsets:
        char_spans.append((begin, stop - 1))
    return char_spans

In [30]:
def hf_to_gpt_ner_format(hf_dataset, dataset_name, entity_type, entity_def):
    """
    Convert a huggingface dataset to the GPT-NER format.

    Args:
        hf_dataset: dataset indexable by column name. For chemprot the columns
            'text' and 'entities' are read (each entities item providing
            parallel 'type' and 'offsets' lists); otherwise 'tokens' and
            'ner_tags' are read.
        dataset_name: dataset identifier; "chemprot" in the name selects the
            character-offset branch, anything else the token/BIO branch.
        entity_type: the single entity label to extract.
        entity_def: definition text stored in each sample's 'query' field.

    Returns:
        List with one dict per input row. Chemprot rows carry character-level
        positions only; all other rows carry both word-level and
        character-level positions. 'impossible' is True when the row holds no
        entity of `entity_type`.
    """
    gpt_ner_dataset = []

    if "chemprot" in dataset_name:
        # Look each column up once. Indexing `hf_dataset[column]` inside the
        # loop repeats the column lookup for every row, which with a Hugging
        # Face `Dataset` can mean re-reading the whole column each time.
        texts = hf_dataset['text']
        entities = hf_dataset['entities']
        for i, text in enumerate(tqdm(texts)):
            annotations = entities[i]
            # Keep only the offsets whose parallel type entry matches.
            offsets = [
                offset
                for ann_type, offset in zip(annotations['type'], annotations['offsets'])
                if ann_type == entity_type
            ]
            char_spans = offsets_to_char_span(offsets)
            gpt_ner_dataset.append(
                {
                    'context': text,
                    'end_position_char': [s[1] for s in char_spans],
                    'entity_label': entity_type,
                    'impossible': len(char_spans) == 0,
                    'qas_id': f"{i}.0",
                    'query': entity_def,
                    'span_position_char': [f"{s[0]};{s[1]}" for s in char_spans],
                    'start_position_char': [s[0] for s in char_spans],
                }
            )
    else:
        # Same hoisting as above: one lookup per column, not one per row.
        all_tokens = hf_dataset['tokens']
        all_tags = hf_dataset['ner_tags']
        for i, tokens in enumerate(tqdm(all_tokens)):
            # The context is the tokens joined by single spaces; the character
            # spans computed below index into exactly this string.
            text = " ".join(tokens)
            tags = all_tags[i]
            word_spans = bio_to_token_spans(tags, entity_type, dataset_name)
            char_spans = [token_span_to_char_span(tokens, word_span) for word_span in word_spans]
            gpt_ner_dataset.append(
                {
                    'context': text,
                    'end_position_word': [s[1] for s in word_spans],
                    'end_position_char': [s[1] for s in char_spans],
                    'entity_label': entity_type,
                    'impossible': len(word_spans) == 0,
                    'qas_id': f"{i}.0",
                    'query': entity_def,
                    'span_position_word': [f"{s[0]};{s[1]}" for s in word_spans],
                    'span_position_char': [f"{s[0]};{s[1]}" for s in char_spans],
                    'start_position_word': [s[0] for s in word_spans],
                    'start_position_char': [s[0] for s in char_spans],
                }
            )

    return gpt_ner_dataset

In [32]:
# Convert the train, validation and test splits (in that order) with the
# definition of the selected entity type as the query.
gpt_ner_train, gpt_ner_val, gpt_ner_test = [
    hf_to_gpt_ner_format(hf_split, dataset_name, entity_type, entity_type2def[entity_type])
    for hf_split in (dataset_train, dataset_val, dataset_test)
]

100%|██████████| 12574/12574 [18:34<00:00, 11.28it/s]


In [33]:
# sanity check for char and word indices
def check_char_word_span(dataset):
    """
    Assert that the character spans and word spans of every sample agree.

    For each sample, the text selected by each 'span_position_char' entry must
    equal the tokens selected by the matching 'span_position_word' entry,
    joined by single spaces.

    Args:
        dataset: iterable of sample dicts holding 'context', the
            'start_position_*' / 'end_position_*' lists and the
            'span_position_char' / 'span_position_word' lists of
            "start;end" strings (inclusive indices).

    Raises:
        AssertionError: when list lengths differ or a span pair selects
            different text.
    """
    for sample in dataset:
        text = sample['context']
        # Split on the single-space separator only. A bare `split()` also
        # splits on other whitespace inside a token and drops empty tokens,
        # which shifts the word indices and makes the check fail spuriously.
        tokens = text.split(" ")
        assert len(sample['start_position_char']) == len(sample['start_position_word']), "start_position_char and start_position_word should have the same length"
        assert len(sample['end_position_char']) == len(sample['end_position_word']), "end_position_char and end_position_word should have the same length"
        for span_index, char_span in enumerate(sample['span_position_char']):
            start_char, end_char = (int(part) for part in char_span.split(";")[:2])
            span_word = sample['span_position_word'][span_index]
            start_word, end_word = (int(part) for part in span_word.split(";")[:2])
            char_span_text = text[start_char:end_char+1]
            word_span_text = " ".join(tokens[start_word:end_word+1])
            assert char_span_text == word_span_text, f"the span {text[start_char:end_char+1]} should be equal to {' '.join(tokens[start_word:end_word+1])}"

# chemprot samples carry no word-level positions, so the check is skipped for
# them. The train split is not checked (its call was left disabled).
if not ("chemprot" in dataset_name):
    for converted_split in (gpt_ner_val, gpt_ner_test):
        check_char_word_span(converted_split)

In [35]:
# Write each converted split to `<dataset_output_dir>/ner.<split>`.
# The `with` block closes (and therefore flushes) every file. UTF-8 is set
# explicitly because `ensure_ascii=False` writes non-ASCII characters as-is
# and the default encoding of `open` depends on the platform locale.
for split_name, split_samples in (
    ("train", gpt_ner_train),
    ("val", gpt_ner_val),
    ("test", gpt_ner_test),
):
    with open(f"{dataset_output_dir}/ner.{split_name}", "w", encoding="utf-8") as out_file:
        json.dump(split_samples, out_file, indent=4, ensure_ascii=False)

In [38]:
# Build the 100-sample test subset: keep the converted test samples whose
# context equals one of the texts listed in the reference file.
with open(
    f"/Users/mamooler/Desktop/incontext_ie/ICL_IE/data/{dataset_name}/test_100samples.json",
    encoding="utf-8",
) as subset_file:
    texts = [e["text"] for e in json.load(subset_file)]

# Index the test samples by context once, instead of scanning the whole test
# set for every reference text. Samples sharing a context stay in test-set order.
samples_by_context = {}
for sample in gpt_ner_test:
    samples_by_context.setdefault(sample["context"], []).append(sample)

# preserve the order of the samples (i.e. the order of the reference texts)
gpt_ner_test_100 = []
for text in texts:
    gpt_ner_test_100.extend(samples_by_context.get(text, []))

with open(f"{dataset_output_dir}/ner.test.100", "w", encoding="utf-8") as out_file:
    json.dump(gpt_ner_test_100, out_file, indent=4, ensure_ascii=False)