## Supplementary code package for paper submission: 'Semantically Informed Slang Interpretation'.


This notebook contains the supplementary code package for 'Semantically Informed Slang Interpretation'. Since we cannot publicly release all relevant datasets used in the study due to copyright terms, the purpose of this notebook is to illustrate how the main results from the paper can be reproduced. Specifically, the code package includes all required non-standard code dependencies, and the code in this notebook shows how the results can be reproduced using these libraries.

We include custom versions of code released from previous work in the following directories:

- **/CatGO** - A categorization library from Zhewei Sun, Richard Zemel, and Yang Xu, 'Slang generation as categorization', 2019.
- **/ilm** - A pre-trained GPT-2 based language infill model from Chris Donahue, Mina Lee, and Percy Liang, 'Enabling language models to fill in the blanks', 2020.
- **/slanggen** - A library for training contrastively learned slang sense embeddings from Zhewei Sun, Richard Zemel, and Yang Xu, 'A computational framework for slang generation', 2021.

Here is a list of scientific Python packages you'll need:

- numpy
- scipy
- nltk
- gensim
- Flair
- PyTorch
- transformers
- sentence_transformers

Import all required dependencies:

In [None]:
import io
import os
import pickle
import re

from tqdm import trange
from tqdm import tqdm

import numpy as np
import scipy.spatial.distance as dist
import scipy.stats
from scipy.stats import norm, mode
from scipy.optimize import minimize

from collections import defaultdict, namedtuple, Counter, defaultdict

from nltk.corpus import stopwords as sw
from nltk.corpus import wordnet as wn
from nltk.stem import WordNetLemmatizer
from nltk.stem.porter import PorterStemmer
from gensim.utils import simple_preprocess

from CatGO.categorize import Categorizer

from slanggen.util import *
from slanggen.dataloader import UD_Wil_Dataset, OED_Dataset
from slanggen.encoder import FTEncoder
from slanggen.contrastive import SlangGenTrainer
from slanggen.model import SlangGenModel

import torch
from transformers import GPT2LMHeadModel

import ilm.tokenize_util
import ilm.infer

from flair.data import Sentence
from flair.models import SequenceTagger

from sentence_transformers import SentenceTransformer

In [None]:
# Download the pre-trained infill model weights and the dictionary data file from Google Drive.
# Download approach taken from this StackOverflow answer: https://stackoverflow.com/a/39225039
import os

import gdown

model_url = 'https://drive.google.com/uc?id=1ao0_wjaNAi7qOZ-SBmJpi-ES_KgAwp8I'
model_path = './pretrain/models/sto_ilm/pytorch_model.bin'
# Make sure the nested model directory exists so the download has somewhere to write
# on a fresh checkout.
os.makedirs(os.path.dirname(model_path), exist_ok=True)
gdown.download(model_url, model_path, quiet=False)

OED_url = 'https://drive.google.com/uc?id=1BkXL7gzg3TE9VynTRnVzkE9qPzGKQtcf'
OED_path = './OED_Urban_def_full.npy'
gdown.download(OED_url, OED_path, quiet=False)

### Load and prepare data

Loads the dataset and the associated indices for the data splits. Note that the Oxford Dictionary (OD) data cannot be included, so this is just an illustration.

In [None]:
# Load the dictionary data and the slang dataset that is built on top of it.
oed_data = OED_Dataset('OED_Urban_def_full.npy')

ud_dir = '../'
dataset = UD_Wil_Dataset(ud_dir+'Data/', oed_data, load_oov=True)

# ========== create the splits ==========
data_length = len(dataset.slang_data)

# Determine the sizes of each split (70% train, 15% dev, remainder test)
train_size = int(data_length * 0.7)
dev_size = int(data_length * 0.15)

# Create index arrays.
# A fixed seed makes the split reproducible: re-running this cell rewrites the
# same index files instead of silently moving entries between train/dev/test.
split_seed = 0
indices = np.random.default_rng(split_seed).permutation(data_length)
train_ind = indices[:train_size]
dev_ind = indices[train_size:train_size+dev_size]
test_ind = indices[train_size+dev_size:]

