In [None]:
# wafer_experiment_pkl.py

import os
import numpy as np
import pandas as pd
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, classification_report
from collections import Counter
from PIL import Image
import pickle
import tqdm

# -------------------
# Config
# -------------------
class Cfg:
    """Central place for every tunable setting used by the notebook."""

    # --- reproducibility / hardware ---
    seed = 42
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # --- input pipeline ---
    data_path = "data/wafer_maps.pkl"
    img_size = 128
    batch_size = 96
    num_workers = 0

    # --- optimisation ---
    lr = 1e-3
    epochs = 50

    # --- loss ---
    use_focal = True
    focal_gamma = 1.0
    use_class_weights = True

    # --- artefacts (checkpoints, exported model) ---
    out_dir = "outputs/pkl_exp1"


# Make sure the artefact directory exists before anything tries to write into it.
os.makedirs(Cfg.out_dir, exist_ok=True)

# Seed every RNG the notebook relies on (Python, NumPy, PyTorch) with the same value.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(Cfg.seed)
if Cfg.device == "cuda":
    torch.cuda.manual_seed_all(Cfg.seed)


In [2]:
import torch

# Report the PyTorch build and the CUDA devices visible to it.
cuda_available = torch.cuda.is_available()
print(torch.__version__)
print(torch.version.cuda)
print(cuda_available)
print(torch.cuda.device_count())
if cuda_available:
    print(torch.cuda.current_device())
else:
    print("No CUDA device")

2.8.0+cu126
12.6
True
1
0


### Load dataset

In [4]:
# Load the raw wafer-map table from the pickle file configured in Cfg.data_path.
# NOTE(review): unpickling can execute arbitrary code — only load files from a trusted source.
df = pd.read_pickle(Cfg.data_path)

In [5]:
df.tail()

Unnamed: 0,waferMap,dieSize,lotName,waferIndex,trianTestLabel,failureType
811452,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 1, 1,...",600.0,lot47542,23.0,[[Test]],[[Edge-Ring]]
811453,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 1, 1,...",600.0,lot47542,24.0,[[Test]],[[Edge-Loc]]
811454,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 1, 1,...",600.0,lot47542,25.0,[[Test]],[[Edge-Ring]]
811455,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1,...",600.0,lot47543,1.0,[],[]
811456,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 1, 1,...",600.0,lot47543,2.0,[],[]


In [6]:
# The raw data spells the split column 'trianTestLabel'. Expose it under the
# correct name (unless that column already exists) and drop the misspelled one.
if 'trianTestLabel' in df:
    if 'trainTestLabel' not in df:
        df['trainTestLabel'] = df['trianTestLabel']
    df.drop('trianTestLabel', axis=1, inplace=True)

Flatten the `trainTestLabel` and `failureType` values into plain strings instead of single-element nested arrays.

In [10]:
def flatten_label(x):
    """Unwrap a label stored as a (possibly nested) single-element list/array.

    Behaviour by input shape:
      * not a list / ndarray      -> returned unchanged
      * empty container           -> ""
      * more than one element     -> returned unchanged
      * one non-container element -> that element
      * one inner container       -> its only element, "" if it is empty,
                                     or the inner container itself if it holds
                                     several items
    """
    container_types = (list, np.ndarray)
    if not isinstance(x, container_types):
        return x

    outer_len = len(x)
    if outer_len == 0:
        return ""
    if outer_len > 1:
        return x

    only = x[0]
    if not isinstance(only, container_types):
        return only

    inner_len = len(only)
    if inner_len == 1:
        return only[0]
    # Empty inner container -> no label; several items -> ambiguous, hand back as is.
    return "" if inner_len == 0 else only

In [None]:
# Flatten the nested label columns so they hold plain values,
# e.g. [[Training]] -> "Training" (empty arrays become "").
df["trainTestLabel"] = df["trainTestLabel"].apply(flatten_label)
df["failureType"] = df["failureType"].apply(flatten_label)

# Select the rows for each split.
# NOTE: the roles are swapped on purpose — rows tagged "Test" become the
# training/validation pool and rows tagged "Training" become the held-out test
# set (author's rationale: the "Test" partition has more samples).
train_df = df[df["trainTestLabel"] == "Test"].reset_index(drop=True)
test_df = df[df["trainTestLabel"] == "Training"].reset_index(drop=True)

Assigning class labels to target variable

In [None]:
# Class names in sorted order; a class's position in this list is its integer id.
classes = sorted(train_df["failureType"].unique())
class_to_idx = dict(zip(classes, range(len(classes))))

