# Task A: Designing a Convolution Module for Variable Input Channels

Design a special convolutional module that is invariant to the spatial size of its input and can handle an arbitrary number of input channels.

Explain:

1. design principles

2. references

3. additional costs (such as FLOPs or number of parameters)

Then compare the design with naive models.

In [2]:
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset

import torch
import torch.nn as nn       # module
import torch.optim as optim # optimizer

from torch.utils.data import DataLoader, random_split
import torch.nn.functional as F
from torchvision import datasets, transforms

In [3]:
# Report the installed PyTorch / CUDA / cuDNN versions and whether a GPU is usable.
print(f"Pytorch version: {torch.__version__}")
print(f"Cuda version: {torch.version.cuda}")
print(f"cuDNN version: {torch.backends.cudnn.version()}")
print(f"Check if cuda is available: {torch.cuda.is_available()}")

Pytorch version: 2.6.0+cu126
Cuda version: 12.6
cuDNN version: 90501
Check if cuda is available: True


In [4]:
# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(f"使用設備: {device}")

使用設備: cuda:0


In [5]:
# Print the number of entries in each split listing, in train / test / val order.
for split_file in ("train.txt", "test.txt", "val.txt"):
    with open(os.path.join(os.getcwd(), "images", split_file)) as Name_file:
        lines = Name_file.readlines()
        print(len(lines))

63325
450
450


