# Sensoriamento Espectral Inteligente para Coexistência em Mega-Constelações de Satélites LEO
**Autor:** Lana Alves Vieira Gonzaga

**Data:** Novembro de 2025

**Descrição:** Este projeto implementa uma Rede Neural Convolucional (CNN) para classificar 24 tipos de modulação de rádio a partir do dataset RadioML 2018.01A. O objetivo é analisar o desempenho do modelo em diferentes condições de sinal-ruído (SNR). Esse tipo de modelo de classificação é utilizado como base para alocação dinâmica de espectro em sistemas de rádio cognitivo para comunicação eficiente de satélites. Algumas das tecnologias e funcionalidades utilizadas: TensorFlow, Keras, Python, HDF5, pipeline de dados otimizado, classificador CNN e agente cognitivo com detetecção híbrida.

In [None]:
# Instalação de Dependências

!pip install kagglehub[pandas-datasets]

!pip uninstall -y protobuf
!pip install protobuf==3.20.3

In [None]:
# Imports

import numpy as np
import sys
import h5py
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from IPython.display import FileLink
import seaborn as sns
import json
import warnings
import os
import kagglehub
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, optimizers, callbacks
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import load_model

warnings.filterwarnings('ignore')

2. Configuração do kaggle para download do dataset "RadioML 2018.01A":

In [None]:
# 2.1. Configuração da API do Kaggle, adaptada para o ambiente em que for executada.

def setup_kaggle_api():

  !pip install -q kaggle

  IN_COLAB = 'google.colab' in sys.modules

  kaggle_json_path = Path.home() / '.kaggle' / 'kaggle.json'

  if IN_COLAB:
        print("Ambiente Google Colab detectado.")
        if not kaggle_json_path.exists():
            print("Faça o upload do ficheiro kaggle.json.")
            from google.colab import files
            uploaded = files.upload()

            if 'kaggle.json' in uploaded:
                print("kaggle.json recebido. Configurando.")

                kaggle_dir = Path.home() / '.kaggle'
                kaggle_dir.mkdir(exist_ok=True, parents=True)

                with open(kaggle_json_path, 'wb') as f:
                    f.write(uploaded['kaggle.json'])
            else:
                print("Upload cancelado ou ficheiro incorreto.")
                return False

  # Fora do colab.
  else:
        print("Ambiente local ou desconhecido detectado.")

        if not kaggle_json_path.exists():
            print(f"ERRO: Ficheiro kaggle.json não encontrado em '{kaggle_json_path}'.")
            print("Por favor, descarregue o seu token da API do Kaggle e coloque-o nesse local.")
            return False

  !chmod 600 {kaggle_json_path}

setup_kaggle_api()

In [None]:
# 2.2. Download do dataset "RadioML 2018.01A".

try:
    path = kagglehub.dataset_download("pinxau1000/radioml2018")
    print(f"\nDownload concluído. Os ficheiros estão em: '{path}'")

    print("\nArquivos encontrados no diretório do dataset:")
    found_files = []
    for dirname, _, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dirname, filename)
            if file_path.endswith(('.h5', '.hdf5')):
                found_files.append(file_path)
                print(f"  • {file_path}")

    if found_files:
        HDF5_FILE_PATH = found_files[0]
        print(f"\nCaminho do ficheiro HDF5 para carregar: '{HDF5_FILE_PATH}'")
    else:
        print("\nNenhum ficheiro HDF5 encontrado no diretório descarregado.")

except Exception as e:
      print(f"\nOcorreu um erro durante o download: {e}")

In [None]:
# 2.3. Passar arquivo para o disco local

import os
import shutil
import time

print("COPIANDO ARQUIVO PARA DISCO LOCAL")

print(f"\n Arquivo atual:")
print(f"   Caminho: {HDF5_FILE_PATH}")
print(f"   Tamanho: {os.path.getsize(HDF5_FILE_PATH) / 1e9:.2f} GB")


local_path = '/content/radioml_local.hdf5'


if os.path.exists(local_path):
    print(f"\n Arquivo local já existe: {local_path}")
    print(f"   Tamanho: {os.path.getsize(local_path) / 1e9:.2f} GB")
