# Before starting:
* This notebook is meant to run on Google Colaboratory;
* It requires you to download third-party software, otherwise it will not run. (I know it's tedious, but I don't own the program, so I can't share it with you);
* I suggest using an environment with a GPU.


## Preliminaries

### Install 'tensorflow' and 'ai-edge-litert' (the LiteRT interpreter used to test the quantized model)

In [None]:
!pip install tensorflow ai-edge-litert

### Get the 'stm32tflm' software

* Download the 'X-CUBE-AI-Linux' package from https://www.st.com/en/embedded-software/x-cube-ai.html;
* Extract the 'stm32tflm' executable from the downloaded package;
* Put it in your working folder (usually '/content/' on Google Colaboratory).

Enable the execution of the 'stm32tflm' program

In [None]:
!chmod +x stm32tflm

In [None]:
from tensorflow import keras
from pathlib import Path
import tensorflow as tf
import numpy as np
import subprocess
import datetime
import shutil
import glob
import re
import os

## ColabNAS code

In [None]:
class ColabNAS:
    """Hardware-aware NAS driver for small image classifiers.

    The class loads an image-folder dataset and exposes `evaluate_model`, the
    callback a search strategy uses to train a candidate network, convert it to
    a uint8 TFLite model and measure its Flash / peak-RAM footprint with the
    external `stm32tflm` executable. `search` runs a strategy class and keeps
    the winning `.tflite` file.
    """

    architecture_name = 'resulting_architecture'

    def __init__(self, max_RAM, max_Flash, max_MACC, path_to_training_set, val_split, cache=False, input_shape=(50,50,3), save_path='.', path_to_stm32tflm='/content/stm32tflm'):
        """Store the budgets and paths, then load the datasets.

        Args:
            max_RAM: upper bound on the peak RAM reported by `stm32tflm`.
            max_Flash: upper bound on the Flash size reported by `stm32tflm`.
            max_MACC: upper bound on the MAC count supplied by the strategy.
            path_to_training_set: directory with one sub-folder per class.
            val_split: fraction of the images used as validation subset.
            cache: whether to cache the datasets in memory.
            input_shape: (height, width, channels); channels must be 1 or 3.
            save_path: directory where results are written.
            path_to_stm32tflm: location of the `stm32tflm` executable.
        """
        self.learning_rate = 1e-3
        self.batch_size = 128
        self.epochs = 100  # minimum 2: one probing epoch plus at least one training epoch

        self.max_MACC = max_MACC
        self.max_Flash = max_Flash
        self.max_RAM = max_RAM
        self.path_to_training_set = path_to_training_set
        # One class per immediate sub-directory of the dataset folder.
        self.num_classes = len(next(os.walk(path_to_training_set))[1])
        self.val_split = val_split
        self.cache = cache
        self.input_shape = input_shape
        self.save_path = Path(save_path)

        self.path_to_trained_models = self.save_path / "trained_models"
        self.path_to_trained_models.mkdir(parents=True, exist_ok=True)

        self.path_to_stm32tflm = Path(path_to_stm32tflm)

        # Stays None until `search` finds a feasible architecture.
        self.path_to_resulting_architecture = None

        self.load_training_set()

    def load_training_set(self):
        """Build `self.train_ds` (augmented) and `self.validation_ds`.

        Raises:
            ValueError: if `input_shape` does not have 1 or 3 channels.
        """
        channels = self.input_shape[2]
        if channels == 3:
            color_mode = 'rgb'
        elif channels == 1:
            color_mode = 'grayscale'
        else:
            raise ValueError(
                f"input_shape must have 1 or 3 channels, got {channels}")

        # Both subsets share every option except `subset`; the common seed
        # keeps the training/validation partition consistent.
        common = dict(
            directory=self.path_to_training_set,
            labels='inferred',
            label_mode='categorical',
            color_mode=color_mode,
            batch_size=self.batch_size,
            image_size=self.input_shape[0:2],
            shuffle=True,
            seed=11,
            validation_split=self.val_split,
        )
        train_ds = tf.keras.utils.image_dataset_from_directory(subset='training', **common)
        validation_ds = tf.keras.utils.image_dataset_from_directory(subset='validation', **common)

        data_augmentation = tf.keras.Sequential([
            tf.keras.layers.RandomFlip("horizontal"),
            tf.keras.layers.RandomRotation(0.2, fill_mode='constant', interpolation='bilinear'),
            #tf.keras.layers.Rescaling(1./255)
            ])

        train_ds = train_ds.map(
            lambda x, y: (data_augmentation(x, training=True), y),
            num_parallel_calls=tf.data.AUTOTUNE)

        if self.cache:
            # NOTE(review): the cache sits after the augmentation map, so the
            # augmented batches of the first pass are presumably replayed in
            # later epochs - confirm this is intended.
            train_ds = train_ds.cache()
            validation_ds = validation_ds.cache()

        self.train_ds = train_ds.prefetch(buffer_size=tf.data.AUTOTUNE)
        self.validation_ds = validation_ds.prefetch(buffer_size=tf.data.AUTOTUNE)

    def get_data(self):
        """Return the (training, validation) datasets."""
        return self.train_ds, self.validation_ds

    def quantize_model_uint8(self, model_name):
        """Convert `<model_name>.h5` to a uint8 `<model_name>.tflite` and delete the `.h5`."""
        def representative_dataset():
            # 150 single-image samples drawn from the training set.
            for data in self.train_ds.rebatch(1).take(150):
                yield [tf.dtypes.cast(data[0], tf.float32)]

        h5_path = self.path_to_trained_models / f"{model_name}.h5"
        model = tf.keras.models.load_model(h5_path)
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.representative_dataset = representative_dataset
        converter.target_spec.supported_types = [tf.int8]
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
        converter.inference_input_type = tf.uint8
        converter.inference_output_type = tf.uint8
        tflite_quant_model = converter.convert()

        with open(self.path_to_trained_models / f"{model_name}.tflite", 'wb') as f:
            f.write(tflite_quant_model)

        h5_path.unlink()

    def evaluate_flash_and_peak_RAM_occupancy(self, model_name):
        """Quantize the saved model and return `(Flash, RAM)` as reported by `stm32tflm`.

        Raises:
            RuntimeError: if `stm32tflm` does not finish within 15 seconds or
                its output does not contain exactly two integers.
        """
        # quantize model to evaluate its peak RAM occupancy and its Flash occupancy
        self.quantize_model_uint8(model_name)

        # evaluate its peak RAM occupancy and its Flash occupancy using STMicroelectronics' X-CUBE-AI
        command = [
            str(self.path_to_stm32tflm),
            str(self.path_to_trained_models / f"{model_name}.tflite"),
        ]
        try:
            # `run` kills the child process itself when the timeout expires.
            completed = subprocess.run(command, stdout=subprocess.PIPE, timeout=15)
        except subprocess.TimeoutExpired as err:
            raise RuntimeError("stm32tflm error: no answer within 15 seconds") from err

        output = completed.stdout.decode(errors='replace')
        numbers = re.findall(r'\d+', output)
        if len(numbers) != 2:
            raise RuntimeError(
                f"stm32tflm error: expected two numbers (Flash, RAM), got {output!r}")
        Flash, RAM = numbers

        return int(Flash), int(RAM)

    def evaluate_model(self, model, MACC, number_of_cells_limited, model_name):
        """Train and measure one candidate.

        The model is trained for one epoch, quantized and measured. Only when
        it respects every budget (and its cell count was not limited) is it
        trained for the remaining epochs and quantized again.

        Returns:
            dict with keys 'RAM', 'Flash', 'MACC' (the measured value, or
            "Outside the upper bound") and 'max_val_acc' (best validation
            accuracy rounded to 3 decimals, or -3 when the full training was
            skipped).
        """
        print(f"\n{model_name}\n")
        checkpoint = tf.keras.callbacks.ModelCheckpoint(
            str(self.path_to_trained_models / f"{model_name}.h5"), monitor='val_accuracy',
            verbose=1, save_best_only=True, save_weights_only=False, mode='auto')
        # One epoch of training must be done before quantization, which is needed to evaluate RAM and Flash occupancy
        model.fit(self.train_ds, epochs=1, validation_data=self.validation_ds, validation_freq=1)
        model.save(self.path_to_trained_models / f"{model_name}.h5")
        Flash, RAM = self.evaluate_flash_and_peak_RAM_occupancy(model_name)
        print(f"\nRAM: {RAM},\t Flash: {Flash},\t MACC: {MACC}\n")

        hist = None
        within_budget = MACC <= self.max_MACC and Flash <= self.max_Flash and RAM <= self.max_RAM
        if within_budget and not number_of_cells_limited:
            hist = model.fit(self.train_ds, epochs=self.epochs - 1, validation_data=self.validation_ds, validation_freq=1, callbacks=[checkpoint])
            self.quantize_model_uint8(model_name)

        if hist is not None:
            max_val_acc = np.around(np.amax(hist.history['val_accuracy']), decimals=3)
        else:
            max_val_acc = -3  # sentinel: candidate was not fully trained

        return {'RAM': RAM if RAM <= self.max_RAM else "Outside the upper bound",
                'Flash': Flash if Flash <= self.max_Flash else "Outside the upper bound",
                'MACC': MACC if MACC <= self.max_MACC else "Outside the upper bound",
                'max_val_acc': max_val_acc}

    def search(self, NAS):
        """Run the strategy class `NAS` and keep the best `.tflite` model.

        `NAS` is instantiated with the keyword arguments `evaluate_model_fnc`,
        `input_shape`, `num_classes` and `learning_rate`; its `search()` must
        return `(architecture_dict, elapsed_time)`.

        Returns:
            Path of the resulting `.tflite` file, or None when no feasible
            architecture was found.
        """
        self.path_to_resulting_architecture = None

        nas = NAS(
            evaluate_model_fnc=self.evaluate_model,
            input_shape=self.input_shape,
            num_classes=self.num_classes,
            learning_rate=self.learning_rate,
        )
        resulting_architecture, take_time = nas.search()

        if resulting_architecture['max_val_acc'] > 0:
            resulting_architecture_name = f"k_{resulting_architecture['k']}_c_{resulting_architecture['c']}.tflite"
            self.path_to_resulting_architecture = self.save_path / f"{self.architecture_name}_{resulting_architecture_name}"
            (self.path_to_trained_models / resulting_architecture_name).rename(self.path_to_resulting_architecture)
            # Every other candidate file is discarded once the winner is moved.
            shutil.rmtree(self.path_to_trained_models)
            print(f"\nResulting architecture: {resulting_architecture}\n")
        else:
            print("\nNo feasible architecture found\n")
        print(f"Elapsed time (search): {take_time}\n")

        return self.path_to_resulting_architecture

In [None]:
from ai_edge_litert.interpreter import Interpreter # Changed class name to Interpreter
import tensorflow as tf
import numpy as np

def test_litert_model(path_to_resulting_model, test_ds):
    """Measure the top-1 accuracy of a LiteRT (TFLite) model on `test_ds`.

    Args:
        path_to_resulting_model: path of the `.tflite` file to load.
        test_ds: iterable of `(images_batch, labels_batch)` pairs; labels are
            compared through `argmax`, so one-hot labels are expected.

    Returns:
        The accuracy as a float (0 when `test_ds` yields no sample). The value
        is also printed.
    """
    # Initialize the LiteRT Interpreter
    interpreter = Interpreter(model_path=str(path_to_resulting_model))
    interpreter.allocate_tensors()

    output_details = interpreter.get_output_details()[0]
    input_details = interpreter.get_input_details()[0]
    input_dtype = input_details['dtype']
    scale, zero_point = input_details["quantization"]
    integer_input = np.issubdtype(input_dtype, np.integer)

    correct = 0
    total = 0

    for images_batch, labels_batch in test_ds:
        for image, label in zip(images_batch, labels_batch):
            pixels = np.asarray(image, dtype=np.float32)

            # 1. Quantize integer inputs: q = round(f / scale) + zero_point
            if integer_input:
                if scale:  # a scale of 0 means no quantization parameters were stored
                    pixels = pixels / scale + zero_point
                # Round and saturate; a bare cast would truncate and wrap around.
                limits = np.iinfo(input_dtype)
                pixels = np.clip(np.round(pixels), limits.min, limits.max)

            # 2. Cast and add batch dimension
            input_data = np.expand_dims(pixels.astype(input_dtype), axis=0)

            # 3. Inference
            interpreter.set_tensor(input_details['index'], input_data)
            interpreter.invoke()

            # 4. Get results
            prediction = interpreter.get_tensor(output_details['index'])

            if np.argmax(label) == np.argmax(prediction):
                correct += 1
            total += 1

    accuracy = correct / total if total > 0 else 0
    print(f"\nLiteRT model test accuracy: {accuracy:.4f}")
    return accuracy

# Usage (only once `path_to_tflite_model` and `test_ds` exist, i.e. after a
# search has been run and `colabNAS.get_data()` has been called):
# test_litert_model(path_to_tflite_model, test_ds)

## OurNAS

In [None]:
class OurNAS():
    """Greedy NAS strategy over two integers: `k` (filters of the first
    convolution) and `c` (number of additional pooling+convolution cells).

    For a given `k`, cells are added while the validation accuracy improves;
    `k` is then doubled (or halved) while that keeps helping.
    """

    architecture_name = 'resulting_architecture'

    def __init__(self, evaluate_model_fnc, input_shape, num_classes, learning_rate, evaluate_model=None):
        """Store the evaluation callback and the model hyper-parameters.

        Args:
            evaluate_model_fnc: callable `(model, MACC, number_of_cells_limited,
                model_name) -> dict` whose result contains 'max_val_acc'.
            input_shape: (height, width, channels) of the network input.
            num_classes: number of output classes.
            learning_rate: learning rate of the Adam optimizer.
            evaluate_model: optional alias of `evaluate_model_fnc`; when
                omitted, `evaluate_model_fnc` is used.
        """
        self.evaluate_model_fnc = evaluate_model_fnc
        self.model_count = 0
        self.model_counter = 0  # number of candidates evaluated so far
        self.model_name = ""
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.learning_rate = learning_rate
        # Callers that only pass the four documented arguments still get a
        # working evaluator.
        self.evaluate_model = evaluate_model if evaluate_model is not None else evaluate_model_fnc

    def create_model(self, k, c):
        """Build and compile the candidate network for `(k, c)`.

        Returns:
            (model, number_of_mac, number_of_cells_limited); the last flag is
            True when fewer than `c` cells could be added because a spatial
            dimension of the feature map reached 1.
        """
        kernel_size = (3,3)
        pool_size = (2,2)
        pool_strides = (2,2)

        number_of_cells_limited = False
        number_of_mac = 0

        inputs = keras.Input(shape=self.input_shape)

        #convolutional base
        n = int(k)
        multiplier = 2

        #first convolutional layer
        c_in = self.input_shape[2]
        x = keras.layers.Conv2D(n, kernel_size, padding='same')(inputs)
        x = keras.layers.BatchNormalization()(x)
        x = keras.layers.ReLU()(x)

        # MACs of a convolution: input channels * kernel area * output volume
        number_of_mac = number_of_mac + (c_in * kernel_size[0] * kernel_size[1] * x.shape[1] * x.shape[2] * x.shape[3])

        #adding cells
        for i in range(1, c + 1):
            if x.shape[1] <= 1 or x.shape[2] <= 1:
                number_of_cells_limited = True
                break
            n = int(np.ceil(n * multiplier))
            # The growth factor shrinks at every cell: 2, 1.5, 1.25, ...
            multiplier = multiplier - 2**-i
            x = keras.layers.MaxPooling2D(pool_size=pool_size, strides=pool_strides, padding='valid')(x)
            c_in = x.shape[3]
            x = keras.layers.Conv2D(n, kernel_size, padding='same')(x)
            x = keras.layers.BatchNormalization()(x)
            x = keras.layers.ReLU()(x)
            number_of_mac = number_of_mac + (c_in * kernel_size[0] * kernel_size[1] * x.shape[1] * x.shape[2] * x.shape[3])

        #classifier
        x = keras.layers.GlobalAveragePooling2D()(x)
        input_shape = x.shape[1]
        x = keras.layers.Dense(n)(x)
        x = keras.layers.BatchNormalization()(x)
        x = keras.layers.ReLU()(x)
        number_of_mac = number_of_mac + (input_shape * x.shape[1])
        x = keras.layers.Dense(self.num_classes)(x)
        x = keras.layers.BatchNormalization()(x)
        outputs = keras.layers.Softmax()(x)
        number_of_mac = number_of_mac + (x.shape[1] * outputs.shape[1])

        model = keras.Model(inputs=inputs, outputs=outputs)

        opt = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
        model.compile(optimizer=opt,
                loss='categorical_crossentropy',
                metrics=['accuracy'])

        model.summary()

        return model, number_of_mac, number_of_cells_limited

    def search(self):
        """Search over `k`, exploring the number of cells for each value.

        Returns:
            (resulting_architecture, elapsed_time). When nothing is feasible
            the architecture is the sentinel with 'max_val_acc' equal to -1.
        """
        self.model_counter = 0
        epsilon = 0.005
        k0 = 4

        start = datetime.datetime.now()

        k = k0
        previous_architecture = self.explore_num_cells(k)
        k = 2 * k
        current_architecture = self.explore_num_cells(k)

        if (current_architecture['max_val_acc'] > previous_architecture['max_val_acc']):
            previous_architecture = current_architecture
            k = 2 * k
            current_architecture = self.explore_num_cells(k)
            while (current_architecture['max_val_acc'] > previous_architecture['max_val_acc'] + epsilon):
                previous_architecture = current_architecture
                k = 2 * k
                current_architecture = self.explore_num_cells(k)
        else:
            k = k0 / 2
            current_architecture = self.explore_num_cells(k)
            while (current_architecture['max_val_acc'] >= previous_architecture['max_val_acc']):
                previous_architecture = current_architecture
                if k <= 1:
                    # k cannot shrink below one filter. Without this stop the
                    # loop never ends when every candidate is infeasible,
                    # because the sentinel accuracies compare equal forever.
                    break
                k = k / 2
                current_architecture = self.explore_num_cells(k)

        resulting_architecture = previous_architecture
        end = datetime.datetime.now()

        return resulting_architecture, end-start

    def explore_num_cells(self, k):
        """For a fixed `k`, add cells while the validation accuracy improves.

        Returns:
            The last architecture that improved on its predecessor, or the
            sentinel `{'k': -1, 'c': -1, 'max_val_acc': -1}` when even `c = 0`
            is not feasible.
        """
        previous_architecture = {'k': -1, 'c': -1, 'max_val_acc': -2}
        current_architecture = {'k': -1, 'c': -1, 'max_val_acc': -1}
        c = -1
        k = int(k)

        while (current_architecture['max_val_acc'] > previous_architecture['max_val_acc']):
            previous_architecture = current_architecture
            c = c + 1
            self.model_counter = self.model_counter + 1
            current_architecture = self.evaluate_model_process(k, c)
            print(f"\n\n\n{current_architecture}\n\n\n")
        return previous_architecture

    def evaluate_model_process(self, k, c):
        """Build and evaluate the `(k, c)` candidate.

        Returns:
            The evaluator's result dict extended with 'k' and 'c', or an
            infeasible sentinel ('max_val_acc' = -3) when `k` is not positive.
        """
        if k > 0:
            self.model_name = f"k_{k}_c_{c}"
            model, MACC, number_of_cells_limited = self.create_model(k, c)
            result_property_dict = self.evaluate_model(model, MACC, number_of_cells_limited, self.model_name)
            result_property_dict["k"] = k
            result_property_dict["c"] = c if not number_of_cells_limited else "Not feasible"
            return result_property_dict
        else:
            return {'k': 'unfeasible', 'c': c, 'max_val_acc': -3}

In [None]:
# Download the TensorFlow flower-photos example archive and let Keras extract it.
dataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
data_dir = tf.keras.utils.get_file('flower_photos.tar', origin=dataset_url, extract=True)
# Drop the suffix of the returned path and point at the 'flower_photos' folder inside it.
# NOTE(review): the location returned by get_file with extract=True presumably
# depends on the installed Keras version - confirm that this folder exists and
# contains one sub-folder per class before starting the search.
data_dir = Path(data_dir).with_suffix('') / "flower_photos"

In [None]:
import numpy as np
import tensorflow as tf

# Network input: 50x50 images with 3 channels (height, width, channels).
input_shape = (50, 50, 3)

# Budgets of the target microcontroller, an STM32L412KBU3
# (273 CoreMark, 40 kiB RAM, 128 kiB Flash).
peak_RAM_upper_bound = 40 * 1024
Flash_upper_bound = 128 * 1024
MACC_upper_bound = 273 * 10_000  # CoreMark * 1e4

# The dataset directory must contain one sub-folder per class:
#   main_directory/
#   ...class_a/
#   ......a_image_1.jpg
#   ......a_image_2.jpg
#   ...class_b/
#   ......b_image_1.jpg
#   ......b_image_2.jpg
val_split = 0.3
path_to_training_set = data_dir

# Keep the datasets in memory; the application will crash if they do not
# fit in the main memory.
cache = True

# Directory where the results are saved.
save_path = '/content/'

#to show the GPU used
!nvidia-smi

# Build the NAS driver with the budgets and dataset settings defined above.
colabNAS = ColabNAS(
    max_RAM=peak_RAM_upper_bound,
    max_Flash=Flash_upper_bound,
    max_MACC=MACC_upper_bound,
    path_to_training_set=path_to_training_set,
    val_split=val_split,
    cache=cache,
    input_shape=input_shape,
    save_path=save_path,
)

# Run the search with the OurNAS strategy; the result is the path returned by
# ColabNAS.search.
path_to_tflite_model = colabNAS.search(OurNAS)

In [None]:
# get_data() returns (training, validation) datasets, so `test_ds` is the
# validation subset that was also used during the search, not a held-out test set.
_, test_ds = colabNAS.get_data()

In [None]:
# Report the accuracy of the model found by the OurNAS search on `test_ds`.
test_litert_model(path_to_tflite_model, test_ds)

## Second NAS (PSO)

In [None]:
# --- 1. SEARCH SPACE & ENCODING ---
# We represent an architecture as a vector: [k, c]
# k: Initial number of filters (Continuous, then rounded to int)
# c: Number of additional cells (Continuous, then rounded to int)

class ArchitectureSearchSpace:
    """Box bounds for the two PSO coordinates `k` and `c`."""

    def __init__(self, k_range=(4, 128), c_range=(0, 10)):
        """Store the inclusive (min, max) bounds of `k` and of `c`."""
        k_low, k_high = k_range
        c_low, c_high = c_range
        self.k_min = k_low
        self.k_max = k_high
        self.c_min = c_low
        self.c_max = c_high

    def clamp(self, k, c):
        """Return `(k, c)` with each coordinate clipped into its range."""
        bounded_k = np.clip(k, self.k_min, self.k_max)
        bounded_c = np.clip(c, self.c_min, self.c_max)
        return bounded_k, bounded_c

# --- 2. DECODER (Model Creator) ---
class ModelDecoder:
    """Turns a `(k, c)` position into a compiled Keras model.

    `input_shape`, `num_classes` and `learning_rate` start as None and must be
    assigned before `decode_and_build` is called.
    """

    def __init__(self):
        self.input_shape = self.num_classes = self.learning_rate = None

    def decode_and_build(self, k, c):
        """Build the network encoded by `(k, c)`.

        Both coordinates are truncated to integers: `k` is the number of
        filters of the first convolution and `c` the number of additional
        pooling+convolution cells.

        Returns:
            (model, number_of_mac, number_of_cells_limited); the flag is True
            when the cells stopped early because a spatial dimension of the
            feature map reached 1.
        """
        filters = int(k)
        num_cells = int(c)
        conv_kernel = (3, 3)
        window = (2, 2)

        def conv_bn_relu(tensor, width):
            # Convolution -> batch normalization -> ReLU.
            tensor = keras.layers.Conv2D(width, conv_kernel, padding='same')(tensor)
            tensor = keras.layers.BatchNormalization()(tensor)
            return keras.layers.ReLU()(tensor)

        def conv_macs(channels_in, fmap):
            # input channels * kernel area * output volume
            return channels_in * np.prod(conv_kernel) * fmap.shape[1] * fmap.shape[2] * fmap.shape[3]

        total_macs = 0
        stopped_early = False

        inputs = keras.Input(shape=self.input_shape)

        # Stem convolution
        features = conv_bn_relu(inputs, filters)
        total_macs += conv_macs(self.input_shape[2], features)

        # Additional cells; the growth factor goes 2, 1.5, 1.25, ...
        growth = 2
        for cell_index in range(1, num_cells + 1):
            if not (features.shape[1] > 1 and features.shape[2] > 1):
                stopped_early = True
                break

            filters = int(np.ceil(filters * growth))
            growth -= 2**-cell_index
            features = keras.layers.MaxPooling2D(pool_size=window, strides=(2,2), padding='valid')(features)

            channels_in = features.shape[3]
            features = conv_bn_relu(features, filters)
            total_macs += conv_macs(channels_in, features)

        # Classifier head
        pooled = keras.layers.GlobalAveragePooling2D()(features)
        pooled_width = pooled.shape[1]
        hidden = keras.layers.Dense(filters)(pooled)
        total_macs += (pooled_width * filters)
        logits = keras.layers.Dense(self.num_classes)(hidden)
        outputs = keras.layers.Softmax()(logits)
        total_macs += (filters * self.num_classes)

        model = keras.Model(inputs=inputs, outputs=outputs)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
            loss='categorical_crossentropy',
            metrics=['accuracy'],
        )

        return model, total_macs, stopped_early

