In [6]:
import random
import torch
from torch import optim
import torch.nn as nn
import torch.nn.functional as F
from sklearn.datasets import load_iris, load_digits, load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import math
import numpy as np

In [7]:
class SelfAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(SelfAttention, self).__init__()
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        assert self.head_dim * num_heads == embed_dim, "embed_dim must be divisible by num_heads"
        
        self.query_weights = nn.Linear(embed_dim, embed_dim)
        self.key_weights = nn.Linear(embed_dim, embed_dim)
        self.value_weights = nn.Linear(embed_dim, embed_dim)
        self.fc_out = nn.Linear(embed_dim, embed_dim)
        
    def forward(self, x):
        batch_size, seq_len, _ = x.size()
        Q = self.query_weights(x)
        K = self.key_weights(x)
        V = self.value_weights(x)
        
        Q = Q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attention_weights = torch.softmax(scores, dim=-1)
        attention_output = torch.matmul(attention_weights, V)
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, seq_len, embed_dim)
        
        out = self.fc_out(attention_output)
        return out

In [8]:
class SensorNeuron(nn.Module):
    def __init__(self, input_size, dataset_samples, threshold=0.5, num_heads=4):
        super(SensorNeuron, self).__init__()
        self.input_size = input_size
        self.dataset_samples = dataset_samples
        self.threshold = threshold
        self.sensory_db = self.initialize_sensory_neurons()
        self.synaptic_weights = torch.ones(input_size)
        self.consolidation_counter = [0] * input_size
        self.attention_layer = SelfAttention(input_size, num_heads)
        
    def generate_geometric_patterns(self, num_patterns, pattern_size):
        patterns = []
        for _ in range(num_patterns):
            pattern = np.zeros((pattern_size, pattern_size))
            # Criar um padrão geométrico simples, como uma linha diagonal
            for i in range(pattern_size):
                pattern[i, i] = 1
            patterns.append(pattern.flatten())  # Flatten para torná-los vetores
        return np.array(patterns)

    def initialize_positions(self):
        positions = {} 
        index = 0 
        for x in range(self.grid_size[0]):
            for y in range(self.grid_size[1]):
                positions[index] = (x, y) 
                index += 1
        return positions

    def initialize_sensory_neurons(self):
        sensory_neurons = []
        for _ in range(self.input_size):
            random_sample = random.choice(self.dataset_samples)  # Pega um padrão aleatório
            random_piece = np.random.choice(random_sample, self.input_size)  # Pega pedaços aleatórios
            sensory_neurons.append(random_piece)
    
        # Converte a lista para um único array numpy para melhorar o desempenho
        sensory_neurons = np.array(sensory_neurons)  
        # Converte para tensor e retorna
        return torch.tensor(sensory_neurons).float()

    def forward(self, x):
        x = x.float()  # Garantir tipo correto de entrada
        reinforcement_score = 0  # Inicia o score de reforço
    
        # Garantir que a base sensorial tenha o mesmo número de entradas no batch
        sensory_db_batch = self.sensory_db.unsqueeze(0).repeat(x.size(0), 1, 1)  # Repete a base sensorial para cada entrada no batch
    
        # Ordenar os índices dos neurônios de acordo com a consolidação (priorizando os reforçados) 
        sorted_indices = sorted(range(self.input_size), key=lambda i: self.consolidation_counter[i], reverse=True)
    
        for i in sorted_indices:
            # Processa os neurônios reforçados primeiro 
            # Aqui estamos fazendo a comparação correta entre x e sensory_db_batch para o batch
            diff = torch.abs(x - sensory_db_batch[:, i, :])  # Ajuste para a operação de diferença correta
            signal_strong = diff < self.threshold  # Verifica se a maioria dos sinais é forte (True)
        
            if signal_strong.all():
                # Se todos os valores no lote são menores que o limiar
                self.consolidation_counter[i] += 1
                reinforcement_score += 1  # Sinal forte encontrado, reforça a informação
                self.synaptic_weights[i] += 0.1  # Fortalece a conexão (LTP)
                # Enviar sinais para neurônios vizinhos
                self.send_signals(i, direction="right") 
                self.send_signals(i, direction="left") 
                self.send_signals(i, direction="up") 
                self.send_signals(i, direction="down")
            else:
                self.consolidation_counter[i] -= 1  # Sinal fraco, reduz a consolidação
                self.synaptic_weights[i] -= 0.1  # Enfraquece a conexão (LTD)
                # Se a conexão se tornar negativa, restabelece o valor mínimo
            if self.synaptic_weights[i] < 0:
                self.synaptic_weights[i] = 0  # Garante que o peso não seja negativo

            if self.consolidation_counter[i] < 0:
                sensory_db_batch = self.update_sensory_neuron(sensory_db_batch, self.dataset_samples, i)  # Atualiza com dados aleatórios 
    
        return x, reinforcement_score


    def update_sensory_neuron(self, sensory_db, dataset_samples, index):
        # Garantir que o índice esteja dentro dos limites válidos
        index = index % sensory_db.size(0)  # Use módulo para garantir que o índice fique dentro do limite
    
        random_sample = random.choice(dataset_samples)
        new_value = random.choice(random_sample)  # Valor novo para substituir
        sensory_db_batch = torch.tensor(np.array(sensory_db)).float()
        #sensory_db[index] = new_value  # Substituindo o valor na base sensorial
        return sensory_db_batch #sensory_db

    def send_signals(self, neuron_index, direction="right"):
        # Direções possíveis: "right", "left", "up", "down" 
        # Verifica a posição do neurônio e envia sinais aos vizinhos (ajuste no banco sensorial ou pesos)

        # Exemplo de posições 2D (assumindo que você tem uma estrutura 2D de neurônios, como uma matriz/grid):
        x, y = self.position[neuron_index]  # A posição do neurônio (supondo que você tenha um dicionário de posições)
        new_weight = 0.05  # O quanto o sinal pode modificar o peso sináptico (ajuste livre)

        # Enviar sinais para a direita
        if direction == "right" and y < self.grid_size[1] - 1:
            # Verifica se o vizinho à direita existe
            neighbor_index = self.get_neuron_index(x, y + 1)  # Pega o índice do vizinho à direita
            self.synaptic_weights[neighbor_index] += new_weight  # Modifica o peso sináptico

        # Enviar sinais para a esquerda
        elif direction == "left" and y > 0: 
            # Verifica se o vizinho à esquerda existe
            neighbor_index = self.get_neuron_index(x, y - 1)  # Pega o índice do vizinho à esquerda
            self.synaptic_weights[neighbor_index] += new_weight  # Modifica o peso sináptico

        # Enviar sinais para cima
        elif direction == "up" and x > 0:
            # Verifica se o vizinho acima existe
            neighbor_index = self.get_neuron_index(x - 1, y)  # Pega o índice do vizinho acima
            self.synaptic_weights[neighbor_index] += new_weight  # Modifica o peso sináptico

        # Enviar sinais para baixo
        elif direction == "down" and x < self.grid_size[0] - 1:
            # Verifica se o vizinho abaixo existe
            neighbor_index = self.get_neuron_index(x + 1, y)  # Pega o índice do vizinho abaixo
            self.synaptic_weights[neighbor_index] += new_weight  # Modifica o peso sináptico

    
    def reinforce_database(self): 
        # Caso o sinal seja encontrado mais de uma vez, a informação é consolidada no banco de dados permanente 
        for i in range(self.input_size):
            if self.consolidation_counter[i] > 5:
                # Exemplo de limiar de consolidação 
                self.sensory_db[i] = self.sensory_db[i]  # Reforço da informação 
            elif self.consolidation_counter[i] < -3:
                self.sensory_db = self.update_sensory_neuron(self.sensory_db, self.dataset_samples, i)  # Atualiza os neurônios com base no contador de consolidação 

    def consolidate_after_training(self):
        sensory_db_tensor = torch.tensor(self.sensory_db).unsqueeze(0) 
        attention_output, attention_weights = self.attention_layer(sensory_db_tensor) 
        attention_output = attention_output.squeeze(0) 
        for i in range(self.input_size):
            if self.consolidation_counter[i] > 0: 
                self.synaptic_weights[i] += 0.1 * attention_weights[0, i].item()
            elif self.consolidation_counter[i] < 0:
                self.synaptic_weights[i] -= 0.1 * attention_weights[0, i].item() 
            if self.synaptic_weights[i] < 0:
                self.synaptic_weights[i] = 0

    # Novo método para resetar os contadores de reforço
    def reset_reinforcement_scores(self):
        self.consolidation_counter = [0] * self.input_size  # Reseta os contadores de consolidação para zero.
        self.synaptic_weights = torch.ones(self.input_size)  # Reseta os pesos sinápticos para 1
        print("Contadores de reforço e pesos sinápticos resetados.")

    def verify_geometric_pattern(self, input_pattern, tolerance=0.1):
        correct = 0
        for pattern in self.sensory_db:
            if np.allclose(pattern, input_pattern, atol=tolerance):
                correct += 1
        accuracy = correct / len(self.sensory_db)
        return accuracy

class AdvancedMLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, dataset_samples, threshold=0.5):
        super(AdvancedMLP, self).__init__()
        self.sensor_neuron = SensorNeuron(input_size=input_size, dataset_samples=dataset_samples, threshold=threshold)
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x): 
        x, reinforcement_score = self.sensor_neuron(x)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return torch.log_softmax(x, dim=1), reinforcement_score

    def train_and_consolidate(self, train_data, train_labels):
        # Fase de Treinamento Diurno 
        for data, label in zip(train_data, train_labels):
            self.forward(data) 
            # Fase de Consolidação Noturna 
            self.sensor_neuron.consolidate_after_training() 
            self.sensor_neuron.reset_reinforcement_scores()

In [11]:
# Carregar o dataset digits
digits = load_digits()
X = digits.data  # Características (features)
y = digits.target  # Rótulos (labels)

# Normalizar os dados
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Converter para tensores do PyTorch
X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)

# Dividir os dados em treino e teste
X_train, X_test, y_train, y_test = train_test_split(X_tensor, y_tensor, test_size=0.2, random_state=42)

# Criar DataLoader para treino
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Criar DataLoader para teste
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Definir o modelo
input_size = X_train.shape[1]  # 4 características de entrada
hidden_size = 16
output_size = len(set(y))  # 3 classes (Setosa, Versicolor, Virginica)
dataset_samples = [list(range(10))]  # Exemplo simples, substitua conforme necessário
threshold = 0.5

