### Mount Google Drive

In [1]:
# Mount Google Drive at /content/drive; every path used later in this notebook lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from getpass import getpass
import os

# 1. github token
token = getpass("GitHub Token: ")

# 2. Git information
!git config --global user.email "abnerl2021@gmail.com"
!git config --global user.name "yukunl20"

# 3. set up token for the environment
os.environ["GITHUB_TOKEN"] = token
os.environ["GITHUB_USER"] = "yukunl20"
os.environ["GITHUB_REPO"] = "wafer-failure-detection"

# 4. enter google drive path
%cd "/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection/"

# 5. clone the repo
!git clone https://{os.environ["GITHUB_USER"]}:{os.environ["GITHUB_TOKEN"]}@github.com/{os.environ["GITHUB_USER"]}/{os.environ["GITHUB_REPO"]}.git

# 6. enter git path
%cd "/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection/wafer-failure-detection"

!pwd
!ls

GitHub Token: ··········
/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection
fatal: destination path 'wafer-failure-detection' already exists and is not an empty directory.
/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection/wafer-failure-detection
/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection/wafer-failure-detection
 wafer_CNNBSL_best.pth	 'Wafer - Data Preprocessing.ipynb'
'Wafer - CNN BSL.ipynb'


In [3]:
# Go to Repo: make the cloned repository the working directory so the git commands that follow run inside it.
%cd "/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection/wafer-failure-detection"

/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection/wafer-failure-detection


In [4]:
# Show the working-tree state of the repository in the current directory (modified / unstaged files).
!git status

