In [1]:
import pandas as pd
from os import listdir
from os.path import isfile, join
import random
import math
from enum import Enum
import copy
import nlpaug.augmenter.word as naw
import nlpaug.augmenter.char as nac
import nlpaug.augmenter.word as naw
import nlpaug.augmenter.sentence as nas
import nlpaug.flow as nafc
import nltk
# Download the NLTK data packages used by this notebook. Packages that are
# already installed are reported as up to date (see the cell output).
# NOTE(review): 'wordnet' presumably backs SynonymAug(aug_src='wordnet'); the
# POS tagger and 'omw-1.4' are presumably needed by nlpaug's WordNet lookup — confirm.
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\Mario\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Mario\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\Mario\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


True

In [2]:
class FeatureSequence:
    """Span descriptor: a feature number plus the start/end locations of a run.

    Attributes:
        feature_num: the feature (label) value attached to the span.
        start_loc: location where the span starts.
        end_loc: location where the span ends.
    """

    def __init__(self, feature_num, start_loc, end_loc):
        # Store the three fields exactly as given; no validation is performed.
        self.feature_num, self.start_loc, self.end_loc = feature_num, start_loc, end_loc

    def __len__(self):
        """Return the span size, i.e. ``end_loc - start_loc``."""
        span = self.end_loc - self.start_loc
        return span

class Sample():
    """A text with its labels and the runs of equal labels derived from them.

    Attributes:
        text: the sample text.
        labels: list of labels. ``feat_seqs`` spans are indices into this list.
            NOTE(review): callers slice ``text`` with the same indices, so the
            labels are presumably one per character — confirm against the data.
        case_num: case identifier, stored as given.
        pn_num: patient-note identifier, stored as given.
        feat_seqs: list of FeatureSequence, one per maximal run of equal labels.
    """

    def __init__(self, text, labels, case_num, pn_num):
        self.text = text
        self.labels = labels
        self.case_num = case_num
        self.pn_num = pn_num
        self.feat_seqs = []
        self.update_feat_seqs()

    def update_feat_seqs(self):
        """Rebuild ``feat_seqs`` from the current ``labels``.

        Every maximal run of identical consecutive labels becomes one
        FeatureSequence whose span is half-open: ``[start_loc, end_loc)``.
        An empty label list yields an empty ``feat_seqs``.
        """
        self.feat_seqs = []
        if len(self.labels) == 0:
            return
        prev_label = self.labels[0]
        start_loc = 0
        for i, label in enumerate(self.labels):
            if prev_label != label:
                # ``i`` is the first index of the next run, i.e. an exclusive end.
                self.feat_seqs.append(FeatureSequence(prev_label, start_loc, i))
                prev_label = label
                start_loc = i
        # The last run must also use an exclusive end, so it stops at
        # len(labels) rather than at the index of the last label.
        self.feat_seqs.append(FeatureSequence(prev_label, start_loc, len(self.labels)))

    def get_num_word(self):
        """Return the number of whitespace-separated words in ``text``."""
        return len(self.text.split())

    def feat_seq_num(self):
        """Return how many label runs ``feat_seqs`` currently holds."""
        return len(self.feat_seqs)

    def select_random_feat_seq(self, feature_num=None, min_length=10):
        """Pick a random label run.

        Args:
            feature_num: when not None, only runs with this feature number qualify.
            min_length: minimum ``len()`` a run needs to qualify.

        Returns:
            One FeatureSequence chosen uniformly among the qualifying runs.

        Raises:
            IndexError: if no run qualifies (raised by ``random.choice``).
        """
        filtered_seqs = [seq for seq in self.feat_seqs if len(seq) >= min_length]
        if feature_num is not None:
            filtered_seqs = [seq for seq in filtered_seqs if seq.feature_num == feature_num]
        return random.choice(filtered_seqs)

    def select_random_word(self):
        """Pick a random whitespace-separated word.

        Returns:
            ``(word, loc)`` where ``loc`` is the offset of the FIRST occurrence of
            ``word`` as a substring of the text. It is therefore not necessarily
            the occurrence that was drawn, and may fall inside a longer word.

        Raises:
            IndexError: if the text contains no words.
        """
        text = ''.join(self.text)
        words = text.split()
        word = random.choice(words)
        loc = text.find(word)
        return word, loc

    def copy(self):
        """Return an independent deep copy of this sample."""
        return copy.deepcopy(self)
    