model = AdvancedMLP(input_size=input_size, hidden_size=hidden_size, output_size=output_size, dataset_samples=dataset_samples, threshold=threshold)

# Definir a função de perda e o otimizador
criterion = nn.CrossEntropyLoss()  # Para classificação
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Treinamento do modelo
num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    correct_preds = 0
    total_samples = 0

    for batch in train_loader:
        inputs, labels = batch
        optimizer.zero_grad()
        
        # Forward pass
        outputs, reinforcement_score = model(inputs)
        
        # Calcular a perda
        loss = criterion(outputs, labels)
        
        # Backward pass
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Calcular acurácia
        _, predicted = torch.max(outputs, 1)
        correct_preds += (predicted == labels).sum().item()
        total_samples += labels.size(0)

    avg_loss = total_loss / len(train_loader)
    accuracy = 100 * correct_preds / total_samples
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")

# Avaliar o modelo nos dados de teste
model.eval()
correct_preds = 0
total_samples = 0

with torch.no_grad():
    for batch in test_loader:
        inputs, labels = batch
        outputs, _ = model(inputs)
        _, predicted = torch.max(outputs, 1)
        correct_preds += (predicted == labels).sum().item()
        total_samples += labels.size(0)

test_accuracy = 100 * correct_preds / total_samples
print(f"Test Accuracy: {test_accuracy:.2f}%")

Epoch [1/50], Loss: 2.2590, Accuracy: 18.09%
Epoch [2/50], Loss: 2.0723, Accuracy: 39.87%
Epoch [3/50], Loss: 1.8645, Accuracy: 51.64%
Epoch [4/50], Loss: 1.6292, Accuracy: 61.93%
Epoch [5/50], Loss: 1.3825, Accuracy: 71.19%
Epoch [6/50], Loss: 1.1620, Accuracy: 77.87%
Epoch [7/50], Loss: 0.9642, Accuracy: 83.23%
Epoch [8/50], Loss: 0.7998, Accuracy: 87.20%
Epoch [9/50], Loss: 0.6684, Accuracy: 89.42%
Epoch [10/50], Loss: 0.5688, Accuracy: 90.47%
Epoch [11/50], Loss: 0.4910, Accuracy: 91.51%
Epoch [12/50], Loss: 0.4285, Accuracy: 92.62%
Epoch [13/50], Loss: 0.3779, Accuracy: 93.74%
Epoch [14/50], Loss: 0.3378, Accuracy: 94.50%
Epoch [15/50], Loss: 0.3054, Accuracy: 94.99%
Epoch [16/50], Loss: 0.2781, Accuracy: 95.69%
Epoch [17/50], Loss: 0.2532, Accuracy: 96.03%
Epoch [18/50], Loss: 0.2379, Accuracy: 96.03%
Epoch [19/50], Loss: 0.2180, Accuracy: 96.17%
Epoch [20/50], Loss: 0.2033, Accuracy: 96.59%
Epoch [21/50], Loss: 0.1927, Accuracy: 96.80%
Epoch [22/50], Loss: 0.1835, Accuracy: 96.8