In [1]:
import numpy as np
import cv2
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Dropout, BatchNormalization, GaussianNoise, Lambda, Dense, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [2]:
size = 512
channels =  1  #input image format
input_img = Input(shape=(size, size, channels))
#print(input_img)

In [3]:
# Encoder
x = GaussianNoise(0.1)(input_img)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = BatchNormalization()(x)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Dropout(0.2)(x)
x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
x = BatchNormalization()(x)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Dropout(0.2)(x)

# Flattened features for anomaly detection
flattened = Flatten()(x)

# Anomaly Detection Branch
y = Dense(128, activation='relu')(flattened)
anomaly_output = Dense(1, activation='sigmoid', name='anomaly_output')(y)

# Decoder
x = Conv2D(16, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
x = Dropout(0.2)(x)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
x = Dropout(0.2)(x)
decoded = Conv2D(1, (3, 3), activation='sigmoid', padding='same', name='decoded_output')(x)


autoencoder = Model(input_img, [decoded, anomaly_output])
autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss={'decoded_output': 'mse', 'anomaly_output': 'binary_crossentropy'}, loss_weights={'decoded_output': 1.0, 'anomaly_output': 0.5})



In [4]:
def preprocess_image(image):
    
    # Check and convert the image to uint8 if it's not in that format
    if image.dtype != np.uint8:
        # Normalize to 0-255 and convert to uint8 if it seems to be in a [0, 1] range
        if image.max() <= 1.0:
            image = (image * 255).astype(np.uint8)
        else:
            image = image.astype(np.uint8)
    # Apply erosion to reduce noise
    kernel = np.ones((3,3), np.uint8)  # Adjust the kernel size as needed
    eroded_image = cv2.erode(image, kernel, iterations=1)
    # Apply edge detection
    edges = cv2.Canny(image, 100, 200)
    
    # Resize the image to match the input shape of the network and add a channel dimension
    resized_image = cv2.resize(edges, (size, size))
    
    # Normalize the image to range [0, 1] for the neural network
    final_image = np.expand_dims(resized_image, axis=-1) / 1.0
    
    return final_image

def custom_preprocessor(img):
    # Assuming 'img' is a NumPy array of image data
    return preprocess_image(img)

# Setup the data generator
datagen = ImageDataGenerator(
    rescale=1./255,
    preprocessing_function=custom_preprocessor,
    validation_split=0.2  # if you have validation data
)


In [None]:


def generate_data(generator, normal_path, anomaly_path, batch_size):
    # Normal data generator
    normal_gen = generator.flow_from_directory(
        normal_path,
        target_size=(size, size),
        color_mode='grayscale',
        batch_size=batch_size,
        class_mode=None,
        shuffle=True
    )
    
    # Anomaly data generator
    anomaly_gen = generator.flow_from_directory(
        anomaly_path,
        target_size=(size, size),
        color_mode='grayscale',
        batch_size=batch_size,
        class_mode=None,
        shuffle=True
    )
    
    while True:
        # Get a batch of normal and anomalous images
        normal_images = next(normal_gen)
        anomaly_images = next(anomaly_gen)
        
        # Create a combined batch
        combined_images = np.concatenate([normal_images, anomaly_images])
        
        # Create labels for the batch
        normal_labels = np.zeros((batch_size, 1))
        anomaly_labels = np.ones((batch_size, 1))
        combined_labels = np.concatenate([normal_labels, anomaly_labels])
        
        # Shuffle combined batch and labels together to mix normal and anomalies
        indices = np.arange(combined_images.shape[0])
        np.random.shuffle(indices)
        combined_images = combined_images[indices]
        combined_labels = combined_labels[indices]
        
        yield combined_images, [combined_images, combined_labels]

# Assuming 'thesis/dataset/normal' and 'thesis/dataset/anomaly' as paths
train_gen = generate_data(datagen, 'thesis/dataset/normal', 'thesis/dataset/anomaly', 8)
validation_gen = generate_data(datagen, 'thesis/dataset_val/normal', 'thesis/dataset_val/anomaly', 8)


In [7]:
# Define early stopping callback
early_stopper = EarlyStopping(monitor='val_loss', patience=2, verbose=1, restore_best_weights=True)

# Now include the callback in the fit method
autoencoder.fit(
    train_generator,
    epochs=100,
    validation_data=validation_generator,
    callbacks=[early_stopper]
)

Epoch 1/100


  self._warn_if_super_not_called()


[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 3s/step - loss: 0.2085 - val_loss: 0.2354
Epoch 2/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 3s/step - loss: 0.0130 - val_loss: 0.2173
Epoch 3/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 3s/step - loss: 0.0056 - val_loss: 0.2007
Epoch 4/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 3s/step - loss: 0.0054 - val_loss: 0.1875
Epoch 5/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 3s/step - loss: 0.0056 - val_loss: 0.1773
Epoch 6/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 3s/step - loss: 0.0055 - val_loss: 0.1697
Epoch 7/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 3s/step - loss: 0.0056 - val_loss: 0.1636
Epoch 8/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 3s/step - loss: 0.0054 - val_loss: 0.1588
Epoch 9/100
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 3

<keras.src.callbacks.history.History at 0x17426c7f0>

In [8]:
#save model
autoencoder.save('0505_2.keras')