In [1]:
import polarice
import gensim.downloader as api

dataset = polarice.PolarizationDataset.load("data/dataset.p")
model = api.load("glove-wiki-gigaword-50")

# FrameAxis 

This module is built upon the FrameAxis approach [^1].

[^1] Kwak, H., An, J., & Ahn, Y. Y. (2020). FrameAxis: Characterizing Framing Bias and Intensity with Word Embedding. arXiv preprint arXiv:2002.08608.


In [105]:
%%writefile frame_axis.py

import numpy as np
import pandas as pd
from typing import Dict
import pickle as pkl
from collections import namedtuple, OrderedDict
import tqdm

"""Hierarchical representations for frame analysis: FrameWord --> FramePoles --> FrameAxes --> FrameSystem."""

class FrameWord:
    """Represents a single framing word. Mainly a wrapper."""
    
    def __init__(self, word, model=None):
        self.word = word
        self.model = model

class FramePole:
    """Represents a pole of the FrameAxis. Thus either positive or negative words."""
    
    def __init__(self, pole_name, words, model):
        self.pole_name = pole_name
        self.words = words
        self.initial_words = words  # For debugging
        self.model = model
        
    def compute(self):
        """Computes everything for its usage (e.g., centroid)."""
        self.retain_model_words_only()
        self.compute_centroid()
        return self
        
    def retain_model_words_only(self, log_removed=False):
        """Cleans the initial words to fit the supplied model."""
        pole_words = []
        for pole_word in self.words:
            if pole_word in self.model.key_to_index:
                pole_words.append(pole_word)
            else:
                if log_removed:
                    print(f"Word {pole_word} not in vocab")
        self.words = pole_words
        return pole_words
    
    def extract_vectors_from_model(self):
        """Extract the relevant vectors from the model. In same order as the words."""
        pole_vecs = []
        for pole_word in self.words:
            vec = self.model.get_vector(pole_word)
            pole_vecs.append(vec)
        print(len(pole_vecs))
        self.pole_vecs = pole_vecs
        return pole_vecs
        
    def compute_centroid(self):
        """Computes the centroid and vectors. Assumes valid vocabulary. Call `retain_model_words_only` beforehand."""
        pole_vecs = self.extract_vectors_from_model()
        centroid = np.mean(pole_vecs, axis=0)
        self.centroid = centroid
        return centroid

