Skip to content

Commit

Permalink
Partition Probes
Browse files Browse the repository at this point in the history
Probes are partitioned to reduce the network traffic load on each core
simulating a probe.
  • Loading branch information
mundya committed Oct 9, 2015
1 parent 8cadf19 commit 0f248fd
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 151 deletions.
Binary file modified nengo_spinnaker/binaries/nengo_value_sink.aplx
Binary file not shown.
132 changes: 90 additions & 42 deletions nengo_spinnaker/operators/value_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from nengo_spinnaker import regions
from nengo_spinnaker.regions.filters import make_filter_regions
from nengo_spinnaker.netlist import Vertex
from nengo_spinnaker import partition_and_cluster as partition
from nengo_spinnaker.utils.application import get_application
from nengo_spinnaker.utils.type_casts import fix_to_np
from nengo_spinnaker.utils.netlist import slice_accepts_signal


class ValueSink(object):
Expand All @@ -34,6 +36,9 @@ def __init__(self, probe, dt):
else:
self.sample_every = int(np.round(probe.sample_every / dt))

self.vertices = list()
self.vertices_memory = dict()

def make_vertices(self, model, n_steps): # TODO remove n_steps
"""Construct the data which can be loaded into the memory of a
SpiNNaker machine.
Expand All @@ -44,68 +49,111 @@ def make_vertices(self, model, n_steps): # TODO remove n_steps
self.filter_region, self.filter_routing_region = make_filter_regions(
signals_conns, model.dt, True, model.keyspaces.filter_routing_tag)

# Use a matrix region to record into (slightly unpleasant)
# Use a matrix region to record into (slightly unpleasant), this is
# sliced by column to account for partitioning of the probe.
self.recording_region = regions.MatrixRegion(
np.zeros((self.size_in, n_steps), dtype=np.uint32)
np.zeros((self.size_in, n_steps), dtype=np.uint32),
sliced_dimension=regions.MatrixPartitioning.columns
)

# This isn't partitioned, so we just compute the SDRAM requirement and
# return a new vertex.
self.system_region = SystemRegion(model.machine_timestep, self.size_in)

self.regions = [None] * 15
self.regions[0] = self.system_region
self.regions[1] = self.filter_region
self.regions[2] = self.filter_routing_region
self.regions[14] = self.recording_region # **YUCK**
resources = {
Cores: 1,
SDRAM: regions.utils.sizeof_regions(self.regions, None)
}
# Create the system region
self.system_region = SystemRegion(model.machine_timestep)

# Store references to all the regions
self.regions = [self.system_region,
self.filter_region,
self.filter_routing_region,
self.recording_region]

self.vertex = Vertex(get_application("value_sink"), resources)
# Partition by number of packets received and space in SDRAM
sdram_constraint = partition.Constraint(8 * 2**20) # Max 8MiB
packets_constraint = partition.Constraint(16) # Max 16 packets/step

# Return the spec
return netlistspec(self.vertex, self.load_to_machine,
constraints = {
sdram_constraint: lambda s: (
regions.utils.sizeof_regions(self.regions, s)),
packets_constraint: lambda s: s.stop - s.start
}
for sl in partition.partition(slice(0, self.size_in), constraints):
# Get the resources for this slice of the probe
resources = {
Cores: 1,
SDRAM: regions.utils.sizeof_regions(self.regions, sl),
}

# Add a new vertex for this slice
self.vertices.append(SlicedProbeVertex(sl, resources))

# Return the specification for the netlist
return netlistspec(self.vertices, self.load_to_machine,
after_simulation_function=self.after_simulation)

def load_to_machine(self, netlist, controller):
"""Write the value sink's regions into the memory of each vertex.

