Skip to content

Commit

Permalink
Fix size selection of input communication buffer and copying of trans…
Browse files Browse the repository at this point in the history
…mitted data from input buf to module port map data array to prevent inclusion of garbage and/or breakage caused by fan-out patterns.
  • Loading branch information
lebedov committed Mar 20, 2016
1 parent d119f91 commit d00c4fc
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 213 deletions.
83 changes: 55 additions & 28 deletions neurokernel/core_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
ExceptionOnSignal, TryExceptionOnSignal
from mixins import LoggerMixin
import mpi
from tools.gpu import bufint, set_by_inds
from tools.gpu import bufint, set_by_inds, set_by_inds_from_inds
from tools.logging import setup_logger
from tools.misc import catch_exception, dtype_to_mpi
from tools.misc import catch_exception, dtype_to_mpi, renumber_in_order
from tools.mpi import MPIOutput
from pattern import Interface, Pattern
from plsel import Selector, SelectorMethods
from pm import BasePortMapper
from pm_gpu import GPUPortMapper
from routing_table import RoutingTable
from uid import uid
Expand Down Expand Up @@ -239,9 +240,6 @@ def _init_port_dicts(self):

# Extract identifiers of source ports in the current module's interface
# for all modules receiving output from the current module:
self._out_port_dict = {}
self._out_port_dict['gpot'] = {}
self._out_port_dict['spike'] = {}
self._out_port_dict_ids = {}
self._out_port_dict_ids['gpot'] = {}
self._out_port_dict_ids['spike'] = {}
Expand All @@ -260,24 +258,29 @@ def _init_port_dicts(self):

# Get ports in interface (`int_0`) connected to the current
# module that are connected to the other module via the pattern:
self._out_port_dict['gpot'][out_id] = \
pat.src_idx(int_0, int_1, 'gpot', 'gpot')
self._out_port_dict_ids['gpot'][out_id] = \
gpuarray.to_gpu(self.pm['gpot'].ports_to_inds(self._out_port_dict['gpot'][out_id]))
self._out_port_dict['spike'][out_id] = \
pat.src_idx(int_0, int_1, 'spike', 'spike')
gpuarray.to_gpu(self.pm['gpot'].ports_to_inds(pat.src_idx(int_0, int_1, 'gpot', 'gpot')))
self._out_port_dict_ids['spike'][out_id] = \
gpuarray.to_gpu(self.pm['spike'].ports_to_inds(self._out_port_dict['spike'][out_id]))
gpuarray.to_gpu(self.pm['spike'].ports_to_inds(pat.src_idx(int_0, int_1, 'spike', 'spike')))

# Extract identifiers of destination ports in the current module's
# interface for all modules sending input to the current module:
self._in_port_dict = {}
self._in_port_dict['gpot'] = {}
self._in_port_dict['spike'] = {}
self._in_port_dict_ids = {}
self._in_port_dict_ids['gpot'] = {}
self._in_port_dict_ids['spike'] = {}

# Extract indices corresponding to the entries in the transmitted
# buffers that must be copied into the input port map data arrays; these
# are needed to support fan-out:
self._in_port_dict_buf_ids = {}
self._in_port_dict_buf_ids['gpot'] = {}
self._in_port_dict_buf_ids['spike'] = {}

# Lengths of input buffers:
self._in_buf_len = {}
self._in_buf_len['gpot'] = {}
self._in_buf_len['spike'] = {}

self._in_ids = self.routing_table.src_ids(self.id)
self._in_ranks = [self.rank_to_id.inv[i] for i in self._in_ids]
for in_id in self._in_ids:
Expand All @@ -292,14 +295,26 @@ def _init_port_dicts(self):

# Get ports in interface (`int_1`) connected to the current
# module that are connected to the other module via the pattern:
self._in_port_dict['gpot'][in_id] = \
pat.dest_idx(int_0, int_1, 'gpot', 'gpot')
self._in_port_dict_ids['gpot'][in_id] = \
gpuarray.to_gpu(self.pm['gpot'].ports_to_inds(self._in_port_dict['gpot'][in_id]))
self._in_port_dict['spike'][in_id] = \
pat.dest_idx(int_0, int_1, 'spike', 'spike')
gpuarray.to_gpu(self.pm['gpot'].ports_to_inds(pat.dest_idx(int_0, int_1, 'gpot', 'gpot')))
self._in_port_dict_ids['spike'][in_id] = \
gpuarray.to_gpu(self.pm['spike'].ports_to_inds(self._in_port_dict['spike'][in_id]))
gpuarray.to_gpu(self.pm['spike'].ports_to_inds(pat.dest_idx(int_0, int_1, 'spike', 'spike')))

