# CLIP模型的处理

In [None]:
from PIL import Image
import requests
from transformers import CLIPProcessor, CLIPModel
import torch

# Device string used for the CLIP model and for every tensor/module created below.
devices = "cuda:3"

# Load the CLIP model and its processor from a local checkpoint directory.
# Only the model is moved to `devices`; processor outputs are moved per call.
clip_model = CLIPModel.from_pretrained("/data4/zxf/hf/openai/clip-vit-large-patch14").to(devices)
processor = CLIPProcessor.from_pretrained("/data4/zxf/hf/openai/clip-vit-large-patch14")

In [2]:
import torch
import torch.nn as nn

class VisualProjection(nn.Module):
    """Module wrapper around an externally supplied projection layer.

    The layer is registered as the ``visual_projection`` sub-module, so it
    follows this module through ``.to(...)``, ``.train()`` and ``.eval()``.
    """

    def __init__(self, visual_projection):
        super(VisualProjection, self).__init__()
        self.visual_projection = visual_projection

    def forward(self, x):
        """Apply the wrapped projection to ``x`` and return the result.

        The output shape is whatever the wrapped layer produces; this wrapper
        adds no reshaping, pooling or normalisation of its own.
        """
        return self.visual_projection(x)


class TextProjection(nn.Module):
    """Module wrapper around an externally supplied projection layer.

    NOTE: for historical reasons the constructor argument and the attribute
    are both called ``visual_projection`` even though this class is used for
    the text branch. The name is kept because it is part of the interface and
    of the state-dict keys.
    """

    def __init__(self, visual_projection):
        super(TextProjection, self).__init__()
        self.visual_projection = visual_projection

    def forward(self, x):
        """Apply the wrapped projection to ``x`` and return the result.

        The output shape is whatever the wrapped layer produces.
        """
        projected = self.visual_projection(x)
        return projected
    

# Reuse the projection layers that ship with the loaded CLIP model.
visual_projection = clip_model.visual_projection
Text_projection = clip_model.text_projection


# Wrap the vision projection in a VisualProjection module on the target device.
# (The commented lines below are a leftover smoke test.)
Visual_module = VisualProjection(visual_projection).to(devices)
# input_tensor_1 = outputs["vision_model_output"]["last_hidden_state"]
# Visual_output_tensor = Visual_module(input_tensor_1)



# Same for the text projection.
Text_module = TextProjection(Text_projection).to(devices)
# input_tensor_2 = outputs["text_model_output"]["last_hidden_state"]
# Text_output_tensor = Text_module(input_tensor_2)

# print(Text_output_tensor.size())
# print(Visual_output_tensor.size())

In [3]:
def CLIP_pipeline(x1,x2,x3,x4):
    """Encode one batch of raw inputs into projected CLIP token features.

    Args:
        x1: Text input(s) handed to the CLIP processor (main text).
        x2: Image input(s) handed to the CLIP processor.
        x3: Text input(s) handed to the CLIP processor (first auxiliary text).
        x4: Text input(s) handed to the CLIP processor (second auxiliary text).

    Returns:
        A 4-tuple ``(text, image, aux_1, aux_2)``. Each element is the
        encoder's ``last_hidden_state`` passed through the matching projection
        wrapper (``Text_module`` for text, ``Visual_module`` for images).

    The whole function runs under ``torch.no_grad()``: the CLIP encoders and
    projection layers are not part of the optimizer in this file, so building
    an autograd graph through them only costs memory and leaves stale ``.grad``
    buffers on the CLIP parameters. The returned tensors therefore do not
    require grad.
    """

    def _encode_text(texts):
        # Pad/truncate to 77 tokens as in the original calls.
        tokens = processor(
            text=texts, return_tensors="pt", padding=True, truncation=True, max_length=77
        ).to(devices)
        hidden = clip_model.text_model(**tokens)["last_hidden_state"]
        return Text_module(hidden)

    def _encode_image(images):
        pixels = processor(images=images, return_tensors="pt").to(devices)
        hidden = clip_model.vision_model(**pixels)["last_hidden_state"]
        return Visual_module(hidden)

    with torch.no_grad():
        Text_output_tensor = _encode_text(x1)
        Visual_output_tensor = _encode_image(x2)
        outputs_3 = _encode_text(x3)
        outputs_4 = _encode_text(x4)

    return Text_output_tensor, Visual_output_tensor, outputs_3, outputs_4