class FrameAxis:
    """Represents a Frame Axis, which is a Semantic Axis (SemAxis) with Bias and Intensity."""
    
    def __init__(self, name, pos_words, neg_words, wv_name, word_vectors):
        self.name = name
        self.pos_words = pos_words
        self.neg_words = neg_words
        self.wv_name = wv_name  # required to reproduce frame_axis
        self.word_vectors = word_vectors
        # TODO: compute axis
        self.axis = None
        self.baseline_bias = None
        self.model = word_vectors
        
        self.sim_cache = dict()  # use it to cache word similarities for reuse
    
    @classmethod
    def from_poles(cls, pos_pole, neg_pole):
        name = pos_pole.pole_name + "/" + neg_pole.pole_name
        assert pos_pole.model == neg_pole.model
        return cls(name, pos_pole.words, neg_pole.words, "", pos_pole.model)
        
    @classmethod
    def load(cls, filename):
        with open(filename, "rb") as f:
            return pkl.load(f)
        
    def save(self, filename):
        with open(filename, "wb") as f:
            pkl.dump(self, f)
        
    def attach_model(self, model):
        self.model = model
        
    def detach_model(self):
        self.model = None
        
    def compute_word_sim(self, word):
        pass
    
    def compute(self):
        self.pos_words, _ = self.retain_words_in_model(self.pos_words, self.model)
        self.neg_words, _ = self.retain_words_in_model(self.neg_words, self.model)
        self.compute_axis()
        return self
        
    def compute_axis(self):
        pos_centroid, pos_vecs = self.compute_centroid(self.pos_words, self.word_vectors)
        neg_centroid, neg_vecs = self.compute_centroid(self.neg_words, self.word_vectors)
        self.pos_centroid = pos_centroid
        self.neg_centroid = neg_centroid
        axis = pos_centroid - neg_centroid
        self.axis = axis
        return axis
        
    def retain_words_in_model(self, initial_words, model=None):
        if not model:
            model = self.model
        
        words_in_vocab = []
        words_not_in_vocab = []
        for word in initial_words:
            if word in model.key_to_index:
                words_in_vocab.append(word)
            else:
                words_not_in_vocab.append(word)
        return words_in_vocab, words_not_in_vocab

    def compute_centroid(self, frame_words, model=None):
        """Assumes valid list of words."""
        if not model:
            model = self.model
        
        frame_vecs = []
        for frame_word in frame_words:
            assert frame_word in model.key_to_index
            vec = model.get_vector(frame_word)
            frame_vecs.append(vec)
        centroid = np.mean(frame_vecs, axis=0)
        return centroid, frame_vecs
    
    def compute_bias_document(self, doc, model=None):
        if not model:
            model = self.model
    
        word_vecs = []
        words = doc.split()
        for word in words:
            if not word in model.key_to_index:
                continue
            word_vec = model.get_vector(word)
            word_vecs.append(word_vec)
        if not word_vecs:
            return 0  # No bias when no words
        sims = model.cosine_similarities(self.axis, word_vecs)
        return np.sum(sims) / len(words)
        
    def compute_baseline_bias(self, docs, model=None):
        if not model:
            model = self.model
        
        doc_biases = []
        for doc in docs:
            doc_biases.append(self.compute_bias_document(doc, model))
        baseline_bias = np.mean(doc_biases)
        self.baseline_bias = baseline_bias
        return baseline_bias
    
    def compute_intensity_document(self, doc, baseline_bias=None):
        if not baseline_bias:
            baseline_bias = self.baseline_bias
        if not baseline_bias:
            raise ValueError("Neithher baseline_bias provided nor inherent to object.")
        word_vecs = []
        words = doc.split()
        for word in words:
            if not word in self.word_vectors.key_to_index:
                continue
            word_vec = self.word_vectors.get_vector(word)
            word_vecs.append(word_vec)
        if not word_vecs:
            return 0  # No bias when no words
        sims = self.word_vectors.cosine_similarities(self.axis, word_vecs)
        sim_dev = (sims - baseline_bias)**2
        return np.sum(sim_dev) / len(words)
    
    def compute_baseline_intensity(self, docs, baseline_bias=None, model=None):
        if not model:
            model = self.model
        
        doc_intensities = []
        for doc in docs:
            doc_intensity = self.compute_intensity_document(doc, baseline_bias)
            doc_intensities.append(doc_intensity)
        baseline_intensity = np.mean(doc_intensities)
        self.baseline_intensity = baseline_intensity
        return baseline_intensity

    def effect_size(self, corpus: pd.Series, num_bootstrap_samples=1000):
        corpus_bias = self.compute_baseline_bias(corpus)
        corpus_intensity = self.compute_baseline_intensity(corpus, baseline_bias=corpus_bias)
        boostrap_samples = [corpus.sample(n=len(corpus), replace=True) for _ in range(num_bootstrap_samples)]
        
        cum_sample_bias = 0
        cum_sample_intensity = 0
        for sample in tqdm.tqdm(boostrap_samples):
            sample = corpus.sample(n=len(corpus))
            bootstrapped_bias = self.compute_baseline_bias(sample)
            bootstrapped_intensity = self.compute_baseline_intensity(sample, baseline_bias=bootstrapped_bias)
            cum_sample_bias += bootstrapped_bias
            cum_sample_intensity += bootstrapped_intensity
        # Effect sizes for bias and intensity
        eta_bias = abs(corpus_bias - cum_sample_bias/num_bootstrap_samples)
        eta_intensity = abs(corpus_intensity - cum_sample_intensity/num_bootstrap_samples)
        
        EffectSize = namedtuple("EffectSize", ["eta_bias", "eta_intensity"])
        return EffectSize(eta_bias, eta_intensity)

