# Test a sentence input
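Requires these files in a `data/` folder next to this notebook (the setup cell below loads them from there):
- a bigram phraser (`data/bigram_phraser_7`)
- a trigram phraser (`data/trigram_phraser_7`)
- a word2vec model (`data/wv_model_7`; 3 files: model, syn1neg, and vectors)
- a list of stopwords (`data/stopwords.txt`, one per line; these are ignored as potential euphemisms)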

Required packages:
- gensim
- transformers (used here with PyTorch tensors, so `torch` must be installed as well)
- scipy
- numpy
- tqdm

In [1]:
import copy
import csv
import re
import urllib.request

import numpy as np
from gensim.models import KeyedVectors
from gensim.models import Word2Vec
from gensim.models.phrases import Phraser, Phrases
from scipy.special import softmax
from tqdm import tqdm
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer
from transformers import TFAutoModelForSequenceClassification

In [3]:
class Euph_Detection:
    """Rank the phrases of a sentence by how likely they are to be euphemisms.

    The entry point is ``detect_euphs``. It cleans the sentence, groups tokens
    into phrases with the bigram/trigram phrasers, keeps the phrases that are
    similar to one of the sensitive topics in ``topic_list``, and then scores
    each remaining phrase by how much the sentence's sentiment/offensiveness
    scores rise when the phrase is swapped for its word2vec neighbours.
    """

    # A single-topic similarity above this cutoff short-circuits sum_similarity().
    HIGH_SIMILARITY_CUTOFF = 0.50
    # Score returned on that short-circuit.
    # NOTE(review): presumably chosen to sit just above the topic threshold the
    # notebook passes to detect_euphs (1.45) so such phrases always pass - confirm.
    HIGH_SIMILARITY_SCORE = 1.51

    def __init__(self, bigram_phraser, trigram_phraser, w2v_model, stopwords_text, sentiment, offensive):
        """Load every resource the detector needs.

        Args:
            bigram_phraser: path passed to ``Phraser.load`` for the bigram phraser.
            trigram_phraser: path passed to ``Phraser.load`` for the trigram phraser.
            w2v_model: path passed to ``Word2Vec.load``.
            stopwords_text: path of a UTF-8 text file with one stopword per line.
            sentiment: task name handed to ``load_roberta`` for the sentiment model.
            offensive: task name handed to ``load_roberta`` for the offensiveness model.
        """
        self.bigram_phraser = Phraser.load(bigram_phraser)
        self.trigram_phraser = Phraser.load(trigram_phraser)
        # Two independent loads: ``base_model`` is never trained, while ``model``
        # is the working copy that detect_euphs() trains and then resets.
        self.base_model = Word2Vec.load(w2v_model)
        self.model = Word2Vec.load(w2v_model)
        self.topic_list = ['politics', 'death', 'kill', 'crime',
               'drugs', 'alcohol', 'fat', 'old', 'poor', 'cheap',
               'sex', 'sexual',
               'employment', 'job', 'disability', 'disabled', 
               'accident', 'pregnant', 'poop', 'sickness', 'race', 'racial', 'vomit'
              ]
        self.stopwords = self.read_stopwords(stopwords_text)
        # Each pack is [labels, model, tokenizer], bundled for conciseness.
        self.sentiment_pack = list(self.load_roberta(sentiment))
        self.offensive_pack = list(self.load_roberta(offensive))

    def preprocess(self, s):
        """Return ``s`` stripped, with selected punctuation/markup removed,
        runs of whitespace collapsed to one space, and lower-cased."""
        s = s.strip()
        s = re.sub(r'(##\d*\W)|<\w>|,|;|:|--|\(|\)|#|%|\\|\/|\.|\*|\+|@', '', s)
        s = re.sub(r'\s\s+', ' ', s)
        s = s.lower()
        return s

    def get_phrases(self, s):
        """Split ``s`` on whitespace and apply the bigram, then trigram, phraser.

        Returns:
            Whatever the trigram phraser yields for the bigram-phrased tokens
            (the notebook output shows a list of underscore-joined phrases).
        """
        bigrammed_phrases = self.bigram_phraser[s.split()]
        trigrammed_phrases = self.trigram_phraser[bigrammed_phrases]
        # The original computed this value and then dropped it (implicit None).
        return trigrammed_phrases

    def sum_similarity(self, phrase, topic_list):
        """Score how topically relevant ``phrase`` is.

        Returns ``HIGH_SIMILARITY_SCORE`` as soon as one topic's similarity
        exceeds ``HIGH_SIMILARITY_CUTOFF`` (experimental "reward" for a phrase
        that is strongly tied to one category but maybe not the others);
        otherwise the sum of all positive similarities. Topic/phrase pairs the
        model cannot look up contribute nothing.
        """
        score = 0
        for topic in topic_list:
            try:
                similarity = self.model.wv.similarity(phrase, topic)
            except KeyError:
                # Phrase or topic is not in the model's vocabulary.
                continue
            if similarity > self.HIGH_SIMILARITY_CUTOFF:
                return self.HIGH_SIMILARITY_SCORE
            if similarity > 0:
                score += similarity
        return score

    def read_stopwords(self, text):
        """Read the stopword file at path ``text``.

        The file is decoded as UTF-8 and may use either ``\\r\\n`` or ``\\n``
        line endings; surrounding whitespace is stripped and blank lines are
        skipped.

        Returns:
            list[str]: the stopwords in file order.
        """
        with open(text, encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip()]

    def topically_filter_phrases(self, phrases, topic_list, stopwords, THRESHOLD, show_stats=False):
        """Keep the phrases whose ``sum_similarity`` score exceeds ``THRESHOLD``.

        Stopwords are skipped outright and duplicates are kept only once, in
        first-seen order. With ``show_stats`` each phrase's score is printed.

        Returns:
            list: the retained ("quality") phrases.
        """
        stopword_set = set(stopwords)
        quality_phrases = []

        for phrase in phrases:
            if phrase in stopword_set:
                continue
            similarity = self.sum_similarity(phrase, topic_list)

            if show_stats:
                print("{} has a relevance score of {}".format(phrase, similarity)) #table?

            if similarity > THRESHOLD and phrase not in quality_phrases:
                quality_phrases.append(phrase)

        return quality_phrases

    def load_roberta(self, task):
        """Load the ``cardiffnlp/twitter-roberta-base-{task}`` classifier.

        Tasks published under that naming scheme include: emoji, emotion, hate,
        irony, offensive, sentiment, stance/abortion, stance/atheism,
        stance/climate, stance/feminist, stance/hillary. This notebook uses
        'sentiment' and 'offensive'.

        Side effects: downloads the label mapping over the network on every
        call, and saves the model and tokenizer into a local directory named
        after the model id.

        Returns:
            tuple: ``(labels, model, tokenizer)``.
        """
        MODEL = f"cardiffnlp/twitter-roberta-base-{task}"

        tokenizer = AutoTokenizer.from_pretrained(MODEL)

        # Download the label mapping (tab-separated "index<TAB>label" rows).
        mapping_link = f"https://raw.githubusercontent.com/cardiffnlp/tweeteval/main/datasets/{task}/mapping.txt"
        with urllib.request.urlopen(mapping_link) as f:
            html = f.read().decode('utf-8').split("\n")
        csvreader = csv.reader(html, delimiter='\t')
        labels = [row[1] for row in csvreader if len(row) > 1]

        # Pretrained weights; keep a local copy next to the notebook.
        model = AutoModelForSequenceClassification.from_pretrained(MODEL)
        model.save_pretrained(MODEL)
        tokenizer.save_pretrained(MODEL)

        return labels, model, tokenizer

    def get_sentiment(self, s, pack):
        """Run the classifier in ``pack`` on the string ``s``.

        Args:
            s: the text to classify.
            pack: ``[labels, model, tokenizer]`` as built in ``__init__``.

        Returns:
            The softmax of ``output[0][0]`` as a NumPy array (one probability
            per class of the model).
        """
        model, tokenizer = pack[1], pack[2]
        encoded_input = tokenizer(s, return_tensors='pt')
        output = model(**encoded_input)
        scores = output[0][0].detach().numpy()
        return softmax(scores)

    def get_top_euph_candidates(self, text, phrases, num_paraphrases, wv_model, sentiment_pack, offensive_pack, show_stats=False):
        """Score each phrase by the sentiment shift its paraphrases cause.

        For every phrase, up to ``num_paraphrases`` nearest neighbours from
        ``wv_model`` are substituted into ``text`` one at a time. The class
        scores of each rewritten sentence are compared against the original's;
        positive increases are accumulated per class and combined as
        ``neg + neu + 2 * (non_offensive + offensive)``.

        Neighbours that contain the phrase itself are skipped. A phrase with no
        usable neighbours (none returned, or not in the vocabulary) scores 0.

        Args:
            text: the (preprocessed) sentence.
            phrases: underscore-joined phrases to evaluate.
            num_paraphrases: ``topn`` passed to ``most_similar``.
            wv_model: model exposing ``.wv.most_similar``.
            sentiment_pack / offensive_pack: packs accepted by ``get_sentiment``.
            show_stats: print per-phrase statistics when true.

        Returns:
            list[tuple[str, number]]: ``(phrase with spaces, score)`` pairs,
            highest score first.
        """
        orig_scores = list(self.get_sentiment(text, sentiment_pack))
        orig_scores = orig_scores + list(self.get_sentiment(text, offensive_pack))
        if show_stats: print('SENTIMENT OF ORIGINAL SENTENCE: {}'.format(orig_scores))
        phrase_scores = []

        for q in tqdm(phrases):
            if show_stats: print('\n'+q)

            # Underscores are removed for sentiment computation - experiment?
            # Computed once per phrase so it exists even without paraphrases.
            q_string = re.sub(r'_', ' ', q)
            # Escape the phrase so characters such as '-' or '?' match literally.
            pattern = re.compile(r'\b' + re.escape(q_string) + r'\b', re.IGNORECASE)

            try:
                paraphrases = wv_model.wv.most_similar(q, topn = num_paraphrases) # can swap out
            except KeyError:
                # Phrase is not in the vocabulary: nothing to substitute.
                paraphrases = []

            # Various sentiment statistics, indexed like the concatenated scores.
            sentiment_shift = [0, 0, 0, 0, 0] # [neg, neu, pos, n-off, off]
            max_inc = [0, 0, 0, 0, 0]
            max_inc_para = ["", "", "", "", ""]
            tot_neg_inc = 0
            tot_neu_inc = 0
            tot_pos_inc = 0
            tot_noff_inc = 0
            tot_off_inc = 0
            num_scored = 0

            for p in paraphrases:
                p_string = re.sub(r'_', ' ', p[0])

                # A paraphrase that merely contains the phrase is not a real alternative.
                if q_string in p_string:
                    continue

                # Replacement; a callable keeps the paraphrase literal (no
                # backslash/group expansion).
                new_sentence = pattern.sub(lambda _match: p_string, text)
                # at this point, we could check the integrity of the paraphrase

                # Sentiment/offensive scores for this paraphrased sentence.
                scores = list(self.get_sentiment(new_sentence, sentiment_pack))
                scores = scores + list(self.get_sentiment(new_sentence, offensive_pack))
                num_scored += 1

                # Fold this paraphrase's shifts into the phrase's statistics.
                shifts = [0, 0, 0, 0, 0]
                for i in range(0, len(scores)):
                    shifts[i] = scores[i] - orig_scores[i]
                    sentiment_shift[i] += shifts[i]
                    if (shifts[i] > max_inc[i]):
                        max_inc[i] = shifts[i]
                        max_inc_para[i] = p_string

                # Only increases count towards detection.
                if (shifts[0] > 0):
                    tot_neg_inc += shifts[0]
                if (shifts[1] > 0):
                    tot_neu_inc += shifts[1]
                if (shifts[2] > 0):
                    tot_pos_inc += shifts[2]
                if (shifts[3] > 0):
                    tot_noff_inc += shifts[3]
                if (shifts[4] > 0):
                    tot_off_inc += shifts[4]

            # Turn the summed shifts into a mean over the paraphrases actually
            # scored (the original loop rebound a local and changed nothing).
            if num_scored:
                sentiment_shift = [total / num_scored for total in sentiment_shift]

            if show_stats:
                print("AVERAGE SENTIMENT SHIFTS: {}".format(sentiment_shift))
                print("MAX INCREASE FROM A PHRASE: {}".format(max_inc))
                print("PHRASES THAT CAUSED EACH ^: {}".format(max_inc_para))
                print("TOTAL NEGATIVE INCREASE: {}".format(tot_neg_inc))
                print("TOTAL NEUTRAL INCREASE: {}".format(tot_neu_inc))
                print("TOTAL POSITIVE INCREASE: {}".format(tot_pos_inc))
                print("TOTAL NON-OFFENSIVE INCREASE: {}".format(tot_noff_inc))
                print("TOTAL OFFENSIVE INCREASE: {}".format(tot_off_inc))

            score = tot_neg_inc + tot_neu_inc + 2*(tot_noff_inc + tot_off_inc)
            phrase_scores.append((q_string, score))

        return sorted(phrase_scores, key=lambda x: x[1], reverse=True)

    def detect_euphs(self, s, topic_threshold, num_paraphrases, show_stats=False):
        """Run the full pipeline on sentence ``s``.

        Prints the cleaned sentence, the detected phrases and the topically
        relevant phrases along the way.

        Args:
            s: raw input sentence.
            topic_threshold: minimum ``sum_similarity`` score for a phrase to be kept.
            num_paraphrases: number of word2vec neighbours tried per phrase.
            show_stats: print intermediate statistics when true.

        Returns:
            list[tuple[str, number]]: candidate phrases, best first
            (see ``get_top_euph_candidates``).
        """
        s = self.preprocess(s)
        print(s)

        trigrammed_phrases = self.get_phrases(s)
        print("\nDETECTED PHRASES: {}".format(trigrammed_phrases))

        data = [trigrammed_phrases]
        try:
            # Train the working model on the input data.
            self.model.train(data, total_examples=len(data), epochs=10)

            quality_phrases = self.topically_filter_phrases(trigrammed_phrases, self.topic_list, self.stopwords, topic_threshold, show_stats)
            print("\nRELEVANT PHRASES: {}".format(quality_phrases))

            candidate_list = self.get_top_euph_candidates(s, quality_phrases, num_paraphrases,
                                                          self.model, self.sentiment_pack, self.offensive_pack,
                                                          show_stats)
        finally:
            # Reset to a fresh copy. Assigning ``self.base_model`` directly would
            # alias the two, so the next call would train the base model itself.
            self.model = copy.deepcopy(self.base_model)

        return candidate_list

In [4]:
import time

# Time the one-off setup: loading the phrasers, the word2vec model, the
# stopword list and the two RoBERTa classifiers.
start = time.time()

euph_detector = Euph_Detection(
    bigram_phraser='data/bigram_phraser_7',
    trigram_phraser='data/trigram_phraser_7',
    w2v_model='data/wv_model_7',
    stopwords_text='data/stopwords.txt',
    sentiment='sentiment',
    offensive='offensive',
)
print(f"Setup took {time.time() - start} seconds")

Setup took 76.65264558792114 seconds


#### Input your sentence below

In [7]:
# Sentence to analyse; edit this string to try another input.
s = "She was happy to announce to her parents that they would soon be grandparents as she had a bun in the oven"

# Rank the sentence's phrases by how likely each one is to be a euphemism.
candidate_ranking = euph_detector.detect_euphs(
    s,
    topic_threshold=1.45,
    num_paraphrases=25,
    show_stats=False,
)

print(f"\nEUPH CANDIDATE RANKING: {candidate_ranking}")

she was happy to announce to her parents that they would soon be grandparents as she had a bun in the oven

DETECTED PHRASES: ['she_was', 'happy_to_announce', 'to', 'her_parents', 'that', 'they', 'would_soon', 'be', 'grandparents', 'as', 'she_had', 'a', 'bun_in_the_oven']

RELEVANT PHRASES: ['she_was', 'her_parents', 'grandparents', 'she_had', 'bun_in_the_oven']


100%|████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:14<00:00,  3.00s/it]


EUPH CANDIDATE RANKING: [('bun in the oven', 6.281971137272194), ('she had', 2.188295707805082), ('grandparents', 1.6664501605555415), ('she was', 1.621023417916149), ('her parents', 1.4890306764282286)]





## Analyzing the Process
After running Euph_Detection on a sentence, you can further look at the intermediate outputs for a specific candidate phrase:

#### Topic Relevance

In [12]:
# Phrase whose topical relevance is broken down topic by topic.
test_phrase = 'grandparents'

topic_list = euph_detector.topic_list
model = euph_detector.model

similar_topics = []  # topics whose similarity to the phrase exceeds 0.25
score = 0            # sum of the positive similarities

for topic in topic_list:
    similarity = model.wv.similarity(test_phrase, topic)
    if similarity > 0.25:
        similar_topics.append(topic)
    if similarity > 0:
        score += similarity
    print(f'{topic}: {similarity}')

print(f'SIMILAR TOPICS: {similar_topics}')
print(f'TOTAL SCORE: {score}')

politics: 0.03169498220086098
death: 0.23297420144081116
kill: 0.0050100162625312805
crime: -0.008931629359722137
drugs: 0.12348905205726624
alcohol: 0.0824684277176857
fat: 0.10275810211896896
old: 0.24060304462909698
poor: 0.2921329736709595
cheap: 0.009934276342391968
sex: 0.20923519134521484
sexual: 0.08769768476486206
employment: 0.04461533948779106
job: 0.13673429191112518
disability: 0.3075178265571594
disabled: 0.3100000321865082
accident: 0.16150902211666107
pregnant: 0.3779146671295166
poop: 0.12431168556213379
sickness: 0.24127842485904694
race: 0.10285885632038116
racial: 0.04492925852537155
vomit: -0.023195646703243256
SIMILAR TOPICS: ['poor', 'disability', 'disabled', 'pregnant']
TOTAL SCORE: 3.2696673572063446


#### Sentiment

In [None]:
# Step-by-step view of the sentiment scoring for a single candidate phrase.
text = s
q = 'grandparents'

sentiment_pack = euph_detector.sentiment_pack
offensive_pack = euph_detector.offensive_pack
model = euph_detector.model

# Baseline scores of the unmodified sentence: sentiment classes followed by
# the offensiveness classes.
orig_scores = list(euph_detector.get_sentiment(text, sentiment_pack))
orig_scores = orig_scores + list(euph_detector.get_sentiment(text, offensive_pack))
print('SENTIMENT OF ORIGINAL SENTENCE: {}'.format(orig_scores))

num_paraphrases=25
print('\n'+q)
paraphrases = model.wv.most_similar(q, topn = num_paraphrases) # can swap out

# String form of the original phrase (underscores removed for sentiment
# computation). Built once, so it exists even if no paraphrase is returned.
q_string = re.sub(r'_', ' ', q)
# Escape the phrase so it is matched literally inside the sentence.
pattern = re.compile(r'\b' + re.escape(q_string) + r'\b', re.IGNORECASE)

# various sentiment statistics
sentiment_shift = [0, 0, 0, 0, 0] # [neg, neu, pos, n-off, off]
max_inc = [0, 0, 0, 0, 0]
max_inc_para = ["", "", "", "", ""]
tot_neg_inc = 0
tot_neu_inc = 0
tot_pos_inc = 0
tot_noff_inc = 0
tot_off_inc = 0
num_scored = 0  # paraphrases that were actually substituted and scored

for p in paraphrases:
    p_string = re.sub(r'_', ' ', p[0])
    print(p_string)

    # A paraphrase that merely contains the phrase is not a real alternative.
    if (q_string in p_string):
        print("Paraphrase is superstring, skipping!")
        print()
        continue

    # replacement; a callable keeps the paraphrase literal (no backslash/group expansion)
    new_sentence = pattern.sub(lambda _match: p_string, text)

    # at this point, we could check the integrity of the paraphrase

    # get the sentiment/offensive scores for this paraphrase
    scores = list(euph_detector.get_sentiment(new_sentence, sentiment_pack))
    scores = scores + list(euph_detector.get_sentiment(new_sentence, offensive_pack))
    num_scored += 1

    # update the phrase's sentiment statistics with the shifts from this paraphrase
    shifts = [0, 0, 0, 0, 0]
    for i in range(0, len(scores)):
        shifts[i] = scores[i] - orig_scores[i]
        sentiment_shift[i] += shifts[i]
        if (shifts[i] > max_inc[i]):
            max_inc[i] = shifts[i]
            max_inc_para[i] = p_string
    print(shifts)

    print()
    # update the relevant scores for detection (only increases count)
    if (shifts[0] > 0):
        tot_neg_inc += shifts[0]
    if (shifts[1] > 0):
        tot_neu_inc += shifts[1]
    if (shifts[2] > 0):
        tot_pos_inc += shifts[2]
    if (shifts[3] > 0):
        tot_noff_inc += shifts[3]
    if (shifts[4] > 0):
        tot_off_inc += shifts[4]

# Convert the summed shifts into a mean over the scored paraphrases. The
# original loop ran inside the paraphrase loop and only rebound a local
# variable, so the "average" printed below was really the raw sum.
if num_scored:
    sentiment_shift = [total / num_scored for total in sentiment_shift]

print("AVERAGE SENTIMENT SHIFTS: {}".format(sentiment_shift))
print("MAX INCREASE FROM A PHRASE: {}".format(max_inc))
print("PHRASES THAT CAUSED EACH ^: {}".format(max_inc_para))
print("TOTAL NEGATIVE INCREASE: {}".format(tot_neg_inc))
print("TOTAL NEUTRAL INCREASE: {}".format(tot_neu_inc))
print("TOTAL POSITIVE INCREASE: {}".format(tot_pos_inc))
print("TOTAL NON-OFFENSIVE INCREASE: {}".format(tot_noff_inc))
print("TOTAL OFFENSIVE INCREASE: {}".format(tot_off_inc))

# Experimental scoring used in this walkthrough. Note that
# Euph_Detection.get_top_euph_candidates combines the totals differently:
#   tot_neg_inc + tot_neu_inc + 2*(tot_noff_inc + tot_off_inc)
score = tot_noff_inc + tot_off_inc
print((q_string, score))

SENTIMENT OF ORIGINAL SENTENCE: [0.0018395717, 0.05148052, 0.94667983, 0.80990493, 0.19009507]

grandparents
aunts and uncles
[0.0004102575, 0.018325869, -0.018736005, -0.09982771, 0.09982771]

grandchildren
[-0.0003691659, -0.011124246, 0.011493623, 0.01240021, -0.012400284]

siblings
[-0.0004157899, 0.005893085, -0.005477071, 0.0139247775, -0.013924703]

in-laws
[-0.00029044622, 0.006497167, -0.0062065125, -0.024477601, 0.024477646]

relatives
[0.00069687364, 0.024621267, -0.025318086, -0.006095648, 0.006095752]

other relatives
[0.00077235873, 0.02984209, -0.030614376, 0.0038375854, -0.00383766]

nephews
[-0.00034243532, 0.0064492896, -0.0061067343, 0.002517581, -0.0025176853]

great-grandparents
Paraphrase is superstring, skipping!

older siblings
[-0.00023880275, 0.0111381, -0.010899305, -0.0011742115, 0.0011741668]

grandkids
[-0.00047420408, -0.008419838, 0.008894205, -0.012013733, 0.0120137185]

adult children
[0.0030467235, 0.059609625, -0.0626564, -0.10941303, 0.10941309]

th