# x1 = ["a photo of a cat", "a photo of a dog"]
# x2 = [Image.open(requests.get("http://images.cocodataset.org/val2017/000000039769.jpg", stream=True).raw),
#       Image.open(requests.get("http://images.cocodataset.org/val2017/000000397133.jpg", stream=True).raw)]

# x1,x2 = CLIP_pipeline(x1,x2)

# print(x1.size())
# print(x2.size())



# 神经网络层的构建

In [4]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F




class multimodal_attention(nn.Module):
    """Dot-product attention: ``dropout(softmax(q @ k^T * scale)) @ v``.

    The softmax is taken over the last axis (the key axis), so each query's
    weights sum to one. This is correct for 3-D ``(batch, q_len, dim)`` inputs
    and for the 4-D ``(batch, heads, q_len, dim)`` inputs the multi-head
    wrappers in this file pass. The previous ``dim=2`` normalised over the
    query axis for 4-D inputs.
    """

    def __init__(self, attention_dropout=0.5):
        super(multimodal_attention, self).__init__()
        self.dropout = nn.Dropout(attention_dropout)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, scale=None, attn_mask=None):
        """Return the attention-weighted combination of ``v``.

        Args:
            q: Query tensor ``(..., q_len, dim)``.
            k: Key tensor ``(..., k_len, dim)``.
            v: Value tensor ``(..., k_len, dim_v)``.
            scale: Optional multiplier applied to the raw scores.
            attn_mask: Optional boolean tensor broadcastable to
                ``(..., q_len, k_len)``; ``True`` marks positions to exclude.
                A query row that is fully masked yields NaN weights.

        Returns:
            Tensor of shape ``(..., q_len, dim_v)``.
        """
        scores = torch.matmul(q, k.transpose(-2, -1))

        if scale is not None:
            scores = scores * scale

        # `if attn_mask:` would raise for any multi-element tensor, so test
        # against None explicitly; use the out-of-place fill to stay
        # autograd-safe.
        if attn_mask is not None:
            scores = scores.masked_fill(attn_mask, float("-inf"))

        weights = self.dropout(self.softmax(scores))
        return torch.matmul(weights, v)





class CrossAttention(nn.Module):
    """Multi-head attention of ``query`` over ``key``/``value`` with a
    residual connection to ``query`` and a final layer norm.

    All projections are bias-free. Scores are scaled by
    ``dim_per_head ** -0.5`` and evaluated by ``multimodal_attention``.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()

        per_head = model_dim // num_heads
        inner_dim = per_head * num_heads

        self.model_dim = model_dim
        self.dim_per_head = per_head
        self.num_heads = num_heads

        self.linear_q = nn.Linear(model_dim, inner_dim, bias=False)
        self.linear_k = nn.Linear(model_dim, inner_dim, bias=False)
        self.linear_v = nn.Linear(model_dim, inner_dim, bias=False)

        self.dot_product_attention = multimodal_attention(dropout)
        self.linear_final = nn.Linear(model_dim, model_dim, bias=False)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(model_dim)

    def _split_heads(self, projected, batch):
        """Reshape ``(batch, len, heads * d)`` to ``(batch, heads, len, d)``."""
        return projected.view(batch, -1, self.num_heads, self.dim_per_head).transpose(1, 2)

    def forward(self, query, key, value, attn_mask=None):
        """Attend from ``query`` to ``key``/``value``.

        The original ``query`` is added back to the attention output before
        the layer norm. The attention output is always 3-D
        ``(batch, len, model_dim)``, so a 2-D ``query`` would broadcast in
        that residual add; callers should pass 3-D tensors.
        """
        batch = query.size(0)

        # Project, then fan out into heads.
        q_heads = self._split_heads(self.linear_q(query), batch)
        k_heads = self._split_heads(self.linear_k(key), batch)
        v_heads = self._split_heads(self.linear_v(value), batch)

        context = self.dot_product_attention(
            q_heads, k_heads, v_heads, self.dim_per_head ** -0.5, attn_mask
        )

        # Fold the heads back into one feature axis.
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(batch, -1, self.num_heads * self.dim_per_head)

        projected = self.dropout(self.linear_final(merged))
        return self.layer_norm(query + projected)

class MultiHeadCrossAttention(nn.Module):
    """Bidirectional cross attention between two inputs.

    One shared ``CrossAttention`` instance serves both directions. Each
    direction's result is added to its own input and layer-normalised again
    with a shared ``LayerNorm``. ``self.dropout`` is registered but not used
    in ``forward``.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()

        self.model_dim = model_dim
        self.dim_per_head = model_dim // num_heads
        self.num_heads = num_heads

        self.cross_attention = CrossAttention(model_dim, num_heads, dropout)
        self.layer_norm = nn.LayerNorm(model_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x1, x2, attn_mask=None):
        """Return ``(x1 attended over x2, x2 attended over x1)``.

        The same ``attn_mask`` is forwarded to both directions.
        """
        attend = self.cross_attention

        # x1 queries x2 first, then x2 queries x1 (same order as before, so
        # dropout draws happen in the same sequence).
        x1_over_x2 = attend(x1, x2, x2, attn_mask)
        x2_over_x1 = attend(x2, x1, x1, attn_mask)

        return self.layer_norm(x1 + x1_over_x2), self.layer_norm(x2 + x2_over_x1)

