In [None]:
# Install necessary packages
!pip install opendatasets hickle

import opendatasets as od
import os

# Download dataset
od.download("https://www.kaggle.com/datasets/karthiknm1/ucsd-anomaly-detection-dataset")

# Check if the data is downloaded successfully
if os.path.exists("/content/ucsd-anomaly-detection-dataset"):
    print("Dataset downloaded successfully.")
else:
    print("Failed to download dataset.")

Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Collecting hickle
  Downloading hickle-5.0.3-py3-none-any.whl (107 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m108.0/108.0 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: hickle, opendatasets
Successfully installed hickle-5.0.3 opendatasets-0.1.22
Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: ayushaiml
Your Kaggle Key: ··········
Dataset URL: https://www.kaggle.com/datasets/karthiknm1/ucsd-anomaly-detection-dataset
Downloading ucsd-anomaly-detection-dataset.zip to ./ucsd-anomaly-detection-dataset


100%|██████████| 702M/702M [00:11<00:00, 62.3MB/s]



Dataset downloaded successfully.


In [None]:
# Import necessary libraries
import numpy as np
import tensorflow as tf
from PIL import Image
import glob
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import cv2
import os

In [None]:
# Load a subset of images from a directory
def load_images_from_directory(directory, target_size=(128, 128), grayscale=False, load_fraction=0.3):
    images = []
    labels = []
    total_files = []

    # Gather all files from the directory
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.lower().endswith(('png', 'jpg', 'jpeg', 'bmp', 'gif', 'tif', 'tiff')):
                total_files.append(os.path.join(root, file))

    # Calculate the number of files to load
    num_files_to_load = int(len(total_files) * load_fraction)
    files_to_load = np.random.choice(total_files, num_files_to_load, replace=False)

    # Load the selected files
    for img_path in files_to_load:
        try:
            if grayscale:
                img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
                if img is None:
                    print(f"Error loading image {img_path}: Image is None")
                    continue
                img = cv2.resize(img, target_size)
                img = np.expand_dims(img, axis=-1)  # Add a channel dimension
            else:
                img = tf.keras.preprocessing.image.load_img(img_path, target_size=target_size)
                img = tf.keras.preprocessing.image.img_to_array(img)
            img = img / 255.0
            images.append(img)
            label = 0 if 'Train' in root else 1  # Simple logic to assign labels
            labels.append(label)
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
    return np.array(images), np.array(labels)

In [None]:
# Define directories
normal_dir = "/content/ucsd-anomaly-detection-dataset/UCSD_Anomaly_Dataset.v1p2/UCSDped1/Train"
anomaly_dir = "/content/ucsd-anomaly-detection-dataset/UCSD_Anomaly_Dataset.v1p2/UCSDped1/Test"


In [None]:
# Load images (30% of the total data)
normal_images, normal_labels = load_images_from_directory(normal_dir, target_size=(128, 128), load_fraction=0.3)
anomaly_images, anomaly_labels = load_images_from_directory(anomaly_dir, target_size=(128, 128), load_fraction=0.3)


print(f"Number of normal images loaded: {len(normal_images)}")
print(f"Normal Images shape: {normal_images.shape}")
print(f"Normal Labels shape: {normal_labels.shape}")
print(f"Number of anomaly images loaded: {len(anomaly_images)}")
print(f"Anomaly Images shape: {anomaly_images.shape}")
print(f"Anomaly Labels shape: {anomaly_labels.shape}")

Number of normal images loaded: 2040
Normal Images shape: (2040, 128, 128, 3)
Normal Labels shape: (2040,)
Number of anomaly images loaded: 2760
Anomaly Images shape: (2760, 128, 128, 3)
Anomaly Labels shape: (2760,)


In [None]:
# Combine normal and anomaly images
all_images = np.concatenate([normal_images, anomaly_images], axis=0)
all_labels = np.concatenate([normal_labels, anomaly_labels], axis=0)

# Split into train, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(all_images, all_labels, test_size=0.4, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)


In [None]:
# Define Neural Network Models
def create_cnn_model(input_shape):
    model = Sequential()
    model.add(Conv2D(32, (3, 3), activation='relu', input_shape=input_shape))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

input_shape = (128, 128, 3)
cnn_model = create_cnn_model(input_shape)

early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

cnn_history = cnn_model.fit(X_train, y_train,
                            validation_data=(X_val, y_val),
                            epochs=10, callbacks=[early_stopping])


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Define ANN Model
def create_ann_model(input_shape):
    model = Sequential()
    model.add(Flatten(input_shape=input_shape))
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

ann_model = create_ann_model(input_shape)

ann_history = ann_model.fit(X_train, y_train,
                            validation_data=(X_val, y_val),
                            epochs=10, callbacks=[early_stopping])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Define MLP Model
def create_mlp_model(input_shape):
    model = Sequential()
    model.add(Flatten(input_shape=input_shape))
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(256, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

mlp_model = create_mlp_model(input_shape)

mlp_history = mlp_model.fit(X_train, y_train,
                            validation_data=(X_val, y_val),
                            epochs=10, callbacks=[early_stopping])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Evaluate the Models
cnn_val_loss, cnn_val_acc = cnn_model.evaluate(X_val, y_val)
print("CNN Validation Accuracy:", cnn_val_acc)

cnn_test_loss, cnn_test_acc = cnn_model.evaluate(X_test, y_test)
print("CNN Test Accuracy:", cnn_test_acc)

ann_val_loss, ann_val_acc = ann_model.evaluate(X_val, y_val)
print("ANN Validation Accuracy:", ann_val_acc)

ann_test_loss, ann_test_acc = ann_model.evaluate(X_test, y_test)
print("ANN Test Accuracy:", ann_test_acc)

mlp_val_loss, mlp_val_acc = mlp_model.evaluate(X_val, y_val)
print("MLP Validation Accuracy:", mlp_val_acc)

mlp_test_loss, mlp_test_acc = mlp_model.evaluate(X_test, y_test)
print("MLP Test Accuracy:", mlp_test_acc)

CNN Validation Accuracy: 0.9281250238418579
CNN Test Accuracy: 0.9145833253860474
ANN Validation Accuracy: 0.5885416865348816
ANN Test Accuracy: 0.5479166507720947
MLP Validation Accuracy: 0.5885416865348816
MLP Test Accuracy: 0.5479166507720947


In [None]:
# Isolation Forest
from sklearn.ensemble import IsolationForest

iso_forest = IsolationForest(contamination=0.1)
iso_forest.fit(X_train.reshape((X_train.shape[0], -1)))
y_pred_iso = iso_forest.predict(X_test.reshape((X_test.shape[0], -1)))
y_pred_iso = [0 if x == 1 else 1 for x in y_pred_iso]  # Convert to binary labels

print("Isolation Forest Accuracy:", accuracy_score(y_test, y_pred_iso))
print("Classification Report:\n", classification_report(y_test, y_pred_iso))

Isolation Forest Accuracy: 0.45208333333333334
Classification Report:
               precision    recall  f1-score   support

           0       0.45      0.89      0.60       434
           1       0.50      0.09      0.15       526

    accuracy                           0.45       960
   macro avg       0.47      0.49      0.37       960
weighted avg       0.48      0.45      0.35       960



In [None]:
# Local Outlier Factor (LOF)
from sklearn.neighbors import LocalOutlierFactor

# Flatten images for LOF
X_train_flat = X_train.reshape(X_train.shape[0], -1)
X_test_flat = X_test.reshape(X_test.shape[0], -1)

lof = LocalOutlierFactor(n_neighbors=20, contamination=0.1)
y_pred_lof = lof.fit_predict(X_test_flat)
y_pred_lof = [0 if x == 1 else 1 for x in y_pred_lof]

print("Local Outlier Factor Accuracy:", accuracy_score(y_test, y_pred_lof))
print("Classification Report:\n", classification_report(y_test, y_pred_lof))


Local Outlier Factor Accuracy: 0.5020833333333333
Classification Report:
               precision    recall  f1-score   support

           0       0.47      0.94      0.63       434
           1       0.75      0.14      0.23       526

    accuracy                           0.50       960
   macro avg       0.61      0.54      0.43       960
weighted avg       0.63      0.50      0.41       960



In [None]:
# One-Class SVM
from sklearn.svm import OneClassSVM

one_class_svm = OneClassSVM(kernel='rbf', gamma=0.001, nu=0.03)
one_class_svm.fit(X_train_flat)
y_pred_svm = one_class_svm.predict(X_test_flat)
y_pred_svm = [0 if x == 1 else 1 for x in y_pred_svm]

print("One-Class SVM Accuracy:", accuracy_score(y_test, y_pred_svm))
print("Classification Report:\n", classification_report(y_test, y_pred_svm))

One-Class SVM Accuracy: 0.4395833333333333
Classification Report:
               precision    recall  f1-score   support

           0       0.44      0.93      0.60       434
           1       0.38      0.03      0.06       526

    accuracy                           0.44       960
   macro avg       0.41      0.48      0.33       960
weighted avg       0.41      0.44      0.31       960



In [None]:
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.models import Model
import numpy as np
from sklearn.metrics import accuracy_score

# Define Autoencoder Model
def create_autoencoder(input_shape):
    input_img = Input(shape=input_shape)

    # Encoder
    x = Conv2D(16, (3, 3), activation='relu', padding='same')(input_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)

    # Decoder
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    # Change the number of output channels to 3 to match input
    decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

    autoencoder = Model(input_img, decoded)
    autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
    return autoencoder

# Example usage
input_shape = (128, 128, 3)  # Update with your image dimensions
autoencoder = create_autoencoder(input_shape)

# Train the autoencoder on normal images
autoencoder.fit(X_train, X_train, epochs=10, batch_size=32, validation_data=(X_val, X_val))

# Use the autoencoder to predict and compute reconstruction errors
reconstructed_images = autoencoder.predict(X_test)
reconstruction_errors = np.mean(np.abs(X_test - reconstructed_images), axis=(1, 2, 3))

# Use reconstruction errors for anomaly detection
threshold = np.percentile(reconstruction_errors, 95)  # Example threshold
anomalies = reconstruction_errors > threshold

# Assuming y_test contains true labels (0 for normal, 1 for anomaly)
y_pred = anomalies.astype(int) # Convert boolean anomalies to 0 and 1
accuracy = accuracy_score(y_test, y_pred)
print("Number of anomalies detected:", np.sum(anomalies))
print("Accuracy:", accuracy)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Number of anomalies detected: 48
Accuracy: 0.42291666666666666
