In [4]:
import json
import pandas as pd
import os
import re
from bisect import bisect_left
import random
from functools import reduce

In [44]:
# Paths
# Directory holding the BRAT `.txt`/`.ann` pairs. It defaults to the original
# absolute location, but can be redirected with the POLYMER_PILOT_DOCS
# environment variable so the notebook also runs where that path does not exist.
pilot_docs = os.environ.get(
    'POLYMER_PILOT_DOCS',
    '/data/rsg/nlp/juanmoo1/projects/04_polymer/00_annotation/brat-v1.3_Crunchy_Frog/data/polymer/pilot_dylan/',
)
# Destination for the per-document `.bio` files.
bio_files_dir = '../data/all_reagent_grouping/bio_files/'
# Destination for the `split_<k>` cross-validation folders.
kfold_split_dir = '../data/all_reagent_grouping/ner_evaluation/k_splits/'

In [46]:
# Make sure every output directory exists before anything is written into it
for out_dir in (bio_files_dir, kfold_split_dir):
    os.makedirs(out_dir, exist_ok=True)

## Parse BRAT Files

In [47]:
def get_annotated_document_names(ann_path):
    """Return the sorted names of documents that have both a text and an annotation file.

    A document ``name`` is included only when ``ann_path`` contains both
    ``name.txt`` and ``name.ann`` (lowercase extensions, since those are the
    exact file names that get opened later).

    Args:
        ann_path: Directory to scan.

    Returns:
        Sorted list of document names without extension.
    """
    stems_by_ext = {'.txt': set(), '.ann': set()}
    for fname in os.listdir(ann_path):
        # splitext only strips the final extension, so stems that themselves
        # contain dots (e.g. "doc.v2.txt") are kept intact.
        stem, ext = os.path.splitext(fname)
        if ext in stems_by_ext:
            stems_by_ext[ext].add(stem)

    return sorted(stems_by_ext['.txt'] & stems_by_ext['.ann'])

In [48]:
def parse_document_annotations(doc_name, ann_dir_path=pilot_docs):
    """Parse one BRAT-annotated document into token-level spans and relation links.

    Reads ``<doc_name>.txt`` and ``<doc_name>.ann`` from ``ann_dir_path``. The
    text is tokenised on whitespace and each text-bound annotation's character
    offsets are converted to token indices.

    Args:
        doc_name: Document name without extension.
        ann_dir_path: Directory containing the ``.txt``/``.ann`` pair.

    Returns:
        A dict with keys:
            "doc_name": the given name,
            "text": the raw document text,
            "spans": list of ``(annotation_id, label, tok_start, tok_stop)`` for
                lines whose id starts with ``T``; ``tok_stop`` is exclusive,
            "links": list of ``(annotation_id, label, arg_1, arg_2)`` for lines
                whose id starts with ``R``, where each argument is the part
                after the ``:`` in the annotation file.
        Lines of any other kind are ignored.

    Note:
        Offsets are read as a single ``start end`` pair; an offset field
        containing ``;`` will raise ``ValueError``.
    """
    text_doc_path = os.path.join(ann_dir_path, f'{doc_name}.txt')
    with open(text_doc_path, 'r') as text_file:
        text = text_file.read()

    ann_doc_path = os.path.join(ann_dir_path, f'{doc_name}.ann')
    with open(ann_doc_path, 'r') as ann_file:
        ann_lines = ann_file.readlines()

    # Record the real character extent of every whitespace-delimited token so
    # that newlines or repeated spaces in the text do not shift the mapping.
    token_matches = list(re.finditer(r'\S+', text))
    tok_starts = [m.start() for m in token_matches]
    tok_ends = [m.end() for m in token_matches]

    entities = []
    links = []

    for raw_line in ann_lines:
        # str.split() already collapses runs of whitespace (tabs included).
        fields = raw_line.split()
        if not fields:
            # Blank line (e.g. trailing newline at end of file).
            continue

        ann_type = fields[0]
        ann_name = fields[1]

        if ann_type.startswith('T'):
            chr_start, chr_stop = map(int, fields[2:4])

            # First token that ends after chr_start: this is the token that
            # contains chr_start, or the next token if chr_start is whitespace.
            # (bisect_left on chr_start + 1 == bisect_right on chr_start.)
            tok_start = bisect_left(tok_ends, chr_start + 1)
            # Number of tokens that begin before chr_stop -> exclusive end index.
            tok_stop = bisect_left(tok_starts, chr_stop)

            entities.append((ann_type, ann_name, tok_start, tok_stop))

        elif ann_type.startswith('R'):
            arg_1, arg_2 = (arg.split(':')[1] for arg in fields[2:4])
            links.append((ann_type, ann_name, arg_1, arg_2))

    return {
        "doc_name": doc_name,
        "text": text,
        "spans": entities,
        "links": links
    }

