In [None]:
!nvidia-smi

In [None]:
!pip install torch torchvision opencv-python pandas scikit-learn pillow matplotlib

In [None]:
!rm best_model_fine-tune.pth
!rm best_model_initial.pth
!rm card_grader_model.pth

In [None]:
# Import libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from torch.cuda.amp import autocast, GradScaler
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
from PIL import Image
import os
import cv2

ACTIVATE_WEIGHTS_TENSOR = 0
ACTIVATE_CROPPING = 0

# Set device (assumes GPU like H200 is available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Load dataset
df = pd.read_csv('../scrape/psa_sales_190786_20250222_143954.csv')  # Replace with your CSV path
image_dir = '../scrape/pictures'  # Replace with your image directory
df['filename'] = df['certNumber'].apply(lambda x: os.path.join(image_dir, f"cert_{x}.jpg"))

# print how many images are missing
print(f"Missing images: {len(df[df['filename'].apply(lambda x: not os.path.exists(x))])}")

# Remove non-existing images
df = df[df['filename'].apply(os.path.exists)]

# Split into training and validation sets
train_df = df.sample(frac=0.8, random_state=42)
val_df = df.drop(train_df.index)

# Encode labels
le = LabelEncoder()
le.fit(df['grade'])
train_df['label'] = le.transform(train_df['grade'])
val_df['label'] = le.transform(val_df['grade'])

# Check class distribution
print("Training set grade distribution:\n", train_df['grade'].value_counts())

# Compute class weights for imbalance
if ACTIVATE_WEIGHTS_TENSOR:
    classes = np.unique(train_df['grade'])
    class_weights = compute_class_weight('balanced', classes=classes, y=train_df['grade'])
    class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)

# Define cropping functions inspired by psa_pokemon_cards
def crop_card_for_light_image(image):
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    _, otsu_grad = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    contours, _ = cv2.findContours(otsu_grad, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return None
    height, width = image.shape[:2]
    image_area = height * width
    contours = sorted(contours, key=cv2.contourArea, reverse=True)
    for contour in contours:
        peri = cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, 0.001 * peri, True)
        x, y, w, h = cv2.boundingRect(approx)
        area = w * h
        if 0.48 * image_area <= area <= 0.6 * image_area:
            return image[y:y+h, x:x+w]
    return None

def crop_card_for_dark_image(image):
    gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    blur = cv2.GaussianBlur(gray, (3, 3), -10)
    adaptive_binary = cv2.adaptiveThreshold(blur, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 7, 3)
    edges = cv2.Canny(adaptive_binary, 100, 200)
    binarized_grad = 255 - edges
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (7, 7))
    open_binarized_grad = cv2.morphologyEx(binarized_grad, cv2.MORPH_OPEN, kernel)
    contours, _ = cv2.findContours(open_binarized_grad, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    if not contours:
        return None
    height, width = image.shape[:2]
    image_area = height * width
    contours = sorted(contours, key=cv2.contourArea, reverse=True)
    for contour in contours:
        area = cv2.contourArea(contour)
        if 0.48 * image_area <= area <= 0.7 * image_area:
            x, y, w, h = cv2.boundingRect(contour)
            return image[y:y+h, x:x+w]
    return None

def crop_card(image):
    cropped = crop_card_for_light_image(image)
    if cropped is not None:
        return cropped
    cropped = crop_card_for_dark_image(image)
    if cropped is not None:
        return cropped
    return image  # Fallback to original if cropping fails

# Define transforms
train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomAffine(degrees=5, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Custom dataset with cropping
class CardDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform
        if ACTIVATE_CROPPING:
            self.failed_crops = []

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_path = self.df.iloc[idx]['filename']
        label = self.df.iloc[idx]['label']
        image = Image.open(img_path).convert('RGB')
        if ACTIVATE_CROPPING:
            image_np = np.array(image)
            cropped_np = crop_card(image_np)
            if cropped_np is image_np:  # Cropping failed, log it
                self.failed_crops.append(img_path)
            image = Image.fromarray(cropped_np)
        if self.transform:
            image = self.transform(image)
        return image, label

# print some sample cropped images
def show_sample_crops(df, transform):
    sample_df = df.sample(5)
    fig, axes = plt.subplots(1, 5, figsize=(15, 3))
    for ax, (_, row) in zip(axes, sample_df.iterrows()):
        img_path = row['filename']
        image = Image.open(img_path).convert('RGB')
        image_np = np.array(image)
        cropped_np = crop_card(image_np)
        image = Image.fromarray(cropped_np)
        if transform:
            image = transform(image)
        ax.imshow(image.permute(1, 2, 0))
        ax.axis('off')
    plt.show()

if ACTIVATE_CROPPING:
    show_sample_crops(train_df, train_transform)
else:
    print("Skipping cropping visualization.")
    # instead show some sample images
    fig, axes = plt.subplots(1, 5, figsize=(15, 3))
    for ax, (_, row) in zip(axes, train_df.sample(5).iterrows()):
        img_path = row['filename']
        image = Image.open(img_path).convert('RGB')
        ax.imshow(image)
        ax.axis('off')
    plt.show()


# Create datasets and dataloaders
batch_size = 128  # Adjust based on GPU memory
train_dataset = CardDataset(train_df, transform=train_transform)
val_dataset = CardDataset(val_df, transform=val_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Log failed crops
if ACTIVATE_CROPPING:
    print(f"Training failed crops: {len(train_dataset.failed_crops)}")
    print(f"Validation failed crops: {len(val_dataset.failed_crops)}")

# Build ResNet50 model
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model.fc = nn.Sequential(
    nn.Linear(2048, 1024),
    nn.ReLU(),
    nn.Dropout(0.6),
    nn.Linear(1024, len(le.classes_))
)
model = model.to(device)

# Freeze all layers except fc initially
for name, param in model.named_parameters():
    if 'fc' not in name:
        param.requires_grad = False

# Loss and optimizer
if ACTIVATE_WEIGHTS_TENSOR:
    criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
else:
    criterion = nn.CrossEntropyLoss()
    
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.0001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=3, min_lr=1e-6)

# Validation function
def validate(model, val_loader, criterion):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    val_loss /= len(val_loader)
    val_acc = correct / total
    return val_loss, val_acc

# Training function with early stopping
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, phase='initial'):
    best_val_loss = float('inf')
    patience = 5
    patience_counter = 0
    scaler = GradScaler()
    history = {'train_loss': [], 'val_loss': [], 'val_accuracy': []}

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            train_loss += loss.item()
        train_loss /= len(train_loader)

        val_loss, val_acc = validate(model, val_loader, criterion)
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_accuracy'].append(val_acc)

        print(f'{phase} Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')
        scheduler.step(val_loss)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), f'best_model_{phase.lower()}.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print('Early stopping')
                break

    return history