else:
    print(f"\n Copiando para: {local_path}")
    print("   Isso vai levar alguns minutos.")

    start = time.time()
    shutil.copy2(HDF5_FILE_PATH, local_path)
    elapsed = time.time() - start

    print(f"   Cópia concluída em {elapsed/60:.1f} minutos.")
    print(f"   Tamanho: {os.path.getsize(local_path) / 1e9:.2f} GB")


HDF5_FILE_PATH = local_path
print(f"\n Novo caminho: {HDF5_FILE_PATH}")

3. Pré-processamento dos dados:

In [None]:
# 3.1. Divisão em índices das informações do dataset, para não sobrecarregar a RAM com o conjunto inteiro de dados de uma vez só.

HDF5_FILE_PATH = '/content/radioml_local.hdf5'

with h5py.File(HDF5_FILE_PATH, 'r') as f:
    total_samples = f['X'].shape[0]
    num_classes = f['Y'].shape[1]
print(f"Total de amostras no dataset: {total_samples}")
print(f"Número de classes de modulação: {num_classes}")

all_indices = np.arange(total_samples)

with h5py.File(HDF5_FILE_PATH, 'r') as f:
    labels_for_stratify = f['Y'][:]

# Primeiro split: 80% treino+validação / 20% teste.
indices_temp, test_indices = train_test_split(
    all_indices, test_size=0.2, random_state=42, stratify=labels_for_stratify
)

labels_temp = labels_for_stratify[indices_temp]

# Segundo split: divide treino+validação em 80% treino / 20% validação.
train_indices, val_indices = train_test_split(
    indices_temp, test_size=0.1 / (1 - 0.2), random_state=42, stratify=labels_temp
)

print(f"\nNúmero de índices de treino: {len(train_indices)}")
print(f"Número de índices de validação: {len(val_indices)}")
print(f"Número de índices de teste: {len(test_indices)}")

In [None]:
# 3.2. Classe do gerador de dados e pré-processamento (one hot-enconding, normalização )

class RadioMLGenerator(keras.utils.Sequence):

  def __init__(self, file_path, indices, batch_size, num_classes,
             shuffle=True, augment=False, normalize=True, label_key='Y'):
    self.indices = indices.copy()
    self.batch_size = batch_size
    self.num_classes = num_classes
    self.shuffle = shuffle
    self.augment = augment
    self.normalize = normalize

    self.file = h5py.File(file_path, 'r')

    self.X_data = self.file['X']
    self.Y_data = self.file['Y']

    self.on_epoch_end()


  def __len__(self):    
        return int(np.ceil(len(self.indices) / self.batch_size))


  def __getitem__(self, index):

    start_idx = index * self.batch_size
    end_idx = (index + 1) * self.batch_size
    batch_indices = self.indices[start_idx:end_idx]

    original_order = np.argsort(np.argsort(batch_indices))


    batch_indices_sorted = np.sort(batch_indices)


    X_batch = np.array(self.X_data[batch_indices_sorted])
    Y_batch = np.array(self.Y_data[batch_indices_sorted])


    X_batch = X_batch[original_order]
    Y_batch = Y_batch[original_order]

    X_batch = self._preprocess_signals(X_batch)


    return X_batch, Y_batch


    if self.normalize:
        max_val = np.max(np.abs(X_batch), axis=(1, 2), keepdims=True)
        X_batch = X_batch / (max_val + 1e-7)


    return X_batch, Y_batch

  def _preprocess_signals(self, X):  
      if len(X.shape) == 2:
            batch_size, total_samples = X.shape
            timesteps = total_samples // 2
            X = X.reshape(batch_size, timesteps, 2)

      elif X.shape[1] == 2:
            X = np.transpose(X, (0, 2, 1))

      if self.normalize:
            X = self._normalize_power(X)

      if self.augment:
            X = self._augment_signals(X)

      return X.astype(np.float32)

  def _normalize_power(self, X):    
       X_normalized = np.zeros_like(X)

       for i in range(X.shape[0]):    
            signal = X[i]
            power = np.sqrt(np.mean(signal[:, 0]**2 + signal[:, 1]**2))
            X_normalized[i] = signal / (power + 1e-8)

       return X_normalized

  def _augment_signals(self, X):    
      augmented = []

      for i in range(X.shape[0]):
            signal = X[i]
            signal_complex = signal[:, 0] + 1j * signal[:, 1]

            phase_shift = np.random.uniform(0, 2 * np.pi)
            signal_rotated = signal_complex * np.exp(1j * phase_shift)

            noise_level = 0.01
            noise = noise_level * (np.random.randn(len(signal_rotated)) +
                                   1j * np.random.randn(len(signal_rotated)))
            signal_augmented = signal_rotated + noise

            signal_iq = np.stack([signal_augmented.real, signal_augmented.imag], axis=-1)
            augmented.append(signal_iq)

      return np.array(augmented)

  def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

  def __del__(self):
        if hasattr(self, 'h5_file'):
            try:
                self.h5_file.close()
                print("[INFO] Arquivo HDF5 fechado com sucesso")
            except:
                pass

