# 顔文字生成器（サンプル）
LSTMを用いた構成．LSTMの内部構造を書き下している．
Peeky(?)というような手法を追加してる．

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd

## 前処理

### データ読み込み

In [2]:
# 特殊文字
sp = {'pad': '<PAD>', 'bos': '<BOS>', 'eos': '<EOS>', 'unk': '<UNK>'}
# pad : padding. 文字列長を一定にするために使う
# bos : begin of sequence. 文頭文字．Decoderの最初の入力
# eos : end of sequence. 文末文字．
# unk : unknown. 出現数が低いものに割り当てる

In [3]:
KAOMOJI_MAX = 20    # 顔文字最大長

kmj_list = []   # 顔文字リスト
len_list = []       # <BOS> から <EOS> までの文字数のリスト
char_list = []      # 顔文字に使用されている文字のリスト

char_list += list(sp.values())
file_name = 'kaomoji_MAX=' + str(KAOMOJI_MAX) + '.txt'

with open(file_name, mode='r') as file:
  for line in file:
    temp = [sp['bos']]
    temp += list(line.replace('\n', ''))
    temp += [sp['eos']]
    len_list.append(len(temp))
    temp += [sp['pad'] for _ in range(KAOMOJI_MAX+2 - len(temp))]
    kmj_list.append(temp)
    char_list += temp

# 重複を消す
char_list = sorted(set(char_list), key=char_list.index)

In [4]:
print(kmj_list[0])

['<BOS>', '(', '✿', '\u3000', '́', '꒳', '`', ')', 'ノ', '°', '+', '.', '*', '<EOS>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']


In [5]:
print('Number of kaomoji  :', len(kmj_list))
print('Number of character:', len(char_list))

Number of kaomoji  : 11587
Number of character: 1106


### 出現数が少ないものを置換

In [24]:
# 最小出現数
MIN_APPEAR = 20

kmj_list = np.array(kmj_list)

cnt = 0
for c in char_list:
  mask = (kmj_list == c)
  if np.sum(mask) < MIN_APPEAR:
    kmj_list[mask] = sp['unk']

char_list = list(sp.values()) + kmj_list.flatten().tolist()
char_list = sorted(set(char_list), key=char_list.index)

In [25]:
print('Number of character:', len(char_list))

Number of character: 220


### 添字検索
顔文字に使われる文字が文字リストの何番目にあるか調べる

In [26]:
kmj_index = []    # 添字リスト

for kmj in kmj_list.tolist():
  temp = [char_list.index(c) for c in kmj]
  kmj_index.append(temp)

In [27]:
print(len_list[0])
kmj_index[0]

14


[1, 4, 3, 5, 6, 3, 7, 8, 9, 10, 11, 12, 13, 2, 0, 0, 0, 0, 0, 0, 0, 0]

### One-hotベクトル化



In [28]:
kmj_num = len(kmj_index)        # 顔文字の総数
kmj_size = len(kmj_index[0])    # 1つの顔文字の長さ
char_num = len(char_list)       # 文字の種類数

# One-hotベクトルリスト
kmj_onehot = np.zeros((kmj_num, kmj_size, char_num))

for i, index in enumerate(kmj_index):
  mask = range(char_num) == np.array(index).reshape((kmj_size, 1))
  kmj_onehot[i][mask] = 1

In [29]:
kmj_onehot.shape

(11587, 22, 220)

### 訓練・検証・テスト用に分ける

In [30]:
dataset = torch.utils.data.TensorDataset(
  torch.tensor(kmj_onehot.astype('float32')),
  torch.tensor(len_list)
)

In [31]:
train_size = int(len(dataset) * 0.85)
valid_size = int(len(dataset) * 0.10)
test_size  = len(dataset) - train_size - valid_size

# indices = np.arange(len(dataset))

# dataset_train = torch.utils.data.Subset(dataset, indices[:train_size])
# dataset_valid = torch.utils.data.Subset(dataset, indices[train_size:train_size+valid_size])
# dataset_test  = torch.utils.data.Subset(dataset, indices[train_size+valid_size:])

split = [train_size, valid_size, test_size]

dataset_train, dataset_valid, dataset_test = torch.utils.data.random_split(dataset, split)

In [32]:
dataset_train[0]

(tensor([[0., 1., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 1.,  ..., 0., 0., 0.],
         [1., 0., 0.,  ..., 0., 0., 0.],
         [1., 0., 0.,  ..., 0., 0., 0.]]),
 tensor(20))

In [33]:
batch_size = 32

dataloader_train = torch.utils.data.DataLoader(
  dataset_train,
  batch_size=batch_size,
  shuffle=True
)

dataloader_valid = torch.utils.data.DataLoader(
  dataset_valid,
  batch_size=batch_size,
  shuffle=True
)

In [34]:
for x, len_seq in dataloader_train:
  print(x.shape, len_seq.shape)
  break

torch.Size([32, 22, 220]) torch.Size([32])


## モデル

