### Install relevant packages

In [None]:
#!pipenv install -q git+https://github.com/huggingface/transformers
#!pipenv install scikit-learn

### Import



In [None]:
import os
import torch
import torchvision
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import torch.utils.data as data
from torch.autograd import Variable
from torchvision import transforms
from transformers import ViTModel
from transformers import ViTFeatureExtractor
# import torch.nn.functional as F
# from torchvision.transforms import ToTensor
# from transformers.modeling_outputs import SequenceClassifierOutput

### Training Parameters

In [None]:
# Things to edit for each run:
#   - EPOCHS / BATCH_SIZE / LEARNING_RATE
#   - dataset_location (dataset root) / result_location (folder for training results)
#   - the model save/load paths used in the last cells
# Nothing else needs to change.
EPOCHS = 50
BATCH_SIZE = 16
LEARNING_RATE = 2e-5

# Every image is resized to 64x64 and converted to a float tensor.
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor()
])

# Run tag built from the hyperparameters (learning rate in scientific notation);
# it names both the result folder and the saved model file.
train_folder_name = f"train_e{EPOCHS}_b{BATCH_SIZE}_lr{LEARNING_RATE:0e}"
# Raw string: the Windows backslashes must not be parsed as escape sequences
# ("\d" and "\m" are invalid escapes and trigger warnings on current Python).
dataset_location = r"D:\deeplearning_class\dl_hw3\mpii_image_classification_mediapipe_v1"
result_location = f'D:/deeplearning_class/dl_hw3/result/med1/{train_folder_name}'

# Datasets: one sub-folder per class under train/, val/ and test/.
train_ds = torchvision.datasets.ImageFolder(f'{dataset_location}/train/', transform=transform)
valid_ds = torchvision.datasets.ImageFolder(f'{dataset_location}/val/', transform=transform)
test_ds = torchvision.datasets.ImageFolder(f'{dataset_location}/test/', transform=transform)
print("Number of train samples: ", len(train_ds))
print("Number of valid samples: ", len(valid_ds))
print("Number of test samples: ", len(test_ds))
print("Detected Classes are: ", train_ds.class_to_idx)

# drop_last=True keeps every batch at exactly BATCH_SIZE samples; the trailing
# partial batch of each pass is skipped.
train_loader = data.DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, drop_last=True)
valid_loader = data.DataLoader(valid_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, drop_last=True)
test_loader = data.DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, drop_last=True)

# Record the hyperparameters next to the results of this run.
os.makedirs(result_location, exist_ok=True)
file_path = os.path.join(result_location, 'hyperparameters.txt')
with open(file_path, 'w') as file:
    file.write(f"batch_size: {BATCH_SIZE}\n")
    file.write(f"learning_rate: {LEARNING_RATE}\n")
    file.write(f"epoch: {EPOCHS}\n")

### Vision Transformer

In [None]:
class ViTForImageClassification(nn.Module):
    """Pretrained ViT backbone followed by dropout and a linear classifier.

    The classifier reads the first token of the backbone's last hidden state
    (presumably the [CLS] token of the Hugging Face ViT; confirm against the
    ViTModel documentation).
    """

    def __init__(self, num_labels=3):
        """Load the pretrained backbone and create a `num_labels`-way head."""
        super().__init__()
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.vit.config.hidden_size, num_labels)
        self.num_labels = num_labels

    def forward(self, pixel_values, labels=None):
        """Classify a batch of images.

        Args:
            pixel_values: Batch of images, passed unchanged to the backbone.
            labels: Optional class indices. When given, the cross-entropy
                loss of the batch is computed as well.

        Returns:
            A ``(logits, loss)`` tuple. ``loss`` is ``None`` when ``labels``
            is ``None``; otherwise it is a plain Python float. It is detached
            from the autograd graph, so it is for reporting only and cannot
            be back-propagated.
        """
        outputs = self.vit(pixel_values=pixel_values)
        # Keep only the first token's embedding as the image representation.
        pooled = self.dropout(outputs.last_hidden_state[:, 0])
        logits = self.classifier(pooled)

        if labels is None:
            return logits, None

        loss = nn.CrossEntropyLoss()(logits.view(-1, self.num_labels), labels.view(-1))
        return logits, loss.item()

### Setup

In [None]:
"""
Pretrain ViT: https://huggingface.co/models?search=google/vit
"""
# Use GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Model
# The constructor already loads the pretrained
# 'google/vit-base-patch16-224-in21k' backbone, so it is not loaded a second
# time here.
model = ViTForImageClassification(num_labels=len(train_ds.classes))
# model.load_state_dict(torch.load(model_path))
# Move the model before building the optimizer so the optimizer is created
# over the parameters on their final device.
model.to(device)

