# concat.ipynb
- 2つのモデルで特徴を抽出した後、2つの特徴ベクトルをconcatして1つの特徴ベクトルを作る
- そのベクトルを用いて、距離学習を行う

In [22]:
import torch
import torch.nn as nn
import torch.nn.functional as F

"""
モデル1: ViT -> Transformer ->
input: 画像
flow: image[batch,3,H,W] -> ViT=features[batch, sequence_length, dimension] -> Transformer = [batch, 256, 384]
"""
class Model1(nn.Module): #SimpleViT
    """
    x[int] = [batch_size=8, H=224, W=224]
    out: [batch=8, dim=256]
    """
    def __init__(self, img_dim, img_size=224, patch_size=16, in_channels=3, embed_dim=384, num_patches=196):
        super(Model1, self).__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        self.embed_dim = embed_dim
        self.num_patches = num_patches

        # パッチ埋め込み: 画像をパッチに分割し、埋め込み次元に変換
        self.patch_embed = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
        
        # ポジションエンコーディング: パッチの位置情報を保持
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches, embed_dim))

        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=8, dim_feedforward=embed_dim * 4, activation="gelu")
        # Transformerブロック: シーケンスデータを処理
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=4)
        # 正規化層
        self.norm = nn.LayerNorm(embed_dim)
        self.sequence_to_vector = SequenceToVector(input_dim=embed_dim, output_dim=256, sequence_length=num_patches)

    def forward(self, x):
        # 入力画像のサイズチェック
        assert x.shape[2] == self.img_size and x.shape[3] == self.img_size, \
            f"Input image size must be ({self.img_size}, {self.img_size})"

        # パッチ埋め込み (B, C, H, W) -> (B, E, num_patches^0.5, num_patches^0.5)
        x = self.patch_embed(x)  # [batch_size, embed_dim, H/patch_size, W/patch_size]
        B, E, H, W = x.shape

        # フラット化してシーケンスデータに変換 (B, E, H*W) -> (B, H*W, E)
        x = x.flatten(2).transpose(1, 2)  # [batch_size, num_patches, embed_dim]

        # ポジションエンコーディングを加える
        x = x + self.pos_embedding

        # Transformerを通す (B, num_patches, embed_dim)
        x = x.transpose(0, 1)  # [num_patches, batch_size, embed_dim]
        x = self.transformer(x)  # [num_patches, batch_size, embed_dim]
        x = x.transpose(0, 1)  # [batch_size, num_patches, embed_dim]

        # 正規化
        x = self.norm(x)  # [batch_size, num_patches, embed_dim]
        x = self.sequence_to_vector(x)
        return x

"""Model2 幾何モデル

Returns:
    _type_: _description_
"""
class Model2(nn.Module):
    def __init__(self, input_dim):
        super(Model2, self).__init__()
        self.image_to_vector = ImageToVector(input_channels=3, img_size=224, embed_dim=256)

    def forward(self, x):
        mid = self.image_to_vector(x)
        return mid
    
class ImageToVector(nn.Module):
    def __init__(self, input_channels=3, img_size=224, embed_dim=384):
        super(ImageToVector, self).__init__()
        self.input_channels = input_channels
        self.img_size = img_size
        self.embed_dim = embed_dim

        # 画像を平坦化して埋め込み次元に変換する線形層
        self.flatten = nn.Flatten()  # (B, C, H, W) -> (B, C * H * W)
        self.fc = nn.Linear(input_channels * img_size * img_size, embed_dim)  # フラット化 -> 埋め込み

    def forward(self, x):
        # 入力画像のサイズをチェック
        assert x.shape[1:] == (self.input_channels, self.img_size, self.img_size), \
            f"Input size must be [batch_size, {self.input_channels}, {self.img_size}, {self.img_size}]"

        # フラット化
        x = self.flatten(x)  # (B, 3, 224, 224) -> (B, 3 * 224 * 224)

        # 線形層でベクトルに変換
        x = self.fc(x)  # (B, 3 * 224 * 224) -> (B, 384)

        return x

class SequenceToVector(nn.Module):
    """
    x = [batch, seq_length, input_dim]
    out: [batch, output_dim=256]
    """
    def __init__(self, input_dim=384, output_dim=256, sequence_length=196):
        super(SequenceToVector, self).__init__()
        self.sequence_length = sequence_length

        # 自己注意を用いた重要な情報の抽出
        self.attention = nn.MultiheadAttention(embed_dim=input_dim, num_heads=8)

        # 線形層で最終的な次元に変換
        self.fc = nn.Linear(input_dim, output_dim)

        # 正規化層
        self.norm = nn.LayerNorm(output_dim)

    def forward(self, x):
        """
        入力: x (batch, sequence_length, input_dim)
        出力: (batch, output_dim)
        """
        # 転置してMultiheadAttentionの入力形式に合わせる (batch, sequence, dim) -> (sequence, batch, dim)
        x = x.transpose(0, 1)

        # 自己注意で重要な特徴を抽出 (sequence, batch, dim)
        attention_output, _ = self.attention(x, x, x)

        # 平均プーリングでシーケンス次元を圧縮 (sequence, batch, dim) -> (batch, dim)
        pooled_output = attention_output.mean(dim=0)

        # 線形層で次元を縮小
        output = self.fc(pooled_output)

        # 正規化
        output = self.norm(output)

        return output
