# CLIP模型的处理

In [1]:
from PIL import Image
import requests
from transformers import CLIPProcessor, CLIPModel

# Device string that the CLIP model and the projection modules are moved to.
devices = "cuda:0"

# Local checkpoint directory; the model weights and the matching processor
# are both loaded from this same path.
_clip_checkpoint = "/data4/zxf/hf/openai/clip-vit-large-patch14"

clip_model = CLIPModel.from_pretrained(_clip_checkpoint)
clip_model = clip_model.to(devices)
processor = CLIPProcessor.from_pretrained(_clip_checkpoint)

In [2]:
import torch
import torch.nn as nn

class VisualProjection(nn.Module):
    """Thin ``nn.Module`` wrapper around a projection layer for vision features.

    Args:
        visual_projection: Module applied to the input inside ``forward``.
    """

    def __init__(self, visual_projection):
        super(VisualProjection, self).__init__()
        self.visual_projection = visual_projection

    def forward(self, x):
        """Apply the wrapped projection to ``x`` and return the result.

        The original note states the output is (batch, len, 768); the actual
        size is whatever the wrapped layer produces -- TODO confirm.
        """
        return self.visual_projection(x)


class TextProjection(nn.Module):
    """Thin ``nn.Module`` wrapper around a projection layer for text features.

    Args:
        visual_projection: Module applied to the input inside ``forward``.
            Despite its name, this class is instantiated with the CLIP *text*
            projection elsewhere in this file; the name is kept so that the
            attribute (and its state-dict keys) stay unchanged.
    """

    def __init__(self, visual_projection):
        super(TextProjection, self).__init__()
        self.visual_projection = visual_projection

    def forward(self, x):
        """Apply the wrapped projection to ``x`` and return the result.

        The original note states the output is (batch, len, 768); the actual
        size is whatever the wrapped layer produces -- TODO confirm.
        """
        return self.visual_projection(x)
    

# Reuse CLIP's own pretrained projection heads (these are the same module
# objects that live inside clip_model, not copies).
visual_projection = clip_model.visual_projection
Text_projection = clip_model.text_projection


# Build the VisualProjection module (the commented lines below are a manual
# smoke test that expects an `outputs` object from a full CLIP forward pass).
Visual_module = VisualProjection(visual_projection).to(devices)
# input_tensor_1 = outputs["vision_model_output"]["last_hidden_state"]
# Visual_output_tensor = Visual_module(input_tensor_1)



# Same wrapper for the text side.
Text_module = TextProjection(Text_projection).to(devices)
# input_tensor_2 = outputs["text_model_output"]["last_hidden_state"]
# Text_output_tensor = Text_module(input_tensor_2)

# print(Text_output_tensor.size())
# print(Visual_output_tensor.size())

In [3]:
def CLIP_pipeline(x1, x2):
    """Encode texts and images into projected per-token CLIP features.

    Args:
        x1: Text input forwarded to ``processor(text=...)``; it is padded and
            truncated to at most 77 tokens.
        x2: Image input forwarded to ``processor(images=...)``.

    Returns:
        A tuple ``(text_features, image_features)``: the ``last_hidden_state``
        of the CLIP text and vision models, passed through ``Text_module`` and
        ``Visual_module`` respectively.
    """
    # Preprocess each modality and move the resulting tensors to the device.
    text_batch = processor(
        text=x1, return_tensors="pt", padding=True, truncation=True, max_length=77
    ).to(devices)
    image_batch = processor(images=x2, return_tensors="pt").to(devices)

    # Keep the full token sequences rather than the pooled embeddings.
    text_hidden = clip_model.text_model(**text_batch)["last_hidden_state"]
    image_hidden = clip_model.vision_model(**image_batch)["last_hidden_state"]

    return Text_module(text_hidden), Visual_module(image_hidden)

# x1 = ["a photo of a cat", "a photo of a dog"]
# x2 = [Image.open(requests.get("http://images.cocodataset.org/val2017/000000039769.jpg", stream=True).raw),
#       Image.open(requests.get("http://images.cocodataset.org/val2017/000000397133.jpg", stream=True).raw)]

# x1,x2 = CLIP_pipeline(x1,x2)

