In [None]:
!pip install japanize-matplotlib
!pip install ginza==4.0.6 ja-ginza==4.0.0
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import japanize_matplotlib
import arviz as az

import tensorflow_probability as tfp
import tensorflow as tf
tfb = tfp.bijectors
tfd = tfp.distributions

sns.reset_defaults()
sns.set_context(context='talk', font_scale=0.8)
colors = sns.color_palette('tab10')
japanize_matplotlib.japanize()

%config InlineBackend.figure_format = 'retina'
%matplotlib inline
import os
os.chdir("/content/drive/MyDrive/projects/instagram/")

import spacy
from collections import Counter

In [None]:
df = pd.read_csv("data/instagram.csv")
print(df.shape)
df.head()

In [3]:
import re
from bs4 import BeautifulSoup

# クリーン用の関数
def clean_text(text, name2variants=None):
    text = BeautifulSoup(text, "html.parser") # webページリンクの削除
    text = text.get_text(strip=True)
    text = re.sub("\n", "", text) # 改行消す
    text = text.lower() # 全部小文字へ
    text = re.sub("w+", "w", text) # 草を一つにする
    text = re.sub(":.*:", "", text) # 絵文字の消去
    text = re.sub("　", " ", text) # 全角空白を半角に
    text = re.sub("```.*```", "", text) # /code blockは消す
    text = re.sub(r'#\S+', '', text) # ハッシュタグの削除
    if name2variants != None: # 表記揺れの処理
        for k, v in name2variants.items():
            text = re.sub("|".join(v), k, text)
    if len(text) == 0: # 0文字になったらdropnaさせる
        return np.nan
    return text

data = df["caption"].apply(clean_text).dropna().tolist()
print(len(data))

11


In [4]:
include_pos = ("NOUN") # 名詞だけ
nlp = spacy.load("ja_ginza")

results = []
words = []
for doc in nlp.pipe(data):
    # include_posに入ってる品詞ワードだけ取り出し
    result = [token.lemma_ for token in doc if (token.pos_ in include_pos)]
    if len(result)!=0:
        results.append(result)
        words.extend(result)

In [None]:
word_counts = Counter(words)
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
ws, cs = zip(*sorted_word_counts)

fig, ax = plt.subplots(figsize=(14, 5))
sns.barplot(x=list(ws), y=list(cs), ax=ax)
plt.xticks(rotation=90)
plt.show()

In [8]:
TN = 1
LENGTH_DOCUMENT = 5 # 一定単語数のdocumentは分析から外す

# 出現頻度がTN回以上の単語をフィルタリング
filtered_words = [w for w, c in word_counts.items() if c > TN]

# 辞書の作成
word2id = {}
id2word = {}
for word in filtered_words:
    if word not in word2id:
        idx = len(word2id)
        word2id[word] = idx
        id2word[idx] = word


# 単語をidに変換
documents = []
for i in range(len(results)):
    # 各投稿を一定出現頻度の単語で表現
    document = [word2id[w] for w in results[i] if w in word2id]
    if len(document) > LENGTH_DOCUMENT:
        documents.append(document)

In [10]:
K = 2 # トピック数
V = len(word2id) # ボキャブラリ数
M = len(documents) # 文書数
N = [len(d) for d in documents]

print(K, V, M)
print(N)

2 42 7
[10, 13, 26, 13, 11, 10, 32]


In [11]:
Root = tfd.JointDistributionCoroutine.Root
def lda_model():
    # 文書におけるトピックの分布
    # 各文書がどのトピックに属するかはディリクレ分布に従う
    theta = yield Root(tfd.Independent(
        tfd.Dirichlet(concentration=tf.ones([M, K])),
        reinterpreted_batch_ndims=1,
        name='theta')) # event shape: M, K

    # トピックにおける単語の分布
    # 各トピックから生成される単語はディリクレ分布に従う
    phi = yield Root(tfd.Independent(
        tfd.Dirichlet(concentration=tf.ones([K, V])),
        reinterpreted_batch_ndims=1,
        name='phi')) # event shape: K, V

    for m in range(M):
        # 文書mについて
        # 観測される単語の分布
        y = yield tfd.Sample(
            # トピック割り当てzについて周辺化したモデル
            tfd.MixtureSameFamily(
                # 文書mに含まれる各単語がどのトピックに含まれるかがパラメタthetaのカテゴリカル分布に従う
                mixture_distribution=tfd.Categorical(probs=theta[..., m, :]), # カテゴリ数：K
                # その単語が含まれるトピックが決まるので、そのトピックの単語のカテゴリカル分布から単語が決まる
                components_distribution=tfd.Categorical(probs=phi)), # カテゴリ数：V
            sample_shape=N[m],
            name=f'y_{m}') # event shape: n, カテゴリ数： V