In [49]:
# Discover every fully annotated document, then parse each one in order
document_names = get_annotated_document_names(pilot_docs)
parsed_annotations = list(map(parse_document_annotations, document_names))

## Create BIO Files

In [50]:
# Configs
# Labels that also come in a "<label>_ref" variant.
_ref_capable_ents = [
    'Monomer', 'Macromonomer', 'Initiator', 'Macroinitiator', 'Catalyst',
    'Solvent', 'Reagent', 'Mixture', 'Product', 'Polymer', 'WorkupReagent',
    'Lighting', 'RemovedChemical', 'ActionInput',
]
# Full label list: base labels, their "_ref" variants, then the remaining labels.
all_entities = (
    _ref_capable_ents
    + [f'{ent}_ref' for ent in _ref_capable_ents]
    + [
        'Vacuum_Condition', 'Atmosphere_Condition', 'Repetition_Condition',
        'Action', 'Amount', 'Time', 'Temperature', 'Data', 'Yield',
    ]
)

# reagent_map: collapse the core reagent labels (and their "_ref" forms) into
# 'Reagent'; ActionInput_ref / WorkupReagent_ref fold into their base label.
reagent_ents = ['Monomer', 'Macromonomer', 'Initiator', 'Macroinitiator', 'Catalyst', 'Solvent', 'Reagent', 'Polymer']
reagent_map = {
    name: 'Reagent'
    for ent in reagent_ents
    for name in (ent, ent + '_ref')
}
reagent_map.update({
    'ActionInput_ref': 'ActionInput',
    'WorkupReagent_ref': 'WorkupReagent',
})

# all_reagents_map: same idea, but WorkupReagent, RemovedChemical and
# ActionInput (with their "_ref" forms) are collapsed into 'Reagent' as well.
all_reagents = reagent_ents + ['WorkupReagent', 'RemovedChemical', 'ActionInput']
all_reagents_map = {
    name: 'Reagent'
    for ent in all_reagents
    for name in (ent, ent + '_ref')
}

In [51]:
def create_doc_bio(par_dict, label_map=None):
    """Render one parsed document as tab-separated ``token<TAB>label`` BIO lines.

    Args:
        par_dict: Dict with keys "doc_name", "text" and "spans", where each span
            is ``(annotation_id, label, tok_start, tok_stop)`` with an exclusive
            ``tok_stop`` and indices referring to ``text.split()`` tokens.
        label_map: Optional mapping from span label to output label. Labels not
            in the mapping are used as-is; spans mapped to ``'O'`` are left
            unlabelled.

    Returns:
        ``(doc_name, bio_text)`` where ``bio_text`` has one ``token\\tlabel``
        line per token, joined by newlines.

    Side effects:
        Adds every mapped label (including ``'O'``) to the module-level
        ``all_labels`` set, which must exist before this function is called.
    """
    if label_map is None:
        label_map = {}

    # Tokenise on any whitespace: the span indices are token positions from
    # text.split(), so splitting on ' ' only would misalign them (and leave
    # newlines inside tokens).
    tokens = par_dict['text'].split()
    labels = ['O'] * len(tokens)

    for _, key, s, t in par_dict['spans']:
        ekey = label_map.get(key, key)
        all_labels.add(ekey)

        if ekey == 'O':
            # Skip only this span; later spans must still be labelled.
            continue

        labels[s] = f'B-{ekey}'
        for j in range(s + 1, t):
            labels[j] = f'I-{ekey}'

    bio_par = '\n'.join(f'{tok}\t{lab}' for tok, lab in zip(tokens, labels))

    return (par_dict["doc_name"], bio_par)

