<a href="https://colab.research.google.com/github/s0ku00/DTS/blob/main/zipper_anomaly_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Industrial Anomaly Detection using Convolutional Autoencoder
**Dataset:** MVTec AD — Zipper Category  
**Task:** Detect manufacturing defects by learning to reconstruct normal images.  
Anomalies are flagged when reconstruction error exceeds a threshold learned from normal training data.

---

## 1. Imports & Config

In [10]:
# Mount Google Drive; the config cell's DRIVE_DATA_PATH points inside this mount.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [11]:
import os
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, Model
from sklearn.metrics import roc_auc_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import RocCurveDisplay

# ── Config ────────────────────────────────────────────────────────────────────
DRIVE_DATA_PATH = '/content/drive/MyDrive/'   # dataset root (<CATEGORY>/train, <CATEGORY>/test); figures and the model are saved here too
CATEGORY        = 'zipper'
IMG_SIZE        = (128, 128)
BATCH_SIZE      = 16
LATENT_DIM      = 512    # NOTE(review): not referenced by any cell in this notebook (the bottleneck is convolutional) — confirm before removing
EPOCHS          = 100
LEARNING_RATE   = 1e-4
SEED            = 42

# Seed Python's `random`, NumPy and TensorFlow in one call so that file shuffling,
# weight initialisation and therefore the train/val split and metrics are
# repeatable after Restart → Run All.
tf.keras.utils.set_random_seed(SEED)

print(f"TensorFlow version: {tf.__version__}")

TensorFlow version: 2.19.0


## 2. Data Loading

In [12]:
def load_mvtec(data_dir, category, img_size=IMG_SIZE, batch_size=BATCH_SIZE):
    """Build the training and test pipelines for one MVTec AD category.

    Expects the layout ``<data_dir>/<category>/train`` and
    ``<data_dir>/<category>/test``, where ``test`` contains one sub-directory
    per class.

    Args:
        data_dir: Root directory that contains the category folder.
        category: Category folder name (e.g. ``'zipper'``).
        img_size: ``(height, width)`` every image is resized to.
        batch_size: Number of images per batch.

    Returns:
        A ``(train_ds, test_ds, class_names)`` tuple:
          * ``train_ds`` yields ``(image, image)`` pairs scaled to [0, 1]
            (input and reconstruction target are the same tensor), shuffled.
          * ``test_ds`` yields ``(image, one_hot_label)`` pairs scaled to
            [0, 1], in directory order (not shuffled).
          * ``class_names`` is the list of test sub-directory names, in the
            order used for the one-hot labels.
    """
    train_dir = os.path.join(data_dir, category, 'train')
    test_dir  = os.path.join(data_dir, category, 'test')
    normalise = tf.keras.layers.Rescaling(1.0 / 255)

    def _to_autoencoder_pair(images):
        # Rescale once and reuse the result as both input and target.
        scaled = normalise(images)
        return scaled, scaled

    raw_train = tf.keras.utils.image_dataset_from_directory(
        train_dir, labels=None, image_size=img_size,
        batch_size=batch_size, shuffle=True
    )

    raw_test = tf.keras.utils.image_dataset_from_directory(
        test_dir, labels='inferred', label_mode='categorical',
        image_size=img_size, batch_size=batch_size, shuffle=False
    )
    # Read class_names from the dataset we already built: the attribute is lost
    # after .map(), and re-indexing the directory a second time just to get it
    # doubles the (slow, on Drive) file listing.
    class_names = raw_test.class_names

    train_ds = (
        raw_train
        .map(_to_autoencoder_pair, num_parallel_calls=tf.data.AUTOTUNE)
        .prefetch(tf.data.AUTOTUNE)
    )

    test_ds = (
        raw_test
        .map(lambda x, y: (normalise(x), y), num_parallel_calls=tf.data.AUTOTUNE)
        .prefetch(tf.data.AUTOTUNE)
    )

    return train_ds, test_ds, class_names

In [13]:
# Hold out 25% of the normal-image batches (minimum 3) for validation so that
# val_loss is averaged over enough images to drive early stopping reliably.
full_train_ds, test_ds, class_names = load_mvtec(DRIVE_DATA_PATH, CATEGORY)

# load_mvtec builds the training set with shuffle=True, and a shuffled tf.data
# pipeline reshuffles on every pass by default. Calling take()/skip() directly
# on it would put different images on each side of the split every epoch, i.e.
# validation images would leak into training. cache() stores the batches of the
# first complete pass, so every later pass replays the same batches in the same
# order and the split below stays fixed.
full_train_ds = full_train_ds.cache()

# This full pass both counts the batches and fills the cache.
total_batches = sum(1 for _ in full_train_ds)
print(f"Total batches available: {total_batches}")

val_size = max(3, int(total_batches * 0.25))  # 25% and minimum 3 batches
if val_size >= total_batches:
    raise ValueError(
        f"Only {total_batches} training batch(es) found; need more than "
        f"{val_size} to leave data for training after the validation split."
    )

val_ds = full_train_ds.take(val_size)
train_ds = (
    full_train_ds
    .skip(val_size)
    # Restore per-epoch randomness, but only among the training batches.
    .shuffle(total_batches - val_size)
    .prefetch(tf.data.AUTOTUNE)
)

print(f"Train batches: {total_batches - val_size}, Val batches: {val_size}")

Found 240 files.
Found 151 files belonging to 8 classes.
Found 151 files belonging to 8 classes.
Total batches available: 15
Train batches: 12, Val batches: 3


## 3. Model: Convolutional Autoencoder

**Design rationale:**  
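Each `MaxPool(2,2)` halves the spatial dimensions, so three pooling stages reduce the 128×128 input to a 16×16 feature map. The bottleneck is fully convolutional (a 16×16×32 feature map — there is no Dense layer), which keeps the spatial layout of the image intact. The decoder mirrors the encoder using `Conv2DTranspose` to upsample back to the original resolution.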

```
Input 128×128 → Conv+Pool → 64×64 → Conv+Pool → 32×32 → Conv+Pool → 16×16 → Conv bottleneck (16×16×32)
                                                                                      ↓
Output 128×128 ← UpConv (64×64→128×128) ← UpConv (32×32→64×64) ← UpConv (16×16→32×32) ┘
```

In [14]:
# Simpler architecture — less capacity, less overfitting
def build_autoencoder(img_shape=(128, 128, 3)):
    """Assemble the fully convolutional autoencoder.

    Args:
        img_shape: ``(height, width, channels)`` of the input images.

    Returns:
        An uncompiled Keras ``Model`` named ``'autoencoder'`` whose output has
        3 channels with sigmoid activation.
    """
    enc_input = layers.Input(shape=img_shape)
    x = enc_input

    # Encoder: three Conv → BatchNorm → ReLU → MaxPool stages. With the default
    # 128×128 input the feature map shrinks 128 → 64 → 32 → 16.
    for n_filters in (32, 64, 64):
        x = layers.Conv2D(filters=n_filters, kernel_size=3, padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation('relu')(x)
        x = layers.MaxPooling2D(pool_size=2, padding='same')(x)

    # Bottleneck: same spatial size, 32 channels (16×16×32 for the default input).
    x = layers.Conv2D(filters=32, kernel_size=3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    # Decoder: three stride-2 transposed convolutions, each doubling the
    # spatial size (16 → 32 → 64 → 128 for the default input).
    for n_filters in (64, 64, 32):
        x = layers.Conv2DTranspose(
            filters=n_filters, kernel_size=3, strides=2, padding='same'
        )(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation('relu')(x)

    # Sigmoid keeps reconstructed pixels in [0, 1].
    output = layers.Conv2D(
        filters=3, kernel_size=3, activation='sigmoid', padding='same'
    )(x)
    return Model(inputs=enc_input, outputs=output, name='autoencoder')

## 4. Training

In [15]:
# Instantiate the autoencoder and print its layer-by-layer summary.
model = build_autoencoder()
model.summary()

# Pixel-wise MSE is used both as the training objective and as the reported metric.
adam = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
model.compile(optimizer=adam, loss='mse', metrics=['mse'])

In [None]:
# Stop once val_loss has not improved for 15 epochs and roll back to the best weights.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=15,
    restore_best_weights=True,
)

# Halve the learning rate after 5 stagnant epochs, but never go below 1e-6.
lr_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=5,
    min_lr=1e-6,
    verbose=1,
)

history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[early_stopping, lr_on_plateau],
)

Epoch 1/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 3s/step - loss: 0.1436 - mse: 0.1436 - val_loss: 0.1139 - val_mse: 0.1139 - learning_rate: 1.0000e-04
Epoch 2/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 3s/step - loss: 0.1078 - mse: 0.1078 - val_loss: 0.1139 - val_mse: 0.1139 - learning_rate: 1.0000e-04
Epoch 3/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 2s/step - loss: 0.0753 - mse: 0.0753 - val_loss: 0.1116 - val_mse: 0.1116 - learning_rate: 1.0000e-04
Epoch 4/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 2s/step - loss: 0.0494 - mse: 0.0494 - val_loss: 0.1102 - val_mse: 0.1102 - learning_rate: 1.0000e-04
Epoch 5/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 2s/step - loss: 0.0327 - mse: 0.0327 - val_loss: 0.1067 - val_mse: 0.1067 - learning_rate: 1.0000e-04
Epoch 6/100
[1m12/12[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 2s/step - loss: 0.0225 - mse

In [None]:
# Training vs. validation loss per epoch; the figure is also saved to Drive.
fig, ax = plt.subplots(figsize=(8, 4))
for history_key, curve_label in (('loss', 'Train Loss'), ('val_loss', 'Val Loss')):
    ax.plot(history.history[history_key], label=curve_label)
ax.set_xlabel('Epoch')
ax.set_ylabel('MSE Loss')
ax.set_title('Training History')
ax.legend()
fig.tight_layout()
fig.savefig(os.path.join(DRIVE_DATA_PATH, 'training_history.png'), dpi=150)
plt.show()

## 5. Anomaly Scoring & Threshold

The model is trained only on normal images. At inference time, anomalous images will have **higher reconstruction error** because the model has never seen those patterns.  
We set a detection threshold at the **95th percentile** of normal training errors.

In [None]:
def reconstruction_error(model, images):
    """Score a batch of images by how badly the model reconstructs them.

    Args:
        model: Object exposing ``predict(images, verbose=0)``.
        images: 4-D array; axis 0 indexes the images, and the error is
            averaged over the remaining three axes.

    Returns:
        ``(mse, reconstructions)`` — one mean-squared-error value per image,
        and the raw output of ``model.predict``.
    """
    reconstructions = model.predict(images, verbose=0)
    squared_diff = np.square(images - reconstructions)
    per_image_mse = np.mean(squared_diff, axis=(1, 2, 3))
    return per_image_mse, reconstructions


def find_threshold(model, train_ds, percentile=95):
    """Pick the anomaly threshold from reconstruction errors on normal data.

    Args:
        model: Trained autoencoder, passed through to ``reconstruction_error``.
        train_ds: Iterable of ``(images, target)`` batches; only the images
            are used and each must support ``.numpy()``.
        percentile: Percentile of the per-image errors to use as the threshold.

    Returns:
        The threshold value; it is also printed.
    """
    all_errors = [
        err
        for x_batch, _ in train_ds
        for err in reconstruction_error(model, x_batch.numpy())[0]
    ]
    threshold = np.percentile(all_errors, percentile)
    print(f"Detection threshold (p{percentile}): {threshold:.6f}")
    return threshold


threshold = find_threshold(model, train_ds, percentile=95)

## 6. Evaluation: ROC-AUC & Confusion Matrix

In [None]:
# Score every test image and keep the images/reconstructions for later plots.
all_errors, all_labels, all_images, all_recons = [], [], [], []

for x_batch, y_batch in test_ds:
    x_np = x_batch.numpy()
    errors, recons = reconstruction_error(model, x_np)
    all_errors += list(errors)
    # Labels arrive one-hot encoded; argmax recovers the class index.
    all_labels += list(y_batch.numpy().argmax(axis=1))
    all_images += list(x_np)
    all_recons += list(recons)

all_errors = np.array(all_errors)
all_labels = np.array(all_labels)

# Collapse to a binary problem: the 'good' class is 0 (normal), every defect class is 1.
good_idx    = class_names.index('good')
binary_true = (all_labels != good_idx).astype(int)
# Hard decisions use the learned threshold; ROC-AUC below uses the raw scores.
binary_pred = (all_errors > threshold).astype(int)

auc = roc_auc_score(y_true=binary_true, y_score=all_errors)
print(f"ROC-AUC: {auc:.4f}")

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
roc_ax, cm_ax, hist_ax = axes

# Panel 1 — ROC curve built from the raw reconstruction errors.
RocCurveDisplay.from_predictions(binary_true, all_errors, ax=roc_ax)
roc_ax.set_title(f'ROC Curve  (AUC = {auc:.3f})')

# Panel 2 — confusion matrix at the chosen threshold.
cm   = confusion_matrix(binary_true, binary_pred)
disp = ConfusionMatrixDisplay(cm, display_labels=['Normal', 'Anomaly'])
disp.plot(ax=cm_ax, colorbar=False)
cm_ax.set_title('Confusion Matrix')

# Panel 3 — one error histogram per test class, with the threshold marked.
for cls_idx, cls_name in enumerate(class_names):
    cls_errors = all_errors[all_labels == cls_idx]
    if cls_errors.size:   # skip classes with no test images
        hist_ax.hist(cls_errors, bins=20, alpha=0.6, label=cls_name, density=True)
hist_ax.axvline(threshold, color='red', linestyle='--', label=f'Threshold ({threshold:.4f})')
hist_ax.set_xlabel('Reconstruction Error (MSE)')
hist_ax.set_ylabel('Density')
hist_ax.set_title('Error Distribution by Class')
hist_ax.legend(fontsize=7)

fig.tight_layout()
fig.savefig(os.path.join(DRIVE_DATA_PATH, 'anomaly_evaluation.png'), dpi=150)
plt.show()

## 7. Visualise Reconstructions & Error Maps

In [None]:
def visualise_reconstructions(images, reconstructions, errors, labels, class_names, n=6):
    """Plot original, reconstruction and per-pixel error map for the highest-error images.

    One row is drawn per image, ordered from the largest reconstruction error
    down. The figure is saved as ``anomaly_reconstructions.png`` under
    ``DRIVE_DATA_PATH`` and then shown.

    Args:
        images: Sequence of original images, indexable by integer.
        reconstructions: Sequence of model outputs, aligned with ``images``.
        errors: Per-image anomaly scores, aligned with ``images``.
        labels: Per-image class indices into ``class_names``.
        class_names: List of class names.
        n: Maximum number of rows to draw; capped at ``len(errors)``.

    Raises:
        ValueError: If there is nothing to draw (no images, or ``n < 1``).
    """
    # Never ask for more rows than there are images, otherwise the grid would
    # contain empty axes.
    n = min(n, len(errors))
    if n < 1:
        raise ValueError("visualise_reconstructions needs at least one image and n >= 1")

    indices = np.argsort(errors)[::-1][:n]   # highest error first

    # squeeze=False keeps `axes` two-dimensional even for a single row, so the
    # axes[row, col] indexing below also works when n == 1.
    fig, axes = plt.subplots(n, 3, figsize=(10, n * 3), squeeze=False)
    axes[0, 0].set_title('Original')
    axes[0, 1].set_title('Reconstruction')
    axes[0, 2].set_title('Error Map')

    for row, idx in enumerate(indices):
        orig    = np.clip(images[idx], 0, 1)
        recon   = np.clip(reconstructions[idx], 0, 1)
        err_map = np.mean(np.abs(orig - recon), axis=-1)   # average over colour channels

        axes[row, 0].imshow(orig)
        axes[row, 0].set_ylabel(class_names[labels[idx]], fontsize=9)
        axes[row, 1].imshow(recon)
        im = axes[row, 2].imshow(err_map, cmap='hot')
        plt.colorbar(im, ax=axes[row, 2])

        for ax in axes[row]:
            # Hide ticks and frame individually instead of ax.axis('off'):
            # axis('off') would also hide the class-name ylabel set above.
            ax.set_xticks([])
            ax.set_yticks([])
            for spine in ax.spines.values():
                spine.set_visible(False)

    fig.suptitle('Top Anomaly Detections (highest reconstruction error)', y=1.01)
    fig.tight_layout()
    fig.savefig(os.path.join(DRIVE_DATA_PATH, 'anomaly_reconstructions.png'), dpi=150, bbox_inches='tight')
    plt.show()


visualise_reconstructions(all_images, all_recons, all_errors, all_labels, class_names, n=6)

## 8. Save Model

In [None]:
# Persist the trained autoencoder next to the dataset on Drive (native .keras format).
model_filename = 'anomaly_autoencoder.keras'
save_path = os.path.join(DRIVE_DATA_PATH, model_filename)
model.save(filepath=save_path)
print(f"Model saved to: {save_path}")