In [2]:
# Importing libraries
from typing import List, Tuple, Dict, List
import numpy as np

In [3]:
class VectorModel:
    """
    In-memory word-embedding table with cosine-similarity nearest-neighbour queries.

    Attributes:
        index (Dict[str, int])         - Maps each word to its row in `embeddings`
        reverse_index (Dict[int, str]) - Maps each row number back to its word
        embeddings (np.ndarray)        - The stacked embedding vectors, one per row
    """

    def __init__(self, vector_dict: Dict[str, np.ndarray]):
        """
        Args:
            vector_dict (Dict[str, np.ndarray]) - A dictionary mapping words to embedding vectors
        Raises:
            ValueError - If `vector_dict` is empty
        """
        if not vector_dict:
            raise ValueError("vector_dict must contain at least one embedding")
        self.index = dict()
        self.reverse_index = dict()
        for idx, word in enumerate(vector_dict):
            self.index[word] = idx
            self.reverse_index[idx] = word
        # Row order follows the dictionary's iteration order, matching `index`.
        self.embeddings = np.stack(list(vector_dict.values()))

    def embed(self, word: str) -> np.ndarray:
        """
        Look up the embedding of a word.

        Args:
            word (str)   - The word to look up
        Returns:
            (np.ndarray) - The embedding row of `word`, or None if the word is unknown
        """
        if word in self.index:
            return self.embeddings[self.index[word]]
        return None

    def vector_size(self) -> int:
        """
        Returns:
            (int) - The length of the second axis of the embedding matrix
        """
        return self.embeddings.shape[1]

    def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """
        Cosine similarity of two vectors: dot(vec1, vec2) / (norm(vec1) * norm(vec2)).

        Args:
            vec1 (np.ndarray) - First vector
            vec2 (np.ndarray) - Second vector
        Returns:
            (float)           - The cosine similarity (NaN if either vector has zero norm)
        """
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

    def _similarities(self, vec: np.ndarray) -> np.ndarray:
        """Cosine similarity between `vec` and every embedding row, in one matrix product."""
        norms = np.linalg.norm(self.embeddings, axis=1) * np.linalg.norm(vec)
        return (self.embeddings @ vec) / norms

    def _top_n(self, sims: np.ndarray, top_n: int, exclude: int = None) -> List[Tuple[str, float]]:
        """Return the `top_n` best (word, similarity) pairs, optionally skipping row `exclude`."""
        candidates = np.arange(len(sims))
        if exclude is not None:
            # Drop the query row up front so it can never displace a real neighbour.
            candidates = candidates[candidates != exclude]
        # Clamp so that asking for more neighbours than exist returns all of them.
        k = min(top_n, len(candidates))
        if k <= 0:
            return []
        candidate_sims = sims[candidates]
        if k < len(candidates):
            best = np.argpartition(candidate_sims, -k)[-k:]
        else:
            best = np.arange(len(candidates))
        # argpartition leaves the selected entries unordered; sort them descending.
        best = best[np.argsort(-candidate_sims[best], kind="stable")]
        return [
            (self.reverse_index[int(candidates[pos])], candidate_sims[pos])
            for pos in best
        ]

    def most_similar(self, word: str, top_n: int=5) -> List[Tuple[str, float]]:
        """
        Find the words most similar to a given word, excluding the word itself.

        Args:
            word (str)  - The query word
            top_n (int) - Maximum number of neighbours to return
        Returns:
            (List[Tuple[str, float]]) - (word, cosine similarity) pairs sorted by descending
                                        similarity, or None if `word` is unknown
        """
        if word not in self.index:
            return None
        own_idx = self.index[word]
        sims = self._similarities(self.embeddings[own_idx])
        return self._top_n(sims, top_n, exclude=own_idx)

    def most_similar_vec(self, vec: np.ndarray, top_n: int=5) -> List[Tuple[str, float]]:
        """
        Find the words whose embeddings are most similar to an arbitrary vector.

        Args:
            vec (np.ndarray) - The query vector
            top_n (int)      - Maximum number of neighbours to return
        Returns:
            (List[Tuple[str, float]]) - (word, cosine similarity) pairs sorted by descending
                                        similarity
        """
        return self._top_n(self._similarities(vec), top_n)

