-
Notifications
You must be signed in to change notification settings - Fork 7
/
ethernet.py
191 lines (157 loc) · 7.11 KB
/
ethernet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import collections
import logging
import socket
import threading

import numpy as np
from rig.machine_control.consts import SCP_PORT
from rig.machine_control.packets import SCPPacket
from rig.place_and_route import Cores
from six import iteritems

from ..builder.builder import spec, ObjectPort
from ..builder.ports import InputPort, OutputPort
from ..builder.node import NodeIOController
from ..operators import SDPReceiver, SDPTransmitter
from ..utils import type_casts as tp
class Ethernet(NodeIOController):
    """Node IO controller which exchanges Node values with SpiNNaker as SCP
    packets sent over UDP sockets.
    """

    def __init__(self, transmission_period=0.01):
        """Set up the bookkeeping and sockets used for Node communication.

        Parameters
        ----------
        transmission_period : float
            Period between transmitting SDP packets from SpiNNaker to the host
            in seconds.
        """
        super(Ethernet, self).__init__()

        # Ethernet specific parameters
        self.transmission_period = transmission_period

        # Operators are created lazily and keyed by the Node they serve
        self._sdp_receivers = {}
        self._sdp_transmitters = {}

        # Node -> [((pre_slice, slice_in, function, transform), (x, y, p)),
        #          ...]
        self._node_outgoing = collections.defaultdict(list)

        # (x, y, p) -> Node
        self._node_incoming = {}

        # The hostname is filled in by `prepare`
        self._hostname = None
        self.in_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.out_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def get_spinnaker_source_for_node(self, model, connection):
        """Return the source spec for a connection which starts at a Node.

        Arguments and return type are as for
        :py:attr:`~nengo_spinnaker.builder.Model.source_getters`.
        """
        node = connection.pre_obj

        # The first connection seen for a Node creates its SDPReceiver and
        # registers it with the model; later connections share it.
        if node not in self._sdp_receivers:
            new_receiver = SDPReceiver()
            self._sdp_receivers[node] = new_receiver
            model.extra_operators.append(new_receiver)

        source_port = ObjectPort(self._sdp_receivers[node],
                                 OutputPort.standard)
        return spec(source_port, latching=True)

    def get_spinnaker_sink_for_node(self, model, connection):
        """Return the sink spec for a connection which ends at a Node.

        Arguments and return type are as for
        :py:attr:`~nengo_spinnaker.builder.Model.sink_getters`.
        """
        node = connection.post_obj

        # The first connection seen for a Node creates its SDPTransmitter
        # (sized by the Node's `size_in`) and registers it with the model.
        if node not in self._sdp_transmitters:
            new_transmitter = SDPTransmitter(node.size_in)
            self._sdp_transmitters[node] = new_transmitter
            model.extra_operators.append(new_transmitter)

        sink_port = ObjectPort(self._sdp_transmitters[node],
                               InputPort.standard)
        return spec(sink_port)

    def prepare(self, model, controller, netlist):
        """Configure the sockets and build the Node <-> core maps from the
        placed netlist and the machine controller.

        Parameters
        ----------
        model :
            Not used by this method.
        controller :
            Machine controller; supplies `initial_host` and is used to set
            IP tag 1 on chip (0, 0).
        netlist :
            Placed netlist; its `placements` and `allocations` are read to
            find the (x, y, p) of each vertex.
        """
        # Remember where outgoing packets should be sent
        # TODO Store a map of (x, y) to hostname
        self._hostname = controller.initial_host

        # Bind the receiving socket to an OS-chosen port and point IP tag 1
        # at it (will need to do this for each ethernet connected chip that
        # we expect to use).
        self.in_socket.bind(('', 0))
        with controller(x=0, y=0):
            controller.iptag_set(1, *self.in_socket.getsockname())

        # Record, for every Node, the parameters and destination core of each
        # of its outgoing connections.
        for node, receiver in iteritems(self._sdp_receivers):
            for params, vertex in iteritems(receiver.connection_vertices):
                # Chip co-ordinates and first allocated core of the vertex
                x, y = netlist.placements[vertex]
                p = netlist.allocations[vertex][Cores].start

                targets = self._node_outgoing[node]
                parameters = (
                    params.pre_slice,
                    params.slice_in,
                    params.function,
                    params.full_transform(slice_out=False),
                )
                targets.append((parameters, (x, y, p)))

        # Record which Node each transmitting core feeds
        for node, transmitter in iteritems(self._sdp_transmitters):
            vertex = transmitter._vertex
            x, y = netlist.placements[vertex]
            p = netlist.allocations[vertex][Cores].start
            self._node_incoming[(x, y, p)] = node

    def set_node_output(self, node, value):
        """Send the value output by a Node to each core that consumes it."""
        # One packet is sent per outgoing connection recorded by `prepare`
        for parameters, (x, y, p) in self._node_outgoing[node]:
            pre_slice, slice_in, function, transform = parameters

            # Slice the value, then apply the connection function (if any)
            # and the transform.
            c_value = value[pre_slice]
            # NOTE(review): the pre-sliced value from the line above is
            # overwritten here, so `pre_slice` has no effect on what is
            # transmitted -- confirm whether `c_value[slice_in]` was intended.
            c_value = value[slice_in]
            if function is not None:
                c_value = function(c_value)
            c_value = np.dot(transform, c_value)

            # Convert to fixed point and wrap the raw bytes in a packet
            # addressed to port 1 of the destination core.
            payload = bytes(tp.np_to_fix(c_value).data)
            packet = SCPPacket(dest_port=1, dest_cpu=p, dest_x=x, dest_y=y,
                               cmd_rc=0, arg1=0, arg2=0, arg3=0,
                               data=payload)
            destination = (self._hostname, SCP_PORT)
            self.out_socket.sendto(packet.bytestring, destination)

    def spawn(self):
        """Return a new, unstarted :py:class:`EthernetThread` bound to this
        controller.
        """
        return EthernetThread(self)

    def close(self):
        """Close both sockets used by the ethernet Node IO."""
        self.in_socket.close()
        self.out_socket.close()