# MLP: 5x次元を4x次元に圧縮
class MLP(nn.Module):
    def __init__(self, input_dim=256, hidden_dim=512, output_dim=256):
        super(MLP, self).__init__()
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),  # 256 → 512
            nn.ReLU(),                        # 活性化関数
            nn.Linear(hidden_dim, output_dim) # 512 → 256
        )

    def forward(self, x):
        return (x)


In [23]:
class CombinedModel(nn.Module):
    def __init__(self, input_dim):
        super(CombinedModel, self).__init__()
        self.model1 = Model1(input_dim)        # モデル1: [BS, 256]
        self.model2 = Model2(input_dim)        # モデル2: [BS, ]
        self.mlp = MLP()

    def forward(self, x):
        # 各モデルから特徴ベクトルを取得
        feature1 = self.model1(x)  # [バッチサイズ, 2x]
        feature2 = self.model2(x)  # [バッチサイズ, 3x]

        from icecream import ic
        ic(feature1.shape, feature2.shape)
        # 特徴ベクトルを結合
        combined_features = torch.cat((feature1, feature2), dim=1)  # [バッチサイズ, 5x]
        # MLPに通して圧縮
        final_features = self.mlp(combined_features)  # [バッチサイズ, 4x]
        return final_features


In [27]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import CosineSimilarity
from torch.optim import Adam
from tqdm import tqdm

def train(model, dataloader, optimizer, device, margin=0.5, num_epochs=3):
    """
    モデルの複数エポック分の学習を行う関数

    Args:
        model: CombinedModel インスタンス
        dataloader: DataLoader インスタンス
        optimizer: Optimizer インスタンス
        device: 使用するデバイス（例: 'cuda'）
        margin: Margin Ranking Loss のマージン値
        num_epochs: 学習するエポック数

    Returns:
        各エポックの平均損失値のリスト
    """
    epoch_losses = []  # 各エポックの平均損失を記録

    for epoch in range(num_epochs):
        model.train()  # 学習モード
        total_loss = 0

        print(f"Epoch [{epoch + 1}/{num_epochs}]")
        for batch in tqdm(dataloader, desc=f"Training Epoch {epoch + 1}"):
            images, labels = batch  # DataLoaderからデータを取得
            images, labels = images.to(device), labels.to(device)

            # 出力特徴ベクトルの取得
            features = model(images)  # [batch_size, 256]

            # 類似度の計算（上位半分と下位半分のペア）
            batch_size = images.size(0)
            similarity = CosineSimilarity(dim=1)(
                features[:batch_size // 2],
                features[batch_size // 2:]
            )

            # Contrastive Loss の計算
            target = torch.zeros_like(similarity)  # すべてのペアが異なるものとして仮定
            loss = F.margin_ranking_loss(
                similarity,
                target,
                labels[:batch_size // 2].float(),
                margin=margin
            )

            # 勾配の計算とパラメータの更新
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # ロスを蓄積
            total_loss += loss.item()

        # 平均損失値を計算して記録
        avg_loss = total_loss / len(dataloader)
        epoch_losses.append(avg_loss)
        print(f"Epoch [{epoch + 1}/{num_epochs}] - Average Loss: {avg_loss:.4f}")

    return epoch_losses


In [28]:
# ダミーデータの準備
from torch.utils.data import DataLoader, TensorDataset

batch_size = 8
input_dim = 24  # 入力次元
num_batches = 10
num_samples = batch_size * num_batches

# ランダムな画像とラベルデータを生成
images = torch.randn(num_samples, 3, 224, 224)
labels = torch.randint(0, 2, (num_samples,))  # 0: 異なるペア, 1: 同じペア

# DataLoader
dataset = TensorDataset(images, labels)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# モデルとオプティマイザの準備
device = 'cuda' if torch.cuda.is_available() else 'cpu'
combined_model = CombinedModel(input_dim).to(device)
optimizer = Adam(combined_model.parameters(), lr=1e-3)

# 3エポック分の学習
epoch_losses = train(combined_model, dataloader, optimizer, device, num_epochs=3)
print(f"Epoch Losses: {epoch_losses}")


Epoch [1/3]


Training Epoch 1:   0%|          | 0/10 [00:00<?, ?it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 1:  30%|███       | 3/10 [00:00<00:00, 23.73it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 1:  60%|██████    | 6/10 [00:00<00:00, 24.17it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 1:  90%|█████████ | 9/10 [00:00<00:00, 24.35it/s]ic| feature1.s

Epoch [1/3] - Average Loss: 0.2750
Epoch [2/3]


Training Epoch 2:   0%|          | 0/10 [00:00<?, ?it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 2:  30%|███       | 3/10 [00:00<00:00, 24.42it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 2:  60%|██████    | 6/10 [00:00<00:00, 24.71it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 2:  90%|█████████ | 9/10 [00:00<00:00, 24.37it/s]ic| feature1.s

Epoch [2/3] - Average Loss: 0.2125
Epoch [3/3]


Training Epoch 3:   0%|          | 0/10 [00:00<?, ?it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 3:  30%|███       | 3/10 [00:00<00:00, 24.75it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 3:  60%|██████    | 6/10 [00:00<00:00, 23.95it/s]ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
ic| feature1.shape: torch.Size([8, 256])
    feature2.shape: torch.Size([8, 256])
Training Epoch 3:  90%|█████████ | 9/10 [00:00<00:00, 22.74it/s]ic| feature1.s

Epoch [3/3] - Average Loss: 0.2250
Epoch Losses: [0.275, 0.2125, 0.225]
