# Pythonで学ぶ画像認識　第6章 画像キャプショニング
## 第6.4節 アテンション機構による手法〜Show, attend and tellを実装してみよ

###モジュールのインポートとGoogleドライブのマウント

In [None]:
import os
import glob
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import datetime
from tqdm import tqdm
import pickle
import random
from torch.utils.data.sampler import SubsetRandomSampler
from pycocotools.coco import COCO
from PIL import Image
import skimage.transform
from typing import Sequence, Dict, Tuple, Union
from collections import deque

import torch
from torch import nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_padded_sequence
from torchvision import models
import torchvision.transforms as T
import torchvision.datasets as dataset
from torch.utils.data.dataset import Subset

from google.colab import drive
drive.mount('/content/drive')

import sys
sys.path.append('drive/MyDrive/python_image_recognition/6_img_captioning/6_4_show_attend_and_tell')

import util

### エンコーダの実装

In [None]:
class CNNEncoder(nn.Module):
    '''
    Show, attend and tellのエンコーダ
    encoded_img_size: 画像部分領域サイズ
    '''
    def __init__(self, encoded_img_size: int):
        super().__init__()

        # IMAGENET1K_V2で事前学習された
        # ResNet152モデルをバックボーンとする
        resnet = models.resnet152(weights="IMAGENET1K_V2") 

        # AdaptiveAvgPool2dで部分領域(14x14)を作成        
        resnet.avgpool = nn.AdaptiveAvgPool2d(encoded_img_size)

        # 特徴抽出器として使うため全結合層を削除
        modules = list(resnet.children())[:-1]
        self.backbone = nn.Sequential(*modules)

    '''
    エンコーダの順伝播
    imgs : 入力画像, [バッチサイズ, チャネル数, 高さ, 幅]
    '''
    @torch.no_grad()
    def forward(self, imgs: torch.Tensor):
        # 特徴抽出
        features = self.backbone(imgs)

        # 並び替え -> [バッチサイズ, 特徴マップの幅 * 高さ, チャネル数]
        features = features.permute(0, 2, 3, 1).flatten(1, 2)

        return features

### アテンション機構の実装

In [None]:
class Attention(nn.Module):
    '''
    アテンション機構 (Attention mechanism)
    dim_encoder  : エンコーダ出力の特徴次元
    dim_decoder  : デコーダ出力の次元
    dim_attention: アテンション機構の次元
    '''
    def __init__(self, dim_encoder: int, 
                 dim_decoder: int, dim_attention: int):
        super().__init__()

        # z: エンコーダ出力を変換する線形層(Wz)
        self.encoder_att = nn.Linear(dim_encoder, dim_attention)

        # h: デコーダ出力を変換する線形層(Wh)
        self.decoder_att = nn.Linear(dim_decoder, dim_attention)

        # e: アライメントスコアを計算するための線形層
        self.full_att = nn.Linear(dim_attention, 1)

        # α: アテンション重みを計算する活性化関数
        self.relu = nn.ReLU(inplace=True)

    '''
    Attentionの順伝播
    encoder_out   : エンコーダ出力,
                    [バッチサイズ, 特徴マップの幅 * 高さ, チャネル数]
    decoder_hidden: デコーダ隠れ状態の次元
    '''
    def forward(self, encoder_out: torch.Tensor, 
                decoder_hidden: torch.Tensor):
        # e: アライメントスコア
        att1 = self.encoder_att(encoder_out)    # Wz * z
        att2 = self.decoder_att(decoder_hidden) # Wh * h_{t-1}
        att = self.full_att(
                self.relu(att1 + att2.unsqueeze(1))).squeeze(2) 

        # α: T個の部分領域ごとのアテンション重み
        alpha = att.softmax(dim=1)

        # c: コンテキストベクトル
        context_vector = (encoder_out * alpha.unsqueeze(2)).sum(dim=1)

        return context_vector, alpha

### アテンション機構付きデコーダの実装

