<a href="https://colab.research.google.com/github/loisll/MMAI984/blob/main/lois_trainmodel_Team_Project_VQA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
!pip install torch torchvision transformers
!pip install tqdm



STEP 1: Import Libraries

In [5]:
# step 1: Import libraries
import pandas as pd
from PIL import Image
import json
import matplotlib.pyplot as plt
import seaborn as sns
import os
import numpy as np
import zipfile
from google.colab import drive
import random
import tensorflow as tf

from tensorflow.keras import datasets, layers, models
import matplotlib.pyplot as plt
import cv2
import re
from tqdm import tqdm

In [6]:
# Define the data path
drive.mount('/content/drive')
data_path = '/content/drive/My Drive/Colab Notebooks/'


Mounted at /content/drive


STEP 2: LOAD TRAIN DATA

In [7]:
# Define function for loading questions and annotations

def load_data(data_file, feature):

  # Check if the file exists
  if os.path.exists(data_file):
    print("File found:", data_file)

    # Load the JSON file using the json module
    with open(data_file, 'r') as f:
        data = json.load(f)

    # Convert the JSON data to a DataFrame
    # questions = pd.DataFrame(data)

    # Flatten the JSON structure
    data = pd.json_normalize(data[feature])

    # Question preprocessing

    print("Data loaded successfully")
  else:
    print("File not found:", data_file)

  return data

*Load and prepare the training dataset*

In [8]:
# Load training questions

# Define the questions file
train_questions_file = os.path.join(data_path, 'train2015/MultipleChoice_abstract_v002_train2015_questions.json')
#train_questions_file = os.path.join(data_path)
train_questions_feature = 'questions'

train_questions = load_data(train_questions_file, train_questions_feature )

# Check the first 5 rows
#train_questions

File found: /content/drive/My Drive/Colab Notebooks/train2015/MultipleChoice_abstract_v002_train2015_questions.json
Data loaded successfully


In [9]:
# Load trainning annotations

# Define the annotations file
train_annotations_file = os.path.join(data_path, 'train2015/abstract_v002_train2015_annotations.json')
train_annotations_feature = 'annotations'
train_annotations = load_data(train_annotations_file, train_annotations_feature)

#print(train_annotations_file[annotations])
# Check the first 5 rows
#train_annotations

File found: /content/drive/My Drive/Colab Notebooks/train2015/abstract_v002_train2015_annotations.json
Data loaded successfully


In [10]:

# Load trainning captions

# Define the annotations file
train_captions_file = os.path.join(data_path, 'train2015/captions_abstract_v002_train2015.json')
train_captions_feature = 'images'
train_captions = load_data(train_captions_file, train_captions_feature)

#print(train_captions_file[annotations])
# Check the first 5 rows
#train_captions

File found: /content/drive/My Drive/Colab Notebooks/train2015/captions_abstract_v002_train2015.json
Data loaded successfully


In [11]:
# Load trainning captions

# Define the annotations file
train_OpenEnded_file = os.path.join(data_path, 'train2015/OpenEnded_abstract_v002_train2015_questions.json')
train_OpenEnded_feature = 'questions'
train_OpenEnded = load_data(train_OpenEnded_file, train_OpenEnded_feature)

#print(train_captions_file[annotations])
# Check the first 5 rows
#train_OpenEnded

File found: /content/drive/My Drive/Colab Notebooks/train2015/OpenEnded_abstract_v002_train2015_questions.json
Data loaded successfully


2.2 MERGE TRAIN DATA

In [12]:
# Merge questions and answers

#train_data = pd.merge(train_questions, train_annotations, on='question_id')
train_data = pd.merge(train_questions, train_annotations, on=["image_id", "question_id"])

df_train = train_data.merge(train_captions, on='image_id')
df_train.head(5)