def compute_bias(document, frame_vector, model):
    word_vecs = []
    words = document.split(" ")
    for word in words:
        if not word in model.key_to_index:
            continue
        word_vec = model.get_vector(word)
        word_vecs.append(word_vec)
    if not word_vecs:
        return 0  # No bias when no words
    sims = model.cosine_similarities(frame_vector, word_vecs)
    return np.sum(sims) / len(words)

def compute_intensity(document, frame_vector, model, baseline_bias):
    word_vecs = []
    words = document.split(" ")
    for word in words:
        if not word in model.key_to_index:
            continue
        word_vec = model.get_vector(word)
        word_vecs.append(word_vec)
    if not word_vecs:
        return 0  # No bias when no words
    sims = model.cosine_similarities(frame_vector, word_vecs)
    sim_dev = (sims - baseline_bias)**2
    return np.sum(sim_dev) / len(words)

class FrameSystem:
    """Represents a set of FrameAxes that form a complete system."""
    
    def __init__(self, frame_axes: Dict[str, FrameAxis]):
        self.frame_axes = frame_axes
        
    def transform_df(self, df: pd.DataFrame, text_col: str, model):
        for name, axis in self.frame_axes.items():
            pos_name, neg_name = name.split("/")  # TODO: move this informtion to FrameAxis
            axis_code = pos_name[:4]
            df[axis_code + "_bias"] = df[text_col].map(lambda x: compute_bias(x, axis.axis, model))
            baseline_bias = df[axis_code + "_bias"].mean()
            df[axis_code + "_inte"] = df[text_col].map(lambda x: compute_intensity(x, axis.axis, model, baseline_bias))
        return df
    
    def axes_ordered_by_effect_sizes(self, corpus: pd.Series, num_bootstrap_samples=1000, sort_key="eta_bias"):
        axes_effect_sizes = {}
        for name, axis in self.frame_axes.items():
            axes_effect_sizes[name] = axis.effect_size(corpus=corpus, num_bootstrap_samples=num_bootstrap_samples)
        return OrderedDict(sorted(axes_effect_sizes.items(), key=lambda x: -getattr(x[1], sort_key)))
    
    def compute_baseline_biases(self, df: pd.DataFrame, text_col: str, model):
        baseline_biases = {}
        for name, axis in self.frame_axes.items():
            pos_name, neg_name = name.split("/")  # TODO: move this informtion to FrameAxis
            axis_code = pos_name[:4]
            baseline_bias = df[text_col].map(lambda x: compute_bias(x, axis.axis, model)).mean()
            baseline_biases[name] = baseline_bias
        return baseline_biases
    
    def attach_model(self, model):
        self.model = model
        for name, axis in self.frame_axes.items():
            axis.attach_model(model)
            
    def compute(self):
        for name, axis in self.frame_axes.items():
            axis.compute()
        return self
    
    @classmethod
    def load(cls, filename):
        with open(filename, "rb") as f:
            return pkl.load(f)
        
    def save(self, filename):
        # Detach models before storing
        for _, frame_axis in self.frame_axes.items():
            frame_axis.detach_model()
        
        with open(filename, "wb") as f:
            pkl.dump(self, f)
            
# Helper classes
            
class FrameExperiment:
    """Combination of Dictionary Mapping, WordEmbeddings, and Dataset. Used to conduct evaluations."""
    def __init__(self, dictionary_mapping, wordembeddings, dataset):
        self.dictionary_mapping = dictionary_mapping
        self.wordembeddings = wordembeddings
        self.dataset = dataset
        
class FrameVisualization:
    """Used to visualize the frame system."""
    def __init__(self, frame_system: FrameSystem):
        self.frame_system = frame_system
            
def frame_axis_polarization(frame_axis):
    pos_centroid = frame_axis.pos_centroid
    neg_centroid = frame_axis.neg_centroid
    pos = frame_axis.pos_words
    neg = frame_axis.neg_words
    
    word_vectors = frame_axis.word_vectors
    
    inter_dist_pos = np.mean(word_vectors.distances(pos_centroid, pos))
    inter_dist_neg = np.mean(word_vectors.distances(neg_centroid, neg))
    between_dist_pos = np.mean(word_vectors.distances(pos_centroid, neg))
    between_dist_neg = np.mean(word_vectors.distances(neg_centroid, pos))
    print(f"{inter_dist_pos=} {inter_dist_neg=} {between_dist_pos=} {between_dist_neg=}")
    
    return (between_dist_pos+between_dist_neg)/2-(inter_dist_pos+inter_dist_neg)/2