In [None]:
# 3.3. Criação dos geradores de treino, validação e teste

batch_size = 1024

def create_generators(file_path, train_indices, val_indices, test_indices,
                     batch_size=1024, num_classes=24):

    train_gen = RadioMLGenerator(
        file_path=HDF5_FILE_PATH,
        indices=train_indices,
        batch_size=batch_size,
        num_classes=num_classes,
        shuffle=True,
        augment=True,
        normalize=True
    )

    val_gen = RadioMLGenerator(
        file_path=HDF5_FILE_PATH,
        indices=val_indices,
        batch_size=batch_size,
        num_classes=num_classes,
        shuffle=False,
        augment=False,
        normalize=True
    )

    test_gen = RadioMLGenerator(
        file_path=HDF5_FILE_PATH,
        indices=test_indices,
        batch_size=batch_size,
        num_classes=num_classes,
        shuffle=False,
        augment=False,
        normalize=True
    )

    print("\nGeradores criados:")
    print(f"   Treino:    {len(train_gen)} batches")
    print(f"   Validação: {len(val_gen)} batches")
    print(f"   Teste:     {len(test_gen)} batches")
    print("="*80 + "\n")


    return train_gen, val_gen, test_gen

batch_size = 128

train_generator, validation_generator, test_generator = create_generators(
    file_path=HDF5_FILE_PATH,
    train_indices=train_indices,
    val_indices=val_indices,
    test_indices=test_indices,
    batch_size=batch_size,
    num_classes=num_classes
)

In [None]:
# 3.4. Visuzalização dos dados

def visualize_batch(generator, num_samples=4, figsize=(16, 10)):
  if generator.num_classes == 24:
        modulations = [                                                        
            'OOK', '4ASK', '8ASK', 'BPSK', 'QPSK', '8PSK', '16PSK', '32PSK',
            '16APSK', '32APSK', '64APSK', '128APSK', '16QAM', '32QAM', '64QAM',
            '128QAM', '256QAM', 'AM-SSB-WC', 'AM-SSB-SC', 'AM-DSB-WC', 'AM-DSB-SC',
            'FM', 'GMSK', 'OQPSK'
        ]

  else:
        modulations = [f'Classe_{i}' for i in range(generator.num_classes)]

  X_batch, Y_batch = generator[0]

  fig = plt.figure(figsize=figsize)
  gs = GridSpec(num_samples, 3, figure=fig, hspace=0.4, wspace=0.3)

  print("="*80)
  print("Visualização do batch")
  print("="*80)
  print(f"X shape: {X_batch.shape}, Y shape: {Y_batch.shape}")
  print(f"X range: [{X_batch.min():.4f}, {X_batch.max():.4f}]")
  print("="*80 + "\n")

  for idx in range(min(num_samples, len(X_batch))):
        signal = X_batch[idx]
        label = Y_batch[idx]
        class_idx = np.argmax(label)
        mod_name = modulations[class_idx]

        I, Q = signal[:, 0], signal[:, 1]
        time_steps = np.arange(len(I))

        ax1 = fig.add_subplot(gs[idx, 0])   # sinal no tempo
        ax1.plot(time_steps, I, 'b-', alpha=0.7, linewidth=1, label='I')
        ax1.plot(time_steps, Q, 'r-', alpha=0.7, linewidth=1, label='Q')
        ax1.set_xlabel('Amostras')
        ax1.set_ylabel('Amplitude')
        ax1.set_title(f'{mod_name} - Sinal I/Q')
        ax1.legend(loc='upper right', fontsize=8)
        ax1.grid(True, alpha=0.3)

        ax2 = fig.add_subplot(gs[idx, 1])   # constelação
        ax2.scatter(I, Q, c=time_steps, cmap='viridis', s=10, alpha=0.6)
        ax2.set_xlabel('I')
        ax2.set_ylabel('Q')
        ax2.set_title(f'{mod_name} - Constelação')
        ax2.grid(True, alpha=0.3)
        ax2.axis('equal')
        circle = plt.Circle((0, 0), 1, color='gray', fill=False,
                           linestyle='--', linewidth=1, alpha=0.5)
        ax2.add_patch(circle)

        ax3 = fig.add_subplot(gs[idx, 2])   # espectro
        signal_complex = I + 1j * Q
        fft_result = np.fft.fftshift(np.fft.fft(signal_complex))
        power_spectrum = np.abs(fft_result)**2
        freqs = np.fft.fftshift(np.fft.fftfreq(len(signal_complex)))
        ax3.plot(freqs, 10*np.log10(power_spectrum + 1e-10), 'g-', linewidth=1)
        ax3.set_xlabel('Frequência Normalizada')
        ax3.set_ylabel('Potência (dB)')
        ax3.set_title(f'{mod_name} - Espectro')
        ax3.grid(True, alpha=0.3)

        plt.tight_layout()

        nome_arquivo = f'sinais_lote_exemplo.png'
        plt.savefig(nome_arquivo, dpi=300, bbox_inches='tight')
        print(f"Imagem salva como: {nome_arquivo}")

        plt.savefig('figura_sinais.pdf', bbox_inches='tight') 

        plt.show()

  print("\nDistribuição de classes no batch:")
  class_distribution = np.argmax(Y_batch, axis=1)
  unique, counts = np.unique(class_distribution, return_counts=True)
  for cls, count in zip(unique, counts):
        mod = modulations[cls]
        print(f"  {mod:15s}: {count:3d} ({count/len(Y_batch)*100:.1f}%)")
  print("="*80)