# Attach the integer id to both splits, using the mapping derived from train_df.
for _split_df in (train_df, test_df):
    _split_df["label_idx"] = _split_df["failureType"].map(class_to_idx)

In [14]:
class_to_idx

{np.str_('Center'): 0,
 np.str_('Donut'): 1,
 np.str_('Edge-Loc'): 2,
 np.str_('Edge-Ring'): 3,
 np.str_('Loc'): 4,
 np.str_('Near-full'): 5,
 np.str_('Random'): 6,
 np.str_('Scratch'): 7,
 np.str_('none'): 8}

In [12]:
test_df

Unnamed: 0,waferMap,dieSize,lotName,waferIndex,failureType,trainTestLabel,label_idx
0,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,1.0,none,Training,8
1,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,2.0,none,Training,8
2,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,3.0,none,Training,8
3,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,4.0,none,Training,8
4,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,5.0,none,Training,8
...,...,...,...,...,...,...,...
54350,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1534.0,lot46729,21.0,none,Training,8
54351,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1534.0,lot46729,22.0,none,Training,8
54352,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1534.0,lot46729,23.0,none,Training,8
54353,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1534.0,lot46729,24.0,none,Training,8


### Saving partially processed data

In [10]:
# Persist both processed splits so later cells can start from these files.
for _split_frame, _split_path in (
    (train_df, "data/wafer_train.pkl"),
    (test_df, "data/wafer_test.pkl"),
):
    _split_frame.to_pickle(_split_path)

In [18]:
# Draw three rows from the held-out split for a quick visual sanity check.
# Uses the notebook-wide seed from Cfg instead of a separate hard-coded value,
# so all randomness is controlled from one place.
df_samples = test_df.sample(n=3, random_state=Cfg.seed)

In [20]:
df_samples

Unnamed: 0,waferMap,dieSize,lotName,waferIndex,failureType,trainTestLabel,label_idx
3953,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 1, 1, 1,...",533.0,lot181,8.0,none,Training,8
46913,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 1, 1,...",501.0,lot45075,24.0,none,Training,8
17707,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1998.0,lot15573,4.0,none,Training,8


In [21]:
df_samples.index

Index([3953, 46913, 17707], dtype='int64')

In [24]:
# Walk over the sampled rows and print each row's DataFrame index.
# TODO: the PNG export (filename: waferIndex_defectCategory.png) is not implemented
# yet — `wafer_map` and `defect` are collected for it, but nothing is written to disk.
# The index gets a real name instead of `_`, which IPython reserves for the last output.
for wafer_index, row in df_samples.iterrows():
    wafer_map = row["waferMap"]
    defect = row["failureType"]
    print(wafer_index)

3953
46913
17707


### Train val test split

In [2]:
# Reload the processed splits saved earlier and rebuild the sorted class list.
train_df, test_df = (
    pd.read_pickle(_split_path)
    for _split_path in ("data/wafer_train.pkl", "data/wafer_test.pkl")
)
classes = sorted(train_df["failureType"].unique())

In [3]:
# 80/20 train/validation split of the training pool, stratified on the class id.
_pool_labels = train_df["label_idx"].values
train_imgs, val_imgs, train_labels, val_labels = train_test_split(
    train_df["waferMap"].values,
    _pool_labels,
    test_size=0.2,
    stratify=_pool_labels,
    random_state=Cfg.seed,
)

# The held-out split is used as is.
test_imgs, test_labels = test_df["waferMap"].values, test_df["label_idx"].values



### Dataset transformations

In [None]:
class WaferMapDataset(Dataset):
    """Pairs wafer maps with their labels and serves each map as a PIL image.

    Args:
        wafer_maps: indexable collection of array-like wafer maps.
        labels: indexable collection of labels, aligned with ``wafer_maps``.
        transform: optional callable applied to the PIL image before returning it.
    """

    def __init__(self, wafer_maps, labels, transform=None):
        self.wafer_maps = wafer_maps
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.wafer_maps)

    def __getitem__(self, idx):
        pixels = np.array(self.wafer_maps[idx])

        # A 2-D map is replicated into three identical channels.
        if pixels.ndim == 2:
            pixels = np.stack([pixels, pixels, pixels], axis=-1)

        # Per-image min-max scaling to 0..255; a constant map becomes all zeros.
        hi = pixels.max()
        lo = pixels.min()
        if hi > lo:
            scaled = ((pixels - lo) / (hi - lo) * 255).astype(np.uint8)
        else:
            scaled = np.zeros_like(pixels, dtype=np.uint8)

        picture = Image.fromarray(scaled)
        if self.transform:
            picture = self.transform(picture)

        return picture, self.labels[idx]