For every vertex in `self.vertices` the regions are laid out in the memory
taken from `netlist.vertices_memory`, every region except the last is
written, and the memory of the last (recording) region is kept in
`self.vertices_memory` so that it can be read back after the simulation.
`controller` is not used by the lines shown here.
"""
# NOTE(review): this span is a rendered diff hunk without +/- markers. The
# lines down to `self.recording_region_mem = region_memory[14]` appear to be
# the removed single-vertex version; the `for vertex in self.vertices` loop
# appears to be the added version.
# Assign SDRAM for each memory region and create the application
# pointer table.
region_memory = regions.utils.create_app_ptr_and_region_files(
netlist.vertices_memory[self.vertex], self.regions, None)

# Write in each region
for region, mem in zip(self.regions[:3], region_memory):
if region is not None:
region.write_subregion_to_file(mem, slice(None))

# Store the location of the recording region
self.recording_region_mem = region_memory[14]
for vertex in self.vertices:
# Assign SDRAM for each memory region and create the application
# pointer table.
region_memory = regions.utils.create_app_ptr_and_region_files(
netlist.vertices_memory[vertex], self.regions,
vertex.in_slice
)

# Write in each region apart from the recording region
# NOTE(review): every region is written with slice(None), but the
# three-argument SystemRegion.write_subregion_to_file in this same commit
# packs in_slice.start and in_slice.stop - in_slice.start, which are None
# for slice(None) -- confirm whether vertex.in_slice was intended here.
for region, mem in zip(self.regions[:-1], region_memory):
if region is not None:
region.write_subregion_to_file(mem, slice(None))

# Store the location of the recording region
self.vertices_memory[vertex] = region_memory[-1]

def after_simulation(self, netlist, simulator, n_steps):
"""Retrieve data from a simulation.

