In [1]:
import glob
from tqdm import tqdm 
from collections import defaultdict

# Directory that holds the CHEMDNER corpus files.
root_path = '/home/santosh/Downloads/chemdner_corpus/'

# Abstract files found under root_path, in alphabetical order.
text_files = glob.glob(f'{root_path}*.abstracts*txt*')
text_files.sort()

# Annotation files found under root_path, in alphabetical order.
annotations_files = glob.glob(f'{root_path}*.annotations*txt*')
annotations_files.sort()

In [2]:
# Show which annotation files the glob picked up.
annotations_files

['/home/santosh/Downloads/chemdner_corpus/development.annotations.txt',
 '/home/santosh/Downloads/chemdner_corpus/evaluation.annotations.txt',
 '/home/santosh/Downloads/chemdner_corpus/training.annotations.txt']

In [3]:
# Show which abstract files the glob picked up.
text_files

['/home/santosh/Downloads/chemdner_corpus/development.abstracts.txt',
 '/home/santosh/Downloads/chemdner_corpus/evaluation.abstracts.txt',
 '/home/santosh/Downloads/chemdner_corpus/silver.abstracts.txt',
 '/home/santosh/Downloads/chemdner_corpus/training.abstracts.txt']

In [4]:
# Load the small scispaCy model (en_core_sci_sm).
# https://allenai.github.io/scispacy/

import scispacy
import spacy
# import en_core_web_sm
import en_core_sci_sm
nlp = en_core_sci_sm.load() # used further down to split each abstract into sentences (the author's choice for biomedical text)
# nlp = en_core_web_sm.load()

In [5]:
# Pick the abstracts/annotations pair by split name instead of by list
# position. The two lists are not index-aligned ('silver.abstracts.txt' has
# no annotation counterpart), so positional indices silently pair the wrong
# files as soon as the directory listing changes.
split_name = 'training'
each_text_path = next(
    path for path in text_files
    if path.endswith(split_name + '.abstracts.txt'))
each_annotation_path = next(
    path for path in annotations_files
    if path.endswith(split_name + '.annotations.txt'))
each_text_path, each_annotation_path

('/home/santosh/Downloads/chemdner_corpus/training.abstracts.txt',
 '/home/santosh/Downloads/chemdner_corpus/training.annotations.txt')

In [6]:
import csv

import pandas as pd

# Read both files as raw tab-separated text.
# * quoting=csv.QUOTE_NONE keeps every double quote verbatim; the annotation
#   offsets index into the abstract text, so no character may be dropped.
# * keep_default_na=False keeps strings such as "NA" or "null" as text
#   instead of turning them into NaN (which would break the string checks
#   performed on entity names and abstracts later on).
tsv_options = dict(sep='\t', quoting=csv.QUOTE_NONE, keep_default_na=False)

text_df = pd.read_csv(each_text_path, names=['id', 'title', 'text'], **tsv_options)
ann_df = pd.read_csv(
    each_annotation_path,
    names=['id', 'position', 'span1', 'span2', 'entity', 'type'],
    **tsv_options)

In [7]:
# Preview the first two abstracts (columns: id, title, text).
text_df.head(2)

Unnamed: 0,id,title,text
0,21826085,DPP6 as a candidate gene for neuroleptic-induc...,We implemented a two-step approach to detect p...
1,22080034,Nanosilver effects on growth parameters in exp...,Aflatoxicosis is a cause of economic losses in...


In [8]:
# Preview the first three annotations (columns: id, position, span1, span2, entity, type).
ann_df.head(3)

Unnamed: 0,id,position,span1,span2,entity,type
0,21826085,A,946,957,haloperidol,TRIVIAL
1,22080034,A,190,199,aflatoxin,FAMILY
2,22080034,A,594,603,aflatoxin,FAMILY