In [None]:
# Values shared by both pipelines: target resolution and the ImageNet channel
# statistics that match the ImageNet-pretrained backbone.
_target_size = (Cfg.img_size, Cfg.img_size)
_imagenet_mean = [0.485, 0.456, 0.406]
_imagenet_std = [0.229, 0.224, 0.225]

# Training pipeline: resize, light augmentation, tensor conversion, normalisation.
train_transforms = transforms.Compose([
    transforms.Resize(_target_size),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.1, contrast=0.05),
    transforms.ToTensor(),
    transforms.Normalize(_imagenet_mean, _imagenet_std),
])

# Validation / test pipeline: the same steps without augmentation.
val_test_transforms = transforms.Compose([
    transforms.Resize(_target_size),
    transforms.ToTensor(),
    transforms.Normalize(_imagenet_mean, _imagenet_std),
])

In [6]:
# -------------------
# Create DataLoaders
# -------------------
# One dataset per split; only the training split gets augmentation.
train_ds = WaferMapDataset(train_imgs, train_labels, transform=train_transforms)
val_ds = WaferMapDataset(val_imgs, val_labels, transform=val_test_transforms)
test_ds = WaferMapDataset(test_imgs, test_labels, transform=val_test_transforms)

# Loader settings common to all splits; only the training loader shuffles.
_loader_kwargs = dict(
    batch_size=Cfg.batch_size,
    num_workers=Cfg.num_workers,
    pin_memory=True,
)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_kwargs)

### Model building

In [None]:
def build_model(num_classes):
    """Build an ImageNet-pretrained EfficientNet-B0 with a new classification head.

    Args:
        num_classes: number of output logits of the replaced final linear layer.

    Returns:
        The model, with ALL of its parameters on ``Cfg.device``.
    """
    model = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
    in_features = model.classifier[1].in_features
    model.classifier[1] = nn.Linear(in_features, num_classes)
    # Move to the target device only AFTER swapping the head. Moving first would
    # leave the freshly created nn.Linear on the CPU and return a mixed-device model.
    return model.to(Cfg.device)

model = build_model(len(classes))

class FocalLoss(nn.Module):
    """Multi-class focal loss computed from raw logits.

    Per sample: ``alpha[target] * (1 - p_t) ** gamma * CE``, where ``CE`` is the
    unweighted cross-entropy and ``p_t = exp(-CE)`` is the predicted probability
    of the true class.
    """

    def __init__(self, alpha=None, gamma=2.0, reduction='mean', ignore_index=-100):
        """
        alpha: Tensor (or sequence) of shape (num_classes,) giving weight per class. If None, no weighting.
        gamma: focusing parameter >=0, default 2.0.
        reduction: 'mean', 'sum', or 'none' (any other value behaves like 'none').
        ignore_index: class index to ignore in loss.
        """
        super().__init__()
        if alpha is not None and not torch.is_tensor(alpha):
            alpha = torch.tensor(alpha, dtype=torch.float32)
        # Kept on whatever device it was given; forward() aligns it with the loss
        # tensor, so the module works on CPU even when CUDA is present.
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction
        self.ignore_index = ignore_index
        self.ce_loss = nn.CrossEntropyLoss(weight=None, reduction='none', ignore_index=ignore_index)

    def forward(self, inputs, targets):
        """
        inputs: (batch_size, num_classes) raw logits (no softmax applied)
        targets: (batch_size,) ground truth class indices (long)

        Returns a scalar for 'mean'/'sum', otherwise the per-sample loss vector.
        Samples whose target equals ``ignore_index`` contribute a loss of 0; with
        'mean' they still count in the denominator.
        """
        # Per-sample cross entropy; ignored targets yield 0 here.
        ce_loss = self.ce_loss(inputs, targets)  # shape: (batch_size,)

        # pt = exp(-CE) is the probability assigned to the true class.
        pt = torch.exp(-ce_loss)

        # Down-weight well-classified samples.
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss

        # Apply alpha class weights if provided.
        if self.alpha is not None:
            alpha = self.alpha.to(device=focal_loss.device, dtype=focal_loss.dtype)
            # ignore_index (e.g. -100) is not a valid gather index. Point ignored
            # entries at class 0; their loss is already 0, so the weight is irrelevant.
            safe_targets = targets.masked_fill(targets == self.ignore_index, 0)
            focal_loss = alpha.gather(0, safe_targets) * focal_loss

        # Reduction
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