Overwriting frame_axis.py


In [106]:
%run frame_axis.py

In [89]:
pole1 = ["carbon", "mitigation"]
pole2 = ["migration", "adaption"]
fa = FrameAxis("Climate", pole1, pole2, "glove-wiki-gigaword-50", model)
fa.compute_axis()

array([ 0.03649503,  0.44649416,  1.3033249 , -0.09901801, -0.543165  ,
        0.06154996,  1.1722801 , -0.11289489,  0.37668502,  0.62156   ,
       -0.52041996,  0.10456613, -0.072725  , -0.25025004, -0.2866685 ,
        0.774215  ,  1.00493   ,  0.42712498,  0.28918886, -1.32272   ,
        0.46895504, -0.16249001,  0.58235496, -1.5769    , -1.02247   ,
       -0.10630002,  0.30814502,  0.64058   ,  0.317215  , -0.03951496,
        0.29331493, -0.2059555 , -0.30391002, -0.70380497,  0.903245  ,
        0.24992001,  0.746135  ,  0.08126003,  1.6797299 , -0.38700998,
       -0.47811   , -0.5595455 ,  1.02496   ,  0.20258254,  0.3488285 ,
        0.51743495, -0.50139296,  0.410907  ,  0.56883854,  0.340655  ],
      dtype=float32)

In [90]:
bias_pole1 = fa.compute_bias_document(" ".join(pole1), model)
print(bias_pole1)
assert bias_pole1 > 0

0.5866973996162415


In [91]:
bias_pole2 = fa.compute_bias_document(" ".join(pole2), model)
print(bias_pole2)
assert bias_pole2 < 0

-0.3108203411102295


In [92]:
# From here on, we need a model
fa.attach_model(model)

In [93]:
dataset.df["bias"] = dataset.df[dataset.text_column].map(fa.compute_bias_document)
dataset.df["bias"].describe()

count    1100.000000
mean        0.091483
std         0.014653
min         0.048531
25%         0.079724
50%         0.089863
75%         0.102914
max         0.130834
Name: bias, dtype: float64

In [94]:
dataset.df[dataset.df["group"] == "adaption"]["bias"].describe()

count    500.000000
mean       0.079753
std        0.008566
min        0.048531
25%        0.074649
50%        0.079393
75%        0.084895
max        0.105505
Name: bias, dtype: float64

In [95]:
dataset.df[dataset.df["group"] == "mitigation"]["bias"].describe()

count    500.000000
mean       0.101707
std        0.011240
min        0.071160
25%        0.094788
50%        0.102441
75%        0.109304
max        0.130834
Name: bias, dtype: float64

In [96]:
fa.compute_baseline_bias(dataset.df[dataset.text_column].values)

0.09148284338588908

In [97]:
dataset.df["inte"] = dataset.df[dataset.text_column].map(fa.compute_intensity_document)
dataset.df["inte"].describe()

count    1100.000000
mean        0.014131
std         0.003604
min         0.007037
25%         0.011049
50%         0.013707
75%         0.017065
max         0.025742
Name: inte, dtype: float64

In [98]:
# Test IO
import tempfile

tempdir = tempfile.gettempdir() + "/"
fa.save(tempdir + "test.pkl")
FrameAxis.load(tempdir + "test.pkl")

<__main__.FrameAxis at 0x7f18d785a070>

In [99]:
frame_axis_polarization(fa)

inter_dist_pos=0.16403705 inter_dist_neg=0.23015118 between_dist_pos=0.7200742 between_dist_neg=0.68450624


0.5051960647106171

In [100]:
fa.effect_size(dataset.df[dataset.text_column], num_bootstrap_samples=2)

EffectSize(eta_bias=1.5796836061454655e-11, eta_intensity=3.761895309151164e-12)