In [46]:
class LSTM(nn.Module):
  def __init__(self, in_dim, hid_dim):
    super().__init__()
    self.hid_dim = hid_dim
    glorot = 6/(in_dim + hid_dim*2)

    self.W_i = nn.Parameter(torch.tensor(np.random.uniform(
                    low=-np.sqrt(glorot),
                    high=np.sqrt(glorot),
                    size=(in_dim + hid_dim, hid_dim)
                ).astype('float32')))
    self.b_i = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

    self.W_f = nn.Parameter(torch.tensor(np.random.uniform(
                    low=-np.sqrt(glorot),
                    high=np.sqrt(glorot),
                    size=(in_dim + hid_dim, hid_dim)
                ).astype('float32')))
    self.b_f = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

    self.W_o = nn.Parameter(torch.tensor(np.random.uniform(
                    low=-np.sqrt(glorot),
                    high=np.sqrt(glorot),
                    size=(in_dim + hid_dim, hid_dim)
                ).astype('float32')))
    self.b_o = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

    self.W_c = nn.Parameter(torch.tensor(np.random.uniform(
                    low=-np.sqrt(glorot),
                    high=np.sqrt(glorot),
                    size=(in_dim + hid_dim, hid_dim)
                ).astype('float32')))
    self.b_c = nn.Parameter(torch.tensor(np.zeros([hid_dim]).astype('float32')))

  def function(self, state_c, state_h, x):
    i = torch.sigmoid(torch.matmul(torch.cat([state_h, x], dim=1), self.W_i) + self.b_i)
    f = torch.sigmoid(torch.matmul(torch.cat([state_h, x], dim=1), self.W_f) + self.b_f)
    o = torch.sigmoid(torch.matmul(torch.cat([state_h, x], dim=1), self.W_o) + self.b_o)
    c = f*state_c + i*torch.tanh(torch.matmul(torch.cat([state_h, x], dim=1), self.W_c) + self.b_c)
    h = o*torch.tanh(c)
    return c, h

  def forward(self, x, len_seq_max, init_state_h=None, init_state_c=None):
    batch_size = x.shape[0]
    x = x.transpose(0, 1)  # 系列のバッチ処理のため、次元の順番を「系列、バッチ」の順に入れ替える

    state_c = init_state_c
    state_h = init_state_h
    if init_state_c is None:  # 初期値を設定しない場合は0で初期化する
        state_c = torch.zeros((batch_size, self.hid_dim)).to(x.device)
    if init_state_h is None:  # 初期値を設定しない場合は0で初期化する
        state_h = torch.zeros((batch_size, self.hid_dim)).to(x.device)

    size = list(state_h.unsqueeze(0).size())
    size[0] = 0
    output = torch.empty(size, dtype=torch.float).to(x.device)  # 一旦空テンソルを定義して順次出力を追加する

    # if len_seq_max == 0:
    #     len_seq_max = x.size(0)
    for i in range(len_seq_max):
        state_c, state_h = self.function(state_c, state_h, x[i])
        output = torch.cat([output, state_h.unsqueeze(0)])  # 出力系列の追加
        if init_state_h is not None:
          state_h += init_state_h

    return output

In [47]:
class Encoder(nn.Module):
  def __init__(self, in_dim, emb_dim, hid_dim):
    super().__init__()
    self.emb = nn.Linear(in_dim, emb_dim, bias=False)
    self.lstm = LSTM(emb_dim, hid_dim)
    self.linear = nn.Linear(hid_dim, hid_dim)

  def forward(self, x, len_seq):
    x = self.emb(x)
    h = self.lstm(x, len_seq.max())
    h = h[len_seq - 1, list(range(len(x))), :]
    y = self.linear(h)
    return y

In [48]:
class Decoder(nn.Module):
  def __init__(self, in_dim, emb_dim, hid_dim):
    super().__init__()
    self.emb = nn.Linear(in_dim, emb_dim, bias=False)
    self.lstm = LSTM(emb_dim, hid_dim)
    self.linear = nn.Linear(hid_dim, in_dim)

  def forward(self, x, len_seq, init_state_h):
    x = self.emb(x)
    h = self.lstm(x, len_seq.max(), init_state_h)
    # h += init_state_h
    y = self.linear(h)
    return y

In [49]:
class Generator(nn.Module):
  def __init__(self, in_dim, emb_dim, hid_dim):
    super().__init__()
    self.encoder = Encoder(in_dim, emb_dim, hid_dim)
    self.decoder = Decoder(in_dim, emb_dim, hid_dim)

  def forward(self, x, len_seq):
    # <BOS>以降を入力
    h = self.encoder(x[:, 1:, :], len_seq - 1)
    y = self.decoder(x, len_seq, h)
    return y

## 学習

In [50]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [73]:
n_epochs = 5

in_dim = len(char_list)
emb_dim  = 16
hid_dim  = 32