In [None]:
class RNNDecoderWithAttention(nn.Module):
    '''
    アテンション機構付きデコーダネットワーク
    dim_attention: アテンション機構の次元
    dim_embedding: 埋込み次元
    dim_encoder  : エンコーダ出力の特徴量次元
    dim_decoder  : デコーダの次元
    vocab_size   : 辞書の次元
    dropout      : ドロップアウト確率
    '''
    def __init__(self, dim_attention: int, dim_embedding: int, 
                 dim_encoder: int, dim_decoder: int,
                 vocab_size: int, dropout: float=0.5):
        super().__init__()

        self.vocab_size = vocab_size

        # アテンション機構
        self.attention = Attention(dim_encoder, dim_decoder, 
                                   dim_attention)

        # 単語の埋め込み
        self.embed = nn.Embedding(vocab_size, dim_embedding)
        self.dropout = nn.Dropout(dropout)

        # LSTMセル
        self.decode_step = nn.LSTMCell(dim_embedding + dim_encoder, 
                                       dim_decoder, bias=True)

        # LSTM隠れ状態/メモリセルの初期値を生成する全結合層
        self.init_linear = nn.Linear(dim_encoder, dim_decoder * 2)

         # シグモイド活性化前の全結合層
        self.f_beta = nn.Linear(dim_decoder, dim_encoder)

        # 単語出力用の全結合層
        self.linear = nn.Linear(dim_decoder, vocab_size)

        # 埋め込み層、全結合層の重みを初期化
        self._reset_parameters()
        
    '''
    パラメータの初期化関数
    '''
    def _reset_parameters(self):
        nn.init.uniform_(self.embed.weight, -0.1, 0.1)
        nn.init.uniform_(self.linear.weight, -0.1, 0.1)
        nn.init.constant_(self.linear.bias, 0)

    '''
    アテンション機構付きデコーダの順伝播
    features: エンコーダ出力,
              [バッチサイズ, 特徴マップの幅 * 高さ, チャネル数]
    captions: キャプション, [バッチサイズ, 最大系列長]
    lengths : 系列長のリスト
    '''
    def forward(self, features: torch.Tensor, captions: torch.Tensor,
                lengths: list):
        # バッチサイズの取得
        bs = features.shape[0]

        # 単語埋込み
        embeddings = self.embed(captions)

        # 隠れ状態ベクトル、メモリセルの初期値を生成
        mean_features = features.mean(dim=1)
        init_state = self.init_linear(mean_features)
        h, c = init_state.chunk(2, dim=1)

        # 最大系列長（<start>を除く）
        dec_lengths = [length - 1 for length in lengths]

        # キャプショニング結果を保持するためのテンソル
        preds = features.new_zeros(
            (bs, max(dec_lengths), self.vocab_size))

        # センテンス予測処理
        for t in range(max(dec_lengths)):
            bs_valid = sum([l > t for l in dec_lengths])

            # コンテキストベクトル, アテンション重み
            context_vector, _ = self.attention(
                features[:bs_valid], h[:bs_valid])

            # LSTMセル
            gate = self.f_beta(h[:bs_valid]).sigmoid()
            context_vector = gate * context_vector
            context_vector = torch.cat(
                (embeddings[:bs_valid, t], context_vector), dim=1)
            h, c = self.decode_step(
                context_vector, (h[:bs_valid], c[:bs_valid]))
            
            # 単語予測
            pred = self.linear(self.dropout(h))

            # 情報保持
            preds[:bs_valid, t] = pred

        # Show and tellの出力に合わせる
        preds = pack_padded_sequence(preds, dec_lengths, 
                                     batch_first=True)

        return preds

    '''
    サンプリングによる説明文出力（ビームサーチ無し）
    features  : エンコーダ出力特徴,
                [1, 特徴マップの幅 * 高さ, 埋め込み次元]
    word_to_id: 単語->単語ID辞書
    '''    
    def sample(self, features: torch.Tensor, word_to_id: list):        
        # 隠れ状態ベクトル、メモリセルの初期値を生成
        mean_features = features.mean(dim=1)
        init_state = self.init_linear(mean_features)
        h, c = init_state.chunk(2, dim=1)

        # センテンス生成の初期値として<start>を埋め込み
        id_start = word_to_id['<start>']
        prev_word = features.new_tensor((id_start,),
                                        dtype=torch.int64)

        # サンプリングによるセンテンス生成
        preds = []
        alphas = []
        for _ in range(50):
            # 単語埋め込み
            embeddings = self.embed(prev_word)

            # コンテキストベクトル, アテンション重み
            context_vector, alpha = self.attention(features, h)
            
            # LSTMセル
            gate = self.f_beta(h).sigmoid()
            context_vector = gate * context_vector
            context_vector = torch.cat(
                (embeddings, context_vector), dim=1)
            h, c = self.decode_step(context_vector, (h, c))

            # 単語予測
            pred = self.linear(h)
            pred = pred.softmax(dim=1)
            prev_word = pred.argmax(dim=1)

            # 予測結果とアテンション重みを保存
            preds.append(prev_word[0].item())
            alphas.append(alpha)

        return preds, alphas

