In [1]:
from superneuromat.neuromorphicmodel import NeuromorphicModel
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, field


# from: https://stackoverflow.com/a/21894086/2712730
class bidict(dict):
    def __init__(self, *args, **kwargs):
        super(bidict, self).__init__(*args, **kwargs)
        self.inverse = {}
        for key, value in self.items():
            self.inverse.setdefault(value, []).append(key)

    def __setitem__(self, key, value):
        if key in self:
            self.inverse[self[key]].remove(key)
        super(bidict, self).__setitem__(key, value)
        self.inverse.setdefault(value, []).append(key)

    def __delitem__(self, key):
        self.inverse.setdefault(self[key], []).remove(key)
        if self[key] in self.inverse and not self.inverse[self[key]]:
            del self.inverse[self[key]]
        super(bidict, self).__delitem__(key)


def train_test_split_indices(papers, test_size=0.2, rng=None) -> tuple[np.ndarray[int], np.ndarray[int]]:
    """
    Splits a list of papers into train and test indices.

    Parameters
    ----------
    papers : iterable of papers with ids
        List or dict of papers to split.
    test_size : float, optional
        if < 1, proportion of papers to reserve for testing, by default 0.2
        if > 1, number of papers to reserve for testing
    rng : int, optional
        Random state for the random number generator, default uses numpy's random

    Returns
    -------
    train_indices : list of int
        List of indices for the training set.
    test_indices : list of int
        List of indices for the testing set.
    """
    n = papers if isinstance(papers, int) else len(papers)
    if test_size < 1:
        test_size = int(np.floor(test_size * n))  # number of papers in test
    if rng is None:
        rng = np.random.default_rng()  # setup rng
    elif isinstance(rng, int):
        rng = np.random.default_rng(rng)

    if isinstance(papers, int):
        indices = np.arange(n)  # generate indices
    elif isinstance(papers, (list, tuple)):
        indices = papers  # assume papers is a list of indices
    else:  # assume papers is a dict or mapping of papers with a .values() method
        indices = [paper.idx for paper in papers.values()]  # grab indices from dict entries
    indices = np.asarray(indices, dtype=int)

    # shuffle and split
    rng.shuffle(indices)
    test_indices = indices[:test_size]
    train_indices = indices[test_size:]
    return train_indices, test_indices


In [2]:
rng = np.random.default_rng(2024)

In [3]:
@dataclass
class Paper:
    idx: int
    label: str
    features: tuple[bool, ...] = ()
    citations: list[int] = field(default_factory=list)

papers = {}

# Load in training data
content = pd.read_csv("data/Cora/cora/cora.content", sep="\t", header=None)
citations = pd.read_csv("data/Cora/cora/cora.cites", sep="\t", header=None)

labels = set()

for paper in content.itertuples(index=False):
    idx = paper[0]
    features = tuple([int(feature) for feature in paper[1:-1]])
    papers[idx] = Paper(idx, paper[-1], features)
    labels.add(paper[-1])

for paper_idx, citation  in citations.itertuples(index=False):
    papers[paper_idx].citations.append(citation)

labels

{'Case_Based',
 'Genetic_Algorithms',
 'Neural_Networks',
 'Probabilistic_Methods',
 'Reinforcement_Learning',
 'Rule_Learning',
 'Theory'}

In [4]:
train_idxs, test_idxs = train_test_split_indices(papers, test_size=0.2, rng=rng)

In [5]:
# Initialize our model
model = NeuromorphicModel()

# Create our output neurons, set threshold very high so that we control when they spike during training.
# dict mapping {category: neuron_id}
lbl_threshold = 2.0
strong_connection = 5.0
weak_connection = 1.0
unknown_connection = 0.00001

lbl_neurons = bidict({label: model.create_neuron(threshold=lbl_threshold) for label in labels})

# Create our input neurons, one for each pixel of the image resolution.
paper_neurons = {}  # dict mapping {paper_id: neuron_id}