net = Generator(in_dim, emb_dim, hid_dim)
net.to(device)
optimizer = optim.Adam(net.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()

In [74]:
for epoch in range(n_epochs):
  losses_train = []
  losses_valid = []

  # 訓練
  net.train()
  for x, len_seq in dataloader_train:
    net.zero_grad()  # 勾配の初期化

    x = x.to(device)
    len_seq = len_seq.to(device)

    y = net(x, len_seq).transpose(0, 1)

    loss = criterion(y[:, :len_seq.max()-1, :], x[:, 1:len_seq.max(), :])
    loss.backward()  # 誤差の逆伝播

    torch.nn.utils.clip_grad_norm_(net.parameters(), 0.1)

    losses_train.append(loss.tolist())

    optimizer.step()  # パラメータの更新

  # 検証
  net.eval()
  for x, len_seq in dataloader_valid:
    x = x.to(device)
    len_seq = len_seq.to(device)

    y = net(x, len_seq).transpose(0, 1)

    loss = criterion(y[:, :len_seq.max()-1, :], x[:, 1:len_seq.max(), :])
    losses_valid.append(loss.tolist())

  if (epoch+1) % 1 == 0:
    print('EPOCH: {:>2}, Train Loss: {:>4.5f}, Valid Loss: {:>4.5f}'.format(
        epoch,
        np.mean(losses_train),
        np.mean(losses_valid),
    ))

EPOCH:  0, Train Loss: 0.19035, Valid Loss: 0.16212
EPOCH:  1, Train Loss: 0.16289, Valid Loss: 0.15550
EPOCH:  2, Train Loss: 0.15588, Valid Loss: 0.14838
EPOCH:  3, Train Loss: 0.15137, Valid Loss: 0.14528
EPOCH:  4, Train Loss: 0.14765, Valid Loss: 0.14260


## 評価

In [75]:
def convert_str(x):
  x = np.array(char_list)[x.argmax(dim=1)]
  x = [c for c in x if c not in sp.values()]

  return ''.join(x)

def generate(base, net, rate=3.0):
  x, len_seq = base[0].to(device), base[1].to(device)
  x = x.unsqueeze(0)
  len_seq = len_seq.unsqueeze(0)

  # eps = 2*rate * np.random.rand(hid_dim).astype('float32') - rate
  z = net.encoder(x[:, 1:, :], len_seq - 1)

  onehot = torch.zeros(1, kmj_size, char_num).to(device)
  bos_index = char_list.index(sp['bos'])
  onehot[0][0][bos_index] = 1

  len_seq_max = torch.tensor(kmj_size).unsqueeze(0)

  for i in range(kmj_size-1):
    y = net.decoder(onehot, len_seq_max, z)
    onehot[0][i+1][y[i][0].argmax()] = 1

  gen = convert_str(onehot[0].cpu())
  # gen = convert_str(net(x, len_seq).squeeze(1).cpu())

  return ''.join(gen)

In [77]:
for i in np.random.randint(0, len(dataset_test), size=10):
  test = dataset_test[i]
  print('base     :', convert_str(test[0]))
  print('generate :', generate(test, net, rate=0.0))

base     : ・・・(▼ー▼)
generate : ﻿Σ(≧へ≦A)
base     : ━(★　́∀`)ノ━ォ!!!
generate : ﻿(　`へ)o
base     : ヽ(・∀・)
generate : ﻿(　゚ロ)
base     : (/^^)/(--;)\(^^\)!!
generate : ﻿Σd(　́皿`へ)ノ⌒C<
base     : (~^~m)・・(m~^~)m
generate : ﻿Σ(≧皿≦へq)。C┓
base     : (　　　)【(　▽　’!)/
generate : ∑(　́皿≦へ`)o　́Д`　)
base     : (　́`)
generate : ∑(　́皿`)
base     : (-△-)
generate : ﻿(　̆-
base     : (ΦДΦ)
generate : ﻿(　゚▽)
base     : ♪Ю―(^O^　)ゞ
generate : ∑(　́Д`へ;)


## 感想

このネットワークは，前のとだいたい同じだが，  
Encoder：Embedding＋LSTM＋Linear  
Decoder：Embedding＋LSTM＋Linear

Encoderにも全結合層を付けた．あと，Decoderの入力も少し変更している．

計算例  
入力：（＾－＾）

Encoder  
入力："（"　　　------>　h0  
入力："＾"，h0　------>　h1  
入力："ー"，h1　------>　h2  
入力："＾"，h2　------>　h3  
入力："）"，h3　------>　h4  
系列データを1つに集約する感じ？

Decoder  
入力："BOS", h4　　------>　H0　------>　推論："（"  
入力："（"， H0+h4 ------>　H1　------>　推論："＾"  
入力："＾"， H1+h4 ------>　H2　------>　推論："ー"  
入力："ー"， H2+h4 ------>　H3　------>　推論："＾"  
入力："＾"， H3+h4 ------>　H4　------>　推論："（"  

Decoderの全ての入力にEncoderで求めた潜在ベクトルを足している．
性能は良くなったかは微妙．
LSTMに限界を感じつつある．

あと，入力をバッチにしたから学習が早くなった．