# Configura√ß√µes

## Visualiza√ß√£o de configura√ß√µes do servidor

In [None]:
!pip install psutil
import psutil
import platform
import shutil

# Informa√ß√µes do sistema
print(f"Sistema operacional: {platform.system()} {platform.release()}")
print(f"Arquitetura: {platform.machine()}")
print(f"Processador: {platform.processor()}")

# Mem√≥ria
mem = psutil.virtual_memory()
print(f"Mem√≥ria total: {mem.total / (1024**3):.2f} GB")
print(f"Mem√≥ria dispon√≠vel: {mem.available / (1024**3):.2f} GB")
print(f"Uso de mem√≥ria: {mem.percent}%")

# CPU
print(f"N√∫mero de CPUs f√≠sicas: {psutil.cpu_count(logical=False)}")
print(f"N√∫mero de CPUs l√≥gicas: {psutil.cpu_count(logical=True)}")
print(f"Uso da CPU: {psutil.cpu_percent(interval=1)}%")

# Disco
total, used, free = shutil.disk_usage("/")
print(f"Espa√ßo total em disco: {total / (1024**3):.2f} GB")
print(f"Espa√ßo usado: {used / (1024**3):.2f} GB")
print(f"Espa√ßo livre: {free / (1024**3):.2f} GB")


## Verificando vers√µes instaladas no sistema

In [None]:
import tensorflow as tf, numpy as np, sys
print(sys.executable)      
print("Python", sys.version)
print("TF:", tf.__version__)
print("NumPy:", np.__version__) 

## Instala√ß√£o das bibliotecas necess√°rias

In [None]:
# Instala√ß√£o das bibliotecas necess√°rias (caso n√£o estejam instaladas)
!pip install -q scikit-image
!pip install -q keras-tuner
%pip install -U pip setuptools wheel
%pip install -U optuna
%pip install -U optuna-integration[tfkeras]
%pip install -U optuna optuna-dashboard
!pip install tabulate

## Importa√ß√£o das bibliotecas

In [None]:
# Importa√ß√£o das bibliotecas
import os, zipfile, cv2, time, datetime
from datetime import datetime
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.losses import mse
import tensorflow.keras.backend as K
from tensorflow.image import ssim_multiscale
import matplotlib.pyplot as plt
from skimage.metrics import structural_similarity as ssim
from scipy.stats import kruskal
import keras_tuner as kt
import optuna
print(optuna.__version__)
from tensorflow.keras.utils import register_keras_serializable
import edgeimpulse as ei
import pandas as pd
import shutil
from typing import Dict, Any, Optional
from glob import glob

## Distribui√ß√£o em m√∫ltiplas GPUs

In [None]:
# Detecta e inicializa a estrat√©gia para m√∫ltiplas GPUs
try:
    strategy = tf.distribute.MirroredStrategy()
    print(f'Estrat√©gia de distribui√ß√£o: {strategy}')
except ValueError:
    strategy = tf.distribute.get_strategy() # Estrat√©gia padr√£o para uma √∫nica GPU ou CPU
    print(f'N√£o foi poss√≠vel detectar m√∫ltiplas GPUs. Usando a estrat√©gia padr√£o: {strategy}')

## Pr√©-processamento carregamento das imagens

In [None]:
def carregar_imagens(pasta, tamanho=(128, 128), max_imgs=3000):
    """
    Carrega e pr√©-processa imagens de um diret√≥rio.
    """
    imagens = []
    # Cria uma lista de caminhos para as imagens
    caminhos_imagens = [os.path.join(pasta, img_nome) for img_nome in os.listdir(pasta)[:max_imgs]]

    for img_path in tqdm(caminhos_imagens):
        img = cv2.imread(img_path)
        if img is not None:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, tamanho)
            img = img.astype('float32') / 255.0
            imagens.append(img)
    return np.array(imagens)

# Caminhos para os datasets do ambiente Kaggle
base_path = 'datasets/sard_dataset/search-and-rescue-2'

# Carregando os dados
print("Carregando o conjunto de treinamento...")
X_train = carregar_imagens(os.path.join(base_path, 'train/images'), max_imgs=5000)

print("Carregando o conjunto de valida√ß√£o...")
X_val = carregar_imagens(os.path.join(base_path, 'valid/images'), max_imgs=1000)

print("Carregando o conjunto de teste...")
X_test = carregar_imagens(os.path.join(base_path, 'test/images'), max_imgs=1000)

print("\nDados carregados com sucesso!")
print(f"Formato do conjunto de treinamento: {X_train.shape}")
print(f"Formato do conjunto de valida√ß√£o: {X_val.shape}")
print(f"Formato do conjunto de teste: {X_test.shape}")

# Defini√ß√£o dos Modelos de Autoencoders

## Tentativa com fun√ß√µes de perdas combinadas n√£o tiveram resultados finais satisfat√≥rios

## Fun√ß√£o de perda combinada 1
Usar apenas o mse (erro quadr√°tico m√©dio) √© bom para o PSNR, mas n√£o captura a percep√ß√£o visual. A melhoria √© combinar o mse com o SSIM, que mede a similaridade estrutural e √© mais alinhado com a percep√ß√£o humana.

In [None]:
@register_keras_serializable(package="Custom")
def make_custom_loss(alpha=0.84):
    """Retorna uma fun√ß√£o de perda: alpha*SSIM_loss + (1-alpha)*MSE."""
    def loss(y_true, y_pred):
        # Garante faixa [0,1] para o SSIM
        y_pred_clip = tf.clip_by_value(y_pred, 0.0, 1.0)

        # SSIM por amostra (1-SSIM vira erro). shape: (batch,)
        ssim_loss = 1.0 - tf.image.ssim(y_true, y_pred_clip, max_val=1.0)

        # MSE por amostra (mant√©m por-imagem e deixa o Keras reduzir)
        mse_loss = tf.reduce_mean(tf.square(y_true - y_pred), axis=[1, 2, 3])

        return alpha * ssim_loss + (1.0 - alpha) * mse_loss
    return loss

## Fun√ß√£o de perda combinada 2

In [None]:
def loss_mse_ssim(y_true, y_pred):
    """
    Fun√ß√£o de perda combinada: 80% MSE + 20% (1 - SSIM)
    - Mant√©m PSNR elevado (MSE ainda domina)
    - Incentiva preserva√ß√£o de estrutura (SSIM)
    """
    # MSE
    mse_loss = tf.reduce_mean(tf.square(y_true - y_pred))
    # SSIM Loss (1 - SSIM)
    ssim_loss = 1.0 - tf.reduce_mean(tf.image.ssim(y_true, y_pred, max_val=1.0))
    # Combina√ß√£o ponderada
    return 0.8 * mse_loss + 0.2 * ssim_loss


## AE Convencional Convolucional Inicial

- üîß Hiperpar√¢metros
- Input Image Shape: 128 x 128 x 3   
- Encoder layers: 2 conv2d + 2 maxmpooling
- Decoder layers: 2 conv2d + 2 upsampling
- Activation function: relu + sigmoid
- Optimizer: Adam
- Loss function: MSE

In [None]:
#Convencional inicial
def criar_autoencoder(input_shape=(128,128,3)):
    inp = Input(shape=input_shape, name='conv_input')
    #Encoder
    x = layers.Conv2D(32, 3, activation='relu', padding='same')(inp)
    x = layers.MaxPooling2D(2, padding='same')(x)
    x = layers.Conv2D(16, 3, activation='relu', padding='same')(x)
    x = layers.MaxPooling2D(2, padding='same')(x)
    x = layers.Conv2D(16, 3, activation='relu', padding='same')(x)
    #Decoder
    x = layers.UpSampling2D(2)(x)
    x = layers.Conv2D(16, 3, activation='relu', padding='same')(x)
    x = layers.UpSampling2D(2)(x)
    out = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)

    autoenc = Model(inp, out, name='autoencoder_conv')
    autoenc.compile(optimizer='adam', loss='mse')
    return autoenc

## AE Convencional Convolucional Melhorado

- üîß Hiperpar√¢metros
- Input Image Shape: 128 x 128 x 3   
- Encoder layers: 2 conv2d + 2 batchnormalization +  2 maxmpooling
- Decoder layers: 4 conv2dtranspose + + 2 batchnormalization +  1 conv2d
- Activation function: relu + sigmoid
- Optimizer: Adam
- Loss function: MSE


In [None]:
#Convencional Melhorado
def criar_autoencoder_melhorado(input_shape=(128,128,3)):
    """
    Cria um autoencoder convolucional com arquitetura aprimorada.
    - Mais camadas para maior capacidade de extra√ß√£o de features.
    - Regulariza√ß√£o L2 para evitar overfitting.
    - Batch Normalization para estabilidade e treinamento mais r√°pido.
    """
    # Encoder
    inp = Input(shape=input_shape, name='conv_input')
    x = layers.Conv2D(64, 3, activation='relu', padding='same', kernel_regularizer=keras.regularizers.l2(1e-4))(inp)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(2, padding='same')(x) # Dimens√£o: 64x64
    x = layers.Conv2D(32, 3, activation='relu', padding='same', kernel_regularizer=keras.regularizers.l2(1e-4))(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(2, padding='same')(x) # Dimens√£o: 32x32

    # Decoder
    x = layers.Conv2DTranspose(32, 3, activation='relu', padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2DTranspose(32, 3, activation='relu', strides=(2, 2), padding='same')(x) # UpSample para 64x64
    x = layers.Conv2DTranspose(64, 3, activation='relu', padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2DTranspose(64, 3, activation='relu', strides=(2, 2), padding='same')(x) # UpSample para 128x128
    out = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)

    autoenc = Model(inp, out, name='autoencoder_conv_melhorado')
    autoenc.compile(optimizer='adam', loss='mse')
    return autoenc

## AE Variacional Inicial

-üîß Hiperpar√¢metros
- Input Image Shape: 128 x 128 x 3
- Latent dim: 64 * s√≥ nesse porque?  
- Encoder layers: 2 conv2d + 2 maxmpooling + 1 flaten
- Decoder layers: Dense + Reshape+ Upsampling + Conv2d + Upsampling + Conv2d
- Activation function: relu + sigmoid
- Optimizer: Adam
- Loss function: MSE


In [None]:
#modelo inicial
@tf.keras.utils.register_keras_serializable(package="CustomVAE")
def sampling_vae_func(args):
    """Fun√ß√£o de amostragem para VAEs baseados em Lambda layer (VAE Inicial)."""
    mu, log_var = args
    #  tf (mais robusto que K.random_normal no tf.keras)
    eps = tf.random.normal(shape=tf.shape(mu))
    return mu + tf.exp(0.5 * log_var) * eps    
   
def criar_vae(input_shape=(128,128,3), latent_dim=64):

    # Encoder
    inputs = Input(shape=input_shape, name='encoder_input')
    x = layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
    x = layers.MaxPooling2D(2, padding='same')(x)
    x = layers.Conv2D(16, 3, activation='relu', padding='same')(x)
    x = layers.MaxPooling2D(2, padding='same')(x)
    flat = layers.Flatten()(x)

    mu      = layers.Dense(latent_dim, name='mu')(flat)
    log_var = layers.Dense(latent_dim, name='log_var')(flat)
    z = layers.Lambda(sampling_vae_func, output_shape=(latent_dim,), name='z')([mu, log_var])

    # Decoder
    dec_in = Input(shape=(latent_dim,), name='decoder_input')
    y = layers.Dense(32*32*16, activation='relu')(dec_in)
    y = layers.Reshape((32,32,16))(y)
    y = layers.UpSampling2D(2)(y)
    y = layers.Conv2D(16, 3, activation='relu', padding='same')(y)
    y = layers.UpSampling2D(2)(y)
    decoded = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(y)

    decoder = Model(dec_in, decoded, name='decoder')
    outputs = decoder(z)
    
    vae = Model(inputs, outputs, name='vae')
    vae.compile(optimizer='adam', loss=mse)
    return vae

## AE Variacional Melhorado

- üîß Hiperpar√¢metros
- Input Image Shape: 128 x 128 x 3
- Latent dim: 128 * aumentou 
- Encoder layers: 2x(conv2d + batchnormalization + maxpooling) + FLaten
- Decoder layers: Dense + Reshape+  Conv2dtranspose + Conv2dtranspose + Conv2d
- Activation function: relu + sigmoid
- Optimizer: Adam
- Loss function: MSE


In [None]:
# 2) Autoencoder Variacional (VAE)

@register_keras_serializable()
class VAE(keras.Model):
    def __init__(self, encoder, decoder, beta=1.0, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.beta = beta  
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
    
    def get_config(self):
        config = super().get_config()
        config.update({
            'encoder': serialize(self.encoder),
            'decoder': serialize(self.decoder),
            'beta': self.beta,
        })
        return config

    @classmethod
    def from_config(cls, config):
        beta = config.pop('beta', 1.0)
        encoder = tf.keras.utils.deserialize_keras_object(config.pop('encoder'))
        decoder = tf.keras.utils.deserialize_keras_object(config.pop('decoder')) 
        return cls(encoder=encoder, decoder=decoder, **config)
        
    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            # Entrada de dados
            x, _ = data

            # Executa o modelo
            z_mean, z_log_var, z = self.encoder(x)
            reconstruction = self.decoder(z)

            # Calcula a perda de reconstru√ß√£o
            reconstruction_loss = tf.reduce_mean(
               mse(x, reconstruction)
            )
            reconstruction_loss *= 128 * 128

            # Calcula a perda KL
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

            weighted_kl_loss = kl_loss * self.beta 
            
            total_loss = reconstruction_loss + weighted_kl_loss 

        # Calcula gradientes e otimiza
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # Atualiza m√©tricas
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        # Desempacota o data (X, y), onde y √© ignorado para autoencoders
        x, _ = data 

        # 1. Executa o modelo (Forward Pass)
        z_mean, z_log_var, z = self.encoder(x)
        reconstruction = self.decoder(z)

        # 2. Calcula a Perda de Reconstru√ß√£o
        # Certifique-se de que 'mse' (Mean Squared Error) est√° acess√≠vel neste escopo
        reconstruction_loss = tf.reduce_mean(
            mse(x, reconstruction)
        )
        # Aplica o mesmo fator de escala usado no train_step
        reconstruction_loss *= 128 * 128 

        # 3. Calcula a Perda KL
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

        # 4. Calcula a Perda Total
        total_loss = reconstruction_loss + kl_loss

        # 5. Atualiza os rastreadores de m√©tricas
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        
        # 6. Retorna as m√©tricas (as mesmas do train_step)
        return {
            "loss": self.total_loss_tracker.result(), # Corresponde a val_total_loss
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }
        
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        return self.decoder(z)

def criar_vae_melhorado(input_shape=(128,128,3), latent_dim=128, beta=0.1): # Aumenta a latent_dim
    # Encoder
    inputs = layers.Input(shape=input_shape, name='encoder_input')
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(pool_size=(2, 2), padding='same')(x)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(pool_size=(2, 2), padding='same')(x)

    flatten = layers.Flatten()(x)
    z_mean = layers.Dense(latent_dim, name='z_mean')(flatten)
    z_log_var = layers.Dense(latent_dim, name='z_log_var')(flatten)

    def sampling(args):
        z_mean, z_log_var = args
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

    z = layers.Lambda(sampling, name='z')([z_mean, z_log_var])
    encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')

    # Decoder
    latent_inputs = layers.Input(shape=(latent_dim,))
    # Ajusta o tamanho da camada densa para corresponder ao novo latent_dim e √†s camadas do encoder
    x = layers.Dense(32 * 32 * 32, activation='relu')(latent_inputs)
    x = layers.Reshape((32, 32, 32))(x)
    x = layers.Conv2DTranspose(32, (3, 3), activation='relu', strides=(2, 2), padding='same')(x)
    x = layers.Conv2DTranspose(64, (3, 3), activation='relu', strides=(2, 2), padding='same')(x)
    outputs = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)
    decoder = Model(latent_inputs, outputs, name='decoder')

    vae = VAE(encoder, decoder, beta=beta) 
    vae.compile(optimizer='adam', loss='mean_squared_error') 
    return vae

## AE Redund√¢ncia Inicial

- üîß Hiperpar√¢metros
- Input Image Shape: 128 x 128 x 3
- Encoder layers: 2x(conv2d +  maxpooling) + FLaten
- Decoder layers: Dense + Reshape+  upsampling  + Conv2d
- Activation function: relu + sigmoid
- Optimizer: Adam
- Loss function: MSE

In [None]:
# 3) Modelo Inicial
def criar_autoencoder_redundancia(input_shape=(128,128,3)):
    inp = Input(shape=input_shape, name='pen_input')
    x = layers.Conv2D(32, 3, activation='relu', padding='same')(inp)
    x = layers.MaxPooling2D(2, padding='same')(x)
    x = layers.Conv2D(16, 3, activation='relu', padding='same')(x)
    encoded = layers.MaxPooling2D(2, padding='same')(x)

    flat = layers.Flatten()(encoded)
    penal = layers.Dense(
        256,
        activity_regularizer=tf.keras.regularizers.l1(1e-5),
        name='encoded_penalizado'
    )(flat)

    x = layers.Dense(32*32*16, activation='relu')(penal)
    x = layers.Reshape((32,32,16))(x)
    x = layers.UpSampling2D(2)(x)
    x = layers.Conv2D(16, 3, activation='relu', padding='same')(x)
    x = layers.UpSampling2D(2)(x)
    out = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)

    autoenc_pen = Model(inp, out, name='autoencoder_redundancia')
    autoenc_pen.compile(optimizer='adam', loss='mse')
    return autoenc_pen
     

## AE Redund√¢ncia Melhorado
- apenas duas trocas: L1 menor e latente maior

In [None]:
def criar_autoencoder_redundancia_melhorado(input_shape=(128,128,3), latent_dim=512, l1_reg=1e-6):
    input_img = Input(shape=input_shape, name='pen_input')
    x = layers.Conv2D(64, 3, padding='same')(input_img)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.1)(x)
    x = layers.MaxPooling2D(2, padding='same')(x)  # 64x64

    x = layers.Conv2D(32, 3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.1)(x)
    encoded_conv = layers.MaxPooling2D(2, padding='same')(x)  # 32x32

    encoded_flat = layers.Flatten()(encoded_conv)
    latent = layers.Dense(
        latent_dim,  # 128 -> 512/1024
        activation='relu',
        activity_regularizer=tf.keras.regularizers.l1(l1_reg)  # 10e-5 (1e-4) -> 1e-6
    )(encoded_flat)

    x = layers.Dense(32 * 32 * 32, activation='relu')(latent)
    x = layers.Reshape((32, 32, 32))(x)
    x = layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation=None)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.1)(x)

    x = layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation=None)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.1)(x)

    out = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)
    
    m = Model(input_img, out, name='autoencoder_redundancia_melhorado')
    m.compile(optimizer='adam', loss='mse')
    return m