In [None]:
class SemanticAxis:
    """
    Scores every word of a vector model along an axis spanned by two seed sets.

    The axis is the mean embedding of the positive seeds minus the mean
    embedding of the negative seeds; a word's raw score is its cosine
    similarity to that axis.
    """

    def __init__(self, model: "VectorModel"):
        """
        Args:
            model (VectorModel) - The embedding model; `fit` uses its `index`,
                                  `embed` and `cosine_similarity` members
        """
        self.model = model

    def _centroid(self, seeds: List[str]):
        """Mean embedding of `seeds`; 0.0 for an empty list so it drops out of the difference."""
        missing = [seed for seed in seeds if seed not in self.model.index]
        if missing:
            raise KeyError(f"seed words not in vocabulary: {missing}")
        if not seeds:
            return 0.0
        # Use the mean, not the sum, so seed lists of different sizes weigh equally.
        return np.mean([self.model.embed(seed) for seed in seeds], axis=0)

    def fit(
        self,
        positive_seeds: List[str],
        negative_seeds: List[str]
    ) -> Dict[str, float]:
        """
        Score every word in the model's vocabulary along the seed axis.

        Args:
            positive_seeds (List[str]) - Seed words for the positive pole
            negative_seeds (List[str]) - Seed words for the negative pole
        Returns:
            (Dict[str, float])         - Scores per word with zero mean and unit variance
                                         (all zeros if every word gets the same raw score)
        Raises:
            KeyError   - If a seed word is not in the model's vocabulary
            ValueError - If both seed lists are empty
        """
        positive_seeds = list(positive_seeds)
        negative_seeds = list(negative_seeds)
        if not positive_seeds and not negative_seeds:
            raise ValueError("at least one seed word is required")

        axis = self._centroid(positive_seeds) - self._centroid(negative_seeds)

        tokens = list(self.model.index)
        if not tokens:
            return {}
        raw = np.array(
            [self.model.cosine_similarity(axis, self.model.embed(token)) for token in tokens],
            dtype=float,
        )

        # Make sure scores have zero mean and unit variance
        centered = raw - raw.mean()
        std = raw.std()
        if std == 0:
            # Identical raw scores carry no ranking information; avoid dividing by zero.
            standardized = np.zeros_like(centered)
        else:
            standardized = centered / std

        return {token: float(score) for token, score in zip(tokens, standardized)}

