Define DG block

In [1]:
import torch
import torch.nn as nn

class DGModel(nn.Module):
    """
    Goal: separation
    Methods: sparsity
    """
    
    def __init__(self, input_size, output_size):
        super(DGModel, self).__init__()
        hidden_layer_size = input_size + output_size // 2
        self.fc1 = nn.Linear(input_size, hidden_layer_size)
        self.fc2 = nn.Linear(hidden_layer_size, output_size)
        
        # Weight initialization (Xavier)
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)
        
    def forward(self, x):
        with torch.no_grad():  # Ensure no gradients are calculated for this model
            x = self.fc1(x)
            x = torch.tanh(x)  # Using tanh activation
            x = self.fc2(x)
        return x


N = 25  # Input size
M = 25 * 100   # Output size

dg_model = DGModel(N, M)


We need some data to test these blocks

In [2]:
# super simple ARC-like example
input_1 = torch.tensor([[[[0, 0, 0, 0, 0],
                          [0, 0, 0, 0, 0],
                          [0, 0, 1, 0, 0],
                          [0, 0, 0, 0, 0],
                          [0, 0, 0, 0, 0]]]], dtype=torch.float)

output_1 = torch.tensor([[[[0, 0, 1, 0, 0],
                           [0, 0, 1, 0, 0],
                           [1, 1, 1, 1, 1],
                           [0, 0, 1, 0, 0],
                           [0, 0, 1, 0, 0]]]], dtype=torch.float)

input_2 = torch.tensor([[[[0, 0, 0, 0, 0],
                          [0, 0, 0, 0, 0],
                          [0, 0, 0, 0, 0],
                          [0, 0, 0, 0, 0],
                          [0, 0, 0, 0, 1]]]], dtype=torch.float)

output_2 = torch.tensor([[[[0, 0, 0, 0, 1],
                           [0, 0, 0, 0, 1],
                           [0, 0, 0, 0, 1],
                           [0, 0, 0, 0, 1],
                           [1, 1, 1, 1, 1]]]], dtype=torch.float)

We are going to work with 1D arrays. 

DG block creates a sparse representation of the input

In [3]:
y_1 = dg_model(input_1.flatten())
print(y_1)

tensor([-0.0403,  0.0010,  0.0994,  ..., -0.0101, -0.1315, -0.0216])


In [4]:
class EncodingModel(nn.Module):
    """
    Goal: pattern retrieval
    Methods: compare sparse representation with the original
    """
    
    def __init__(self, input_size, output_size):
        super(EncodingModel, self).__init__()
        hidden_layer_size = output_size
        self.fc1 = nn.Linear(input_size, hidden_layer_size)
        self.fc2 = nn.Linear(hidden_layer_size, output_size)
        
    def forward(self, x):
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.fc2(x)
        return x

In [5]:

H = N + M  # Input size
L = M   # Output size

ec_model = EncodingModel(H, L)

In [11]:
y_hat = ec_model(torch.cat([y_1, input_1.flatten()]))
print(y_hat)

tensor([ 0.0070,  0.0538,  0.0114,  ...,  0.0256,  0.0587, -0.0106],
       grad_fn=<AddBackward0>)


In [12]:
class HopfieldNetwork(nn.Module):
    def __init__(self, num_neurons):
        super(HopfieldNetwork, self).__init__()
        self.num_neurons = num_neurons
        
        # Initialize the weight matrix (symmetry will be enforced)
        self.weights = torch.zeros(num_neurons, num_neurons)

    def store_pattern(self, patterns):
        """
        Store patterns into the weight matrix using Hebbian learning.
        Patterns should be a tensor of shape (num_patterns, num_neurons).
        """
        num_patterns = patterns.size(0)
        for i in range(num_patterns):
            p = patterns[i].unsqueeze(1)  # Make it a column vector
            self.weights += torch.mm(p, p.T)  # Hebbian learning rule
        # Ensure weights on the diagonal are zero
        self.weights.fill_diagonal_(0)
        self.weights /= num_patterns  # Normalize by the number of patterns

    def forward(self, input_state, steps=10):
        """
        Forward pass: iteratively update the states of the network.
        Returns early if a stable state is reached.
        """
        state = input_state.clone()
        prev_state = state.clone()

        for _ in range(steps):
            # Update all neurons at once for simplicity in backpropagation
            net_input = torch.matmul(self.weights, state)  # Weighted sum of inputs
            state = torch.tanh(net_input)  # Continuous activation (differentiable)
        
#             # For each neuron, update its state
#             for i in range(self.num_neurons):
#                 net_input = torch.dot(self.weights[i], state)  # Weighted sum of inputs
#                 state[i] = 1 if net_input >= 0 else -1  # Update rule (sign function)

            # Early stopping if state does not change
            if torch.equal(state, prev_state):
                break
            prev_state = state.clone()

        return state

In [13]:
T = M + L

hopfield_model = HopfieldNetwork(T)


# Store patterns in the Hopfield network
ca3_input = torch.cat([y_1, y_hat])
hopfield_model.store_pattern(patterns)

# Random initial state (T neurons)
input_state = torch.tensor([1, 1, -1, 1, -1, -1, 1, 1], dtype=torch.float32)

# Forward pass through the Hopfield network (recurrent updates)
output_state = hopfield_model(input_state, steps=10)
print(output_state)

NameError: name 'patterns' is not defined

torch.Size([25])

25

2500