# print(x1.size())
# print(x2.size())



# 神经网络层的构建

In [4]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F




class multimodal_attention(nn.Module):
    """Dot-product attention: ``dropout(softmax(q @ k^T * scale)) @ v``.

    Args:
        attention_dropout: Dropout probability applied to the attention
            weights.
    """

    def __init__(self, attention_dropout=0.5):
        super(multimodal_attention, self).__init__()
        self.dropout = nn.Dropout(attention_dropout)
        # Normalise over the key axis, which is always the last one of
        # ``q @ k^T``. A fixed ``dim=2`` is only correct for 3-D inputs; for
        # the 4-D (batch, heads, q_len, k_len) scores produced by the
        # multi-head callers it would normalise across queries instead.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, scale=None, attn_mask=None):
        """Attend from ``q`` over ``k`` / ``v``.

        Args:
            q: Query tensor ``(..., q_len, dim)``.
            k: Key tensor ``(..., k_len, dim)``.
            v: Value tensor ``(..., k_len, dim_v)``.
            scale: Optional multiplier applied to the raw scores.
            attn_mask: Optional boolean tensor broadcastable to the score
                shape ``(..., q_len, k_len)``; positions that are ``True``
                are excluded from attention.

        Returns:
            Tensor ``(..., q_len, dim_v)`` of attention-weighted values.
        """
        attention = torch.matmul(q, k.transpose(-2, -1))

        if scale is not None:
            attention = attention * scale

        # ``if attn_mask:`` would raise for any multi-element tensor, so test
        # against None explicitly. The out-of-place fill keeps autograd happy.
        if attn_mask is not None:
            attention = attention.masked_fill(attn_mask, float("-inf"))

        attention = self.softmax(attention)
        attention = self.dropout(attention)
        return torch.matmul(attention, v)





class CrossAttention(nn.Module):
    """Multi-head attention block with a residual connection and LayerNorm.

    The query sequence attends over a separate key/value sequence. All
    projections are bias-free.

    Args:
        model_dim: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; each head works on
            ``model_dim // num_heads`` features.
        dropout: Dropout probability, used both on the attention weights and
            on the output projection.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()

        self.model_dim = model_dim
        self.dim_per_head = model_dim // num_heads
        self.num_heads = num_heads
        inner_dim = self.dim_per_head * num_heads

        self.linear_q = nn.Linear(model_dim, inner_dim, bias=False)
        self.linear_k = nn.Linear(model_dim, inner_dim, bias=False)
        self.linear_v = nn.Linear(model_dim, inner_dim, bias=False)

        self.dot_product_attention = multimodal_attention(dropout)
        self.linear_final = nn.Linear(model_dim, model_dim, bias=False)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(model_dim)

    def _split_heads(self, tensor, batch_size):
        """Reshape (batch, len, heads * d) into (batch, heads, len, d)."""
        per_head = tensor.view(batch_size, -1, self.num_heads, self.dim_per_head)
        return per_head.transpose(1, 2)

    def forward(self, query, key, value, attn_mask=None):
        """Return ``LayerNorm(query + Dropout(Proj(Attention(query, key, value))))``.

        Args:
            query: Sequence that attends; its first dimension is the batch.
            key: Sequence attended over.
            value: Values aggregated by the attention weights.
            attn_mask: Forwarded unchanged to the dot-product attention.
        """
        batch_size = query.size(0)

        q_heads = self._split_heads(self.linear_q(query), batch_size)
        k_heads = self._split_heads(self.linear_k(key), batch_size)
        v_heads = self._split_heads(self.linear_v(value), batch_size)

        # Scores are scaled by 1 / sqrt(dim_per_head).
        context = self.dot_product_attention(
            q_heads, k_heads, v_heads, (self.dim_per_head) ** -0.5, attn_mask
        )

        # Merge the heads back into a single feature axis.
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(batch_size, -1, self.num_heads * self.dim_per_head)

        projected = self.dropout(self.linear_final(merged))
        return self.layer_norm(query + projected)

class MultiHeadCrossAttention(nn.Module):
    """Bidirectional cross-attention between two sequences.

    A single ``CrossAttention`` instance (shared weights) is used for both
    directions, and one shared ``LayerNorm`` is applied to both residual
    outputs.

    Args:
        model_dim: Feature size of both inputs.
        num_heads: Number of attention heads.
        dropout: Dropout probability passed to the cross-attention block.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()

        self.model_dim = model_dim
        self.dim_per_head = model_dim // num_heads
        self.num_heads = num_heads

        self.cross_attention = CrossAttention(model_dim, num_heads, dropout)
        self.layer_norm = nn.LayerNorm(model_dim)
        # Registered on the module but not used in forward().
        self.dropout = nn.Dropout(dropout)

    def forward(self, x1, x2, attn_mask=None):
        """Return ``(x1', x2')`` where each input has attended over the other.

        Args:
            x1: First sequence.
            x2: Second sequence.
            attn_mask: Passed as-is to both attention directions.
        """
        attend = self.cross_attention

        # x1 queries x2, then x2 queries x1.
        x1_context = attend(x1, x2, x2, attn_mask)
        x2_context = attend(x2, x1, x1, attn_mask)

        # Extra residual + norm on top of the one inside CrossAttention.
        return self.layer_norm(x1 + x1_context), self.layer_norm(x2 + x2_context)