# --- 3. EVALUATOR & CONSTRAINTS ---
class NASPsoOptimizer:
    """Particle-swarm search over the `(k, c)` architecture space.

    The swarm settings (`n_particles`, `iterations`, `space`, `decoder`) are
    class attributes installed by `NASPsoOptimizer.setup(...)`, which must be
    called before the class is instantiated.
    """

    # Score of candidates that cannot be used. It is below the -1 baseline of
    # the personal/global bests, so such candidates are never selected.
    _INFEASIBLE_SCORE = -3

    def __init__(self, evaluate_model_fnc, input_shape, num_classes, learning_rate):
        """Store the evaluation callback and configure the shared decoder.

        Args:
            evaluate_model_fnc: callable `(model, MACC, number_of_cells_limited,
                model_name) -> dict` whose result contains 'max_val_acc'.
            input_shape: (height, width, channels) of the network input.
            num_classes: number of output classes.
            learning_rate: learning rate handed to the decoder.
        """
        self.evaluate_model_fnc = evaluate_model_fnc
        self.input_shape = input_shape
        self.num_classes = num_classes
        self.learning_rate = learning_rate
        self.model_name = ""

        self.setup_decoder()

    def setup_decoder(self):
        """Copy the model hyper-parameters onto the decoder installed by `setup`."""
        self.decoder.input_shape = self.input_shape
        self.decoder.num_classes = self.num_classes
        self.decoder.learning_rate = self.learning_rate

    def _evaluate_candidate(self, k, c):
        """Build and evaluate one candidate; return its result dict."""
        model, macc, limited = self.decoder.decode_and_build(k, c)
        if limited:
            # Too many cells for the input size: not a valid architecture.
            return {'max_val_acc': self._INFEASIBLE_SCORE}
        self.model_name = f"k_{int(k)}_c_{int(c)}"
        return self.evaluate_model_fnc(model, macc, limited, self.model_name)

    def search(self):
        """Run the swarm for `iterations` rounds.

        Returns:
            (results_best, elapsed_time). `results_best` is a copy of the
            evaluator's result for the best candidate, extended with integer
            'k' and 'c'. When no candidate scored above the baseline it is
            `{'k': 'unfeasible', 'c': 'unfeasible', 'max_val_acc': -3}`.
        """
        space = self.space

        # Initialize particles [k, c] and velocities
        particles = np.array([
            [np.random.uniform(space.k_min, space.k_max),
             np.random.uniform(space.c_min, space.c_max)]
            for _ in range(self.n_particles)
        ])
        velocities = np.zeros((self.n_particles, 2))

        p_best = np.copy(particles)
        p_best_scores = np.full(self.n_particles, -1.0)

        g_best = None
        g_best_score = -1.0
        g_best_results = None

        w, c1, c2 = 0.5, 1.5, 1.5  # Hyperparameters for PSO

        # Positions are truncated to integers before a model is built, so
        # several particles can encode the same network. Each network is
        # evaluated once; this also keeps the saved model file of a candidate
        # in step with the score recorded for it.
        evaluated = {}

        start = datetime.datetime.now()

        for it in range(self.iterations):
            print(f"==================== iteration {it} ====================")
            for i in range(self.n_particles):
                k, c = particles[i]
                key = (int(k), int(c))
                if key not in evaluated:
                    evaluated[key] = self._evaluate_candidate(k, c)
                results = evaluated[key]
                score = results['max_val_acc']

                # Update Personal Best
                if score > p_best_scores[i]:
                    p_best_scores[i] = score
                    p_best[i] = particles[i]

                # Update Global Best
                if score > g_best_score:
                    g_best_results = results
                    g_best_score = score
                    g_best = np.copy(particles[i])

            # Update Velocities
            if g_best is None:
                print("Warning: No valid architecture found in this iteration. Re-randomizing velocities...")
                velocities = np.random.uniform(-1, 1, size=velocities.shape)
            else:
                for i in range(self.n_particles):
                    r1, r2 = np.random.rand(), np.random.rand()
                    velocities[i] = (w * velocities[i] +
                                     c1 * r1 * (p_best[i] - particles[i]) +
                                     c2 * r2 * (g_best - particles[i]))

            # Update Positions (also after a re-randomization, otherwise the
            # same points would be evaluated again in the next round)
            for i in range(self.n_particles):
                particles[i] += velocities[i]
                particles[i][0], particles[i][1] = space.clamp(particles[i][0], particles[i][1])

            if g_best is not None:
                print(f"Iteration {it}: Global Best Score = {g_best_score:.4f} at k={int(g_best[0])}, c={int(g_best[1])}")

        if g_best is None:
            results_best = {'k': 'unfeasible', 'c': 'unfeasible',
                            'max_val_acc': self._INFEASIBLE_SCORE}
        else:
            # Copy so the cached evaluator result is not modified.
            results_best = dict(g_best_results)
            results_best["k"] = int(g_best[0])
            results_best["c"] = int(g_best[1])

        end = datetime.datetime.now()
        return results_best, end-start

    @staticmethod
    def setup(search_space, decoder, n_particles=5, iterations=10):
        """Install the swarm settings on the class and return the class.

        The settings are stored as class attributes, so a later call replaces
        the configuration used by every instance.
        """
        NASPsoOptimizer.n_particles = n_particles
        NASPsoOptimizer.iterations = iterations
        NASPsoOptimizer.space = search_space
        NASPsoOptimizer.decoder = decoder
        return NASPsoOptimizer

