Skip to content

Commit

Permalink
Use a queue to speed up packet processing
Browse files Browse the repository at this point in the history
  • Loading branch information
mundya committed Nov 1, 2016
1 parent 481e296 commit 9c73d72
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 0 deletions.
Binary file modified nengo_spinnaker/binaries/nengo_ensemble.aplx
Binary file not shown.
Binary file modified nengo_spinnaker/binaries/nengo_ensemble_profiled.aplx
Binary file not shown.
76 changes: 76 additions & 0 deletions spinnaker_components/ensemble/ensemble.c
Expand Up @@ -19,10 +19,14 @@
#include "pes.h"
#include "recording.h"
#include "voja.h"
#include "packet_queue.h"

/*****************************************************************************/
// Global variables
ensemble_state_t ensemble; // Global state
packet_queue_t packets; // Queue of multicast packets
bool queue_processing; // Flag indicating that packets are being processed
unsigned int queue_overflows;

// Input filters and buffers for general and inhibitory inputs. Their outputs
// are summed into accumulators which are used to drive the standard neural input
Expand Down Expand Up @@ -368,6 +372,21 @@ static inline void decode_output_and_transmit(const ensemble_state_t *ensemble)
// Multicast packet with payload received
void mcpl_received(uint key, uint payload)
{
// Queue the packet for later processing, if no processing is scheduled then
// trigger the queue processor.
if (packet_queue_push(&packets, key, payload))
{
if (!queue_processing)
{
spin1_trigger_user_event(0, 0);
queue_processing = true;
}
}
else
{
queue_overflows++;
}
/*
// Try to receive the packet in each filter
uint32_t offset = ensemble.parameters.input_subspace.offset;
uint32_t max_dim_sub_one = ensemble.parameters.input_subspace.n_dims - 1;
Expand All @@ -376,6 +395,50 @@ void mcpl_received(uint key, uint payload)
offset, max_dim_sub_one
);
input_filtering_input(&filter_routing_unsliced_inputs, key, payload);
*/
}

void process_queue()
{
uint32_t offset = ensemble.parameters.input_subspace.offset;
uint32_t max_dim_sub_one = ensemble.parameters.input_subspace.n_dims - 1;

// Continuously remove packets from the queue and including them in filters
while (!packet_queue_is_empty(&packets))
{
// Pop a packet from the queue (critical section)
packet_t packet;
uint cpsr = spin1_fiq_disable();
bool packet_is_valid = packet_queue_pop(&packets, &packet);
spin1_mode_restore(cpsr);

// Process the received packet
if (packet_is_valid)
{
uint32_t key = packet.key;
uint32_t payload = packet.payload;

input_filtering_input_with_dimension_offset(
&filter_routing_sliced_inputs, key, payload,
offset, max_dim_sub_one
);
input_filtering_input(&filter_routing_unsliced_inputs, key, payload);
}
else
{
io_printf(IO_BUF, "Popped packet from empty queue\n");
rt_error(RTE_ABORT);
}
}
queue_processing = false;
}

void user_event(uint arg0, uint arg1)
{
use(arg0);
use(arg1);

process_queue();
}
/*****************************************************************************/

Expand Down Expand Up @@ -517,6 +580,8 @@ void timer_tick(uint ticks, uint arg1)
// Apply filtering to the input vector
profiler_write_entry(PROFILER_ENTER | PROFILER_INPUT_FILTER);

process_queue();

input_filtering_step(&input_filters);
input_filtering_step(&inhibition_filters);
input_filtering_step_no_accumulate(&modulatory_filters);
Expand Down Expand Up @@ -784,12 +849,17 @@ void c_main(void)

// --------------------------------------------------------------------------

queue_processing = false;
packet_queue_init(&packets, 1024);
queue_overflows = 0;

// --------------------------------------------------------------------------
// Prepare callbacks
spin1_set_timer_tick(ensemble.parameters.machine_timestep);
spin1_callback_on(TIMER_TICK, timer_tick, 2);
spin1_callback_on(DMA_TRANSFER_DONE, dma_complete, 0);
spin1_callback_on(MCPL_PACKET_RECEIVED, mcpl_received, -1);
spin1_callback_on(USER_EVENT, user_event, 1);
// --------------------------------------------------------------------------

// --------------------------------------------------------------------------
Expand All @@ -806,6 +876,12 @@ void c_main(void)
record_buffer_reset(&record_spikes);
record_buffer_reset(&record_voltages);

if (queue_overflows)
{
io_printf(IO_BUF, "Queue overflows = %u\n", queue_overflows);
rt_error(RTE_ABORT);
}

// Perform the simulation
spin1_start(SYNC_WAIT);
}
Expand Down
88 changes: 88 additions & 0 deletions spinnaker_components/ensemble/packet_queue.h
@@ -0,0 +1,88 @@
/* Methods and structures required to handle a queue of packets.
*/

#ifndef __PACKET_QUEUE_H__
#define __PACKET_QUEUE_H__

#include <stdbool.h>
#include <stdint.h>
#include "nengo-common.h"

typedef struct
{
uint32_t key;
uint32_t payload;
} packet_t;

// Packet queue structure (really a stack)
typedef struct
{
packet_t *packets; // The queue
unsigned int length; // Length of the queue
unsigned int current; // Current position in the queue
} packet_queue_t;


// Create an initialise a packet queue
static inline void packet_queue_init(packet_queue_t *queue,
unsigned int length)
{
// Allocate space for the queue
MALLOC_OR_DIE(queue->packets, length * sizeof(packet_t));

// Store the length and current position
queue->length = length;
queue->current = 0;
}


// Add a packet to the queue
static inline bool packet_queue_push(packet_queue_t *queue,
uint32_t key, uint32_t payload)
{
if (queue->current < queue->length)
{
// Add the packet to the queue if it isn't full
queue->packets[queue->current].key = key;
queue->packets[queue->current].payload = payload;
queue->current++;
return true;
}
else
{
// Otherwise return false to indicate that the queue was full
return false;
}
}


// Pop a packet from the queue, returning true or false to indicate whether
// this succeeded.
static inline bool packet_queue_pop(packet_queue_t *queue,
packet_t *dest)
{
if (queue->current > 0)
{
// Decrement the current index and then copy the key and payload to the
// destination.
queue->current--;
dest->key = queue->packets[queue->current].key;
dest->payload = queue->packets[queue->current].payload;

return true; // Indicate that a packet was popped
}
else
{
// Otherwise return false to indicate that the queue was empty
return false;
}
}


// Query if a queue is empty
static inline bool packet_queue_is_empty(packet_queue_t *queue)
{
return queue->current > 0;
}

#endif // __PACKET_QUEUE_H__

0 comments on commit 9c73d72

Please sign in to comment.