# Initial training
history_initial = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, 50, 'Initial')

# Load best model and unfreeze layer4 and fc for fine-tuning
model.load_state_dict(torch.load('best_model_initial.pth'))
for name, param in model.named_parameters():
    if 'layer4' in name or 'fc' in name:
        param.requires_grad = True
    else:
        param.requires_grad = False
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.0001)

# Fine-tuning
history_fine = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, 50, 'Fine-tune')

# Load best fine-tuned model
model.load_state_dict(torch.load('best_model_fine-tune.pth'))

# Visualize training history
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history_initial['val_accuracy'] + history_fine['val_accuracy'], label='Val Accuracy')
plt.plot(history_initial['train_loss'] + history_fine['train_loss'], label='Train Loss')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Metric')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(history_initial['val_loss'] + history_fine['val_loss'], label='Val Loss')
plt.plot(history_initial['train_loss'] + history_fine['train_loss'], label='Train Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.tight_layout()
plt.show()

# Save final model
torch.save(model.state_dict(), 'card_grader_model.pth')

# Prediction function
def predict_grade(img_path, model, le, transform):
    model.eval()
    image = Image.open(img_path).convert('RGB')
    image_np = np.array(image)
    cropped_np = crop_card(image_np)
    image = Image.fromarray(cropped_np)
    image = transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = model(image)
        _, predicted = torch.max(outputs, 1)
    return le.inverse_transform([predicted.item()])[0]

# Test prediction
test_img_path = '../scrape/pictures/cert_01443963.jpg'  # Replace with your test image
predicted_grade = predict_grade(test_img_path, model, le, val_transform)
print(f"Predicted PSA grade: {predicted_grade}")

In [None]:
test_img_path = '../scrape/pictures/test.jpg'  # Replace with your test image
predicted_grade = predict_grade(test_img_path, model, le, val_transform)
print(f"Predicted PSA grade: {predicted_grade}; Expected 8")

In [None]:
test_img_path = '../scrape/pictures/cert_99449754.jpg'  # Replace with your test image
predicted_grade = predict_grade(test_img_path, model, le, val_transform)
print(f"Predicted PSA grade: {predicted_grade}; Expected 8")

In [None]:
test_img_path = '../scrape/pictures/cert_95743446.jpg'  # Replace with your test image
predicted_grade = predict_grade(test_img_path, model, le, val_transform)
print(f"Predicted PSA grade: {predicted_grade}; Expected 6")