In [None]:
# !pip install transformers tqdm 

In [None]:
import torch
from transformers import RobertaTokenizer, RobertaForMaskedLM, pipeline, AutoModelForMaskedLM, AutoTokenizer
import re
from tqdm import tqdm
from itertools import permutations
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
import pandas as pd
import os
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"


In [None]:
def get_candidates_setwise(Ci, Ak, wk):
    """Apply every word-to-slot assignment in ``Ak`` to every candidate in ``Ci``.

    Args:
        Ci: iterable of candidate sentences, each a space-separated string.
        Ak: list of assignments. Each entry is either a single
            ``(word, position)`` tuple, or a sequence of such pairs that is
            applied cumulatively to one copy of the candidate.
        wk: unused; kept so existing callers do not break.

    Returns:
        A set with the sentences produced by the substitutions. For a
        sequence-style assignment every intermediate (partially applied)
        sentence is included as well as the fully applied one. The input
        candidates themselves are only present if a substitution happens to
        reproduce them.
    """
    Ci_final = set()
    for cj in Ci:
        base_tokens = cj.split()
        for a in Ak:
            # Each assignment starts from an untouched copy of the candidate.
            tokens = base_tokens.copy()
            if isinstance(a, tuple):
                # Single (word, position) substitution.
                tokens[a[1]] = a[0]
                Ci_final.add(' '.join(tokens))
                continue
            # Sequence of (word, position) pairs: apply them one after another
            # and record the sentence after every step.
            for pair in a:
                tokens[pair[1]] = pair[0]
                Ci_final.add(' '.join(tokens))
    return Ci_final

def candidate_set_generation(xi, pi0, Ti, T0i):
    """Build masked candidate sentences by placing words of ``xi`` into ``pi0``.

    Starting from a template of ``len(pi0)`` ``<mask>`` tokens, words of ``xi``
    are written into the positions of ``pi0`` that carry the same tag.

    Args:
        xi: sequence of ``(word, tag)`` pairs for the source sentence.
        pi0: sequence of ``(word, tag)`` pairs for the template sentence; only
            the tags and the length are used.
        Ti: set of tags considered for ``xi``.
        T0i: set of tags considered for ``pi0``.

    Returns:
        A set of space-separated candidate strings. It always contains the
        all-``<mask>`` template.
    """
    c0 = ' '.join(['<mask>'] * len(pi0))
    Ci = {c0}
    Tshared = Ti.intersection(T0i)
    for tk in Tshared:
        # Words of the source sentence carrying this tag ...
        Wk = [word for word, pos in xi if pos == tk]
        # ... and the template positions carrying the same tag.
        Sk = [i for i, pos in enumerate(pi0) if pos[1] == tk]
        Ak = []
        if len(Wk) == 1 and len(Sk) > 1:
            # One word, several slots: try the word in each slot.
            Ak.extend((Wk[0], slot) for slot in Sk)
        elif len(Sk) == 1:
            # One slot: try each word in it.
            Ak.extend((word, Sk[0]) for word in Wk)
        elif len(Sk) >= len(Wk):
            # At least as many slots as words: try every ordered placement of
            # the words into distinct slots. `>=` (rather than `>`) so that an
            # equal number of words and slots also yields assignments.
            for perm in permutations(Sk, len(Wk)):
                Ak.append(list(zip(Wk, perm)))
        # NOTE(review): more words than slots (with more than one slot) still
        # produces no assignment for this tag - confirm this is intended.
        Ci_tag = get_candidates_setwise(Ci, Ak, Wk)
        Ci = Ci.union(Ci_tag)
    return Ci

In [None]:
# Load the dataset of bad sentences and their similar sentences.
# Later cells read the columns 'bad_sentences', 'bad_sentences_pos_tags',
# 'similar_sentences' and 'pos_tags_similar_sentences' from this frame.
bad_sentences_similar_pos_tags = pd.read_csv(
    'data/bad_sentences_similar_pos_tags.csv',
    header=0,
)

In [None]:
import ast

# Sentences with more whitespace-separated words than this are skipped.
MAX_SENTENCE_WORDS = 15