Unnamed: 0,image_id,question,multiple_choices,question_id,question_type,multiple_choice_answer,answers,answer_type,url,file_name,width,height
0,11779,Who looks happier?,"[alive, 1, woman, purple, 2, yes, white, boy, ...",117792,who,man,"[{'answer': 'old person', 'answer_confidence':...",other,http://visualqa.org/data/abstract_v002/scene_i...,abstract_v002_train2015_000000011779.png,700,400
1,11779,Where is the woman sitting?,"[3, no, blue, red, 1, slide, monkey bars, jump...",117790,where is the,blanket,"[{'answer': 'on blanket', 'answer_confidence':...",other,http://visualqa.org/data/abstract_v002/scene_i...,abstract_v002_train2015_000000011779.png,700,400
2,11779,Where is the man sitting?,"[away, yes, blue, 1, 2, mouse, couch, no, yell...",117791,where is the,bench,"[{'answer': 'on bench', 'answer_confidence': '...",other,http://visualqa.org/data/abstract_v002/scene_i...,abstract_v002_train2015_000000011779.png,700,400
3,5536,Is this man hungry?,"[water, yellow, 4, running, blue, pouring, out...",55360,is this,yes,"[{'answer': 'yes', 'answer_confidence': 'yes',...",yes/no,http://visualqa.org/data/abstract_v002/scene_i...,abstract_v002_train2015_000000005536.png,700,400
4,5536,What kind of drink is that?,"[wine, girl would fall, soda, white, yes, coke...",55361,what kind of,soda,"[{'answer': 'water', 'answer_confidence': 'no'...",other,http://visualqa.org/data/abstract_v002/scene_i...,abstract_v002_train2015_000000005536.png,700,400


**STEP 3 : EDA**

From the above examples, we can see that most of the questions and answers are simple and clean text but some questions contain punctuation, common word contractions like what’s, it’s, they’re, etc, and noun contractions like guy’s, man’s, dog’s, etc., and some answers also contain punctuation. Hence, we need to perform the data cleaning operation on the question and answer dataset and expand contractions before performing EDA.

In [13]:

def decontractions(phrase):
    """decontracted takes text and convert contractions into natural form.
     ref: https://stackoverflow.com/questions/19790188/expanding-english-language-contractions-in-python/47091490#47091490"""
    # specific
    phrase = re.sub(r"won\'t", "will not", phrase)
    phrase = re.sub(r"can\'t", "can not", phrase)
    phrase = re.sub(r"won\’t", "will not", phrase)
    phrase = re.sub(r"can\’t", "can not", phrase)

    phrase = re.sub(r"he\'s", "he is", phrase)
    phrase = re.sub(r"she\'s", "she is", phrase)
    phrase = re.sub(r"it\'s", "it is", phrase)

    phrase = re.sub(r"he\’s", "he is", phrase)
    phrase = re.sub(r"she\’s", "she is", phrase)
    phrase = re.sub(r"it\’s", "it is", phrase)

    # general
    phrase = re.sub(r"n\'t", " not", phrase)
    phrase = re.sub(r"\'re", " are", phrase)
    phrase = re.sub(r"\'d", " would", phrase)
    phrase = re.sub(r"\'ll", " will", phrase)
    phrase = re.sub(r"\'t", " not", phrase)
    phrase = re.sub(r"\'ve", " have", phrase)
    phrase = re.sub(r"\'m", " am", phrase)

    phrase = re.sub(r"n\’t", " not", phrase)
    phrase = re.sub(r"\’re", " are", phrase)
    phrase = re.sub(r"\’d", " would", phrase)
    phrase = re.sub(r"\’ll", " will", phrase)
    phrase = re.sub(r"\’t", " not", phrase)
    phrase = re.sub(r"\’ve", " have", phrase)
    phrase = re.sub(r"\’m", " am", phrase)

    return phrase


