In [None]:
import pandas as pd
import numpy as np
import cv2
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import torch
import torch.nn as nn
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import torch.optim as optim
from torchvision.models import vgg16
import torch.nn.functional as F

# Load the raw text belonging to every sample id.
def get_texts(folder_path, data, missing_text=""):
    """Read the ``<guid>.txt`` file for every id in ``data['guid']``.

    Args:
        folder_path: Path prefix that is concatenated directly with the id,
            so it must already end with a path separator.
        data: Mapping-like object (e.g. a DataFrame) whose ``'guid'`` entry
            is an iterable of sample ids.
        missing_text: Placeholder stored when a text file does not exist, so
            the result keeps one entry per id and stays positionally aligned
            with other per-id sequences. Pass ``None`` to drop missing files
            from the result instead (the previous behaviour).

    Returns:
        list[str]: File contents decoded as GB18030, in id order.
    """
    texts = []
    for guid in data['guid']:
        file = folder_path + str(guid) + ".txt"
        try:
            with open(file, "r", encoding="GB18030") as f:
                texts.append(f.read())
        except FileNotFoundError:
            # Silently skipping would shift every later text by one position
            # relative to its id; keep the slot unless the caller opted out.
            if missing_text is not None:
                texts.append(missing_text)
    return texts

# Text preprocessing helper.
def text_tokenizer(texts, tokenizer=None, maxlen=200):
    """Convert raw texts into fixed-length integer sequences.

    Args:
        texts: Iterable of strings to encode.
        tokenizer: Optional already-fitted Keras ``Tokenizer``. When given it
            is reused unchanged, so new texts are encoded with the same word
            indices as the texts it was fitted on. When ``None`` a new
            ``Tokenizer`` is created and fitted on ``texts``.
        maxlen: Length every sequence is padded or truncated to by
            ``pad_sequences`` (default 200, the previously hard-coded value).

    Returns:
        tuple: ``(padded_sequences, tokenizer)`` where ``padded_sequences`` is
        the array returned by ``pad_sequences`` and ``tokenizer`` is the
        tokenizer that produced the word indices.
    """
    if tokenizer is None:
        tokenizer = Tokenizer()
        tokenizer.fit_on_texts(texts)  # build the vocabulary from these texts
    sequences = tokenizer.texts_to_sequences(texts)  # words -> integer ids
    padded_sequences = pad_sequences(sequences, maxlen=maxlen)  # unify the length
    return padded_sequences, tokenizer

# Locations of the per-sample files and of the training index.
folder_path = "dataset/data/"
train_path = "dataset/train.txt"

# Read the training index and turn the string tags into integer class ids.
train_data = pd.read_csv(train_path, sep=",")
tags = {"positive": 0, "negative": 1, "neutral": 2}
replaced_data = train_data.replace({"tag": tags})
labels = replaced_data['tag'].tolist()

# Load the raw texts, then encode them; the fitted tokenizer is kept so that
# its vocabulary size can be used when the model is built.
texts = get_texts(folder_path, replaced_data)
processed_texts, tokenizer = text_tokenizer(texts)
#print(processed_texts)


def get_images(folder_path, data):
    """Return the paths of the ``<guid>.jpg`` files that OpenCV can decode.

    Args:
        folder_path: Path prefix that is concatenated directly with the id,
            so it must already end with a path separator.
        data: Mapping-like object (e.g. a DataFrame) whose ``'guid'`` entry
            is an iterable of sample ids.

    Returns:
        list[str]: Paths of the readable images, in id order. Ids whose image
        is missing or unreadable are left out, so the result can be shorter
        than ``data['guid']``.
    """
    image_paths = []
    for guid in data['guid']:
        image_path = folder_path + str(guid) + ".jpg"
        # cv2.imread reports a missing or undecodable file by returning None
        # rather than raising, so test for that directly instead of relying
        # on an AttributeError swallowed by a blanket `except Exception`.
        if cv2.imread(image_path) is None:
            continue
        image_paths.append(image_path)
    return image_paths

# Image preprocessing pipeline; it is handed to FusionModelDataset, which applies it to every loaded image.
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # resize every image to one fixed 256x256 size for the VGG backbone
    transforms.ToTensor(),  # convert the image to a tensor
    # NOTE(review): mean/std normalization is disabled below; pretrained torchvision
    # backbones are usually fed normalized inputs — confirm this is intentional.
    #transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Paths of the training images that could be read from disk.