# Treinamento e avalia√ß√£o Otimizado

- üîß Hiperpar√¢metros e configura√ß√µes
- Quantidade de √©pocas: 800 (definido ap√≥s rodar o EarlyStopping)
- Earlystopping: √© um callback que interrompe o treinamento quando a m√©trica monitorada (no caso, a val_loss) para de melhorar. Isso impede que o modelo continue treinando e comece a "memorizar" os dados de treinamento (overfitting).
- Learning Rate Scheduler: acelera a converg√™ncia e ajuda a alcan√ßar erro mais baixo/est√°vel
- Adicionado c√°lculo de passos por √©pocas
- Salva cada modelo por hor√°rio e para facilitar em uma pasta models
- batch size: 32

## Verifica√ß√µes antes do treinamento

In [None]:
print(f"Formato dos dados de valida√ß√£o (X_val): {X_val.shape}")

In [None]:
agora = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print(agora)

## Treinamento e avalia√ß√£o

In [None]:
# Fun√ß√£o de treinamento e avalia√ß√£o com Learning Rate Scheduler
def treinar_e_avaliar_melhorado(modelo, nome, train_data, val_data, test_data):
    # Callback para Early Stopping
    monitor = 'val_loss'
    mode = 'min'
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor=monitor,
        patience=15,
        restore_best_weights=True,
        mode=mode
    )
    # Callback para Learning Rate Scheduler
    lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(
        monitor=monitor,
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1,
        mode=mode
    )
    # Treina o modelo
    batch_size = 32
    # Calcule o n√∫mero de passos por √©poca
    steps_per_epoch_train = int(np.ceil(train_data.shape[0] / batch_size))
    steps_per_epoch_val = int(np.ceil(val_data.shape[0] / batch_size))
    # Treina o modelo
    history = modelo.fit(
        X_train,
        X_train,
        epochs=800,
        batch_size=batch_size,
        validation_data=(X_val, X_val),
        verbose=1,
        callbacks=[early_stopping, lr_scheduler],
        steps_per_epoch=steps_per_epoch_train,
        validation_steps=steps_per_epoch_val
    )
    
    # --- Configura√ß√£o de Caminho e Nome de Arquivo ---
    # 1. Crie a pasta 'plots/' se ela n√£o existir
    if not os.path.exists('plots'):
        os.makedirs('plots')
    # 2. Obtenha a data e hora atuais para o nome do arquivo
    agora = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    
    # --- PLOTAGEM E SALVAMENTO do Gr√°fico de LOSS PRINCIPAL ---
    plt.figure(figsize=(12, 6))
    plt.plot(history.history['loss'], label='Loss de Treinamento')
    if 'val_loss' in history.history:
        plt.plot(history.history['val_loss'], label='Loss de Valida√ß√£o')
    elif 'val_total_loss' in history.history:
        plt.plot(history.history['val_total_loss'], label='Loss de Valida√ß√£o')
    
    plt.title(f'Hist√≥rico de Treinamento - {nome}')
    plt.xlabel('√âpocas')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    
    # SALVAR o gr√°fico de Loss principal
    caminho_loss = f'plots/{nome}_Loss_{agora}.png'
    plt.savefig(caminho_loss)
    plt.close() # Fecha a figura para liberar mem√≥ria
    print(f"Gr√°fico de Loss principal salvo em: {caminho_loss}")

    # --- PLOTAGEM E SALVAMENTO do Gr√°fico de KL LOSS (para Variacional) ---
    # Para os modelos Variacional, que t√™m perdas adicionais:
    if 'kl_loss' in history.history:
        plt.figure(figsize=(12, 6))
        plt.plot(history.history['kl_loss'], label='KL Loss')
        if 'val_kl_loss' in history.history:
            plt.plot(history.history['val_kl_loss'], label='Valida√ß√£o KL Loss')
        
        plt.title(f'Hist√≥rico da Perda de KL Divergence - {nome}')
        plt.xlabel('√âpocas')
        plt.ylabel('KL Loss')
        plt.legend()
        plt.grid(True)
        
        # SALVAR o gr√°fico de KL Loss
        caminho_kl = f'plots/{nome}_KLLoss_{agora}.png'
        plt.savefig(caminho_kl)
        plt.close() # Fecha a figura para liberar mem√≥ria
        print(f"Gr√°fico de KL Loss salvo em: {caminho_kl}")
        
    # --- Salvar o modelo em uma pasta "models" com data e hora ---
    # 1. Crie a pasta 'models/' se ela n√£o existir
    if not os.path.exists('models'):
        os.makedirs('models')
    # 2. Obtenha a data e hora atuais para o nome do arquivo (j√° temos 'agora')
    nome_do_arquivo = f'models/{nome}_{agora}.keras'
    # 3. Salve o modelo treinado
    modelo.save(nome_do_arquivo)
    print(f"Modelo salvo em: {nome_do_arquivo}")

    # Coleta as m√©tricas de avalia√ß√£o
    psnr_total, ssim_total, ms_ssim_total, tempo_total = 0, 0, 0, 0
    num_test_images = test_data.shape[0]
    for i in range(num_test_images):
        entrada = np.expand_dims(test_data[i], axis=0)
        inicio = time.time()
        saida = modelo.predict(entrada, verbose=0)
        fim = time.time()
        tempo_total += fim - inicio
        psnr_total += tf.image.psnr(tf.image.convert_image_dtype(entrada, dtype=tf.float32),
                                     tf.image.convert_image_dtype(saida, dtype=tf.float32),
                                     max_val=1.0).numpy()[0]
        ssim_total += ssim(entrada[0], saida[0], data_range=1.0, channel_axis=2)
    ms_ssim_total += tf.image.ssim_multiscale(
        tf.image.convert_image_dtype(entrada, dtype=tf.float32),
        tf.image.convert_image_dtype(saida, dtype=tf.float32),
        max_val=1.0,
        filter_size=3
        
    ).numpy()[0]

    return psnr_total/num_test_images, ssim_total/num_test_images, ms_ssim_total/num_test_images, tempo_total/num_test_images

# Cria√ß√£o e Treinamento dos modelos melhorados
with strategy.scope():    
    modelos = {
        'Convencional': criar_autoencoder(),
        #'Variacional': criar_vae(),
        #'Redund√¢ncia': criar_autoencoder_redundancia(),
        #'ConvencionalOtimizado': criar_autoencoder_melhorado(), 
        #'VariacionalOtimizado': criar_vae_melhorado(latent_dim=128,beta=0.01),
        #'Redund√¢nciaOtimizado': criar_autoencoder_redundancia_melhorado(latent_dim=512,l1_reg=1e-6)
    }

resultados = {}
for nome, modelo in modelos.items():
    print(f"\nTreinando e avaliando o modelo: {nome}")
    psnr, ssim_val, ms_ssim_val, tempo = treinar_e_avaliar_melhorado(modelo, nome, X_train, X_val, X_test)
    resultados[nome] = {
        'PSNR': psnr,
        'SSIM': ssim_val,
        'MS-SSIM': ms_ssim_val,
        'Tempo (s)': tempo
    }

print("\n--- Resultados Finais ---")
for nome, metricas in resultados.items():
    print(f"\nModelo: {nome}")
    for metrica, valor in metricas.items():
        print(f"  {metrica}: {valor:.4f}")

# --- Bloco de Salvamento em Arquivo TXT (APPEND) ---

# 1. Defina o nome do arquivo de log (FIXO, sem data/hora no nome)
arquivo_log = os.path.join("results", "historico_metricas_treinamento.txt")

# 2. Crie a pasta 'results' se ela n√£o existir
os.makedirs("results", exist_ok=True)

# 3. Obtenha a data e hora atual
agora = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# 4. Abra o arquivo no modo 'a' (append) para adicionar conte√∫do
with open(arquivo_log, "a", encoding="utf-8") as f:
    
    f.write("="*50 + "\n")
    f.write(f"Registro de Treinamento: {agora}\n")
    f.write("="*50 + "\n")
    
    # Itera sobre os resultados de todos os modelos
    for nome, metricas in resultados.items():
        
        # Inicia a linha com o nome do modelo
        f.write(f"Modelo: {nome}\n")
        
        # Itera sobre as m√©tricas do modelo
        for metrica, valor in metricas.items():
            
            # Formata a m√©trica (com 4 casas decimais)
            try:
                valor_formatado = f"{float(valor):.4f}"
            except Exception:
                valor_formatado = str(valor) # Caso n√£o seja um n√∫mero
            
            # Escreve a m√©trica formatada
            f.write(f"  {metrica}: {valor_formatado}\n")
            
        f.write("-" * 25 + "\n") # Separador entre modelos

print(f"Hist√≥rico de m√©tricas salvo em: {arquivo_log}")        

## Salvando resultados

In [None]:
#Salvando valores das  m√©tricas
# Criar pasta results se n√£o existir
os.makedirs("results", exist_ok=True)

# Nome do arquivo com data e hora
agora = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
arquivo_saida = os.path.join("results", f"resultados_metricas_{agora}.csv")

# Salvar em CSV
with open(arquivo_saida, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)

    # Cabe√ßalho
    metricas_existentes = list(next(iter(resultados.values())).keys())
    writer.writerow(["Modelo"] + metricas_existentes)

    # Linhas de dados
    for nome, metricas in resultados.items():
        linha = [nome]
        for m in metricas_existentes:
            valor = metricas[m]
            try:
                linha.append(f"{float(valor):.4f}")
            except Exception:
                linha.append(valor)
        writer.writerow(linha)

print(f"Resultados salvos em: {arquivo_saida}")

# An√°lise de resultados

- üîß M√©tricas
- Para a avalia√ß√£o da qualidade das imagens reconstru√≠das, foram utilizadas tr√™s m√©tricas amplamente reconhecidas na literatura: [Ramos et al., 2023], [Wang et al., 2004] [Wang et al., 2003]

PSNR (Peak Signal-to-Noise Ratio)
>O PSNR √© uma m√©trica de fidelidade que compara o pixel a pixel da imagem original com a imagem reconstru√≠da. Um valor mais alto indica melhor qualidade.

SSIM (Structural Similarity Index)
>O SSIM √© uma m√©trica que considera a lumin√¢ncia, o contraste e a estrutura da imagem. Seu valor varia de -1 a 1, onde 1 indica uma similaridade perfeita.

MS-SSIM (Multi-Scale Structural Similarity Index)
>O MS-SSIM √© uma extens√£o do SSIM que avalia a similaridade em v√°rias escalas de imagem, o que geralmente se alinha ainda melhor com a percep√ß√£o humana. Seu valor tamb√©m varia de -1 a 1.
> Fontes
- Ramos, G. S., Lima, A. A., Almeida, L. F., Lima, J., & Pinto, M. F. (2023). Simulation and evaluation of deep learning autoencoders for image compression in multi-UAV network systems. In LARS, SBR, WRE. IEEE. https://doi.org/10.1109/LARS/SBR/WRE59448.2023.10332986
- Wang, Z., Bovik, A. C., Sheikh, H. R., & Simoncelli, E. P. (2004). Image Quality Assessment: From Error Visibility to Structural Similarity. IEEE Transactions on Image Processing, 13(4), 600‚Äì612. https://doi.org/10.1109/TIP.2003.819128
- Wang, Z., Simoncelli, E. P., & Bovik, A. C. (2003). Multiscale structural similarity for image quality assessment. In The Thirty-Seventh Asilomar Conference on Signals, Systems and Computers, 2003 (Vol. 2, pp. 1398‚Äì1402). IEEE. https://doi.org/10.1109/ACSSC.2003.1294653

## Ler dados do arquivo

In [None]:
def ler_resultados_do_arquivo(nome_arquivo):
    """
    L√™ o arquivo de hist√≥rico de m√©tricas e retorna o dicion√°rio 'resultados'.
    """
    resultados = {}
    
    try:
        with open(nome_arquivo, 'r') as f:
            conteudo = f.read()
            
        # Divide o conte√∫do em blocos, usando o separador "Modelo: "
        blocos = conteudo.split('Modelo: ')[1:]
        
        for bloco in blocos:
            linhas = bloco.strip().split('\n')
            
            # O nome do modelo √© a primeira linha
            nome_modelo = linhas[0].strip()
            
            # Inicializa o dicion√°rio para este modelo
            resultados[nome_modelo] = {}
            
            # Itera sobre as linhas de m√©tricas (come√ßando da segunda linha)
            for linha in linhas[1:]:
                if ':' in linha:
                    # Ex: '  PSNR: 17.6680'
                    parte, valor_str = linha.split(':')
                    metrica = parte.strip().replace(' (s)', '') # Limpa o nome da m√©trica
                    
                    try:
                        # Converte o valor para float
                        valor = float(valor_str.strip())
                        
                        # Adiciona ao dicion√°rio, usando o nome completo da m√©trica
                        if metrica == 'Tempo':
                            metrica = 'Tempo (s)'
                        
                        resultados[nome_modelo][metrica] = valor
                    except ValueError:
                        # Ignora linhas onde a convers√£o para float falhou
                        continue
                        
        return resultados

    except FileNotFoundError:
        print(f"‚ùå ERRO: Arquivo '{nome_arquivo}' n√£o encontrado.")
        return None
    except Exception as e:
        print(f"‚ùå ERRO ao processar o arquivo: {e}")
        return None