In [None]:
class ImageClassDataset(Dataset):
    """Image-classification dataset described by a listing file.

    The listing file is read from ``<current working directory>/images``.
    Each non-blank line holds an image path (relative to that directory),
    whitespace, and an integer class label.

    Args:
        names_file: File name of the listing inside the ``images`` directory.
        transform: Optional callable applied to the RGB PIL image before it is
            returned. Without one, ``__getitem__`` returns a PIL image, which
            the default ``DataLoader`` collate function cannot batch.

    Raises:
        ValueError: If a non-blank line has no label field or the label is
            not an integer.
    """

    def __init__(self, names_file, transform=None):
        self.images_dir = os.path.join(os.getcwd(), "images")
        self.transform = transform
        self.x = []
        self.y = []

        with open(os.path.join(self.images_dir, names_file)) as Name_file:
            for line_no, line in enumerate(Name_file, start=1):
                # split() with no argument tolerates tabs and repeated spaces.
                parts = line.split()
                if not parts:
                    # Blank lines (e.g. a trailing newline) are not samples.
                    continue
                if len(parts) < 2:
                    raise ValueError(
                        f"{names_file}:{line_no}: expected '<path> <label>', got {line!r}"
                    )
                self.x.append(parts[0])
                self.y.append(int(parts[1]))

        # Count parsed samples rather than raw lines so that len() always
        # matches the valid index range.
        self.n_samples = len(self.x)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``.

        ``image`` is the RGB image (after ``transform`` when one was given);
        ``label`` is a 0-dim ``torch.long`` tensor.
        """
        img_path = os.path.join(self.images_dir, self.x[index])
        # convert() produces a new, fully loaded image, so the file can be
        # closed as soon as the conversion is done.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        label = torch.tensor(self.y[index], dtype=torch.long)

        return image, label

    def __len__(self):
        """Return the number of samples parsed from the listing file."""
        return self.n_samples

## Baseline model: AlexNet

In [None]:
class AlexNet(nn.Module):
    """AlexNet-style baseline classifier.

    Five convolution layers feed an adaptive 6x6 average pool and a
    three-layer fully connected classifier. The adaptive pool fixes the
    classifier input at ``256 * 6 * 6`` features whatever the spatial size of
    the feature map.

    Args:
        num_classes: Number of output logits.
        in_channels: Number of channels of the input images. Defaults to 3,
            which reproduces the original RGB-only network.
    """

    def __init__(self, num_classes=1000, in_channels=3):
        super().__init__()
        # Layer order is kept as-is so state_dict keys ("features.0.weight",
        # ...) stay compatible with previously saved checkpoints.
        self.features = nn.Sequential(

            nn.Conv2d(in_channels, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),

            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),

            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),

            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),

            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        """Map a ``(batch, in_channels, H, W)`` tensor to ``(batch, num_classes)`` logits."""
        x = self.features(x)
        x = self.avgpool(x)
        # Keep the batch dimension, flatten channels and the 6x6 grid.
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os

class MyDataset(Dataset):
    """Unlabelled image-folder dataset.

    Every regular file directly inside ``data_dir`` is treated as one image
    sample; all samples carry the placeholder label ``0``.

    Args:
        data_dir: Directory that contains the image files.
        transform: Optional callable applied to each RGB PIL image.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        # os.listdir() returns entries in arbitrary order and includes
        # sub-directories; sort for a reproducible index -> file mapping and
        # keep regular files only so Image.open() is never given a directory.
        self.image_paths = sorted(
            name
            for name in os.listdir(data_dir)
            if os.path.isfile(os.path.join(data_dir, name))
        )

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the file at position ``idx``."""
        img_path = os.path.join(self.data_dir, self.image_paths[idx])
        # Force three channels so grayscale / RGBA files do not produce
        # tensors with differing channel counts inside one batch.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        label = 0  # placeholder: every image is assumed to have label 0

        if self.transform is not None:
            image = self.transform(image)
        return image, label

    def __len__(self):
        """Return the number of image files found in ``data_dir``."""
        return len(self.image_paths)

# Assumed image preprocessing: resize to 128x128, then convert to a tensor.
from torchvision import transforms
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

# Create an instance of the custom dataset ("path_to_images" is a placeholder path).
dataset = MyDataset(data_dir="path_to_images", transform=transform)

# Wrap the dataset in a DataLoader: shuffled batches of 32 using 4 worker processes.
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

# Iterate over the DataLoader the way a training loop would; here each batch is only printed.
for images, labels in dataloader:
    print(images.shape, labels)


In [None]:
def train_model(model, train_loader, val_loader, num_epochs=10, learning_rate=0.001, weight_decay=1e-5, patience=5, device=None, checkpoint_path='best_model.pth'):
    """Train a classifier with Adam, cross-entropy loss and early stopping.

    After every epoch the model is evaluated on ``val_loader``. Whenever the
    validation loss improves, the weights are snapshotted in memory and also
    written to ``checkpoint_path``. Training stops early once ``patience``
    consecutive epochs bring no improvement. The best weights seen are
    restored before returning.

    Args:
        model: ``nn.Module`` producing class logits.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        num_epochs: Maximum number of epochs.
        learning_rate: Adam learning rate.
        weight_decay: Adam weight decay.
        patience: Number of non-improving epochs tolerated before stopping.
        device: Target device; defaults to CUDA when available, else CPU.
        checkpoint_path: File the best ``state_dict`` is saved to.

    Returns:
        Tuple ``(model, train_losses, val_losses)`` where the two lists hold
        the per-sample mean loss of every completed epoch.
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    loss_func = torch.nn.CrossEntropyLoss()

    # Early-stopping bookkeeping.
    best_val_loss = float('inf')
    best_state = None
    early_stop_counter = 0
    # Per-epoch loss history.
    train_losses, val_losses = [], []

    for epoch in range(num_epochs):
        # ---- training phase ----
        model.train()
        train_loss = 0.0
        train_seen = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()  # clear gradients left from the previous step
            outputs = model(inputs)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()
            # The loss is a batch mean; re-weight by batch size so the epoch
            # value is a true per-sample mean.
            train_loss += loss.item() * inputs.size(0)
            train_seen += inputs.size(0)

        # Divide by the samples actually processed: len(loader.dataset) is
        # wrong whenever the loader skips samples (drop_last, samplers).
        train_loss = train_loss / train_seen
        train_losses.append(train_loss)

        # ---- validation phase ----
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = loss_func(outputs, labels)

                val_loss += loss.item() * inputs.size(0)

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_loss = val_loss / total
        val_accuracy = correct / total
        val_losses.append(val_loss)

        print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Train Loss: {train_loss:.4f}, '
              f'Val Loss: {val_loss:.4f}, '
              f'Val Accuracy: {val_accuracy:.4f}')

        # ---- early-stopping check ----
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            early_stop_counter = 0
            # Snapshot the best weights in memory (on CPU) so restoring them
            # never depends on a file that may be missing or left over from
            # an earlier run.
            best_state = {
                key: value.detach().cpu().clone()
                for key, value in model.state_dict().items()
            }
            # Also persist the best weights to disk.
            torch.save(model.state_dict(), checkpoint_path)
            print("保存最佳模型")
        else:
            early_stop_counter += 1
            if early_stop_counter >= patience:
                print(f"早停: {patience} 轮验证损失未改善")
                break

    # Restore the best weights. When no epoch improved (zero epochs, or the
    # validation loss was never finite) the model is returned as-is.
    if best_state is not None:
        model.load_state_dict(best_state)
    return model, train_losses, val_losses