class EthernetThread(threading.Thread):
    """Thread which receives Node input values sent by SpiNNaker.

    The thread repeatedly drains the handler's incoming socket, decodes each
    packet and stores the values as the input of the Node registered for the
    sending core.
    """

    def __init__(self, ethernet_handler):
        """Create (but do not start) the IO thread.

        Parameters
        ----------
        ethernet_handler :
            Object providing `in_socket`, `_node_incoming`, `node_input` and
            `node_input_lock` (an :py:class:`Ethernet` instance).
        """
        # Initialise the thread
        super(EthernetThread, self).__init__(name="EthernetIO")

        # Set up internal references
        self.halt = False
        self.handler = ethernet_handler
        self.in_sock = ethernet_handler.in_socket

        # A very short timeout makes `recv` give up quickly when nothing is
        # waiting, so that `run` can re-check `halt` between polls.
        self.in_sock.settimeout(0.0001)

        # Sources we have already warned about, to avoid flooding the log
        self._unknown_sources = set()

    def run(self):
        """Poll for packets until `halt` is set."""
        while not self.halt:
            self.step()

    def step(self):
        """Read every packet currently available and store the values they
        carry as Node inputs.

        Packets from cores which are not present in the handler's
        `_node_incoming` map are ignored (a warning is logged the first time
        each such core is seen) rather than raising `KeyError`, which would
        otherwise terminate the thread and stop all further Node input.
        """
        while True:
            try:
                data = self.in_sock.recv(512)
            except IOError:
                break  # No more to read

            # Work out which core sent the packet
            packet = SCPPacket.from_bytestring(data)
            source = (packet.src_x, packet.src_y, packet.src_cpu)

            try:
                node = self.handler._node_incoming[source]
            except KeyError:
                if source not in self._unknown_sources:
                    self._unknown_sources.add(source)
                    logging.getLogger(__name__).warning(
                        "Ignoring packets from unexpected core %r", source)
                continue

            # Unpack the data, and store it as the input for the Node
            values = tp.fix_to_np(
                np.frombuffer(packet.data, dtype=np.int32)
            )
            with self.handler.node_input_lock:
                self.handler.node_input[node] = values[:]

    def stop(self):
        """Stop the thread from running and wait for it to finish.

        Safe to call on a thread which was never started: `join` is only
        attempted while the thread is alive, since joining an unstarted
        thread raises `RuntimeError`.
        """
        self.halt = True
        if self.is_alive():
            self.join()