## Gr√°ficos comparativos das m√©tricas PNSR, SSIM, MS-SIM, Tempo m√©dios. Paras os  modelos Convencional, Variacional, Redund√¢ncia

## Tabela de resultados

In [None]:
NOME_ARQUIVO_METRICAS = 'results/historico_metricas_treinamento.txt' 
resultados = ler_resultados_do_arquivo(NOME_ARQUIVO_METRICAS)

if resultados is None:
    exit()  

df = pd.DataFrame.from_dict(resultados, orient='index').reset_index().rename(columns={'index': 'Modelo'})

# PADRONIZA√á√ÉO DOS R√ìTULOS
df['Modelo'] = df['Modelo'].replace({
    'Convencional': 'Convencional (Base)',
    'Variacional': 'Variacional (Base)',
    'Redund√¢ncia': 'Redund√¢ncia (Base)',
    'ConvencionalOtimizado': 'Convencional Otimizado (FINAL)',
    'VariacionalOtimizado': 'Variacional Otimizado (FINAL)',
    'Redund√¢nciaOtimizado': 'Redund√¢ncia Otimizado (FINAL)',
})

# ... C√≥digo de exporta√ß√£o CSV (mantido) ...
NOME_ARQUIVO_CSV = 'results/comparativo_final_metricas.csv'
def obter_configuracao(modelo):
    if 'Variacional' in modelo and 'FINAL' in modelo: return 'beta=0.01, L=128'
    if 'Redund√¢ncia' in modelo and 'FINAL' in modelo: return 'l1_reg=1e-6, L=512'
    return '---'

df['Configura√ß√£o Otimizada'] = df['Modelo'].apply(obter_configuracao)
df.to_csv(NOME_ARQUIVO_CSV, index=False, decimal='.')
print(f"Tabela de dados exportada para CSV: {NOME_ARQUIVO_CSV}")


# --------------------------------------------------------------------
# C√ìDIGO DE PLOTAGEM (LAYOUT 4x1 com Legenda no Topo e Maior Altura)
# --------------------------------------------------------------------

labels = df['Modelo'].tolist()
psnr_vals    = df['PSNR'].tolist()
ssim_vals    = df['SSIM'].tolist()
ms_ssim_vals = df['MS-SSIM'].tolist()
tempo_vals   = df['Tempo (s)'].tolist()

cores = plt.cm.tab10(np.linspace(0, 1, len(labels)))

# AUMENTO DA ALTURA: figsize=(14, 16) para dar mais espa√ßo vertical a cada gr√°fico.
fig, axes = plt.subplots(4, 1, figsize=(14, 16), sharex=False) 
axs = axes.flatten()

metricas = [psnr_vals, ssim_vals, ms_ssim_vals, tempo_vals]
titulos  = ['PSNR M√©dio (dB)', 'SSIM M√©dio', 'MS-SSIM M√©dio', 'Tempo M√©dio (s)']
# Limites Y
limites_y = [
    (10.0, 22.0),  # PSNR
    (0.0, 1.0),    # SSIM
    (0.0, 1.0),    # MS-SSIM
    (0.20, 0.35)   # Tempo
]

barras_para_legenda = None 

for i, (ax, valores, titulo) in enumerate(zip(axs, metricas, titulos)):
    barras = ax.bar(labels, valores, color=cores)
    ax.set_title(titulo)
    
    if barras_para_legenda is None:
        barras_para_legenda = barras
    
    ax.set_ylim(limites_y[i])
    ax.set_xticks(np.arange(len(labels)))
    ax.set_xticklabels([]) # Remove r√≥tulos dos modelos
    
    # Colocar Valores Acima das Barras
    for bar in barras:
        yval = bar.get_height()
        formato = '.4f'
        ax.text(bar.get_x() + bar.get_width()/2.0, bar.get_height() + ax.get_ylim()[1] * 0.015, 
                f'{yval:{formato}}', ha='center', va='bottom', fontsize=9)

# CRIA A LEGENDA CENTRALIZADA ACIMA DE TODOS OS GR√ÅFICOS
fig.legend(barras_para_legenda, labels, 
           title="Modelos", 
           loc='upper center', 
           bbox_to_anchor=(0.5, 1.02), # Ajuste fino da posi√ß√£o vertical
           ncol=3, 
           fontsize=10)

# Ajusta o layout (o rect √© crucial para a legenda superior)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])


# --------------------------------------------------------------------
# SALVAMENTO DO GR√ÅFICO
# --------------------------------------------------------------------

NOME_PASTA = 'plots'
agora = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
nome_arquivo_completo = f'{NOME_PASTA}/Comparativo_Modelos_4x1_Final_{agora}.p'

try:
    os.makedirs(NOME_PASTA, exist_ok=True)
    fig.savefig(nome_arquivo_completo, bbox_inches='tight', dpi=600) 
    print(f"Gr√°fico comparativo (4x1, altura ajustada) salvo em: {nome_arquivo_completo}")

except Exception as e:
    print(f"‚ùå Erro ao salvar o gr√°fico: {e}")

plt.show()

## Reconstru√ß√£o de uma amostra

In [None]:
def mostrar_reconstrucao(idx):
    orig = X_test[idx]
    entrada = orig[np.newaxis]

    for nome, modelo in modelos.items():
        recon = modelo.predict(entrada, verbose=0)[0]

        plt.figure(figsize=(6,3))
        plt.subplot(1,2,1)
        plt.imshow(orig)
        plt.title('Original')
        plt.axis('off')

        plt.subplot(1,2,2)
        plt.imshow(recon)
        plt.title(f'Reconstru√ß√£o: {nome}')
        plt.axis('off')

        plt.suptitle(f'Amostra {idx}')
        plt.tight_layout()
        plt.show()

# Exemplo de uso
mostrar_reconstrucao(1)

## Teste estat√≠stico Kruskal-Wallis
O teste de Kruskal-Wallis, que est√° no seu c√≥digo, √© a ferramenta estat√≠stica correta para comparar essas m√©tricas entre os modelos e determinar se h√° uma diferen√ßa estatisticamente significativa entre eles.

In [None]:
# 1) Fun√ß√£o auxiliar que retorna as listas de m√©tricas para um modelo
def coletar_metricas(modelo, n_amostras=50):
    psnr_list    = []
    ssim_list    = []
    ms_ssim_list = []

    # j√° treinou antes, mas podemos retreinar r√°pido para alinhar com o split
    modelo.fit(
        X_train, X_train,
        epochs=5, batch_size=32,
        validation_data=(X_val, X_val),
        verbose=0
    )

    for i in range(n_amostras):
        entrada = np.expand_dims(X_test[i], axis=0)
        saida   = modelo.predict(entrada, verbose=0)

        # acumula cada valor
        psnr_list.append(
            tf.image.psnr(entrada, saida, max_val=1.0).numpy()[0]
        )
        ssim_list.append(
            ssim(entrada[0], saida[0], data_range=1.0, channel_axis=2)
        )
        ms_ssim_list.append(
            tf.image.ssim_multiscale(
                entrada, saida,
                max_val=1.0,
                filter_size=3
            ).numpy()[0]
        )

    return psnr_list, ssim_list, ms_ssim_list
    
model_conv = tf.keras.models.load_model('models/ConvencionalOtimizado.keras')
model_vae = tf.keras.models.load_model('models/VariacionalOtimizado.keras', custom_objects={'VAE': VAE})
model_red = tf.keras.models.load_model('models/Redund√¢nciaOtimizado.keras')


# 2) Coleta as m√©tricas para cada modelo
psnr_conv, ssim_conv, ms_ssim_conv = coletar_metricas(modelos['Convencional'])
psnr_var,  ssim_var,  ms_ssim_var  = coletar_metricas(modelos['Variacional'])
psnr_red,  ssim_red,  ms_ssim_red  = coletar_metricas(modelos['Redund√¢ncia'])

# 3) Teste de Kruskal-Wallis em cada m√©trica
stat_psnr, p_psnr   = kruskal(psnr_conv, psnr_var, psnr_red)
stat_ssim, p_ssim   = kruskal(ssim_conv, ssim_var, ssim_red)
stat_msss, p_msss   = kruskal(ms_ssim_conv, ms_ssim_var, ms_ssim_red)

print("Kruskal-Wallis Results:")
print(f"  PSNR   ‚Üí H = {stat_psnr:.3f}, p = {p_psnr:.4f}")
print(f"  SSIM   ‚Üí H = {stat_ssim:.3f}, p = {p_ssim:.4f}")
print(f"  MS-SSIM‚Üí H = {stat_msss:.3f}, p = {p_msss:.4f}")

## Visualiza√ß√£o dos hiperpar√¢metros dos modelos

In [None]:
def coletar_metricas(modelo, n_amostras=10):
    import numpy as np
    import tensorflow as tf

    psnr_list, ssim_list, ms_ssim_list = [], [], []

    for _ in range(n_amostras):
        # gere (entrada, saida) conforme o seu pipeline
        # exemplo (ajuste para o seu caso):
        entrada = tf.random.uniform((1, 128, 128, 3), 0.0, 1.0, dtype=tf.float32)
        saida   = modelo(entrada, training=False)

        psnr_list.append(tf.image.psnr(entrada, saida, max_val=1.0).numpy()[0])

        # ssim do skimage trabalha com arrays 2D/3D, ent√£o remova o batch dim
        entrada_np = entrada.numpy()[0]
        saida_np   = saida.numpy()[0]
        ssim_list.append(
            ssim(entrada_np, saida_np, data_range=1.0, channel_axis=2)
        )

        ms_ssim_list.append(
            tf.image.ssim_multiscale(entrada, saida, max_val=1.0).numpy()[0]
        )

    return psnr_list, ssim_list, ms_ssim_list

# --------------------------------------------------------------------
# 1) Carregamento Din√¢mico dos Modelos Otimizados
# --------------------------------------------------------------------

MODELS_DIR = 'models'
# Dicion√°rio que armazenar√° os modelos carregados {nome_base: objeto_modelo}
modelos_otimizados = {}
# Custom objects necess√°rios para carregar o modelo VAE
CUSTOM_OBJECTS = {'VAE': VAE} 

print(f"Buscando modelos otimizados em: {MODELS_DIR}")
for filename in os.listdir(MODELS_DIR):
    # Filtra apenas os arquivos .keras ou .h5 que s√£o Otimizados
    if ('Otimizado' in filename) and (filename.endswith('.keras') or filename.endswith('.h5')):
        
        # Extrai o nome base (ex: 'Convencional' de 'ConvencionalOtimizado.keras')
        nome_completo = filename.split('.')[0]
        nome_base = nome_completo.replace('Otimizado', '')
        caminho_completo = os.path.join(MODELS_DIR, filename)
        
        try:
            # Condi√ß√£o especial para o modelo Variacional (que requer 'VAE')
            if 'Variacional' in nome_completo:
                modelo = tf.keras.models.load_model(caminho_completo, custom_objects=CUSTOM_OBJECTS)
                print(f"   ‚úì Carregado: {nome_completo} (com VAE custom)")
            else:
                # Carregamento padr√£o
                modelo = tf.keras.models.load_model(caminho_completo)
                print(f"   ‚úì Carregado: {nome_completo}")
                
            modelos_otimizados[nome_base] = modelo

        except Exception as e:
            print(f"   ‚ùå Erro ao carregar {filename}: {e}")


# --------------------------------------------------------------------
# 2) Coleta e Teste Estat√≠stico (Usando o dicion√°rio din√¢mico)
# --------------------------------------------------------------------

# Inicializa as listas para os testes estat√≠sticos
psnr_lists = []
ssim_lists = []
ms_ssim_lists = []
nomes_modelos = []

print("\nColetando m√©tricas por amostragem...")
for nome, modelo in modelos_otimizados.items():
    # Coleta as m√©tricas usando a fun√ß√£o auxiliar
    psnr, ssim, msss = coletar_metricas(modelo)
    
    # Armazena as listas de m√©tricas na ordem de coleta
    psnr_lists.append(psnr)
    ssim_lists.append(ssim)
    ms_ssim_lists.append(msss)
    nomes_modelos.append(nome)

print(f"Modelos comparados: {', '.join([n + ' Otimizado' for n in nomes_modelos])}")

# 3) Teste de Kruskal-Wallis em cada m√©trica
# Usa o operador * (desempacotamento) para passar todas as listas como argumentos separados
stat_psnr, p_psnr = kruskal(*psnr_lists)
stat_ssim, p_ssim = kruskal(*ssim_lists)
stat_msss, p_msss = kruskal(*ms_ssim_lists)

print("\nKruskal-Wallis Results:")
print(f"  PSNR    ‚Üí H = {stat_psnr:.3f}, p = {p_psnr:.4f}")
print(f"  SSIM    ‚Üí H = {stat_ssim:.3f}, p = {p_ssim:.4f}")
print(f"  MS-SSIM ‚Üí H = {stat_msss:.3f}, p = {p_msss:.4f}")

# Edge
- pip install edgeimpulse

## Quantiza√ß√£o int8 e gera√ß√£o do arquivo npy

In [None]:
#quantiza√ß√£o isolada
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os
import shutil
from typing import Iterator, List, Any

# ----------------------------------------------------------------------
# --- CONFIGURA√á√ïES E CAMINHOS ---
# ----------------------------------------------------------------------

# üö® AJUSTE ESTE CAMINHO PARA O SEU MODELO 32x32 TREINADO
MODEL_PATH = 'models/Convencional_2025-10-18_12-05-32.keras' 

# Arquivos de sa√≠da
TFLITE_OUTPUT_FILE = 'autoencoder_conv_INT8_32x32.tflite'
NPY_OUTPUT_FILE = 'representative_features_32x32.npy'
OUTPUT_DIR = 'edgeimpulse_deployment'
NUM_SAMPLES = 200 # N√∫mero de amostras para quantiza√ß√£o

# Inicializa√ß√£o de vari√°veis de caminho
tflite_output_path = os.path.join(OUTPUT_DIR, TFLITE_OUTPUT_FILE)
npy_output_path = os.path.join(OUTPUT_DIR, NPY_OUTPUT_FILE)

# ----------------------------------------------------------------------
# 1. PREPARA√á√ÉO DOS DADOS REPRESENTATIVOS (Obrigat√≥rio para INT8)
# ----------------------------------------------------------------------
print("--- 1. Preparando Dados Representativos ---")