# Preprocessor matching the pretrained ViT. do_rescale=False because the
# dataset transform already ends with ToTensor().
# NOTE(review): recent transformers releases deprecate ViTFeatureExtractor in
# favour of ViTImageProcessor; confirm against the installed version.
feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k', do_rescale=False)

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Cross Entropy Loss
loss_func = nn.CrossEntropyLoss()

### Train

In [None]:
# Per-epoch metric history; each list receives one value per epoch.
history = {
    "train_loss": [],
    "valid_loss": [],
    "test_loss": [],
    "train_accuracy": [],
    "valid_accuracy": [],
    "test_accuracy": [],
}


def _prepare_batch(images):
    """Run a batch of image tensors through the ViT feature extractor.

    The batch is handed to the feature extractor as a list of per-image numpy
    arrays, and the processed images are stacked back into one tensor.
    """
    per_image = list(images.numpy())
    return torch.tensor(np.stack(feature_extractor(per_image)['pixel_values'], axis=0))


def _evaluate(loader):
    """Return ``(mean batch loss, accuracy)`` of ``model`` over ``loader``.

    Runs in eval mode with gradients disabled. The loss is averaged over the
    number of batches, the accuracy over the number of samples seen.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    seen = 0
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = _prepare_batch(batch_x).to(device)
            batch_y = batch_y.to(device)

            logits, _ = model(batch_x, None)
            # Keep the batch loss separate from the running sum; reusing one
            # name for both would overwrite the accumulator on every batch.
            batch_loss = loss_func(logits, batch_y)

            loss_sum += batch_loss.item()
            correct += (logits.argmax(1) == batch_y).sum().item()
            seen += batch_y.size(0)
    return loss_sum / len(loader), correct / seen


for epoch in range(EPOCHS):
    # ---- Training phase ----
    model.train()
    train_loss = 0.0
    correct_train = 0
    total_train = 0

    for step, (x, y) in enumerate(train_loader):
        x = _prepare_batch(x).to(device)
        y = y.to(device)

        # Forward pass; the loss is computed here rather than inside the model
        # so that it stays attached to the autograd graph.
        output, _ = model(x, None)
        loss = loss_func(output, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate training loss and accuracy
        train_loss += loss.item()
        predictions = output.argmax(1)
        correct_train += (predictions == y).sum().item()
        total_train += y.size(0)

        if step % 10 == 0:
            print(f"Epoch {epoch + 1}/{EPOCHS}, Step {step}, Train Loss: {loss.item():.4f}")

    # Average training loss (per batch) and accuracy (per sample)
    avg_train_loss = train_loss / len(train_loader)
    train_accuracy = correct_train / total_train
    history["train_loss"].append(avg_train_loss)
    history["train_accuracy"].append(train_accuracy)

    # ---- Validation phase (skipped when the validation loader is empty) ----
    # NaN placeholders keep the epoch summary printable when it is skipped.
    avg_valid_loss = float("nan")
    valid_accuracy = float("nan")
    if valid_loader:
        avg_valid_loss, valid_accuracy = _evaluate(valid_loader)
        history["valid_loss"].append(avg_valid_loss)
        history["valid_accuracy"].append(valid_accuracy)

    # ---- Test phase ----
    avg_test_loss, test_accuracy = _evaluate(test_loader)
    history["test_loss"].append(avg_test_loss)
    history["test_accuracy"].append(test_accuracy)

    # Print epoch summary
    print("----------------------------------------------------")
    print(f"Epoch {epoch + 1}/{EPOCHS}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}, "
          f"Valid Loss: {avg_valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.2f}, "
          f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}\n")

### 存history到.csv--(training_history.csv)

In [None]:
import csv

# Destination of the per-epoch history table.
csv_file_path = f"{result_location}/training_history.csv"

# Number of recorded epochs (the training loss has one entry per epoch).
epochs = len(history["train_loss"])

# CSV header
header = [
    "epoch",
    "train/loss",
    "train/accuracy",
    "valid/loss",
    "valid/accuracy",
    "test/loss",
    "test/accuracy",
]

# History keys in the same order as the header columns after "epoch".
history_columns = [
    "train_loss",
    "train_accuracy",
    "valid_loss",
    "valid_accuracy",
    "test_loss",
    "test_accuracy",
]


def _value_or_none(series, index):
    """Return ``series[index]``, or ``None`` when the series is too short.

    A series can be shorter than the training history, e.g. when validation
    was skipped; the corresponding CSV cell is then left empty.
    """
    return series[index] if index < len(series) else None


with open(csv_file_path, mode="w", newline="", encoding="utf-8") as file:
    writer = csv.writer(file)
    writer.writerow(header)

    # One row per epoch; epoch numbers start at 1.
    for epoch_index in range(epochs):
        writer.writerow(
            [epoch_index + 1]
            + [_value_or_none(history.get(key, []), epoch_index) for key in history_columns]
        )

### Loss and accuracy curves (train_loss_curve.png, train_curve.png, test_loss_curve.png, test_curve.png)

In [None]:
# X axis shared by all figures: 1-based epoch numbers.
plot_epochs = range(1, len(history["train_loss"]) + 1)

# One entry per saved figure: (curves, y-axis label, title, y-axis range, file name).
# Each curve is (history key, legend label, line colour).
figure_specs = (
    (
        (("train_loss", "Train Loss", "blue"), ("valid_loss", "Valid Loss", "orange")),
        "Loss", "Loss Curve", (0.0, 3.0), "train_loss_curve.png",
    ),
    (
        (("train_accuracy", "Train Accuracy", "blue"), ("valid_accuracy", "Valid Accuracy", "orange")),
        "Accuracy", "Accuracy Curve", (0.0, 1.0), "train_curve.png",
    ),
    (
        (("test_loss", "Test Loss", "red"),),
        "Loss", "Loss Curve", (0.0, 3.0), "test_loss_curve.png",
    ),
    (
        (("test_accuracy", "Test Accuracy", "red"),),
        "Accuracy", "Accuracy Curve", (0.0, 1.0), "test_curve.png",
    ),
)

for curves, y_label, title, (y_min, y_max), file_name in figure_specs:
    plt.figure(figsize=(8, 6))
    for history_key, legend_label, colour in curves:
        plt.plot(plot_epochs, history[history_key], label=legend_label, color=colour, marker="o")
    plt.xlabel("Epochs")
    plt.ylabel(y_label)
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.ylim(y_min, y_max)  # fixed y-axis range
    plt.tight_layout()
    plt.savefig(f"{result_location}/{file_name}")
    plt.close()  # close the figure so the next one starts clean
    # plt.show()

### 評估model--(sample.png)

In [None]:
# Collect predictions over the validation loader and save a grid of sample
# predictions to sample.png. predicted_classes / target_classes are reused by
# the metrics cell below.
predicted_classes = []
target_classes = []
original_inputs = []

# Set eval mode explicitly so dropout is off even if this cell is run on its
# own rather than right after the training loop.
model.eval()
with torch.no_grad():
    for inputs, targets in valid_loader:
        # Keep channel-last copies of the un-processed images for display.
        # clone() so the stored arrays do not alias the loader's batch tensor.
        original_inputs.extend(inputs.clone().permute(0, 2, 3, 1).cpu().numpy())

        # Preprocess each image separately, then stack back into a batch.
        processed_inputs = [
            torch.tensor(feature_extractor(image.permute(1, 2, 0).numpy())['pixel_values'][0])
            for image in inputs
        ]
        inputs = torch.stack(processed_inputs).to(device)

        # Only the logits are needed here, so no labels are passed and no
        # loss is computed.
        predictions, _ = model(inputs, None)
        predicted_batch = predictions.argmax(1)

        # Collect the results of this batch
        predicted_classes.extend(predicted_batch.cpu().tolist())
        target_classes.extend(targets.tolist())

# Maximum number of displayed images per predicted label; labels that are not
# listed are unlimited.
label_display_limit = {"good": 2}
label_counts = {label: 0 for label in label_display_limit}

# Reverse lookup built once: class index -> class name.
index_to_label = {index: label for label, index in valid_ds.class_to_idx.items()}

max_display = 10
grid_cols = 2
grid_rows = -(-max_display // grid_cols)  # ceiling division

plt.figure(figsize=(12, 12))
display_count = 0

for image, predicted_index, target_index in zip(original_inputs, predicted_classes, target_classes):
    predicted_label = index_to_label[predicted_index]
    actual_label = index_to_label[target_index]

    # Skip images whose predicted label has already reached its display limit.
    if predicted_label in label_display_limit:
        if label_counts[predicted_label] >= label_display_limit[predicted_label]:
            continue
        label_counts[predicted_label] += 1

    plt.subplot(grid_rows, grid_cols, display_count + 1)
    plt.imshow(image)
    plt.title(f"Prediction: {predicted_label} — Actual: {actual_label}")
    plt.axis("off")

    display_count += 1
    if display_count >= max_display:
        break

plt.tight_layout()
plt.savefig(f"{result_location}/sample.png")
plt.close()
# plt.show()

### Indicator

In [None]:
import csv

from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, average_precision_score, classification_report, ConfusionMatrixDisplay

# classification_report layout
#
#     columns
#         precision : TP / (TP+FP)
#         recall    : TP / (TP+FN)
#         F1-score  : 2*precision*recall / (precision + recall)
#         support   : number of samples.
#
#     rows
#         accuracy     : (TP + TN) / All
#         macro avg    : Unweighted average of an indicator.
#         weighted avg : Weighted average of an indicator by support.

# Confusion matrix
class_names = list(valid_ds.class_to_idx.keys())
label_ids = list(range(len(class_names)))
cm = confusion_matrix(target_classes, predicted_classes, labels=label_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap=plt.cm.Blues)

fig = disp.ax_.get_figure()     # Adjust size
fig.set_figwidth(10)
fig.set_figheight(10)

plt.title("Confusion Matrix")   # plot
plt.savefig(f"{result_location}/Confusion_Matrix.png")
plt.close()
# plt.show()

# Precision, Recall, F1-score (macro-averaged)
precision = precision_score(target_classes, predicted_classes, average='macro')
recall = recall_score(target_classes, predicted_classes, average='macro')
f1 = f1_score(target_classes, predicted_classes, average='macro')
# print(f"Precision: {precision:.4f}")
# print(f"Recall: {recall:.4f}")
# print(f"F1-score: {f1:.4f}")

# mAP
# NOTE(review): this is computed from one-hot *hard* predictions, not from
# class scores/probabilities, so it is not a ranking-based average precision.
# Confirm that this is the intended metric.
binary_targets = np.eye(len(class_names))[target_classes]
binary_predictions = np.eye(len(class_names))[predicted_classes]
mAP = average_precision_score(binary_targets, binary_predictions, average='macro')
# print(f"mAP: {mAP:.4f}")

# Classification report. labels= is passed (as for the confusion matrix) so
# the report still works when some class appears in neither the targets nor
# the predictions; without it the number of target_names would not match.
report = classification_report(target_classes, predicted_classes, labels=label_ids, target_names=class_names)
# print("\nClassification Report:\n", report)

# Save to .csv
output_csv_path = f"{result_location}/indicator.csv"

# Only a newly created file needs the header row.
file_exists = os.path.exists(output_csv_path)

with open(output_csv_path, mode='a', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)

    if not file_exists:
        writer.writerow(["Metric", "Value"])

    # The metrics are written on every run, so each appended report is
    # preceded by the values it belongs to.
    writer.writerow(["Precision", precision])
    writer.writerow(["Recall", recall])
    writer.writerow(["F1-score", f1])
    writer.writerow(["mAP", mAP])

    # Classification report, one non-empty text line per CSV row.
    writer.writerow([])
    writer.writerow(["Classification Report"])
    for line in report.split("\n"):
        if line.strip():
            writer.writerow([line])


### Save Model


In [None]:
# Raw f-string: the Windows backslashes must not be parsed as escape sequences
# ("\d", "\M" and "\i" are invalid escapes and trigger warnings on current
# Python). The resulting path is unchanged.
model_save_path = rf'D:\deeplearning_class\dl_hw3\Model\image_med1/ViT_{train_folder_name}.pt'
# Create the target folder first so saving does not fail on a fresh machine.
os.makedirs(os.path.dirname(model_save_path), exist_ok=True)
# Saves the whole pickled module (not just a state_dict); loading it later
# requires the ViTForImageClassification class to be defined.
torch.save(model, model_save_path)

## Use your Exported Model

In [None]:
# Raw f-string so the Windows backslashes are taken literally (same path value
# as before, without invalid-escape warnings).
MODEL_PATH = rf'D:\deeplearning_class\dl_hw3\Model\image_med1/ViT_{train_folder_name}.pt'

# The file holds a fully pickled module, which needs weights_only=False on
# PyTorch versions whose torch.load defaults to weights_only=True.
# NOTE(security): unpickling executes arbitrary code; only load files that
# were produced by this notebook or another trusted source.
# map_location lets a model saved on GPU be loaded on a CPU-only machine.
model = torch.load(MODEL_PATH, map_location=device, weights_only=False)
model.eval()