## Generate synonym- and antonym-replaced sentence pairs

In [3]:
import spacy
import random
from transformers import AutoConfig, AutoTokenizer
from transformers import HfArgumentParser, PreTrainedTokenizer
from dataclasses import dataclass, field
from typing import Optional
from datasets import load_dataset, Dataset
import os
from spacy.tokens import Doc
from random_words import RandomWords
import nltk
from nltk.corpus import wordnet as wn
from nltk.tokenize import word_tokenize
from lemminflect import getInflection

import argparse
import json
import os
import re
import sys

from allennlp.predictors.predictor import Predictor
from lxml import etree
from nltk.tokenize import TreebankWordTokenizer
from tqdm import tqdm



# WordNet data required by the `wn` lookups in the get_* helpers below.
nltk.download('omw-1.4')
nltk.download("wordnet")

# Seed the `random` module, which is used to sample and shuffle replacements.
random.seed(12345)

# Custom spaCy Doc attributes; replace_word() stores its results in them.
# NOTE(review): spaCy is documented to raise if an extension name is already
# registered and `force=True` is not passed, so re-running this cell in the
# same kernel will likely fail -- confirm and consider `force=True`.
Doc.set_extension('_synonym_sent', default=False)
Doc.set_extension('_synonym_intv', default=False)
Doc.set_extension('_ori_syn_intv', default=False)
Doc.set_extension('_antonym_sent', default=False)
Doc.set_extension('_antonym_intv', default=False)
Doc.set_extension('_ori_ant_intv', default=False)

# `file_path` is not referenced anywhere else in the visible code.
file_path = "data/semeval14/Laptop_Train_v2_text.txt"
# Model archive handed to the AllenNLP Predictor below.
model_path = "biaffine-dependency-parser-ptb-2020.04.06.tar.gz"

# Dependency-parsing predictor used by parsing_pipeline().
predictor = Predictor.from_path(model_path)


# Random-word source used for REPLACE_RANDOM replacements.
rw = RandomWords()

# replace_word() replaces at most int(len(doc) * REPLACE_RATIO) tokens per side.
REPLACE_RATIO = 0.3

# Replacement-type codes; they are stored as the third element of every
# replacement / interval tuple.
REPLACE_ORIGINAL = 0
REPLACE_LEMMINFLECT = 1
REPLACE_SYNONYM = 2
REPLACE_HYPERNYMS = 3
REPLACE_ANTONYM = 4
REPLACE_RANDOM = 5
REPLACE_ADJACENCY = 6

# Not referenced in the visible code.
REPLACE_NONE = -100

# Thresholds compared against a uniform random draw in replace_word().
# HYPERNYMS_RATIO and SYNONYM_RATIO gate the hypernym / synonym attempts;
# LEMMINFLECT_RATIO is not read directly (inflection is the final fallback).
SYNONYM_RATIO = 1/3
HYPERNYMS_RATIO = 1/3
LEMMINFLECT_RATIO = 1/3

# ANTONYM_RATIO gates the antonym attempt; RANDOM_RATIO is not read directly
# (random words are the fallback).
ANTONYM_RATIO = 1/2
RANDOM_RATIO = 1/2