In [7]:
def extract_spans_from_df(identifier, ann_df):
    """Collect the kept annotations of one document as plain lists.

    Rows are kept when their ``id`` equals ``identifier``, their ``position``
    is ``'A'`` and their ``type`` is one of TRIVIAL, IDENTIFIER, MULTIPLE or
    FAMILY. Every kept type is rewritten to the single label ``"CD"``.

    Args:
        identifier: Document id to look up in the ``id`` column.
        ann_df: DataFrame with at least the columns ``id``, ``position``,
            ``span1``, ``span2``, ``entity`` and ``type``.

    Returns:
        A list of ``[span1, span2, entity, "CD"]`` lists, in the row order of
        ``ann_df``; empty when nothing matches.
    """
    kept_types = ['TRIVIAL', 'IDENTIFIER', 'MULTIPLE', 'FAMILY']
    # All kept classes collapse into the same output label.
    relabel = dict.fromkeys(kept_types, "CD")

    row_mask = (
        (ann_df['id'] == identifier)
        & (ann_df['position'] == 'A')
        & ann_df['type'].isin(kept_types)
    )
    mentions = ann_df.loc[row_mask, ['span1', 'span2', 'entity', 'type']]
    return mentions.replace({"type": relabel}).values.tolist()

In [8]:
# Maps sentence text -> set of (start, end, entity, type) tuples relative to
# the sentence, plus the marker string 'None' for sentences that at least one
# annotation of the same abstract does not fall into.
non_overlapping_dataset = defaultdict(set)

# read the text file
for iter_, row in tqdm(text_df.iterrows(), total = len(text_df)):

    text_list = [] # sentences of this abstract as [text, start_char, end_char]
    non_overlapping_annotation_list = [] # annotations kept after overlap removal

    # sentencise the text and get the spans
    text_id = row['id']
    text = row['text']

    doc = nlp(text)
    for sent_ in doc.sents:
        # One-for-one character replacement, so offsets into the sentence stay valid.
        sent = str(sent_).replace('\n',' ')
        text_list.append([sent, sent_.start_char, sent_.end_char])

    # Each entry is [start, end, entity, type] (see extract_spans_from_df).
    annotation_list = extract_spans_from_df(text_id, ann_df)

    # Remove the overlaps. Annotations are visited by ascending start offset
    # and one is kept only when it starts strictly after the END OFFSET of the
    # last kept annotation. Tracking the last kept annotation (rather than the
    # preceding row) also rejects annotations nested inside a longer one.
    last_kept_end = None
    for each_annotation in sorted(annotation_list, key=lambda ann: int(ann[0])):
        if last_kept_end is None or int(each_annotation[0]) > last_kept_end:
            non_overlapping_annotation_list.append(each_annotation)
            last_kept_end = int(each_annotation[1])

    # NOTE(review): an abstract without any kept annotation never enters the
    # loop below, so none of its sentences are added to the dataset - confirm
    # that this is intended.
    for each_annotation in non_overlapping_annotation_list:

        st_ann_sp = int(each_annotation[0]) #  start of annotation span
        en_ann_sp = int(each_annotation[1]) #  end of annotation span
        ann_type = each_annotation[3] #  annotation type
        ann = each_annotation[2] # annotation

        for each_text in text_list:

            snt_text = str(each_text[0]) # sentence text
            st_snt_sp = int(each_text[1]) # Start of sentence span
            en_snt_sp = int(each_text[2]) # end of sentence span

            if st_snt_sp <= st_ann_sp <= en_snt_sp and st_snt_sp <= en_ann_sp <= en_snt_sp:
                if ann in snt_text: # process only if the annotation is in the sentence
                    # Offsets are re-based so they index into the sentence text.
                    anno_list = [st_ann_sp-st_snt_sp, en_ann_sp-st_snt_sp,ann, ann_type] # get the annotation details
                    if snt_text[st_ann_sp-st_snt_sp:en_ann_sp-st_snt_sp] == ann: # final check if annotation is indeed in the text and has the right span
                        non_overlapping_dataset[snt_text].add(tuple(anno_list))
            else:
                # The annotation lies outside this sentence; the marker is
                # filtered out later if the sentence also holds real annotations.
                non_overlapping_dataset[snt_text].add('None')

100%|██████████| 3500/3500 [02:34<00:00, 22.66it/s]


In [9]:
# Maps sentence text -> ['None'] when the sentence has no annotation, or a
# list of [start, end, entity, type] lists otherwise.
final_dataset = defaultdict(list)