In [None]:




# === 2. Prepare the dataset (CIFAR-10 as an example) ===
# Convert images to tensors and normalise them with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

batch_size = 64  # tunable batch size
dataset = datasets.CIFAR10(root="./data", train=True, transform=transform, download=True)

# 80% training set, 20% validation set
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Only the training loader shuffles; both use 4 workers and pinned memory.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4, pin_memory=True)

# === 3. Define the model (myModel stands in for the model you design) ===
class myModel(nn.Module):
    """Small two-convolution CNN classifier.

    Two 3x3 convolutions, each followed by ReLU and 2x2 max pooling, feed a
    two-layer fully connected head. The feature map is adaptively pooled to
    8x8 before the head, so any input size that survives the two 2x2 max
    pools is accepted. For 32x32 inputs the map is already 8x8, so the
    adaptive pool leaves it unchanged.

    Args:
        num_classes: Number of output logits.
        in_channels: Number of input image channels (default 3, RGB).
    """

    def __init__(self, num_classes=10, in_channels=3):
        super(myModel, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(128 * 8 * 8, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map a ``(batch, in_channels, H, W)`` tensor to ``(batch, num_classes)`` logits."""
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        # fc1 expects exactly 128 * 8 * 8 features; pooling to a fixed 8x8
        # grid removes the dependence on the input's spatial size.
        x = F.adaptive_avg_pool2d(x, (8, 8))
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# === 4. Initialise the model and wrap it in DataParallel (multi-GPU training) ===
model = myModel().to(device)
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs!")
    model = nn.DataParallel(model)

# === 5. Define the loss function and the Adam optimizer ===
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# === 6. Set up early stopping driven by the validation loss ===
best_val_loss = float("inf")
patience = 5  # stop after 5 epochs without improvement
patience_counter = 0

# === 7. Training and validation loop ===
num_epochs = 20
for epoch in range(num_epochs):
    model.train()  # training mode
    train_loss = 0.0
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()  # backpropagation
        optimizer.step()  # update parameters

        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_accuracy = 100 * correct / total
    # Mean of the per-batch losses (divides by the number of batches, not samples).
    train_loss /= len(train_loader)

    # === 8. Validation ===
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():  # disable gradient tracking to speed up inference
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_accuracy = 100 * correct / total
    # Mean of the per-batch losses, as for the training loss above.
    val_loss /= len(val_loader)

    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%")
    print(f"Validation Loss: {val_loss:.4f}, Validation Acc: {val_accuracy:.2f}%\n")

    # === 9. Early stopping (stop training once the validation loss stops improving) ===
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0  # reset the patience counter
        torch.save(model.state_dict(), "best_model.pth")  # save the best model
        print("Best model saved!")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered!")
            break

# === 10. Load the best model for inference ===
model.load_state_dict(torch.load("best_model.pth"))
model.eval()
print("Best model loaded for inference.")