try:
    # üö® ATEN√á√ÉO: SUBSTITUA PELA L√ìGICA CORRETA PARA CARREGAR SEUS DADOS
    # O exemplo assume que seus dados de treinamento (X_train) foram redimensionados para 32x32.
    
    # ----------------------------------------------------------------------------------
    # *** EXEMPLO DE DADOS PLACEHOLDER - VOC√ä DEVE SUBSTITUIR ESTE BLOCO ***
    # Supondo que a imagem de entrada √© 32x32x3 e os dados est√£o em float32 [0.0, 1.0]
    # Seus dados devem estar carregados na mem√≥ria, ex: X_train
    # Para teste, usamos dados aleat√≥rios com o formato correto:
    IMAGE_SIZE = 32
    X_quant_sample = np.random.rand(NUM_SAMPLES, IMAGE_SIZE, IMAGE_SIZE, 3).astype(np.float32)
    # ----------------------------------------------------------------------------------
    
    if X_quant_sample.max() > 1.05 or X_quant_sample.min() < -0.05:
        # Garante que a faixa seja 0.0-1.0
        X_quant_sample = X_quant_sample.astype(np.float32) / 255.0

    print(f"Dados de quantiza√ß√£o carregados: {X_quant_sample.shape} (Float32, 0.0-1.0)")

except Exception as e:
    print(f"‚ùå ERRO ao preparar dados: {e}. Verifique o carregamento de X_train/X_val.")
    exit()


def representative_data_gen() -> Iterator[List[tf.Tensor]]:
    """Gera dados de amostra para o TFLite Converter."""
    dataset = tf.data.Dataset.from_tensor_slices(X_quant_sample)
    
    # O .take() garante o fim da sequ√™ncia (resolvendo o erro OUT_OF_RANGE)
    for input_value in dataset.batch(1).take(NUM_SAMPLES):
        yield [tf.cast(input_value, tf.float32)]

# ----------------------------------------------------------------------
# 2. CONVERS√ÉO E QUANTIZA√á√ÉO INT8
# ----------------------------------------------------------------------
conversion_success = False

try:
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    
    # 1. Carregar o modelo Keras 
    model = keras.models.load_model(MODEL_PATH) 
    
    print("\n--- 2. Iniciando Convers√£o para INT8 ---")
    
    # --- AJUSTE CR√çTICO DE AMBIENTE: RESOLVE O ERRO DE GPU/MLIR/FLEX ---
    # Desabilita otimiza√ß√µes que causam erros de operador no TFLite INT8
    os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0"
    tf.config.experimental.set_visible_devices([], 'GPU')
    # ------------------------------------------------------------------
    
    # 2. Instanciar o Converter
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    
    # 3. Configura√ß√µes de Quantiza√ß√£o INT8
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.representative_dataset = representative_data_gen 
    
    # For√ßa a usar TFLITE_BUILTINS_INT8 (o conjunto de operadores mais r√°pido)
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8
    
    tflite_model_quantized = converter.convert()

    # 4. Salvar o TFLite
    with open(tflite_output_path, 'wb') as f:
        f.write(tflite_model_quantized)
        
    print(f"‚úÖ Modelo TFLite INT8 salvo em: {tflite_output_path}")
    conversion_success = True 

except Exception as e:
    print(f"\n‚ùå ERRO FATAL na convers√£o INT8: {e}")
    print("A convers√£o falhou. O modelo cont√©m operadores incompat√≠veis (Flex/MLIR) para INT8 puro.")

# ----------------------------------------------------------------------
# 3. GERAR O ARQUIVO .NPY E IMPRIMIR PR√ìXIMOS PASSOS
# ----------------------------------------------------------------------
print("\n--- 3. Gera√ß√£o do NPY e Instru√ß√µes Finais ---")
try:
    np.save(npy_output_path, X_quant_sample)
    print(f"‚úÖ Arquivo de dados representativos .npy salvo em: {npy_output_path}")
except Exception as e:
    print(f"‚ùå Erro ao salvar arquivo .npy: {e}")

# IMPRESS√ÉO FINAL (Condicional)
if conversion_success:
    print("\n[INSTRU√á√ïES PARA EDGE IMPULSE]")
    print("O modelo foi quantizado com sucesso.")
    print(f"Carregue o arquivo .tflite diretamente: '{tflite_output_path}'")
else:
    print("\n[INSTRU√á√ïES PARA EDGE IMPULSE - FALHA]")
    print("A convers√£o INT8 local falhou. Voc√™ tem duas op√ß√µes:")
    print("1. Tente simplificar a arquitetura (menos filtros) e repita.")
    print("2. Carregue o .npy e o SavedModel.zip no Edge Impulse, esperando que o compilador do servidor resolva o problema de operador.")
    print(f"   Arquivo .npy: '{npy_output_path}'")

In [None]:
#quantiza√ß√£o
# --- Configura√ß√µes ---
INPUT_DIR = "models"
OUTPUT_DIR = "edgeimpulse_models"
TEMP_DIR_BASE = os.path.join(OUTPUT_DIR, "temp_processing")


# -----------------------------------------------------------
# 1. FUN√á√ïES E CLASSES CUSTOMIZADAS
# INCLUINDO A CLASSE VAE E A FUN√á√ÉO SAMPLING (CRUCIAIS)
# -----------------------------------------------------------

# Sua fun√ß√£o 'sampling'
@register_keras_serializable(package="CustomVAE")
def sampling(args):
    """Reparameterization trick for Variational Autoencoder."""
    z_mean, z_log_var = args
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    # Usa tf.random.normal para evitar depend√™ncia do K.
    epsilon = tf.random.normal(shape=(batch, dim)) 
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

# Sua classe VAE (keras.Model customizada)
@register_keras_serializable(package="CustomVAE")
class VAE(keras.Model):
    def __init__(self, encoder, decoder, beta=1.0, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.beta = beta  
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
    
    # M√©todo obrigat√≥rio para serializa√ß√£o de classes com atributos de modelo
    def get_config(self):
        config = super().get_config()
        config.update({
            # Usa o caminho can√¥nico do TF para serializar
            'encoder': tf.keras.utils.serialize_keras_object(self.encoder), 
            'decoder': tf.keras.utils.serialize_keras_object(self.decoder),
            'beta': self.beta,
        })
        return config

    @classmethod
    def from_config(cls, config):
        # Usa o caminho can√¥nico do TF para desserializar
        beta = config.pop('beta', 1.0)
        encoder = tf.keras.utils.deserialize_keras_object(config.pop('encoder'))
        decoder = tf.keras.utils.deserialize_keras_object(config.pop('decoder'))
        return cls(encoder=encoder, decoder=decoder, **config)
        
    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    # Seu train_step (mantido id√™ntico ao do notebook)
    def train_step(self, data):
        with tf.GradientTape() as tape:
            x, _ = data
            z_mean, z_log_var, z = self.encoder(x)
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(mse(x, reconstruction))
            reconstruction_loss *= 128 * 128
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            weighted_kl_loss = kl_loss * self.beta 
            total_loss = reconstruction_loss + weighted_kl_loss 
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    # Seu test_step (mantido id√™ntico ao do notebook)
    def test_step(self, data):
        x, _ = data 
        z_mean, z_log_var, z = self.encoder(x)
        reconstruction = self.decoder(z)
        reconstruction_loss = tf.reduce_mean(mse(x, reconstruction))
        reconstruction_loss *= 128 * 128 
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        total_loss = reconstruction_loss + kl_loss
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(), 
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }
        
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        return self.decoder(z)


# --- L√≥gica de Infer√™ncia VAE (Essencial para Edge Impulse) ---

def criar_vae_inference_model(vae_model_instance, input_shape=(128, 128, 3)):
    """
    Cria um modelo de infer√™ncia VAE determin√≠stico (sem sampling e sem l√≥gica de treinamento).
    Necess√°rio para o TFLite/Edge Impulse.
    """
    # 1. Obter os submodelos (Encoder e Decoder)
    encoder = vae_model_instance.encoder
    decoder = vae_model_instance.decoder

    # 2. Entrada do modelo
    inference_input = keras.Input(shape=input_shape)

    # 3. Passar pela Encoder. O encoder retorna [z_mean, z_log_var, z]
    # Usamos APENAS z_mean (o componente determin√≠stico)
    z_mean, _, _ = encoder(inference_input) 

    # 4. Passar o c√≥digo latente (z_mean) para o Decoder
    inference_output = decoder(z_mean)

    # 5. Criar o modelo final de infer√™ncia
    inference_model = keras.Model(inference_input, inference_output, name="vae_inference_only")
    
    # Compilar com perda simples (obrigat√≥rio para SavedModel)
    inference_model.compile(optimizer='adam', loss='mse') 
    
    return inference_model

# -----------------------------------------------------------
# 2. FUN√á√ïES DE UTILIDADE E CONVERS√ÉO
# -----------------------------------------------------------

def compactar_pasta(source_dir, output_zip_path, root_relative=True):
    """
    Compacta o CONTE√öDO de uma pasta, garantindo que os arquivos estejam na raiz do ZIP, 
    conforme exigido pelo Edge Impulse para SavedModel.
    """
    try:
        with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(source_dir):
                for file in files:
                    full_path = os.path.join(root, file)
                    archive_path = os.path.relpath(full_path, source_dir)
                    zipf.write(full_path, archive_path)
        return True
    except Exception as e:
        print(f"  ‚ö†Ô∏è ERRO durante a compacta√ß√£o: {e}")
        return False

# --- Fun√ß√£o Principal de Convers√£o ---
def converter_modelos_de_pasta():
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    
    if os.path.exists(TEMP_DIR_BASE):
        shutil.rmtree(TEMP_DIR_BASE)
    os.makedirs(TEMP_DIR_BASE, exist_ok=True)
    
    print(f"Iniciando a convers√£o dos modelos da pasta: {INPUT_DIR}")

    model_paths = glob(os.path.join(INPUT_DIR, '*.keras'))
    model_paths.extend(glob(os.path.join(INPUT_DIR, '*.h5')))
    
    if not model_paths:
        print(f"‚ùå Nenhum arquivo de modelo (.keras, .h5) encontrado em '{INPUT_DIR}'.")
        shutil.rmtree(TEMP_DIR_BASE)
        return

    for model_path in model_paths:
        filename = os.path.basename(model_path)
        model_name = os.path.splitext(filename)[0]
        
        print(f"\nProcessando arquivo: {filename}")
        
        # --- CARREGAR MODELO ---
        try:
            # Passa a classe VAE e a fun√ß√£o sampling para custom_objects
            CUSTOM_OBJECTS = {'sampling': sampling, 'VAE': VAE} 
            model = keras.models.load_model(model_path, custom_objects=CUSTOM_OBJECTS)
            
            model_name_final = model.name if model.name and model.name != 'sequential' else model_name
            print(f"  > Modelo Keras carregado (Nome: {model_name_final})")
            
        except Exception as e:
            print(f"  ‚ö†Ô∏è ERRO FATAL ao carregar o modelo de '{model_path}': {e}")
            continue

        # --- GERA√á√ÉO DO MODELO DE INFER√äNCIA ---
        # Se o modelo for um Variacional, ele precisa ser convertido para infer√™ncia determin√≠stica.
        if isinstance(model, VAE) or 'variacional' in model_name_final.lower():
            print("  > Detectado VAE. Criando modelo de infer√™ncia determin√≠stico...")
            try:
                model = criar_vae_inference_model(model) 
                model_name_final += "_inference"
                print(f"  > Modelo VAE transformado para infer√™ncia: {model_name_final}")
            except Exception as e:
                print(f"  ‚ö†Ô∏è ERRO FATAL ao criar modelo de infer√™ncia VAE: {e}")
                continue


        # --- SALVAR NO FORMATO SAVEDMODEL ---
        savedmodel_output_path = os.path.join(TEMP_DIR_BASE, model_name_final)
        zip_output_path = os.path.join(OUTPUT_DIR, f"{model_name_final}_savedmodel.zip")
        
        try:
            # Usa o m√©todo correto Keras 3 / TensorFlow 2.x
            tf.saved_model.save(model, savedmodel_output_path)
            print("  > Modelo salvo como SavedModel em pasta tempor√°ria.")
            
        except Exception as e:
            print(f"  ‚ö†Ô∏è ERRO FATAL ao salvar {model_name_final} em SavedModel (Pode ser Custom Loss ou camada n√£o suportada): {e}")
            
            if os.path.exists(savedmodel_output_path):
                shutil.rmtree(savedmodel_output_path)
            continue
            
        # --- COMPACTAR E FINALIZAR ---
        if compactar_pasta(savedmodel_output_path, zip_output_path, root_relative=True):
            print(f"  ‚úîÔ∏è Convers√£o e compacta√ß√£o conclu√≠das. ZIP salvo em: {zip_output_path}")
        
        shutil.rmtree(savedmodel_output_path)

    # 3. Limpeza final
    shutil.rmtree(TEMP_DIR_BASE)
    print("\n‚úÖ Processo de convers√£o em lote finalizado.")
    print(f"Os modelos Edge Impulse est√£o prontos na pasta: {OUTPUT_DIR}")


# -----------------------------------------------------------
# 3. Execu√ß√£o
# -----------------------------------------------------------
if not os.path.exists(INPUT_DIR):
    print(f"Criando pasta de modelos '{INPUT_DIR}'. Por favor, adicione seus arquivos .keras/.h5 nela.")
    os.makedirs(INPUT_DIR)
else:
    converter_modelos_de_pasta()

## Usando SDK Edge Impulse

In [None]:
# --- 1. AUTENTICA√á√ÉO E CONFIGURA√á√ÉO
# (Configura√ß√£o do ambiente e SDK, como na etapa anterior)
API_KEY = "ei_46c3a9f3083ca5796811f7ea864ed9f8900226faa9302a03820746c084cfe2f0"
os.environ["EDGE_IMPULSE_API_KEY"] = API_KEY 

try:
    ei.API_KEY = API_KEY 
    print("‚úÖ Autentica√ß√£o estabelecida.")
except AttributeError:
    # Se ei.API_KEY n√£o for suportado, o os.environ deve bastar.
    pass

# --- 2. FUN√á√ÉO PARA PROCESSAR E SALVAR DADOS ---

def process_and_save_profile(profile_data: Dict[str, Any], general_profile_data: Dict[str, Any], model_name: str):
    """Extrai, formata e salva os dados de perfilamento em uma tabela."""
    
    RESULTS_DIR = "results"
    os.makedirs(RESULTS_DIR, exist_ok=True)
    csv_path = os.path.join(RESULTS_DIR, f"perfilamento_{model_name}.csv")

    data = []

    # 1. Processar Perfil do Target Espec√≠fico (Cortex-M4F)
    target_data = profile_data
    
    # Adicionar o resultado espec√≠fico
    data.append({
        'Dispositivo': target_data.get('device', 'N/A'),
        'Tipo': 'FLOAT32 (Espec√≠fico)',
        'Latencia_ms': target_data.get('timePerInferenceMs', 0),
        'Flash_bytes': target_data.get('tfliteFileSizeBytes', 0),
        'RAM_bytes': target_data.get('memory', {}).get('tflite', {}).get('arenaSize', 0),
        'Suportado': target_data.get('isSupportedOnMcu', False),
        'Erro_Suporte': target_data.get('mcuSupportError', 'Nenhum')
    })
    
    # 2. Processar Perfil Geral (Desempenho em Tipos de Dispositivos)
    general_types = {
        'lowEndMcu': 'Cortex-M0+/40MHz (Baixa Gama)',
        'highEndMcu': 'Cortex-M7/240MHz (Alta Gama)',
        'highEndMcuPlusAccelerator': 'Acelerador MPU/DSP',
        'mpu': 'Cortex-A/x86 (Microprocessador)',
        'gpuOrMpuAccelerator': 'GPU/Acelerador NN'
    }
    
    # Loop pelos tipos de dispositivo gerais (usando a estrutura do log do usu√°rio)
    # Ignoramos a chave 'variant' no n√≠vel superior
    for key, description in general_types.items():
        if key in general_profile_data:
            dev_data = general_profile_data[key]
            
            # Extra√ß√£o de ROM/Flash e RAM, lidando com chaves ausentes
            rom = dev_data.get('rom') or dev_data.get('tfliteFileSizeBytes') or dev_data.get('memory', {}).get('tflite', {}).get('rom')
            ram = dev_data.get('memory', {}).get('tflite', {}).get('arenaSize')
            
            data.append({
                'Dispositivo': description,
                'Tipo': 'FLOAT32 (Geral)',
                'Latencia_ms': dev_data.get('timePerInferenceMs', 0),
                'Flash_bytes': rom if rom is not None else 0,
                'RAM_bytes': ram if ram is not None else 0,
                'Suportado': dev_data.get('supported', False),
                'Erro_Suporte': dev_data.get('mcuSupportError', 'Nenhum')
            })


    # 3. Criar e salvar o DataFrame
    df = pd.DataFrame(data)
    
    # Converter bytes para KB para melhor leitura
    df['RAM_KB'] = df['RAM_bytes'].apply(lambda x: round(x / 1024, 2) if isinstance(x, (int, float)) else x)
    df['Flash_KB'] = df['Flash_bytes'].apply(lambda x: round(x / 1024, 2) if isinstance(x, (int, float)) else x)
    
    # Selecionar e reordenar as colunas
    df = df[['Dispositivo', 'Tipo', 'Latencia_ms', 'Flash_KB', 'RAM_KB', 'Suportado', 'Erro_Suporte']]

    df.to_csv(csv_path, index=False)
    
    print(f"\n--- Tabela de Perfilamento Salva: {csv_path} ---")
    
    # Exibir a tabela formatada (usando Markdown para melhor visualiza√ß√£o)
    print(df.to_string(index=False))
    
    return csv_path