Refresh index:  66% (2/3)Refresh index: 100% (3/3)Refresh index: 100% (3/3), done.
On branch main
Your branch is up to date with 'origin/main'.

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
	[31mmodified:   Wafer - CNN BSL.ipynb[m

no changes added to commit (use "git add" and/or "git commit -a")


### Import Packages

In [5]:
# Point pip's cache to a folder on Drive (presumably so downloads are reused across Colab sessions)
%pip config set global.cache-dir "/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection/pip-cache"

# Install dependencies
# NOTE(review): versions are unpinned, and regex / unidecode / tokenizers / transformers
# are not imported by any cell visible in this notebook — confirm they are still needed.
%pip install -q numpy pandas scikit-learn regex unidecode tokenizers transformers tqdm

Writing to /root/.config/pip/pip.conf


In [6]:
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

### Import Data

In [7]:
# Load the wafer DataFrame from the pickle stored on Drive.
# NOTE(review): unpickling can execute arbitrary code — only load pickle files you produced yourself.
path = "/content/drive/MyDrive/Colab Notebooks/Project: Wafer Failure Detection/wafer.pkl"
df = pd.read_pickle(path)

In [8]:
# Preview the first 10 rows to check the columns that were loaded.
df.head(10)

Unnamed: 0,waferMap,dieSize,lotName,waferIndex,trainTestLabel,failureType,wafer dimension,waferMap_resize
0,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,1,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
1,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,2,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
2,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,3,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
3,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,4,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
4,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,5,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
5,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,6,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
6,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,7,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
7,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,8,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
8,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,9,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
9,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,10,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."


### Encode classes

In [9]:
from sklearn.preprocessing import LabelEncoder

# Turn the failure-type strings into integer class ids.
# `le` stays in scope, so `le.classes_` gives the id -> label mapping.
le = LabelEncoder()
failure_type_ids = le.fit_transform(df["failureType"])
df["failureType_encoded"] = failure_type_ids

In [10]:
# Confirm that the new `failureType_encoded` column was added.
df.head()

Unnamed: 0,waferMap,dieSize,lotName,waferIndex,trainTestLabel,failureType,wafer dimension,waferMap_resize,failureType_encoded
0,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,1,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",8
1,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,2,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",8
2,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,3,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",8
3,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,4,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",8
4,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,5,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",8


### Sampling a small data set for testing

In [None]:
# Draw a stratified subsample (class proportions follow df["failureType"]) for quick experiments;
# the other half returned by train_test_split is discarded.
small_df, _ = train_test_split(
    df,
    train_size=0.5,                 # keep 50% of the full dataset
    random_state=42,
    stratify=df["failureType"]
)

### Train Test Split
- training: 80%
- validation: 10%
- testing: 10%

In [11]:
# Build plain Python lists of resized wafer maps and their encoded labels.
# NOTE(review): X and y are reassigned from `df` again in the K-fold section further down.
X = list(df['waferMap_resize'])         # `df` = full dataset; presumably swap in `small_df` for a quick run
y = list(df['failureType_encoded'])

In [13]:
# Summarise the data in use: number of samples in X and the percentage share of each encoded class.
dataset_size = len(X)
class_share_pct = (
    df['failureType_encoded']
    .value_counts(normalize=True)
    .mul(100)
    .sort_index()
)

print("\n==== Small dataset ====\n")
print(f"Test dataset has a size of {dataset_size}")

print("\n==== Statistical distribution ===\n")
print(class_share_pct)



==== Small dataset ====

Test dataset has a size of 172950

==== Statistical distribution ===

failureType_encoded
0     2.482798
1     0.320902
2     3.000289
3     5.596993
4     2.077479
5     0.086152
6     0.500723
7     0.689795
8    85.244868
Name: proportion, dtype: float64


### Stratified K-fold cross-validation

In [15]:
# Quick look at the frame before building the K-fold splits.
df.head(3)

Unnamed: 0,waferMap,dieSize,lotName,waferIndex,trainTestLabel,failureType,wafer dimension,waferMap_resize,failureType_encoded
0,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,1,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",8
1,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,2,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",8
2,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",1683.0,lot1,3,[[Training]],none,"(45, 48)","[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...",8


In [25]:
from sklearn.model_selection import StratifiedKFold, train_test_split
import numpy as np

In [29]:
# Features / labels for the K-fold experiment.
# Use the resized maps rather than the raw `waferMap`: the raw maps are not square
# (the `wafer dimension` column shows e.g. (45, 48)), while the CNN defined in this
# notebook is hard-wired for 1 x 48 x 48 inputs, and the earlier Train/Test cell
# already reads `waferMap_resize`.
# NOTE(review): assumes every `waferMap_resize` entry is 48 x 48 — confirm against
# the preprocessing notebook.
X = df['waferMap_resize'].values
y = df['failureType_encoded'].values

# Hold out a stratified 15% test set; the remaining 85% is what the folds are built from.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    X,
    y,
    test_size=0.15,
    stratify=y,
    random_state=42,
)

# LabelEncoder ids run from 0 to K-1, so the number of classes is the largest id plus one.
num_classes = int(np.max(y) + 1)

In [30]:
# Sanity-check the hold-out split: partition sizes plus per-class sample counts.
trainval_class_counts = np.bincount(y_trainval, minlength=num_classes)
test_class_counts = np.bincount(y_test, minlength=num_classes)

print("Train+Val:", len(X_trainval), " Test:", len(X_test))
print("Train+Val counts:", trainval_class_counts)
print("Test counts:", test_class_counts)

Train+Val: 147007  Test: 25943
Train+Val counts: [  3650    472   4410   8228   3054    127    736   1014 125316]
Test counts: [  644    83   779  1452   539    22   130   179 22115]


In [42]:
# `torch` is imported here because, in top-to-bottom order, no earlier cell imports it;
# without this the cell raises NameError on "Restart & Run All".
import torch

# Use the GPU when the runtime has one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cpu


### Dataset

In [19]:
import torch  # needed by __getitem__; not imported by any earlier cell in notebook order
from torch.utils.data import Dataset


class WaferDataset(Dataset):
    """Map-style dataset pairing wafer maps with integer class labels.

    Args:
        X: indexable collection of 2-D wafer maps (anything `torch.tensor` accepts per item).
        y: indexable collection of integer labels, same length as `X`.

    Raises:
        ValueError: if `X` and `y` have different lengths.
    """

    def __init__(self, X, y):
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        """Return `(img, label)`: a float32 tensor with a leading channel axis and a long scalar tensor."""
        img = self.X[idx]
        label = self.y[idx]

        # convert to float32 tensor and add the channel dimension: (H, W) -> (1, H, W)
        img = torch.tensor(img, dtype=torch.float32).unsqueeze(0)
        label = torch.tensor(label, dtype=torch.long)

        return img, label


### Compute class weights for training fold (weighted loss)

In [31]:
def compute_class_weights(y_train_fold, num_classes, device):
    counts = np.bincount(y_train_fold, minlength=num_classes).astype(np.float32)
    counts = np.maximum(counts, 1.0)
    weights = counts.sum()/(num_classes * counts) # denominator multiply num_classes to normalize the data and avoid exploding
    return torch.tensor(weights, dtype=torch.float32, device=device)


### Training per epoch

In [32]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over `loader` and return the average training loss.

    Args:
        model: network to train; switched to train mode.
        loader: iterable yielding `(x, y)` batches.
        optimizer: optimizer stepping `model`'s parameters.
        criterion: loss called as `criterion(logits, y)`.
        device: device each batch is moved to.

    Returns:
        Batch losses weighted by batch size and averaged over the samples seen
        (assumes `criterion` returns a per-batch mean — TODO confirm for custom losses).

    Raises:
        ValueError: if the loader yields no samples.
    """
    model.train()
    # The original initialised `train_loss` but accumulated into `total_loss`,
    # which raised UnboundLocalError on the first batch.
    total_loss = 0.0
    samples_seen = 0

    for x, y in loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad(set_to_none=True)
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()

        n = x.size(0)
        total_loss += loss.item() * n
        samples_seen += n

    if samples_seen == 0:
        raise ValueError("train_one_epoch: loader yielded no samples")

    # Divide by the samples actually processed so the average stays correct
    # when the loader uses drop_last or a sampler.
    return total_loss / samples_seen

### Evaluation Function

In [38]:
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix
import torch

@torch.no_grad()
def evaluate(model, loader, device, num_classes=None):
    """Score `model` on every batch of `loader` without tracking gradients.

    Args:
        model: network to evaluate; switched to eval mode.
        loader: iterable yielding `(x, y)` batches.
        device: device the inputs are moved to before the forward pass.
        num_classes: optional. When given, the confusion matrix is built over
            labels `0..num_classes-1`, so its shape is the same for every fold
            even if a rare class never shows up. When None, the labels are
            inferred from the data as before.

    Returns:
        dict with "acc" (accuracy), "macro_f1" (macro-averaged F1) and "cm" (confusion matrix).
    """
    model.eval()
    true_batches, pred_batches = [], []

    for x, y in loader:
        logits = model(x.to(device))
        pred_batches.append(logits.argmax(dim=1).cpu().numpy())
        # .cpu() is a no-op for CPU tensors and keeps this working if labels arrive on the GPU.
        true_batches.append(y.cpu().numpy())

    y_true = np.concatenate(true_batches)
    y_pred = np.concatenate(pred_batches)

    cm_labels = None if num_classes is None else np.arange(num_classes)

    return {
        "acc": accuracy_score(y_true, y_pred),
        "macro_f1": f1_score(y_true, y_pred, average="macro"),
        "cm": confusion_matrix(y_true, y_pred, labels=cm_labels)
    }

### CNN architecture

In [37]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class waferCNN(nn.Module):
    """Baseline CNN: three conv stages followed by a two-layer classifier.

    Built for single-channel 48 x 48 inputs; the spatial size goes
    48 -> 24 -> 12 -> 6 while the channels go 1 -> 32 -> 64 -> 128.

    Args:
        num_classes: number of output logits.
    """

    @staticmethod
    def _stage(in_ch, out_ch):
        """One stage: 3x3 conv (padding 1) -> BatchNorm -> ReLU -> 2x2 max-pool (halves H and W)."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
        )

    def __init__(self, num_classes: int = 9):
        super().__init__()

        # Feature extractor (attribute names are kept because they form the state_dict keys)
        self.block1 = self._stage(1, 32)      # 1 x 48 x 48  -> 32 x 24 x 24
        self.block2 = self._stage(32, 64)     # 32 x 24 x 24 -> 64 x 12 x 12
        self.block3 = self._stage(64, 128)    # 64 x 12 x 12 -> 128 x 6 x 6

        # Classifier head on the flattened 128 * 6 * 6 = 4608 features
        self.fc1 = nn.Linear(128 * 6 * 6, 256)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a `[B, 1, 48, 48]` batch to `[B, num_classes]` logits."""
        for stage in (self.block1, self.block2, self.block3):
            x = stage(x)

        features = torch.flatten(x, start_dim=1)            # [B, 4608]
        hidden = self.dropout(self.relu(self.fc1(features)))
        return self.fc2(hidden)                             # [B, num_classes]


In [39]:
import torch
import torch.nn as nn

class WaferCNN(nn.Module):
    """Configurable CNN for single-channel square wafer maps.

    Each block is Conv2d -> (optional BatchNorm2d) -> ReLU -> MaxPool2d(2), so
    every block halves the spatial size. A two-layer classifier follows.

    Args:
        num_classes: number of output logits.
        input_size: height/width of the square input; must be divisible by
            `2 ** len(channel_mults)`.
        base_channels: channel count that `channel_mults` scales.
        channel_mults: one multiplier per conv block; block `i` outputs
            `base_channels * channel_mults[i]` channels.
        fc_dim: width of the hidden fully connected layer.
        dropout: dropout probability before the output layer.
        kernel_size: conv kernel size; padding is `kernel_size // 2`.
        use_batchnorm: insert BatchNorm2d after each convolution. This flag was
            previously accepted but ignored (BatchNorm was always added); the
            default of True keeps the earlier architecture and state_dict keys.

    Raises:
        AssertionError: if `input_size` is not divisible by `2 ** len(channel_mults)`.
    """

    def __init__(
        self,
        num_classes: int = 9,
        input_size: int = 48,
        base_channels: int = 32,       # 16/32/64
        channel_mults=(1, 2, 4),       # multiplies base_channels per block
        fc_dim: int = 256,             # 128/256/512
        dropout: float = 0.5,          # 0.0~0.6
        kernel_size: int = 3,
        use_batchnorm: bool = True,
    ):
        super().__init__()

        num_blocks = len(channel_mults)
        assert input_size % (2 ** num_blocks) == 0, (
            "input_size must be divisible by 2^(num_blocks) because of MaxPool2d(2). "
            f"Got input_size={input_size}, num_blocks={num_blocks}"
        )

        padding = kernel_size // 2

        def conv_block(in_ch, out_ch):
            layers = [
                nn.Conv2d(
                    in_channels=in_ch,
                    out_channels=out_ch,
                    kernel_size=kernel_size,
                    padding=padding,
                ),
            ]
            if use_batchnorm:
                layers.append(nn.BatchNorm2d(out_ch))
            layers.append(nn.ReLU(inplace=True))
            layers.append(nn.MaxPool2d(kernel_size=2))
            return nn.Sequential(*layers)

        # Build blocks dynamically; in_ch tracks the channel count entering the next block
        blocks = []
        in_ch = 1
        for m in channel_mults:
            out_ch = base_channels * m
            blocks.append(conv_block(in_ch, out_ch))
            in_ch = out_ch
        self.blocks = nn.Sequential(*blocks)

        # Each pool halves the spatial size, so after N blocks it is input_size / 2^N
        final_spatial = input_size // (2 ** num_blocks)
        flattened_dim = in_ch * final_spatial * final_spatial

        self.classifier = nn.Sequential(
            nn.Linear(flattened_dim, fc_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(p=dropout),
            nn.Linear(fc_dim, num_classes),
        )

    def forward(self, x):
        """Map a `[B, 1, input_size, input_size]` batch to `[B, num_classes]` logits."""
        x = self.blocks(x)
        x = torch.flatten(x, start_dim=1)
        logits = self.classifier(x)
        return logits


### K-fold training and validation

Using device: cpu


In [36]:
# Stratified K-fold splitter: k folds, shuffled with a fixed seed so the folds are reproducible.
k = 5
skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)

In [None]:
import copy
from torch.utils.data import DataLoader

# ---- Training hyper-parameters ----
num_epochs = 50
batch_size = 32
lr = 1e-3

# ---- Early stopping: stop a fold after `patience` epochs without a macro-F1 gain above `min_delta` ----
patience = 7
min_delta = 1e-4

# Per-fold results: best validation macro-F1 and the epoch it was reached
fold_best_macro_f1 = []
fold_best_epochs = []

for fold, (train_idx, val_idx) in enumerate(skf.split(X_trainval, y_trainval), start=1):
    print(f"\n========== Fold {fold}/{k} ==========")

    # split current fold
    X_tr, y_tr = X_trainval[train_idx], y_trainval[train_idx]
    X_va, y_va = X_trainval[val_idx], y_trainval[val_idx]

    # datasets and loaders
    train_ds = WaferDataset(X_tr, y_tr)
    val_ds = WaferDataset(X_va, y_va)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(val_ds, batch_size=batch_size * 2, shuffle=False)

    # fresh model for each fold; sized from the data instead of a hard-coded 9
    model = waferCNN(num_classes=num_classes).to(device)

    # loss with class weights computed from this fold's training labels only
    class_weights = compute_class_weights(y_tr, num_classes, device)
    criterion = torch.nn.CrossEntropyLoss(weight=class_weights)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Early stopping state, reset for every fold.
    # (Previously `beast_epoch` / `epochs_since_improve` were initialised here while the
    # loop used `best_epoch` / `epoch_since_improve`, so the state was never really reset.)
    best_f1 = -1.0
    best_epoch = -1
    best_cm = None
    best_state = None
    epochs_since_improve = 0

    for epoch in range(1, num_epochs + 1):
        tr_loss = train_one_epoch(model, train_loader, optimizer, criterion, device)
        metrics = evaluate(model, val_loader, device)

        cur_f1 = metrics['macro_f1']
        cur_acc = metrics['acc']

        print(
            f"Epoch {epoch:02d} | train_loss={tr_loss:.4f} | "
            f"val_acc={cur_acc:.3f} | val_macro_f1={cur_f1:.3f}"
        )

        if cur_f1 > best_f1 + min_delta:
            best_f1 = cur_f1
            best_epoch = epoch
            best_cm = metrics["cm"]

            # store best weights (deep copy: state_dict() returns references to live tensors)
            best_state = copy.deepcopy(model.state_dict())

            epochs_since_improve = 0
        else:
            epochs_since_improve += 1

        # early stopping
        if epochs_since_improve >= patience:
            print(
                f"Early stopping triggered at epoch {epoch:02d}. "
                f"Best macro-F1 was {best_f1:.3f} at epoch {best_epoch:02d}."
            )
            break

    # restore best model weights for this fold
    if best_state is not None:
        model.load_state_dict(best_state)

    fold_best_macro_f1.append(best_f1)
    fold_best_epochs.append(best_epoch)

    print(f"Fold {fold} BEST | epoch={best_epoch:02d} | macro_f1={best_f1:.3f}")