In [1]:
from dataset_creation import TextDataset
from torch.utils.data import DataLoader
import numpy as np

# Build the character-level dataset from the tokenised corpus directory
text_dataset = TextDataset(directory='data/SPGC-tokens-2018-07-18/', sequence_length=100)
print(f"Dataset created with {len(text_dataset)} sequences.")

# Plain DataLoader: default sequential order, one sequence per batch
dataloader = DataLoader(text_dataset, batch_size=1)

# Preview the first two batches, decoding the index tensor back into text
for batch_no, (batch_texts, inputs) in enumerate(dataloader, start=1):
    if batch_no > 2:  # Raise this bound to preview more batches
        break

    print(f"\nBatch {batch_no}")
    print(f"Inputs shape: {inputs.shape}")

    # Map every index of the (single) sequence in the batch to its character
    decoded = ''.join(text_dataset.idx_to_char[int(idx)] for idx in inputs[0])
    print(f"Sequence: {decoded}")


Dataset created with 18422222637 sequences.

Batch 1
Inputs shape: torch.Size([1, 100])
Sequence: he little blind girl proves to be of gentle birth as well as of gentle manners only dollie by nina r

Batch 2
Inputs shape: torch.Size([1, 100])
Sequence: nose cheeks and chin nose and chin were long her were high her eyes were pale the lashes so light an


In [2]:
import torch

# The vocabulary is the key set of the dataset's char -> index mapping
chars = [*text_dataset.char_to_idx.keys()]

# Vocabulary size; used as both the one-hot width and the number of output classes
n_characters = len(chars)
print(f"Number of unique characters: {n_characters}")
print(f"Characters: {chars}")

Number of unique characters: 70
Characters: ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ',', '.', ';', "'", '"', '?', '!', ' ']


In [3]:
import torch.nn as nn

class HebbianLinear(nn.Linear):
    """Linear layer that records a Hebbian "imprint" on every forward pass.

    The imprint is the batch-summed outer product of the layer's output and
    its input. It is stored in ``self.imprints``, which has the same shape as
    ``self.weight`` (``[out_features, in_features]``). ``apply_imprints``
    later adds a reward-scaled copy of the imprint to the weights.

    Args:
        in_features: Size of each input sample.
        out_features: Size of each output sample.
        bias: Whether the layer has an additive bias.
    """

    def __init__(self, in_features, out_features, bias=True):
        super().__init__(in_features, out_features, bias)
        # Kept as a Parameter so it still shows up in ``parameters()`` and in
        # the state dict under the same name, but flagged as not requiring a
        # gradient: it is written directly by ``update_imprints`` and never
        # takes part in backpropagation.
        self.imprints = nn.Parameter(torch.zeros_like(self.weight), requires_grad=False)

    def forward(self, input):
        """Apply the linear map and record the imprint of this input/output pair.

        Args:
            input: Tensor whose last dimension is ``in_features``.

        Returns:
            The ordinary ``nn.Linear`` output (still attached to autograd).
        """
        output = super().forward(input)
        self.update_imprints(input, output)
        return output

    def update_imprints(self, input, output):
        """Overwrite ``self.imprints`` with the summed outer product output x input.

        Args:
            input: Tensor whose last dimension is ``in_features``.
            output: Matching layer output whose last dimension is ``out_features``.
        """
        # The imprint is bookkeeping only, so keep it out of the autograd graph.
        with torch.no_grad():
            # Collapse all leading dimensions into one batch axis so that 1-D
            # and >2-D inputs are handled as well as the usual [batch, features].
            flat_input = input.detach().reshape(-1, self.in_features)
            flat_output = output.detach().reshape(-1, self.out_features)

            # [out, batch] @ [batch, in] equals the sum over the batch of the
            # per-sample outer products, without building a
            # [batch, out_features, in_features] intermediate.
            self.imprints.copy_(flat_output.t() @ flat_input)

    def apply_imprints(self, reward, learning_rate, imprint_rate):
        """Add the stored imprint to the weights, scaled by the reward.

        The update is
        ``weight += reward * (learning_rate + imprint_rate) * imprints``.
        A positive ``reward`` moves the weights along the imprint, a negative
        one moves them against it. The imprint is used as stored; no
        normalisation is applied.

        Args:
            reward: Scalar multiplier for the update (may be negative).
            learning_rate: First step-size term.
            imprint_rate: Second step-size term, added to ``learning_rate``.
        """
        step = reward * (learning_rate + imprint_rate)
        with torch.no_grad():
            self.weight.add_(step * self.imprints)