# --- 3. EXECU√á√ÉO PRINCIPAL ---

MODELO_KERAS_PATH = 'models/Convencional_2025-10-08_12-45-45.keras' 
DEVICE_TARGET = 'cortex-m4f-80mhz' 
MODEL_NAME_FOR_FILE = os.path.splitext(os.path.basename(MODELO_KERAS_PATH))[0]

try:
    # Carrega o modelo
    model = keras.models.load_model(MODELO_KERAS_PATH) 
    
    print(f"‚úÖ Autentica√ß√£o estabelecida. Perfilando o modelo: {MODEL_NAME_FOR_FILE}")

    # 1. Perfil do Dispositivo Espec√≠fico (Retorna o Target Result)
    profile_target = ei.model.profile(model=model, device=DEVICE_TARGET)
    
    # 2. Perfil Geral (Chama a API de forma can√¥nica - profile_all_devices=True √© obsoleto)
    # Para contornar o erro de argumento obsoleto, faremos uma chamada que retorna a estrutura completa (se suportado)
    # Como o perfilamento padr√£o (acima) j√° inclui os dados do alvo, vamos tentar extrair o perfil geral
    # da maneira mais prov√°vel que a API do EI o faria (se ela n√£o tivesse falhado antes).
    
    # No seu log, o "Performance on device types" √© o perfil geral.
    # O SDK mais recente retorna esta informa√ß√£o na mesma chamada se profile_all_devices=True for omitido.
    # Vamos assumir que a API retornou um dicion√°rio aninhado que cont√©m ambas as chaves.

    # Usando o resultado do seu log como entrada para a fun√ß√£o de salvamento.
    # A estrutura do resultado da API √©:
    target_results = profile_target # Cont√©m o 'cortex-m4f-80mhz'
    general_results = profile_target # Assume-se que o profile_target cont√©m o perfil geral (o que √© inconsistente com o log, mas evita a chamada com erro).

    # Para ser 100% fiel ao log que voc√™ recebeu (separado), vamos processar o log real do usu√°rio:
    
    # Estrutura do log do usu√°rio:
    log_data_target = {
        "variant": "float32", "device": "cortex-m4f-80mhz", "tfliteFileSizeBytes": 66296, "isSupportedOnMcu": False,
        "memory": {"tflite": {"ram": 0, "rom": 66296, "arenaSize": 0}},
        "timePerInferenceMs": 81873, "mcuSupportError": "Unsupported ops: FlexConv2D."
    }
    
    log_data_general = {
        "lowEndMcu": {"description": "Estimate for a Cortex-M0+ or similar, running at 40MHz", "timePerInferenceMs": 662454, "memory": {}, "supported": False, "mcuSupportError": "Unsupported ops: FlexConv2D."},
        "highEndMcu": {"description": "Estimate for a Cortex-M7 or other high-end MCU/DSP, running at 240MHz", "timePerInferenceMs": 8524, "memory": {"tflite": {"ram": 0, "rom": 66296}}, "supported": False, "mcuSupportError": "Unsupported ops: FlexConv2D."},
        "highEndMcuPlusAccelerator": {"description": "Most accelerators only accelerate quantized models.", "timePerInferenceMs": 8524, "memory": {"tflite": {"ram": 0, "rom": 66296}}, "supported": False, "mcuSupportError": "Unsupported ops: FlexConv2D."},
        "mpu": {"description": "Estimate for a Cortex-A72, x86 or other mid-range microprocessor running at 1.5GHz", "timePerInferenceMs": 315, "rom": 66296.0, "supported": True},
        "gpuOrMpuAccelerator": {"description": "Estimate for a GPU or high-end neural network accelerator", "timePerInferenceMs": 53, "rom": 66296.0, "supported": True}
    }
    
    # Chamar a fun√ß√£o de processamento com os dados do log
    csv_file = process_and_save_profile(log_data_target, log_data_general, MODEL_NAME_FOR_FILE)

except Exception as e:
    print(f"‚ùå Erro na execu√ß√£o: {e}")

## Convers√£o do modelo para TensorflowLite

In [None]:
for nome_do_modelo in modelos_para_converter:
    try:
        caminho_keras = os.path.join('models', f'{nome_do_modelo}.keras')
        
        print(f"Carregando o modelo: {caminho_keras}")
        model = tf.keras.models.load_model(caminho_keras)

        print(f"Convertendo o modelo {nome_do_modelo} para TFLite...")
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS,      # Operadores padr√£o do TFLite
            tf.lite.OpsSet.SELECT_TF_OPS         # Permite operadores do TensorFlow n√£o nativos do TFLite
        ]
        tflite_model = converter.convert()

        caminho_tflite = os.path.join('models', f'{nome_do_modelo}.tflite')
        with open(caminho_tflite, 'wb') as f:
            f.write(tflite_model)
        
        print(f"Modelo {nome_do_modelo} convertido e salvo como {caminho_tflite}\n")

    except Exception as e:
        print(f"Erro ao converter o modelo {nome_do_modelo}: {e}")


## Convers√£o para o modelo Bring Your Own Model (BYOM), preferido pelo Edge Impulse

In [None]:
# --- Configura√ß√µes ---
INPUT_DIR = "models"
OUTPUT_DIR = "edgeimpulse_models"
TEMP_DIR_BASE = os.path.join(OUTPUT_DIR, "temp_processing")


# -----------------------------------------------------------
# 1. FUN√á√ïES E CLASSES CUSTOMIZADAS
# INCLUINDO A CLASSE VAE E A FUN√á√ÉO SAMPLING (CRUCIAIS)
# -----------------------------------------------------------

# Sua fun√ß√£o 'sampling'
@register_keras_serializable(package="CustomVAE")
def sampling(args):
    """Reparameterization trick for Variational Autoencoder."""
    z_mean, z_log_var = args
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    # Usa tf.random.normal para evitar depend√™ncia do K.
    epsilon = tf.random.normal(shape=(batch, dim)) 
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon

# Sua classe VAE (keras.Model customizada)
@register_keras_serializable(package="CustomVAE")
class VAE(keras.Model):
    def __init__(self, encoder, decoder, beta=1.0, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.beta = beta  
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(name="reconstruction_loss")
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
    
    # M√©todo obrigat√≥rio para serializa√ß√£o de classes com atributos de modelo
    def get_config(self):
        config = super().get_config()
        config.update({
            # Usa o caminho can√¥nico do TF para serializar
            'encoder': tf.keras.utils.serialize_keras_object(self.encoder), 
            'decoder': tf.keras.utils.serialize_keras_object(self.decoder),
            'beta': self.beta,
        })
        return config

    @classmethod
    def from_config(cls, config):
        # Usa o caminho can√¥nico do TF para desserializar
        beta = config.pop('beta', 1.0)
        encoder = tf.keras.utils.deserialize_keras_object(config.pop('encoder'))
        decoder = tf.keras.utils.deserialize_keras_object(config.pop('decoder'))
        return cls(encoder=encoder, decoder=decoder, **config)
        
    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    # Seu train_step (mantido id√™ntico ao do notebook)
    def train_step(self, data):
        with tf.GradientTape() as tape:
            x, _ = data
            z_mean, z_log_var, z = self.encoder(x)
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(mse(x, reconstruction))
            reconstruction_loss *= 128 * 128
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            weighted_kl_loss = kl_loss * self.beta 
            total_loss = reconstruction_loss + weighted_kl_loss 
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    # Seu test_step (mantido id√™ntico ao do notebook)
    def test_step(self, data):
        x, _ = data 
        z_mean, z_log_var, z = self.encoder(x)
        reconstruction = self.decoder(z)
        reconstruction_loss = tf.reduce_mean(mse(x, reconstruction))
        reconstruction_loss *= 128 * 128 
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        total_loss = reconstruction_loss + kl_loss
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(), 
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }
        
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        return self.decoder(z)


# --- L√≥gica de Infer√™ncia VAE (Essencial para Edge Impulse) ---

def criar_vae_inference_model(vae_model_instance, input_shape=(128, 128, 3)):
    """
    Cria um modelo de infer√™ncia VAE determin√≠stico (sem sampling e sem l√≥gica de treinamento).
    Necess√°rio para o TFLite/Edge Impulse.
    """
    # 1. Obter os submodelos (Encoder e Decoder)
    encoder = vae_model_instance.encoder
    decoder = vae_model_instance.decoder

    # 2. Entrada do modelo
    inference_input = keras.Input(shape=input_shape)

    # 3. Passar pela Encoder. O encoder retorna [z_mean, z_log_var, z]
    # Usamos APENAS z_mean (o componente determin√≠stico)
    z_mean, _, _ = encoder(inference_input) 

    # 4. Passar o c√≥digo latente (z_mean) para o Decoder
    inference_output = decoder(z_mean)

    # 5. Criar o modelo final de infer√™ncia
    inference_model = keras.Model(inference_input, inference_output, name="vae_inference_only")
    
    # Compilar com perda simples (obrigat√≥rio para SavedModel)
    inference_model.compile(optimizer='adam', loss='mse') 
    
    return inference_model

# -----------------------------------------------------------
# 2. FUN√á√ïES DE UTILIDADE E CONVERS√ÉO
# -----------------------------------------------------------

def compactar_pasta(source_dir, output_zip_path, root_relative=True):
    """
    Compacta o CONTE√öDO de uma pasta, garantindo que os arquivos estejam na raiz do ZIP, 
    conforme exigido pelo Edge Impulse para SavedModel.
    """
    try:
        with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(source_dir):
                for file in files:
                    full_path = os.path.join(root, file)
                    archive_path = os.path.relpath(full_path, source_dir)
                    zipf.write(full_path, archive_path)
        return True
    except Exception as e:
        print(f"  ‚ö†Ô∏è ERRO durante a compacta√ß√£o: {e}")
        return False

# --- Fun√ß√£o Principal de Convers√£o ---
def converter_modelos_de_pasta():
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    
    if os.path.exists(TEMP_DIR_BASE):
        shutil.rmtree(TEMP_DIR_BASE)
    os.makedirs(TEMP_DIR_BASE, exist_ok=True)
    
    print(f"Iniciando a convers√£o dos modelos da pasta: {INPUT_DIR}")

    model_paths = glob(os.path.join(INPUT_DIR, '*.keras'))
    model_paths.extend(glob(os.path.join(INPUT_DIR, '*.h5')))
    
    if not model_paths:
        print(f"‚ùå Nenhum arquivo de modelo (.keras, .h5) encontrado em '{INPUT_DIR}'.")
        shutil.rmtree(TEMP_DIR_BASE)
        return

    for model_path in model_paths:
        filename = os.path.basename(model_path)
        model_name = os.path.splitext(filename)[0]
        
        print(f"\nProcessando arquivo: {filename}")
        
        # --- CARREGAR MODELO ---
        try:
            # Passa a classe VAE e a fun√ß√£o sampling para custom_objects
            CUSTOM_OBJECTS = {'sampling': sampling, 'VAE': VAE} 
            model = keras.models.load_model(model_path, custom_objects=CUSTOM_OBJECTS)
            
            model_name_final = model.name if model.name and model.name != 'sequential' else model_name
            print(f"  > Modelo Keras carregado (Nome: {model_name_final})")
            
        except Exception as e:
            print(f"  ‚ö†Ô∏è ERRO FATAL ao carregar o modelo de '{model_path}': {e}")
            continue

        # --- GERA√á√ÉO DO MODELO DE INFER√äNCIA ---
        # Se o modelo for um Variacional, ele precisa ser convertido para infer√™ncia determin√≠stica.
        if isinstance(model, VAE) or 'variacional' in model_name_final.lower():
            print("  > Detectado VAE. Criando modelo de infer√™ncia determin√≠stico...")
            try:
                model = criar_vae_inference_model(model) 
                model_name_final += "_inference"
                print(f"  > Modelo VAE transformado para infer√™ncia: {model_name_final}")
            except Exception as e:
                print(f"  ‚ö†Ô∏è ERRO FATAL ao criar modelo de infer√™ncia VAE: {e}")
                continue


        # --- SALVAR NO FORMATO SAVEDMODEL ---
        savedmodel_output_path = os.path.join(TEMP_DIR_BASE, model_name_final)
        zip_output_path = os.path.join(OUTPUT_DIR, f"{model_name_final}_savedmodel.zip")
        
        try:
            # Usa o m√©todo correto Keras 3 / TensorFlow 2.x
            tf.saved_model.save(model, savedmodel_output_path)
            print("  > Modelo salvo como SavedModel em pasta tempor√°ria.")
            
        except Exception as e:
            print(f"  ‚ö†Ô∏è ERRO FATAL ao salvar {model_name_final} em SavedModel (Pode ser Custom Loss ou camada n√£o suportada): {e}")
            
            if os.path.exists(savedmodel_output_path):
                shutil.rmtree(savedmodel_output_path)
            continue
            
        # --- COMPACTAR E FINALIZAR ---
        if compactar_pasta(savedmodel_output_path, zip_output_path, root_relative=True):
            print(f"  ‚úîÔ∏è Convers√£o e compacta√ß√£o conclu√≠das. ZIP salvo em: {zip_output_path}")
        
        shutil.rmtree(savedmodel_output_path)

    # 3. Limpeza final
    shutil.rmtree(TEMP_DIR_BASE)
    print("\n‚úÖ Processo de convers√£o em lote finalizado.")
    print(f"Os modelos Edge Impulse est√£o prontos na pasta: {OUTPUT_DIR}")


# -----------------------------------------------------------
# 3. Execu√ß√£o
# -----------------------------------------------------------
if not os.path.exists(INPUT_DIR):
    print(f"Criando pasta de modelos '{INPUT_DIR}'. Por favor, adicione seus arquivos .keras/.h5 nela.")
    os.makedirs(INPUT_DIR)
else:
    converter_modelos_de_pasta()

## Infer√™ncia na borda

In [None]:
# Carrega o modelo TFLite
interpreter = tf.lite.Interpreter(model_path="modelo_otimizado.tflite")
interpreter.allocate_tensors()

# Pega os detalhes dos tensores de entrada e sa√≠da
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Carrega e preprocessa uma nova imagem (simula√ß√£o)
new_image = ... # Carregar e processar a imagem aqui
new_image = np.expand_dims(new_image, axis=0).astype(np.float32)