# make a neuron for each paper
for paper in papers.values():
    paper_neurons[paper.idx] = neuron_id = model.create_neuron()

    if paper.idx in train_idxs:
        # Make an explicitly STRONG synapse connecting the input to the output
        output_id = lbl_neurons[paper.label]
        model.create_synapse(neuron_id, output_id, weight=strong_connection, enable_stdp=False, delay=1)
    else:  # test set, connect this input neuron to all output neurons
        # Connect our input neuron to output neurons
        for output_id in lbl_neurons.values():
            # Randomize initial weight
            weight = (rng.uniform(-1, 1)) * unknown_connection
            # Make a synapse connecting the input to the output
            model.create_synapse(neuron_id, output_id, weight=weight, enable_stdp=True, delay=1)

# connect papers by their citations
for paper in papers.values():
    for citation in paper.citations:
        model.create_synapse(paper_neurons[paper.idx], paper_neurons[citation], weight=weak_connection, enable_stdp=True, delay=1)

In [6]:
# loop through our dataset and add spikes
timestep = 0
train_idxs_augmented = np.append(train_idxs, train_idxs)
for idx in train_idxs_augmented:
    paper = papers[idx]  # get paper by id
    # Add a spike that will exceed the threshold for the respective label neuron
    model.add_spike(timestep + 1, lbl_neurons[paper.label], strong_connection)
    # Add spikes to the paper
    model.add_spike(timestep, paper_neurons[paper.idx], strong_connection)
    timestep += 1

In [7]:
# Set up our stdp, only one timestep because we only want it looking at the results
# of what our input layer does to our output layer
model.stdp_setup(time_steps=2, Apos=[1e-4, 1e-4], Aneg=[1e-5, 1e-5], negative_update=True, positive_update=True)
model.setup()

print(f"Your model has {model.num_neurons} neurons and {model.num_synapses} synapses.\n{timestep} time steps will be simulated.")

from tqdm.notebook import tqdm

# Simulate
with tqdm(total=timestep) as pbar:
    model.simulate(time_steps=timestep, callback=lambda _s, _t, _n: pbar.update(), use='jit')

Your model has 2715 neurons and 11383 synapses.
4334 time steps will be simulated.


  0%|          | 0/4334 [00:00<?, ?it/s]

### inference

In [8]:
def blank_copy(model):
    new = NeuromorphicModel()
    new.num_neurons = model.num_neurons
    new.neuron_thresholds = model.neuron_thresholds.copy()
    new.neuron_leaks = model.neuron_leaks.copy()
    new.neuron_reset_states = model.neuron_reset_states.copy()
    new.neuron_refractory_periods = model.neuron_refractory_periods.copy()
    new.num_synapses = model.num_synapses
    new.pre_synaptic_neuron_ids = model.pre_synaptic_neuron_ids.copy()
    new.post_synaptic_neuron_ids = model.post_synaptic_neuron_ids.copy()
    new.synaptic_weights = model.synaptic_weights.copy()
    new.synaptic_delays = model.synaptic_delays.copy()
    new.enable_stdp = model.enable_stdp.copy()
    return new

In [154]:
seed = 10022

In [155]:
seed += 1
print(f"Seed: {seed}")
rng = np.random.default_rng(seed)
model2 = blank_copy(model)
model2.enable_stdp = np.zeros(model.num_neurons)
model2.neuron_thresholds[:7] = [99] * 7

# paper = random.choice(list(papers.values()))  # pick a random paper
# paper = papers[random.choice(train_idxs)]  # pick a random paper from training set
paper = papers[rng.choice(test_idxs)]  # pick a random paper from testing set

print(f"Paper: {paper.idx}\tCategory: {paper.label}")

spike_n = 2

for t in range(spike_n):
    model2.add_spike(t, paper_neurons[paper.idx], 10001.)

model2.setup()
model2.simulate(spike_n + 1, use='jit')

spiked_ids = {idx: lbl_neurons.inverse[idx][0]
              for idx, spiked in enumerate(model2.spike_train[-1][:7]) if spiked}