Reads the recorded values back from the recording memory of each vertex,
places them in the columns given by that vertex's `in_slice`, and stores
the result under `simulator.data[self.probe]`, stacking it below any data
already held for the probe. `netlist` is not used by the lines shown here.
"""
# NOTE(review): this span is a rendered diff hunk without +/- markers. The
# four lines reading from `self.recording_region_mem`, and the two lines
# that use `recorded_data` further down, appear to be the removed version.
self.recording_region_mem.seek(0)
recorded_data = fix_to_np(np.frombuffer(
self.recording_region_mem.read(n_steps * self.size_in * 4),
dtype=np.int32)).reshape(n_steps, self.size_in)

# Prepare to store the data
data = np.zeros((n_steps, self.size_in))

# Load in data from each vertex in turn
for vertex in self.vertices:
mem = self.vertices_memory[vertex]
mem.seek(0)

# Each vertex recorded `size` values of 4 bytes per step.
size = vertex.in_slice.stop - vertex.in_slice.start
read_data = mem.read(n_steps * size * 4)
# NOTE(review): the removed version decoded the buffer as np.int32; this one
# uses np.uint32 before fix_to_np -- confirm negative values still decode
# correctly.
read_matr = np.frombuffer(read_data, dtype=np.uint32)
read_matr.shape = (n_steps, size)
data[:, vertex.in_slice] = fix_to_np(read_matr)

# Either add the new data to the simulator or append it to the data
# that's already there.
if self.probe not in simulator.data:
simulator.data[self.probe] = recorded_data
simulator.data[self.probe] = data
else:
full_data = np.vstack([simulator.data[self.probe], recorded_data])
full_data = np.vstack([simulator.data[self.probe], data])
simulator.data[self.probe] = full_data


class SlicedProbeVertex(Vertex):
    """Vertex standing for one slice of a probe's input dimensions.

    The vertex runs the "value_sink" application and remembers which slice
    of the input it is responsible for in `in_slice`.
    """

    def __init__(self, in_slice, resources):
        """Create the vertex for `in_slice` with the given `resources`."""
        application = get_application("value_sink")
        super(SlicedProbeVertex, self).__init__(application, resources)
        self.in_slice = in_slice

    def accepts_signal(self, signal_params, transmission_params):
        """Return whether this vertex should receive the described signal.

        The decision is delegated to `slice_accepts_signal` using this
        vertex's own input slice.
        """
        return slice_accepts_signal(
            self.in_slice, signal_params, transmission_params
        )


class SystemRegion(regions.Region):
"""System region for a value sink."""
# NOTE(review): this span is a rendered diff hunk without +/- markers. Each
# method appears twice: the two-argument __init__, `return 8` and the "<2I"
# write appear to be the removed version; the one-argument __init__,
# `return 12` and the "<3I" write appear to be the added version.
def __init__(self, timestep, size_in):
def __init__(self, timestep):
self.timestep = timestep
self.size_in = size_in

# Size of the region in bytes; it does not depend on the arguments.
def sizeof(self, *args):
return 8 # 2 words
return 12 # 3 words

# Packs the region as little-endian unsigned 32-bit words and writes it to fp.
def write_subregion_to_file(self, fp, *args):
fp.write(struct.pack("<2I", self.timestep, self.size_in))
def write_subregion_to_file(self, fp, in_slice, *args):
# Words written: timestep, first index of the slice, length of the slice.
fp.write(struct.pack(
"<3I", self.timestep, in_slice.start,
in_slice.stop - in_slice.start)
)
24 changes: 24 additions & 0 deletions nengo_spinnaker/utils/netlist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import numpy as np

from nengo_spinnaker.builder.connection import (
EnsembleTransmissionParameters, PassthroughNodeTransmissionParameters)


def slice_accepts_signal(in_slice, signal_params, transmission_params):
    """Decide whether a vertex handling `in_slice` should receive a signal.

    The answer depends only on the type and contents of
    `transmission_params`; `signal_params` is accepted but not consulted.
    Raises NotImplementedError for any other kind of transmission
    parameters.
    """
    # Connection from an ensemble: accept only when the decoder columns for
    # the input dimensions we care about hold at least one non-zero value.
    if isinstance(transmission_params, EnsembleTransmissionParameters):
        relevant_decoders = transmission_params.decoders[:, in_slice]
        return np.any(relevant_decoders)

    # Connection from a Node of some variety: accept only when the transform
    # rows for the subspace we receive input in hold a non-zero value.
    if isinstance(transmission_params, PassthroughNodeTransmissionParameters):
        relevant_rows = transmission_params.transform[in_slice]
        return np.any(relevant_rows)

    # No known way to interpret these transmission parameters.
    raise NotImplementedError
92 changes: 67 additions & 25 deletions spinnaker_components/value_sink/value_sink.c
Original file line number Diff line number Diff line change
@@ -1,49 +1,90 @@
#include "value_sink.h"
#include "spin1_api.h"
#include "common-typedefs.h"
#include "common-impl.h"

address_t rec_start, rec_curr;
uint n_dimensions;
value_t *input;
#include "input_filtering.h"

if_collection_t g_input;
/*****************************************************************************/
// System region
// Layout of the system region for one value sink instance: three 32-bit words.
// NOTE(review): this appears to mirror the "<3I" record (timestep, slice start,
// slice length) packed by SystemRegion.write_subregion_to_file on the Python
// side of this commit -- keep the two in step.
typedef struct _region_system_t
{
uint32_t timestep; // Value passed to spin1_set_timer_tick in c_main

struct
{
uint32_t offset; // Index of the first dimension
uint32_t n_dims; // Number of dimensions represented
} input_subspace; // Subspace of the input managed by this instance
} region_system_t;
region_system_t value_sink; // Parameters for this instance

void sink_update(uint ticks, uint arg1) {
address_t rec_start, rec_curr; // Pointers into SDRAM
value_t *input; // Currently received value
if_collection_t filters; // Input filters
/*****************************************************************************/

/*****************************************************************************/
// Timer tick
// Timer-tick handler (registered for TIMER_TICK in c_main): steps the input
// filters and appends the latest filtered values to the recording region.
// NOTE(review): this span is a rendered diff hunk without +/- markers. The
// one-line `if (...) {` and the three lines using g_input / input /
// n_dimensions appear to be the removed version; the lines using filters and
// value_sink appear to be the added version.
void timer_tick(uint ticks, uint arg1)
{
// End the simulation if we've run sufficient ticks
use(arg1);
if (simulation_ticks != UINT32_MAX && ticks >= simulation_ticks) {
if (simulation_ticks != UINT32_MAX && ticks >= simulation_ticks)
{
// NOTE(review): there is no return after spin1_exit(0), so the filter step
// and copy below still run on this tick -- confirm that is intended.
spin1_exit(0);
}

// Filter inputs, write the latest value to SRAM
input_filtering_step(&g_input);
spin1_memcpy(rec_curr, input, n_dimensions * sizeof(value_t));
rec_curr = &rec_curr[n_dimensions];
// Filter inputs, write the latest value to SDRAM
input_filtering_step(&filters);
spin1_memcpy(
rec_curr, filters.output,
value_sink.input_subspace.n_dims * sizeof(value_t)
);

// Progress the pointer into SDRAM
// (advances by one value per dimension handled by this instance)
rec_curr = &rec_curr[value_sink.input_subspace.n_dims];
}
/*****************************************************************************/

void mcpl_callback(uint key, uint payload) {
input_filtering_input(&g_input, key, payload);
/*****************************************************************************/
// Receive a multicast packet
// Multicast-packet handler: hands each received key/payload pair to the input
// filters together with the subspace of dimensions this instance manages.
void mcpl_callback(uint key, uint payload)
{
  // Offset for all packets
  const uint32_t dim_offset = value_sink.input_subspace.offset;
  // Max expected dimension
  const uint32_t max_dim = value_sink.input_subspace.n_dims - 1;

  input_filtering_input_with_dimension_offset(
    &filters, key, payload, dim_offset, max_dim
  );
}
/*****************************************************************************/

/*****************************************************************************/
// Entry point: loads the system region and filter data, locates the recording
// region, registers the packet and timer callbacks, then enters the run loop.
// NOTE(review): this span is a rendered diff hunk without +/- markers, and part
// of the while(true) loop is elided by the "Expand All" hunk boundary. Lines
// using pars / n_dimensions / g_input / input / sink_update / region 15 appear
// to be the removed version; lines using value_sink / filters / timer_tick /
// region 4 appear to be the added version.
void c_main(void)
{
address_t address = system_load_sram();

// Load parameters and filters
region_system_t *pars = (region_system_t *) region_start(1, address);
n_dimensions = pars->n_dimensions;
input_filtering_initialise_output(&g_input, n_dimensions);
input = g_input.output;
// Copy in the system region
// NOTE(review): the other spin1_memcpy calls in this file pass the destination
// first (rec_curr, then the filter output). As written, this call copies FROM
// value_sink INTO the region rather than loading value_sink -- confirm the
// first two arguments are not swapped.
spin1_memcpy(region_start(1, address), &value_sink, sizeof(region_system_t));

if (input == NULL) {
// Prepare to receive values
input_filtering_initialise_output(&filters,
value_sink.input_subspace.n_dims);
// Give up if no output buffer is available for the filters
if (filters.output == NULL)
{
return;
}

input_filtering_get_filters(&g_input, region_start(2, address));
input_filtering_get_routes(&g_input, region_start(3, address));
rec_start = region_start(15, address);
// Load the filter information
input_filtering_get_filters(&filters, region_start(2, address));
input_filtering_get_routes(&filters, region_start(3, address));

// Point at the start of the recording region
rec_start = region_start(4, address);

// Set up callbacks, start
spin1_set_timer_tick(pars->timestep);
// Set up callbacks
spin1_set_timer_tick(value_sink.timestep);
spin1_callback_on(MCPL_PACKET_RECEIVED, mcpl_callback, -1);
spin1_callback_on(TIMER_TICK, sink_update, 2);
spin1_callback_on(TIMER_TICK, timer_tick, 2);

while(true)
{
Expand All @@ -60,3 +101,4 @@ void c_main(void)
spin1_start(SYNC_WAIT);
}
}
/*****************************************************************************/
15 changes: 0 additions & 15 deletions spinnaker_components/value_sink/value_sink.h

This file was deleted.

0 comments on commit 0f248fd

Please sign in to comment.