In [None]:
import pandas as pd
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import torch
import torch.nn as nn
from transformers import BertModel, BertTokenizer
from torchvision.models import resnet50
# Number of target classes produced by every classifier head.
num_classes = 3
# Device selection: CUDA when torch reports it as available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Path prefix that is string-concatenated with "<guid>.jpg" / "<guid>.txt" by the loaders.
folder_path = "./data/"

In [None]:
# Load the multilingual BERT tokenizer and the matching pretrained encoder.
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')
pretrained_model = BertModel.from_pretrained("bert-base-multilingual-cased")

# Image preprocessing pipeline applied to every sample:
#   1. resize to 128x128,
#   2. convert to a tensor,
#   3. normalize each channel with the mean/std constants below.
transform = transforms.Compose(
    [
        transforms.Resize((128, 128)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Maximum token length; texts are padded / truncated to this size by text_preprocess.
max_length = 300
def get_valid_imagesPath_from_directory(folder_path ,df):
    """Return the image paths for ``df['guid']`` that OpenCV can actually decode.

    Each path is built as ``folder_path + str(guid) + ".jpg"`` (plain string
    concatenation, so ``folder_path`` must already end with a separator).

    Args:
        folder_path: String prefix of the image directory.
        df: Mapping/DataFrame whose ``'guid'`` entry iterates over sample ids.

    Returns:
        List of path strings, in ``df['guid']`` order, for the images that
        could be read. Unreadable or missing images are skipped, so the
        result may be shorter than ``df['guid']``; callers that pair it with
        texts/labels must account for that.
    """
    image_paths = []
    for ind in df['guid']:
        image_path = folder_path+str(ind)+".jpg"
        try:
            image = cv2.imread(image_path)
        except Exception:
            # Defensive: keep skipping on any error raised by the decoder itself.
            continue
        # cv2.imread reports a missing or undecodable file by returning None
        # instead of raising, so the validity check has to be explicit.
        if image is None:
            continue
        image_paths.append(image_path)

    return image_paths

def get_texts_from_textsPath(folder_path,df):
    """Read the ``<guid>.txt`` file of every id in ``df['guid']``.

    Files are opened as GB18030-encoded text. An id whose file does not exist
    is skipped silently, so the returned list can be shorter than
    ``df['guid']``.

    Args:
        folder_path: String prefix concatenated with ``"<guid>.txt"``.
        df: Mapping/DataFrame whose ``'guid'`` entry iterates over sample ids.

    Returns:
        List with the full content of each file that was found, in id order.
    """
    collected = []
    for guid in df['guid']:
        txt_path = "".join((folder_path, str(guid), ".txt"))
        try:
            handle = open(txt_path, "r", encoding="GB18030")
        except FileNotFoundError:
            # Missing text file: leave this sample out.
            continue
        with handle:
            collected.append(handle.read())
    return collected


def text_preprocess(texts):
    """Tokenize every text with the module-level ``tokenizer``.

    Each text is encoded separately, padded/truncated to the module-level
    ``max_length`` and requested as PyTorch tensors (``return_tensors="pt"``).
    The number of texts and the first five texts are printed beforehand.

    Args:
        texts: Sequence of raw strings.

    Returns:
        List with one tokenizer output per input text, in input order.
    """
    # Quick sanity output: how many samples there are and what they look like.
    print(len(texts))
    print(texts[:5])

    encoded = []
    for sample in texts:
        encoded.append(
            tokenizer(
                sample,
                padding='max_length',
                max_length=max_length,
                truncation=True,
                return_tensors="pt",
            )
        )
    return encoded

# Combined image + text dataset.
class Dataset(torch.utils.data.Dataset):
    """Pairs each image file with its tokenized text and label.

    NOTE: this class intentionally keeps the name ``Dataset`` (later cells
    instantiate it under that name), which shadows the ``Dataset`` imported
    from ``torch.utils.data`` at the top of the notebook.

    Args:
        image_paths: Sequence of image file paths.
        tokenized_texts: Sequence of mappings providing ``'input_ids'`` and
            ``'attention_mask'`` (one mapping per sample).
        labels: Sequence of labels, indexable by sample position.
        transform: Optional callable applied to the RGB PIL image. When it is
            ``None`` the RGB PIL image itself is returned.

    The three sequences are indexed with the same position, so they must be
    aligned; the dataset length is the number of tokenized texts.
    """

    def __init__(self, image_paths, tokenized_texts, labels,transform=None):
        self.image_paths = image_paths
        self.transform = transform
        self.input_ids = [x['input_ids'] for x in tokenized_texts]
        self.attention_mask = [x['attention_mask'] for x in tokenized_texts]
        self.labels = labels

    @staticmethod
    def _as_tensor_copy(value):
        """Return an independent tensor holding ``value`` without shape changes."""
        if isinstance(value, torch.Tensor):
            # torch.tensor(existing_tensor) emits a copy-construct warning;
            # clone().detach() is the documented equivalent.
            return value.clone().detach()
        return torch.tensor(value)

    def __getitem__(self, index):
        """Return ``(image, input_ids, attention_mask, label)`` for one sample."""
        input_ids = self._as_tensor_copy(self.input_ids[index])
        attention_mask = self._as_tensor_copy(self.attention_mask[index])
        labels = self._as_tensor_copy(self.labels[index])

        image_path = self.image_paths[index]

        # Image.open is lazy and keeps the file open; the context manager
        # closes it once convert() has materialised the pixel data.
        # Forcing RGB keeps grayscale / RGBA / palette images compatible with
        # a 3-channel normalisation step in the transform.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return image ,input_ids, attention_mask, labels

    def __len__(self):
        """Number of samples, defined by the number of tokenized texts."""
        return len(self.input_ids)

In [None]:
# Label file: train.txt, parsed as comma-separated values.
train_label_path = "train.txt"
train_label_df = pd.read_csv(train_label_path, sep=",")

# Map the string tags onto integer class ids.
column_dict = {"positive": 0, "negative": 1, "neutral": 2}
new_df = train_label_df.replace({"tag": column_dict})
labels = list(new_df['tag'])

# Collect the image paths and raw texts for every guid in the label file.
image_paths = get_valid_imagesPath_from_directory(folder_path, new_df)
texts = get_texts_from_textsPath(folder_path, new_df)

# Hold out 20% of the samples for validation (fixed seed for a repeatable split).
(
    image_paths_train,
    image_paths_val,
    texts_train,
    texts_val,
    labels_train,
    labels_val,
) = train_test_split(image_paths, texts, labels, test_size=0.2, random_state=5)

# Tokenize both text splits.
tokenized_texts_train = text_preprocess(texts_train)
tokenized_texts_val = text_preprocess(texts_val)

# Wrap each split in the combined image/text dataset; loaders are built in later cells.
dataset_train = Dataset(image_paths_train, tokenized_texts_train, labels_train, transform)
dataset_val = Dataset(image_paths_val, tokenized_texts_val, labels_val, transform)

In [None]:
# Feature-extractor definitions.
class ImageFeatureExtractor(nn.Module):
    """Image branch: a ResNet-50 created with ``pretrained=True``.

    ``forward`` returns whatever the wrapped network outputs for the image
    batch; the fusion model's heads consume it as a 1000-wide vector.
    """

    def __init__(self):
        super().__init__()
        # ResNet-50 backbone, kept whole (its own final layer included).
        self.resnet = resnet50(pretrained=True)

    def forward(self, image):
        """Run the image batch through the ResNet and return its output."""
        return self.resnet(image)

class TextFeatureExtractor(nn.Module):
    """Text branch: a private copy of the module-level ``pretrained_model``.

    Every instance deep-copies ``pretrained_model`` instead of aliasing it.
    Aliasing would make all models built in this notebook share (and
    fine-tune) one encoder object, so a later experiment would start from
    weights already updated by an earlier one.
    """

    def __init__(self):
        import copy  # local import: only needed here, keeps the block self-contained

        super(TextFeatureExtractor, self).__init__()
        # Independent encoder per instance; the global stays untouched.
        self.bert = copy.deepcopy(pretrained_model)

    def forward(self, input_ids, attention_mask):
        """Encode the batch and return element 1 of the encoder output.

        Args:
            input_ids: Token id tensor passed to the encoder as ``input_ids``.
            attention_mask: Mask tensor passed as ``attention_mask``.

        Returns:
            ``outputs[1]`` of the encoder (the pooled output for a BERT model).
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        return pooled_output
    
# Multimodal fusion model, with switches for the ablation experiments.
class FusionModel(nn.Module):
    """Classifier over image features, text features, or both.

    ``option`` selects the branch used in ``forward``:

    * ``0`` - image only: image features (1000 wide) -> ``classifier0``
    * ``1`` - text only: text features (768 wide) -> ``classifier1``
    * anything else - fusion: text and image features concatenated
      (text first, 1768 wide) -> ``classifier2``

    All extractors and all three heads are always constructed, whichever
    option is chosen, so parameter names stay the same across options.

    The heads end in a plain ``Linear`` layer and therefore emit raw logits.
    A trailing ReLU would clamp every logit to be non-negative before the
    cross-entropy loss, which blocks gradients for negative logits.

    Args:
        num_classes: Number of output classes of each head.
        option: Branch selector described above.
    """

    def __init__(self, num_classes,option):
        super(FusionModel, self).__init__()
        self.image_extractor = ImageFeatureExtractor()
        self.text_encoder = TextFeatureExtractor()
        self.option=option
        self.classifier0 = self._make_head(1000, 256, num_classes)
        self.classifier1 = self._make_head(768, 256, num_classes)
        self.classifier2 = self._make_head(1768, 1024, num_classes)

    @staticmethod
    def _make_head(in_features, hidden_features, num_classes):
        """Build a dropout -> linear -> ReLU -> dropout -> linear head."""
        return nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(in_features, hidden_features),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(hidden_features, num_classes),
        )

    def forward(self, image, input_ids,attention_mask):
        """Return class logits for the batch using the configured branch.

        Args:
            image: Image batch for the image extractor (unused when option is 1).
            input_ids: Token ids for the text encoder (unused when option is 0).
            attention_mask: Attention mask for the text encoder (unused when
                option is 0).
        """
        if self.option == 0:
            image_features = self.image_extractor(image)
            return self.classifier0(image_features)

        if self.option == 1:
            text_features = self.text_encoder(input_ids, attention_mask)
            return self.classifier1(text_features)

        image_features = self.image_extractor(image)
        text_features = self.text_encoder(input_ids,attention_mask)
        fusion_features = torch.cat((text_features,image_features), dim=-1)
        return self.classifier2(fusion_features)

In [None]:
# Training loop (one epoch).
def train_model(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: Module called as ``model(images, input_ids, attention_mask)``.
        train_loader: Iterable of ``(images, input_ids, attention_mask, labels)``
            batches that also supports ``len()`` and exposes ``.dataset``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping the model parameters.
        device: Device the batch tensors are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: the mean batch loss and the fraction of
        correctly predicted samples, both as Python floats. An empty loader
        yields ``(0.0, 0.0)``.
    """
    model.train()
    running_loss = 0.0
    total_correct = 0
    num_batches = 0
    for images, input_ids, attention_mask, labels in train_loader:
        images = images.to(device)
        # Tokenizer outputs arrive as (batch, 1, seq_len); flatten both token
        # tensors to (batch, seq_len) so ids and mask always share one shape.
        input_ids = input_ids.reshape(input_ids.size(0), -1).to(device)
        attention_mask = attention_mask.reshape(attention_mask.size(0), -1).to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images, input_ids,attention_mask)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        _, preds = torch.max(outputs, 1)
        # Accumulate plain Python numbers so the totals never depend on a
        # tensor having been created by at least one batch.
        total_correct += torch.sum(preds == labels).item()
        running_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # Nothing to average over; avoid dividing by zero.
        return 0.0, 0.0

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = total_correct / len(train_loader.dataset)
    return epoch_loss, epoch_acc

# Inference loop.
def predict_model(model, test_loader, device):
    """Predict a class index for every sample yielded by ``test_loader``.

    Args:
        model: Module called as ``model(images, input_ids, attention_mask)``.
        test_loader: Iterable of ``(images, input_ids, attention_mask, labels)``
            batches; the labels are ignored.
        device: Device the batch tensors are moved to.

    Returns:
        List of predicted class indices (NumPy integer scalars) in the order
        the loader produced the samples.
    """
    model.eval()
    predictions = []
    # No gradients are needed anywhere in inference.
    with torch.no_grad():
        for images, input_ids, attention_mask, _ in test_loader:
            images = images.to(device)
            # Tokenizer outputs arrive as (batch, 1, seq_len); flatten both
            # token tensors to (batch, seq_len) so ids and mask share a shape.
            input_ids = input_ids.reshape(input_ids.size(0), -1).to(device)
            attention_mask = attention_mask.reshape(attention_mask.size(0), -1).to(device)
            outputs = model(images, input_ids,attention_mask)
            _, preds = torch.max(outputs, 1)
            predictions.extend(preds.cpu().numpy())
    return predictions


In [None]:
# Train and validate the full fusion model (option 2: text + image).
# Select CUDA device 0 only when CUDA exists: `device` already falls back to
# the CPU, and an unconditional set_device(0) would raise on a CPU-only machine.
if torch.cuda.is_available():
    torch.cuda.set_device(0)
criterion = nn.CrossEntropyLoss()
lr= 3e-5
batch_size = 32
best_acc = 0
# Build the data loaders. The validation loader is not shuffled so that its
# predictions line up with labels_val below.
loader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
loader_val = DataLoader(dataset_val, batch_size=batch_size, shuffle=False)
model = FusionModel(num_classes,2)
model = model.to(device)
# Adam with weight decay as regularisation.
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
num_epochs = 5
for epoch in range(num_epochs):
    train_loss, train_acc = train_model(model, loader_train, criterion, optimizer, device)
    val_predictions = predict_model(model, loader_val, device)
    # Validation accuracy: share of predictions equal to the held-out labels.
    val_predictions = np.array(val_predictions)
    val_labels = np.array(labels_val)
    val_acc = (val_predictions == val_labels).sum() / len(val_labels)
    if(val_acc>best_acc):
        # Keep the best checkpoint seen so far (the whole model object is pickled).
        best_acc = val_acc
        torch.save(model, 'multi_model.pt')
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Best Val Acc:{best_acc:.4f}")

In [None]:
# Read the unlabeled test file and write the prediction file.
test_path = "test_without_label.txt"
test_df = pd.read_csv(test_path,sep=",")
# Fill the last column with a dummy class id so the dataset has a label to
# return; it is read back as 'tag' below, so 'tag' is presumably the last column.
test_df.iloc[:,-1]=0
test_labels = np.array(test_df['tag'])

# Image paths and raw texts for every test guid.
image_paths_test = get_valid_imagesPath_from_directory(folder_path,test_df)
test_texts = get_texts_from_textsPath(folder_path,test_df)

tokenized_texts_test = text_preprocess(test_texts)
dataset_test = Dataset(image_paths_test, tokenized_texts_test, test_labels, transform)
# Shuffling must stay off: predictions are written back to test_df by position,
# so the loader has to yield samples in test_df row order.
loader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)

# The checkpoint is a fully pickled model written by this notebook. Recent
# PyTorch releases default torch.load to weights_only=True, which rejects such
# files, so full unpickling is requested explicitly; releases that predate the
# argument raise TypeError and use the plain call. Only load checkpoints you
# created yourself - unpickling can execute arbitrary code.
try:
    best_model = torch.load('multi_model.pt', map_location=device, weights_only=False)
except TypeError:
    best_model = torch.load('multi_model.pt', map_location=device)
best_model = best_model.to(device)
test_predictions = predict_model(best_model, loader_test, device)
test_predictions = np.array(test_predictions)

# Map class ids back to their string tags and save the result.
column_dict_ = {0:"positive", 1:"negative",2:"neutral"}
test_df['tag'] = test_predictions
pre_df = test_df.replace({"tag": column_dict_})
pre_df.to_csv('predict.txt',sep=',',index=False)

In [None]:
# Ablation experiment: image data only (option 0).
# Select CUDA device 0 only when CUDA exists: `device` already falls back to
# the CPU, and an unconditional set_device(0) would raise on a CPU-only machine.
if torch.cuda.is_available():
    torch.cuda.set_device(0)
criterion = nn.CrossEntropyLoss()
lr= 3e-5
batch_size = 32
best_acc = 0
# Build the data loaders. The validation loader is not shuffled so that its
# predictions line up with labels_val below.
loader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
loader_val = DataLoader(dataset_val, batch_size=batch_size, shuffle=False)
model = FusionModel(num_classes,0)
model = model.to(device)
# Adam with weight decay as regularisation.
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
num_epochs = 5
for epoch in range(num_epochs):
    train_loss, train_acc = train_model(model, loader_train, criterion, optimizer, device)
    val_predictions = predict_model(model, loader_val, device)
    # Validation accuracy: share of predictions equal to the held-out labels.
    val_predictions = np.array(val_predictions)
    val_labels = np.array(labels_val)
    val_acc = (val_predictions == val_labels).sum() / len(val_labels)
    if(val_acc>best_acc):
        best_acc = val_acc
        # NOTE(review): this writes to the same 'multi_model.pt' file as the
        # other training cells in this notebook, so the most recently run
        # experiment's checkpoint replaces any earlier one.
        torch.save(model, 'multi_model.pt')
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Best Val Acc:{best_acc:.4f}")

In [None]:
# Ablation experiment: text data only (option 1).
# Select CUDA device 0 only when CUDA exists: `device` already falls back to
# the CPU, and an unconditional set_device(0) would raise on a CPU-only machine.
if torch.cuda.is_available():
    torch.cuda.set_device(0)
criterion = nn.CrossEntropyLoss()
lr= 3e-5
batch_size = 32
best_acc = 0
# Build the data loaders. The validation loader is not shuffled so that its
# predictions line up with labels_val below.
loader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
loader_val = DataLoader(dataset_val, batch_size=batch_size, shuffle=False)
model = FusionModel(num_classes,1)
model = model.to(device)
# Adam with weight decay as regularisation.
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
num_epochs = 5
for epoch in range(num_epochs):
    train_loss, train_acc = train_model(model, loader_train, criterion, optimizer, device)
    val_predictions = predict_model(model, loader_val, device)
    # Validation accuracy: share of predictions equal to the held-out labels.
    val_predictions = np.array(val_predictions)
    val_labels = np.array(labels_val)
    val_acc = (val_predictions == val_labels).sum() / len(val_labels)
    if(val_acc>best_acc):
        best_acc = val_acc
        # NOTE(review): this writes to the same 'multi_model.pt' file as the
        # other training cells in this notebook, so the most recently run
        # experiment's checkpoint replaces any earlier one.
        torch.save(model, 'multi_model.pt')
    print(f" Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Best Val Acc:{best_acc:.4f}")