In [10]:
# Constants equivalent to macros
TICKS_PER_MS = 1000000
GNATS = 1
CDH = 2

# Type aliases
# In Python, explicit type definitions are not necessary as it's a dynamically typed language.
# However, for clarity, you can document the intended types using comments or Python type hints.
tstamp_t = int  # spike timestamp (comment indicating intended type)
idx_t = int
real_t = float


In [11]:
import math
import csv

def gamma(t1, t2, weight, delay, tau):
    theta = 1 if (t2 - t1) >= delay else 0
    res = -math.log(weight * theta * math.exp(-(t2 - t1 - delay) / tau))
    return res


In [22]:
class SpikeRaster:
    def __init__(self, n_neurons):
        self.n_neurons = n_neurons
        self.evtlist = [set() for _ in range(n_neurons)]

    def read_event_file(self, fname):
        try:
            with open(fname, 'r') as infile:
                csv_reader = csv.reader(infile)
                header = next(csv_reader)  # Skip the header row

                if header != ["event_type", "timestamp", "neuron_index"]:
                    raise ValueError("File format is incorrect")

                print(f"Opened file: {fname}")
                for row in csv_reader:
                    if len(row) != 3:
                        print("Invalid row format, skipping:", ','.join(row))
                        continue

                    evttype, evtstamp, evtidx = row
                    evttype = int(evttype)
                    evtstamp = int(float(evtstamp) * TICKS_PER_MS)  # Convert milliseconds to ticks
                    evtidx = int(evtidx)

                    if evtidx >= self.n_neurons:
                        print("Neuron index of event greater than number of neurons; ignoring..")
                        continue

                    if evttype == 0:
                        self.evtlist[evtidx].add(evtstamp)

        except IOError:
            print(f"Error opening event file: {fname}")
            sys.exit(1)
        except ValueError as e:
            print(str(e))
            sys.exit(1)

    def get_spikes_in_range(self, neuron_idx, low, high):
        neuron_spikes = self.evtlist[neuron_idx]

        # Use a list comprehension to filter spikes within the specified range
        return [timestamp for timestamp in neuron_spikes if low <= timestamp <= high]


In [23]:
test_spike_raster = SpikeRaster(10)
test_spike_raster.read_event_file('test_spikes.csv')
test_spike_raster.get_spikes_in_range(1, 0, 100000000)

Opened file: test_spikes.csv


[35000000, 77000000, 94000000]

In [36]:
from collections import namedtuple

# Using namedtuple for the edge structure
Edge = namedtuple('Edge', ['idx', 'weight', 'delay'])

class Network:
    def __init__(self, n_neurons):
        self.n_neurons = n_neurons
        self.presynaptic_edges = [[] for _ in range(n_neurons)]

    def read_connectivity_csv(self, fname):
        try:
            with open(fname, 'r') as infile:
                reader = csv.reader(infile)

                self.presynaptic_edges = []

                for row in reader:
                    n_edges = int(row[0])
                    edge_list = []

                    for i in range(n_edges):
                        idx = int(row[1 + i * 3])
                        weight = float(row[2 + i * 3])
                        delay = float(row[3 + i * 3])
                        edge_list.append(Edge(idx, weight, delay))

                    self.presynaptic_edges.append(edge_list)

                self.n_targets = len(self.presynaptic_edges)

        except IOError:
            print(f"Error opening connectivity file: {fname}")
            sys.exit(1)
        except ValueError as e:
            print(str(e))
            sys.exit(1)


    def read_connectivity(self, fname):
        try:
            with open(fname, 'r') as infile:
                print(f"Opened connectivity file: {fname}")
                reader = csv.reader(infile)

                # Initialize edge lists
                self.presynaptic_edges = [[] for _ in range(self.n_neurons)]

                for row in reader:
                    if len(row) != 4:
                        print(f"Invalid row format, skipping: {','.join(row)}")
                        continue

                    src_idx, tgt_idx, weight, delay = row
                    src_idx = int(src_idx)
                    tgt_idx = int(tgt_idx)
                    weight = float(weight)
                    delay = float(delay)

                    edge = Edge(src_idx, weight, delay)
                    self.presynaptic_edges[tgt_idx].append(edge)

        except IOError:
            print(f"Error opening connectivity file: {fname}")
            sys.exit(1)

    def compute_activity_threads(self, raster, fname, gamma_thresh, temporal_radius, tau, func):
        if self.n_neurons < raster.n_neurons:
            print("Number of neurons in connectivity file is less than the number of neurons in the raster")
            sys.exit(1)

        try:
            with open(fname, 'w', newline='') as outfile:
                if raster.n_neurons > 0:
                    for neuron_idx in range(raster.n_neurons):
                        self.emit_causal_neighbors(raster, neuron_idx, gamma_thresh, temporal_radius, tau, func, outfile)

        except IOError:
            print("Error opening activity thread output file")
            sys.exit(1)

    # Private method (can be public as Python does not enforce access control)
    def emit_causal_neighbors(self, sr, neuron_idx, gamma_thresh, temporal_radius, tau, func, outfile):
        # Get spike train from this neuron
        postsyn_neuron_spikes = sr.evtlist[neuron_idx]

        # For each spike, find all spikes from presynaptic neurons within temporal radius
        for curr_spike in postsyn_neuron_spikes:
            # Past limit is the earliest spike time we consider
            # Clamp to zero to avoid going past start of recording
            past_limit = max(curr_spike - temporal_radius, 0)

            # Loop over presynaptic neurons
            for presyn_edge in self.presynaptic_edges[neuron_idx]:
                presyn_neuron_idx, weight, delay = presyn_edge

                # Get all spikes within temporal radius from this presynaptic neuron
                pre_spikes = sr.get_spikes_in_range(presyn_neuron_idx, past_limit, curr_spike)

                # Loop over all presynaptic spikes from this presynaptic neuron
                for pre_spike in pre_spikes:
                    g = gamma(pre_spike, curr_spike, weight, delay, tau)

                    # Check for causality
                    if g <= gamma_thresh and func == GNATS:
                        # Emit edge
                        outfile.write(f"{presyn_neuron_idx},{pre_spike},{neuron_idx},{curr_spike}\n")
                    elif func == CDH:
                        outfile.write(f"{g}\n")




