## 数据预处理

In [1]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset
from transformers import BertTokenizer
from PIL import Image
import torchvision.transforms as transforms
import torch

# 设置文件路径
data_dir = r'/root/多模态/实验五/实验五数据'  # 该目录包含train.txt和test_without_label.txt
txt_dir = os.path.join(data_dir, 'data')  # 文本和图像文件都在data目录下
img_dir = txt_dir  # 图像文件也在同一目录下

train_file = os.path.join(data_dir, 'train.txt')  # 训练数据标注文件
test_file = os.path.join(data_dir, 'test_without_label.txt')  # 测试数据文件

def load_labels(train_file):
    labels = []
    guids = []
    with open(train_file, 'r') as f:
        next(f)  # 跳过第一行（标题行）
        for line in f:
            guid, label = line.strip().split(',')
            guids.append(guid)
            labels.append(label)
    return guids, labels


# 预加载所有文本数据
def load_all_texts(txt_dir, guids):
    text_dict = {}
    for guid in guids:
        text_dict[guid] = load_text(guid, txt_dir)
    return text_dict

# 加载文本数据
def load_text(guid, txt_dir):
    txt_path = os.path.join(txt_dir, f'{guid}.txt')
    
    try:
        with open(txt_path, 'r', encoding='utf-8', errors='ignore') as file:
            text = file.read()
    except FileNotFoundError:
        print(f"Text file not found: {txt_path}")
        text = ""
    except Exception as e:
        print(f"Error reading {txt_path}: {str(e)}")
        text = ""
    
    return text

# 加载图像数据
def load_image(guid, img_dir):
    img_path = os.path.join(img_dir, f'{guid}.jpg')
    
    if not os.path.exists(img_path):
        print(f"Image file not found: {img_path}")
        return None  # 图像文件缺失
    
    image = Image.open(img_path).convert('RGB')  # 确保图像是RGB格式
    return image

# 创建自定义的Dataset
class MultimodalDataset(Dataset):
    def __init__(self, txt_dir, img_dir, guids, labels, tokenizer, transform=None):
        self.txt_dir = txt_dir
        self.img_dir = img_dir
        self.gids = guids
        self.labels = labels
        self.tokenizer = tokenizer
        self.transform = transform
        self.text_data = load_all_texts(txt_dir, guids)  # 预加载文本

    def __len__(self):
        return len(self.gids)

    def label_to_int(self, label):
        """将情感标签转换为整数"""
        if label == 'positive':
            return 2
        elif label == 'neutral':
            return 1
        elif label == 'negative':
            return 0
        elif label == 'null':
            return -1
        else:
            raise ValueError(f"Unknown label: {label}")

    def __getitem__(self, idx):
        guid = self.gids[idx]
        label = self.labels[idx]
        
        # 获取文本数据
        text = load_text(guid, self.txt_dir)
        if text is None:
            print(f"Warning: No text found for {guid}")
        
        # 获取图像数据
        image = load_image(guid, self.img_dir)
        if image is None:
            print(f"Warning: No image found for {guid}")
        
        # 文本处理（分词和编码）
        inputs = self.tokenizer(text, padding='max_length', truncation=True, max_length=128, return_tensors="pt")
        
        if self.transform and image is not None:
            image = self.transform(image)  # 图像预处理
        
        label_int = self.label_to_int(label)
        
        # 如果图像缺失，则填充一个零张量
        if image is None:
            image = torch.zeros(3, 224, 224)  # 用零填充（可以根据实际需求调整）
        
        return {
            'guid': guid,
            'input_ids': inputs['input_ids'].squeeze(0),
            'attention_mask': inputs['attention_mask'].squeeze(0),
            'image': image,
            'label': label_int
        }

# 配置BERT tokenizer
tokenizer = BertTokenizer.from_pretrained(r'/root/多模态/实验五/实验五数据/models--bert-base-uncased/snapshots/86b5e0934494bd15c9632b12f734a8a67f723594')

# 定义图像预处理（包括数据增强）
image_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # 随机水平翻转
    transforms.RandomResizedCrop(224),  # 随机裁剪并调整为224x224
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# 读取训练集标签和guid
guids, labels = load_labels(train_file)

# 将数据分为训练集和验证集（80%训练，20%验证）
train_guids, val_guids, train_labels, val_labels = train_test_split(guids, labels, test_size=0.2, random_state=42)

# 输出划分情况
print(f"Training set size: {len(train_guids)}")
print(f"Validation set size: {len(val_guids)}")