def text_preprocess(text):
    text = text.lower()
    text = decontractions(text) # replace contractions into natural form
    text = re.sub('[-,:]', ' ', text) # replace the character "-" "," with space
    text = re.sub("(?!<=\d)(\.)(?!\d)", '', text) # remove the character ".", except from floating numbers
    text = re.sub('[^A-Za-z0-9. ]+', '', text) # remove all punctuation, except A-Za-z0-9
    text = re.sub(' +', ' ', text) # remove extra space
    return text

# Question and Answer text preprocessing
df_train["question_preprocessed"] = df_train["question"].map(lambda x: text_preprocess(x))
df_train["answer_preprocessed"] = df_train["multiple_choice_answer"].map(lambda x: text_preprocess(x))

In [14]:
df_train.head(3)

Unnamed: 0,image_id,question,multiple_choices,question_id,question_type,multiple_choice_answer,answers,answer_type,url,file_name,width,height,question_preprocessed,answer_preprocessed
0,11779,Who looks happier?,"[alive, 1, woman, purple, 2, yes, white, boy, ...",117792,who,man,"[{'answer': 'old person', 'answer_confidence':...",other,http://visualqa.org/data/abstract_v002/scene_i...,abstract_v002_train2015_000000011779.png,700,400,who looks happier,man
1,11779,Where is the woman sitting?,"[3, no, blue, red, 1, slide, monkey bars, jump...",117790,where is the,blanket,"[{'answer': 'on blanket', 'answer_confidence':...",other,http://visualqa.org/data/abstract_v002/scene_i...,abstract_v002_train2015_000000011779.png,700,400,where is the woman sitting,blanket
2,11779,Where is the man sitting?,"[away, yes, blue, 1, 2, mouse, couch, no, yell...",117791,where is the,bench,"[{'answer': 'on bench', 'answer_confidence': '...",other,http://visualqa.org/data/abstract_v002/scene_i...,abstract_v002_train2015_000000011779.png,700,400,where is the man sitting,bench


# step 4 : Preprocess the image--CNN

In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import numpy as np
import nltk
import numpy as np
nltk.download('punkt')
import numpy as np
from tensorflow.keras.preprocessing import image


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


4.1. build function of preprocssing of image


In [25]:
# preprocess = transforms.Compose([
#     transforms.Resize((224, 224)),  # 调整图像大小为 224x224
#     transforms.ToTensor(),  # 转换为 Tensor
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # 标准化
# ])

preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

# 2. 加载图像并进行预处理
def load_images_from_folder(folder_path):
    images = []
    count_id = []
    for filename in os.listdir(folder_path):
        if filename.endswith('.png'):  # 只加载 PNG 图片
            image_path = os.path.join(folder_path, filename)
            image = Image.open(image_path).convert('RGB')  # 打开并转换为 RGB
            image_tensor = preprocess(image)  # 预处理为 Tensor
            images.append(image_tensor)
            count_id.append(filename.split('.')[0])
    return  torch.stack(images)  # 将所有 Tensor 拼接成一个批处理 (batch) 的 Tensor


In [26]:
#nlp = spacy.load('en_core_web_sm')
drive.mount('/content/drive')
data_path = '/content/drive/My Drive/Colab Notebooks/'
extract_dir_test = os.path.join(data_path, 'train2015/train2015_images/test')
extract_dir_val = os.path.join(data_path, 'train2015/train2015_images/val')


image_batch = load_images_from_folder(extract_dir_test)  # 返回大小为 (6000, 3, 224, 224) 的 Tensor


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [27]:
image_batch.shape

torch.Size([18, 3, 224, 224])

In [41]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import torch.nn.functional as F


# Step 1: Pretrained ResNet for Image Encoding
class ImageEncoder(nn.Module):
    def __init__(self):
        super(ImageEncoder, self).__init__()
        # Load pretrained ResNet50
        resnet = models.resnet50(pretrained=True)
        # Remove the final fully connected layer
        self.encoder = nn.Sequential(*list(resnet.children())[:-2])  # Keep layers till the last conv layer

    def forward(self, x):
        return self.encoder(x)  # Output: (batch_size, 2048, 7, 7)