# Save index arrays as .npy files
np.save(ud_dir+'train_ind.npy', train_ind)
np.save(ud_dir+'dev_ind.npy', dev_ind)
np.save(ud_dir+'test_ind.npy', test_ind)
# ========== create the splits ==========

# Reload the splits from disk so the rest of the notebook uses exactly what was saved.
slang_inds = DataIndex(np.load(ud_dir+'train_ind.npy'), np.load(ud_dir+'dev_ind.npy'), np.load(ud_dir+'test_ind.npy'))

# ========== verify ==========
print('Train:', len(slang_inds.train))
print('Dev:', len(slang_inds.dev))
print('Test:', len(slang_inds.test))
print('Dataset:\n', dataset)

Extra data bookkeeping:

In [None]:
# Pool of train + dev indices; the evaluation cells draw their negative samples from it.
train_dev_ind = np.concatenate((slang_inds.train, slang_inds.dev))

# Flatten the test split into one row per context sentence:
#   ex_sents_test[r] - the stripped context sentence
#   ex_sents_inds[r] - position of its entry within slang_inds.test
#   gt_words_test[r] - the entry's slang word
ex_sents_test = []
ex_sents_inds = []
gt_words_test = []

for i, ind in enumerate(slang_inds.test):
    entry = dataset.slang_data[ind]
    for s in entry.meta_data['context']:
        ex_sents_test.append(s.strip())
        ex_sents_inds.append(i)
        gt_words_test.append(entry.word)

### Obtain interpretation candidates from a pre-trained language infill model

Set up a POS tagger and tag the blanked out slang expression in the context sentence for every test entry. Note that the slang expression itself is not provided to the tagger to mitigate potential biases. 