image_paths = get_images(folder_path,replaced_data)
#print(image_paths)


# Dataset that serves (image, text sequence, label) triples to the data loaders.
class FusionModelDataset(Dataset):
    """Pairs image files, text sequences and labels by position.

    Args:
        image_paths: Indexable collection of image file paths.
        text_sequences: Indexable collection of encoded texts.
        labels: Indexable collection of labels; its length defines the
            length of the dataset.
        transform: Callable applied to the RGB PIL image of each sample.
    """

    def __init__(self, image_paths, text_sequences, labels, transform):
        self.image_paths, self.text_sequences = image_paths, text_sequences
        self.labels, self.transform = labels, transform

    def __len__(self):
        """Number of samples, taken from the label collection."""
        return len(self.labels)

    def __getitem__(self, index):
        """Return ``(transformed_image, text_sequence, label)`` for ``index``."""
        # Look up all three entries first, then do the (slower) image decoding.
        path, sequence, target = (
            self.image_paths[index],
            self.text_sequences[index],
            self.labels[index],
        )
        pixels = self.transform(Image.open(path).convert('RGB'))
        return pixels, sequence, target


# Image feature extractor
class ImageModel(nn.Module):
    """Encodes an image batch into 4096-dimensional feature vectors.

    The convolutional ``features`` stack of a pretrained VGG16 is followed by
    adaptive average pooling to 7x7 and one linear projection. Dropout before
    the linear layer is currently not used.
    """

    def __init__(self):
        super().__init__()
        self.vgg = vgg16(pretrained=True).features
        self.avg_pool = nn.AdaptiveAvgPool2d((7, 7))
        # 512 channels x 7 x 7 pooled positions -> 4096 features
        self.fc = nn.Linear(512 * 7 * 7, 4096)

    def forward(self, x):
        """Return the projected features for the image batch ``x``."""
        pooled = self.avg_pool(self.vgg(x))
        # Collapse everything except the batch dimension.
        flattened = pooled.view(pooled.size(0), -1)
        return self.fc(flattened)