lda = tfd.JointDistributionCoroutine(lda_model)

def target_log_prob_fn(theta, phi):
    return lda.log_prob(theta, phi, *documents)

In [12]:
num_results = 1000
num_burnin_steps = 500

tf.random.set_seed(42)

# パラメータの初期値
initial_state = [
    tf.fill([M, K], value=1/K, name='theta'),
    tf.fill([K, V], value=1/V, name='phi')
]

# パラメータの制約に合わせた変数変換
unconstraining_bijectors = [
    tfb.SoftmaxCentered(),
    tfb.SoftmaxCentered(),
]

In [None]:
%%time
# HMC法によるサンプリング用の関数
@tf.function(autograph=False)
def sample():
  return tfp.mcmc.sample_chain(
    num_results=num_results,
    num_burnin_steps=num_burnin_steps,
    current_state=initial_state,
    # HMCのステップサイズを自動的に調整
    kernel=tfp.mcmc.SimpleStepSizeAdaptation(
        # Bijectorを利用して変数の制約に対処
        tfp.mcmc.TransformedTransitionKernel(
            # HMC法
            inner_kernel=tfp.mcmc.HamiltonianMonteCarlo(
                target_log_prob_fn=target_log_prob_fn,
                 step_size=0.1,
                 num_leapfrog_steps=5),
            bijector=unconstraining_bijectors),
         num_adaptation_steps=400),
    trace_fn=lambda _, pkr: pkr.inner_results.inner_results.is_accepted)

[theta, phi], is_accepted = sample()

print('acceptance rate: {:.1%}'.format(is_accepted.numpy().mean()))

In [17]:
print(theta.shape, phi.shape)

(1000, 7, 2) (1000, 2, 42)


In [None]:
def format_trace(states, var_name, chain_dim=None):
    if chain_dim is None:
        # chainが１つだと明示するためaxisを追加
        trace = {k: v[tf.newaxis].numpy() for k, v in zip(var_name, states)}
    else:
        # axis0がchainの次元になるようにする
        trace = {k: np.swapaxes(v.numpy(), chain_dim, 0) for k, v in zip(var_name, states)}
    # from_tfpもあるが、実行するとeager_executionがオフにされてしまうなど現状使いづらいので、from_dictを用いている
    return az.from_dict(trace)

trace = format_trace([theta, phi], ['theta', 'phi'])
az.plot_trace(trace)
plt.tight_layout();

In [None]:
# (1 - a) x 100% のサンプルが入る区間を求める
a = 0.1
# phiは各トピックにおける単語の分布
# サンプルを1000こ取ってるのでその5%、50%、95%点を求める
lwr, med, upr = np.quantile(phi, [a / 2, 0.5, 1 - a / 2], axis=0)

# 描画用
xticks = range(V)
vocabs = id2word.values()

fig, axes = plt.subplots(K, 1, sharex=True, sharey=True, figsize=(12, 3*K))

for i in range(K):
    ax = axes[i]
    ax.scatter(range(V), med[i], color=colors[0], marker='s', label='pred')
    ax.vlines(range(V), lwr[i], upr[i], color=colors[0], label=f'{1-a:.0%} HDI')
    #ax.scatter(range(V), word_dist[i], color=colors[1], marker='x', label='truth')
    if i == K - 1:
        ax.set_xlabel('word')
    ax.set_ylabel('probability')
    ax.set_xticks(xticks)
    ax.set_xticklabels(vocabs, rotation=90)
    ax.set_title(f'topic {i}')

handles, labels = ax.get_legend_handles_labels()
fig.legend(handles, labels, loc='center left', bbox_to_anchor=[1.0, 0.5])
plt.tight_layout()

In [None]:
# 各文書がどのトピックに含まれるか
a = 0.1
lwr, med, upr = np.quantile(theta, [a / 2, 0.5, 1 - a / 2], axis=0)

ncol = 5
nrow = round(np.ceil(M/ncol))
fig, axes = plt.subplots(nrow, ncol, sharex=True, sharey=True, figsize=(ncol*3, nrow*2))

for i in range(M):
    ax = axes.ravel()[i]
    ax.scatter(range(K), med[i], color=colors[0], marker='s', label='pred')
    ax.vlines(range(K), lwr[i], upr[i], color=colors[0], label=f'{1-a:.0%} HDI')
    #ax.scatter(range(K), topic_dist[i], color=colors[1], marker='x', label='truth')
    # ax.legend()
    ax.set_xticks(range(K))
    if i >= ncol * (nrow-1):
        ax.set_xlabel('topic')
    if not i % ncol:
        ax.set_ylabel('probability')
    ax.set_title(f'document {i}')

handles, labels = ax.get_legend_handles_labels()
fig.legend(handles, labels, loc='center left', bbox_to_anchor=[1.0, 0.5])
plt.tight_layout()