# Optional per-class weights: inverse class frequency on the training split,
# rescaled so the weights sum to the number of classes.
class_weights = None
if Cfg.use_class_weights:
    label_counts = Counter(train_labels)
    weights = np.array([1.0 / max(1, label_counts[i]) for i in range(len(classes))], dtype=np.float32)
    weights = weights / weights.sum() * len(classes)
    class_weights = torch.tensor(weights, dtype=torch.float32, device=Cfg.device)

# FocalLoss takes its per-class weights through `alpha`; it has no `weight`
# keyword (that one belongs to nn.CrossEntropyLoss), so passing `weight=` raised TypeError.
if Cfg.use_focal:
    criterion = FocalLoss(alpha=class_weights, gamma=Cfg.focal_gamma)
else:
    criterion = nn.CrossEntropyLoss(weight=class_weights)

optimizer = torch.optim.AdamW(model.parameters(), lr=Cfg.lr)
# Cosine decay over Cfg.epochs scheduler steps (the schedule expects one step per epoch).
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=Cfg.epochs)

### Sample training experiment

In [1]:
import torch

print(torch.__version__)

# Sanity check: Tensor.to() must accept the `non_blocking` keyword on this build.
# Requires a CUDA device.
x = torch.randn(2, 2)
y = x.to(device="cuda", non_blocking=True)
print(y.device)

2.8.0+cu126
cuda:0


In [1]:
import torch
from torchvision import models, transforms
from PIL import Image
import numpy as np

# Stand-alone smoke test: push one random image through a throw-away
# EfficientNet-B0 and confirm input and output live on the expected device.
# The network is bound to `smoke_model`, NOT `model`, so running this cell can
# never replace the real classifier that the optimizer and training loop use.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

smoke_model = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.IMAGENET1K_V1)
smoke_model.classifier[1] = torch.nn.Linear(smoke_model.classifier[1].in_features, 2)
# Move once, after the head swap, so the new layer lands on the device as well.
smoke_model.to(device)
# Inference mode: a single-image batch should not update BatchNorm statistics.
smoke_model.eval()

dummy_img = np.random.randint(0, 255, (64,64))
dummy_img = Image.fromarray(dummy_img.astype(np.uint8)).convert("RGB")
transform = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
    transforms.Normalize([0.485]*3, [0.229]*3)
])

img_tensor = transform(dummy_img).unsqueeze(0).to(device)
print(f"Input tensor device: {img_tensor.device}")

# No gradients are needed for a device check.
with torch.no_grad():
    output = smoke_model(img_tensor)
print(f"Output device: {output.device}")


Using device: cuda
Input tensor device: cuda:0
Output device: cuda:0


### Training model

In [None]:
# ---------------------------------------------------------------------------
# Training with validation, resumable checkpoints and early stopping on
# validation macro-F1.
# ---------------------------------------------------------------------------
early_stop_patience = 15   # epochs without a macro-F1 improvement before stopping

# Defaults for a fresh run; overwritten below when a checkpoint is found.
best_val_acc = 0.0
best_val_f1 = 0.0
early_stop_counter = 0
start_epoch = 0

checkpoint_path = os.path.join(Cfg.out_dir, "checkpoint_last.pt")
best_model_path = os.path.join(Cfg.out_dir, "best_model.pt")

if os.path.exists(checkpoint_path):
    checkpoint = torch.load(checkpoint_path, map_location=Cfg.device if Cfg.device != "auto" else ("cuda" if torch.cuda.is_available() else "cpu"))
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
    best_val_acc = checkpoint.get("best_val_acc", 0.0)
    best_val_f1 = checkpoint.get("best_val_f1", 0.0)
    # Older checkpoints do not carry the counter; fall back to 0 for them.
    early_stop_counter = checkpoint.get("early_stop_counter", 0)
    start_epoch = checkpoint["epoch"] + 1
    print(f"Resuming training from epoch {start_epoch}")

model.to(Cfg.device)