visualize_batch(train_generator)
visualize_batch(validation_generator)

4. Construção do classificador utlizando modelo de Rede Neural Convolucional (CNN)

In [None]:
# 4.1. Configurações e compilação do modelo

def create_radio_model(input_shape, num_classes):
    inputs = layers.Input(shape=input_shape)


    x = layers.Conv1D(64, kernel_size=8, padding='same', activation='relu')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling1D(pool_size=2)(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Conv1D(128, kernel_size=8, padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling1D(pool_size=2)(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Conv1D(256, kernel_size=8, padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling1D(pool_size=2)(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Conv1D(512, kernel_size=8, padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling1D(pool_size=2)(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Flatten()(x)

    x = layers.Dense(128, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.5)(x)

    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs=inputs, outputs=outputs, name="RadioCNN")
    return model

sample_x, _ = train_generator[0]
input_shape = sample_x.shape[1:]
num_classes = train_generator.num_classes


model = create_radio_model(input_shape, num_classes)
model.summary()


opt = optimizers.Adam(learning_rate=0.001)
model.compile(
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

my_callbacks = [
    callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1),
    callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=0.00001, verbose=1),
    callbacks.ModelCheckpoint('melhor_modelo_radio.keras', monitor='val_accuracy', save_best_only=True, verbose=1)
]

print("Modelo compilado e callbacks configurados.")


In [None]:
# 4.2. Treinamento

print("Iniciando o treinamento.")

BATCH_SIZE = 1024

train_generator = RadioMLGenerator(HDF5_FILE_PATH, train_indices, BATCH_SIZE, num_classes, shuffle=True, normalize=True)
validation_generator = RadioMLGenerator(HDF5_FILE_PATH, val_indices, BATCH_SIZE, num_classes, shuffle=False, normalize=True)

steps_per_epoch = len(train_generator)
validation_steps = len(validation_generator)

history = model.fit(
    train_generator,
    validation_data=validation_generator,
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps,
    epochs=30,
    callbacks=my_callbacks,
    verbose=1
)

print("Treinamento concluído.")

def plot_history(history):
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    epochs = range(1, len(acc) + 1)

    plt.figure(figsize=(14, 5))

    # Gráfico de Acurácia
    plt.subplot(1, 2, 1)
    plt.plot(epochs, acc, 'bo-', label='Treino')
    plt.plot(epochs, val_acc, 'ro-', label='Validação')
    plt.title('Acurácia de Treino e Validação')
    plt.xlabel('Épocas')
    plt.ylabel('Acurácia')
    plt.legend()
    plt.grid(True)

    # Gráfico de Perda
    plt.subplot(1, 2, 2)
    plt.plot(epochs, loss, 'bo-', label='Treino')
    plt.plot(epochs, val_loss, 'ro-', label='Validação')
    plt.title('Perda (Loss) de Treino e Validação')
    plt.xlabel('Épocas')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)

    nome_arquivo = f'graficos_treino.png'
    plt.savefig(nome_arquivo, dpi=300, bbox_inches='tight')
    print(f"Imagem salva como: {nome_arquivo}")

    plt.savefig('figura_sinais.pdf', bbox_inches='tight')

    plt.show()

plot_history(history)

FileLink(r'melhor_modelo_radio.keras')

In [None]:
# 4.3. Carregar e salvar modelo treinado

print("Carregando o modelo treinado do disco.")
model = load_model('melhor_modelo_radio.keras') 

print("Modelo carregado com sucesso.")
model.summary()

display(FileLink(r'melhor_modelo_radio.keras'))

In [None]:
# 4.4. Encontrar caminho do arquivo do modelo treinado e importar

model_path = ""

for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        if filename.endswith('.keras'):
            full_path = os.path.join(dirname, filename)
            print(f" Encontrado: {full_path}")
            model_path = full_path

if model_path:
    print(f"\n O caminho para usar no load_model é:\n'{model_path}'")
else:
    print("\n Nenhum modelo encontrado.")

try:
    print(f"Carregando modelo de: {model_path}")
    model = load_model(model_path)
    print("Modelo recuperado com sucesso.")
    model.summary()
except Exception as e:
    print(f"Erro ao carregar: {e}")

5. Verificações

In [None]:
# 5.1. Gráfico de Acurácia vs SNR

print("Iniciando extração sincronizada de teste.")

all_y_true = []
all_y_pred = []
all_snr = []

with h5py.File(HDF5_FILE_PATH, 'r') as f:
    
    total_batches = len(test_generator)
    
    for i in range(total_batches):
        start_idx = i * test_generator.batch_size
        end_idx = min((i + 1) * test_generator.batch_size, len(test_generator.indices))
        
        batch_indices = test_generator.indices[start_idx:end_idx]
        
        batch_indices = sorted(batch_indices)
        
        X_batch = f['X'][batch_indices]
        Y_batch = f['Y'][batch_indices]
        Z_batch = f['Z'][batch_indices]
        
        max_val = np.max(np.abs(X_batch), axis=(1, 2), keepdims=True)
        X_batch = X_batch / (max_val + 1e-7)
        
        pred_batch = model.predict(X_batch, verbose=0)
        
        all_y_true.append(np.argmax(Y_batch, axis=1)) 
        all_y_pred.append(np.argmax(pred_batch, axis=1))
        all_snr.append(Z_batch)
        
        if i % 100 == 0:
            print(f"   Processado lote {i}/{total_batches}")

y_true_final = np.concatenate(all_y_true)
y_pred_final = np.concatenate(all_y_pred)
snr_final = np.concatenate(all_snr)

print("Dados extraídos e alinhados.")


snrs = sorted(list(np.unique(snr_final)))
acc_by_snr = []

for snr in snrs:
    mask = (snr_final == snr)[:, 0]
    if np.sum(mask) > 0:
        acc = np.mean(y_pred_final[mask] == y_true_final[mask])
        acc_by_snr.append(acc)
    else:
        acc_by_snr.append(0)

plt.figure(figsize=(10, 6))
plt.plot(snrs, acc_by_snr, 'bo-', linewidth=2, label='Sua CNN')
plt.axhline(y=1/24, color='r', linestyle='--', label='Aleatório (4%)')
plt.title(f'Acurácia vs. SNR (Média Real: {np.mean(y_pred_final == y_true_final):.2%})')
plt.xlabel('SNR (dB)')
plt.ylabel('Acurácia')
plt.legend()
plt.grid(True)

plt.savefig('acuracia_snr.png', dpi=300, bbox_inches='tight')
print("Imagem salva como 'acuracia_snr.png'")

plt.show()

In [None]:
# 5.2. Matriz de confusão

print("Iniciando extração sincronizada para Matriz de Confusão.")

y_true_list = []
y_pred_list = []

with h5py.File(HDF5_FILE_PATH, 'r') as f:
    total_batches = len(test_generator)
    indices = test_generator.indices
    batch_size = test_generator.batch_size
    
    for i in range(total_batches):
        start = i * batch_size
        end = min((i + 1) * batch_size, len(indices))
        batch_idx = indices[start:end]
        
        batch_idx = sorted(batch_idx)
        
        X_batch = f['X'][batch_idx]
        Y_batch = f['Y'][batch_idx]
        
        if hasattr(test_generator, 'normalize') and test_generator.normalize:
            max_val = np.max(np.abs(X_batch), axis=(1, 2), keepdims=True)
            X_batch = X_batch / (max_val + 1e-7)
        
        pred_batch = model.predict(X_batch, verbose=0)
        
        y_true_list.append(np.argmax(Y_batch, axis=1))      
        y_pred_list.append(np.argmax(pred_batch, axis=1))   
        
        if i % 50 == 0:
            print(f"   -> Processado lote {i}/{total_batches}")


y_true_final = np.concatenate(y_true_list)
y_pred_final = np.concatenate(y_pred_list)

print("Dados extraídos e alinhados.")

cm = confusion_matrix(y_true_final, y_pred_final)

cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

if 'modulation_names' not in locals():
    modulation_names = ['8PSK', 'AM-DSB', 'AM-SSB', 'BPSK', 'CPFSK', 'FM', 'GFSK', 
                        'PAM4', 'QAM16', 'QAM64', 'QPSK', 'WBFM', '16APSK', '32APSK',
                        '64APSK', '128APSK', '16QAM', '32QAM', '64QAM', '128QAM',
                        '256QAM', 'AM-DSB-SC', 'AM-SSB-SC', 'OQPSK']

plt.figure(figsize=(20, 20))
sns.heatmap(cm_norm, annot=False, cmap='viridis', vmin=0, vmax=1,
            xticklabels=modulation_names, 
            yticklabels=modulation_names)

plt.title('Matriz de Confusão (Normalizada)', fontsize=15)
plt.ylabel('Rótulo Verdadeiro', fontsize=12)
plt.xlabel('Rótulo Previsto', fontsize=12)
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)

plt.savefig('matriz_confusao.png', dpi=300, bbox_inches='tight')
print("Imagem salva como 'matriz_confusao.png'")

plt.show()

In [None]:
# 5.3.Simulação de canais livres (ruído gaussiano)

noise_batch = np.random.normal(0, 1, size=(1000, 1024, 2)) 

max_val = np.max(np.abs(noise_batch), axis=(1, 2), keepdims=True)
noise_batch = noise_batch / (max_val + 1e-7)

x_real_batch, _ = test_gen_matrix[0] 

print("Prevendo ruído.")
pred_noise = model.predict(noise_batch, verbose=0)
print("Prevendo sinais reais.")
pred_signal = model.predict(x_real_batch, verbose=0)

conf_noise = np.max(pred_noise, axis=1)
conf_signal = np.max(pred_signal, axis=1)

plt.figure(figsize=(10, 6))
plt.hist(conf_noise, bins=50, alpha=0.7, label='Ruído (Canal Livre)', color='red')
plt.hist(conf_signal, bins=50, alpha=0.7, label='Sinais Reais (Canal Ocupado)', color='blue')
plt.title('Distribuição da Confiança do Modelo: Ruído vs. Sinais')
plt.xlabel('Confiança (Probabilidade da classe vencedora)')
plt.ylabel('Contagem')
plt.legend()
plt.grid(True, alpha=0.3)

plt.savefig('confiança.png', dpi=300, bbox_inches='tight')
print("Imagem salva como 'confiança.png'")

plt.show()

mean_noise_conf = np.mean(conf_noise)
print(f"\nConfiança média em Ruído: {mean_noise_conf:.4f}")
print(f"Confiança média em Sinais: {np.mean(conf_signal):.4f}")
print(f"--> Sugestão para o Agente: Considerar 'Livre' se confiança < {mean_noise_conf + 0.1:.2f}")

6. Implementação de Modelo Heurístico Baseado em Regras (Rule-Based Heuristic), para alocação de espectro.

In [None]:
# 6.1. Funções auxiliares e configuração

CONFIDENCE_THRESHOLD = 0.5 

def generate_noise(shape=(1024, 2)):
    return np.random.normal(0, 0.005, size=shape)

if 'modulation_names' not in locals():
    modulation_names = ['8PSK', 'AM-DSB', 'AM-SSB', 'BPSK', 'CPFSK', 'FM', 'GFSK', 
                        'PAM4', 'QAM16', 'QAM64', 'QPSK', 'WBFM', '16APSK', '32APSK',
                        '64APSK', '128APSK', '16QAM', '32QAM', '64QAM', '128QAM',
                        '256QAM', 'AM-DSB-SC', 'AM-SSB-SC', 'OQPSK']

In [None]:
# 6.2. Classe do ambiente

class SpectrumEnvironment:
    def __init__(self, num_channels, signal_generator, mod_names):
        self.num_channels = num_channels
        self.signal_generator = signal_generator 
        self.mod_names = mod_names
        self.ground_truth = [] 
        
    def update(self):
        current_signals = []
        self.ground_truth = []
        
        occupancy_rate = 0.6 

        for i in range(self.num_channels):
            if np.random.random() < occupancy_rate:
               
                rand_batch_idx = np.random.randint(0, len(self.signal_generator))
                x_batch, y_batch = self.signal_generator[rand_batch_idx]
                
                rand_signal_idx = np.random.randint(0, len(x_batch))
                signal = x_batch[rand_signal_idx]
                
                label_idx = np.argmax(y_batch[rand_signal_idx])
                real_label = self.mod_names[label_idx]
                
                current_signals.append(signal)
                self.ground_truth.append(real_label) 
                
            else:
                noise = generate_noise()
                current_signals.append(noise)
                self.ground_truth.append("Livre")
                
        return np.array(current_signals)

    def step(self, action_channel):
        actual_state = self.ground_truth[action_channel]
        
        if actual_state == "Livre":
            return 10, "Transmissão em canal livre."
        else:
            return -50,"Interferiu com um sinal {actual_state}"

In [None]:
# 6.3. Classe do agente

class CognitiveAgent:
    def __init__(self, model, mod_names):
        self.model = model
        self.mod_names = mod_names
        self.energy_threshold = 0.01 
        
    def sense(self, signals):
        sensed_results = []
        
        predictions = self.model.predict(signals, verbose=0)
        
        for i, signal in enumerate(signals):
            energy = np.mean(np.abs(signal))
            
            if energy < self.energy_threshold:
                sensed_results.append("Livre")
                continue
    
            pred = predictions[i]
            predicted_idx = np.argmax(pred)
            predicted_label = self.mod_names[predicted_idx]
            
            sensed_results.append(predicted_label)
        
        return sensed_results

    def decide(self, sensed_results):
        free_channels = [i for i, status in enumerate(sensed_results) if status == "Livre"]
        
        if not free_channels:
            return None 
        
        return np.random.choice(free_channels)

In [None]:
# 6.4. Execução

NUM_CHANNELS = 5
STEPS = 15

env = SpectrumEnvironment(NUM_CHANNELS, validation_generator, modulation_names)
agent = CognitiveAgent(model, modulation_names)

print(f"Iniciando Simulação de Rádio Cognitivo ({NUM_CHANNELS} canais).")
print(f"Limiar de Confiança para 'Canal Livre': {CONFIDENCE_THRESHOLD}")
print("-" * 60)

total_score = 0

for t in range(STEPS):
    print(f"\n[Instante {t+1}/{STEPS}]")
    
    spectrum_signals = env.update()
    print(f" Realidade:   {env.ground_truth}")
    
    sensed_state = agent.sense(spectrum_signals)
    print(f" Agente Vê:   {sensed_state}")
    
    chosen_channel = agent.decide(sensed_state)
    
    if chosen_channel is not None:
        reward, msg = env.step(chosen_channel)
        print(f"  Ação: Transmitir no Canal {chosen_channel}")
        print(f"  {msg}")
    else:
        reward = 0
        print(" Ação: Aguardar (Nenhum canal livre seguro)")
        
    total_score += reward

print("\n" + "="*60)
print(f" Pontuação Final: {total_score}")
if total_score > 0:
    print("Conclusão: O agente conseguiu operar com sucesso e evitar a maioria das colisões.")
else:
    print("Conclusão: O agente teve muitas colisões. Talvez seja necessário ajustar o limiar.")