In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import json
from sklearn.model_selection import train_test_split
from sklearn import linear_model
import csv
from keras import optimizers
import keras
from functools import partial
from math import exp
from keras.utils import get_custom_objects
from keras.layers import Activation
from keras.callbacks import ModelCheckpoint
import os
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score
from sklearn.metrics import classification_report 
import joblib
import pickle
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_auc_score, roc_curve
from itertools import cycle
import random

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
tf.config.experimental.set_memory_growth(gpus[0], True)

gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        # Set only the first GPU as visible
        tf.config.set_visible_devices(gpus[0], 'GPU')
        # Allow memory growth to allocate memory dynamically on the GPU
        tf.config.experimental.set_memory_growth(gpus[0], True)
        print("GPU configuration successful.")
    except RuntimeError as e:
        print(e)
else:
    print("No GPU detected.")

In [None]:
from keras.mixed_precision import Policy
from keras.mixed_precision import set_global_policy

policy = Policy('mixed_float16')
set_global_policy(policy)

In [None]:
def load_data(data_path):
    """Loads training dataset.
    """
    X_train = np.load(f'{data_path}/X_train.npy')
    X_test = np.load(f'{data_path}/X_test.npy')
    X_validation = np.load(f'{data_path}/X_val.npy')
    
    y_train = np.load(f'{data_path}/y_train.npy')
    y_test = np.load(f'{data_path}/y_test.npy')
    y_validation = np.load(f'{data_path}/y_val.npy')

    y_train = y_train[..., np.newaxis]
    y_test = y_test[..., np.newaxis]
    y_validation = y_validation[..., np.newaxis]

    print("Dataset loaded!")
    
    
    return X_train, X_test, X_validation, y_train, y_test, y_validation

In [None]:
def prepare_dataset(data_path):
    """Creates train, validation and test sets.
    """

    # load dataset
    X_train, X_test, X_validation, y_train, y_test, y_validation = load_data(data_path)

    ############ Scaleing the data ###########
    scaler = StandardScaler()
    num_instances, num_time_steps, num_features = X_train.shape
    X_train = X_train.reshape(-1, num_features)
    X_train = scaler.fit_transform(X_train)
    
    #reshapeing
    X_train = X_train.reshape(num_instances, num_time_steps, num_features) 
    num_instances, num_time_steps, num_features = X_test.shape
    X_test = X_test.reshape(-1, num_features)
    X_test = scaler.fit_transform(X_test)
    
    #reshapeing
    X_test = X_test.reshape(num_instances, num_time_steps, num_features) 
    num_instances, num_time_steps, num_features = X_validation.shape
    X_validation = X_validation.reshape(-1, num_features)
    X_validation = scaler.fit_transform(X_validation)
    
    #reshapeing
    X_validation = X_validation.reshape(num_instances, num_time_steps, num_features) 
    
    # Save the scaler to a file
    joblib.dump(scaler, './scaler/scaler.pkl')

    # add an axis to nd array
    X_train = X_train[..., np.newaxis]
    X_test = X_test[..., np.newaxis]
    X_validation = X_validation[..., np.newaxis]

    return X_train, y_train, X_validation, y_validation, X_test, y_test

In [None]:
DATA_PATH = "/home/soumen/Soumen/Dataset/KWS/data_npy" 
class_names = ['off', 'left', 'down', 'up', 'go', 'on', 'stop', 'unknown', 'right', 'yes']  #, 'silence' , 'no'
EPOCHS = 100
BATCH_SIZE = 16  #64
PATIENCE = 10
LEARNING_RATE = 0.0001
SKIP = 1
CLASS = 10

In [None]:
# generate train, validation and test sets
X_train, y_train, X_validation, y_validation, X_test, y_test = prepare_dataset(DATA_PATH)
print(X_train.shape)
print(y_train.shape)
print(X_validation.shape)
print(y_validation.shape)
print(X_test.shape)
print(y_test.shape)

In [None]:
import random
import math
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam
from keras.layers import SeparableConv2D, DepthwiseConv2D, GlobalAveragePooling2D

input_shape = (X_train.shape[1], X_train.shape[2], 1)

In [None]:
##################### Objective functions #####################
def objective_accuracy(model, X_val, y_val):
    """Compute validation accuracy."""
    _, accuracy = model.evaluate(X_val, y_val, verbose=0)
    return accuracy