# Step 2: CNN Model After Image Encoding
class CNNAfterEncoding(nn.Module):
    def __init__(self):
        super(CNNAfterEncoding, self).__init__()
        # Encoder (ResNet50)
         # First Conv Layer: 3 input channels (RGB), 16 output channels, 3x3 kernel size
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)

        # Second Conv Layer: 16 input channels, 32 output channels, 3x3 kernel size
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)

        # Max Pool Layer
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Fully Connected (hidden) layer: Flatten the input to fit the fully connected layer
        self.fc1 = nn.Linear(32 * 56 * 56, 128)  # 32 filters, 56x56 feature map size after pooling twice

        # Output layer: 128 input features, 10 output classes (example for classification)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        # Pass through first Conv layer followed by ReLU and Max Pooling
        x = self.pool(F.relu(self.conv1(x)))  # Output: (batch_size, 16, 112, 112)

        # Pass through second Conv layer followed by ReLU and Max Pooling
        x = self.pool(F.relu(self.conv2(x)))  # Output: (batch_size, 32, 56, 56)

        # Flatten the tensor for the fully connected layer
        x = x.view(-1, 32 * 56 * 56)  # Flatten to shape (batch_size, 32 * 56 * 56)

        # Pass through the fully connected hidden layer
        x = F.relu(self.fc1(x))  # Output: (batch_size, 128)

        # # Pass through the final output layer
        # x = self.fc2(x)  # Output: (batch_size, 10) - logits for 10 classes

        return x

# Example usage:
# Initialize model
model = CNNAfterEncoding()

# Forward pass through the model
model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    output = model(image_batch)
    print(f'Output logits: {output.shape}')


Output logits: torch.Size([18, 128])


In [28]:
# images_target = []
# count_id_target = []
# for filename in os.listdir(extract_dir_val):
#      # Check if the file is a PNG image
#      if filename.endswith('.png'):
#        # Create the full path to the image
#         image_path = os.path.join(extract_dir_val, filename)
#               # Open the image
#         image = Image.open(image_path)
#               # preprocessing image
#         tensor_gpu=preprocess_image(image_path)
#         images_target.append(tensor_gpu)
#         count_id_target.append(filename.split('.')[0])

# images_target = np.array(images_target)
# images_target = images_target.astype('float32')
# images_target /= 255.0
# images_target = tf.convert_to_tensor(images_target)


# step 5 : preprocessing the Question and Answer --RNN

In [33]:
# 创建一个简单的词汇表，将问题文本转换为索引
# 使用 LabelEncoder 将每个唯一单词转换为索引
all_questions = " ".join(df_train['question_preprocessed'][:18]).split()
unique_words = list(set(all_questions))
word_to_index = {word: index for index, word in enumerate(unique_words)}
vocab_size = len(word_to_index)  # 词汇表大小