# Get the integer indices associated with the connected source ports
# in the pattern interface connected to the source module `in_d`;
# these are needed to copy received buffer contents into the current
# module's port map data array:
self._in_port_dict_buf_ids['gpot'][in_id] = \
np.array(renumber_in_order(BasePortMapper(pat.gpot_ports(int_0).to_tuples()).
ports_to_inds(pat.src_idx(int_0, int_1, 'gpot', 'gpot', duplicates=True))))
self._in_port_dict_buf_ids['spike'][in_id] = \
np.array(renumber_in_order(BasePortMapper(pat.spike_ports(int_0).to_tuples()).
ports_to_inds(pat.src_idx(int_0, int_1, 'spike', 'spike', duplicates=True))))

# The size of the input buffer to the current module must be the
# same length as the output buffer of module `in_id`:
self._in_buf_len['gpot'][in_id] = len(pat.src_idx(int_0, int_1, 'gpot', 'gpot'))
self._in_buf_len['spike'][in_id] = len(pat.src_idx(int_0, int_1, 'spike', 'spike'))

def _init_comm_bufs(self):
"""
Expand All @@ -322,7 +337,7 @@ def _init_comm_bufs(self):
self._in_buf_mtype['gpot'] = {}
self._in_buf_mtype['spike'] = {}
for in_id in self._in_ids:
n_gpot = len(self._in_port_dict_ids['gpot'][in_id])
n_gpot = self._in_buf_len['gpot'][in_id]
if n_gpot:
self._in_buf['gpot'][in_id] = \
gpuarray.empty(n_gpot, self.pm['gpot'].dtype)
Expand All @@ -333,7 +348,7 @@ def _init_comm_bufs(self):
else:
self._in_buf['gpot'][in_id] = None

n_spike = len(self._in_port_dict_ids['spike'][in_id])
n_spike = self._in_buf_len['spike'][in_id]
if n_spike:
self._in_buf['spike'][in_id] = \
gpuarray.empty(n_spike, self.pm['spike'].dtype)
Expand Down Expand Up @@ -398,7 +413,7 @@ def _sync(self):
self._out_port_dict_ids['gpot'][dest_id],
self.data['gpot'], 'src')
if not self.time_sync:
self.log_info('gpot data being sent to %s: %s' % \
self.log_info('gpot data sent to %s: %s' % \
(dest_id, str(self._out_buf['gpot'][dest_id])))
r = MPI.COMM_WORLD.Isend([self._out_buf_int['gpot'][dest_id],
self._out_buf_mtype['gpot'][dest_id]],
Expand All @@ -409,7 +424,7 @@ def _sync(self):
self._out_port_dict_ids['spike'][dest_id],
self.data['spike'], 'src')
if not self.time_sync:
self.log_info('spike data being sent to %s: %s' % \
self.log_info('spike data sent to %s: %s' % \
(dest_id, str(self._out_buf['spike'][dest_id])))
r = MPI.COMM_WORLD.Isend([self._out_buf_int['spike'][dest_id],
self._out_buf_mtype['spike'][dest_id]],
Expand Down Expand Up @@ -442,10 +457,22 @@ def _sync(self):

# Copy received elements into the current module's data array:
for src_id in self._in_ids:
ind_in_gpot = self._in_port_dict_ids['gpot'][src_id]
self.pm['gpot'].set_by_inds(ind_in_gpot, self._in_buf['gpot'][src_id])
ind_in_spike = self._in_port_dict_ids['spike'][src_id]
self.pm['spike'].set_by_inds(ind_in_spike, self._in_buf['spike'][src_id])
if self._in_buf['gpot'][src_id] is not None:
if not self.time_sync:
self.log_info('gpot data received from %s: %s' % \
(src_id, str(self._in_buf['gpot'][src_id])))
set_by_inds_from_inds(self.data['gpot'],
self._in_port_dict_ids['gpot'][src_id],
self._in_buf['gpot'][src_id],
self._in_port_dict_buf_ids['gpot'][src_id])
if self._in_buf['spike'][src_id] is not None:
if not self.time_sync:
self.log_info('spike data received from %s: %s' % \
(src_id, str(self._in_buf['spike'][src_id])))
set_by_inds_from_inds(self.data['spike'],
self._in_port_dict_ids['spike'][src_id],
self._in_buf['spike'][src_id],
self._in_port_dict_buf_ids['spike'][src_id])

# Save timing data:
if self.time_sync:
Expand Down
185 changes: 0 additions & 185 deletions tests/test_core.py

This file was deleted.

0 comments on commit d00c4fc

Please sign in to comment.