class AugmenterType(Enum):
    """Granularity at which an augmenter is applied to a sample."""

    WORD = 1      # applied to individual words
    SEQUENCE = 2  # applied to a whole labelled span of text

class ShuffleSequenceAugmenter():
    """Sequence-level augmenter that randomly reorders the words of a text."""

    def __init__(self):
        self.type = AugmenterType.SEQUENCE

    def augment(self, text):
        """Return ``text`` with its whitespace-separated words in random order.

        Words are re-joined with single spaces, so leading/trailing whitespace
        and repeated separators of the input are not preserved.
        """
        words = text.split()
        # random.shuffle reorders the list in place and returns None, so its
        # result must not be assigned back to ``words``.
        random.shuffle(words)
        return ' '.join(words)

class BackTranslationAugmenter():
    """Sequence-level augmenter that paraphrases text via back-translation.

    Wraps nlpaug's BackTranslationAug configured with the
    'facebook/wmt19-en-de' and 'facebook/wmt19-de-en' models.
    """

    def __init__(self):
        self.type = AugmenterType.SEQUENCE
        self.augmenter = naw.BackTranslationAug(
            from_model_name='facebook/wmt19-en-de', 
            to_model_name='facebook/wmt19-de-en')
    
    def augment(self, text):
        """Return the augmented version of ``text`` as a single string.

        Depending on the installed nlpaug version, the wrapped augmenter returns
        either a string or a list of strings. Callers rely on getting a string,
        so a list is reduced to its first element. An empty list means nothing
        was produced, and ``text`` is returned unchanged.
        """
        result = self.augmenter.augment(text)
        if isinstance(result, (list, tuple)):
            return result[0] if result else text
        return result

class SynonymAugmenter():
    """Word-level augmenter that substitutes synonyms via nlpaug's SynonymAug (WordNet source)."""

    def __init__(self):
        self.type = AugmenterType.WORD
        self.augmenter = naw.SynonymAug(aug_src='wordnet')
        
    def augment(self, text):
        """Return the augmented version of ``text`` as a single string.

        Depending on the installed nlpaug version, the wrapped augmenter returns
        either a string or a list of strings. Callers rely on getting a string,
        so a list is reduced to its first element. An empty list means nothing
        was produced, and ``text`` is returned unchanged.
        """
        result = self.augmenter.augment(text)
        if isinstance(result, (list, tuple)):
            return result[0] if result else text
        return result

        