# Text feature extractor
class TextModel(nn.Module):
    """Encodes integer token sequences with an embedding layer and an LSTM.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the LSTM hidden state, which is also the size of
            the returned feature vector.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)

    def forward(self, x):
        """Return the final LSTM hidden state for the token batch ``x``."""
        _, state = self.lstm(self.embedding(x))
        final_hidden = state[0]  # state is (h_n, c_n); keep h_n only
        # Drop the leading dimension of size one.
        return final_hidden.squeeze(0)
        


# Multimodal fusion model
class FusionModel(nn.Module):
    """Classifies a sample from concatenated image and text features.

    Args:
        classes: Number of output classes.
        vocab_size: Vocabulary size forwarded to ``TextModel``.
        embedding_dim: Embedding size forwarded to ``TextModel``.
        hidden_dim: Text feature size forwarded to ``TextModel``.
    """

    def __init__(self, classes, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.image_extractor = ImageModel()
        self.text_extractor = TextModel(vocab_size, embedding_dim, hidden_dim)
        # 4096 image features are joined with hidden_dim text features.
        self.fc = nn.Linear(4096 + hidden_dim, classes)

    def forward(self, image, text):
        """Return the class scores for a batch of image/text pairs."""
        fused = torch.cat(
            [self.image_extractor(image), self.text_extractor(text)],
            dim=1,
        )
        return self.fc(fused)


In [None]:
# Hyperparameters
classes = 3
vocab_size = len(tokenizer.word_index) + 1  # +1 because the tokenizer's word indices start at 1
embedding_dim = 100
hidden_dim = 100
batch_size = 64
lr = 0.001
num_epochs = 10

# Images, texts and labels are paired purely by position, so all three must
# have the same length. Fail here with a clear message instead of training on
# shifted pairs or hitting an IndexError inside a data loader later.
if not (len(image_paths) == len(processed_texts) == len(labels)):
    raise ValueError(
        "Misaligned training data: "
        f"{len(image_paths)} images, {len(processed_texts)} texts, {len(labels)} labels"
    )

# Split into training (first 80%) and validation (remaining 20%) parts
train_size = int(0.8 * len(labels))
train_image_paths = image_paths[:train_size]
train_text_sequences = processed_texts[:train_size]
train_labels = labels[:train_size]
valid_image_paths = image_paths[train_size:]
valid_text_sequences = processed_texts[train_size:]
valid_labels = labels[train_size:]


# Datasets and data loaders (only the training loader shuffles)
train_dataset = FusionModelDataset(train_image_paths, train_text_sequences, train_labels, transform)
valid_dataset = FusionModelDataset(valid_image_paths, valid_text_sequences, valid_labels, transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size)

# Model, optimizer and loss
model = FusionModel(classes, vocab_size, embedding_dim, hidden_dim)
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

In [None]:
# Train the multimodal fusion model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

best_accuracy = 0.0
best_model_path = 'best_model.pt'  # where the best-scoring weights are saved

print("多模态模型:")
for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}训练中……')
    model.train()
    train_loss = 0.0

    # Batch variables carry a batch_ prefix so they do not overwrite the
    # notebook-level `texts` and `labels`, which the split cell reads.
    for batch_images, batch_texts, batch_labels in train_loader:
        batch_images = batch_images.to(device)
        batch_texts = batch_texts.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()

        outputs = model(batch_images, batch_texts)
        loss = criterion(outputs, batch_labels)

        loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size to get a per-sample average below.
        train_loss += loss.item() * batch_images.size(0)

    train_loss /= len(train_dataset)

    model.eval()
    valid_loss = 0.0
    correct = 0

    with torch.no_grad():
        for batch_images, batch_texts, batch_labels in valid_loader:
            batch_images = batch_images.to(device)
            batch_texts = batch_texts.to(device)
            batch_labels = batch_labels.to(device)

            outputs = model(batch_images, batch_texts)
            loss = criterion(outputs, batch_labels)

            valid_loss += loss.item() * batch_images.size(0)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == batch_labels).sum().item()

    valid_loss /= len(valid_dataset)
    accuracy = correct / len(valid_dataset)

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}, Valid Accuracy: {accuracy:.4f}')

    # Keep the weights of the epoch with the best validation accuracy
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        torch.save(model.state_dict(), best_model_path)


In [None]:
# Read the unlabeled test index and write the prediction file
test_path = "dataset/test_without_label.txt"
test_data = pd.read_csv(test_path,sep=",")
test_data['tag'] = 0  # dummy label so the test set fits FusionModelDataset
test_labels = np.array(test_data['tag'])
test_image_paths = get_images(folder_path,test_data)
test_texts = get_texts(folder_path,test_data)
# Encode the test texts with the tokenizer fitted on the TRAINING texts.
# Fitting a new tokenizer here would assign different ids to the same words,
# so the trained embedding table would be looked up with unrelated indices.
processed_test_texts = pad_sequences(tokenizer.texts_to_sequences(test_texts), maxlen=200)

test_dataset = FusionModelDataset(test_image_paths, processed_test_texts, test_labels, transform)
# No shuffling: predictions are written back by position, so the loader must
# yield samples in the same order as the rows of test_data.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Load the best checkpoint; map_location lets a checkpoint saved on GPU be
# restored on a CPU-only machine.
model.load_state_dict(torch.load(best_model_path, map_location=device))
model.eval()
predicted_labels = []

with torch.no_grad():
    for batch_images, batch_texts, _ in test_loader:
        batch_images = batch_images.to(device)
        batch_texts = batch_texts.to(device)

        outputs = model(batch_images, batch_texts)
        _, predicted = torch.max(outputs, 1)
        predicted_labels.extend(predicted.tolist())

# Map class ids back to tag names and write the predictions next to their ids
id_to_tag = {class_id: tag_name for tag_name, class_id in tags.items()}
test_data['tag'] = [id_to_tag[label] for label in predicted_labels]
test_data.to_csv("predict1.txt", index=False)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Dataset and loader used by the image-only experiment (validation split)
image_dataset = FusionModelDataset(valid_image_paths, valid_text_sequences, valid_labels, transform)
image_loader = DataLoader(image_dataset, batch_size=batch_size)

# Dataset and loader used by the text-only experiment (validation split)
text_dataset = FusionModelDataset(valid_image_paths, valid_text_sequences, valid_labels, transform)
text_loader = DataLoader(text_dataset, batch_size=batch_size)

# Image-only classifier. ImageModel alone emits 4096 feature values, which
# CrossEntropyLoss would treat as scores for 4096 classes; a linear head maps
# the features to the real number of classes, as FusionModel does.
image_model = nn.Sequential(ImageModel(), nn.Linear(4096, classes))
image_model.to(device)

# Text-only classifier, built the same way: TextModel emits hidden_dim
# feature values, so a linear head maps them to the real number of classes.
text_model = nn.Sequential(
    TextModel(vocab_size, embedding_dim, hidden_dim),
    nn.Linear(hidden_dim, classes),
)
text_model.to(device)

# Loss function shared by both single-modality experiments
criterion = nn.CrossEntropyLoss()

In [None]:
# Train and validate the image-only model
image_optimizer = optim.Adam(image_model.parameters(), lr=lr)
print("图像数据模型：")

for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}训练中……')
    image_model.train()
    image_train_loss = 0.0

    # Train on the training split. image_loader wraps the validation split,
    # so fitting on it as well would make the reported validation accuracy
    # an accuracy on data the model was trained on.
    # Batch variables carry a batch_ prefix so the notebook-level `labels`
    # list is not overwritten.
    for batch_images, _, batch_labels in train_loader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        image_optimizer.zero_grad()

        outputs = image_model(batch_images)
        loss = criterion(outputs, batch_labels)

        loss.backward()
        image_optimizer.step()

        image_train_loss += loss.item() * batch_images.size(0)

    image_train_loss /= len(train_dataset)

    image_model.eval()
    image_valid_loss = 0.0
    image_correct = 0

    with torch.no_grad():
        for batch_images, _, batch_labels in image_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            outputs = image_model(batch_images)
            loss = criterion(outputs, batch_labels)

            image_valid_loss += loss.item() * batch_images.size(0)
            _, predicted = torch.max(outputs, 1)
            image_correct += (predicted == batch_labels).sum().item()

    image_valid_loss /= len(image_dataset)
    image_accuracy = image_correct / len(image_dataset)

    print(f'Epoch {epoch+1}/{num_epochs}, Image Train Loss: {image_train_loss:.4f}, Image Valid Loss: {image_valid_loss:.4f}, Image Valid Accuracy: {image_accuracy:.4f}')


In [None]:
# Train and validate the text-only model
text_optimizer = optim.Adam(text_model.parameters(), lr=lr)
print("文本数据模型:")

for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}训练中……')
    text_model.train()
    text_train_loss = 0.0

    # Train on the training split. text_loader wraps the validation split,
    # so fitting on it as well would make the reported validation accuracy
    # an accuracy on data the model was trained on.
    # Batch variables carry a batch_ prefix so the notebook-level `texts`
    # and `labels` are not overwritten.
    for _, batch_texts, batch_labels in train_loader:
        batch_texts = batch_texts.to(device)
        batch_labels = batch_labels.to(device)

        text_optimizer.zero_grad()

        outputs = text_model(batch_texts)
        loss = criterion(outputs, batch_labels)

        loss.backward()
        text_optimizer.step()

        text_train_loss += loss.item() * batch_texts.size(0)

    text_train_loss /= len(train_dataset)

    text_model.eval()
    text_valid_loss = 0.0
    text_correct = 0

    with torch.no_grad():
        for _, batch_texts, batch_labels in text_loader:
            batch_texts = batch_texts.to(device)
            batch_labels = batch_labels.to(device)

            outputs = text_model(batch_texts)
            loss = criterion(outputs, batch_labels)

            text_valid_loss += loss.item() * batch_texts.size(0)
            _, predicted = torch.max(outputs, 1)
            text_correct += (predicted == batch_labels).sum().item()

    text_valid_loss /= len(text_dataset)
    text_accuracy = text_correct / len(text_dataset)

    print(f'Epoch {epoch+1}/{num_epochs}, Text Train Loss: {text_train_loss:.4f}, Text Valid Loss: {text_valid_loss:.4f}, Text Valid Accuracy: {text_accuracy:.4f}')
