In [2]:
import numpy as np
import requests
import re
from bs4 import BeautifulSoup
from scipy.special import digamma
from textblob import TextBlob
from collections import defaultdict, Counter

In [2]:
def get_wikitext_sentences(url, timeout=10):
    """
    Download a Wikipedia page and return the text of its paragraphs.

    Input  : url (str) of the page, timeout (seconds) passed to requests.get
    Output : list of str, one stripped string per <p> tag found inside the
             first 'div.mw-parser-output' element of the page

    Raises requests.HTTPError when the server answers with an error status,
    and IndexError when the page has no 'div.mw-parser-output' element.
    """
    # Without a timeout requests can wait indefinitely on a stalled connection.
    response = requests.get(url, timeout=timeout)
    # Fail on 4xx/5xx here instead of parsing an error page further down.
    response.raise_for_status()
    parsed = BeautifulSoup(response.text, 'html.parser')
    content = parsed.select('div.mw-parser-output')[0]
    return [tag.get_text().strip() for tag in content.find_all('p')]


def postprocess(paragraphs):
    result = []
    wrapper = re.compile('[({\[].{,20}[)}\]]')
    whitespace = re.compile('\s')
    for paragraph in paragraphs:
        processed = wrapper.sub('', paragraph)
        result.append(whitespace.sub(' ', processed))
    return result

# 말뭉치 생성하기

세 가지 서로 다른 주제의 Wikipedia article들의 각 문단을 하나의 document로 간주해서 가장 기본 형태의 LDA를 구현한 후, 이 모형이 세 가지 토픽의 구성 단어들을 효과적으로 찾아내는지 확인한다. 세 article의 제목은 각각 아래와 같다. 간단한 전처리로 소괄호·중괄호·대괄호로 둘러싸인 20자 이하의 문자열을 괄호와 함께 삭제했고, `\n`과 같은 공백 문자는 ' '로 일괄 변경했다.

1. Korea

In [3]:
# Download the "Korea" article, clean its paragraphs, and print the paragraph at index 2.
korea = postprocess(get_wikitext_sentences("https://en.wikipedia.org/wiki/Korea"))
print(korea[2])

Korea is a region in East Asia; since 1945 it has been divided into what are now two distinct sovereign states: North Korea (officially the "Democratic People's Republic of Korea") and South Korea (officially the "Republic of Korea"). Korea consists of the Korean Peninsula, Jeju Island, and several minor islands near the peninsula. It is bordered by China to the northwest and Russia to the northeast. It is separated from Japan to the east by the Korea Strait and the Sea of Japan .


2. Aristotle

In [4]:
# Download the "Aristotle" article, clean its paragraphs, and print the paragraph at index 2.
aristotle = postprocess(get_wikitext_sentences("https://en.wikipedia.org/wiki/Aristotle"))
print(aristotle[2])

Aristotle  Greek: Ἀριστοτέλης Aristotélēs, pronounced ; 384–322 BC) was a Greek philosopher and polymath during the Classical period in Ancient Greece. Taught by Plato, he was the founder of the Lyceum, the Peripatetic school of philosophy, and the Aristotelian tradition. His writings cover many subjects including physics, biology, zoology, metaphysics, logic, ethics, aesthetics, poetry, theatre, music, rhetoric, psychology, linguistics, economics, politics, and government. Aristotle provided a complex synthesis of the various philosophies existing prior to him. It was above all from his teachings that the West inherited its intellectual lexicon, as well as problems and methods of inquiry. As a result, his philosophy has exerted a unique influence on almost every form of knowledge in the West and it continues to be a subject of contemporary philosophical discussion.


3. Google

In [5]:
# Download the "Google" article, clean its paragraphs, and print the paragraph at index 2.
google = postprocess(get_wikitext_sentences("https://en.wikipedia.org/wiki/Google"))
print(google[2])

Google, LLC is an American multinational technology company that specializes in Internet-related services and products, which include online advertising technologies, a search engine, cloud computing, software, and hardware. It is considered one of the Big Four technology companies, alongside Amazon, Apple, and Microsoft.


세 부류의 문단들을 하나로 합쳐서 말뭉치를 만들고, 문단들의 순서를 랜덤하게 섞는다.

In [6]:
# Pool the paragraphs of the three articles into a single corpus.
documents = korea + aristotle + google
# Drawing len(documents) items without replacement yields a random permutation.
# No random seed is set in the visible cells, so the order differs between runs.
documents = list(np.random.choice(documents, len(documents), replace=False))

# 모형 구현하기

In [7]:
len(documents)

322