###学習におけるハイパーパラメータやオプションの設定

In [None]:
class ConfigTrain(object):
    '''
    ハイパーパラメータ、グローバル変数の設定
    '''  
    def __init__(self):

        # ハイパーパラメータ
        self.enc_img_size = 14     # Attention計算用画像サイズ
        self.dim_attention = 128   # Attention層の次元
        self.dim_embedding = 128   # 埋め込み層の次元
        self.dim_encoder = 2048    # エンコーダの特徴マップのチャネル数
        self.dim_hidden = 128      # LSTM隠れ層の次元
        self.lr = 0.001            # 学習率
        self.dropout = 0.5         # dropout確率
        self.batch_size = 30       # ミニバッチ数
        self.num_epochs = 30       # エポック数
        
        # パスの設定
        self.img_directory = 'val2014'
        self.anno_file = 'drive/MyDrive/python_image_recognition/data/coco2014/captions_val2014.json'
        self.word_to_id_file = 'drive/MyDrive/python_image_recognition/6_img_captioning/vocab/word_to_id.pkl'
        self.save_directory = 'drive/MyDrive/python_image_recognition/6_img_captioning/model'

        # 検証に使う学習セット内のデータの割合
        self.val_ratio = 0.3

        # データローダーに使うCPUプロセスの数
        self.num_workers = 4

        # 学習に使うデバイス
        self.device = 'cuda'

        # 移動平均で計算する損失の値の数
        self.moving_avg = 100

### 学習を行う関数