In [None]:
# Network input: 50x50 images with 3 channels (height, width, channels).
input_shape = (50, 50, 3)

# Budgets of the target microcontroller, an STM32L412KBU3
# (273 CoreMark, 40 kiB RAM, 128 kiB Flash).
peak_RAM_upper_bound = 40 * 1024
Flash_upper_bound = 128 * 1024
MACC_upper_bound = 273 * 10_000  # CoreMark * 1e4

# The dataset directory must contain one sub-folder per class:
#   main_directory/
#   ...class_a/
#   ......a_image_1.jpg
#   ......a_image_2.jpg
#   ...class_b/
#   ......b_image_1.jpg
#   ......b_image_2.jpg
val_split = 0.3
path_to_training_set = data_dir

# Keep the datasets in memory; the application will crash if they do not
# fit in the main memory.
cache = True

# Directory where the results are saved (empty string: current directory).
save_path = ''

#to show the GPU used
!nvidia-smi




# Swarm inputs: the bounds of k and c, and the decoder that builds the models.
decoder = ModelDecoder()
search_space = ArchitectureSearchSpace(k_range=(2, 10), c_range=(1, 5))

# The ColabNAS driver supplies the evaluation logic used by the swarm.
colabNAS = ColabNAS(
    max_RAM=peak_RAM_upper_bound,
    max_Flash=Flash_upper_bound,
    max_MACC=MACC_upper_bound,
    path_to_training_set=path_to_training_set,
    val_split=val_split,
    cache=cache,
    input_shape=input_shape,
    save_path=save_path,
)