총 322개의 문서가 있고, 한 iteration마다 전체 문서에서 batchsize만큼의 문서를 샘플링해서 variational parameter들을 업데이트한다. batchsize가 총 문서의 개수와 같으면(즉, 이 예시에서는 322면) 일반적인 LDA와 같은 결과를 얻을 수 있다. 

In [102]:
class OnlineLDA:
    """
    Online (minibatch) variational Bayes for Latent Dirichlet Allocation.

    Every global iteration samples `batchsize` references, fits the per-document
    variational parameters (gamma) against the current topics, and then blends the
    topic parameters (lambda) with the estimate obtained from that minibatch.
    """

    def __init__(self,
                 references,
                 n_documents,
                 maxlen_vocabs,
                 n_topics,
                 batchsize,
                 alpha=0.1, eta=0.01, tau=1, kappa=0.75, tol=1e-4, local_maxiter=100, global_maxiter=1000):
        """
        Aiming at truly online LDA: D can be exact, but V is only an upper bound
        on the vocabulary size because the vocabulary is discovered on the fly.

        Input
         - references     : sequence of references to documents (here: the documents themselves)
         - n_documents    : D, corpus size used to rescale minibatch statistics
         - maxlen_vocabs  : V, number of columns allocated for lambda
         - n_topics       : K, number of topics
         - batchsize      : S, number of references sampled per global iteration
         - alpha, eta     : values added as priors in the gamma / lambda updates
         - tau, kappa     : step size schedule, rho_t = (tau + t) ** (-kappa)
         - tol            : mean absolute change of gamma below which the local loop stops
         - local_maxiter  : maximum number of local iterations per document
         - global_maxiter : number of global iterations run by fit()

        NOTE: word indices are handed out in order of first appearance without an upper
        limit; an index >= maxlen_vocabs makes the column lookup in _update_locals fail
        with IndexError, so maxlen_vocabs must be large enough for the corpus.
        """
        self.references = np.array(references)

        # Each unseen word receives the next free integer index on first lookup.
        self._vocab2idx = defaultdict(lambda: len(self._vocab2idx))
        self._D = n_documents
        self._V = maxlen_vocabs
        self._K = n_topics
        self._S = batchsize
        self._alpha = alpha
        self._eta = eta
        self._tau = tau
        self._kappa = kappa
        self._tol = tol
        self._local_maxiter = local_maxiter
        self._global_maxiter = global_maxiter

        # Topic-word variational parameters (K x V), initialised close to 1.
        self._lambda = np.random.normal(1, 0.01, (n_topics, maxlen_vocabs))
        self._eq_lnbeta = self._dirichlet_expectation(self._lambda)
        self._exp_eq_lnbeta = np.exp(self._eq_lnbeta)

    def _dirichlet_expectation(self, params):
        """
        Input  : Parameter vector of Dirichlet distribution (np.array), or an array
                 whose rows are such parameter vectors
        Output : Expectation of log of corresponding Dirichlet random vector (np.array)
        """
        if len(params.shape) == 1:
            return digamma(params) - digamma(np.sum(params))
        return digamma(params) - digamma(np.sum(params, axis=1).reshape((-1, 1)))

    def _get_batch_indices(self):
        """
        Operating np.random.choice(range(D), batchsize) becomes expensive when D gets larger.
        This function draws S integers uniformly from [0, D) by scaling uniform floats;
        the draws are independent, so an index may appear more than once in a batch.
        """
        return list(map(int, np.random.random((self._S,)) * self._D))

    def _reference_to_wordlist(self, reference):
        """
        (** Function to be distributed **)
        Process of converting : single <reference> - <document> - <wordlist>

         1. reference -> document (** domain specific **)
             In this simple example, this task is simple because reference is document by itself.
             However, this job can be more complicated if this is not the case.
             (ex. reference can be only a URL whose corresponding page contains body text to be scraped.)

         2. document -> wordlist (** domain specific **)
             This example extracts words from English document, so textblob.TextBlob is used.
             Depending on the specific language to be analyzed, morpheme analyzer should be changed accordingly.
             Only tokens whose tag starts with 'NN' and that are longer than two
             characters are kept, lower-cased.
        """
        document = reference

        wordlist = [tag[0].lower()
                    for tag in TextBlob(document).tags
                    if tag[1].startswith('NN') and len(tag[0]) > 2]

        return wordlist

    def _wordlists_to_indexcounts(self, wordlists):
        """
        Convert <wordlists> into a list of [indices, counts] pairs, denoted as <indexcount>.
        Index of a word is looked up in vocab2idx (defaultdict) defined at the initialization
        of the instance, which also registers words seen for the first time.
        """
        indexcounts = []
        for wordlist in wordlists:
            indexcount = Counter(self._vocab2idx[word] for word in wordlist)
            indexcounts.append([list(indexcount.keys()), list(indexcount.values())])

        return indexcounts

    def _update_locals(self, indexcounts):
        """
        Update local variational parameters corresponding to documents in a minibatch (indexcounts).

        Notation
         - exp_eq_lnbw      : exp(E_q[ln beta]) restricted to the words of document s
                              (array of shape K * len(indices))
         - eq_lntheta_s     : E_q[ln theta_s]
         - exp_eq_lnths     : exp(E_q[ln theta_s])
         - phis_normalizers : per-word normalizer of phi, i.e. the sum over the K topics

        Process
        - To make Dirichlet prior as flat as possible, gamma is initialized close to 1
         1. Initialize gamma_s for every document of the minibatch
         2. for each indexcount_s:
          2-1. Calculate expectation on initialized values
          2-2. Update gamma_s until its change is ignorable
          2-3. Record gamma_s and update sufficient statistic

        Output : (gamma, suffstats) with shapes (len(indexcounts), K) and (K, V)
        """
        gamma = np.random.normal(1., 0.01, (len(indexcounts), self._K))
        suffstats = np.zeros(self._lambda.shape)

        for s, (indices, counts) in enumerate(indexcounts):
            counts = np.asarray(counts, dtype=float)
            exp_eq_lnbw = self._exp_eq_lnbeta[:, indices]
            gamma_s = gamma[s, :]
            exp_eq_lnths = np.exp(self._dirichlet_expectation(gamma_s))
            # 1e-100 guards the division below against an exactly zero normalizer.
            phis_normalizers = np.dot(exp_eq_lnths, exp_eq_lnbw) + 1e-100

            for _ in range(self._local_maxiter):
                gamma_s_old = gamma_s
                gamma_s = self._alpha + exp_eq_lnths * np.dot(counts / phis_normalizers, exp_eq_lnbw.T)
                exp_eq_lnths = np.exp(self._dirichlet_expectation(gamma_s))
                phis_normalizers = np.dot(exp_eq_lnths, exp_eq_lnbw) + 1e-100
                if np.mean(np.abs(gamma_s - gamma_s_old)) < self._tol:
                    break

            gamma[s, :] = gamma_s
            # Indices within one document are unique (Counter keys), so fancy-index += is safe.
            suffstats[:, indices] += np.outer(exp_eq_lnths, counts / phis_normalizers)

        # The factor exp(E_q[ln beta]) is shared by all documents, so it is applied once here.
        suffstats *= self._exp_eq_lnbeta
        return gamma, suffstats

    def _update_global(self, indexcounts, t):
        """
        Run the local step on a minibatch and blend its result into lambda.

        Input  : indexcounts of the minibatch, t = index of the global iteration (0-based)
        Output : gamma of the minibatch, shape (len(indexcounts), K)

        Process
         1. (gamma, suffstats) <- local step on the minibatch
         2. rho_t      = (tau + t) ** (-kappa)
         3. lambda_hat = eta + (D / batch size) * suffstats
         4. lambda     = (1 - rho_t) * lambda + rho_t * lambda_hat
         5. refresh the cached expectations of ln beta
        """
        gamma, suffstats = self._update_locals(indexcounts)

        rho_t = (self._tau + t) ** (-self._kappa)
        # Rescale as if the whole corpus consisted of copies of this minibatch.
        batch_docs = max(len(indexcounts), 1)
        lambda_hat = self._eta + (self._D / batch_docs) * suffstats
        self._lambda = (1. - rho_t) * self._lambda + rho_t * lambda_hat
        self._eq_lnbeta = self._dirichlet_expectation(self._lambda)
        self._exp_eq_lnbeta = np.exp(self._eq_lnbeta)

        return gamma

    def _calculate_elbo(self, indexcounts, gamma):
        """
        Placeholder: the evidence lower bound is not implemented yet.
        Always returns None, which fit() appends to self.ELBO.
        """
        return None

    def batch_call(self):
        """
         1. Sample subset of size S from references
         2. Convert each reference into wordlists
         3. return corresponding indexcounts.
        """
        batch_idx = self._get_batch_indices()
        minibatch = self.references[batch_idx]

        wordlists = [self._reference_to_wordlist(reference) for reference in minibatch]

        return self._wordlists_to_indexcounts(wordlists)

    def fit(self):
        """
        Run global_maxiter minibatch updates.
        self.ELBO starts with -inf and receives one entry per iteration.
        """
        self.ELBO = [-np.inf]

        for t in range(self._global_maxiter):
            indexcounts = self.batch_call()
            gamma = self._update_global(indexcounts, t)
            self.ELBO.append(self._calculate_elbo(indexcounts, gamma))

    def idx2vocab(self):
        """
        Create hashmap from each vocabulary's index to vocabulary for result interpretation
        """
        return {index: word for word, index in self._vocab2idx.items()}