In [None]:
def train():
    config = ConfigTrain()

    # 辞書（単語→単語ID）の読み込み
    with open(config.word_to_id_file, 'rb') as f:
        word_to_id = pickle.load(f)

    # 辞書サイズを保存
    vocab_size = len(word_to_id)
        
    # モデル出力用のディレクトリを作成
    os.makedirs(config.save_directory, exist_ok=True)

    # 画像のtransformsを定義
    transforms = T.Compose([
        T.Resize((224, 224)),
        T.RandomHorizontalFlip(),
        T.ToTensor(),
        # ImageNetデータセットの平均と標準偏差
        T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)) 
    ])

    # COCOデータロードの定義
    train_dataset = dataset.CocoCaptions(root=config.img_directory, 
                                         annFile=config.anno_file, 
                                         transform=transforms)
    
    # Subset samplerの生成
    val_set, train_set = util.generate_subset(
        train_dataset, config.val_ratio)

    # 学習時にランダムにサンプルするためのサンプラー
    train_sampler = SubsetRandomSampler(train_set)

    # DataLoaderを生成
    collate_func_lambda = lambda x: util.collate_func(x, word_to_id)
    train_loader = torch.utils.data.DataLoader(
                        train_dataset, 
                        batch_size=config.batch_size, 
                        num_workers=config.num_workers, 
                        sampler=train_sampler,
                        collate_fn=collate_func_lambda)
    val_loader = torch.utils.data.DataLoader(
                        train_dataset, 
                        batch_size=config.batch_size, 
                        num_workers=config.num_workers, 
                        sampler=val_set,
                        collate_fn=collate_func_lambda)

    # モデルの定義
    encoder = CNNEncoder(config.enc_img_size)
    decoder = RNNDecoderWithAttention(config.dim_attention,
                                      config.dim_embedding, 
                                      config.dim_encoder,
                                      config.dim_hidden,
                                      vocab_size,
                                      config.dropout)
    encoder.to(config.device)
    decoder.to(config.device)
    
    # 損失関数の定義
    loss_func = lambda x, y: F.cross_entropy(
        x, y, ignore_index=word_to_id.get('<null>', None))
    
    # 最適化手法の定義
    optimizer = torch.optim.AdamW(decoder.parameters(), lr=config.lr)
    
    # 学習経過の書き込み
    now = datetime.datetime.now()
    train_loss_file = f'{config.save_directory}/' \
    '6-4_train_loss_{now.strftime("%Y%m%d_%H%M%S")}.csv'
    val_loss_file = f'{config.save_directory}/' \
    '6-4_val_loss_{"%Y%m%d_%H%M%S"}.csv'

    # 学習
    val_loss_best = float('inf')
    for epoch in range(config.num_epochs):
        with tqdm(train_loader) as pbar:
            pbar.set_description(f'[エポック {epoch + 1}]')

            # 学習モードに設定
            encoder.train()
            decoder.train()

            train_losses = deque()
            for i, (imgs, captions, lengths) in enumerate(pbar):
                # ミニバッチを設定
                imgs = imgs.to(config.device)
                captions = captions.to(config.device)

                optimizer.zero_grad()

                # エンコーダ-デコーダモデル
                features = encoder(imgs)
                outputs = decoder(features, captions, lengths)

                # ロスの計算
                captions = captions[:, 1:] 
                lengths = [length - 1 for length in lengths]
                targets = pack_padded_sequence(captions, lengths, 
                                               batch_first=True)
                loss = loss_func(outputs.data, targets.data)

                # 誤差逆伝播
                loss.backward()
                
                optimizer.step()

                # 学習時の損失をログに書き込み
                train_losses.append(loss.item())
                if len(train_losses) > config.moving_avg:
                    train_losses.popleft()
                pbar.set_postfix({
                    'loss': torch.Tensor(train_losses).mean().item()})
                with open(train_loss_file, 'a') as f:
                    print(f'{epoch}, {loss.item()}', file=f)

        # 検証
        with tqdm(val_loader) as pbar:
            pbar.set_description(f'[検証]')

            # 評価モード
            encoder.eval()
            decoder.eval()

            val_losses = []
            for j, (imgs, captions, lengths) in enumerate(pbar):

                # ミニバッチを設定
                imgs = imgs.to(config.device)
                captions = captions.to(config.device)

                # エンコーダ-デコーダモデル
                features = encoder(imgs)
                outputs = decoder(features, captions, lengths)

                # ロスの計算
                captions = captions[:, 1:] 
                lengths = [length - 1 for length in lengths]
                targets = pack_padded_sequence(captions, lengths, 
                                               batch_first=True)
                val_loss = loss_func(outputs.data, targets.data)
                val_losses.append(val_loss.item())

                # Validation Lossをログに書き込み
                with open(val_loss_file, 'a') as f:
                    print(f'{epoch}, {val_loss.item()}', file=f)

        # Loss 表示
        val_loss = np.mean(val_losses)
        print(f'Validation loss: {val_loss}')

        # より良い検証結果が得られた場合、モデルを保存
        if val_loss < val_loss_best:
            val_loss_best = val_loss

            # エンコーダモデルを保存
            torch.save(
                encoder.state_dict(),
                f'{config.save_directory}/6-4_encoder_best.pth')

            # デコーダモデルを保存
            torch.save(
                decoder.state_dict(),
                f'{config.save_directory}/6-4_decoder_best.pth')

###学習データの解凍

In [None]:
!unzip drive/MyDrive/python_image_recognition/data/coco2014/val2014.zip

###学習の実行

In [None]:
train()

###デモにおけるハイパーパラメータやオプションの設定