# Configure the PSO strategy class, then run the search with it.
pso_strategy = NASPsoOptimizer.setup(search_space, decoder)
path_to_tflite_model = colabNAS.search(pso_strategy)

In [None]:
# get_data() returns (training, validation) datasets, so `test_ds` is the
# validation subset that was also used during the search, not a held-out test set.
_, test_ds = colabNAS.get_data()

In [None]:
# Report the accuracy of the model found by the PSO search on `test_ds`.
test_litert_model(path_to_tflite_model, test_ds)

## SEP

In [None]:
class OurNAS():
    """Skeleton of a PSO-based NAS strategy; the search steps are still stubs.

    NOTE: this class has the same name as the `OurNAS` class defined earlier
    in this notebook, so running this cell rebinds that name.
    """

    def __init__(self, evaluate_model_fnc):
        self.evaluate_model_fnc = evaluate_model_fnc
        self.architecture_name = 'resulting_architecture'
        self.model_count = 0
        self.model = None

    def create_model(self, k, c):
        """Stub: should create the model and return (model, MACC, is_valid),
        where is_valid tells whether the architecture is valid."""
        return None

    def search(self):
        """Stub: should increment `self.model_count` at every iteration and use
        the PSO algorithm to optimize the model architecture (`self.model`)
        while checking that the constraints are met."""
        return None

    def evaluate_model_process(self):
        """Stub: should call the evaluation function on `self.model`, e.g.
        after `model, MACC, number_of_cells_limited = self.Model(k, c)`."""
        return None

    def objective_funtion(self):
        """Placeholder objective: currently only the model accuracy returned by
        the evaluation function; more objectives are planned."""
        score = self.evaluate_model_fnc(self.model)
        return score