# Smoke test: build a small HebbianLinear and confirm the imprint buffer mirrors the weights
layer = HebbianLinear(in_features=10, out_features=5)

weight_shape, imprint_shape = layer.weight.shape, layer.imprints.shape
print("Shape of weights:", weight_shape)
print("Shape of imprints:", imprint_shape)
print("Are the shapes identical?", weight_shape == imprint_shape)

# Random batch: 3 samples with 10 features each
input_data = torch.randn(3, 10)

# The forward pass also stores the imprint for this batch
output = layer(input_data)

# Show the weights before and after one rewarded imprint update
print("Weights:\n ", layer.weight)
layer.apply_imprints(reward=0.5, learning_rate=0.1, imprint_rate=0.1)
print("Weights after imprint:\n ", layer.weight)

Shape of weights: torch.Size([5, 10])
Shape of imprints: torch.Size([5, 10])
Are the shapes identical? True
Weights:
  Parameter containing:
tensor([[ 0.1692, -0.2211, -0.3019, -0.0297, -0.0795,  0.0222, -0.2560,  0.1446,
         -0.2675, -0.1766],
        [-0.1050, -0.2448,  0.2424,  0.1025, -0.1480,  0.0234, -0.1034,  0.0003,
         -0.2454, -0.2560],
        [-0.0302,  0.2932,  0.1877, -0.1359, -0.1733, -0.2020,  0.2475, -0.1967,
          0.2036, -0.0674],
        [ 0.1616,  0.1043,  0.2931, -0.2403, -0.1408,  0.1341, -0.1179, -0.2809,
          0.0460,  0.0395],
        [-0.3023,  0.2941,  0.1605,  0.2310,  0.0389, -0.0380, -0.3069,  0.2827,
         -0.0042,  0.1564]], requires_grad=True)
Weights after imprint:
  Parameter containing:
tensor([[ 0.2261, -0.3276, -0.6356,  0.0757, -0.1885, -0.0025, -0.3019, -0.0540,
         -0.3873, -0.0955],
        [-0.1774, -0.1310,  0.3206,  0.0988, -0.1224,  0.0910, -0.1225, -0.0594,
         -0.2282, -0.3740],
        [-0.0342,  0.4065,  

In [4]:
import torch.nn.functional as F

class SimpleRNN(torch.nn.Module):
    """Recurrent cell in which every linear map is a ``HebbianLinear``.

    One call to ``forward`` processes a single time step. The input is
    concatenated with the previous hidden state and pushed through the stacked
    ReLU-activated layers. The result is then projected separately to the next
    hidden state (``i2h``) and to log-probabilities over the output classes
    (``i2o`` followed by ``LogSoftmax``).

    Args:
        input_size: Number of features in each input vector.
        hidden_size: Width of the hidden state and of every stacked layer.
        output_size: Number of output classes.
        num_layers: Number of stacked layers before the two heads. At least
            one layer is always created.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # The first layer consumes [input, hidden]; the rest map hidden -> hidden
        stacked = [HebbianLinear(input_size + hidden_size, hidden_size)]
        for _ in range(1, num_layers):
            stacked.append(HebbianLinear(hidden_size, hidden_size))
        self.linear_layers = torch.nn.ModuleList(stacked)

        # Two heads on top of the stack: next hidden state and output scores
        self.i2h = HebbianLinear(hidden_size, hidden_size)
        self.i2o = HebbianLinear(hidden_size, output_size)
        self.softmax = torch.nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        """Run one time step.

        Args:
            input: ``[batch, input_size]`` tensor for the current step.
            hidden: ``[batch, hidden_size]`` hidden state from the previous step.

        Returns:
            ``(output, hidden)``: log-probabilities of shape
            ``[batch, output_size]`` and the new hidden state.
        """
        combined = torch.cat((input, hidden), dim=1)

        # Stacked Hebbian layers, each followed by a ReLU
        for layer in self.linear_layers:
            combined = F.relu(layer(combined))

        # Both heads read the same activations
        hidden = self.i2h(combined)
        output = self.softmax(self.i2o(combined))
        return output, hidden

    def initHidden(self, batch_size=1):
        """Return an all-zero initial hidden state.

        The tensor is created with the device and dtype of the model's own
        weights, so it can be concatenated with inputs after the model has
        been moved or cast.

        Args:
            batch_size: Number of rows in the hidden state (default 1).
        """
        reference = self.i2h.weight
        return torch.zeros(batch_size, self.hidden_size,
                           device=reference.device, dtype=reference.dtype)

    def apply_imprints(self, reward, learning_rate, imprint_rate):
        """Apply the stored imprints of every ``HebbianLinear`` in the model.

        Args:
            reward: Scalar multiplier passed to each layer (may be negative).
            learning_rate: Passed through to each layer.
            imprint_rate: Passed through to each layer.
        """
        for layer in (*self.linear_layers, self.i2h, self.i2o):
            layer.apply_imprints(reward, learning_rate, imprint_rate)


# One-hot characters go in and a distribution over the same characters comes out,
# so input and output widths both equal the vocabulary size
input_size = output_size = n_characters
n_hidden = 128
rnn = SimpleRNN(input_size, n_hidden, output_size, num_layers=3)

# NLLLoss pairs with the LogSoftmax output of the model
criterion = torch.nn.NLLLoss()
# No gradient-based optimizer is created here (previously tried: Adam, lr=0.005)



# Apply Clipping
def clip_weights(model, max_norm):
    """Clamp every parameter of ``model`` in place to ``[-max_norm, max_norm]``.

    Despite the argument name, this is an element-wise value clamp and not a
    rescaling by norm. It touches everything returned by
    ``model.parameters()``.

    Args:
        model: Module whose parameters are clamped.
        max_norm: Largest absolute value any single element may keep.
    """
    lower, upper = -max_norm, max_norm
    with torch.no_grad():
        for parameter in model.parameters():
            values = parameter.data
            values.clamp_(min=lower, max=upper)

# One-off clamp of the freshly initialised parameters to [-0.5, 0.5];
# train() calls clip_weights again on every step.
clip_weights(model=rnn, max_norm=0.5)


In [5]:
import torch

# Map a character to its vocabulary index, e.g. "a" = 0
def letterToIndex(letter):
    """Return the index stored for ``letter`` in ``text_dataset.char_to_idx``."""
    vocabulary = text_dataset.char_to_idx
    return vocabulary[letter]

# Demonstration helper: one character -> <1 x n_characters> one-hot tensor
def letterToTensor(letter):
    """One-hot encode a single character as a ``1 x n_characters`` float tensor."""
    one_hot = torch.zeros(1, n_characters)
    one_hot[0, letterToIndex(letter)] = 1
    return one_hot

# Encode a whole line as <line_length x 1 x n_characters>,
# i.e. one one-hot letter vector per position
def lineToTensor(line):
    """One-hot encode every character of ``line``.

    Returns:
        A float tensor of shape ``[len(line), 1, n_characters]``.
    """
    encoded = torch.zeros(len(line), 1, n_characters)
    for position, letter in enumerate(line):
        encoded[position, 0, letterToIndex(letter)] = 1
    return encoded

# Demo: the one-hot row for a single letter ...
demo_letter_tensor = letterToTensor('J')
print(demo_letter_tensor)

# ... and the shape of a fully encoded word
demo_line_tensor = lineToTensor('Jones')
print(demo_line_tensor.size())

tensor([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]])
torch.Size([5, 1, 70])


In [6]:
# Inspect one sample by index; displayed as the cell's output
text_dataset[3]

('ty little winifred efforts to find some children of whom she reads in a book lead to the acquaintanc',
 tensor([19, 24, 69, 11,  8, 19, 19, 11,  4, 69, 22,  8, 13,  8,  5, 17,  4,  3,
         69,  4,  5,  5, 14, 17, 19, 18, 69, 19, 14, 69,  5,  8, 13,  3, 69, 18,
         14, 12,  4, 69,  2,  7,  8, 11,  3, 17,  4, 13, 69, 14,  5, 69, 22,  7,
         14, 12, 69, 18,  7,  4, 69, 17,  4,  0,  3, 18, 69,  8, 13, 69,  0, 69,
          1, 14, 14, 10, 69, 11,  4,  0,  3, 69, 19, 14, 69, 19,  7,  4, 69,  0,
          2, 16, 20,  0,  8, 13, 19,  0, 13,  2]))

In [7]:
def randomTrainingExample():
    """Draw one sample uniformly at random from ``text_dataset``.

    Returns:
        The ``(sequence, line_tensor)`` pair stored at the sampled index.
    """
    # The dataset length is larger than 2**31. Ask for a 64-bit draw
    # explicitly: on platforms where NumPy's default integer is 32-bit,
    # randint raises ValueError for a bound this large.
    index = int(np.random.randint(len(text_dataset), dtype=np.int64))
    sequence, line_tensor = text_dataset[index]
    return sequence, line_tensor

randomTrainingExample()

('well then here are his words shalt not make unto thee any graven image or any this passage refers on',
 tensor([22,  4, 11, 11, 69, 19,  7,  4, 13, 69,  7,  4, 17,  4, 69,  0, 17,  4,
         69,  7,  8, 18, 69, 22, 14, 17,  3, 18, 69, 18,  7,  0, 11, 19, 69, 13,
         14, 19, 69, 12,  0, 10,  4, 69, 20, 13, 19, 14, 69, 19,  7,  4,  4, 69,
          0, 13, 24, 69,  6, 17,  0, 21,  4, 13, 69,  8, 12,  0,  6,  4, 69, 14,
         17, 69,  0, 13, 24, 69, 19,  7,  8, 18, 69, 15,  0, 18, 18,  0,  6,  4,
         69, 17,  4,  5,  4, 17, 18, 69, 14, 13]))

In [8]:
learning_rate = 0.005 # If you set this too high, it might explode. If too low, it might not learn
imprint_rate = 0.001
last_n_rewards = [0]   # sliding window of recent rewards, oldest first (starts with one 0 entry)
last_n_reward_avg = 0  # mean of last_n_rewards; refreshed by train() on every step
n_rewards = 100        # maximum length of the sliding window
def train(line_tensor):
    """Run one sequence through ``rnn``, applying a Hebbian update after every step.

    For each character except the last, the model is stepped once and the NLL
    of its output is measured. That loss is turned into a reward
    ``1 / (1 + loss)``. The reward minus the moving average of recent rewards
    is used to scale the imprint update.

    Args:
        line_tensor: 1-D integer tensor of character indices with at least two
            entries.

    Returns:
        ``(output, loss_avg)``: the model output of the final step and the
        mean per-step loss over the sequence.

    Raises:
        ValueError: If ``line_tensor`` has fewer than two entries.
    """
    # Without ``global`` the assignment below would create a function-local
    # and leave the module-level average permanently at its initial value.
    global last_n_reward_avg

    n_steps = line_tensor.size()[0] - 1
    if n_steps < 1:
        raise ValueError("train() needs a sequence of at least two characters")

    # Loop invariants: the target and the one-hot encoding of all input steps.
    # NOTE(review): every step is scored against the FINAL character of the
    # sequence, not the next character -- confirm this is intended.
    target = line_tensor[-1].unsqueeze(0)
    step_inputs = torch.nn.functional.one_hot(line_tensor[:-1], num_classes=n_characters).type(torch.float)

    hidden = rnn.initHidden()
    losses = []
    output = None

    # No backward pass is ever run, so skip autograd bookkeeping. Otherwise
    # the hidden state chains a graph across the whole sequence for nothing.
    with torch.no_grad():
        for i in range(n_steps):
            output, hidden = rnn(step_inputs[i].unsqueeze(0), hidden)
            step_loss = criterion(output, target).item()

            # Convert loss to a reward signal in (0, 1]; assumes loss is non-negative
            reward = 1 / (1 + step_loss)

            # Slide the reward window and use its mean (current reward included) as baseline
            last_n_rewards.append(reward)
            if len(last_n_rewards) > n_rewards:
                last_n_rewards.pop(0)
            last_n_reward_avg = sum(last_n_rewards) / len(last_n_rewards)
            reward_update = reward - last_n_reward_avg

            # NOTE(review): clipping runs BEFORE the update, so the next
            # forward pass sees un-clipped weights -- confirm the order.
            clip_weights(rnn, max_norm=0.5)  # Choose an appropriate max_norm value

            # Apply Hebbian updates
            rnn.apply_imprints(reward_update, learning_rate, imprint_rate)

            losses.append(step_loss)

    loss_avg = sum(losses) / len(losses)
    return output, loss_avg


In [9]:
import time
import math

# Training schedule
n_iters = 100_000   # total number of training iterations
print_every = 50    # progress report interval, in iterations
plot_every = 10     # iterations averaged into one point of the loss curve

# Loss bookkeeping for the plot
all_losses = []
current_loss = 0

def timeSince(since):
    """Format the time elapsed since ``since`` as ``'<minutes>m <seconds>s'``.

    Args:
        since: A timestamp previously obtained from ``time.time()``.

    Returns:
        A string with whole minutes and whole remaining seconds.
    """
    elapsed = time.time() - since
    minutes, seconds = divmod(elapsed, 60)
    return '%dm %ds' % (minutes, seconds)

start = time.time()

# ``iteration`` rather than ``iter``: the loop variable lives on in the kernel
# namespace and would otherwise shadow the builtin ``iter`` for later cells.
for iteration in range(1, n_iters + 1):
    sequence, line_tensor = randomTrainingExample()
    output, loss = train(line_tensor)
    current_loss += loss

    # Print the iteration number, progress, elapsed time, loss, sequence and guess
    if iteration % print_every == 0:
        # The most likely class of the final step is the predicted character
        predicted_idx = output.topk(1, dim=1).indices[0, 0].item()
        predicted_char = text_dataset.idx_to_char[predicted_idx]
        target_char = sequence[-1]
        correct = '✓' if predicted_char == target_char else '✗ (%s)' % target_char
        print('%d %d%% (%s) %.4f %s / %s %s' % (iteration, iteration / n_iters * 100, timeSince(start), loss, sequence, predicted_char, correct))

    # Store the mean loss of the last ``plot_every`` iterations and reset the accumulator
    if iteration % plot_every == 0:
        all_losses.append(current_loss / plot_every)
        current_loss = 0

50 0% (0m 3s) 4.2440 ay something to expostulate but she got a fresh start and hurried on i recognised you in that silly  / F ✗ ( )
100 0% (0m 7s) 4.2872  initial plants commemorating individual men douglas spruce coulter pine are written without the mar / n ✗ (r)
150 0% (0m 10s) 4.2138 odern norway iii the people and their industries iv on the farm manners and customs vi school and pl / B ✗ (l)
200 0% (0m 13s) 4.2438 core que pour lui est un être mère et les autres objets il leur parle ils se taisent les touche ils  / B ✗ ( )
250 0% (0m 16s) 4.2870  he helped her from the buggy at the gate chapter xxxvi on the morning following their arrival at br / F ✗ (r)


KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt

# Loss curve: each point is the mean loss over ``plot_every`` training iterations
fig, ax = plt.subplots()
ax.plot(all_losses)
ax.set_title("Training loss")
ax.set_xlabel(f"Checkpoint (one per {plot_every} iterations)")
ax.set_ylabel("Mean NLL loss")
plt.show()