# 定义 QuestionEncoder 类
class QuestionEncoder(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super(QuestionEncoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)

    def forward(self, question):
        embedded = self.embedding(question)
        _, (hidden, _) = self.lstm(embedded)
        return hidden[-1]  # 返回最后一层的隐藏状态

# 模型参数
embed_size = 128     # 嵌入维度
hidden_size = 64     # LSTM 隐藏层维度
num_layers = 2       # LSTM 层数

# 创建模型实例
model = QuestionEncoder(vocab_size, embed_size, hidden_size, num_layers)

# 将问题文本转换为索引
def text_to_indices(text):
    # 将问题拆分为单词并转换为对应的索引
    return [word_to_index[word] for word in text.split() if word in word_to_index]

# 将 DataFrame 中的问题转换为索引张量
questions_indices = [text_to_indices(question) for question in df_train['question_preprocessed'][:18]]
# 填充索引以使其具有相同的长度（假设最大长度为10）
max_length = 10
questions_indices_padded = [q + [0] * (max_length - len(q)) if len(q) < max_length else q[:max_length] for q in questions_indices]

# 将问题索引转换为 Tensor
questions_tensor = torch.tensor(questions_indices_padded, dtype=torch.long)

# 前向传播，获取问题特征
question_features = model(questions_tensor)

# 输出特征的形状
print("Output shape:", question_features.shape)  # 应该是 (batch_size, hidden_size)

Output shape: torch.Size([18, 64])


(18,)

Answer encoder


In [29]:
answers = df_train["answer_preprocessed"][:18]

# Step 2: Create a vocabulary of all unique words
vocab = {word: idx for idx, word in enumerate(set(answers))}
print("Vocabulary:", vocab)

# Example Output: {'blue': 0, 'yes': 1, 'no': 2, 'maybe': 3, 'red': 4, 'definitely': 5, 'sometimes': 6, 'green': 7}

# Step 3: Convert each answer into its corresponding index
answer_indices = [torch.tensor([vocab[word]]) for word in answers]
print("Answer Indices (before batching):", answer_indices)

# Example Output: [tensor([1]), tensor([2]), tensor([3]), tensor([5]), tensor([1]), tensor([2]), ...]

# Step 4: Pad the sequences to ensure they have the same length
# Since these are single words, padding is not needed. But if the answers are sequences of different lengths:
padded_batch = pad_sequence(answer_indices, batch_first=True)
print("Padded Batch Shape:", padded_batch.shape)


Vocabulary: {'around tree': 0, 'blanket': 1, 'soccer ball': 2, 'beige': 3, 'yes': 4, 'bench': 5, 'mushroom': 6, 'dog': 7, 'man': 8, 'brown': 9, 'soda': 10, 'beehive': 11, 'sunny': 12, 'no': 13, 'bone': 14}
Answer Indices (before batching): [tensor([8]), tensor([1]), tensor([5]), tensor([4]), tensor([10]), tensor([3]), tensor([8]), tensor([12]), tensor([13]), tensor([11]), tensor([11]), tensor([6]), tensor([0]), tensor([2]), tensor([4]), tensor([9]), tensor([7]), tensor([14])]


NameError: name 'pad_sequence' is not defined

step 5 :  build model

In [44]:
output.shape

torch.Size([18, 128])

In [49]:


# VQA model: Combine image and question features and predict the answer
class VQAModel(nn.Module):
    def __init__(self, image_feat_size, question_feat_size, hidden_size, answer_vocab_size):
        super(VQAModel, self).__init__()
        self.fc1 = nn.Linear(image_feat_size + question_feat_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, answer_vocab_size)

    def forward(self, image_features, question_features):
        combined_features = torch.cat((image_features, question_features), dim=1)
        x = torch.relu(self.fc1(combined_features))
        x = self.fc2(x)
        return x


In [52]:
output.shape

torch.Size([18, 128])

In [50]:
question_features.shape

torch.Size([18, 64])

In [53]:
# Example usage
def main():
    # Hyperparameters
    vocab_size = 1000  # Example vocab size (should be equal to the number of unique tokens in your question dataset)
    embed_size = 300
    hidden_size = 512
    num_layers = 1
    answer_vocab_size = 100  # Example number of possible answers


    #question_encoder = QuestionEncoder(vocab_size, embed_size, hidden_size, num_layers)
    vqa_model = VQAModel(image_feat_size=128, question_feat_size=64, hidden_size=512, answer_vocab_size=answer_vocab_size)

    #flattened_image_features = image_batch.view(batch_size, -1)
    # Predict the answer
    result = vqa_model(output, question_features)
    print("Model output (answer logits):", output)

    # You would typically apply a softmax here to get answer probabilities
    predicted_answer = torch.argmax(output, dim=1)
    print("Predicted answer index:", predicted_answer)

if __name__ == "__main__":
    main()

Model output (answer logits): tensor([[0.0856, 0.0264, 0.0000,  ..., 0.2453, 0.0000, 0.0000],
        [0.1309, 0.0000, 0.0000,  ..., 0.3332, 0.0000, 0.0000],
        [0.0000, 0.0931, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
        ...,
        [0.1329, 0.0000, 0.0184,  ..., 0.0000, 0.0000, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0805, 0.0000, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0501, 0.0000, 0.0000]])
Predicted answer index: tensor([ 10,  39,  92,  39,  67,  88,  39,  92,  39, 110, 109,  92, 115,  39,
         78,  39, 119,  39])


In [22]:
image_batch.shape

torch.Size([18, 3, 224, 224])

In [23]:

image_batch = load_images_from_folder(extract_dir_test)  # 返回大小为 (6000, 3, 224, 224) 的 Tensor

# 3. 定义卷积层 (Conv2D)
conv_layer = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)

# 4. 将 6000 张图片传入卷积层
output = conv_layer(image_batch)

print("输出的形状:", output.shape)

输出的形状: torch.Size([18, 16, 224, 224])


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

# Define a simple CNN model
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, padding=1)  # Convolutional layer
        self.pool = nn.MaxPool2d(2, 2)  # Pooling layer
        self.conv2 = nn.Conv2d(16, 32, 3, padding=1)  # Second conv layer
        self.fc1 = nn.Linear(32 * 8 * 8, 120)  # Fully connected layer (flattened)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)  # Output layer (10 classes for CIFAR-10)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))  # Apply conv1, relu, and pool
        x = self.pool(torch.relu(self.conv2(x)))  # Apply conv2, relu, and pool
        x = x.view(-1, 32 * 8 * 8)  # Flatten the feature maps into a vector
        x = torch.relu(self.fc1(x))  # Apply fully connected layers
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)  # Output logits (class scores)
        return x