In [None]:
class SentimentPropagation:
    """
    Sentiment Propagation according to Hamilton et al (2016)
    https://doi.org/10.18653/v1/D16-1057

    Attributes:
        word_index (Dict[str, int]) - Maps each token to its row in `embeddings`
        index_word (Dict[int, str]) - Maps each row number back to its token
        embeddings (np.ndarray)     - The stacked embedding vectors, one per row
        vocab_size (int)            - Number of tokens
    """

    def __init__(self, embedding_dict: Dict[str, np.ndarray]):
        """
        Args:
            embedding_dict (Dict[str, np.ndarray]) - A dictionary of embedding vectors
        """
        self.word_index = dict()
        self.index_word = dict()
        vectors = []
        for idx, (token, embedding) in enumerate(embedding_dict.items()):
            vectors.append(embedding)
            self.word_index[token] = idx
            self.index_word[idx] = token
        self.embeddings = np.stack(vectors)
        self.vocab_size = len(self.word_index)

    def edge_matrix(self, n_neighbors:int=10) -> np.ndarray:
        """
        Construct the graph edge matrix where:
        E[i, j] = arccos(-(w[i].T @ w[j]) / (norm(w[i]) * norm(w[j])))
        if j is one of the `n_neighbors` nearest neighbors of i, and 0 otherwise.

        Args:
            n_neighbors (int) - The number of neighbors each vertex in the graph has
                                (clamped to the range [0, vocab_size - 1])
        Returns:
            (np.ndarray)      - The edge matrix
        """
        norms = np.linalg.norm(self.embeddings, axis=1)
        cosine = (self.embeddings @ self.embeddings.T) / np.outer(norms, norms)
        # Weight first, prune afterwards: pruning the cosine instead would turn
        # every removed edge into arccos(0) = pi/2 rather than 0.
        weights = np.arccos(np.clip(-cosine, -1, 1))
        # No self-loops; weight 0 is the minimum, so a vertex never counts as
        # its own nearest neighbor.
        np.fill_diagonal(weights, 0)
        n_keep = max(0, min(n_neighbors, self.vocab_size - 1))
        n_drop = self.vocab_size - n_keep
        # arccos(-x) grows with x, so the smallest weights are the least similar words.
        weakest = np.argpartition(weights, n_drop - 1, axis=1)[:, :n_drop]
        np.put_along_axis(weights, weakest, 0, axis=1)
        return weights

    def transition_matrix(self, edge_matrix: np.ndarray) -> np.ndarray:
        """
        Construct the transition matrix T = D^(-1/2) @ E @ D^(-1/2) from the
        edge matrix, where D holds the column sums of E on its diagonal.

        In contrast to the paper we use 1/s here (maybe an error in the paper?)

        Args:
            edge_matrix (np.ndarray) - edge_matrix
        Returns:
            (np.ndarray)             - The transition matrix
        """
        col_sums = edge_matrix.sum(axis=0)
        scale = np.zeros_like(col_sums, dtype=float)
        connected = col_sums > 0
        # Vertices with zero column sum keep a scale of 0 instead of dividing by zero.
        scale[connected] = 1.0 / np.sqrt(col_sums[connected])
        # Broadcasting applies both diagonal scalings without building N x N diagonals.
        return edge_matrix * scale[:, None] * scale[None, :]

    def run(self,
            T: np.ndarray, s: np.ndarray, p: np.ndarray,
            beta: float, eps: float, max_iter: int) -> np.ndarray:
        """
        Update p until convergence

        Args:
            T (np.ndarray) - Transition matrix
            s (np.ndarray) - Seed vector
            p (np.ndarray) - Word-Sentiment vector
            beta (float)   - Local consistency vs. global consistency
            eps (float)    - Convergence criterion: stop once the L1 change of p drops below it
            max_iter (int) - Convergence criterion: maximum number of updates
        Returns:
            p (np.ndarray) - Learned word-sentiment vector
        """
        for _ in range(max_iter):
            # Multiply T @ p first so beta scales a vector, not the whole matrix.
            new_p = beta * (T @ p) + (1 - beta) * s
            delta = np.abs(new_p - p).sum()
            p = new_p
            if delta < eps:
                break
        return p

    def fit(self,
              positive_seeds: List[str],
              negative_seeds: List[str],
              beta: float=0.1,
              n_neighbors: int=10,
              eps:float=1e-6,
              max_iter:int=100) -> Dict[str, float]:
        """
        Learn sentiment scores

        Args:
            positive_seeds (List[str]) - A list of positive seed words
            negative_seeds (List[str]) - A list of negative seed words
            beta (float)               - Local consistency vs. global consistency
            n_neighbors (int)          - The number of neighbors each vertex in the graph has
            eps (float)                - Convergence criterion
            max_iter (int)             - Convergence criterion
        Returns:
            (Dict[str, float])         - A dictionary containing the learned sentiment scores
                                         with zero mean and unit variance
        Raises:
            ValueError - If either seed list is empty
            KeyError   - If a seed word is not in the vocabulary
        """
        if not positive_seeds or not negative_seeds:
            raise ValueError("positive_seeds and negative_seeds must both be non-empty")

        T = self.transition_matrix(self.edge_matrix(n_neighbors))

        s_positive = np.zeros(self.vocab_size)
        s_negative = np.zeros(self.vocab_size)

        s_positive[[self.word_index[seed] for seed in positive_seeds]] = 1 / len(positive_seeds)
        s_negative[[self.word_index[seed] for seed in negative_seeds]] = 1 / len(negative_seeds)

        p_positive = self.run(
            T, s_positive.reshape(-1, 1),
            np.ones((self.vocab_size, 1)) / self.vocab_size,
            beta, eps, max_iter
        )
        p_negative = self.run(
            T, s_negative.reshape(-1, 1),
            np.ones((self.vocab_size, 1)) / self.vocab_size,
            beta, eps, max_iter
        )

        total = p_positive + p_negative
        # A vertex with zero mass in both runs would give 0/0 and turn the mean,
        # and so every score, into NaN; give it the neutral ratio 0.5 instead.
        score = np.divide(
            p_positive, total,
            out=np.full_like(total, 0.5, dtype=float),
            where=total > 0
        )
        # Make sure scores have zero mean and unit variance
        score = score - np.mean(score)
        std = np.std(score)
        if std > 0:
            score = score / std

        # Construct dictionary
        flat = score.ravel()
        return {token: flat[idx].item() for token, idx in self.word_index.items()}