In [37]:
test_network = Network(10)
test_network.read_connectivity_csv('test_connectivity.csv')
print(test_network.presynaptic_edges)

[[], [], [Edge(idx=7, weight=0.22, delay=0.02), Edge(idx=8, weight=0.78, delay=0.02), Edge(idx=1, weight=0.73, delay=0.05)], [], [Edge(idx=5, weight=0.21, delay=0.02), Edge(idx=1, weight=0.13, delay=0.03)], [], [Edge(idx=4, weight=0.73, delay=0.05), Edge(idx=2, weight=0.6, delay=0.02), Edge(idx=7, weight=0.42, delay=0.03), Edge(idx=0, weight=0.28, delay=0.02)], [], [Edge(idx=2, weight=0.9, delay=0.02), Edge(idx=0, weight=0.25, delay=0.04), Edge(idx=9, weight=0.35, delay=0.01), Edge(idx=5, weight=0.81, delay=0.03)], []]


In [43]:
test_network = Network(10)
test_network.read_connectivity('test_connectivity_2.csv')
for node in test_network.presynaptic_edges:
    print(node)

Opened connectivity file: test_connectivity_2.csv
[Edge(idx=6, weight=0.37, delay=0.04), Edge(idx=9, weight=0.66, delay=0.01)]
[Edge(idx=5, weight=0.21, delay=0.01), Edge(idx=4, weight=0.36, delay=0.04), Edge(idx=9, weight=0.2, delay=0.02)]
[Edge(idx=0, weight=0.17, delay=0.02), Edge(idx=3, weight=0.43, delay=0.03), Edge(idx=0, weight=0.79, delay=0.05), Edge(idx=1, weight=0.29, delay=0.02)]
[]
[Edge(idx=7, weight=0.12, delay=0.05), Edge(idx=1, weight=0.92, delay=0.01)]
[]
[Edge(idx=8, weight=0.78, delay=0.02), Edge(idx=0, weight=0.81, delay=0.02), Edge(idx=5, weight=0.57, delay=0.04)]
[]
[Edge(idx=1, weight=0.78, delay=0.05), Edge(idx=9, weight=0.81, delay=0.01), Edge(idx=1, weight=0.87, delay=0.04), Edge(idx=4, weight=0.26, delay=0.01)]
[Edge(idx=5, weight=0.25, delay=0.03), Edge(idx=0, weight=0.1, delay=0.02)]


In [45]:
test_network.compute_activity_threads(
    raster=test_spike_raster,
    fname='test_activity_threads_1.csv', 
    gamma_thresh=1, 
    temporal_radius=0.10, 
    tau=0.05, 
    func=1
)

In [7]:
def main(n_neurons, connection_file, spike_file, func, out_file, tau, thresh, causal_radius):
    print("Reading event file...")
    raster = SpikeRaster(n_neurons)
    raster.read_event_file(spike_file)

    print("Reading connectivity file...")
    net = Network(n_neurons)
    net.read_connectivity(connection_file)

    print("Computing activity threads...")
    net.compute_activity_threads(raster, out_file, thresh, causal_radius, tau, func)
    print("Done")