In [None]:
# Penn Treebank tags grouped by the coarse part-of-speech class they collapse into.
_PENN_TAG_GROUPS = (
    ('adj', ('JJ', 'JJR', 'JJS')),
    ('interj', ('UH',)),
    ('adv', ('RB', 'RBR', 'RBS', 'WRB')),
    ('noun', ('NN', 'NNS', 'NNP', 'NNPS')),
    ('verb', ('MD', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ')),
)

# Flat lookup table: Penn Treebank tag -> coarse class.
penn_pos_map = {tag: coarse for coarse, tags in _PENN_TAG_GROUPS for tag in tags}

def conv_penn_pos(tag):
    """Map a Penn Treebank tag to its coarse class, or 'other' when the tag is not listed."""
    return penn_pos_map.get(tag, 'other')

# Flair sequence tagger loaded under the model name 'pos'; the tagging loop below
# uses it to predict a tag for the blanked-out slot of each context sentence.
tagger = SequenceTagger.load('pos')

# Body of a regex character class listing the punctuation stripped from context
# sentences before tagging. The apostrophe is deliberately absent, so contractions
# survive. Written as a raw string so the regex escapes (\*, \+, \., \]) are not
# parsed as (invalid) Python string escapes; the resulting value is unchanged.
punctuations = r'!"#$%&()\*\+,-\./:;<=>?@[\]^_`{|}~'
# Matches one or more consecutive punctuation characters.
re_punc = re.compile(r"[" + punctuations + r"]+")

# A token is a maximal run of non-whitespace characters bounded by whitespace or
# by the start/end of the string.
_TOKEN_RE = re.compile(r"(?:^|(?<=\s))\S+(?=\s|$)")

def tokenize(sentence):
    """Return the whitespace-delimited tokens of `sentence`, in order of appearance."""
    return _TOKEN_RE.findall(sentence)

# Coarse POS class predicted for the blanked-out slot of every test context sentence.
ex_sents_pos = []

for s in tqdm(ex_sents_test):
    stripped = re_punc.sub('', s)

    # Whitespace-token index of the first token starting with the slang placeholder;
    # stays 0 when no such token exists.
    gap_pos = next(
        (j for j, token in enumerate(tokenize(stripped)) if token.startswith('SLANGAAAP')),
        0,
    )

    # Swap the placeholder for a filler token so the tagger never sees the slang word itself.
    sent = Sentence(stripped.replace('SLANGAAAP', 'slanggg'))
    tagger.predict(sent)

    # NOTE(review): this assumes Flair's tokens line up one-to-one with the whitespace
    # tokens counted above, and that the token-level `.tag` is the POS prediction - confirm.
    # See: https://github.com/flairNLP/flair/blob/d55c0e9989e69c954f904bf8a2dd101e6f982948/flair/models/sequence_tagger_model.py#L24
    ex_sents_pos.append(conv_penn_pos(sent[gap_pos].tag))

Set up a pre-trained language infill model from Donahue et al. (2020). The model can be downloaded from their original repository.

In [None]:
# For issues see: https://github.com/chrisdonahue/ilm?tab=readme-ov-file
MODEL_DIR = 'pretrain/models/sto_ilm' # Change this to where you have placed the pre-trained model
MASK_CLS = 'ilm.mask.hierarchical.MaskHierarchical'

tokenizer = ilm.tokenize_util.Tokenizer.GPT2
with open(os.path.join(MODEL_DIR, 'additional_ids_to_tokens.pkl'), 'rb') as f:
    additional_ids_to_tokens = pickle.load(f)
additional_tokens_to_ids = {v:k for k, v in additional_ids_to_tokens.items()}
try:
    ilm.tokenize_util.update_tokenizer(additional_ids_to_tokens, tokenizer)
except ValueError:
    print('Already updated')
print(additional_tokens_to_ids)

_blank_id = ilm.tokenize_util.encode(' _', tokenizer)[0]

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GPT2LMHeadModel.from_pretrained(MODEL_DIR)
model.eval()
_ = model.to(device)

Apply the language infill model to obtain a list of infilled words for each test example.

In [None]:
def infill_prob(sentence, n_words=5):
    """Score single-word infills for the slang slot of `sentence`.

    Args:
        sentence: Context sentence containing the '[*SLANGAAAP*]' placeholder.
        n_words: Number of top-scoring tokens to return.

    Returns:
        A pair (scores, tokens) holding the `n_words` highest entries of the
        model output, best first. The scores are presumably probabilities from
        the bundled ilm library - confirm against `ilm.infer.infill_with_ilm`.

    Raises:
        ValueError: If the encoded sentence contains no blank token.

    Note:
        `tokenizer`, `model`, `_blank_id` and `additional_tokens_to_ids` are read
        from module scope at call time. A later cell rebinds `model`, so this
        function must be called before that cell runs.
    """
    # Turn the placeholder into the blank marker and encode the sentence.
    token_ids = ilm.tokenize_util.encode(
        sentence.replace('[*SLANGAAAP*]', ' _ '), tokenizer)

    # Mark the first blank as a word-level infill slot.
    blank_at = token_ids.index(_blank_id)
    token_ids[blank_at] = additional_tokens_to_ids['<|infill_word|>']

    # Model output for the slot, moved to the CPU as a NumPy array.
    scores = ilm.infer.infill_with_ilm(
        model,
        additional_tokens_to_ids,
        token_ids,
        num_infills=1)
    probs = scores.cpu().numpy()[0]

    # Token ids ordered from highest to lowest score, truncated to n_words.
    best_ids = np.argsort(probs)[::-1][:n_words]
    best_words = ilm.tokenize_util.ids_to_tokens(best_ids, tokenizer)

    return probs[best_ids], best_words

# Query the infill model for the 150 top-scoring tokens of every test context sentence.
# NOTE(review): each item is a (scores, tokens) pair, so np.asarray presumably coerces
# the whole result into a single string-typed array - confirm. filter_words() converts
# the scores back with float().
infill_results_raw = np.asarray([infill_prob(sent, 150) for sent in tqdm(ex_sents_test)])

First, filter out words that contain non-alphanumeric characters. Then, check whether the part-of-speech (POS) tag predicted from the usage context matches each candidate word. Words with matching POS tags are moved to the front of the list. Finally, keep the top 50 candidate words for each test example.

In [None]:
def alphanum_check(w):
    """Return True when `w` is non-empty and consists only of ASCII letters and digits."""
    # isascii() rules out non-ASCII letters/digits; isalnum() is False for the empty string.
    return w.isascii() and w.isalnum()

# POS tags used by the HistWords count files -> coarse class.
hist_pos_map = dict(ADJ='adj', X='interj', ADV='adv', VERB='verb', NOUN='noun')

def conv_hist_pos(tag):
    """Map a HistWords POS tag to its coarse class, or 'other' when the tag is not listed."""
    return hist_pos_map.get(tag, 'other')

# You'll need to download this from the HistWord Project by Hamilton et al. (2016).
# You'll need to download this from the HistWords project by Hamilton et al. (2016).
# hist_counts is indexed by word; each entry yields (POS tag, count) pairs, as
# consumed by get_hist_posdist().
# NOTE: unpickling can execute arbitrary code; only load this file from a trusted source.
# The context manager closes the file handle once loading is done.
with open('histwords/eng-all/pos/1990-pos_counts.pkl', 'rb') as hist_file:
    hist_counts = pickle.load(hist_file)
hist_vocab = set(hist_counts.keys())

def get_hist_posdist(word):
    """Return the coarse POS distribution of `word` from the HistWords counts.

    Counts are summed per coarse class (see conv_hist_pos) and divided by their
    total. The result is a defaultdict, so classes never seen for the word read
    as 0.0.

    Raises:
        KeyError: If `word` has no entry in `hist_counts`.
    """
    coarse_mass = defaultdict(float)
    for fine_tag, count in hist_counts[word].items():
        coarse_mass[conv_hist_pos(fine_tag)] += count

    denom = np.sum(list(coarse_mass.values()))
    for coarse_tag in coarse_mass:
        coarse_mass[coarse_tag] /= denom

    return coarse_mass

def pos_check(word, tag, threshold=0.05):
    """Tell whether `word` is used with coarse POS class `tag` often enough.

    Returns False for words missing from the HistWords vocabulary; otherwise
    returns whether the share of `tag` in the word's POS distribution is at
    least `threshold`.
    """
    return word in hist_vocab and get_hist_posdist(word)[tag] >= threshold

def filter_words(result, tag, n_words=50):
    """Clean, merge and re-order the infill candidates of one context sentence.

    Args:
        result: Pair-like object; result[0] holds the candidate scores and
            result[1] the candidate tokens, aligned by position.
        tag: Coarse POS class predicted for the slang slot.
        n_words: Maximum number of candidates to keep.

    Returns:
        A pair (words, probs) of arrays with at most `n_words` entries.
        Candidates that are not purely ASCII alphanumeric are dropped. The
        remaining ones are split into those passing pos_check for `tag` and the
        rest. Within each group, tokens are lowercased, scores of identical
        words are summed, and words are sorted by descending score. The
        POS-matching group comes first.
    """
    probs = np.asarray(result[0])
    words = np.asarray([tok.strip() for tok in result[1]])
    positions = range(len(result[1]))

    # Positions of candidates made up purely of ASCII letters/digits.
    kept = [i for i in positions if alphanum_check(words[i])]
    # Positions whose word is attested with the predicted POS class.
    pos_ok = {i for i in positions if pos_check(words[i], tag)}

    def _merge_and_rank(selected):
        """Sum scores per lowercased word and sort by descending total."""
        idx = np.asarray(selected, dtype=np.int32)
        sel_probs, sel_words = probs[idx], words[idx]

        merged = defaultdict(float)
        for j in range(sel_words.shape[0]):
            merged[sel_words[j].lower()] += float(sel_probs[j])

        keys = np.asarray(list(merged.keys()))
        vals = np.asarray(list(merged.values()))
        order = np.argsort(vals)[::-1]
        return keys[order], vals[order]

    match_words, match_probs = _merge_and_rank([i for i in kept if i in pos_ok])
    other_words, other_probs = _merge_and_rank([i for i in kept if i not in pos_ok])

    return (np.concatenate((match_words, other_words))[:n_words],
            np.concatenate((match_probs, other_probs))[:n_words])

# Filter and re-order the raw candidates of every test sentence using its predicted POS class.
# NOTE(review): later cells index this as a 3-D array (infill_results[:, 1, :], .shape[2]),
# which presumably requires every sentence to end up with exactly 50 candidates - confirm.
infill_results = np.asarray([filter_words(infill_results_raw[i], ex_sents_pos[i]) for i in range(infill_results_raw.shape[0])])

For each predicted word, look it up in the Oxford Dictionary to find an associated definition. If the word cannot be found, try its lemmatized form and then the stemmed form of its lemma. If all lookups fail, the word itself is taken as the definition sentence.

In [None]:
# Sorted vocabulary of every candidate word that survived filtering, plus word -> position.
infill_vocab = np.asarray(sorted({w for entry in infill_results for w in entry[0]}))
infill_vocab_inds = {w: i for i, w in enumerate(infill_vocab)}

lemmatizer = WordNetLemmatizer()
stemmer = PorterStemmer()

def _lookup_forms(token):
    """Yield dictionary lookup keys for `token` in fallback order: surface form, lemma, stemmed lemma."""
    yield token
    lemma = lemmatizer.lemmatize(token)
    yield lemma
    yield stemmer.stem(lemma)

# cand_sentences[k] is the definition sentence chosen for infill_vocab[k];
# cand_sent_map gives the same sentence keyed by the word.
cand_sentences = []
cand_sent_map = {}
c = 0  # number of candidate words without any dictionary entry

for v in infill_vocab:
    word = next((form for form in _lookup_forms(v) if form in oed_data.vocab), None)
    if word is None:
        # No entry under any form: the word stands in for its own definition.
        c += 1
        cand_sentences.append(v)
    else:
        # Use the first listed definition of the matched entry.
        cand_sentences.append(oed_data.data[word].definitions[0]['def'])
    cand_sent_map[v] = cand_sentences[-1]

### Training contrastive sense encodings

Adapt code from Sun et al. (2021) to train contrastive sense encodings (CSE) using training entries from the dataset:

In [None]:
# Train the contrastive sense encodings on the training split of the dataset.
ft_encoder = FTEncoder('fastText/crawl-300d-2M-subword') # Points to the directory that contains downloaded fastText embeddings.
# NOTE(review): `out_dir` is never assigned in this notebook. Define it before running
# this cell, unless `from slanggen.util import *` happens to provide it - confirm.
trainer = SlangGenTrainer(dataset, word_encoder=ft_encoder, out_dir=out_dir, verbose=True)
# NOTE(review): this rebinds `model`, which until now referred to the GPT-2 infill model
# that infill_prob() reads from module scope. Re-running the infill cells after this one
# would hand infill_prob() the wrong object.
model = SlangGenModel(trainer, data_dir=out_dir)
# Settings shared by the training and reranking cells below.
params = {'embed_name':'SBERT_contrastive', 'out_name':'predictions', 'model':'cf_prototype_5', 'prior':None, 'prior_name':'uniform', 'contr_params':None}
model.train_contrastive(slang_inds, fold_name='udwil', params=params)

Optional: Train the original slang generation objective from Sun et al. (2021) to obtain good estimates for the kernel width parameters $h_m$ and $h_{cf}$. Note that this can be very memory intensive on the Urban Dictionary data because of its size. You can set `params['model']` to 'prototype' instead to estimate only the $h_m$ parameter, which is less memory intensive.

In [None]:
# Fold name used for this run. It has to be defined here because the path below
# needs it, and the next assignment of `fold_name` only happens in a later cell.
fold_name = 'udwil'

model.train_categorization(slang_inds, fold_name=fold_name, params=params)

# Directory holding this fold's outputs, assembled from the model's data directory,
# the fold name and the configured output name.
p_dir = model.data_dir + '/' + fold_name + '/' + params['out_name'] + '/'

# Load the parameters saved for the configured prior.
# NOTE: unpickling can execute arbitrary code; only load files produced by your own run.
with open(p_dir+"parameters_"+params['prior_name']+".pkl","rb") as param_file:
    gen_params = pickle.load(param_file)

###  Semantically informed reranking

Check the appropriateness of each candidate interpretation (i.e. a predicted slang meaning) predicted by the language infill model against the slang's conventional meaning using the trained contrastive sense encoding with a prototype model. Apply collaborative filtering to take parallel semantic change into account.

In [None]:
fold_name='udwil'

# Number of closest vocabulary entries (by trainer.word_dist) used per test word.
N_neighbors = 5
# Row i lists vocabulary indices ordered by increasing trainer.word_dist[i].
neighbors = np.argsort(trainer.word_dist, axis=1)

# Per test example: the definition sentence of each candidate word, and the indices of
# the N_neighbors vocabulary entries closest to the ground-truth slang word
# (presumably the word itself comes first, at distance 0 - confirm).
infill_cand_defs = []
infill_word_ind = np.zeros((len(gt_words_test), N_neighbors), dtype=np.int32)

for i in range(len(gt_words_test)):
    infill_cand_defs.append([cand_sent_map[w] for w in infill_results[i][0]])
    infill_word_ind[i] = neighbors[dataset.word2id[gt_words_test[i]], :N_neighbors]
        
infill_cand_defs = np.asarray(infill_cand_defs)

# Kernel widths for the prototype model (h_model) and the word-distance kernel (h_word).
# NOTE(review): `gen_params` only exists if the optional categorization cell was run, and
# that cell trains with params['model'] == 'cf_prototype_5' - confirm that the
# 'prototype' key is present in that case.
h_model = gen_params['prototype'][0]
h_word = 0.1

# If you have enough RAM to estimate both parameters, here's what you would use:
#h_model = gen_params['cf_prototype_5'][0]
#h_word = gen_params['cf_prototype_5'][1]

# Exponential kernel over word distances, passed through normalize along axis 1.
vd_vocab = normalize(np.exp(-1*trainer.word_dist/h_word), axis=1)

# One prototype per vocabulary word: the mean of its exemplar embeddings.
vocab_embeds = model.load_exemplar_embeddings(fold_name=fold_name, params=params)
vocab_proto = np.zeros((dataset.V, len(vocab_embeds[0][0])))
for i in range(dataset.V):
    vocab_proto[i] = np.mean(vocab_embeds[i], axis=0)

# preds[k, j]: reranking score of candidate j for test sentence k.
preds = np.zeros((len(gt_words_test), infill_results.shape[2]))

for k in trange(len(gt_words_test)):

    # Embeddings of the candidate definitions (one row per candidate) and the
    # prototypes of the neighbouring vocabulary words.
    prototypes = trainer.get_testtime_embeddings(infill_cand_defs[k], fold_name=fold_name)
    queries = vocab_proto[infill_word_ind[k]]

    N_query = queries.shape[0]
    vd_prototype = np.zeros((N_query, prototypes.shape[0]))

    # Negative squared Euclidean distance between every neighbour prototype and every candidate.
    for i in range(N_query):
        vd_prototype[i] = np.linalg.norm(prototypes - queries[i], axis=1)
    vd_prototype = -1*vd_prototype**2

    # Kernel scores per neighbour, normalized over the candidates.
    l_prototype = normalize(np.exp(vd_prototype/h_model), axis=1)

    # Collaborative-filtering weights: kernel values between the first neighbour and each neighbour.
    cf_weights = vd_vocab[infill_word_ind[k, 0], infill_word_ind[k]]
    # Weighted sum of the per-neighbour scores, normalized over the candidates.
    preds[k] = normalize_1d(np.sum(l_prototype * normalize(cf_weights[:, np.newaxis], axis=0), axis=0))
    
# NOTE(review): `o_dir` is never assigned in this notebook; define the output directory
# (with a trailing slash) before running this line.
np.save(o_dir+'interp_lm_ssi.npy', preds)

### Evaluation

Obtain Sentence-BERT embeddings of all definition sentences involved in the evaluation.

In [None]:
sbert_model = SentenceTransformer('bert-base-nli-mean-tokens')

# Definition sentences of the train+dev pool and their L2-normalized embeddings.
train_dev_sents = np.asarray([dataset.slang_data[idx].def_sent for idx in train_dev_ind])
train_dev_embeds = normalize_L2(np.asarray(sbert_model.encode(train_dev_sents)))

# Same for the ground-truth definitions of the test entries.
test_sents = np.asarray([dataset.slang_data[idx].def_sent for idx in slang_inds.test])
test_embeds = normalize_L2(sbert_model.encode(test_sents))

# Embedding of the definition sentence chosen for every candidate vocabulary word.
vocab_base_embeds = normalize_L2(np.asarray(sbert_model.encode(cand_sentences)))

# infill_base_embed[i, j]: embedding of the j-th candidate of test sentence i.
n_top = 50
infill_base_embed = np.zeros((infill_results.shape[0], n_top, vocab_base_embeds.shape[1]))

for i in range(infill_results.shape[0]):
    vocab_rows = [infill_vocab_inds[infill_results[i][0][j]] for j in range(n_top)]
    infill_base_embed[i] = vocab_base_embeds[vocab_rows]

For each test entry, sample 4 negative definition sentences from the training set and the dev set.

In [None]:
def is_close_def(query_sent, target_sent, threshold=0.5):
    """Tell whether `target_sent` shares enough content words with `query_sent`.

    Both sentences are tokenized with gensim's simple_preprocess and stripped
    of stopwords. The result is True when at least `threshold` of the query's
    remaining tokens (counted with repetition) occur in the target. A query
    with no remaining tokens is therefore close to every target.
    """
    # NOTE(review): `stopwords` is not assigned anywhere in this notebook (the nltk
    # corpus is imported as `sw`). Presumably it comes from `slanggen.util` or should
    # be built as set(sw.words('english')) - confirm.
    query_tokens = [w for w in simple_preprocess(query_sent) if w not in stopwords]
    target_vocab = {w for w in simple_preprocess(target_sent) if w not in stopwords}

    shared = sum(1 for w in query_tokens if w in target_vocab)
    return shared >= len(query_tokens) * threshold

def sample_def(ref, num_samples=4, max_tries=1000):
    """Sample negative definition sentences for one test entry.

    Candidates are drawn uniformly, with replacement, from the train+dev pool.
    A draw is rejected when its definition is lexically close to the test
    entry's own definition (see is_close_def).

    Args:
        ref: Position of the test entry within `slang_inds.test`.
        num_samples: Number of negatives to return.
        max_tries: Number of draws during which rejection is applied. After
            that, any draw is accepted so the loop always terminates (for
            example when the reference definition is close to everything).

    Returns:
        A list of exactly `num_samples` indices into `train_dev_sents` /
        `train_dev_embeds`.
    """
    N_cand = train_dev_embeds.shape[0]
    ref_sent = dataset.slang_data[slang_inds.test[ref]].def_sent

    samples = []
    tries = 0
    while len(samples) < num_samples:
        new_sample = np.random.randint(N_cand)
        tries += 1
        # Remove the is_close_def condition for a completely random sample.
        if tries <= max_tries and is_close_def(ref_sent, train_dev_sents[new_sample]):
            continue
        # Each accepted draw is added exactly once.
        samples.append(new_sample)
    return samples

# One list of sampled train+dev indices (the negative definitions) per test entry.
# Sampling uses NumPy's global random state, so results vary between runs unless it is seeded.
test_neg_samples = np.asarray([sample_def(i) for i in range(slang_inds.test.shape[0])])

Function used to compute and rank the semantic distance between the predicted definition and the ground-truth definition, compared against 4 other negatively sampled definitions:

In [None]:
def compute_pred_ranks(pred, n=50):
    """Rank the ground-truth definition against the negatives for every prediction.

    Args:
        pred: Integer array; row i orders the candidates of test sentence i from
            best to worst.
        n: Number of leading predictions per sentence to evaluate.

    Returns:
        An int32 array with one row per test sentence and `n` columns. Entry
        [i, j] is the 1-based rank of the ground-truth definition among the
        ground truth and its sampled negatives, ordered by distance to the
        definition embedding of the j-th prediction.
    """
    n_rows = len(ex_sents_inds)
    pred_ranks = np.empty((n_rows, n), dtype=np.int32)

    for row, test_idx in enumerate(ex_sents_inds):
        chosen = infill_base_embed[row][pred[row, :n]]

        # Row 0 is the ground-truth definition; the sampled negatives follow.
        gold = test_embeds[test_idx][np.newaxis, :]
        negatives = train_dev_embeds[test_neg_samples[test_idx]]
        candidates = np.concatenate((gold, negatives))

        # Position at which candidate 0 appears in each distance ordering.
        order = np.argsort(dist.cdist(chosen, candidates), axis=1)
        _, gold_pos = np.where(order == 0)
        pred_ranks[row] = gold_pos + 1

    return pred_ranks

Compute and print the mean reciprocal rank (MRR) results for both the baseline and the semantically informed model.

In [None]:
def compute_interp_results(sg_probs, alpha = 0.5, epsilon = 1e-7):
    """Order the candidates under both models and rank the ground truth for each.

    Args:
        sg_probs: Per-sentence candidate scores of the semantically informed model.
        alpha: Not used by this function.
        epsilon: Small constant added to all scores before normalization.

    Returns:
        A dict with the candidate orderings ('pred_baseline', 'pred_ssi', best
        first) and the matching ground-truth ranks ('pred_baseline_ranks',
        'pred_ssi_ranks') as produced by compute_pred_ranks.
    """
    # Baseline scores come from the language infill model (second slot of infill_results).
    baseline_scores = np.asarray(infill_results[:,1,:], dtype=np.float32) + epsilon
    ssi_scores = sg_probs + epsilon

    # Candidate indices sorted by descending normalized score.
    pred_baseline = np.argsort(normalize(baseline_scores), axis=1)[:, ::-1]
    pred_ssi = np.argsort(normalize(ssi_scores), axis=1)[:, ::-1]

    return {
        'pred_baseline': pred_baseline,
        'pred_ssi': pred_ssi,
        'pred_baseline_ranks': compute_pred_ranks(pred_baseline),
        'pred_ssi_ranks': compute_pred_ranks(pred_ssi),
    }

def print_results_interp_mrr(ranks, model_name='default'):
    """Print the mean reciprocal rank of the first-column (top-1) ranks for `model_name`."""
    top1_mrr = np.mean(1/ranks[:,0])
    print(f"{model_name} - top 1: {top1_mrr:f}")
    
def eval_results_interp_mrr(results):
    """Print the top-1 MRR of the baseline and of the semantically informed model, separated by rules."""
    rule = "---"
    report = (
        ('pred_baseline_ranks', 'LM Infill'),
        ('pred_ssi_ranks', 'LM Infill + SSI'),
    )

    print(rule)
    for key, label in report:
        print_results_interp_mrr(results[key], label)
        print(rule)
    
# Load the scores saved by the reranking cell, then compute and print the MRR of both models.
# NOTE(review): `o_dir` is never assigned in this notebook; it must point to the directory
# (with a trailing slash) that holds 'interp_lm_ssi.npy'.
sg_probs = np.load(o_dir+'interp_lm_ssi.npy')
results = compute_interp_results(sg_probs)
eval_results_interp_mrr(results)

The chance MRR is 0.457, computed as follows:

In [None]:
# MRR - chance: mean of 1/rank when the ground truth is equally likely to land on
# any of the ranks 1-5.
np.mean(1/np.arange(1,6))

Slang interpretation results for the i'th test example can be retrieved as follows:

In [None]:
i = 0

# Five best candidate words of test sentence i under each model.
top_baseline = infill_results[i, 0][results['pred_baseline'][i,:5]]
top_ssi = infill_results[i, 0][results['pred_ssi'][i,:5]]

# Plain word predictions
print('[LM Infill] '+','.join(top_baseline))
print('[LM Infill + SSI] '+','.join(top_ssi))

# Predictions after dictionary lookup
print('[LM Infill]\n'+'\n'.join([cand_sent_map[w] for w in top_baseline]))
print('[LM Infill + SSI]\n'+'\n'.join([cand_sent_map[w] for w in top_ssi]))