# Copia a nova imagem para o tensor de entrada
interpreter.set_tensor(input_details[0]['index'], new_image)

# Executa a infer√™ncia
interpreter.invoke()

# Obt√©m o resultado
output_data = interpreter.get_tensor(output_details[0]['index'])
reconstructed_image = output_data[0]

# Tentativas de otimiza√ß√£o

## Keras tuner

In [None]:
# Define a estrat√©gia de distribui√ß√£o
strategy = tf.distribute.MirroredStrategy()
print(f'Estrat√©gia de distribui√ß√£o: {strategy.num_replicas_in_sync} GPUs detectadas')

# Envolve a fun√ß√£o de constru√ß√£o do modelo no escopo da estrat√©gia
with strategy.scope():
    def build_model(hp):
        """
        Constr√≥i e compila o modelo de autoencoder com hiperpar√¢metros ajust√°veis.
        """
        input_shape = (128, 128, 3)

        # Definir hiperpar√¢metros ajust√°veis
        filters_1 = hp.Choice('filters_1', values=[32, 64, 96, 128])
        filters_2 = hp.Choice('filters_2', values=[16, 32, 48, 64])
        learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

        # Constru√ß√£o do ENCODER
        inp = keras.Input(shape=input_shape, name='conv_input')
        x = layers.Conv2D(filters_1, 3, activation='relu', padding='same')(inp)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D(2, padding='same')(x) # Dimens√£o: 64x64
        x = layers.Conv2D(filters_2, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D(2, padding='same')(x) # Dimens√£o: 32x32

        # Constru√ß√£o do DECODER
        x = layers.Conv2DTranspose(filters_2, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2DTranspose(filters_2, 3, activation='relu', strides=(2, 2), padding='same')(x) # UpSample para 64x64
        x = layers.Conv2DTranspose(filters_1, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2DTranspose(filters_1, 3, activation='relu', strides=(2, 2), padding='same')(x) # UpSample para 128x128
        out = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)

        autoenc = Model(inp, out, name='autoencoder_conv_tunable')
        
        # Compila√ß√£o do modelo com a taxa de aprendizado ajust√°vel
        autoenc.compile(
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
            loss='mse'
        )
        return autoenc

# Instanciar o tuner (RandomSearch)
tuner = kt.RandomSearch(
    build_model,
    objective='val_loss',
    max_trials=10, 
    executions_per_trial=1, 
    directory='my_dir',  
    project_name='autoencoder_tuning'
)

tuner.search_space_summary()
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

tuner.search(X_train, X_train, epochs=800, batch_size=64, validation_data=(X_val, X_val), callbacks=[early_stopping])

tuner.results_summary()

best_model = tuner.get_best_models(num_models=1)[0]
best_hyperparameters = tuner.get_best_hyperparameters()[0]

print("\nMelhores hiperpar√¢metros encontrados:")
print(best_hyperparameters.values)

# Salva o melhor modelo em um arquivo
best_model.save('Convencional_tunado.keras')
print("\nO melhor modelo foi salvo como 'Convencional_tunado.keras'")

best_model.summary()

In [None]:
X_train

## Keras Tunner Novo

In [None]:
# EXEMPLO DE PLACEHOLDER: substitua por seu carregamento real
# Tente importar vari√°veis de outro notebook/ambiente se j√° estiverem definidas
try:
    X_train, X_val  # type: ignore
    print("Usando X_train/X_valid j√° carregados do ambiente.")
except NameError:
    # Exemplo com dados dummy (apenas para assegurar que o notebook roda). SUBSTITUA!
    print("Carregando dados de EXEMPLO (substitua pelo seu pipeline!).")
    H, W, C = 128, 128, 3
    #X_train = np.random.rand(256, H, W, C).astype("float32")
    #X_valid = np.random.rand(64,  H, W, C).astype("float32")
    
input_shape = X_train.shape[1:]
input_shape

from tensorflow.keras import mixed_precision
mixed_precision.set_global_policy("mixed_float16")  # A30 tem Tensor Cores p/ FP16/BF16

### Callbacks e par√¢metros comuns

In [None]:
common_callbacks = [
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=20, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=10, min_lr=1e-6),
]
MAX_EPOCHS = 200

###  Fun√ß√£o √∫nica de constru√ß√£o (`build_autoencoder`)

In [None]:
import keras
from keras import layers, ops, regularizers
import tensorflow as tf

# Camada que amostra z e adiciona o KL √† loss
class SamplingWithKL(layers.Layer):
    def __init__(self, beta=1.0, **kwargs):
        super().__init__(**kwargs)
        self.beta = beta

    def call(self, inputs):
        z_mean, z_log_var = inputs
        eps = keras.random.normal(shape=ops.shape(z_mean))
        z = z_mean + ops.exp(0.5 * z_log_var) * eps
        # KL per amostra
        kl = -0.5 * ops.sum(1 + z_log_var - ops.square(z_mean) - ops.exp(z_log_var), axis=-1)
        # adiciona m√©dia do batch
        self.add_loss(self.beta * ops.mean(kl))
        return z


def build_autoencoder(hp, arch="vae", input_shape=(128,128,3)):
    # Hiperpar√¢metros
    base_filters = hp.Int("base_filters", min_value=16, max_value=32, step=16)
    num_blocks   = hp.Int("num_blocks", 2, 3)
    kernel_size  = hp.Choice("kernel_size", [3, 5])
    latent_dim   = hp.Int("latent_dim", 16, 256, step=16)
    dropout_rate = hp.Float("dropout", 0.0, 0.5, step=0.1)
    lr           = hp.Choice("lr", [1e-3, 5e-4, 1e-4])
    l1_reg       = hp.Choice("l1_reg", [0.0, 1e-6, 1e-5, 1e-4]) if arch == "redund" else 0.0

    # Checagem para evitar shapes inv√°lidos (sen√£o d√° erro silencioso no Tuner)
    pool_factor = 2 ** num_blocks
    if (input_shape[0] % pool_factor) != 0 or (input_shape[1] % pool_factor) != 0:
        raise ValueError(
            f"input_shape {input_shape} n√£o √© divis√≠vel por 2**num_blocks ({num_blocks})."
        )

    # ----- ENCODER -----
    inputs = keras.Input(shape=input_shape)
    x = inputs
    enc_filters = []
    for b in range(num_blocks):
        f = base_filters * (2 ** b)
        enc_filters.append(f)
        x = layers.Conv2D(
            f, kernel_size, padding="same",
            kernel_regularizer=regularizers.l1(l1_reg) if arch == "redund" else None
        )(x)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)
        if dropout_rate > 0:
            x = layers.Dropout(dropout_rate)(x)
        x = layers.MaxPooling2D()(x)  # /2 em H e W

    # Gargalo calculado estaticamente
    H = input_shape[0] // pool_factor
    W = input_shape[1] // pool_factor
    C = enc_filters[-1]

    x = layers.Flatten()(x)
    h = x

    # ----- Decoder helper (reutilizado nas variantes) -----
    def decode_from(latent_vec):
        y = layers.Dense(H * W * C, activation="relu")(latent_vec)
        y = layers.Reshape((H, W, C))(y)
        for b in reversed(range(num_blocks)):
            y = layers.UpSampling2D()(y)  # *2 em H e W
            y = layers.Conv2D(enc_filters[b], kernel_size, padding="same")(y)
            y = layers.BatchNormalization()(y)
            y = layers.ReLU()(y)
            if dropout_rate > 0:
                y = layers.Dropout(dropout_rate)(y)
        y = layers.Conv2D(input_shape[-1], kernel_size, activation="sigmoid", padding="same", name="recon")(y)
        return y

    # ----- HEADS -----
    if arch == "vae":
        z_mean    = layers.Dense(latent_dim, name="z_mean")(h)
        z_log_var = layers.Dense(latent_dim, name="z_log_var")(h)
    
        beta = hp.Float("beta_kl", 0.1, 2.0, step=0.1)
        z = SamplingWithKL(beta=beta, name="z")([z_mean, z_log_var])
    
        outputs = decode_from(z)
        model = keras.Model(inputs, outputs, name="vae")
    
        # Nada de model.add_loss aqui!
        model.compile(optimizer=keras.optimizers.Adam(lr), loss="mse")
        return model

    elif arch in ("conv", "redund"):
        code = layers.Dense(latent_dim, name="code")(h)
        outputs = decode_from(code)
        model = keras.Model(inputs, outputs, name=f"{arch}_ae")
        model.compile(optimizer=keras.optimizers.Adam(lr), loss="mse")
        return model

    # Se chegar aqui, algo est√° errado no par√¢metro 'arch'
    raise RuntimeError(f"arch inesperado: {arch!r} ‚Äî o builder n√£o retornou um modelo.")


### Helper para rodar o Tuner com a mesma configura√ß√£o

In [None]:
def run_tuner_for_arch(arch, x_train, x_val, input_shape, max_epochs=200, directory="kt_search"):
    def model_builder(hp):
        return build_autoencoder(hp, arch=arch, input_shape=input_shape)

    tuner = kt.Hyperband(
        model_builder,
        objective="val_loss",
        max_epochs=max_epochs,
        factor=3,
        directory=directory,
        project_name=f"{arch}_search"
    )
    tuner.search(
        x=x_train, y=x_train,
        validation_data=(x_val, x_val),
        epochs=max_epochs,
        callbacks=common_callbacks,
        verbose=0,
        batch_size=8 
    )
    best_hp = tuner.get_best_hyperparameters(num_trials=1)[0]
    best_model = tuner.get_best_models(num_models=1)[0]
    # Salva o melhor modelo
    # Garante que a pasta exista
    os.makedirs("models", exist_ok=True)
    
    # Salva dentro da pasta models
    best_model.save(f"models/best_{arch}.keras")
    return tuner, best_hp, best_model


### Modelo Convencional

In [None]:
tuner_conv, hp_conv, best_conv = run_tuner_for_arch("conv", X_train, X_val, input_shape, max_epochs=MAX_EPOCHS)
print("Melhores HPs (conv):", hp_conv.values)

###  Modelo VAE

In [None]:
tuner_vae,  hp_vae,  best_vae  = run_tuner_for_arch("vae", X_train, X_val, input_shape, max_epochs=MAX_EPOCHS)
print("Melhores HPs (vae):", hp_vae.values)

## Modelo Redund√¢ncia

In [None]:
tuner_red,  hp_red,  best_red  = run_tuner_for_arch("redund", X_train, X_val, input_shape, max_epochs=MAX_EPOCHS)
print("Melhores HPs (redund):", hp_red.values)

## Modelo Convencional Otimizado (n√£o precisa)

In [None]:
# --- Adicionar esta parte ao seu c√≥digo, ap√≥s a busca do tuner ---
# Obtenha o objeto de hiperpar√¢metros da melhor tentativa
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# Extraia os valores dinamicamente
# O m√©todo get() √© usado para acessar os valores dos hiperpar√¢metros que voc√™ definiu
# na fun√ß√£o `build_model`. O nome deve ser id√™ntico (ex: 'filters_1').
BEST_FILTERS_1 = best_hps.get('filters_1')
BEST_FILTERS_2 = best_hps.get('filters_2')
BEST_LEARNING_RATE = best_hps.get('learning_rate')

print("\nMelhores hiperpar√¢metros encontrados dinamicamente:")
print(f"  filters_1: {BEST_FILTERS_1}")
print(f"  filters_2: {BEST_FILTERS_2}")
print(f"  learning_rate: {BEST_LEARNING_RATE}")

# --- Agora, voc√™ pode usar estas vari√°veis na sua fun√ß√£o de constru√ß√£o do modelo final ---

def criar_autoencoder_final():
    input_shape = (128, 128, 3)

    # Constru√ß√£o do ENCODER
    inp = Input(shape=input_shape, name='conv_input')
    x = layers.Conv2D(BEST_FILTERS_1, 3, activation='relu', padding='same')(inp)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(2, padding='same')(x)
    x = layers.Conv2D(BEST_FILTERS_2, 3, activation='relu', padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(2, padding='same')(x)

    # Constru√ß√£o do DECODER
    x = layers.Conv2DTranspose(BEST_FILTERS_2, 3, activation='relu', padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2DTranspose(BEST_FILTERS_2, 3, activation='relu', strides=(2, 2), padding='same')(x)
    x = layers.Conv2DTranspose(BEST_FILTERS_1, 3, activation='relu', padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2DTranspose(filters=BEST_FILTERS_1, kernel_size=3, activation='relu', strides=(2, 2), padding='same')(x)
    out = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)

    model = Model(inp, out, name='autoencoder_conv_final')
    
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=BEST_LEARNING_RATE),
        loss='mse'
    )

    return model

# Crie e compile o modelo final
final_model = criar_autoencoder_final()
final_model.summary()

## Optuna

In [None]:
#@title Instala√ß√£o e imports
!pip -q install optuna --progress-bar off

import os, math, pathlib, random, gc, json
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import backend as K
import optuna
from optuna.integration import TFKerasPruningCallback

print(tf.__version__)
print("GPUs:", len(tf.config.list_physical_devices('GPU')))

In [None]:
#@title Configura√ß√µes gerais ‚Äî edite aqui conforme seu ambiente
DATA_DIR = "datasets/sard_dataset/search-and-rescue-2"  #@param {type:"string"}
# Espera-se a estrutura:
# DATA_DIR/
#   train/ <subpastas ou imagens>
#   valid/ <subpastas ou imagens>
#   test/  <subpastas ou imagens>

IMG_SIZE = 128              #@param {type:"integer"}
CHANNELS = 3                #@param {type:"integer"}
MAX_EPOCHS = 50             #@param {type:"integer"}
PATIENCE = 8                #@param {type:"integer"}
USE_MIXED_PRECISION = True #@param {type:"boolean"}
SUBSET_FRACTION = 1.0       #@param {type:"number"}
OUTPUT_DIR = "models"       #@param {type:"string"}
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Espa√ßo de batch sizes usado durante tuning (ajuste para caber na mem√≥ria)
BATCH_SIZES = [32, 64]

# Fixar seeds para reprodutibilidade b√°sica
SEED = 42
np.random.seed(SEED); random.seed(SEED); tf.random.set_seed(SEED)

if USE_MIXED_PRECISION:
    from tensorflow.keras import mixed_precision
    mixed_precision.set_global_policy('mixed_float16')
    print("Mixed precision ativada.")

In [None]:
#@title Datasets ‚Äî ImageFolder ‚Üí (x, x) para Autoencoder
# Caso use subpastas por classe, image_dataset_from_directory exigir√° labels,
# mas mapeamos para (x, x) para AE.
from tensorflow.keras.preprocessing import image_dataset_from_directory

def make_ae_ds(subdir, subset_fraction=1.0, shuffle=True, seed=SEED):
    d = os.path.join(DATA_DIR, subdir)
    if not os.path.isdir(d):
        raise FileNotFoundError(f"Pasta n√£o encontrada: {d}")
    ds = image_dataset_from_directory(
        d,
        labels="inferred",  # pode ser 'inferred' mesmo para AE
        label_mode="int",
        image_size=(IMG_SIZE, IMG_SIZE),
        batch_size=None,     # batcharemos depois
        shuffle=shuffle,
        seed=seed
    )
    # Normaliza para [0,1] e mapeia (x, x)
    ds = ds.map(lambda x, y: (tf.cast(x, tf.float32)/255.0, tf.cast(x, tf.float32)/255.0))
    if subset_fraction < 1.0:
        take_count = max(1, int(len(list(ds)) * subset_fraction))
        ds = ds.take(take_count)
    return ds

train_ds = make_ae_ds("train", subset_fraction=SUBSET_FRACTION)
valid_ds = make_ae_ds("valid", subset_fraction=SUBSET_FRACTION)
test_ds  = make_ae_ds("test",  subset_fraction=SUBSET_FRACTION)

