### Define The Ranger Optimizer


In [None]:
class Ranger(tf.keras.optimizers.Optimizer):
    def __init__(self, learning_rate=1e-5, weight_decay=1e-4, sync_period=5, slow_step=0.5, name="Ranger", **kwargs):
        # Pass learning_rate directly to the base class initializer
        super(Ranger, self).__init__(name=name, learning_rate=learning_rate, **kwargs)
        
        # Store other parameters
        self.weight_decay = weight_decay
        self.sync_period = sync_period
        self.slow_step = slow_step
        self.iterations = tf.Variable(0, dtype=tf.int64, trainable=False)
        
        # Lookahead slow weights initialization
        self.slow_weights = None

    def apply_gradients(self, grads_and_vars, name=None, **kwargs):
        # Use self.learning_rate directly in apply_gradients
        for grad, var in grads_and_vars:
            if grad is not None:
                if self.weight_decay > 0:
                    grad += self.weight_decay * var
                var.assign_sub(self.learning_rate * grad)

        # Initialize slow weights on the first call
        if self.slow_weights is None:
            self.slow_weights = [tf.Variable(v, trainable=False) for _, v in grads_and_vars]

        # Increment the iteration counter
        self.iterations.assign_add(1)

        # Apply Lookahead every sync_period steps using tf.cond
        def apply_lookahead():
            for slow_var, (_, var) in zip(self.slow_weights, grads_and_vars):
                slow_var.assign_add(self.slow_step * (var - slow_var))
                var.assign(slow_var)
            return tf.no_op()  # no-op to satisfy return type requirement

        # Check if the current iteration is a sync period
        tf.cond(tf.equal(self.iterations % self.sync_period, 0), apply_lookahead, lambda: tf.no_op())

    def get_config(self):
        config = super(Ranger, self).get_config()
        config.update({
            "learning_rate": self.learning_rate,
            "weight_decay": self.weight_decay,
            "sync_period": self.sync_period,
            "slow_step": self.slow_step
        })
        return config

# Instantiate and use the custom Ranger optimizer
optimizer = Ranger(learning_rate=1e-5, weight_decay=1e-4, sync_period=5, slow_step=0.5)
optimizer_Adam = tfk.optimizers.Adam()

### Segmentation 
(This came afterwards, and it did not provide any increment in the accuracy)

In [None]:
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import cv2

images = X_test_aug3[:500] # Assuming images are stored under the key 'images'
labels = y_test_aug3[:500]# Assuming labels are stored under the key 'labels'

# Function to apply k-means clustering to an image
def cluster_image(image, n_clusters=5):
    h, w, c = image.shape
    flat_image = image.reshape(-1, c)  # Flatten image to (pixels, channels)
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    labels = kmeans.fit_predict(flat_image)  # Cluster assignment for each pixel
    return labels.reshape(h, w), kmeans.cluster_centers_

def apply_histogram_equalization(image):
    # Ensure the image is in grayscale
    if len(image.shape) == 3:  # If the image has multiple channels (e.g., RGB/BGR)
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    else:
        gray = image
    
    # Ensure the image is uint8
    if gray.dtype != np.uint8:
        gray = (gray * 255).clip(0, 255).astype(np.uint8)  # Scale to [0, 255] and convert to uint8
    
    # Apply histogram equalization
    equalized = cv2.equalizeHist(gray)
    
    # Convert back to RGB (optional)
    return cv2.cvtColor(equalized, cv2.COLOR_GRAY2RGB)