if spiked_ids:
    print(f"Spiked categories:")
    for idx, category in spiked_ids.items():
        print(f"\t{idx}\t{category}")
else:
    print("No category spiked")

print('=' * 10)

lbl_by_threshold = sorted((enumerate(model2._internal_states[:7])), key=lambda x: x[1], reverse=True)
for i, v in lbl_by_threshold:
    category = lbl_neurons.inverse[i][0]
    print(f"{i}\t{v: 5.3f}\t{category}")
print(model2._internal_states[:7])
print(model2._neuron_thresholds[:7])
for ts in model2.spike_train:
    print(ts[:7])

Seed: 10023
Paper: 1130847	Category: Genetic_Algorithms
No category spiked
1	 0.864	Neural_Networks
3	 0.864	Case_Based
4	 0.864	Genetic_Algorithms
6	 0.864	Probabilistic_Methods
5	 0.864	Theory
2	 0.864	Rule_Learning
0	 0.864	Reinforcement_Learning
[0.8642914  0.8643088  0.864296   0.8643069  0.86430476 0.86429739
 0.86430147]
[99. 99. 99. 99. 99. 99. 99.]
[0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0.]


In [151]:
test_threshold = 4.331

def evaluate_paper(paper_idx):
    model_temp = blank_copy(model)
    model_temp.enable_stdp = np.zeros(model.num_neurons)
    model_temp.neuron_thresholds[:7] = [test_threshold] * 7
    for t in range(2):
        model_temp.add_spike(t, paper_neurons[paper_idx], strong_connection)
    model_temp.setup()
    model_temp.simulate(1 + 2, use='jit')
    return model_temp.spike_train[-1][:7], model_temp.spike_train[-1][:7]

In [152]:
# single-threaded (slow, but better for debugging)
results = [evaluate_paper(paper_idx) for paper_idx in tqdm(test_idxs)]

  0%|          | 0/541 [00:00<?, ?it/s]

In [153]:
from tabulate import tabulate

tp, fp, tn, fn = 0, 0, 0, 0
correct = 0
total = len(test_idxs)

for actual_idx, spikes in zip(test_idxs, results):
    correct_label = papers[actual_idx].label
    guesses = {lbl_neurons.inverse[idx][0] for idx, spiked in enumerate(spikes[0]) if spiked}
    guesses |= {lbl_neurons.inverse[idx][0] for idx, spiked in enumerate(spikes[1]) if spiked}
    tp += correct_label in guesses
    fn += not bool(guesses)  # +1 false negative if no guesses
    fp += len([x for x in guesses if x != correct_label])
    correct += correct_label in guesses and len(guesses) == 1

n_guesses = total - fn
print(f"tp: {tp} / {n_guesses} / {total} | Perfect: {correct} / {n_guesses} / {total} (correct / attempted / total)")
print(f"tp:        {tp / n_guesses:>.6f} | Perfect: {correct / n_guesses:>.6f} | F1:        {tp / (tp + (0.5 * (fp + fn))):>.6f}")
print(f"Precision: {tp / (tp + fp):>.6f} | Recall:  {tp / (tp + fn):>.6f} | Accuracy:  {(tp + tn) / (tp + tn + fp + fn):>.6f}")

tabulate([[correct, tp, fp, tn, fn, n_guesses, total,
           model.stdp_Apos.tolist(), model.stdp_Aneg.tolist(), lbl_threshold,
           strong_connection, weak_connection, unknown_connection, test_threshold]], tablefmt="html")


tp: 262 / 293 / 541 | Perfect: 219 / 293 / 541 (correct / attempted / total)
tp:        0.894198 | Perfect: 0.747440 | F1:        0.604383
Precision: 0.733894 | Recall:  0.513725 | Accuracy:  0.433058


0,1,2,3,4,5,6,7,8,9,10,11,12,13
219,262,95,0,248,293,541,"[0.0001, 0.0001]","[1e-05, 1e-05]",2,5,1,1e-05,4.331