In [3]:
np.inf

inf

In [156]:
def _dirichlet_expectation(params):
    """
    Input  : Parameter vector of Dirichlet distribution(np.array)
    Output : Expectation of log of corresponding Dirichlet random vector(np.array)
    """
    if len(params.shape) == 1:
        return digamma(params) - digamma(np.sum(params))
    return digamma(params) - digamma(np.sum(params, axis=1).reshape((-1,1)))

In [None]:
def _update_local(indices, counts)
    """
    (** Function to be distributed **)
    Update local variational parameter for single document in a minibatch.

    Notatation
     - eq_lnthd     : \mathbb{E}_q[\ln\theta_d]
     - exp_eq_lnthd : \exp\{\mathbb{E}_q[\ln\theta_d]\}
     - exp_eq_lnbw  : \mathbb{E}_q[\ln\beta_w] (array of shape K * len(indices))
     - sum_k_phidn  : \sum_{k=1}^{K}_\phi_{dn}^k
    """
    gamma_d = np.random.normal(1., 0.01, (self._K,))
    eq_lnthd = self._dirichlet_expectation(gamma_d)
    exp_eq_lnthd = np.exp(eq_lnthd)
    exp_eq_lnbw = self._exp_eq_lnbeta[:, indices]
    sum_k_phidn = np.dot(exp_eq_lnthd, exp_eq_lnbw) + 1e-100

    for _ in range(self._local_maxiter):
        gamma_d_old = gamma_d.copy()
        gamma_d = self._alpha + exp_eq_lnthd * np.dot(counts / sum_k_phidn, exp_eq_lnbw.T)