# Fine-grained tags considered replaceable; proper nouns (NNP, NNPS) are not
# listed. REPLACE_TAG itself is not referenced in the visible code.
REPLACE_TAG = ['NN', 'NNS', 'JJ', 'JJR', 'JJS', 'RB', 'RBR',
               'RBS', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
# Coarse POS values (token.pos_) eligible for replacement.
REPLACE_POS = ['NOUN', 'VERB', 'ADJ', 'ADV']
# Fine-grained tags per coarse POS; get_lemminflect() inflects into each of them.
POS_TO_TAGS = {'NOUN': ['NN', 'NNS'],
               'ADJ': ['JJ', 'JJR', 'JJS'],
               'VERB': ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'],
               'ADV': ['RB', 'RBR', 'RBS']}


def get_synonym(token):
    """Collect WordNet synonyms for a token.

    Every synset of ``token.text`` for the token's coarse part of speech is
    inspected and its lemma names gathered. Names equal to the token's
    surface form or lemma (case-insensitively) are skipped, and underscores
    in multi-word WordNet lemmas are turned into spaces.

    Args:
        token: object exposing ``text``, ``lemma_`` and ``pos_`` (a spaCy
            token in this file).

    Returns:
        A sorted list of distinct candidate strings. Empty when the token's
        POS is not in ``REPLACE_POS`` or WordNet offers no alternative.
    """
    pos = token.pos_
    if pos not in REPLACE_POS:
        return []

    surface = token.text.lower()
    lemma = token.lemma_.lower()
    # REPLACE_POS entries share their names with the WordNet POS constants
    # (wn.NOUN, wn.VERB, wn.ADJ, wn.ADV), so attribute lookup replaces eval().
    wn_pos = getattr(wn, pos)

    candidates = set()
    for synset in wn.synsets(token.text, pos=wn_pos):
        for name in synset.lemma_names():
            if name.lower() not in (surface, lemma):
                candidates.add(name.replace('_', ' '))

    # Iteration order of a set of strings depends on the per-process hash
    # seed; sorting keeps seeded random.choice() calls reproducible.
    return sorted(candidates)


def get_hypernyms(token):
    """Collect WordNet hypernym lemmas for a token.

    For every synset of ``token.text`` with the token's coarse part of
    speech, the lemma names of each direct hypernym synset are gathered.
    Names equal to the token's surface form or lemma (case-insensitively)
    are skipped, and underscores are turned into spaces.

    Args:
        token: object exposing ``text``, ``lemma_`` and ``pos_`` (a spaCy
            token in this file).

    Returns:
        A sorted list of distinct candidate strings. Empty when the token's
        POS is not in ``REPLACE_POS`` or no hypernym lemma is found.
    """
    pos = token.pos_
    if pos not in REPLACE_POS:
        return []

    surface = token.text.lower()
    lemma = token.lemma_.lower()
    # REPLACE_POS entries share their names with the WordNet POS constants
    # (wn.NOUN, wn.VERB, wn.ADJ, wn.ADV), so attribute lookup replaces eval().
    wn_pos = getattr(wn, pos)

    candidates = set()
    for synset in wn.synsets(token.text, pos=wn_pos):
        for hyperset in synset.hypernyms():
            for name in hyperset.lemma_names():
                if name.lower() not in (surface, lemma):
                    candidates.add(name.replace('_', ' '))

    # Iteration order of a set of strings depends on the per-process hash
    # seed; sorting keeps seeded random.choice() calls reproducible.
    return sorted(candidates)


def get_antonym(token):
    """Collect WordNet antonyms for a token.

    For every lemma of every synset of ``token.text`` (restricted to the
    token's coarse part of speech), the names of its antonym lemmas are
    gathered. Names equal to the token's surface form or lemma
    (case-insensitively) are skipped, and underscores are turned into spaces.

    Args:
        token: object exposing ``text``, ``lemma_`` and ``pos_`` (a spaCy
            token in this file).

    Returns:
        A sorted list of distinct candidate strings. Empty when the token's
        POS is not in ``REPLACE_POS`` or WordNet lists no antonym.
    """
    pos = token.pos_
    if pos not in REPLACE_POS:
        return []

    surface = token.text.lower()
    lemma = token.lemma_.lower()
    # REPLACE_POS entries share their names with the WordNet POS constants
    # (wn.NOUN, wn.VERB, wn.ADJ, wn.ADV), so attribute lookup replaces eval().
    wn_pos = getattr(wn, pos)

    candidates = set()
    for synset in wn.synsets(token.text, pos=wn_pos):
        for synlemma in synset.lemmas():
            for antonym in synlemma.antonyms():
                name = antonym.name()
                if name.lower() not in (surface, lemma):
                    candidates.add(name.replace('_', ' '))

    # Iteration order of a set of strings depends on the per-process hash
    # seed; sorting keeps seeded random.choice() calls reproducible.
    return sorted(candidates)


def get_lemminflect(token):
    """Collect alternative inflections of a token's lemma.

    The lemma is inflected into every fine-grained tag that
    ``POS_TO_TAGS`` lists for the token's coarse POS, except the tag the
    token already carries. Forms equal to the token's surface text
    (case-insensitively) are dropped.

    Args:
        token: object exposing ``text``, ``lemma_``, ``tag_`` and ``pos_``
            (a spaCy token in this file).

    Returns:
        A sorted list of distinct inflected forms. Empty when the token's
        POS is not in ``REPLACE_POS`` or no differing form exists.
    """
    pos = token.pos_
    if pos not in REPLACE_POS:
        return []

    surface = token.text.lower()
    current_tag = token.tag_
    lemma = token.lemma_

    forms = set()
    for target_tag in POS_TO_TAGS[pos]:
        if target_tag == current_tag:
            continue
        for form in getInflection(lemma, tag=target_tag):
            if form.lower() != surface:
                forms.add(form)

    # Iteration order of a set of strings depends on the per-process hash
    # seed; sorting keeps seeded random.choice() calls reproducible.
    return sorted(forms)


def search_replacement(doc, candidate_index, replace_type, max_num, pos_to_words=None):
    """Pick up to ``max_num`` replacement words of one kind.

    Candidate positions are visited in the order given. For each one a
    single replacement of kind ``replace_type`` is proposed; it is kept only
    if it is non-empty and differs (case-insensitively) from the token text.

    Args:
        doc: indexable sequence of tokens; ``doc[i]`` must expose ``text``
            (and ``pos_`` for ``REPLACE_ADJACENCY``).
        candidate_index: iterable of token positions to try, in order.
        replace_type: one of the ``REPLACE_*`` codes.
        max_num: maximum number of replacements to return.
        pos_to_words: mapping from coarse POS to a list of words; only read
            for ``REPLACE_ADJACENCY``.

    Returns:
        List of ``(position, replacement, replace_type)`` tuples.
    """
    found = []
    if max_num < 1:
        return found

    def _propose(tok):
        # Random words come straight from the generator, with no candidate pool.
        if replace_type == REPLACE_RANDOM:
            return rw.random_word()

        if replace_type == REPLACE_ANTONYM:
            pool = get_antonym(tok)
        elif replace_type == REPLACE_ADJACENCY:
            pool = pos_to_words[tok.pos_]
        elif replace_type == REPLACE_SYNONYM:
            pool = get_synonym(tok)
        elif replace_type == REPLACE_HYPERNYMS:
            pool = get_hypernyms(tok)
        elif replace_type == REPLACE_LEMMINFLECT:
            pool = get_lemminflect(tok)
        else:
            # Unknown replacement kinds never yield a proposal.
            return None

        if not pool:
            return None
        return random.choice(pool)

    for position in candidate_index:
        tok = doc[position]
        proposal = _propose(tok)

        if proposal and proposal.lower() != tok.text.lower():
            found.append((position, proposal, replace_type))

        # Stop as soon as the quota is filled.
        if len(found) >= max_num:
            break

    return found


def replace_word(doc, pairs):
    """Build a synonym-side and an antonym-side variant of ``doc``.

    Tokens whose ``pos_`` is in ``REPLACE_POS`` are candidates. On each side
    at most ``int(len(doc) * REPLACE_RATIO)`` of them (capped by the number
    of candidates) are replaced:

    * synonym side: hypernyms, synonyms or alternative inflections;
    * antonym side: antonyms or random words.

    The token lists and interval lists are stored on ``doc._`` and the two
    sentences are appended to ``pairs`` (synonym sentence first, antonym
    sentence second). Each sentence is its tokens separated by single spaces
    and ends with a trailing space.

    Intervals are ``(start, end, replace_type)`` tuples measured in UTF-8
    bytes within the space-joined sentence. ``start`` is the offset of the
    space preceding the token (``-1`` for the first token) and ``end`` is
    ``start`` plus the token's byte length. Every candidate token gets one
    interval per side; tokens left unchanged on a side are tagged
    ``REPLACE_ORIGINAL``.

    Args:
        doc: spaCy ``Doc`` to perturb (createPairs passes a parsed sentence).
        pairs: list that receives the two generated sentences; mutated in
            place.

    Returns:
        The same ``doc``, with its ``_synonym_*``, ``_antonym_*`` and
        ``_ori_*`` extension attributes filled in.
    """
    synonym_sent = []
    synonym_intv = []
    ori_syn_intv = []
    antonym_sent = []
    antonym_intv = []
    ori_ant_intv = []

    length = len(doc)
    # Replacement budget per side, before capping by the candidate count.
    rep_num = int(length*REPLACE_RATIO)

    # Positions of all tokens eligible for replacement.
    rep_index = []
    # pos_word = {p:[] for p in REPLACE_POS}
    for index, token in enumerate(doc):
        if token.pos_ in REPLACE_POS:
            rep_index.append(index)
            # pos_word[token.pos_].append(token.text)

    rep_num = min(rep_num, len(rep_index))

    # One uniform draw per side selects which replacement strategy is tried.
    syn_rand = random.random()
    ant_rand = random.random()

    # Each side visits the candidates in its own random order.
    syn_index = rep_index[:]
    random.shuffle(syn_index)
    ant_index = rep_index[:]
    random.shuffle(ant_index)

    syn_replace = []
    ant_replace = []  # [(rep_idx, rep_word, rep_type)]

    ############### Antonym Replacement ####################
    # Try real antonyms only when the draw falls below ANTONYM_RATIO.
    if ant_rand < ANTONYM_RATIO:
        ant_replace = search_replacement(
            doc, candidate_index=ant_index, replace_type=REPLACE_ANTONYM, max_num=rep_num)
        # print("Ant_replace1: \n", ant_replace)

    # if not ant_replace and ant_rand < ANTONYM_RATIO + ADJACENCY_RATIO:
    #     ant_replace = search_replacement(doc, candidate_index=ant_index, replace_type=REPLACE_ADJACENCY, max_num=rep_num, pos_to_words=pos_word)

    # Fall back to random words when antonyms were not tried or none were found.
    if not ant_replace:
        ant_replace = search_replacement(
            doc, candidate_index=ant_index, replace_type=REPLACE_RANDOM, max_num=rep_num)
        # print("Ant_replace2: \n", ant_replace)

    ############### Synonym Replacement ####################
    # Cascade: hypernyms, then synonyms, then inflections as the last resort.
    if syn_rand < HYPERNYMS_RATIO:
        syn_replace = search_replacement(
            doc, candidate_index=syn_index, replace_type=REPLACE_HYPERNYMS, max_num=rep_num)
        # print("syn_replace1: \n", syn_replace)

    if not syn_replace and syn_rand < HYPERNYMS_RATIO + SYNONYM_RATIO:
        syn_replace = search_replacement(
            doc, candidate_index=syn_index, replace_type=REPLACE_SYNONYM, max_num=rep_num)
        # print("syn_replace2: \n", syn_replace)

    if not syn_replace:
        syn_replace = search_replacement(
            doc, candidate_index=syn_index, replace_type=REPLACE_LEMMINFLECT, max_num=rep_num)
        # print("syn_replace3:\n ", syn_replace)
    ############### Original Replacement ####################

    # Merge both sides and sort by position in descending order, so that
    # pop() hands the replacements back in ascending token order.
    all_replace = ant_replace + syn_replace
    all_replace = sorted(all_replace, key=lambda x: x[0], reverse=True)
    # print("All Replace: \n", all_replace)

    # Running byte offsets into the original / synonym / antonym sentences.
    ori_len = -1  # point to the space before next token
    syn_len = -1
    ant_len = -1
    rep_idx, rep_word, rep_type = all_replace.pop(
    ) if all_replace else (None, None, None)
    for index, token in enumerate(doc):
        ori = syn = ant = token.text

        # A position may carry one replacement per side, hence `while`.
        while index == rep_idx:
            if rep_type in [REPLACE_SYNONYM, REPLACE_HYPERNYMS, REPLACE_LEMMINFLECT]:
                syn = rep_word
                # fix length mismatch, mx.encode for bytelevelbpe
                synonym_intv.append(
                    (syn_len, syn_len + len(syn.encode('utf-8')), rep_type))
                ori_syn_intv.append(
                    (ori_len, ori_len + len(ori.encode('utf-8')), rep_type))
            elif rep_type in [REPLACE_ANTONYM, REPLACE_RANDOM]:
                ant = rep_word
                antonym_intv.append(
                    (ant_len, ant_len + len(ant.encode('utf-8')), rep_type))
                ori_ant_intv.append(
                    (ori_len, ori_len + len(ori.encode('utf-8')), rep_type))
            else:
                pass

            rep_idx, rep_word, rep_type = all_replace.pop(
            ) if all_replace else (None, None, None)

        # Candidate tokens left unchanged on a side still get an interval,
        # tagged REPLACE_ORIGINAL.
        if index in rep_index:
            if ori == syn:
                synonym_intv.append(
                    (syn_len, syn_len + len(syn.encode('utf-8')), REPLACE_ORIGINAL))
                ori_syn_intv.append(
                    (ori_len, ori_len + len(ori.encode('utf-8')), REPLACE_ORIGINAL))
            if ori == ant:
                antonym_intv.append(
                    (ant_len, ant_len + len(ant.encode('utf-8')), REPLACE_ORIGINAL))
                ori_ant_intv.append(
                    (ori_len, ori_len + len(ori.encode('utf-8')), REPLACE_ORIGINAL))

        ori_len = ori_len + len(ori.encode('utf-8')) + 1
        # +1 to point the space before next token
        syn_len = syn_len + len(syn.encode('utf-8')) + 1
        ant_len = ant_len + len(ant.encode('utf-8')) + 1

        synonym_sent.append(syn)
        antonym_sent.append(ant)

    doc._._synonym_sent = synonym_sent
    # print("Synonym Sent: \n", synonym_sent)
    # Space-join the synonym tokens; the result keeps a trailing space.
    synSentence = ""
    for val in synonym_sent:
        synSentence += val + " "
    pairs.append(synSentence)
    # print("Synonym Sentence Derived: ", synSentence)
    doc._._synonym_intv = synonym_intv
    # print("Synonym Intv: \n", synonym_intv)

    doc._._ori_syn_intv = ori_syn_intv
    # print("Ori Syn Intv:\n ", ori_syn_intv)

    doc._._antonym_sent = antonym_sent
    # print("Anton Sent: \n", antonym_sent)
    # Space-join the antonym tokens; the result keeps a trailing space.
    antSentence = ""
    for val in antonym_sent:
        antSentence += val + " "
    pairs.append(antSentence)

    doc._._antonym_intv = antonym_intv
    # print("Anton Intv: \n", antonym_intv)

    doc._._ori_ant_intv = ori_ant_intv
    # print("Ori Ant Intv: \n", ori_ant_intv)

    return doc


def parsing_pipeline(given_sentence):
    """Dependency-parse a sentence and extract noun aspects with offsets.

    The sentence is run through the module-level ``predictor``. The result
    bundles the parser's tokens, POS tags, dependency labels and heads,
    plus derived fields.

    Args:
        given_sentence: raw sentence string to parse.

    Returns:
        dict with the keys
        ``sentence``, ``tokens``, ``tags``, ``predicted_dependencies``,
        ``predicted_heads``;
        ``dependencies``: ``[label, head, dependent]`` triples where
        ``dependent`` is the 1-based token position;
        ``aspect_sentiment``: tokens tagged ``NOUN`` or ``PROPN``;
        ``from_to``: one ``(token_index, to_index)`` tuple per aspect token.
    """
    parse_predict = predictor.predict(sentence=given_sentence)

    tokens = parse_predict['words']
    tags = parse_predict['pos']
    dep_labels = parse_predict['predicted_dependencies']
    heads = parse_predict['predicted_heads']

    sentence_dict = {
        "sentence": given_sentence,
        'tokens': tokens,
        'tags': tags,
        'predicted_dependencies': dep_labels,
        'predicted_heads': heads,
        'dependencies': [
            [label, head, position]
            for position, (label, head) in enumerate(zip(dep_labels, heads), start=1)
        ],
        "aspect_sentiment": [],
        'from_to': [],  # left and right offset of the target word
    }

    last_position = len(heads) - 1
    for position, tag in enumerate(tags):
        if tag not in ("NOUN", "PROPN"):
            continue

        sentence_dict["aspect_sentiment"].append(tokens[position])

        # Default: the token's own head. Previously a noun in first position
        # left `to_index` unbound, and a noun in last position could index
        # past the end of the heads list; both now use this default.
        to_index = heads[position]
        if 0 < position < last_position:
            prev_head = heads[position - 1]
            next_head = heads[position + 1]
            # When the token shares its head with the previous token but the
            # next token does not, use the next token's head instead.
            if heads[position] == prev_head and next_head != prev_head:
                to_index = next_head

        sentence_dict['from_to'].append((position, to_index))

    return sentence_dict




_SPACY_PIPELINE = None


def _load_spacy_pipeline():
    """Return the shared ``en_core_web_sm`` pipeline, loading it on first use."""
    global _SPACY_PIPELINE
    if _SPACY_PIPELINE is None:
        _SPACY_PIPELINE = spacy.load("en_core_web_sm")
    return _SPACY_PIPELINE


def createPairs(sentence):
    """Create the synonym- and antonym-replaced variants of a sentence.

    The sentence is parsed with spaCy and handed to ``replace_word``, which
    appends the two generated sentences to the result list.

    The transformer config and tokenizer that used to be loaded here were
    never used, so they are no longer fetched. The spaCy model is loaded
    once and reused across calls.

    Args:
        sentence: input sentence string.

    Returns:
        A two-element list ``[synonym_sentence, antonym_sentence]``.
    """
    pairs = []
    doc = _load_spacy_pipeline()(sentence)
    replace_word(doc, pairs)
    return pairs


In [4]:
# Demo: derive the synonym- and antonym-replaced variants of one sentence,
# then dependency-parse the synonym variant and print the resulting dict.
sentence = "This laptop is not mine"
syn_replace_sentence, ant_replace_sentence = createPairs(sentence)

print("Synonym replaced: {}".format(parsing_pipeline(syn_replace_sentence)))

Your label namespace was 'pos'. We recommend you use a namespace ending with 'labels' or 'tags', so we don't add UNK and PAD tokens by default to your vocabulary.  See documentation for `non_padded_namespaces` parameter in Vocabulary.


Synonym replaced: {'sentence': 'This laptop computer is not mine ', 'tokens': ['This', 'laptop', 'computer', 'is', 'not', 'mine'], 'tags': ['DET', 'NOUN', 'NOUN', 'AUX', 'PART', 'ADJ'], 'predicted_dependencies': ['amod', 'amod', 'nsubj', 'cop', 'neg', 'root'], 'predicted_heads': [3, 3, 4, 6, 6, 0], 'dependencies': [['amod', 3, 1], ['amod', 3, 2], ['nsubj', 4, 3], ['cop', 6, 4], ['neg', 6, 5], ['root', 0, 6]], 'aspect_sentiment': ['laptop', 'computer'], 'from_to': [(1, 4), (2, 4)]}


In [5]:
# Dependency-parse the antonym variant produced above and print the result.
print("Antonym replaced: {}".format(parsing_pipeline(ant_replace_sentence)))

Antonym replaced: {'sentence': 'This heat is not mine ', 'tokens': ['This', 'heat', 'is', 'not', 'mine'], 'tags': ['DET', 'NOUN', 'AUX', 'PART', 'ADJ'], 'predicted_dependencies': ['quantmod', 'nsubj', 'cop', 'neg', 'root'], 'predicted_heads': [2, 3, 5, 5, 0], 'dependencies': [['quantmod', 2, 1], ['nsubj', 3, 2], ['cop', 5, 3], ['neg', 5, 4], ['root', 0, 5]], 'aspect_sentiment': ['heat'], 'from_to': [(1, 3)]}