for epoch in range(start_epoch, Cfg.epochs):

    # ---- train -------------------------------------------------------------
    model.train()
    train_losses, y_true_train, y_pred_train = [], [], []

    print(f"Device for model parameters: {next(model.parameters()).device}")

    # Wrap the loader in a fresh progress bar under a separate name. Rebinding
    # `train_loader` itself would stack one more tqdm wrapper on it every epoch.
    progress = tqdm.tqdm(train_loader, desc=f"Epoch {epoch+1}/{Cfg.epochs}", ncols=80)
    for images, labels in progress:

        images, labels = images.float().to(Cfg.device), labels.to(Cfg.device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        train_losses.append(loss.item())
        preds = torch.argmax(outputs, dim=1)
        y_true_train.extend(labels.cpu().numpy())
        y_pred_train.extend(preds.cpu().numpy())

    train_acc = accuracy_score(y_true_train, y_pred_train)
    train_loss = np.mean(train_losses)

    # ---- validate ----------------------------------------------------------
    model.eval()
    val_losses, y_true_val, y_pred_val = [], [], []

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.float().to(Cfg.device), labels.to(Cfg.device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_losses.append(loss.item())
            preds = torch.argmax(outputs, dim=1)
            y_true_val.extend(labels.cpu().numpy())
            y_pred_val.extend(preds.cpu().numpy())

    val_acc = accuracy_score(y_true_val, y_pred_val)
    val_f1 = f1_score(y_true_val, y_pred_val, average="macro")
    val_loss = np.mean(val_losses)

    # Exactly ONE scheduler step per epoch. The cosine schedule is configured
    # with T_max=Cfg.epochs, so stepping twice per epoch ran it at double speed.
    scheduler.step()

    print(
        f"Epoch [{epoch+1}/{Cfg.epochs}] Train Loss: {train_loss:.4f} Train Acc: {train_acc:.4f} Val Loss: {val_loss:.4f} Val Acc: {val_acc:.4f}"
    )

    # ---- bookkeeping -------------------------------------------------------
    # Update the best metrics BEFORE writing the checkpoint so the saved values
    # include this epoch. Cast to plain floats: the metrics may be NumPy scalars
    # (depends on the scikit-learn version), which a weights-only torch.load may reject.
    best_val_acc = max(best_val_acc, float(val_acc))

    if val_f1 > best_val_f1:
        best_val_f1 = float(val_f1)
        early_stop_counter = 0
        torch.save(model.state_dict(), best_model_path)
    else:
        early_stop_counter += 1

    # Resumable snapshot of the full training state after this epoch.
    torch.save({
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict(),
        "best_val_acc": best_val_acc,
        "best_val_f1": best_val_f1,
        "early_stop_counter": early_stop_counter,
    }, checkpoint_path)

    # Early stopping: give up after `early_stop_patience` epochs without improvement.
    if early_stop_counter >= early_stop_patience:
        print("Early stopping triggered.")
        break

Device for model parameters: cuda:0


Epoch 1/50:   0%|                                       | 0/742 [00:00<?, ?it/s]

Epoch 1/50:  12%|███▌                          | 88/742 [00:42<05:13,  2.09it/s]


KeyboardInterrupt: 

### Model evaluation

In [None]:
# Evaluate the best checkpoint (highest validation macro-F1) on the held-out split.
# map_location lets weights saved on a GPU load on whatever device this session uses.
best_state = torch.load(os.path.join(Cfg.out_dir, "best_model.pt"), map_location=Cfg.device)
model.load_state_dict(best_state)
model.eval()
y_true_test, y_pred_test = [], []

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.float().to(Cfg.device), labels.to(Cfg.device)
        outputs = model(images)
        preds = torch.argmax(outputs, dim=1)
        y_true_test.extend(labels.cpu().numpy())
        y_pred_test.extend(preds.cpu().numpy())

test_acc = accuracy_score(y_true_test, y_pred_test)
test_f1 = f1_score(y_true_test, y_pred_test, average="macro")
print(f"Test Accuracy: {test_acc:.4f} | Test Macro F1: {test_f1:.4f}")
# Report per-class metrics under the class names. `labels` pins the full set of
# class ids so the rows stay aligned with `classes` even if one class is absent.
print(classification_report(
    y_true_test,
    y_pred_test,
    labels=list(range(len(classes))),
    target_names=[str(c) for c in classes],
    zero_division=0,
))


### ONNX export

In [None]:
# Export the trained classifier to ONNX.
# Trace in inference mode so Dropout is disabled and BatchNorm uses its running
# statistics, regardless of which cell ran last.
model.eval()
dummy_input = torch.randn(1, 3, Cfg.img_size, Cfg.img_size).to(Cfg.device)
torch.onnx.export(
    model,
    dummy_input,
    os.path.join(Cfg.out_dir, "wafer.onnx"),
    opset_version=12,
    # Stable tensor names plus a dynamic batch axis: without these the exported
    # graph is fixed to the dummy input's batch size of 1.
    input_names=["input"],
    output_names=["logits"],
    dynamic_axes={"input": {0: "batch"}, "logits": {0: "batch"}},
)
print("Exported ONNX model.")