In [160]:
# Scratch value: exp of the Dirichlet log-expectation for a random 3-element parameter vector.
exp_eq_lnthd = np.exp(_dirichlet_expectation(np.random.normal(1., 0.01, (3,))))

In [161]:
# NOTE(review): despite the `exp_` prefix no np.exp is applied here, so this holds the
# log-expectations themselves for a random 3 x 5 parameter array — confirm that is intended.
exp_eq_lnbw = _dirichlet_expectation(np.random.normal(1., 0.01, (3,5)))

In [167]:
exp_eq_lnthd.copy()

array([0.23019205, 0.22146245, 0.21920042])

In [198]:
exp_eq_lnbw.T

array([[-2.06926277, -2.06123602, -2.09115928],
       [-2.06444983, -2.10291524, -2.09700874],
       [-2.05158881, -2.10750919, -2.05389508],
       [-2.11329267, -2.08140144, -2.08409843],
       [-2.10212964, -2.08978762, -2.11048065]])

In [103]:
# NOTE(review): OnlineLDA.__init__ as defined in this notebook has five required arguments
# (references, n_documents, maxlen_vocabs, n_topics, batchsize); this call passes only four,
# leaving batchsize unset — confirm the intended values before re-running.
model = OnlineLDA(korea, len(korea), 1, 2)

In [113]:
a = np.array([[1,0,-1],[-1,0,2]])

In [138]:
np.empty((2,0)).shape[1]

0

In [137]:
np.hstack((np.empty((2,0)), np.array([[2.,3., 4.],[2.,3., 4.]])))

array([[2., 3., 4.],
       [2., 3., 4.]])

dirichlet_expectation : 파라미터 벡터를 받아 각 원소에 대해 기대값 계산한 벡터 반환
parse_doc_list : 문서들의 리스트, word2idx를 받아 각 문서별로 id, count를 리스트로 담아 반환

In [139]:
def alalala(a, b, c, d):
    """Return a + b + c + d, combining the four arguments from left to right."""
    total = a + b
    total = total + c
    total = total + d
    return total

alalala(1,2,3,4)

10

In [186]:
import ray
import time

In [175]:
ray.init(ignore_reinit_error=True)

2020-09-17 15:43:03,542	ERROR worker.py:666 -- Calling ray.init() again after it has already been called.


In [193]:
# Time how long ray.get takes to return the results for `container`.
# NOTE(review): `container` is not defined in any visible cell — presumably a list of
# ray object references created in a cell that was removed; confirm before re-running.
start = time.time()
result = ray.get(container)
time.time() - start

2.880129814147949

In [194]:
result

[[-1, 1], [0, 2], [1, 3], [2, 4], [3, 5]]