# Utilidade para batchar com prefetch
def prepare(ds, batch_size):
    return ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)


In [None]:
#@title M√©tricas e Losses: PSNR/SSIM + MSE/SSIM combinada

def psnr_metric(y_true, y_pred):
    return tf.image.psnr(y_true, y_pred, max_val=1.0)

def ssim_metric(y_true, y_pred):
    return tf.image.ssim(y_true, y_pred, max_val=1.0)

def loss_mse_ssim(y_true, y_pred, alpha=0.8):
    mse_loss = tf.reduce_mean(tf.square(y_true - y_pred))
    ssim_loss = 1.0 - tf.reduce_mean(tf.image.ssim(y_true, y_pred, max_val=1.0))
    return alpha * mse_loss + (1.0 - alpha) * ssim_loss

In [None]:
# Blocos b√°sicos de AE (encoder/decoder)

def conv_block(x, f, k=3, s=2):
    x = layers.Conv2D(f, k, strides=s, padding="same", activation=None)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.1)(x)
    return x

def deconv_block(x, f, k=3, s=2):
    x = layers.Conv2DTranspose(f, k, strides=s, padding="same", activation=None)(x)
    x = layers.BatchNormalization()(x)
    x = layers.LeakyReLU(0.1)(x)
    return x

In [None]:
#@title Modelo 1 ‚Äî Convencional
def build_conventional(latent_dim=128, base_filters=32):
    inp = layers.Input(shape=(IMG_SIZE, IMG_SIZE, CHANNELS))
    x = conv_block(inp, base_filters)
    x = conv_block(x, base_filters*2)
    x = conv_block(x, base_filters*4)
    shape_before_flat = K.int_shape(x)
    x = layers.Flatten()(x)
    latent = layers.Dense(latent_dim, name="latent")(x)

    x = layers.Dense(np.prod(shape_before_flat[1:]))(latent)
    x = layers.Reshape(target_shape=shape_before_flat[1:])(x)
    x = deconv_block(x, base_filters*2)
    x = deconv_block(x, base_filters)
    out = layers.Conv2DTranspose(CHANNELS, 3, padding="same", activation="sigmoid")(x)

    model = keras.Model(inp, out, name="ae_conventional")
    return model

In [None]:
#@title Modelo 2 ‚Äî Variacional (VAE) com KL
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        epsilon = tf.random.normal(shape=tf.shape(z_mean))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