def segment_nucleus(image):
    """
    Segments nuclei in the given image.

    Parameters:
    - image (numpy.ndarray): Input image array (H, W, C) in BGR format.

    Returns:
    - segmented_only (numpy.ndarray): Image containing only the segmented nuclei.
    """
    if image is None:
        raise ValueError("Input image is None")

    # Convert to grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Apply stronger noise reduction with median filter
    denoised = cv2.medianBlur(gray, 5)

    # Use adaptive thresholding to handle varying lighting
    thresh = cv2.adaptiveThreshold(
        denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2
    )

    # Use morphological operations to remove small artifacts
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    morphed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)

    # Find contours
    contours, _ = cv2.findContours(morphed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Create a mask for segmented nuclei
    mask = np.zeros_like(gray)
    for contour in contours:
        area = cv2.contourArea(contour)
        # Adjust area filter based on expected nucleus size
        if 200 < area < 5000:  # These thresholds can be tuned
            cv2.drawContours(mask, [contour], -1, 255, thickness=cv2.FILLED)

    # Apply the mask to the original image to extract only the nuclei
    segmented_only = cv2.bitwise_and(image, image, mask=mask)

    return segmented_only

# Function to evaluate clusters in an image using a trained model
def evaluate_clusters(image, image_clusters, cluster_centers, model, confidence_threshold=0.2):
    h, w, c = image.shape
    significant_mask = np.zeros((h, w), dtype=bool)  # Mask to keep significant clusters
    
    for cluster_id in range(len(cluster_centers)):
        # Create a mask for the current cluster
        cluster_mask = (image_clusters == cluster_id)
        
        # Reconstruct the image with only this cluster
        reconstructed_image = np.zeros_like(image)
        reconstructed_image[cluster_mask] = image[cluster_mask]
        
        # Classify the reconstructed image
        prediction = model.predict(reconstructed_image[np.newaxis, ...])  # Add batch dimension
        confidence = np.max(prediction)  # Get max confidence
        
        # If confidence is high, keep this cluster
        if confidence > confidence_threshold:
            significant_mask = significant_mask | cluster_mask
    
    return significant_mask

# Function to reconstruct an image based on the significant mask
def reconstruct_image(image, significant_mask):
    filtered_image = np.zeros_like(image)
    filtered_image[significant_mask] = image[significant_mask]
    return filtered_image

# Apply the pipeline to the dataset
filtered_images = []
filtered_labels = []

for idx, image in enumerate(images):

    '''
    print(f"Processing image {idx+1}/{len(images)}")
    image_clusters, cluster_centers = cluster_image(image, n_clusters=15)  # Adjust n_clusters as needed
    significant_mask = evaluate_clusters(image, image_clusters, cluster_centers, ft_model, confidence_threshold=0.2)
    filtered_image = reconstruct_image(image, significant_mask)
    filtered_images.append(filtered_image)
    filtered_labels.append(labels[idx])  # Keep the original label
    '''

    prediction = ft_model.predict(image[np.newaxis, ...])   # Add batch dimension
    confidence = np.max(prediction)  # Get max confidence
        
        # If confidence is high, keep this cluster
    if confidence > 0.90:
            
            filtered_images.append(image)
            filtered_labels.append(labels[idx]) 
                                   
    else:
        
        print("ko")
        equalized_image = np.clip(image, 0, 255).astype(np.uint8)

        # Plot the original and equalized images
        plt.figure(figsize=(10, 5))
        plt.subplot(1, 2, 2)
        plt.imshow(equalized_image)
        plt.title("Equalized Image")
        plt.axis("off")
        filtered_images.append( apply_histogram_equalization(image))
        filtered_labels.append(labels[idx]) 



# Convert to NumPy arrays
filtered_images = np.array(filtered_images)
filtered_labels = np.array(filtered_labels)

# Generate predictions on the test set and print a classification report
y_pred = ft_model.predict(filtered_images)
y_pred_classes = y_pred.argmax(axis=1)  # Convert probabilities to class labels
y_test_classes = filtered_labels.argmax(axis=1)

# Print classification report
print("Classification Report:")
print(classification_report(y_test_classes, y_pred_classes))


### High-Pass Filter

In [None]:
import numpy as np
from scipy.fft import fft2, fftshift, ifft2
import cv2

# Assuming the data has been loaded into variables
# (X_test_aug4, y_test_aug4) = load_data("augmented_set4.npz")

# Function to create an adaptive high-pass filter based on the frequency spectrum
def adaptive_high_pass_filter(f_transform, threshold_ratio=0.2):
    """
    Generates an adaptive high-pass filter mask based on the frequency spectrum.

    Parameters:
    - f_transform: The Fourier transform of the image.
    - threshold_ratio: Ratio of the average magnitude to set as the cutoff.

    Returns:
    - mask: High-pass filter mask.
    """
    # Compute magnitude spectrum
    magnitude_spectrum = np.abs(f_transform)
    
    # Compute adaptive threshold based on mean magnitude
    adaptive_threshold = magnitude_spectrum.mean() * threshold_ratio
    
    # Create a mask where frequencies below the threshold are set to 0
    mask = magnitude_spectrum > adaptive_threshold
    return mask

# Process each image in the dataset in-place
for i, image in enumerate(X_test_aug4):
    # Ensure image is in uint8 format if needed
    if image.dtype != np.uint8:
        image = image.astype(np.uint8)

    # Process each color channel separately for color images
    if len(image.shape) == 3:  # If image has color channels
        filtered_channels = []
        
        for channel in cv2.split(image):
            # Fourier transform for the channel
            f_transform = fft2(channel)
            f_transform_shifted = fftshift(f_transform)
            
            # Apply adaptive high-pass filter
            adaptive_filter = adaptive_high_pass_filter(f_transform_shifted, threshold_ratio=0.2)
            filtered_transform = f_transform_shifted * adaptive_filter
            
            # Convert back to spatial domain
            filtered_channel = np.abs(ifft2(np.fft.ifftshift(filtered_transform)))
            filtered_channel = cv2.normalize(filtered_channel, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
            
            # Append the filtered channel
            filtered_channels.append(filtered_channel)
        
        # Merge channels back into a color image and update in-place
        X_test_aug4[i] = cv2.merge(filtered_channels)
    
    else:  # Grayscale image
        # Perform Fourier transform
        f_transform = fft2(image)
        f_transform_shifted = fftshift(f_transform)
        
        # Apply adaptive high-pass filter
        adaptive_filter = adaptive_high_pass_filter(f_transform_shifted, threshold_ratio=0.2)
        filtered_transform = f_transform_shifted * adaptive_filter
        
        # Convert back to spatial domain and normalize
        filtered_image = np.abs(ifft2(np.fft.ifftshift(filtered_transform)))
        X_test_aug4[i] = cv2.normalize(filtered_image, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)


### Hyper Parameter Tuning

In [None]:
!pip install keras-tuner
from  keras_tuner import HyperModel
from  keras_tuner.tuners import Hyperband

# Define a custom HyperModel for automatic model order selection
class ModelOrderHyperModel(HyperModel):
    def __init__(self, input_shape, num_classes):
        self.input_shape = input_shape
        self.num_classes = num_classes

    def build(self, hp):
        inputs = tfk.Input(shape=self.input_shape, name='input_layer')

        # MobileNetV3Small as a feature extractor
        mobilenet = tf.keras.applications.MobileNetV3Small(
            input_shape=self.input_shape,
            include_top=False,
            weights='imagenet'
        )
        mobilenet.trainable = False  # Freeze the base model
        x = mobilenet(inputs)

        # Global Average Pooling
        x = tfkl.GlobalAveragePooling2D(name='avg_pool')(x)

        # Add batch normalization
        if hp.Boolean('batch_norm_before_dense'):
            x = tfkl.BatchNormalization(name='batch_norm')(x)

        # Dynamically add dense layers
        for i in range(hp.Int('num_dense_layers', 1, 5)):
            x = tfkl.Dense(
                units=hp.Choice(f'units_dense_{i}', [64, 128, 256, 512]),
                activation='gelu',
                name=f'dense_{i}'
            )(x)
            if hp.Boolean(f'batch_norm_after_dense_{i}'):
                x = tfkl.BatchNormalization(name=f'batch_norm_{i}')(x)
            x = tfkl.Dropout(
                rate=hp.Float(f'dropout_rate_{i}', 0.2, 0.5, step=0.1),
                name=f'dropout_{i}'
            )(x)

        # Output layer
        outputs = tfkl.Dense(
            self.num_classes, activation='softmax', name='output'
        )(x)

        # Define and compile the model
        model = Model(inputs=inputs, outputs=outputs, name='model')
        model.compile(
            optimizer=tf.keras.optimizers.Adam(
                learning_rate=hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
            ),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )
        return model


# Set up Hyperband Tuner
input_shape = (96, 96, 3)
num_classes = 8

hypermodel = ModelOrderHyperModel(input_shape, num_classes)

tuner = Hyperband(
    hypermodel,
    objective='val_accuracy',
    max_epochs=30,
    factor=3,
    directory='hyperband_dir',
    project_name='model_order_selection'
)

# Callback for early stopping
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy', patience=5, restore_best_weights=True
)

# Perform the search
tuner.search(
   X_test_concat2, y_test_concat2,
    validation_data=(X_test_aug2, y_test_aug2),
    epochs=50,
    callbacks=[early_stop],
    batch_size=32
)

# Retrieve the best hyperparameters and build the best model
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
best_model = tuner.hypermodel.build(best_hps)

# Train the best model
history = best_model.fit(
    x_train, y_train,
    validation_data=(x_val, y_val),
    epochs=50,
    callbacks=[early_stop],
    batch_size=32
)

# Evaluate the best model
test_loss, test_accuracy = best_model.evaluate(x_test, y_test)
print(f"Test Accuracy: {test_accuracy}")