In [None]:
class ConfigDemo(object):
    '''
    ハイパーパラメータ、グローバル変数の設定
    '''  
    def __init__(self):

        # ハイパーパラメータ
        self.enc_img_size = 14     # Attention計算用画像サイズ
        self.dim_attention = 128   # Attention層の次元
        self.dim_embedding = 128   # 埋め込み層の次元
        self.dim_encoder = 2048    # エンコーダの特徴マップのチャネル数
        self.dim_hidden = 128      # LSTM隠れ層の次元
        
        # パスの設定
        # 画像キャプショニング推論
        self.word_to_id_file = 'drive/MyDrive/python_image_recognition/6_img_captioning/vocab/word_to_id.pkl'
        self.id_to_word_file = 'drive/MyDrive/python_image_recognition/6_img_captioning/vocab/id_to_word.pkl'
        self.img_dirirectory = 'drive/MyDrive/python_image_recognition/data/image_captioning/'    
        self.save_directory = 'drive/MyDrive/python_image_recognition/6_img_captioning/model'
        
        # 学習に使うデバイス
        self.device = 'cuda'

###デモを行う関数

In [None]:
def demo():
    config = ConfigDemo()

    # 辞書（単語→単語ID）の読み込み
    with open(config.word_to_id_file, 'rb') as f:
        word_to_id = pickle.load(f)

    # 辞書（単語ID→単語）の読み込み
    with open(config.id_to_word_file, 'rb') as f:
        id_to_word = pickle.load(f)

    # 辞書サイズを保存
    vocab_size = len(id_to_word)
    
    # 画像のtransformsを定義
    transforms = T.Compose([
        T.Resize((224, 224)),
        T.ToTensor(),
        # ImageNetデータセットの平均と標準偏差
        T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)) 
    ])
    
    # モデルの定義
    encoder = CNNEncoder(config.enc_img_size)
    decoder = RNNDecoderWithAttention(config.dim_attention,
                                      config.dim_embedding, 
                                      config.dim_encoder,
                                      config.dim_hidden,
                                      vocab_size)
    encoder.to(config.device)
    decoder.to(config.device)
    encoder.eval()
    decoder.eval()

    # モデルの学習済み重みパラメータをロード
    encoder.load_state_dict(
        torch.load(f'{config.save_directory}/6-4_encoder_best.pth'))
    decoder.load_state_dict(
        torch.load(f'{config.save_directory}/6-4_decoder_best.pth'))

    # ディレクトリ内の画像を対象としてキャプショニング実行
    for img_file in sorted(
        glob.glob(os.path.join(config.img_dirirectory, '*.jpg'))):

        # 画像読み込み
        img = Image.open(img_file)
        img = transforms(img)
        img = img.unsqueeze(0)
        img = img.to(config.device)

        # エンコーダ・デコーダモデルによる予測
        with torch.no_grad():
            feature = encoder(img)
            sampled_ids, alphas = decoder.sample(feature, word_to_id)

        # 入力画像を表示
        img_plt = Image.open(img_file)
        img_plt = img_plt.resize([224, 224], Image.LANCZOS)
        plt.imshow(img_plt)
        plt.axis('off')
        plt.show()
        print(f'入力画像: {os.path.basename(img_file)}')

        # 画像キャプショニングの実行
        sampled_caption = []
        for word_id, alpha in zip(sampled_ids, alphas):
            word = id_to_word[word_id]
            sampled_caption.append(word)
            
            alpha = alpha.view(
                config.enc_img_size, config.enc_img_size)
            alpha = alpha.to('cpu').numpy()
            alpha = skimage.transform.pyramid_expand(
                alpha, upscale=16, sigma=8)
            
            # タイムステップtの画像をプロット
            plt.imshow(img_plt)
            plt.text(0, 1, f'{word}', color='black',
                     backgroundcolor='white', fontsize=12)
            plt.imshow(alpha, alpha=0.8)
            plt.set_cmap(cm.Greys_r)
            plt.axis('off')
            plt.show()
            
            if word == '<end>':
                break
        
        sentence = ' '.join(sampled_caption)
        print(f'出力キャプション: {sentence}')

        # 推定結果を書き込み
        gen_sentence_out = img_file[:-4] + '_show_attend_and_tell.txt'
        with open(gen_sentence_out, 'w') as f:
            print(sentence, file=f)

###デモの実行

In [None]:
demo()