# 创建训练集和验证集的Dataset实例
train_dataset = MultimodalDataset(txt_dir, img_dir, train_guids, train_labels, tokenizer, transform=image_transform)
val_dataset = MultimodalDataset(txt_dir, img_dir, val_guids, val_labels, tokenizer, transform=image_transform)

# 示例：读取一个训练样本
sample = train_dataset[0]
print(f"Sample guid: {sample['guid']}")
print(f"Sample labels: {sample['label']}")
print(f"Text tokens: {sample['input_ids']}")
print(f"Image shape: {sample['image'].size()}")

Training set size: 3200
Validation set size: 800
Sample guid: 4836
Sample labels: 0
Text tokens: tensor([ 101, 4982, 1037, 3940, 1004, 2954, 1001, 4111, 7875, 8557,  102,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
           0,    0,    0,    0,    0,    0,    0,    0])
Image shape: torch.Size([3, 224, 224])




## 文本编码器

In [2]:
from transformers import BertModel
from torch import nn

class TextEncoder(nn.Module):
    def __init__(self, model_name=r'C:\Users\ThinkPad\Desktop\实验五\实验五数据\models--bert-base-uncased\snapshots\86b5e0934494bd15c9632b12f734a8a67f723594'):
        super(TextEncoder, self).__init__()
        self.bert = BertModel.from_pretrained(model_name)
    
    def forward(self, input_ids, attention_mask):
        # 通过BERT提取文本特征
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # 使用[CLS]标记的输出作为文本的特征表示
        return outputs.pooler_output  # shape: [batch_size, hidden_size]

## 图像编码器

In [3]:
import torchvision.models as models
import torch.nn as nn

class ImageEncoder(nn.Module):
    def __init__(self, pretrained=True):
        super(ImageEncoder, self).__init__()
        self.resnet = models.resnet50(pretrained=pretrained)
        # 去掉最后的全连接层，保留卷积部分
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])
        self.flatten = nn.Flatten()

    def forward(self, x):
        # 输入图像，提取特征
        x = self.resnet(x)
        x = self.flatten(x)
        return x  # shape: [batch_size, feature_dim]

## 多模态模型

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultimodalModel(nn.Module):
    def __init__(self, text_encoder, image_encoder, hidden_size=512, num_classes=3, dropout=0.5):
        super(MultimodalModel, self).__init__()
        self.text_encoder = text_encoder
        self.image_encoder = image_encoder
        self.fc1 = nn.Linear(2048 + 768, hidden_size)  # 图像特征2048 + 文本特征768
        self.fc2 = nn.Linear(hidden_size, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)  # Dropout层用于防止过拟合
        self.batch_norm = nn.BatchNorm1d(hidden_size)  # BatchNorm层有助于训练稳定

    def forward(self, input_ids, attention_mask, image):
        # 提取文本特征
        text_features = self.text_encoder(input_ids, attention_mask)
        # 提取图像特征
        image_features = self.image_encoder(image)
        # 融合文本和图像特征
        fused_features = torch.cat((text_features, image_features), dim=1)  # 拼接
        # 通过全连接层1
        x = self.fc1(fused_features)
        x = self.relu(x)
        x = self.dropout(x)  # 应用Dropout
        x = self.batch_norm(x)  # 使用BatchNorm
        # 通过全连接层2，得到输出
        x = self.fc2(x)
        return x  # 输出情感类别的logits（未经过Softmax）



In [5]:
# 定义训练函数
def train(model, train_loader, val_loader, epochs=10, lr=1e-5, patience=3):
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    best_val_loss = float('inf')
    no_improvement_count = 0
    
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        correct_preds = 0
        total_preds = 0
        
        # 训练阶段
        train_iter = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} Training", unit="batch")
        
        for batch in train_iter:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            images = batch['image'].to(device)
            labels = batch['label'].to(device)
            
            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask, images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            correct_preds += (preds == labels).sum().item()
            total_preds += labels.size(0)
        
        # 计算训练损失和准确率
        train_accuracy = correct_preds / total_preds
        train_loss = total_loss / len(train_loader)
        
        # 验证阶段
        model.eval()
        val_loss = 0
        val_correct_preds = 0
        val_total_preds = 0
        
        val_iter = tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} Validation", unit="batch")
        
        with torch.no_grad():
            for batch in val_iter:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                images = batch['image'].to(device)
                labels = batch['label'].to(device)
                
                outputs = model(input_ids, attention_mask, images)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item()
                preds = torch.argmax(outputs, dim=1)
                val_correct_preds += (preds == labels).sum().item()
                val_total_preds += labels.size(0)
        
        val_accuracy = val_correct_preds / val_total_preds
        val_loss = val_loss / len(val_loader)
        
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")
        
        # 早停策略
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            no_improvement_count = 0
            # 保存最佳模型
            torch.save(model.state_dict(), "best_multimodal_model.pth")
        else:
            no_improvement_count += 1
            if no_improvement_count >= patience:
                print("Early stopping!")
                break