def objective_model_size(model):
    """Compute model size in megabytes."""
    model_size = sum([np.prod(v.get_shape().as_list()) for v in model.trainable_weights]) #* 4 / (1024 ** 2)
    return model_size

In [None]:
# Generate a neighboring solution
import random

def perturb_solution(solution, bounds):
    """
    Generate a perturbed solution using a concise approach within the given bounds.
    
    Parameters:
        solution (list): The current solution.
        bounds (list): Bounds for each parameter. Format:
            - For integers: (min, max)
            - For lists: A range for length and a range for values
            - For categorical choices: A list of valid options
    
    Returns:
        list: A new perturbed solution.
    """
    return [
        # Conv layers (integer range)
        max(bounds[0][0], min(bounds[0][1], solution[0] + random.randint(-1, 1))),
        
        # Filters (integer range)
        max(bounds[1][0], min(bounds[1][1], solution[1] + random.randint(-32, 32))),
        
        # Kernel size (choice from list)
        random.choice(bounds[2]),
        
        # FC layers (list of integers with random perturbations)
        [max(bounds[4][0], min(bounds[4][1], fc + random.randint(-8, 8)))
         for fc in solution[3]],
        
        # Batch Normalization (choice from list)
        random.choice(bounds[5]),
        
        # Dropout (choice from list)
        random.choice(bounds[6])
    ]

In [None]:
# Create a Depthwise Separable CNN model
def create_dscnn_model(conv_layers, filters, kernel_size, fc_layers, use_bn, use_dropout, input_shape, depth_multiplier=1):
	model = Sequential()
    
 # First Conv2D layer
	model.add(Conv2D(filters, kernel_size=kernel_size, strides=(1, 1), padding='same', activation='relu', input_shape=input_shape))
	model.add(Conv2D(filters, kernel_size=kernel_size, strides=(1, 1), padding='same', activation='relu'))
	current_shape = model.output_shape
	if current_shape[1] > 2 and current_shape[2] > 2:
			model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

	# Depthwise Separable Convolution Blocks
	def depthwise_separable_block(filters):
		model.add(SeparableConv2D(filters, (3, 3), depth_multiplier=depth_multiplier, padding="same", activation="relu"))
		if use_bn:
			model.add(BatchNormalization())
		model.add(SeparableConv2D(filters, (3, 3), depth_multiplier=depth_multiplier, padding="same", activation="relu"))
		if use_bn:
			model.add(BatchNormalization())
		current_shape = model.output_shape
		if current_shape[1] > 2 and current_shape[2] > 2:
			model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))
	if conv_layers == 1:
					depthwise_separable_block(64)
	if conv_layers == 2:
					depthwise_separable_block(128)
	if conv_layers == 3:
					depthwise_separable_block(256)    
	if conv_layers == 4:
					depthwise_separable_block(512)

	model.add(Flatten())

  # Fully connected layers (dense layers)
	for neurons in fc_layers:
		if neurons == 4:
			model.add(Dense(512, activation='relu'))
			if use_dropout:
				model.add(Dropout(0.5))
		if neurons == 3:
			model.add(Dense(256, activation='relu'))
			if use_dropout:
				model.add(Dropout(0.5))
		if neurons == 2:
			model.add(Dense(128, activation='relu'))
			if use_dropout:
				model.add(Dropout(0.3))
		if neurons == 1:
			model.add(Dense(64, activation='relu'))
			if use_dropout:
				model.add(Dropout(0.3))
	model.add(Dense(CLASS, activation='softmax'))
	model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
	# model.summary()
	return model