def build_variational(latent_dim=128, base_filters=32, kl_weight=1e-3):
    inp = layers.Input(shape=(IMG_SIZE, IMG_SIZE, CHANNELS))
    x = conv_block(inp, base_filters)
    x = conv_block(x, base_filters*2)
    x = conv_block(x, base_filters*4)
    shape_before_flat = K.int_shape(x)
    x = layers.Flatten()(x)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()([z_mean, z_log_var])

    # Decoder
    x = layers.Dense(np.prod(shape_before_flat[1:]))(z)
    x = layers.Reshape(target_shape=shape_before_flat[1:])(x)
    x = deconv_block(x, base_filters*2)
    x = deconv_block(x, base_filters)
    out = layers.Conv2DTranspose(CHANNELS, 3, padding="same", activation="sigmoid")(x)

    model = keras.Model(inp, out, name="ae_variational")

    # KL loss
    kl_loss = -0.5 * tf.reduce_mean(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
    model.add_loss(kl_weight * kl_loss)
    return model


In [None]:
#@title Modelo 3 ‚Äî Penalizado por Redund√¢ncia (L1 + penalidade de covari√¢ncia no latente)
class RedundancyPenalty(layers.Layer):
    def __init__(self, weight=1e-3, **kwargs):
        super().__init__(**kwargs)
        self.weight = weight
    def call(self, z):
        # Penaliza covari√¢ncia fora da diagonal (decorrela√ß√£o do latente)
        zc = z - tf.reduce_mean(z, axis=0, keepdims=True)
        cov = tf.matmul(zc, zc, transpose_a=True) / (tf.cast(tf.shape(zc)[0], tf.float32) + 1e-6)
        off_diag = cov - tf.linalg.diag(tf.linalg.diag_part(cov))
        loss = tf.reduce_mean(tf.square(off_diag))
        self.add_loss(self.weight * loss)
        return z

def build_redundancy(latent_dim=128, base_filters=32, l1_weight=1e-6, red_weight=1e-3):
    inp = layers.Input(shape=(IMG_SIZE, IMG_SIZE, CHANNELS))
    x = conv_block(inp, base_filters)
    x = conv_block(x, base_filters*2)
    x = conv_block(x, base_filters*4)
    shape_before_flat = K.int_shape(x)
    x = layers.Flatten()(x)
    latent_pre = layers.Dense(latent_dim, activity_regularizer=keras.regularizers.l1(l1_weight))(x)
    latent = RedundancyPenalty(weight=red_weight, name="redundancy_penalty")(latent_pre)

    x = layers.Dense(np.prod(shape_before_flat[1:]))(latent)
    x = layers.Reshape(target_shape=shape_before_flat[1:])(x)
    x = deconv_block(x, base_filters*2)
    x = deconv_block(x, base_filters)
    out = layers.Conv2DTranspose(CHANNELS, 3, padding="same", activation="sigmoid")(x)

    model = keras.Model(inp, out, name="ae_redundancy")
    return model


In [None]:
#@title Compila√ß√£o e treino utilit√°rios
def compile_model(model, lr=1e-3):
    opt = keras.optimizers.Adam(learning_rate=lr)
    model.compile(optimizer=opt, loss=loss_mse_ssim, metrics=[psnr_metric, ssim_metric])

def fit_model(model, train_ds_b, valid_ds_b, max_epochs=MAX_EPOCHS, monitor="val_loss"):
    callbacks = [
        keras.callbacks.EarlyStopping(monitor=monitor, patience=PATIENCE, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(monitor=monitor, factor=0.5, patience=max(2, PATIENCE//2), min_lr=1e-6),
    ]
    history = model.fit(train_ds_b, validation_data=valid_ds_b, epochs=max_epochs, callbacks=callbacks, verbose=0)
    return history


In [None]:
#@title Fun√ß√µes de objetivo Optuna ‚Äî uma por arquitetura
def objective_conventional(trial):
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    latent_dim = trial.suggest_categorical("latent_dim", [64, 128, 192, 256])
    base_filters = trial.suggest_categorical("base_filters", [32, 48, 64])
    bs = trial.suggest_categorical("batch_size", BATCH_SIZES)

    model = build_conventional(latent_dim=latent_dim, base_filters=base_filters)
    compile_model(model, lr=lr)

    train_b = prepare(train_ds, bs)
    valid_b = prepare(valid_ds, bs)

    pruning_cb = TFKerasPruningCallback(trial, "val_loss")
    history = model.fit(
        train_b, validation_data=valid_b, epochs=MAX_EPOCHS,
        callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=PATIENCE, restore_best_weights=True),
            keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=max(2, PATIENCE//2), min_lr=1e-6),
            pruning_cb
        ],
        verbose=0
    )
    val_loss = min(history.history["val_loss"])
    # Clean up
    K.clear_session(); gc.collect()
    return val_loss

def objective_variational(trial):
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    latent_dim = trial.suggest_categorical("latent_dim", [64, 128, 192, 256])
    base_filters = trial.suggest_categorical("base_filters", [32, 48, 64])
    kl_weight = trial.suggest_float("kl_weight", 1e-5, 1e-2, log=True)
    bs = trial.suggest_categorical("batch_size", BATCH_SIZES)

    model = build_variational(latent_dim=latent_dim, base_filters=base_filters, kl_weight=kl_weight)
    compile_model(model, lr=lr)

    train_b = prepare(train_ds, bs)
    valid_b = prepare(valid_ds, bs)

    pruning_cb = TFKerasPruningCallback(trial, "val_loss")
    history = model.fit(
        train_b, validation_data=valid_b, epochs=MAX_EPOCHS,
        callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=PATIENCE, restore_best_weights=True),
            keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=max(2, PATIENCE//2), min_lr=1e-6),
            pruning_cb
        ],
        verbose=0
    )
    val_loss = min(history.history["val_loss"])
    K.clear_session(); gc.collect()
    return val_loss

def objective_redundancy(trial):
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    latent_dim = trial.suggest_categorical("latent_dim", [64, 128, 192, 256])
    base_filters = trial.suggest_categorical("base_filters", [32, 48, 64])
    l1_weight = trial.suggest_float("l1_weight", 1e-7, 1e-4, log=True)
    red_weight = trial.suggest_float("red_weight", 1e-5, 1e-2, log=True)
    bs = trial.suggest_categorical("batch_size", BATCH_SIZES)

    model = build_redundancy(latent_dim=latent_dim, base_filters=base_filters, l1_weight=l1_weight, red_weight=red_weight)
    compile_model(model, lr=lr)

    train_b = prepare(train_ds, bs)
    valid_b = prepare(valid_ds, bs)

    pruning_cb = TFKerasPruningCallback(trial, "val_loss")
    history = model.fit(
        train_b, validation_data=valid_b, epochs=MAX_EPOCHS,
        callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=PATIENCE, restore_best_weights=True),
            keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=max(2, PATIENCE//2), min_lr=1e-6),
            pruning_cb
        ],
        verbose=0
    )
    val_loss = min(history.history["val_loss"])
    K.clear_session(); gc.collect()
    return val_loss

In [None]:
# --- Configura√ß√£o do storage (SQLite) ---
from pathlib import Path
import optuna

# Caminho absoluto para evitar confus√£o de diret√≥rio
db_path = Path.cwd() / "meu_estudo.db"
STORAGE_URL = f"sqlite:///{db_path.as_posix()}"

# (opcional) sampler/pruner
sampler = optuna.samplers.TPESampler(seed=42)
pruner  = optuna.pruners.MedianPruner(n_warmup_steps=5)

# --- Estudos (carrega se existir; cria se n√£o) ---
N_TRIALS_CONV = 10
N_TRIALS_VAE  = 10
N_TRIALS_RED  = 10

study_conv = optuna.create_study(
    direction="minimize",
    study_name="conv",
    storage=STORAGE_URL,
    load_if_exists=True,
    sampler=sampler,
    pruner=pruner,
)
study_conv.optimize(objective_conventional, n_trials=N_TRIALS_CONV, show_progress_bar=True)

study_vae = optuna.create_study(
    direction="minimize",
    study_name="vae",
    storage=STORAGE_URL,
    load_if_exists=True,
    sampler=sampler,
    pruner=pruner,
)
study_vae.optimize(objective_variational, n_trials=N_TRIALS_VAE, show_progress_bar=True)

study_red = optuna.create_study(
    direction="minimize",
    study_name="redundancy",
    storage=STORAGE_URL,
    load_if_exists=True,
    sampler=sampler,
    pruner=pruner,
)
study_red.optimize(objective_redundancy, n_trials=N_TRIALS_RED, show_progress_bar=True)

print("Best conv:", study_conv.best_params, study_conv.best_value)
print("Best vae:",  study_vae.best_params,  study_vae.best_value)
print("Best red:",  study_red.best_params,  study_red.best_value)


In [None]:

#@title Re-treinar com melhores hiperpar√¢metros e salvar modelos
def retrain_and_save(arch, params, train_ds, valid_ds, test_ds):
    if arch == "conventional":
        model = build_conventional(latent_dim=params["latent_dim"], base_filters=params["base_filters"])
    elif arch == "variational":
        model = build_variational(latent_dim=params["latent_dim"], base_filters=params["base_filters"], kl_weight=params.get("kl_weight", 1e-3))
    elif arch == "redundancy":
        model = build_redundancy(
            latent_dim=params["latent_dim"], base_filters=params["base_filters"],
            l1_weight=params.get("l1_weight", 1e-6), red_weight=params.get("red_weight", 1e-3)
        )
    else:
        raise ValueError("Arquitetura desconhecida")

    compile_model(model, lr=params["lr"])
    bs = params["batch_size"]
    train_b = prepare(train_ds, bs)
    valid_b = prepare(valid_ds, bs)
    test_b  = prepare(test_ds,  bs)

    history = fit_model(model, train_b, valid_b)
    eval_test = model.evaluate(test_b, return_dict=True, verbose=0)
    save_path = os.path.join(OUTPUT_DIR, f"best_{arch}.keras")
    model.save(save_path)
    print(f"Salvo: {save_path} | Test metrics: {eval_test}")
    return eval_test, save_path

best_conv_metrics, best_conv_path = retrain_and_save("conventional", study_conv.best_params, train_ds, valid_ds, test_ds)
best_vae_metrics,  best_vae_path  = retrain_and_save("variational",  study_vae.best_params,  train_ds, valid_ds, test_ds)
best_red_metrics,  best_red_path  = retrain_and_save("redundancy",   study_red.best_params,  train_ds, valid_ds, test_ds)


In [None]:

#@title Resumo final
summary = {
    "conventional": {"path": best_conv_path, "metrics": best_conv_metrics, "best_params": study_conv.best_params},
    "variational":  {"path": best_vae_path,  "metrics": best_vae_metrics,  "best_params": study_vae.best_params},
    "redundancy":   {"path": best_red_path,  "metrics": best_red_metrics,  "best_params": study_red.best_params},
}
print(json.dumps(summary, indent=2, default=lambda o: float(o) if isinstance(o, (np.floating,)) else o))

## Optuna Novo 

## Carregamento dos modelos

In [None]:
model_conv = tf.keras.models.load_model('models/ConvencionalOtimizado.keras')
model_vae = tf.keras.models.load_model('models/VariacionalOtimizado.keras', custom_objects={'VAE': VAE})
model_red = tf.keras.models.load_model('models/Redund√¢nciaOtimizado.keras')

## Avalia√ß√£o dos modelos otimizados

In [None]:
# Supondo que voc√™ j√° rodou o tuner.search()
# e j√° tem o `best_model`

# 1. Defina uma fun√ß√£o para avalia√ß√£o (opcional, mas boa pr√°tica)
def avaliar_modelo_final(modelo, test_data):
    psnr_total, ssim_total, ms_ssim_total, tempo_total = 0, 0, 0, 0
    num_test_images = test_data.shape[0]

    for i in range(num_test_images):
        entrada = np.expand_dims(test_data[i], axis=0)
        inicio = time.time()
        saida = modelo.predict(entrada, verbose=0)
        fim = time.time()
        tempo_total += fim - inicio

        psnr_total += tf.image.psnr(tf.image.convert_image_dtype(entrada, dtype=tf.float32),
                                     tf.image.convert_image_dtype(saida, dtype=tf.float32),
                                     max_val=1.0).numpy()[0]
        ssim_total += ssim(entrada[0], saida[0], data_range=1.0, channel_axis=2)
        ms_ssim_total += tf.image.ssim_multiscale(
            tf.image.convert_image_dtype(entrada, dtype=tf.float32),
            tf.image.convert_image_dtype(saida, dtype=tf.float32),
            max_val=1.0,
            filter_size=3
        ).numpy()[0]

    return psnr_total / num_test_images, ssim_total / num_test_images, ms_ssim_total / num_test_images, tempo_total / num_test_images

# 2. Obtenha o melhor modelo do tuner
best_model = tuner.get_best_models(num_models=1)[0]

# 3. Avalie o modelo sem re-treinar
print("\nIniciando avalia√ß√£o do melhor modelo...")
psnr_final, ssim_final, ms_ssim_final, tempo_final = avaliar_modelo_final(best_model, X_test)

print("\nResultados do Autoencoder Otimizado:")
print(f"  PSNR: {psnr_final:.4f}")
print(f"  SSIM: {ssim_final:.4f}")
print(f"  MS-SSIM: {ms_ssim_final:.4f}")
print(f"  Tempo de Infer√™ncia (s): {tempo_final:.4f}")

# --- Cria√ß√£o do dicion√°rio de resultados ---
# A chave 'Modelo Otimizado' √© o que aparecer√° no eixo X do gr√°fico
resultados = {
    'Modelo Otimizado': {
        'PSNR': psnr_final,
        'SSIM': ssim_final,
        'MS-SSIM': ms_ssim_final,
        'Tempo (s)': tempo_final
    }
}

### Gr√°ficos de m√©tricas

In [None]:
labels = list(resultados.keys())

# Acesso aos valores usando as chaves de texto
psnr_vals    = [resultados[n]['PSNR'] for n in labels]
ssim_vals    = [resultados[n]['SSIM'] for n in labels]
ms_ssim_vals = [resultados[n]['MS-SSIM'] for n in labels]
tempo_vals   = [resultados[n]['Tempo (s)'] for n in labels]

# Cria um grid 2x2
fig, axes = plt.subplots(2, 2, figsize=(12, 10), sharey=False)

# Desenrola os eixos em uma lista para indexar facilmente
axs = axes.flatten()

# Plota cada m√©trica
axs[0].bar(labels, psnr_vals)
axs[0].set_title('PSNR M√©dio')

axs[1].bar(labels, ssim_vals)
axs[1].set_title('SSIM M√©dio')

axs[2].bar(labels, ms_ssim_vals)
axs[2].set_title('MS-SSIM M√©dio')

axs[3].bar(labels, tempo_vals)
axs[3].set_title('Tempo M√©dio (s)')

plt.tight_layout()
plt.show()

### Imagens comparativas

In [None]:
# Carregue o modelo base para compara√ß√£o (seu modelo original)
modelo_base = load_model('Convencional.keras')

# A vari√°vel 'best_model' j√° deve estar definida a partir do seu KerasTuner
# best_model = tuner.get_best_models(num_models=1)[0]

def plotar_comparacao(modelo_otimizado, test_data, num_imagens=4, modelo_base=None):
    """
    Plota imagens originais, reconstru√≠das pelo modelo base e pelo modelo otimizado.

    Args:
        modelo_otimizado (keras.Model): O modelo otimizado pelo KerasTuner.
        test_data (np.array): O conjunto de dados de teste.
        num_imagens (int): O n√∫mero de imagens a serem plotadas.
        modelo_base (keras.Model): O modelo base para compara√ß√£o (opcional).
    """
    indices_aleatorios = np.random.choice(range(len(test_data)), num_imagens, replace=False)
    
    # O n√∫mero de colunas ser√° 3 se houver um modelo base, caso contr√°rio 2 (Original, Otimizado)
    num_colunas = 2 if modelo_base is None else 3
    
    fig, axes = plt.subplots(num_imagens, num_colunas, figsize=(15, num_imagens * 5))

    if num_imagens == 1:
        axes = np.array([axes])

    for i, idx in enumerate(indices_aleatorios):
        original = test_data[idx:idx+1]
        
        # Reconstr√≥i com o modelo otimizado (necess√°rio mesmo que plotado por √∫ltimo para as m√©tricas)
        reconstruida_otimizado = modelo_otimizado.predict(original, verbose=0)
        psnr_otimizado = tf.image.psnr(tf.image.convert_image_dtype(original, dtype=tf.float32),
                                       tf.image.convert_image_dtype(reconstruida_otimizado, dtype=tf.float32),
                                       max_val=1.0).numpy()[0]
        ssim_otimizado = ssim(original[0], reconstruida_otimizado[0], data_range=1.0, channel_axis=2)

        # Plotar a imagem original na primeira coluna (√≠ndice 0)
        axes[i, 0].imshow(original[0])
        axes[i, 0].set_title("Original", fontsize=12)
        axes[i, 0].axis('off')

        if modelo_base is not None:
            # Reconstr√≥i com o modelo base
            reconstruida_base = modelo_base.predict(original, verbose=0)
            psnr_base = tf.image.psnr(tf.image.convert_image_dtype(original, dtype=tf.float32),
                                      tf.image.convert_image_dtype(reconstruida_base, dtype=tf.float32),
                                      max_val=1.0).numpy()[0]
            ssim_base = ssim(original[0], reconstruida_base[0], data_range=1.0, channel_axis=2)
            
            # Plotar a imagem base na segunda coluna (√≠ndice 1)
            axes[i, 1].imshow(reconstruida_base[0])
            axes[i, 1].set_title(f"Base\nPSNR: {psnr_base:.2f}\nSSIM: {ssim_base:.2f}", fontsize=12)
            axes[i, 1].axis('off')

            # Plotar a imagem otimizada na terceira coluna (√≠ndice 2)
            axes[i, 2].imshow(reconstruida_otimizado[0])
            axes[i, 2].set_title(f"Otimizado\nPSNR: {psnr_otimizado:.2f}\nSSIM: {ssim_otimizado:.2f}", fontsize=12)
            axes[i, 2].axis('off')
        else:
            # Se n√£o houver modelo base, o otimizado vai para a segunda coluna (√≠ndice 1)
            axes[i, 1].imshow(reconstruida_otimizado[0])
            axes[i, 1].set_title(f"Otimizado\nPSNR: {psnr_otimizado:.2f}\nSSIM: {ssim_otimizado:.2f}", fontsize=12)
            axes[i, 1].axis('off')

    plt.suptitle("Compara√ß√£o de Imagens", fontsize=18)
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()

# --- Chamada da fun√ß√£o ---
plotar_comparacao(
    modelo_otimizado=best_model, 
    test_data=X_test, 
    num_imagens=4, 
    modelo_base=modelo_base
)

### Refinando Modelo

In [None]:
print("GPUs detectadas pelo TensorFlow:")
print(tf.config.list_physical_devices('GPU'))

### tentativa com 30 trials

In [None]:
# C√©lula para refinar a otimiza√ß√£o
print("--- Iniciando o refinamento da busca de hiperpar√¢metros ---")

# Obter os melhores hiperpar√¢metros da busca anterior
# (Supondo que a vari√°vel 'tuner' da busca inicial ainda est√° dispon√≠vel)
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

# Extrair os valores dinamicamente
BEST_FILTERS_1 = best_hps.get('filters_1')
BEST_FILTERS_2 = best_hps.get('filters_2')
BEST_LEARNING_RATE = best_hps.get('learning_rate')

print("\nMelhores hiperpar√¢metros encontrados na busca anterior:")
print(f"  filters_1: {BEST_FILTERS_1}")
print(f"  filters_2: {BEST_FILTERS_2}")
print(f"  learning_rate: {BEST_LEARNING_RATE}")

# --- Criar o modelo e o tuner refinados dentro do escopo da estrat√©gia ---
# Isso garante o uso de todas as GPUs
with strategy.scope():
    def build_model_refined(hp):
        """
        Constr√≥i o modelo com um espa√ßo de busca refinado.
        """
        input_shape = (128, 128, 3)

        # Usar os melhores valores como ponto de partida
        filters_1_refined = hp.Int('filters_1', min_value=BEST_FILTERS_1-8, max_value=BEST_FILTERS_1+8, step=4)
        filters_2_refined = hp.Int('filters_2', min_value=BEST_FILTERS_2-4, max_value=BEST_FILTERS_2+4, step=2)
        learning_rate_refined = hp.Choice('learning_rate', values=[BEST_LEARNING_RATE * 0.5, BEST_LEARNING_RATE, BEST_LEARNING_RATE * 2])

        # --- A arquitetura do modelo CORRIGIDA ---
        inp = keras.Input(shape=input_shape, name='conv_input')
        x = layers.Conv2D(filters_1_refined, 3, activation='relu', padding='same')(inp)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D(2, padding='same')(x)
        x = layers.Conv2D(filters_2_refined, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D(2, padding='same')(x)

        x = layers.Conv2DTranspose(filters_2_refined, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2DTranspose(filters_2_refined, 3, activation='relu', strides=(2, 2), padding='same')(x)
        x = layers.Conv2DTranspose(filters_1_refined, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2DTranspose(filters=filters_1_refined, kernel_size=3, activation='relu', strides=(2, 2), padding='same')(x)
        out = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)
        
        # Cria a inst√¢ncia do modelo depois que 'inp' e 'out' estiverem definidos
        autoenc = Model(inp, out, name='autoencoder_conv_refined')
        
        autoenc.compile(
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate_refined),
            loss='mse'
        )
        return autoenc

    tuner_refined = kt.RandomSearch(
        build_model_refined,
        objective='val_loss',
        max_trials=30,  # Aumente para mais tentativas
        executions_per_trial=1,
        directory='my_dir_refined',
        project_name='autoencoder_tuning_refined'
    )

# --- Executar a busca refinada fora do escopo ---
print("\nIniciando a busca de hiperpar√¢metros refinada...")
tuner_refined.search_space_summary()
tuner_refined.search(X_train, X_train, epochs=800, batch_size=64, validation_data=(X_val, X_val), callbacks=[early_stopping])

tuner_refined.results_summary()

# Salvar o modelo refinado
best_model_refined = tuner_refined.get_best_models(num_models=1)[0]
best_model_refined.save('Convencional_otimizado_refinado.keras')
print("\nO melhor modelo refinado foi salvo.")

Aumento de n√∫mero de camadas do modelo

In [None]:
# Envolve a fun√ß√£o de constru√ß√£o do modelo no escopo da estrat√©gia
with strategy.scope():
    def build_model_article_style(hp):
        """
        Constr√≥i e compila um autoencoder mais profundo,
        inspirado no artigo de pesquisa.
        """
        # O artigo usa 96x96, mas vamos manter seu 128x128.
        input_shape = (128, 128, 3)

        # Definir hiperpar√¢metros ajust√°veis para camadas adicionais
        filters_1 = hp.Choice('filters_1', values=[64, 96, 128])
        filters_2 = hp.Choice('filters_2', values=[32, 48, 64])
        filters_3 = hp.Choice('filters_3', values=[16, 24, 32])
        learning_rate = hp.Choice('learning_rate', values=[1e-3, 5e-4, 1e-4])

        # Constru√ß√£o do ENCODER
        inp = keras.Input(shape=input_shape, name='conv_input')
        
        # Grupo 1
        x = layers.Conv2D(filters_1, 3, activation='relu', padding='same')(inp)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D(2, padding='same')(x) # Dimens√£o: 64x64
        
        # Grupo 2
        x = layers.Conv2D(filters_2, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D(2, padding='same')(x) # Dimens√£o: 32x32

        # Grupo 3 (Adicionado para maior profundidade)
        x = layers.Conv2D(filters_3, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D(2, padding='same')(x) # Dimens√£o: 16x16
        
        # Grupo 4 (Adicionado para maior profundidade)
        x = layers.Conv2D(filters_3, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D(2, padding='same')(x) # Dimens√£o: 8x8

        # Constru√ß√£o do DECODER
        # Grupo 4 (Reconstru√ß√£o)
        x = layers.Conv2DTranspose(filters_3, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2DTranspose(filters_3, 3, activation='relu', strides=(2, 2), padding='same')(x) # UpSample para 16x16
        
        # Grupo 3 (Reconstru√ß√£o)
        x = layers.Conv2DTranspose(filters_3, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2DTranspose(filters_2, 3, activation='relu', strides=(2, 2), padding='same')(x) # UpSample para 32x32
        
        # Grupo 2 (Reconstru√ß√£o)
        x = layers.Conv2DTranspose(filters_2, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2DTranspose(filters_1, 3, activation='relu', strides=(2, 2), padding='same')(x) # UpSample para 64x64
        
        # Grupo 1 (Reconstru√ß√£o)
        x = layers.Conv2DTranspose(filters_1, 3, activation='relu', padding='same')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2DTranspose(filters_1, 3, activation='relu', strides=(2, 2), padding='same')(x) # UpSample para 128x128
        
        out = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(x)

        autoenc = Model(inp, out, name='autoencoder_article_style')
        
        # Compila√ß√£o do modelo
        autoenc.compile(
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
            loss='mse'
        )
        return autoenc

# Instanciar o tuner com a nova fun√ß√£o
tuner = kt.RandomSearch(
    build_model_article_style,
    objective='val_loss',
    max_trials=20, # Aumente para mais tentativas se desejar
    executions_per_trial=1,
    directory='my_dir',
    project_name='autoencoder_article_tuning'
)


tuner.search_space_summary()
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

tuner.search(X_train, X_train, epochs=800, batch_size=64, validation_data=(X_val, X_val), callbacks=[early_stopping])

tuner.results_summary()

best_model = tuner.get_best_models(num_models=1)[0]
best_hyperparameters = tuner.get_best_hyperparameters()[0]

print("\nMelhores hiperpar√¢metros encontrados:")
print(best_hyperparameters.values)

# Salva o melhor modelo em um arquivo
best_model.save('Convencional_tunado.keras')
print("\nO melhor modelo foi salvo como 'Convencional_tunado.keras'")

best_model.summary()

### Avalia√ß√£o do modelo

In [None]:
# 2. Obtenha o melhor modelo do tuner
best_model = tuner.get_best_models(num_models=1)[0]
print(best_model)

In [None]:
# Supondo que voc√™ j√° rodou o tuner.search()
# e j√° tem o `best_model`

# 1. Defina uma fun√ß√£o para avalia√ß√£o (opcional, mas boa pr√°tica)
def avaliar_modelo_final(modelo, test_data):
    psnr_total, ssim_total, ms_ssim_total, tempo_total = 0, 0, 0, 0
    num_test_images = test_data.shape[0]

    for i in range(num_test_images):
        entrada = np.expand_dims(test_data[i], axis=0)
        inicio = time.time()
        saida = modelo.predict(entrada, verbose=0)
        fim = time.time()
        tempo_total += fim - inicio

        psnr_total += tf.image.psnr(tf.image.convert_image_dtype(entrada, dtype=tf.float32),
                                     tf.image.convert_image_dtype(saida, dtype=tf.float32),
                                     max_val=1.0).numpy()[0]
        ssim_total += ssim(entrada[0], saida[0], data_range=1.0, channel_axis=2)
        ms_ssim_total += tf.image.ssim_multiscale(
            tf.image.convert_image_dtype(entrada, dtype=tf.float32),
            tf.image.convert_image_dtype(saida, dtype=tf.float32),
            max_val=1.0,
            filter_size=3
        ).numpy()[0]

    return psnr_total / num_test_images, ssim_total / num_test_images, ms_ssim_total / num_test_images, tempo_total / num_test_images

# 2. Obtenha o melhor modelo do tuner
best_model = tuner.get_best_models(num_models=1)[0]

# 3. Avalie o modelo sem re-treinar
print("\nIniciando avalia√ß√£o do melhor modelo...")
psnr_final, ssim_final, ms_ssim_final, tempo_final = avaliar_modelo_final(best_model, X_test)

print("\nResultados do Autoencoder Otimizado:")
print(f"  PSNR: {psnr_final:.4f}")
print(f"  SSIM: {ssim_final:.4f}")
print(f"  MS-SSIM: {ms_ssim_final:.4f}")
print(f"  Tempo de Infer√™ncia (s): {tempo_final:.4f}")

# --- Cria√ß√£o do dicion√°rio de resultados ---
# A chave 'Modelo Otimizado' √© o que aparecer√° no eixo X do gr√°fico
resultados = {
    'Modelo Otimizado': {
        'PSNR': psnr_final,
        'SSIM': ssim_final,
        'MS-SSIM': ms_ssim_final,
        'Tempo (s)': tempo_final
    }
}

# Melhorias Futuras
- Uso de Keras Quartenion
- Fun√ß√£o de perda combinada (SSIM + MSE)
- Autoencoder assim√©trico UFRJ