In [6]:
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer
import time
import torch.optim as optim
from tqdm import tqdm


# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 加载数据集
train_dataset = MultimodalDataset(txt_dir, img_dir, train_guids, train_labels, tokenizer, transform=image_transform)
val_dataset = MultimodalDataset(txt_dir, img_dir, val_guids, val_labels, tokenizer, transform=image_transform)

# 创建DataLoader
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=8)  # 增加 num_workers
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=8)

# 初始化模型
text_encoder = TextEncoder(model_name=r'/root/多模态/实验五/实验五数据/models--bert-base-uncased/snapshots/86b5e0934494bd15c9632b12f734a8a67f723594').to(device)
image_encoder = ImageEncoder(pretrained=True).to(device)
model = MultimodalModel(text_encoder=text_encoder, image_encoder=image_encoder).to(device)

# 设置训练参数
epochs = 20
lr = 1e-5
patience = 3

# 创建优化器和学习率调度器
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# 训练前，记录开始时间
start_time = time.time()

# 开始训练
train(model, train_loader, val_loader, epochs=epochs, lr=lr, patience=patience)

# 训练后，记录结束时间
end_time = time.time()
print(f"Total training time: {end_time - start_time:.2f} seconds")

# 保存最终模型（可选）
torch.save(model.state_dict(), "multimodal_model.pth")

# 训练后，保存最优模型
torch.save(model.state_dict(), "best_multimodal_model.pth")



Epoch 1/20 Training: 100%|██████████| 100/100 [00:14<00:00,  6.95batch/s]
Epoch 1/20 Validation: 100%|██████████| 25/25 [00:01<00:00, 13.63batch/s]


Epoch 1/20, Train Loss: 1.1364, Train Accuracy: 0.3859
Validation Loss: 1.0409, Validation Accuracy: 0.4925


Epoch 2/20 Training: 100%|██████████| 100/100 [00:13<00:00,  7.33batch/s]
Epoch 2/20 Validation: 100%|██████████| 25/25 [00:01<00:00, 13.93batch/s]


Epoch 2/20, Train Loss: 1.0333, Train Accuracy: 0.4791
Validation Loss: 1.0113, Validation Accuracy: 0.5275


Epoch 3/20 Training: 100%|██████████| 100/100 [00:13<00:00,  7.28batch/s]
Epoch 3/20 Validation: 100%|██████████| 25/25 [00:02<00:00, 12.38batch/s]


Epoch 3/20, Train Loss: 0.9177, Train Accuracy: 0.5984
Validation Loss: 0.9386, Validation Accuracy: 0.6200


Epoch 4/20 Training: 100%|██████████| 100/100 [00:14<00:00,  6.88batch/s]
Epoch 4/20 Validation: 100%|██████████| 25/25 [00:02<00:00, 10.00batch/s]


Epoch 4/20, Train Loss: 0.7556, Train Accuracy: 0.7097
Validation Loss: 0.7203, Validation Accuracy: 0.7362


Epoch 5/20 Training: 100%|██████████| 100/100 [00:14<00:00,  6.70batch/s]
Epoch 5/20 Validation: 100%|██████████| 25/25 [00:02<00:00, 10.33batch/s]


Epoch 5/20, Train Loss: 0.6062, Train Accuracy: 0.7950
Validation Loss: 0.7868, Validation Accuracy: 0.7200


Epoch 6/20 Training: 100%|██████████| 100/100 [00:14<00:00,  6.71batch/s]
Epoch 6/20 Validation: 100%|██████████| 25/25 [00:02<00:00, 10.20batch/s]


Epoch 6/20, Train Loss: 0.4855, Train Accuracy: 0.8381
Validation Loss: 1.0293, Validation Accuracy: 0.7025


Epoch 7/20 Training: 100%|██████████| 100/100 [00:15<00:00,  6.63batch/s]
Epoch 7/20 Validation: 100%|██████████| 25/25 [00:02<00:00,  9.37batch/s]


Epoch 7/20, Train Loss: 0.3914, Train Accuracy: 0.8778
Validation Loss: 0.8961, Validation Accuracy: 0.7075
Early stopping!
Total training time: 121.00 seconds