In [52]:
# Build the BIO text of every parsed document, then write one .bio file each
all_labels = set()  # populated by create_doc_bio while it runs
bio_pars = list(map(lambda doc: create_doc_bio(doc, all_reagents_map), parsed_annotations))

for doc_name, bio_txt in bio_pars:
    bio_file_path = os.path.join(bio_files_dir, f'{doc_name}.bio')
    with open(bio_file_path, 'w') as bio_file:
        bio_file.write(bio_txt)

## Split for Testing

In [53]:
# Seeded shuffle followed by a single train/test cut at `train_percent`
train_percent = 0.8
random.seed(0)
paragraphs = [bio_text for _doc, bio_text in bio_pars]
random.shuffle(paragraphs)
j = int(len(paragraphs) * train_percent)

train_set, test_set = paragraphs[:j], paragraphs[j:]

# Documents are separated by a blank line inside each split text
train_txt, test_txt = ('\n\n'.join(part) for part in (train_set, test_set))

# NOTE: this split is only kept in memory. Writing train.txt / test.txt /
# dev.txt for it is disabled because `split_dir` is not defined in this notebook.

In [26]:
# Sorted label names without the 'O' placeholder.
# NOTE: this rebinds `all_labels` from a set to a list, so create_doc_bio
# (which calls all_labels.add) cannot be re-run until it is reset to a set.
all_labels = sorted(label for label in all_labels if label != 'O')

In [27]:
# 'O' first, then a B-/I- pair for every label in sorted order
bio_labels = ['O'] + [
    f'{prefix}-{label}'
    for label in all_labels
    for prefix in ('B', 'I')
]

In [46]:
# Display the BIO tag inventory.
# NOTE(review): the saved output below looks stale. It lists tags such as
# ActionInput and WorkupReagent, which all_reagents_map would fold into
# 'Reagent', and the execution counts of these cells are out of sequence.
# Re-run the notebook from a fresh kernel to confirm.
bio_labels

['O',
 'B-Action',
 'I-Action',
 'B-ActionInput',
 'I-ActionInput',
 'B-ActionInput_ref',
 'I-ActionInput_ref',
 'B-Amount',
 'I-Amount',
 'B-Atmosphere_Condition',
 'I-Atmosphere_Condition',
 'B-Lighting',
 'I-Lighting',
 'B-Macromonomer',
 'I-Macromonomer',
 'B-Product',
 'I-Product',
 'B-Reagent',
 'I-Reagent',
 'B-RemovedChemical',
 'I-RemovedChemical',
 'B-RemovedChemical_ref',
 'I-RemovedChemical_ref',
 'B-Repetition_Condition',
 'I-Repetition_Condition',
 'B-Temperature',
 'I-Temperature',
 'B-Time',
 'I-Time',
 'B-Vacuum_Condition',
 'I-Vacuum_Condition',
 'B-WorkupReagent',
 'I-WorkupReagent',
 'B-WorkupReagent_ref',
 'I-WorkupReagent_ref',
 'B-Yield',
 'I-Yield']

In [29]:
# Number of BIO tags, counting the leading 'O'
len(bio_labels)

37

## Cross-Validation

In [54]:
from sklearn.model_selection import KFold
import numpy as np

In [55]:
# Write one `split_<k>` folder per fold of a seeded, shuffled 10-fold split
kf = KFold(n_splits=10, shuffle=True, random_state=0)
paragraphs = np.array(paragraphs)

for j, (train_index, test_index) in enumerate(kf.split(paragraphs)):
    split_path = os.path.join(kfold_split_dir, f'split_{j}')
    os.makedirs(split_path, exist_ok=True)

    train_set, test_set = paragraphs[train_index], paragraphs[test_index]

    # Documents are separated by a blank line inside each file
    train_txt = '\n\n'.join(train_set)
    dev_txt = '\n\n'.join(test_set)

    # dev.txt and test.txt both receive the held-out fold
    fold_outputs = (
        ('train.txt', train_txt),
        ('dev.txt', dev_txt),
        ('test.txt', dev_txt),
    )
    for out_name, out_txt in fold_outputs:
        with open(os.path.join(split_path, out_name), 'w') as out_file:
            out_file.write(out_txt)