# Initialize the CNN model
model = SimpleCNN()

# Define a loss function and optimizer
criterion = nn.CrossEntropyLoss()  # Cross-entropy loss for classification
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Train the CNN model with batch size
for epoch in range(2):  # Train for 2 epochs (as an example)
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # Get the inputs (images) and labels from the data loader
        lable=[8, 4, 7, 5, 8, 6, 6, 5, 1, 8, 7, 4, 1, 2, 4, 3]
        inputs= images

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass: compute output
        outputs = model(inputs)

        # Compute loss
        loss = criterion(outputs, labels)

        # Backward pass: compute gradient and update weights
        loss.backward()
        optimizer.step()

        # Print loss statistics
        running_loss += loss.item()
        if i % 100 == 99:  # Print every 100 mini-batches
            print(f"[{epoch + 1}, {i + 1}] loss: {running_loss / 100:.3f}")
            running_loss = 0.0

print('Finished Training')


In [79]:

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import spacy

# Load the pre-trained spacy model for tokenization
nlp = spacy.load('en_core_web_sm')
drive.mount('/content/drive')
data_path = '/content/drive/My Drive/Colab Notebooks/'
extract_dir_test = os.path.join(data_path, 'train2015/train2015_images/test')


# Define the VQA Model
class VQAModel(nn.Module):
    def __init__(self, vocab_size, hidden_size, output_size):
        super(VQAModel, self).__init__()
        # Load pre-trained ResNet for image encoding
        self.resnet = models.resnet18(pretrained=True)
        self.resnet.fc = nn.Identity()  # Remove the final classification layer

        # LSTM for question encoding
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)

        # Fully connected layers to combine image and question encodings
        self.fc1 = nn.Linear(hidden_size + 512, 512)
        self.fc2 = nn.Linear(512, output_size)

    def forward(self, image, question):
        # Image encoding
        img_features = self.resnet(image)

        # Question encoding
        question_embed = self.embedding(question)
        _, (hidden, _) = self.lstm(question_embed)

        # Combine image and question encodings
       # combined = torch.cat((img_features, hidden.squeeze(0)), dim=1)
         # 结合图像和问题的特征
        combined = torch.cat((img_features, question_embed), dim=1)


        # Pass through fully connected layers
        x = torch.relu(self.fc1(combined))
        x = self.fc2(x)

        return x