def augment(augmenter, sample, percentage_to_augment = 0.1, min_length=10):
    """Augment ``sample`` in place, keeping ``sample.labels`` aligned with ``sample.text``.

    Args:
        augmenter: object exposing ``type`` (an AugmenterType) and ``augment(text)``.
        sample: object modified in place; its ``text``, ``labels`` and feature
            sequences are updated.
        percentage_to_augment: fraction of the words (WORD augmenters) or of the
            feature sequences (SEQUENCE augmenters) to rewrite; the resulting
            count is rounded up.
        min_length: minimum length of a feature sequence eligible for SEQUENCE
            augmentation; forwarded to ``sample.select_random_feat_seq``.

    Returns:
        None.

    Raises:
        IndexError: propagated from the sample's random selection helpers when
            there is nothing eligible to pick.
    """
    if augmenter.type == AugmenterType.WORD:
        words_to_augment = math.ceil(sample.get_num_word() * percentage_to_augment)
        for _ in range(words_to_augment):
            word, loc = sample.select_random_word()
            aug_word = _augmented_text(augmenter, word)
            delta = len(aug_word) - len(word)
            if delta > 0:
                # Replacement is longer: repeat the label found at the word start.
                sample.labels[loc:loc] = [sample.labels[loc]] * delta
            elif delta < 0:
                # Replacement is shorter: drop the surplus labels at the word start.
                del sample.labels[loc:loc - delta]
            # Splice at ``loc`` so the text edit hits the same position as the labels.
            sample.text = sample.text[:loc] + aug_word + sample.text[loc + len(word):]
        sample.update_feat_seqs()

    elif augmenter.type == AugmenterType.SEQUENCE:
        sequences_to_augment = math.ceil(sample.feat_seq_num() * percentage_to_augment)
        for _ in range(sequences_to_augment):
            seq = sample.select_random_feat_seq(min_length=min_length)
            start, end = seq.start_loc, seq.end_loc
            tokens = sample.text[start:end]
            if not tokens:
                # Nothing to rewrite for an empty span.
                continue

            aug_tokens = _augmented_text(augmenter, tokens)
            if aug_tokens == '':
                aug_tokens = tokens
            else:
                # Augmenters may strip surrounding whitespace; put a leading space
                # back at the front and a trailing space back at the end.
                if tokens[0] == ' ' and not aug_tokens.startswith(' '):
                    aug_tokens = ' ' + aug_tokens
                if tokens[-1] == ' ' and not aug_tokens.endswith(' '):
                    aug_tokens = aug_tokens + ' '

            # Replace exactly the selected span. A textual search-and-replace could
            # hit an earlier identical substring and desynchronise text and labels.
            sample.labels[start:end] = [seq.feature_num] * len(aug_tokens)
            sample.text = sample.text[:start] + aug_tokens + sample.text[end:]
            # Span offsets changed, so rebuild before the next pick.
            sample.update_feat_seqs()


def _augmented_text(augmenter, text):
    """Run ``augmenter`` on ``text`` and return the result as one string.

    A list/tuple result is reduced to its first element; an empty one yields
    ``text`` unchanged.
    """
    result = augmenter.augment(text)
    if isinstance(result, (list, tuple)):
        return result[0] if result else text
    return result
            
def read_simplified_data(path):
    """Load every data file in ``path`` into a list of Sample objects.

    Each file name (minus its last four characters, i.e. the extension) must be
    ``<case_num>_<pn_num>``. Each file is read with ``pandas.read_csv`` and must
    provide a ``word`` and a ``label`` column. The words are concatenated into
    the sample text and the labels are passed on as a list.

    Args:
        path: directory containing the files; a trailing separator is optional.

    Returns:
        List of Sample, ordered by file name.
    """
    # Sort for a reproducible order: os.listdir returns entries in arbitrary order.
    data_files = sorted(f for f in listdir(path) if isfile(join(path, f)))
    samples = []
    for data_file in data_files:
        case_num, pn_num = (int(i) for i in data_file[:-4].split('_'))
        # Build the file path with join(), the same way the files were discovered.
        df = pd.read_csv(join(path, data_file))
        tokens = ''.join(df.word.to_list())
        labels = df.label.to_list()
        samples.append(Sample(tokens, labels, case_num, pn_num))
    return samples

def augment_simplified_data(num_augmentations, percentage_to_augment, augmenter, sample, min_length):
    """Create ``num_augmentations`` augmented deep copies of ``sample``.

    Args:
        num_augmentations: number of augmented copies to produce.
        percentage_to_augment: forwarded to ``augment``.
        augmenter: forwarded to ``augment``.
        sample: source sample; it is copied, never modified.
        min_length: forwarded to ``augment``.

    Returns:
        List of augmented copies. Copy ``i`` (0-based) gets
        ``pn_num = (i + 1) * 10000000 + sample.pn_num``.
        NOTE(review): ids are unique only if original patient numbers are below
        10000000 — confirm against the data.
    """
    pn_num_offset = 10000000

    augmented_samples = [sample.copy() for _ in range(num_augmentations)]
    for i, aug_sample in enumerate(augmented_samples):
        augment(augmenter, aug_sample, percentage_to_augment, min_length)
        # Offset + original number. Multiplying pn_num by itself (as before) can
        # collide across copies, e.g. 1e7 + 4625**2 == 2e7 + 3375**2.
        aug_sample.pn_num = (i + 1) * pn_num_offset + sample.pn_num

    return augmented_samples