Skip to content

Commit

Permalink
Added experimental async mode
Browse files Browse the repository at this point in the history
  • Loading branch information
tcstewar committed May 3, 2017
1 parent 971feeb commit db493ad
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 20 deletions.
43 changes: 23 additions & 20 deletions nengo_spinnaker/node_io/ethernet.py
Expand Up @@ -161,26 +161,29 @@ def __init__(self, ethernet_handler):

def run(self):
while not self.halt:
# Read as many packets from the socket as we can
while True:
try:
data = self.in_sock.recv(512)
except IOError:
break # No more to read

# Unpack the data, and store it as the input for the
# appropriate Node.
packet = SCPPacket.from_bytestring(data)
values = tp.fix_to_np(
np.frombuffer(packet.data, dtype=np.int32)
)

# Get the Node
node = self.handler._node_incoming[(packet.src_x,
packet.src_y,
packet.src_cpu)]
with self.handler.node_input_lock:
self.handler.node_input[node] = values[:]
self.step()

def step(self):
# Read as many packets from the socket as we can
while True:
try:
data = self.in_sock.recv(512)
except IOError:
break # No more to read

# Unpack the data, and store it as the input for the
# appropriate Node.
packet = SCPPacket.from_bytestring(data)
values = tp.fix_to_np(
np.frombuffer(packet.data, dtype=np.int32)
)

# Get the Node
node = self.handler._node_incoming[(packet.src_x,
packet.src_y,
packet.src_cpu)]
with self.handler.node_input_lock:
self.handler.node_input[node] = values[:]

def stop(self):
"""Stop the thread from running."""
Expand Down
35 changes: 35 additions & 0 deletions nengo_spinnaker/simulator.py
Expand Up @@ -86,6 +86,7 @@ def __init__(self, network, dt=0.001, period=10.0, timescale=1.0,
io_kwargs = getconfig(network.config, Simulator, "node_io_kwargs",
dict())
self.io_controller = io_cls(**io_kwargs)
self.io_thread = None

# Calculate the machine timestep, this is measured in microseconds
# (hence the 1e6 scaling factor).
Expand Down Expand Up @@ -405,6 +406,40 @@ def close(self):
def trange(self, dt=None):
return np.arange(1, self.steps + 1) * (self.dt or dt)

def async_run_forever(self):
if self._closed:
raise Exception("Simulator has been closed and can't be used to "
"run further simulations.")

if self.io_thread is not None:
raise Exception("Simulator already running")

# Prepare the simulation
self.netlist.before_simulation(self, 0x7FFFFFFF)

# Wait for all cores to hit SYNC0 (either by remaining it or entering
# it from init)
self._wait_for_transition(AppState.init, AppState.sync0,
self.netlist.n_cores)
self.controller.send_signal("sync0")

# Get a new thread for the IO
self.io_thread = self.io_controller.spawn()

# Wait for all cores to hit SYNC1
self._wait_for_transition(AppState.sync0, AppState.sync1,
self.netlist.n_cores)
logger.info("Running simulation...")
self.controller.send_signal("sync1")

def async_update(self):
self.io_thread.step()
self.host_sim.step()

def async_halt(self):
self.controller.send_signal("stop")
self.io_thread = None


@atexit.register
def _close_open_simulators():
Expand Down

0 comments on commit db493ad

Please sign in to comment.