# Model Hyperparameters
hidden_size = 256
output_size = 3  # Let's say we are predicting one of 3 possible answers

# Initialize model, loss function, and optimizer
model = VQAModel(vocab_size, hidden_size, output_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)




Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [82]:

# Forward pass
output = model(image_features, question_features)

# Example target (the correct answer is index 1)
target = torch.tensor([1])

# Calculate loss
loss = criterion(output, target)
print(f'Loss: {loss.item()}')

# Backpropagation and optimization
optimizer.zero_grad()
loss.backward()
optimizer.step()

print("Training step completed.")


RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [18, 512]

build answer

In [97]:
import torch
import torch.nn as nn
import torchvision.models as models
from torch.nn.utils.rnn import pack_padded_sequence
import torch.optim as optim
import torchvision.transforms as transforms
from PIL import Image
import numpy as np

# 定义VQA模型
class VQAModel(nn.Module):
    def __init__(self, hidden_size, vocab_size, num_classes):
        super(VQAModel, self).__init__()
        # 使用预训练的ResNet处理图像
        resnet = models.resnet50(pretrained=True)
        self.resnet = nn.Sequential(*list(resnet.children())[:-1])  # 移除最后的全连接层

        # 定义LSTM处理文本
        self.lstm = nn.LSTM(input_size=300, hidden_size=hidden_size, num_layers=1, batch_first=True)

        # 定义全连接层用于组合图像和文本特征
        self.fc1 = nn.Linear(hidden_size + 2048, 1024)
        self.fc2 = nn.Linear(1024, num_classes)

    def forward(self, image, question, lengths):
         #提取图像特征
        #img_features = self.resnet(image)
        #img_features = img_features.view(img_features.size(0), -1)  # 展平图像特征

        # 提取问题的特征
        packed = pack_padded_sequence(question, lengths, batch_first=True, enforce_sorted=False)
        _, (hn, _) = self.lstm(packed)
        ques_features = hn[-1]

        # 结合图像和问题的特征
        combined = torch.cat((image, ques_features), dim=1)
        #combined = torch.cat((img_features, ques_features), dim=1)
        x = torch.relu(self.fc1(combined))
        output = self.fc2(x)
        return output

# 假设你已经有数据集并进行预处理
def preprocess_image(image_path):
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    image = Image.open(image_path).convert('RGB')
    image = transform(image)
    return image.unsqueeze(0)  # 添加batch维度

# 简单的训练函数
def train(model, data_loader, criterion, optimizer, num_epochs=5):
    model.train()
    for epoch in range(num_epochs):
        for images, questions, lengths, labels in data_loader:
            outputs = model(images, questions, lengths)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


In [101]:
 combined = torch.cat((image.features, question_features), dim=1)

AttributeError: module 'keras._tf_keras.keras.preprocessing.image' has no attribute 'features'

In [94]:

lengths = [4]

# 初始化模型
vocab_size = 1000  # 假设词汇表大小
hidden_size = 512
num_classes = 1000  # 假设有1000种可能的答案
model = VQAModel(hidden_size, vocab_size, num_classes)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
# 假设你有一个data_loader，包含图像、问题、长度、标签等
# train(model, data_loader, criterion, optimizer)

# 模型推理
model.eval()
with torch.no_grad():
    output = model(image_features, question_features, lengths)
    _, predicted = torch.max(output, 1)
    print(f'Predicted answer: {predicted.item()}')


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x4 and 300x2048)