## 数据读取与处理

In [1]:
!pip install -q transformers

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.2/7.2 MB[0m [31m78.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m236.8/236.8 kB[0m [31m29.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m89.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m71.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
from PIL import Image
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split

from transformers import AdamW, get_linear_schedule_with_warmup, AutoModel, AutoTokenizer

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
from google.colab import drive

import os
drive.mount('/content/drive', force_remount=True)

%cd drive/MyDrive/Multimodel/

Mounted at /content/drive
/content/drive/MyDrive/Multimodel


In [5]:
!unzip './data.zip' -d './'


[1;30;43m流式输出内容被截断，只能显示最后 5000 行内容。[0m
  inflating: ./data/3347.jpg         
  inflating: ./data/3347.txt         
  inflating: ./data/3348.jpg         
  inflating: ./data/3348.txt         
  inflating: ./data/3349.jpg         
  inflating: ./data/3349.txt         
  inflating: ./data/335.jpg          
  inflating: ./data/335.txt          
  inflating: ./data/3350.jpg         
  inflating: ./data/3350.txt         
  inflating: ./data/3351.jpg         
  inflating: ./data/3351.txt         
  inflating: ./data/3352.jpg         
  inflating: ./data/3352.txt         
  inflating: ./data/3353.jpg         
  inflating: ./data/3353.txt         
  inflating: ./data/3354.jpg         
  inflating: ./data/3354.txt         
  inflating: ./data/3355.jpg         
  inflating: ./data/3355.txt         
  inflating: ./data/3356.jpg         
  inflating: ./data/3356.txt         
  inflating: ./data/3357.jpg         
  inflating: ./data/3357.txt         
  inflating: ./data/3358.jpg         
  inflati

读取数据，记录训练集与测试集的id，将label转化成0、1、2三类，对应积极、中立、消极三种情感

In [6]:
with open('./train.txt', 'r') as f:
  lines = f.readlines()

train_set = []

for line in lines[1:]:
  data = {}
  line = line.replace('\n','')
  guid, tag = line.split(',')
  if tag == 'positive':
    label = 0
  elif tag == 'neutral':
    label = 1
  else:
    label = 2
  data['guid'] = guid
  data['label'] = label
  train_set.append(data)

# print(len(train_set)) # 4000
# print(train_set)

In [7]:
with open('./test_without_label.txt', 'r') as f:
  lines = f.readlines()

test_set = []
for line in lines[1:]:
  data = {}
  data['guid'] = line.split(',')[0]
  test_set.append(data)

对所有图像的大小进行统一，规范至(224, 224, 3)，符合ResNet18的输入大小

读取并存储文本至数据集中，无法解码的字符使用ignore进行忽略

In [8]:
def data_process(dataset):
  for data in dataset:
    guid = data['guid']
    image_path = './data/' + guid + '.jpg'
    image = Image.open(image_path).convert('RGB')
    array = np.array(image.resize((224, 224)))
    data['image'] = array.reshape((3, 224, 224))

    text_path = './data/' + guid + '.txt'
    f = open(text_path, 'r', errors='ignore')
    lines = f.readlines()
    # print(lines)
    text = ''
    for line in lines:
      text += line
    data['text'] = text

In [9]:
data_process(train_set)
data_process(test_set)

划分数据集，验证集采用和测试集相近的数目（500条）

In [10]:
train_set_num = 3500
valid_set_num = 500
train_set, valid_set = random_split(train_set, [train_set_num, valid_set_num])

## 图像分类器

图像分类采用的ResNet18模型，在第三次实验中表现最佳

定义残差块ResBlock和ShorcutResBlock，前者不改变通道数，后者会改变通道数：

In [11]:
class ResBlock(nn.Module):
  def __init__(self, input_channel, output_channel):
    super(ResBlock, self).__init__()
    self.conv1 = nn.Conv2d(input_channel, output_channel, kernel_size=(3, 3), padding=1, stride=1)
    self.bn1 = nn.BatchNorm2d(output_channel)
    self.conv2 = nn.Conv2d(output_channel, output_channel, kernel_size=(3, 3), padding=1, stride=1)
    self.bn2 = nn.BatchNorm2d(output_channel)

  def forward(self, x):
    output = self.conv1(x)
    output = self.bn1(output)
    output = F.relu(output)
    output = self.conv2(x)
    output = self.bn2(output)
    output = F.relu(output + x)
    return output


class ShortcutResBlock(nn.Module):
  def __init__(self, input_channel, output_channel):
    super(ShortcutResBlock, self).__init__()
    self.conv1 = nn.Conv2d(input_channel, output_channel, kernel_size=(1, 1), stride=2)
    self.bn1 = nn.BatchNorm2d(output_channel)
    self.conv2 = nn.Conv2d(input_channel, output_channel, kernel_size=(3, 3), padding=1, stride=2)
    self.bn2 = nn.BatchNorm2d(output_channel)
    self.conv3 = nn.Conv2d(output_channel, output_channel, kernel_size=(3, 3), padding=1, stride=1)
    self.bn3 = nn.BatchNorm2d(output_channel)

  def forward(self, x):
    output1 = self.conv1(x)
    output1 = self.bn1(output1)
    output2 = self.conv2(x)
    output2 = self.bn2(output2)
    output2 = F.relu(output2)
    output2 = self.conv3(output2)
    output2 = self.bn3(output2)
    output = F.relu(output1 + output2)
    return output

定义完整的ResNet18：

In [12]:
class ResNet18(nn.Module):
  def __init__(self):
    super(ResNet18, self).__init__()
    self.conv1 = nn.Conv2d(3, 64, kernel_size=(7, 7), padding=3, stride=2)
    self.bn1 = nn.BatchNorm2d(64)
    self.pool1 = nn.MaxPool2d(kernel_size=(3, 3), padding=1, stride=2)
    self.res1 = ResBlock(64, 64)
    self.res2 = ResBlock(64, 64)
    self.shortcut1 = ShortcutResBlock(64, 128)
    self.res3 = ResBlock(128, 128)
    self.shortcut2 = ShortcutResBlock(128, 256)
    self.res4 = ResBlock(256, 256)
    self.shortcut3 = ShortcutResBlock(256, 512)
    self.res5 = ResBlock(512, 512)
    self.pool2 = nn.AvgPool2d((7, 7))
    self.dropout = nn.Dropout(0)
    self.fc = nn.Linear(512, 3)

  def forward(self, x):
    output = self.conv1(x)
    output = self.bn1(output)
    output = self.pool1(F.relu(output))
    output = self.res1(output)
    output = self.res2(output)
    output = self.shortcut1(output)
    output = self.res3(output)
    output = self.shortcut2(output)
    output = self.res4(output)
    output = self.shortcut3(output)
    output = self.res5(output)
    output = self.pool2(output)
    output = torch.flatten(output, 1)
    output = self.fc(output)
    return output

验证一下图像分类器单独分类的性能

使用TensorDataset生成训练图像分类器的数据集：

In [13]:
image_train = []
image_train_labels = []
image_valid = []
image_valid_labels = []

for data in train_set:
  image_train.append(data['image'])
  image_train_labels.append(data['label'])

for data in valid_set:
  image_valid.append(data['image'])
  image_valid_labels.append(data['label'])

image_train = torch.from_numpy(np.array(image_train))
image_train_labels = torch.from_numpy(np.array(image_train_labels))
image_valid = torch.from_numpy(np.array(image_valid))
image_valid_labels = torch.from_numpy(np.array(image_valid_labels))

train_loader = DataLoader(TensorDataset(image_train, image_train_labels), batch_size=100, shuffle=True)
valid_loader = DataLoader(TensorDataset(image_valid, image_valid_labels), batch_size=50)

训练图像分类器，学习率5e-6，训练50个epoch

In [14]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [15]:
image_classifier = ResNet18()
image_classifier.to(device)

epoch_num = 50
learning_rate = 1e-6
total_step = epoch_num * len(train_loader)

optimizer = AdamW(image_classifier.parameters(), lr=learning_rate, eps=1e-8)
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0.1*total_step, num_training_steps=total_step)


# optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
criterion = nn.CrossEntropyLoss()



In [16]:
for epoch in range(epoch_num):
  running_loss = 0
  for i, data in enumerate(train_loader):
    inputs, labels = data
    inputs = inputs.float()
    inputs = inputs.to(device)
    labels = labels.to(device)
    # print(inputs.shape)
    outputs = image_classifier(inputs)
    # print(outputs.shape)
    loss = criterion(outputs, labels)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    scheduler.step()
    running_loss += loss.item()
  print('epoch: %d  loss: %.3f' % (epoch+1, running_loss / 35))
  running_loss = 0

epoch: 1  loss: 0.938
epoch: 2  loss: 0.928
epoch: 3  loss: 0.915
epoch: 4  loss: 0.905
epoch: 5  loss: 0.896
epoch: 6  loss: 0.889
epoch: 7  loss: 0.882
epoch: 8  loss: 0.878
epoch: 9  loss: 0.872
epoch: 10  loss: 0.870
epoch: 11  loss: 0.865
epoch: 12  loss: 0.862
epoch: 13  loss: 0.858
epoch: 14  loss: 0.856
epoch: 15  loss: 0.854
epoch: 16  loss: 0.850
epoch: 17  loss: 0.849
epoch: 18  loss: 0.844
epoch: 19  loss: 0.843
epoch: 20  loss: 0.843
epoch: 21  loss: 0.839
epoch: 22  loss: 0.837
epoch: 23  loss: 0.834
epoch: 24  loss: 0.830
epoch: 25  loss: 0.832
epoch: 26  loss: 0.829
epoch: 27  loss: 0.827
epoch: 28  loss: 0.823
epoch: 29  loss: 0.822
epoch: 30  loss: 0.822
epoch: 31  loss: 0.819
epoch: 32  loss: 0.817
epoch: 33  loss: 0.816
epoch: 34  loss: 0.814
epoch: 35  loss: 0.814
epoch: 36  loss: 0.812
epoch: 37  loss: 0.812
epoch: 38  loss: 0.811
epoch: 39  loss: 0.808
epoch: 40  loss: 0.809
epoch: 41  loss: 0.807
epoch: 42  loss: 0.808
epoch: 43  loss: 0.806
epoch: 44  loss: 0.8

在验证集上测试参数训练的效果：

In [17]:
correct_num = 0
total_num = 0
with torch.no_grad():
  for data in valid_loader:
    inputs, answers = data
    inputs = inputs.float()
    inputs = inputs.to(device)
    answers = answers.to(device)
    outputs = image_classifier(inputs)
    _, predicted = torch.max(outputs.data, 1)
    for i in range(len(predicted.tolist())):
      total_num += answers.size(0)
      correct_num += (predicted == answers).sum().item()

print('Training Accuracy: %.3f%%' % (100 * correct_num / total_num))

Training Accuracy: 60.200%


图像分类器在图像数据集上的正确率能够达到50%左右,最高能够到达60%

## 文本分类器

使用预训练模型bert-base-chinese

In [18]:
checkpoint = 'bert-base-chinese'
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
bert_model = AutoModel.from_pretrained(checkpoint)
# bert_model.to(device)

Downloading (…)okenizer_config.json:   0%|          | 0.00/29.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/624 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/110k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/269k [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/412M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-chinese were not used when initializing BertModel: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


创建文本分类器模型，在Bert的基础上增添一个线性层

In [19]:
class TextClassifier(nn.Module):
  def __init__(self):
    super(TextClassifier, self).__init__()
    self.model = bert_model
    self.model = self.model.to(device)
    self.dropout = nn.Dropout(0)
    # self.model.to(device)
    self.fc = nn.Linear(768, 3)

  def forward(self, x, attn_mask=None):
    x = x.to(device)
    attn_mask = attn_mask.to(device)
    output = self.model(x, attention_mask=attn_mask)
    # output = output.to(device)
    output = output[1]
    output = torch.flatten(output, 1)
    output = self.fc(output)
    return output

验证Bert在文本分类上的性能

对输入数据进行tokenize，统一长度，生成注意力分数

In [20]:
text_train = []
text_valid = []

for data in train_set:
  tokenized_text = tokenizer(data['text'], max_length=128, padding='max_length', truncation=True)
  # tokenized_text['input_ids'] = torch.from_numpy(np.array(tokenized_text['input_ids']))
  tokenized_text['label'] = data['label']
  text_train.append(tokenized_text)

for data in valid_set:
  tokenized_text = tokenizer(data['text'], max_length=128, padding='max_length', truncation=True)
  tokenized_text['label'] = data['label']
  text_valid.append(tokenized_text)

重载Dataset类，便于生成Dataloader

In [21]:
class TextDataset(Dataset):
  def __init__(self, data):
    super(TextDataset, self).__init__()
    self.data = data

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    input_ids = self.data[idx]['input_ids']
    attn_mask = self.data[idx]['attention_mask']
    label = self.data[idx]['label']
    return input_ids, attn_mask, label

train_loader = DataLoader(TextDataset(text_train), batch_size=25, shuffle=True)
valid_loader = DataLoader(TextDataset(text_valid), batch_size=25)

In [22]:
text_classifier = TextClassifier()
text_classifier.to(device)
# classifier.model.to(device)

epoch_num = 20
learning_rate = 1e-5
total_step = epoch_num * len(train_loader)

optimizer = AdamW(text_classifier.parameters(), lr=learning_rate, eps=1e-8)
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0.1*total_step, num_training_steps=total_step)

criterion = nn.CrossEntropyLoss()

In [23]:
# classifier.train()

for epoch in range(epoch_num):
  running_loss = 0
  for i, data in enumerate(train_loader):
    input_ids, attn_mask, labels = data
    input_ids = torch.tensor([item.numpy() for item in input_ids])
    attn_mask = torch.tensor([item.numpy() for item in attn_mask])
    input_ids = input_ids.T
    attn_mask = attn_mask.T
    # labels = torch.tensor([item.numpy() for item in labels])
    input_ids = input_ids.to(device)
    attn_mask = attn_mask.to(device)
    labels = labels.to(device)

    # print(input_ids.shape)
    # print(attn_mask.shape)

    outputs = text_classifier(input_ids, attn_mask)
    # print(outputs.shape)
    loss = criterion(outputs, labels)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    scheduler.step()

    running_loss += loss.item()
  print('epoch: %d  loss: %.3f' % (epoch+1, running_loss/140))
  running_loss = 0

  input_ids = torch.tensor([item.numpy() for item in input_ids])


epoch: 1  loss: 0.929
epoch: 2  loss: 0.858
epoch: 3  loss: 0.756
epoch: 4  loss: 0.547
epoch: 5  loss: 0.324
epoch: 6  loss: 0.200
epoch: 7  loss: 0.140
epoch: 8  loss: 0.105
epoch: 9  loss: 0.084
epoch: 10  loss: 0.067
epoch: 11  loss: 0.062
epoch: 12  loss: 0.058
epoch: 13  loss: 0.051
epoch: 14  loss: 0.045
epoch: 15  loss: 0.042
epoch: 16  loss: 0.040
epoch: 17  loss: 0.039
epoch: 18  loss: 0.036
epoch: 19  loss: 0.035
epoch: 20  loss: 0.034


In [24]:
correct_num = 0
total_num = 0
with torch.no_grad():
  for data in valid_loader:
    input_ids, attn_mask, labels = data
    input_ids = torch.tensor([item.numpy() for item in input_ids])
    input_ids = input_ids.T
    attn_mask = torch.tensor([item.numpy() for item in attn_mask])
    attn_mask = attn_mask.T
    input_ids = input_ids.to(device)
    attn_mask = attn_mask.to(device)
    labels = labels.to(device)

    outputs = text_classifier(input_ids, attn_mask)
    _, predicted = torch.max(outputs.data, 1)
    for i in range(len(predicted.tolist())):
      total_num += labels.size(0)
      correct_num += (predicted == labels).sum().item()

print('Training Accuracy: %.3f%%' % (100 * correct_num / total_num))

Training Accuracy: 62.800%


基于bert的文本分类器在数据集上的正确率能够到达60%

## 融合模型

定义完整的Dataset，输入时向文本分类器提供文本，图片分类器提供图片

In [25]:
class MultimodalDataset(Dataset):
  def __init__(self, data):
    super(MultimodalDataset, self).__init__()
    self.data = data

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    guid = self.data[idx]['guid']
    input_ids = torch.tensor(self.data[idx]['input_ids'])
    attn_mask = torch.tensor(self.data[idx]['attn_mask'])
    image = torch.tensor(self.data[idx]['image'])
    label = self.data[idx].get('label')
    if label is None:
      label = -100
    label = torch.tensor(label)
    return guid, input_ids, attn_mask, image, label

In [26]:
def dataset_process(dataset):
  for data in dataset:
    tokenized_text = tokenizer(data['text'], max_length=128, padding='max_length', truncation=True)
    data['input_ids'] = tokenized_text['input_ids']
    data['attn_mask'] = tokenized_text['attention_mask']

In [27]:
dataset_process(train_set)
dataset_process(valid_set)
dataset_process(test_set)

In [28]:
train_loader = DataLoader(MultimodalDataset(train_set), batch_size=25, shuffle=True)
valid_loader = DataLoader(MultimodalDataset(valid_set), batch_size=25)
test_loader = DataLoader(MultimodalDataset(test_set), batch_size=25)

构建完整的融合模型类，文本分类器的输出结果与图像分类器的输出结果拼接，经过线性层分类后输出

两个分类器的输出shape均为为(batch_size, output_features)

对拼接后的特征向量，先进入一个线性层，使模型学习两个特征向量之间的关系

最后进入分类层，输出结果

In [29]:
class MultimodalModel(nn.Module):
  def __init__(self, image_classifier, text_classifier, output_features, image_weight=0.5, text_weight=0.5):
    super(MultimodalModel, self).__init__()
    self.image_classifier = image_classifier
    self.text_classifier = text_classifier
    # 将最后的全连接层删除
    self.image_classifier.fc = nn.Sequential()  # (batch_num, 512)
    self.text_classifier.fc = nn.Sequential()    # (batch_num, 768)
    # 文本特征向量和图片特征向量的权重, 默认均为0.5
    self.image_weight = image_weight
    self.text_weight = text_weight
    self.fc1 = nn.Linear((512+768), output_features)
    self.fc2 = nn.Linear(output_features, 3)

  def forward(self, input_ids, attn_mask, image):
    image_output = self.image_classifier(image)
    text_output = self.text_classifier(input_ids, attn_mask)
    output = torch.cat([image_output, text_output], dim=-1)
    output = self.fc1(output)
    output = self.fc2(output)
    return output

实例化时使用先前训练完成的模型，在各自数据集上的分类效果较好，提取出的特征向量表现相较于初始化的模型也更优

In [30]:
multimodal_model = MultimodalModel(image_classifier=image_classifier, text_classifier=text_classifier, output_features=100, image_weight=0.5, text_weight=0.5)
multimodal_model.to(device)

epoch_num = 10
learning_rate = 1e-5
total_step = epoch_num * len(train_loader)

optimizer = AdamW(multimodal_model.parameters(), lr=learning_rate, eps=1e-8)
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0.1*total_step, num_training_steps=total_step)
criterion = nn.CrossEntropyLoss()

In [31]:
for epoch in range(epoch_num):
  running_loss = 0
  for i, data in enumerate(train_loader):
    _, input_ids, attn_mask, image, label = data
    input_ids = input_ids.to(device)
    attn_mask = attn_mask.to(device)
    image = image.to(device)
    image = image.float()
    label = label.to(device)

    outputs = multimodal_model(input_ids=input_ids, attn_mask=attn_mask, image=image)
    # print(outputs.shape)
    loss = criterion(outputs, label)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    scheduler.step()

    running_loss += loss.item()
  print('epoch: %d  loss: %.3f' % (epoch+1, running_loss/140))
  running_loss = 0

epoch: 1  loss: 0.609
epoch: 2  loss: 0.184
epoch: 3  loss: 0.113
epoch: 4  loss: 0.078
epoch: 5  loss: 0.055
epoch: 6  loss: 0.046
epoch: 7  loss: 0.040
epoch: 8  loss: 0.035
epoch: 9  loss: 0.033
epoch: 10  loss: 0.030


In [32]:
correct_num = 0
total_num = 0
with torch.no_grad():
  for data in valid_loader:
    _, input_ids, attn_mask, image, label = data
    input_ids = input_ids.to(device)
    attn_mask = attn_mask.to(device)
    image = image.to(device)
    image = image.float()
    label = label.to(device)

    outputs = multimodal_model(input_ids=input_ids, attn_mask=attn_mask, image=image)
    _, predicted = torch.max(outputs.data, 1)
    for i in range(len(predicted.tolist())):
      total_num += label.size(0)
      correct_num += (predicted == label).sum().item()

print('Training Accuracy: %.3f%%' % (100 * correct_num / total_num))

Training Accuracy: 63.600%


In [33]:
test_dict = {}
with torch.no_grad():
  for data in test_loader:
    guid, input_ids, attn_mask, image, label = data
    input_ids = input_ids.to(device)
    attn_mask = attn_mask.to(device)
    image = image.to(device)
    image = image.float()
    label = label.to(device)

    outputs = multimodal_model(input_ids=input_ids, attn_mask=attn_mask, image=image)
    _, predicted = torch.max(outputs.data, 1)
    predicted = predicted.tolist()
    for i in range(len(predicted)):
      id = guid[i]
      test_dict[id] = predicted[i]

将分类结果重新写入文件

In [34]:
with open('./test_without_label.txt', 'r') as f:
  lines = f.readlines()

f1 = open('./test.txt', 'w')
f1.write(lines[0])

for line in lines[1:]:
  # print(line)
  guid = line.split(',')[0]
  f1.write(guid)
  f1.write(',')
  label = test_dict[guid]
  if label == 0:
    f1.write('positive\n')
  elif label == 1:
    f1.write('neutral\n')
  else:
    f1.write('negative\n')