# Maps a row index of `bad_sentences_similar_pos_tags` to the set of masked
# candidate sentences generated for that row. Rows that are too long, or for
# which no similar sentence could be used, get no entry.
candidates_generated = {}
for i in tqdm(range(len(bad_sentences_similar_pos_tags))):
    candidate_sentences = set()
    xi = bad_sentences_similar_pos_tags['bad_sentences'][i]
    xi_words = xi.split()
    if len(xi_words) > MAX_SENTENCE_WORDS:
        continue

    # The tag column is parsed as a bracketed, comma-separated string:
    # drop the first/last character and split on commas.
    # NOTE(review): assumes no tag itself contains a comma - confirm against the data.
    tag_xi = bad_sentences_similar_pos_tags['bad_sentences_pos_tags'][i]
    tag_xi = [s.strip() for s in tag_xi[1:-1].split(',')]
    xi_mod = list(zip(xi_words, tag_xi[:MAX_SENTENCE_WORDS]))
    Ti = set(tag_xi[:MAX_SENTENCE_WORDS])

    # NOTE(review): splitting on ',' also splits similar sentences that contain
    # commas - confirm the stored format.
    list_of_similar_sentences = [
        s.strip()
        for s in bad_sentences_similar_pos_tags['similar_sentences'][i][1:-1].split(',')
    ]
    list_of_similar_sentences_pos_tags = ast.literal_eval(
        bad_sentences_similar_pos_tags['pos_tags_similar_sentences'][i]
    )

    for j, pi0 in enumerate(list_of_similar_sentences):
        pi0_words = pi0.split()
        if len(pi0_words) > MAX_SENTENCE_WORDS:
            continue
        # Best effort: skip similar sentences whose tag entry is missing or is
        # not a bracketed string. Only the errors those two situations raise
        # are caught, so interrupts and unrelated failures still surface.
        try:
            tag_pi0 = list_of_similar_sentences_pos_tags[j]
        except (LookupError, TypeError):
            continue
        try:
            tag_pi0 = [s.strip() for s in tag_pi0[1:-1].split(',')]
        except (AttributeError, TypeError, LookupError):
            continue
        pi0_mod = list(zip(pi0_words, tag_pi0))
        T0i = set(tag_pi0)
        Ci = candidate_set_generation(xi_mod, pi0_mod, Ti, T0i)
        candidate_sentences = candidate_sentences.union(Ci)

    # Only record rows that actually produced candidates, so downstream cells
    # never receive None or an empty set.
    if candidate_sentences:
        candidates_generated[i] = candidate_sentences


1. DistilRoBERTa Base Model

In [None]:
from transformers import pipeline, AutoModelForMaskedLM, AutoTokenizer

# Load the pre-trained DistilRoBERTa model and tokenizer
model_name = "distilroberta-base"
model = AutoModelForMaskedLM.from_pretrained(model_name)
# Place the model on the notebook-level `device` (CUDA when available, else
# CPU) instead of hard-coding 'cuda', so the cell also runs without a GPU.
model = model.to(device)

tokenizer = AutoTokenizer.from_pretrained(model_name)

# Initialize the pipeline for masked language modeling
fill_mask = pipeline(
    "fill-mask",
    model=model,
    tokenizer=tokenizer,
    # Pipeline device index: 0 selects the first CUDA device, -1 the CPU.
    device=0 if device.type == "cuda" else -1,
)

def mask_filled_sentences(sentences):
    """Fill the mask tokens of each candidate sentence, one model call per mask.

    Uses the module-level ``tokenizer`` and ``fill_mask`` pipeline. On every
    call the top prediction for the first remaining mask is taken and its
    ``'sequence'`` becomes the input of the next call.

    Args:
        sentences: iterable of candidate strings containing
            ``tokenizer.mask_token`` placeholders.

    Returns:
        List of filled sentences. Candidates whose mask count exceeds
        ``word_count - 2`` are skipped and do not appear in the result.
    """
    mask_filled_sentences_list = []
    for sentence in tqdm(sentences):
        num_masks = sentence.count(tokenizer.mask_token)
        # Too little context left around the masks: skip this candidate.
        if num_masks > len(sentence.split()) - 2:
            continue
        for _ in range(num_masks):
            results = fill_mask(sentence, top_k=5)
            # While several masks remain the pipeline returns one prediction
            # list per mask; with a single mask it returns a flat list of
            # predictions. Detect the shape instead of inferring it from the
            # loop index.
            best = results[0]
            if isinstance(best, list):
                best = best[0]
            sentence = best['sequence']
        mask_filled_sentences_list.append(sentence)
    return mask_filled_sentences_list

2. Finetuned Model

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import nltk
from nltk.translate.bleu_score import sentence_bleu