In [None]:
##################### Simulated Annealing with Multi-Objective Optimization #####################
#################################################################################################
def chaos_simulated_annealing(bounds, max_iter, cooling_rate=0.012, initial_temps=(10, 10), input_shape=input_shape, T_min=1e-6, no_improve_iter=10):
    
    # Initialize solution, and archive
    current_solution = [
        random.randint(*bounds[0]),                    # Conv layers
        random.randint(*bounds[1]),                    # Filters
        random.choice(bounds[2]),                      # Kernel size
        [random.randint(*bounds[4]) for _ in range(random.randint(*bounds[3]))],  # FC layers
        random.choice(bounds[5]),                      # Batch Normalization
        random.choice(bounds[6])                       # Dropout
    ]
    temperatures = list(initial_temps)
    archive = []
    archive_all = []  # Initialize archive_all
    no_improve_count = 0  # Counter for no improvement

    for iteration in range(max_iter):
        print(f"Iteration {iteration+1}/{max_iter}")
           
        # Check if temperatures have reached the threshold
        if all(t < T_min for t in temperatures):
            print("Temperatures are below the minimum threshold. Stopping.")
            break

        # Generate a neighbor solution
        new_solution = perturb_solution(current_solution, bounds)

        print(f"current_solution: {current_solution}")
        print(f"new_solution: {new_solution}")
        
        # Build and evaluate models for current and new solutions
        current_model = create_dscnn_model(*current_solution, input_shape=input_shape)
        new_model = create_dscnn_model(*new_solution, input_shape=input_shape)
        
        # Early stopping
        early_stopping = EarlyStopping(monitor='loss', patience=PATIENCE)
        current_model.fit(X_train, y_train, epochs=500, batch_size=16, callbacks=[early_stopping], verbose=0)
        new_model.fit(X_train, y_train, epochs=500, batch_size=16, callbacks=[early_stopping], verbose=0)

        current_obj = [
            objective_accuracy(current_model, X_validation, y_validation),
            objective_model_size(current_model)
        ]
        new_obj = [
            objective_accuracy(new_model, X_validation, y_validation),
            objective_model_size(new_model)
        ]
        
        print(f"current_obj: {current_obj}")
        print(f"new_obj: {new_obj}")
        
        # Acceptance probability for each objective
        accept_probs = [
            # For accuracy (should increase)
            1 if new_obj[0] > current_obj[0] else 
            np.exp(-(current_obj[0] - new_obj[0]) / temperatures[0]),
            
            # For model size (should decrease)
            1 if new_obj[1] < current_obj[1] else 
            np.exp(-(new_obj[1] - current_obj[1]) / temperatures[1])
        ]

        # Check acceptance
        if any(p == 1 for p in accept_probs):
            current_solution = new_solution
            archive_all.append((current_solution, new_obj))  # Store all evaluated good solutions
        else:
            all_accepted = True
            for p in accept_probs:
                if random.random() >= p:
                    all_accepted = False
                    break
            if all_accepted:
                current_solution = new_solution
                archive_all.append((current_solution, new_obj))  # Store all evaluated good solutions

        print(f"accept_probs: {accept_probs}")
        
        # Update temperatures using exponential cooling
        temperatures = [t * math.exp(-cooling_rate) for t in temperatures]
        # print(f"Archive_all: {archive_all}")
        print(f"temperatures: {temperatures}")
        print("###############################################")
    
    # Moved return outside the for-loop
    return archive, archive_all

In [None]:
##################### Define bounds for hyperparameters #####################
bounds = [
    (1, 3),                     # Number of convolutional layers
    (16, 64),                   # Number of filters
    [(3, 3), (5, 5)],           # Kernel size
    (1, 3),                     # Number of fully connected layers (1 to 3 layers)
    (128, 512),                  # Number of neurons per fully connected layer
    [True, False],              # Batch Normalization
    [True, False]               # Dropout
]

In [None]:
##################### Run CSA #####################
archive_all = chaos_simulated_annealing(bounds, max_iter=500)

In [None]:
print("All Archive:", archive_all)

In [None]:
import pandas as pd 
def store_pareto_parameter_archive_all(archive, filename="DS-CNN_all_archive.xlsx"):
    """
    Store the final Pareto archive in an Excel file.
    
    Parameters:
        archive (list): List of tuples where each tuple contains (solution, objectives).
        filename (str): Name of the Excel file to save the data.
    """
    # Prepare data for saving
    data = []
    for idx, (solution, objectives) in enumerate(archive):
        # Flatten the solution and append objectives
        record = {
            "Solution ID": f"Solution_{idx + 1}",
            "Conv Layers": solution[0],
            "Filters": solution[1],
            "Kernel Size": solution[2],
            "FC Layers": "-".join(map(str, solution[3])),  # Join FC layers as a string
            "Batch Normalization": solution[4],
            "Dropout": solution[5],
            "accuracy": objectives[0],
            "model_size": objectives[1]
        }
        data.append(record)
    
    # Convert to DataFrame
    df = pd.DataFrame(data)
    
    # Save to Excel
    df.to_excel(filename, index=False)
    print(f"Pareto archive saved to {filename}")


In [None]:
store_pareto_parameter_archive_all(archive_all)