# # Example usage
# batch_1, len_1, dim = 2, 10, 768
# batch_2, len_2, dim = 2, 15, 768

# x1 = torch.randn(batch_1, len_1, dim)
# x2 = torch.randn(batch_2, len_2, dim)

# layer = MultiHeadCrossAttention(model_dim=768, num_heads=8, dropout=0.5)
# output_1, output_2 = layer(x1, x2)


# print("output_1 shape:", output_1.size())  # Expected: [batch_1, len_1, 768]
# print("output_2 shape:", output_2.size())  # Expected: [batch_2, len_2, 768]


In [5]:
class MultiHeadSelfAttention(nn.Module):
    """Multi-head self attention with a residual connection and layer norm.

    Query, key and value are bias-free projections of the same input.
    Scores are scaled by ``dim_per_head ** -0.5`` and evaluated by
    ``multimodal_attention``.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()

        per_head = model_dim // num_heads
        inner_dim = per_head * num_heads

        self.model_dim = model_dim
        self.dim_per_head = per_head
        self.num_heads = num_heads

        self.linear_q = nn.Linear(model_dim, inner_dim, bias=False)
        self.linear_k = nn.Linear(model_dim, inner_dim, bias=False)
        self.linear_v = nn.Linear(model_dim, inner_dim, bias=False)

        self.dot_product_attention = multimodal_attention(dropout)
        self.linear_final = nn.Linear(model_dim, model_dim, bias=False)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(model_dim)

    def _split_heads(self, projected, batch):
        """Reshape ``(batch, len, heads * d)`` to ``(batch, heads, len, d)``."""
        return projected.view(batch, -1, self.num_heads, self.dim_per_head).transpose(1, 2)

    def forward(self, x, attn_mask=None):
        """Return ``layer_norm(x + dropout(proj(attention(x, x, x))))``."""
        batch = x.size(0)

        q_heads = self._split_heads(self.linear_q(x), batch)
        k_heads = self._split_heads(self.linear_k(x), batch)
        v_heads = self._split_heads(self.linear_v(x), batch)

        context = self.dot_product_attention(
            q_heads, k_heads, v_heads, self.dim_per_head ** -0.5, attn_mask
        )

        # Fold the heads back into one feature axis.
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(batch, -1, self.num_heads * self.dim_per_head)

        projected = self.dropout(self.linear_final(merged))
        return self.layer_norm(x + projected)

# # Example usage
# batch_size = 2
# seq_len = 10
# model_dim = 768

# x = torch.randn(batch_size, seq_len, model_dim)

# self_attention = MultiHeadSelfAttention(model_dim=model_dim, num_heads=8, dropout=0.5)
# output = self_attention(x)

# print("output shape:", output.size())  # Expected: [batch_size, seq_len, model_dim]

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultiHeadAttention(nn.Module):
    """Scaled dot-product multi-head attention with biased projections.

    Unlike the other attention blocks in this file, it applies no residual
    connection and no layer norm; it returns only the output projection of
    the attended values.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()
        self.num_heads = num_heads
        self.dim_per_head = model_dim // num_heads

        self.linear_q = nn.Linear(model_dim, model_dim)
        self.linear_k = nn.Linear(model_dim, model_dim)
        self.linear_v = nn.Linear(model_dim, model_dim)

        self.dropout = nn.Dropout(dropout)
        self.softmax = nn.Softmax(dim=-1)
        self.linear_out = nn.Linear(model_dim, model_dim)

    def _to_heads(self, projected, batch):
        """Reshape ``(batch, len, heads * d)`` to ``(batch, heads, len, d)``."""
        return projected.view(batch, -1, self.num_heads, self.dim_per_head).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` to ``key``/``value``.

        ``mask`` (optional) must broadcast against the score tensor
        ``(batch, heads, q_len, k_len)``; positions where ``mask == 0`` are
        filled with ``-1e9`` before the softmax.
        """
        batch = query.size(0)

        q = self._to_heads(self.linear_q(query), batch)
        k = self._to_heads(self.linear_k(key), batch)
        v = self._to_heads(self.linear_v(value), batch)

        logits = torch.matmul(q, k.transpose(-2, -1)) / (self.dim_per_head ** 0.5)
        if mask is not None:
            logits = logits.masked_fill(mask == 0, -1e9)

        weights = self.dropout(self.softmax(logits))

        # Merge heads back into a single feature axis before the output layer.
        context = torch.matmul(weights, v).transpose(1, 2).contiguous()
        context = context.view(batch, -1, self.num_heads * self.dim_per_head)
        return self.linear_out(context)

class CoAttention(nn.Module):
    """Fuse two inputs into one vector per sample via mutual attention.

    ``attention1`` lets ``x1`` attend over ``x2`` and ``attention2`` does the
    reverse. Both results are averaged over axis 1, concatenated, projected
    back to ``model_dim``, passed through dropout and layer-normalised.
    """

    def __init__(self, model_dim=768, num_heads=8, dropout=0.5):
        super().__init__()
        self.attention1 = MultiHeadAttention(model_dim, num_heads, dropout)
        self.attention2 = MultiHeadAttention(model_dim, num_heads, dropout)
        self.linear_out = nn.Linear(2 * model_dim, model_dim)
        self.layer_norm = nn.LayerNorm(model_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x1, x2, mask1=None, mask2=None):
        """Return the fused representation of ``x1`` and ``x2``.

        ``mask2`` is applied when ``x1`` attends over ``x2``; ``mask1`` when
        ``x2`` attends over ``x1``.
        """
        pooled_1 = self.attention1(x1, x2, x2, mask2).mean(dim=1)
        pooled_2 = self.attention2(x2, x1, x1, mask1).mean(dim=1)

        fused = self.linear_out(torch.cat((pooled_1, pooled_2), dim=-1))
        return self.layer_norm(self.dropout(fused))




# # Example usage
# batch_size, len_1, len_2, dim = 2, 10, 15, 768

# x1 = torch.randn(batch_size, len_1, dim)
# x2 = torch.randn(batch_size, len_2, dim)

# model = CoAttention(model_dim=dim, num_heads=8, dropout=0.5)
# output = model(x1, x2)

# print("output shape:", output.size())  # Expected: [batch, 768]

In [7]:
class PositionalWiseFeedForward(nn.Module):
    """Two-layer feed-forward block with residual connection and layer norm.

    Computes ``layer_norm(x + dropout(w2(relu(w1(x)))))``.
    """

    def __init__(self, model_dim=768, ffn_dim=2048, dropout=0.5):
        super().__init__()
        self.w1 = nn.Linear(model_dim, ffn_dim)
        self.w2 = nn.Linear(ffn_dim, model_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(model_dim)

    def forward(self, x):
        """Apply the block to ``x``; the input tensor itself is not modified."""
        hidden = F.relu(self.w1(x))
        update = self.dropout(self.w2(hidden))
        return self.layer_norm(update + x)

In [8]:

class MLP(nn.Module):
    """One-hidden-layer perceptron: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, in_features, out_features, hidden_size=256, dropout=0.5):
        super().__init__()
        self.linear1 = nn.Linear(in_features, hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(hidden_size, out_features)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Map ``(..., in_features)`` to ``(..., out_features)``."""
        hidden = self.dropout(self.activation(self.linear1(x)))
        return self.linear2(hidden)
    
    

class VLR(nn.Module):
    """Two-stage classifier over text, image and two auxiliary text inputs.

    Stage 1: text and image features each pass through their own
    self-attention block, are mean-pooled over axis 1 and combined as
    ``alpha * text + beta * image`` (both weights trainable). ``first_judge``
    turns that fused vector into 2-way logits.

    Stage 2: the two auxiliary inputs are mean-pooled and summed, then used
    as the query of a cross-attention block over the fused vector.
    ``second_judge`` turns the result into a second set of 2-way logits.
    """

    def __init__(self, dim=768):
        super(VLR, self).__init__()
        # Self-attention for individual modalities
        self.text_self_attention = MultiHeadSelfAttention(model_dim=dim, num_heads=8, dropout=0.5)
        self.image_self_attention = MultiHeadSelfAttention(model_dim=dim, num_heads=8, dropout=0.5)

        # Trainable weighting parameters for fusion
        self.alpha = nn.Parameter(torch.tensor(0.5))
        self.beta = nn.Parameter(torch.tensor(0.5))

        # First prediction module (fusion)
        self.first_judge = nn.Sequential(
            nn.Linear(dim, dim//2),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(dim//2, 2)
        )

        # Cross-attention for adversarial reasoning
        self.adversarial_cross_attention = MultiHeadCrossAttention(
            model_dim=dim, num_heads=8, dropout=0.5
        )

        # Second prediction module (adversarial reasoning)
        self.second_judge = nn.Sequential(
            nn.Linear(dim, dim//2),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(dim//2, 2)
        )

    def forward(self, text_features, image_features, adversarial_arguments_1,adversarial_arguments_2):
        """Return ``(z1, z2)``: raw 2-way logits of the two prediction stages.

        All four inputs are expected to be ``(batch, seq_len, dim)`` token
        features (each is mean-pooled over axis 1 here); sequence lengths may
        differ between inputs. Logits are returned un-normalised, which is
        what ``nn.CrossEntropyLoss`` expects.
        """
        # Self-attention on individual modalities, pooled to one vector each.
        R1 = self.text_self_attention(text_features).mean(dim=1)
        R2 = self.image_self_attention(image_features).mean(dim=1)

        # Fused features with trainable weights: (batch, dim).
        G = self.alpha * R1 + self.beta * R2

        # First prediction.
        z1 = self.first_judge(G)

        # Pool both auxiliary inputs into one (batch, dim) vector.
        adversarial_arguments = adversarial_arguments_1.mean(dim=1) + adversarial_arguments_2.mean(dim=1)

        # The cross-attention block works on (batch, seq_len, dim). Feeding it
        # 2-D tensors makes its residual add broadcast (batch, dim) against
        # (batch, 1, dim) into (batch, batch, dim), so the later mean over
        # axis 1 would average over *other samples in the batch*. Give both
        # operands an explicit length-1 sequence axis instead.
        Lg, _ = self.adversarial_cross_attention(
            adversarial_arguments.unsqueeze(1), G.unsqueeze(1)
        )

        # Collapse the length-1 sequence axis: (batch, 1, dim) -> (batch, dim).
        Lg = Lg.mean(dim=1)

        # Second prediction based on adversarial reasoning.
        z2 = self.second_judge(Lg)

        return z1, z2



In [9]:
# batch_size, len_1, len_2, dim = 4, 10, 15, 768

# x1 = torch.randn(batch_size, len_1, dim)
# x2 = torch.randn(batch_size, len_2, dim)
# x3 = torch.randn(batch_size, 20, dim)
# x4 = torch.randn(batch_size, 25, dim)

# model = VLR()
# output,output_1 = model(x1, x2,x3,x4)

# print("output_1 shape:", output_1.size()) 

# print(output_1)

# 训练过程

In [10]:

import torch
from sklearn.metrics import precision_recall_fscore_support

def classification_metrics(predicted, labels):
    """Print per-class precision, recall and F1 for a binary task.

    Args:
        predicted (torch.Tensor): Predicted class ids, shape ``(n,)``.
        labels (torch.Tensor): Ground-truth class ids, shape ``(n,)``.

    Returns:
        None. The metrics are printed: class ``0`` under the first heading
        and class ``1`` under the second.

    ``labels=[0, 1]`` is passed explicitly so the result always has one entry
    per class. Without it scikit-learn only reports classes that actually
    occur in ``predicted``/``labels``, and indexing ``[1]`` raises
    ``IndexError`` when just one class is present. A class with no
    predicted/true samples is reported as 0 (scikit-learn may emit a warning).
    """
    # scikit-learn works on NumPy arrays.
    y_pred = predicted.detach().cpu().numpy()
    y_true = labels.detach().cpu().numpy()

    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, labels=[0, 1], average=None
    )

    # Headings are runtime output and are kept exactly as before.
    for heading, cls in (("真新闻指标:", 0), ("假新闻指标:", 1)):
        print(heading)
        print(f"Precision={precision[cls]:.4f}, Recall={recall[cls]:.4f}, F1-score={f1[cls]:.4f}")

In [11]:


import torch.optim as optim

class Trainer:
    """Training / evaluation loop for a model that returns two logit tensors.

    Each loader batch is a 5-tuple ``(x1, x2, x3, x4, labels)`` of raw inputs.
    The first four are encoded by ``CLIP_pipeline`` before they reach the
    model. The loss is the sum of ``criterion`` over both model outputs;
    predictions and accuracy use the second output only.
    """

    def __init__(self, model, train_loader, test_loader, criterion, optimizer, device):
        self.model = model
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device

    def _forward_batch(self, x1, x2, x3, x4, labels):
        """Encode one raw batch, run the model and compute the summed loss.

        Returns:
            ``(loss, second_logits, labels)`` with ``labels`` on ``self.device``.
        """
        x1, x2, x3, x4 = CLIP_pipeline(x1, x2, x3, x4)
        x1, x2, x3, x4 = (t.to(self.device) for t in (x1, x2, x3, x4))
        labels = labels.to(self.device)

        first_logits, second_logits = self.model(x1, x2, x3, x4)
        loss = self.criterion(second_logits, labels) + self.criterion(first_logits, labels)
        return loss, second_logits, labels

    def train_epoch(self):
        """Run one optimisation pass over ``train_loader``.

        Returns:
            Mean loss per sample actually seen (``0.0`` for an empty loader).
        """
        self.model.train()
        running_loss = 0.0
        seen = 0
        for x1, x2, x3, x4, labels in self.train_loader:
            self.optimizer.zero_grad()
            loss, _, labels = self._forward_batch(x1, x2, x3, x4, labels)
            loss.backward()
            self.optimizer.step()

            batch = labels.size(0)
            running_loss += loss.item() * batch
            seen += batch

        # Average over processed samples rather than len(loader.dataset):
        # this stays correct with drop_last / samplers and does not require
        # the loader to expose a `.dataset` attribute.
        return running_loss / seen if seen else 0.0

    def test(self):
        """Evaluate on ``test_loader`` and print per-class metrics.

        Returns:
            ``(mean_loss, accuracy)``; both are ``0.0`` for an empty loader,
            in which case no metrics are printed.
        """
        self.model.eval()
        running_loss = 0.0
        correct = 0
        total = 0

        # Accumulated as plain Python ints so they convert to a tensor cleanly.
        all_predicted = []
        all_labels = []

        # No optimizer call here: nothing is back-propagated during evaluation.
        with torch.no_grad():
            for x1, x2, x3, x4, labels in self.test_loader:
                loss, logits, labels = self._forward_batch(x1, x2, x3, x4, labels)

                batch = labels.size(0)
                running_loss += loss.item() * batch
                _, predicted = torch.max(logits, 1)
                total += batch
                correct += (predicted == labels).sum().item()

                all_predicted.extend(predicted.tolist())
                all_labels.extend(labels.tolist())

        if not total:
            return 0.0, 0.0

        classification_metrics(torch.tensor(all_predicted), torch.tensor(all_labels))

        return running_loss / total, correct / total

    def fit(self, epochs):
        """Alternate ``train_epoch`` and ``test`` for ``epochs`` rounds, logging each."""
        for epoch in range(epochs):
            train_loss = self.train_epoch()
            val_loss, val_accuracy = self.test()
            print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')


In [12]:

# Optimisation hyper-parameters.
learning_rate = 2e-5
num_epochs = 40



# Model, loss function, optimizer.
# Only the VLR parameters are handed to the optimizer; the CLIP model and its
# projection wrappers are not included.
model = VLR(dim=768).to(devices)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)




In [13]:
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F

import VLM_MR2_en_dataloader

# Batch size passed to both loader factories.
batch_size = 32

# Loaders come from the project-local VLM_MR2_en_dataloader module.
# Trainer unpacks each batch as (x1, x2, x3, x4, labels).
train_loader = VLM_MR2_en_dataloader.load_train_MR2(batch_size)
test_loader = VLM_MR2_en_dataloader.load_test_MR2(batch_size)



In [None]:
# Build the trainer and run the full train/evaluate loop for `num_epochs` epochs.
trainer = Trainer(model, train_loader, test_loader, criterion, optimizer, device=devices)
trainer.fit(num_epochs)