for key, value in non_overlapping_dataset.items():
    # Drop the 'None' marker and sort the remaining tuples: set iteration
    # order depends on string hashing, which differs between interpreter runs,
    # so sorting makes the stored annotation order reproducible.
    spans = sorted(each_value for each_value in value if each_value != 'None')
    if spans:
        final_dataset[key].extend(list(each_value) for each_value in spans)
    elif value:
        # Only the marker was present: the sentence has no annotation.
        final_dataset[key].append('None')
        
  
    

In [10]:
# Inspect the sentence -> annotations mapping (this displays the whole dict, so the output is long).
final_dataset

defaultdict(list,
            {'We implemented a two-step approach to detect potential predictor gene variants for neuroleptic-induced tardive dyskinesia (TD) in schizophrenic subjects.': ['None'],
             'First, we screened associations by using a genome-wide (Illumina HumanHapCNV370) SNP array in 61 Japanese schizophrenia patients with treatment-resistant TD and 61 Japanese schizophrenia patients without TD.': ['None'],
             'Next, we performed a replication analysis in 36 treatment-resistant TD and 138 non-TD subjects.': ['None'],
             'An association of an SNP in the DPP6 (dipeptidyl peptidase-like protein-6) gene, rs6977820, the most promising association identified by the screen, was significant in the replication sample (allelic P=0.008 in the replication sample, allelic P=4.6 × 10(-6), odds ratio 2.32 in the combined sample).': ['None'],
             'The SNP is located in intron-1 of the DPP6 gene and the risk allele was associated with decreased DPP6 gen

In [11]:
# Generate train, test and dev pmc ids
import math
import random
import os
import pathlib
import csv
from nltk.tokenize import WordPunctTokenizer, wordpunct_tokenize


def get_train_dev_test_indxs(total_num_annotations, train_fraction=0.80, seed=45):
    """Shuffle the indices ``0 .. total_num_annotations - 1`` into three splits.

    Args:
        total_num_annotations: Number of items to split.
        train_fraction: Share of the items assigned to the training split
            (between 0 and 1). The remainder is halved between development
            and test; when it is odd the test split receives the extra item.
        seed: Seed for the shuffle, fixed by default for reproducibility.

    Returns:
        A ``(train_ids, devel_ids, test_ids)`` tuple of disjoint index lists
        that together cover every index exactly once.

    Raises:
        ValueError: If ``train_fraction`` is outside ``[0, 1]``.

    Note:
        The module-level ``random`` generator is re-seeded as a side effect.
    """
    if not 0.0 <= train_fraction <= 1.0:
        raise ValueError(
            'train_fraction must be between 0 and 1, got %r' % (train_fraction,))

    n_total = total_num_annotations
    n_train = int(n_total * train_fraction)
    n_devel = math.floor((n_total - n_train) / 2)

    deck = list(range(n_total))
    random.seed(seed)  # fixed seed -> identical split on every run
    random.shuffle(deck)

    train_ids = deck[:n_train]
    devel_ids = deck[n_train:n_train + n_devel]
    test_ids = deck[n_train + n_devel:]  # everything that is left

    return train_ids, devel_ids, test_ids

In [12]:
def find_sub_span(sub_span_range, full_spans_range):
    """Return ``sub_span_range`` if it starts inside ``full_spans_range``.

    Only the start of the sub span is tested, against the half-open interval
    ``[full_spans_range[0], full_spans_range[1])``. ``None`` is returned when
    the start lies outside that interval.
    """
    full_start, full_end = full_spans_range[0], full_spans_range[1]
    starts_inside = sub_span_range[0] in range(full_start, full_end)
    return sub_span_range if starts_inside else None


def convert2IOB(text_data, ner_tags):
    """Tokenise a sentence and give every token an IOB tag.

    Args:
        text_data: Sentence text.
        ner_tags: Either a list containing the string ``'None'`` (the sentence
            has no entity) or a list of ``[start, end, entity, type]`` entries
            whose offsets index into ``text_data`` (end exclusive).

    Returns:
        A ``zip`` iterator of ``(token, tag)`` pairs; it can be consumed only
        once. Tags are ``'O'``, ``'B-<type>'`` for the first token that starts
        inside an entity span and ``'I-<type>'`` for the following ones.
    """
    tokenizer = WordPunctTokenizer()

    split_text = tokenizer.tokenize(text_data)
    span_text = list(tokenizer.span_tokenize(text_data))
    # for each word token append 'O'
    arr = ['O'] * len(split_text)

    if 'None' in ner_tags:
        return zip(split_text, arr)

    # Shorter entities are written first so that, should two spans cover the
    # same token, the tag of the longer entity is the one that remains.
    for each_tag in sorted(ner_tags, key=lambda tag: len(tag[2])):
        entity_span = (each_tag[0], each_tag[1])
        # The label is used verbatim; it is not tokenised, so labels that
        # contain spaces or punctuation still yield well-formed tags.
        ner_type = each_tag[3]

        seen_first_token = False
        for i, token_span in enumerate(span_text):
            # A token belongs to the entity when it starts inside its span.
            if find_sub_span(token_span, entity_span):
                arr[i] = ('I-' if seen_first_token else 'B-') + ner_type
                seen_first_token = True

    return zip(split_text, arr)

In [18]:
def convert_to_IOB_format(dictionary_dataset, train_ids, devel_ids, test_ids, path, folder_name):
    """Write the dataset as IOB-tagged ``train.tsv``, ``dev.tsv`` and ``test.tsv``.

    Each sentence is tokenised and tagged with ``convert2IOB`` and written as
    one ``token<TAB>tag`` row per token, followed by an empty line. The
    sentence's position in ``dictionary_dataset`` decides which file receives
    it; a position found in none of the id collections is skipped.

    Args:
        dictionary_dataset: Mapping of sentence text to its annotation list.
        train_ids: Positions that go to ``train.tsv``.
        devel_ids: Positions that go to ``dev.tsv``.
        test_ids: Positions that go to ``test.tsv``.
        path: Base output directory.
        folder_name: Sub-folder appended to ``path`` by plain string
            concatenation; it has to end with a path separator because the
            file names are appended the same way.
    """
    # Use the directory passed by the caller rather than a notebook global.
    result_path = path + folder_name
    pathlib.Path(result_path).mkdir(parents=True, exist_ok=True)

    # Explicit UTF-8 so non-ASCII tokens are written the same way on every platform.
    with open(result_path + 'train.tsv', 'w', newline='\n', encoding='utf-8') as f1, \
            open(result_path + 'dev.tsv', 'w', newline='\n', encoding='utf-8') as f2, \
            open(result_path + 'test.tsv', 'w', newline='\n', encoding='utf-8') as f3:

        # (positions, writer) pairs, checked in train -> dev -> test order.
        # Sets make each membership test constant-time instead of a list scan.
        split_writers = [
            (set(train_ids), csv.writer(f1, delimiter='\t', lineterminator='\n')),
            (set(devel_ids), csv.writer(f2, delimiter='\t', lineterminator='\n')),
            (set(test_ids), csv.writer(f3, delimiter='\t', lineterminator='\n')),
        ]

        items = tqdm(dictionary_dataset.items(), total=len(dictionary_dataset))
        for position, (key, values) in enumerate(items):
            writer = next(
                (candidate for ids, candidate in split_writers if position in ids),
                None)
            if writer is None:
                continue

            for each_token in convert2IOB(str(key), values):
                writer.writerow(list(each_token))
            # An empty row separates consecutive sentences.
            writer.writerow('')

In [19]:
# Output root for the generated datasets, under the current working directory.
cwd_ = '{}/Datasets/'.format(os.getcwd())
print(cwd_)

/home/santosh/Downloads/archive/Datasets/


In [20]:
# Split the sentences into train/dev/test index lists and write the IOB-tagged files.
print('final non_overlapping_dataset')
trainidx, develidx, testidx = get_train_dev_test_indxs(len(final_dataset))
# NOTE(review): cwd_ already ends with '/', so '/CD/' produces a doubled separator in the output path.
convert_to_IOB_format(final_dataset,trainidx, develidx, testidx, cwd_, '/CD/' )


  2%|▏         | 416/18240 [00:00<00:04, 4154.45it/s]

final non_overlapping_dataset


100%|██████████| 18240/18240 [00:05<00:00, 3601.27it/s]


In [16]:
# One row per sentence: its text and its annotation list (['None'] when it has none).
dataset = pd.DataFrame(final_dataset.items(), columns=['text', 'ner'])

In [17]:
# Save the sentence-level dataset; the relative path resolves against the current working directory.
dataset.to_csv('CD-dataset.csv')