In [7]:
import os
import torch
import pandas as pd
from torch.utils.data import DataLoader
from tqdm import tqdm

# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 加载数据集
test_file = os.path.join(data_dir, 'test_without_label.txt')  # 测试数据文件

# 加载测试数据的guid（没有标签）
def load_test_guids(test_file):
    guids = []
    with open(test_file, 'r') as f:
        next(f)  # 跳过第一行（标题行）
        for line in f:
            guid = line.strip().split(',')[0]  # 仅获取guid
            guids.append(guid)
    return guids

test_guids = load_test_guids(test_file)

class MultimodalDatasettest(Dataset):
    def __init__(self, txt_dir, img_dir, guids, labels=None, tokenizer=None, transform=None):
        self.txt_dir = txt_dir
        self.img_dir = img_dir
        self.gids = guids
        self.labels = labels if labels is not None else []  # 如果没有标签，默认为空列表
        self.tokenizer = tokenizer
        self.transform = transform
        self.text_data = load_all_texts(txt_dir, guids)  # 预加载文本

    def __len__(self):
        return len(self.gids)

    def label_to_int(self, label):
        """将情感标签转换为整数"""
        if label == 'positive':
            return 2
        elif label == 'neutral':
            return 1
        elif label == 'negative':
            return 0
        else:
            raise ValueError(f"Unknown label: {label}")

    def __getitem__(self, idx):
        guid = self.gids[idx]
        
        # 获取文本数据
        text = load_text(guid, self.txt_dir)
        if text is None:
            print(f"Warning: No text found for {guid}")
        
        # 获取图像数据
        image = load_image(guid, self.img_dir)
        if image is None:
            print(f"Warning: No image found for {guid}")
        
        # 文本处理（分词和编码）
        inputs = self.tokenizer(text, padding='max_length', truncation=True, max_length=128, return_tensors="pt")
        
        if self.transform and image is not None:
            image = self.transform(image)  # 图像预处理
        
        # 如果是测试集，返回默认的标签（此处使用0作为默认标签）
        if len(self.labels) > 0:
            label_int = self.label_to_int(self.labels[idx])
        else:
            label_int = -1  # 用-1表示测试集没有标签
        
        # 如果图像缺失，则填充一个零张量
        if image is None:
            image = torch.zeros(3, 224, 224)  # 用零填充（可以根据实际需求调整）
        
        return {
            'guid': guid,
            'input_ids': inputs['input_ids'].squeeze(0),
            'attention_mask': inputs['attention_mask'].squeeze(0),
            'image': image,
            'label': label_int
        }

# 创建测试集的Dataset实例
test_dataset = MultimodalDatasettest(txt_dir, img_dir, test_guids, labels=[], tokenizer=tokenizer, transform=image_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=8)

# 加载最佳模型
model = MultimodalModel(text_encoder=text_encoder, image_encoder=image_encoder).to(device)
model.load_state_dict(torch.load("best_multimodal_model.pth"))
model.eval()  # 切换到评估模式

# 预测并保存结果
def predict(model, test_loader):
    predictions = []
    guids = []

    # 使用tqdm进度条
    with torch.no_grad():
        for batch in tqdm(test_loader, desc="Predicting", unit="batch"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            images = batch['image'].to(device)

            # 获取模型输出
            outputs = model(input_ids, attention_mask, images)
            preds = torch.argmax(outputs, dim=1)  # 获取预测类别

            # 将预测结果和guid保存
            guids.extend(batch['guid'])
            predictions.extend(preds.cpu().numpy())  # 将结果从GPU转移到CPU

    return guids, predictions


# 执行预测
guids, predictions = predict(model, test_loader)

# 将结果保存到CSV文件
output_df = pd.DataFrame({
    'guid': guids,
    'predicted_label': ['positive' if pred == 2 else 'neutral' if pred == 1 else 'negative' for pred in predictions]
})

# 保存预测结果
output_df.to_csv("predictions.csv", index=False)

print("Predictions saved to predictions.csv.")

# 读取预测结果的CSV文件
predictions_df = pd.read_csv('predictions.csv')

# 重命名列名
predictions_df.rename(columns={'predicted_label': 'tag'}, inplace=True)

# 将数据保存为txt文件，格式为guid,tag
predictions_df.to_csv('predictions.txt', sep=',', index=False, header=True)


  model.load_state_dict(torch.load("best_multimodal_model.pth"))
Predicting: 100%|██████████| 16/16 [00:02<00:00,  7.33batch/s]

Predictions saved to predictions.csv.