# # Example usage
# batch_1, len_1, dim = 2, 10, 768
# batch_2, len_2, dim = 2, 15, 768

# x1 = torch.randn(batch_1, len_1, dim)
# x2 = torch.randn(batch_2, len_2, dim)

# layer = MultiHeadCrossAttention(model_dim=768, num_heads=8, dropout=0.5)
# output_1, output_2 = layer(x1, x2)


# print("output_1 shape:", output_1.size())  # Expected: [batch_1, len_1, 768]
# print("output_2 shape:", output_2.size())  # Expected: [batch_2, len_2, 768]


In [5]:
class MultiHeadSelfAttention(nn.Module):
    """Multi-head self-attention block with a residual connection and LayerNorm.

    Queries, keys and values are all bias-free projections of the same input.

    Args:
        model_dim: Feature size of the input and of the output.
        num_heads: Number of attention heads; each head works on
            ``model_dim // num_heads`` features.
        dropout: Dropout probability, used both on the attention weights and
            on the output projection.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()

        self.model_dim = model_dim
        self.dim_per_head = model_dim // num_heads
        self.num_heads = num_heads
        inner_dim = self.dim_per_head * num_heads

        self.linear_q = nn.Linear(model_dim, inner_dim, bias=False)
        self.linear_k = nn.Linear(model_dim, inner_dim, bias=False)
        self.linear_v = nn.Linear(model_dim, inner_dim, bias=False)

        self.dot_product_attention = multimodal_attention(dropout)
        self.linear_final = nn.Linear(model_dim, model_dim, bias=False)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(model_dim)

    def forward(self, x, attn_mask=None):
        """Return ``LayerNorm(x + Dropout(Proj(SelfAttention(x))))``.

        Args:
            x: Input sequence; its first dimension is the batch.
            attn_mask: Forwarded unchanged to the dot-product attention.
        """
        batch_size = x.size(0)

        # Project, then reshape (batch, len, heads * d) -> (batch, heads, len, d).
        query, key, value = (
            proj(x).view(batch_size, -1, self.num_heads, self.dim_per_head).transpose(1, 2)
            for proj in (self.linear_q, self.linear_k, self.linear_v)
        )

        # Scores are scaled by 1 / sqrt(dim_per_head).
        context = self.dot_product_attention(
            query, key, value, (self.dim_per_head) ** -0.5, attn_mask
        )

        # Merge the heads back into a single feature axis.
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(batch_size, -1, self.num_heads * self.dim_per_head)

        projected = self.dropout(self.linear_final(merged))
        return self.layer_norm(x + projected)

# # Example usage
# batch_size = 2
# seq_len = 10
# model_dim = 768

# x = torch.randn(batch_size, seq_len, model_dim)

# self_attention = MultiHeadSelfAttention(model_dim=model_dim, num_heads=8, dropout=0.5)
# output = self_attention(x)

# print("output shape:", output.size())  # Expected: [batch_size, seq_len, model_dim]

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    """Plain multi-head scaled dot-product attention (no residual, no norm).

    Args:
        model_dim: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; each head works on
            ``model_dim // num_heads`` features.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()
        self.num_heads = num_heads
        self.dim_per_head = model_dim // num_heads

        self.linear_q = nn.Linear(model_dim, model_dim)
        self.linear_k = nn.Linear(model_dim, model_dim)
        self.linear_v = nn.Linear(model_dim, model_dim)

        self.dropout = nn.Dropout(dropout)
        self.softmax = nn.Softmax(dim=-1)
        self.linear_out = nn.Linear(model_dim, model_dim)

    def _project(self, layer, tensor, batch_size):
        """Apply ``layer`` and reshape to (batch, heads, len, dim_per_head)."""
        per_head = layer(tensor).view(batch_size, -1, self.num_heads, self.dim_per_head)
        return per_head.transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` over ``key`` / ``value``.

        Args:
            query: Tensor whose first dimension is the batch.
            key: Sequence attended over.
            value: Values aggregated by the attention weights.
            mask: Optional tensor broadcastable to the score shape
                (batch, heads, q_len, k_len); positions equal to 0 are
                suppressed.

        Returns:
            Tensor of shape (batch, q_len, num_heads * dim_per_head) after the
            output projection.
        """
        batch_size = query.size(0)

        q_heads = self._project(self.linear_q, query, batch_size)
        k_heads = self._project(self.linear_k, key, batch_size)
        v_heads = self._project(self.linear_v, value, batch_size)

        scores = torch.matmul(q_heads, k_heads.transpose(-2, -1)) / (self.dim_per_head ** 0.5)
        if mask is not None:
            # A large negative score becomes ~0 weight after softmax.
            scores = scores.masked_fill(mask == 0, -1e9)

        weights = self.dropout(self.softmax(scores))

        # Merge the heads back into a single feature axis.
        context = torch.matmul(weights, v_heads).transpose(1, 2).contiguous()
        context = context.view(batch_size, -1, self.num_heads * self.dim_per_head)
        return self.linear_out(context)

class CoAttention(nn.Module):
    """Fuse two sequences into one vector per sample via mutual attention.

    Each sequence attends over the other with its own ``MultiHeadAttention``;
    both results are mean-pooled over the sequence axis, concatenated, and
    projected back to ``model_dim``.

    Args:
        model_dim: Feature size of both inputs and of the fused output.
        num_heads: Number of heads in each attention module.
        dropout: Dropout probability for the attention modules and for the
            fused projection.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()
        self.attention1 = MultiHeadAttention(model_dim, num_heads, dropout)
        self.attention2 = MultiHeadAttention(model_dim, num_heads, dropout)
        self.linear_out = nn.Linear(2 * model_dim, model_dim)
        self.layer_norm = nn.LayerNorm(model_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x1, x2, mask1=None, mask2=None):
        """Return the fused representation of ``x1`` and ``x2``.

        Args:
            x1: First sequence.
            x2: Second sequence.
            mask1: Mask used when ``x2`` attends over ``x1``.
            mask2: Mask used when ``x1`` attends over ``x2``.
        """
        x1_over_x2 = self.attention1(x1, x2, x2, mask2)
        x2_over_x1 = self.attention2(x2, x1, x1, mask1)

        # Mean over dim 1 collapses each sequence to a single vector.
        pooled = [x1_over_x2.mean(dim=1), x2_over_x1.mean(dim=1)]
        fused = self.linear_out(torch.cat(pooled, dim=-1))

        return self.layer_norm(self.dropout(fused))


# class CoAttention(nn.Module):
#     def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
#         super(CoAttention, self).__init__()
#         self.attention1 = MultiHeadAttention(model_dim, num_heads, dropout)
#         self.attention2 = MultiHeadAttention(model_dim, num_heads, dropout)
#         self.linear_out = nn.Linear(2 * model_dim, model_dim)
#         self.layer_norm1 = nn.LayerNorm(model_dim)
#         self.layer_norm2 = nn.LayerNorm(model_dim)
#         self.dropout = nn.Dropout(dropout)
        
#     def forward(self, x1, x2, mask1=None, mask2=None):
#         attn_output1 = self.layer_norm1(x1 + self.attention1(x1, x2, x2, mask2))
#         attn_output2 = self.layer_norm2(x2 + self.attention2(x2, x1, x1, mask1))
#         # print(attn_output1.size())
#         # print(attn_output2.size())
        
        
#         # 使用更复杂的pooling策略,比如max pooling或者learned pooling
#         pooled1 = attn_output1.max(dim=1)[0]
#         pooled2 = attn_output2.max(dim=1)[0]
        
#         # print(pooled1.size())
#         # print(pooled2.size())
        
#         combined = torch.cat([pooled1, pooled2], dim=-1)
#         output = self.dropout(self.linear_out(combined))
        
#         return output


# Example usage: smoke-test CoAttention on random inputs.
# NOTE: this runs at cell execution time and binds the module-level names
# `batch_size`, `x1`, `x2`, `model` and `output`; `model` and `batch_size`
# are reassigned by later cells in this file.
batch_size, len_1, len_2, dim = 2, 10, 15, 768

# Two sequences of different lengths sharing the same feature size.
x1 = torch.randn(batch_size, len_1, dim)
x2 = torch.randn(batch_size, len_2, dim)

# The module is freshly constructed and never switched to eval(), so dropout
# is active in this forward pass.
model = CoAttention(model_dim=dim, num_heads=8, dropout=0.5)
output = model(x1, x2)

print("output shape:", output.size())  # Expected: [batch, 768]

In [7]:

class MLP(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Dropout -> Linear.

    Args:
        in_features: Size of each input vector.
        out_features: Size of each output vector.
        hidden_size: Width of the hidden layer.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, in_features, out_features, hidden_size=256, dropout=0.5):
        super().__init__()
        self.linear1 = nn.Linear(in_features, hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(hidden_size, out_features)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Map ``x`` of size (..., in_features) to (..., out_features)."""
        hidden = self.dropout(self.activation(self.linear1(x)))
        return self.linear2(hidden)
    
    

class VLR(nn.Module):
    """Two-class classifier over a pair of token sequences.

    Pipeline: bidirectional cross-attention -> co-attention fusion -> MLP head.

    Args:
        dim: Feature size of both input sequences.
        apply_softmax: When ``True``, ``forward`` returns class probabilities
            instead of raw logits. Leave it ``False`` for training with
            ``nn.CrossEntropyLoss``, which applies log-softmax itself; feeding
            it probabilities squashes the loss and its gradients.
    """

    def __init__(self, dim=768, apply_softmax=False):
        super(VLR, self).__init__()
        self.cross_layer_1 = MultiHeadCrossAttention(model_dim=dim, num_heads=16, dropout=0.5)
        self.co_layer_1 = CoAttention(model_dim=dim, num_heads=16, dropout=0.5)

        # Disabled: optional per-modality self-attention between the two stages.
        # self.self_attention_1 = MultiHeadSelfAttention(model_dim=dim, num_heads=8, dropout=0.5)
        # self.self_attention_2 = MultiHeadSelfAttention(model_dim=dim, num_heads=8, dropout=0.5)

        self.mlp = MLP(in_features=dim, out_features=2)
        self.apply_softmax = apply_softmax

    def forward(self, x1, x2):
        """Classify a pair of sequences.

        Args:
            x1: First sequence, feature size ``dim``.
            x2: Second sequence, feature size ``dim``.

        Returns:
            Tensor with 2 scores per sample: unnormalised logits by default,
            or probabilities when the module was built with
            ``apply_softmax=True``. The arg-max is the same in both cases.
        """
        x1, x2 = self.cross_layer_1(x1, x2)
        # x1 = self.self_attention_1(x1)
        # x2 = self.self_attention_2(x2)

        fused = self.co_layer_1(x1, x2)
        logits = self.mlp(fused)

        if self.apply_softmax:
            return F.softmax(logits, dim=-1)
        return logits




# 训练过程

In [8]:

import torch.optim as optim
import json


class Trainer:
    """Training / evaluation loop for a two-input classifier on CLIP features.

    Every batch ``(x1, x2, labels)`` from the loaders is first turned into
    features with ``CLIP_pipeline(x1, x2)`` and then fed to ``model``.

    Args:
        model: Module called as ``model(x1, x2)``; returns per-class scores.
        train_loader: Iterable of ``(x1, x2, labels)`` training batches that
            exposes ``.dataset``.
        test_loader: Iterable of ``(x1, x2, labels)`` evaluation batches that
            exposes ``.dataset``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping the trainable parameters.
        device: Device that features and labels are moved to.
    """

    def __init__(self, model, train_loader, test_loader, criterion, optimizer, device):
        self.model = model
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device

    def _prepare_batch(self, x1, x2, labels):
        """Run the CLIP feature pipeline and move everything to ``self.device``."""
        x1, x2 = CLIP_pipeline(x1, x2)
        return x1.to(self.device), x2.to(self.device), labels.to(self.device)

    def train_epoch(self):
        """Train for one pass over ``train_loader``.

        Returns:
            Sum of per-batch ``loss * batch_size`` divided by the size of the
            training dataset.
        """
        self.model.train()
        running_loss = 0.0
        for x1, x2, labels in self.train_loader:
            # NOTE(review): CLIP_pipeline runs with autograd enabled here. If
            # CLIP is meant to stay frozen, wrapping this call in
            # torch.no_grad() would save memory -- confirm before changing.
            x1, x2, labels = self._prepare_batch(x1, x2, labels)
            self.optimizer.zero_grad()
            outputs = self.model(x1, x2)
            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()
            running_loss += loss.item() * x1.size(0)
        return running_loss / len(self.train_loader.dataset)

    @staticmethod
    def _least_confidence(outputs):
        """Per-sample ``1 - max(softmax(outputs))``."""
        probs = F.softmax(outputs, dim=1)
        max_probs, _ = torch.max(probs, dim=1)
        return 1 - max_probs

    @staticmethod
    def _prediction_entropy(outputs):
        """Per-sample entropy of ``softmax(outputs)``."""
        probs = F.softmax(outputs, dim=1)
        # The epsilon avoids log(0).
        return -torch.sum(probs * torch.log(probs + 1e-8), dim=1)

    def _evaluate(self, forward, uncertainty_fn=None, results_path=None):
        """Shared evaluation loop over ``test_loader`` (no gradients).

        Args:
            forward: Callable ``(x1, x2) -> outputs`` producing class scores.
            uncertainty_fn: Optional callable mapping ``outputs`` to one
                uncertainty score per sample.
            results_path: When given, the per-sample records are written to
                this path as JSON.

        Returns:
            ``(epoch_loss, accuracy)``.
        """
        running_loss = 0.0
        correct = 0
        total = 0
        records = []

        with torch.no_grad():
            for x1, x2, labels in self.test_loader:
                x1, x2, labels = self._prepare_batch(x1, x2, labels)
                outputs = forward(x1, x2)
                loss = self.criterion(outputs, labels)
                running_loss += loss.item() * x1.size(0)

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                if uncertainty_fn is not None:
                    scores = uncertainty_fn(outputs)
                    records.extend(
                        {
                            'true_label': true_label,
                            'predicted_label': predicted_label,
                            'uncertainty': score,
                        }
                        for true_label, predicted_label, score in zip(
                            labels.tolist(), predicted.tolist(), scores.tolist()
                        )
                    )

        epoch_loss = running_loss / len(self.test_loader.dataset)
        accuracy = correct / total

        if results_path is not None:
            with open(results_path, 'w', encoding='utf-8') as f:
                json.dump(records, f, indent=4)

        return epoch_loss, accuracy

    def test(self):
        """Evaluate on ``test_loader``; returns ``(epoch_loss, accuracy)``."""
        self.model.eval()
        return self._evaluate(self.model)

    def test_with_least_confidence(self):
        """Evaluate and save least-confidence scores.

        Writes one record per sample (true label, predicted label,
        uncertainty) to ``least_confidence_results.json``.

        NOTE(review): the score applies softmax to the model outputs, i.e. it
        treats them as logits. If the model already ends in a softmax this is
        a second softmax -- confirm the model returns logits.

        Returns:
            ``(epoch_loss, accuracy)``.
        """
        self.model.eval()
        return self._evaluate(
            self.model, self._least_confidence, 'least_confidence_results.json'
        )

    def test_with_prediction_entropy(self):
        """Evaluate and save prediction-entropy scores.

        Writes one record per sample to ``prediction_entropy_results.json``.
        The same softmax caveat as for ``test_with_least_confidence`` applies.

        Returns:
            ``(epoch_loss, accuracy)``.
        """
        self.model.eval()
        return self._evaluate(
            self.model, self._prediction_entropy, 'prediction_entropy_results.json'
        )

    def test_with_monte_carlo_dropout(self, T=8):
        """Evaluate with Monte Carlo dropout.

        The model outputs of ``T`` stochastic forward passes are averaged; the
        entropy of the softmax of that average is saved as the uncertainty in
        ``monte_carlo_dropout_results.json``.

        Args:
            T: Number of stochastic forward passes per batch (at least 1).

        Returns:
            ``(epoch_loss, accuracy)``, both computed from the averaged outputs.

        Raises:
            ValueError: If ``T`` is smaller than 1.
        """
        if T < 1:
            raise ValueError("T must be at least 1")

        self.model.eval()
        # Re-enable only the dropout layers; everything else stays in eval mode.
        for module in self.model.modules():
            if isinstance(module, torch.nn.Dropout):
                module.train()

        def mc_forward(x1, x2):
            samples = torch.stack([self.model(x1, x2) for _ in range(T)], dim=0)
            return samples.mean(dim=0)

        try:
            # The shared loop also accumulates the loss, which this method
            # previously never did (it always reported 0.0).
            return self._evaluate(
                mc_forward, self._prediction_entropy, 'monte_carlo_dropout_results.json'
            )
        finally:
            # Do not leave dropout layers active after evaluation.
            self.model.eval()

    def fit_uncertainty(self, epochs, uncertainty_method="LC", T=8):
        """Train for ``epochs`` epochs, evaluating with an uncertainty method.

        Args:
            epochs: Number of training epochs.
            uncertainty_method: ``"LC"`` (least confidence), ``"PE"``
                (prediction entropy) or ``"MCD"`` (Monte Carlo dropout).
            T: Number of forward passes, used for ``"MCD"`` only.

        Raises:
            ValueError: If ``uncertainty_method`` is not one of the above. The
                check runs before any training so an invalid name fails fast.
        """
        evaluators = {
            "LC": self.test_with_least_confidence,
            "PE": self.test_with_prediction_entropy,
            "MCD": lambda: self.test_with_monte_carlo_dropout(T=T),
        }
        if uncertainty_method not in evaluators:
            raise ValueError("Invalid uncertainty method")
        evaluate = evaluators[uncertainty_method]

        for epoch in range(epochs):
            train_loss = self.train_epoch()
            val_loss, val_accuracy = evaluate()

            print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, '
                  f'Val Accuracy: {val_accuracy:.4f}, Uncertainty Method: {uncertainty_method}')

    def fit(self, epochs):
        """Train for ``epochs`` epochs, printing loss and accuracy after each."""
        for epoch in range(epochs):
            train_loss = self.train_epoch()
            val_loss, val_accuracy = self.test()

            print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')


In [9]:

# Training hyper-parameters.
learning_rate = 4e-4
num_epochs = 40



# Model, loss function, optimizer
# `model` replaces the example model bound earlier in this file.
model = VLR(dim=768).to(devices)
criterion = nn.CrossEntropyLoss()
# Only the VLR parameters are handed to the optimizer.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)



In [10]:
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F

import FHM_dataload

# Batch size passed to both loader factories below.
batch_size = 32

# Loaders come from the project-local FHM_dataload module; the Trainer
# unpacks each batch as (x1, x2, labels).
train_loader = FHM_dataload.load_train_FHM(batch_size)
test_loader= FHM_dataload.load_test_FHM(batch_size)

In [None]:
# Trainer: wire the model, loaders, loss and optimizer together, then run the
# plain train/evaluate loop for num_epochs epochs.
trainer = Trainer(model, train_loader, test_loader, criterion, optimizer, device=devices)
trainer.fit(num_epochs)