def calculate_scores(bad_sentence, generated_sentences):
    """Score generated sentences against the bad sentence and pick the best.

    Each score is the sum of a bag-of-words cosine similarity, a smoothed
    sentence-level BLEU score (bad sentence as the reference) and a toxicity
    term that is currently a constant 0.

    Args:
        bad_sentence: the original sentence.
        generated_sentences: list of generated sentences to rank.

    Returns:
        ``(scores, best_sentence)`` where ``scores`` is aligned with
        ``generated_sentences`` and ``best_sentence`` is the first sentence
        with the highest score. For an empty ``generated_sentences`` the
        result is ``([], None)``.
    """
    # Nothing to rank: avoid calling max() on an empty list.
    if not generated_sentences:
        return [], None

    # Smoothing keeps BLEU from collapsing to 0 when a short sentence has no
    # higher-order n-gram overlap with the reference.
    from nltk.translate.bleu_score import SmoothingFunction
    smoothie = SmoothingFunction().method4

    # The vocabulary is fitted on the bad sentence only, so words that appear
    # only in a generated sentence are ignored by the cosine similarity.
    vectorizer = CountVectorizer()
    bow_original = vectorizer.fit_transform([bad_sentence])
    bow_generated = vectorizer.transform(generated_sentences)

    # Row 0 holds the similarity of the bad sentence to each generated sentence.
    cos_similarities = cosine_similarity(bow_original, bow_generated)[0]

    # Lower-case and tokenize for BLEU.
    bad_sentence_tokens = nltk.word_tokenize(bad_sentence.lower())
    generated_sentences_tokens = [nltk.word_tokenize(sent.lower()) for sent in generated_sentences]

    scores = []
    for i, sent_tokens in enumerate(generated_sentences_tokens):
        bleu_score = sentence_bleu([bad_sentence_tokens], sent_tokens, smoothing_function=smoothie)

        # Placeholder: no toxicity model is applied yet.
        toxicity_score = 0

        scores.append(cos_similarities[i] + bleu_score + toxicity_score)

    max_index = scores.index(max(scores))

    return scores, generated_sentences[max_index]


In [None]:
# Generate mask-filled text for each set of candidate sentences.
# Keys without candidates, and keys for which every candidate was skipped by
# `mask_filled_sentences`, are left out so the scoring cells never receive an
# empty list.
generated_texts = {}
for key in candidates_generated.keys():
    candidate_set = candidates_generated[key]
    if not candidate_set:
        continue
    candidate_sentences = list(candidate_set)
    mask_filled_sentences_list = mask_filled_sentences(candidate_sentences)
    if mask_filled_sentences_list:
        generated_texts[key] = mask_filled_sentences_list

In [None]:
# Show which row indices ended up with generated sentences.
generated_texts.keys()

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction


def calculate_scores(bad_sentence, generated_sentences):
    """Score generated sentences against the bad sentence and pick the best.

    Each score is the sum of a bag-of-words cosine similarity, a smoothed
    sentence-level BLEU score (bad sentence as the reference) and a toxicity
    term that is currently a constant 0.

    Args:
        bad_sentence: the original sentence.
        generated_sentences: list of generated sentences to rank.

    Returns:
        ``(scores, best_sentence)`` where ``scores`` is aligned with
        ``generated_sentences`` and ``best_sentence`` is the first sentence
        with the highest score. For an empty ``generated_sentences`` the
        result is ``([], None)``.
    """
    # Nothing to rank: avoid calling max() on an empty list.
    if not generated_sentences:
        return [], None

    # Smoothing function for sentence-level BLEU.
    smoothie = SmoothingFunction().method4

    # The vocabulary is fitted on the bad sentence only, so words that appear
    # only in a generated sentence are ignored by the cosine similarity.
    vectorizer = CountVectorizer()
    bow_original = vectorizer.fit_transform([bad_sentence])
    bow_generated = vectorizer.transform(generated_sentences)

    # Row 0 holds the similarity of the bad sentence to each generated sentence.
    cos_similarities = cosine_similarity(bow_original, bow_generated)[0]

    # Lower-case and tokenize for BLEU.
    bad_sentence_tokens = nltk.word_tokenize(bad_sentence.lower())
    generated_sentences_tokens = [nltk.word_tokenize(sent.lower()) for sent in generated_sentences]

    scores = []
    for i, sent_tokens in enumerate(generated_sentences_tokens):
        bleu_score = sentence_bleu([bad_sentence_tokens], sent_tokens, smoothing_function=smoothie)

        # Placeholder: no toxicity model is applied yet.
        toxicity_score = 0

        scores.append(cos_similarities[i] + bleu_score + toxicity_score)

    max_index = scores.index(max(scores))

    return scores, generated_sentences[max_index]

In [None]:
# Print, for every bad sentence, the best-scoring generated sentence.
for i, idx in enumerate(generated_texts.keys()):
    generated_sentences = generated_texts[idx]
    # Nothing was generated for this row: scoring an empty list would fail.
    if not generated_sentences:
        continue
    bad_sentence = bad_sentences_similar_pos_tags["bad_sentences"][idx]
    print(f'{i} bad sentence -------------------------------- {bad_sentence}')
    # Score every generated sentence and keep the highest-scoring one
    scores, best_sentence = calculate_scores(bad_sentence, generated_sentences)
    print(f'Good sentence ------------------------------ {best_sentence}\n')
