Skip to content
Permalink
Browse files

cupSODA multi-GPU and chunk support (#409)

* CupSodaSimulator multi-GPU support
* cupSODA multi-chunk support; update docs
* Fix non-ASCII character
* Move self.gpu initialization to __init__
  • Loading branch information...
alubbock committed Mar 21, 2019
1 parent 3c68232 commit 91fefad221a4e00fc4bab405f35ba82bff2f3591
Showing with 172 additions and 86 deletions.
  1. +5 −5 pysb/simulator/base.py
  2. +157 −81 pysb/simulator/cupsoda.py
  3. +10 −0 pysb/tests/test_simulator_cupsoda.py
@@ -107,15 +107,15 @@ def __init__(self, model, tspan=None, initials=None,
self.tout = None
# Per-run initial conditions/parameter/tspan override
self._tspan = tspan
# Base initials and param values
self._initials = None
self.initials = initials
self._params = None
self.param_values = param_values
# Per-run tspan, initials and param_values
self._run_tspan = None
self._run_initials = None
self._run_params = None
# Base initials and param values
self._params = None
self.param_values = param_values
self._initials = None
self.initials = initials
# Store init kwargs and run kwargs if needed for saving results
self._init_kwargs = kwargs
self._run_kwargs = None
@@ -14,6 +14,7 @@
import shutil
from pysb.pathfinder import get_path
import sympy
import collections

try:
import pandas as pd
@@ -92,21 +93,29 @@ class CupSodaSimulator(Simulator):
verbose: bool or int
Verbosity setting. See the base class
:class:`pysb.simulator.base.Simulator` for further details.
gpu : int
Index of GPU being run on
vol : float or None
System volume
n_blocks: int
Number of GPU blocks used by the simulator.
gpu : int or list
Index of GPU being run on, or a list of integers to use multiple GPUs.
Simulations will be split equally among the of GPUs.
outdir : str
Directory where cupSODA output files are placed. Input files are
also placed here.
opts: dict
Dictionary of options for the integrator in use.
Dictionary of options for the integrator, which can include the
following:
* vol (float or None): System volume
* n_blocks (int or None): Number of GPU blocks used by the simulator
* atol (float): Absolute integrator tolerance
* rtol (float): Relative integrator tolerance
* chunksize (int or None): The maximum number of simulations to run
per GPU at one time. Set this option if your GPU is running out of
memory.
* memory_usage ('global', 'shared', or 'sharedconstant'): The type of
GPU memory to use
* max_steps (int): The maximum number of internal integrator iterations
(equivalent to LSODA's mxstep)
integrator : str
Name of the integrator in use.
default_integrator_options : dict
Nested dictionary of default options for all supported integrators.
Name of the integrator in use (only "cupsoda" is supported).
Notes
-----
@@ -124,10 +133,13 @@ class CupSodaSimulator(Simulator):
References
----------
1. Nobile M.S., Cazzaniga P., Besozzi D., Mauri G., 2014. GPU-accelerated
1. Harris, L.A., Nobile, M.S., Pino, J.C., Lubbock, A.L.R., Besozzi, D.,
Mauri, G., Cazzaniga, P., and Lopez, C.F. 2017. GPU-powered model
analysis with PySB/cupSODA. Bioinformatics 33, pp.3492-3494.
2. Nobile M.S., Cazzaniga P., Besozzi D., Mauri G., 2014. GPU-accelerated
simulations of mass-action kinetics models with cupSODA, Journal of
Supercomputing, 69(1), pp.17-24.
2. Petzold, L., 1983. Automatic selection of methods for solving stiff and
3. Petzold, L., 1983. Automatic selection of methods for solving stiff and
nonstiff systems of ordinary differential equations. SIAM journal on
scientific and statistical computing, 4(1), pp.136-148.
@@ -143,19 +155,22 @@ class CupSodaSimulator(Simulator):
'max_steps': 20000, # max # of internal iterations (LSODA's MXSTEP)
'atol': 1e-8, # absolute tolerance
'rtol': 1e-8, # relative tolerance
'chunksize': None, # Max number of simulations per GPU per run
'n_blocks': None, # number of GPU blocks
'memory_usage': 'sharedconstant'}} # see _memory_options dict

_integrator_options_allowed = {'max_steps', 'atol', 'rtol', 'n_blocks',
'memory_usage', 'vol'}
'memory_usage', 'vol', 'chunksize'}

def __init__(self, model, tspan=None, initials=None, param_values=None,
verbose=False, **kwargs):
super(CupSodaSimulator, self).__init__(model, tspan=tspan,
initials=initials,
param_values=param_values,
verbose=verbose, **kwargs)
self.gpu = kwargs.pop('gpu', 0)
self.gpu = kwargs.pop('gpu', (0, ))
if not isinstance(self.gpu, collections.Iterable):
self.gpu = [self.gpu]
self._obs_species_only = kwargs.pop('obs_species_only', True)
self._cleanup = kwargs.pop('cleanup', True)
self._prefix = kwargs.pop('prefix', self._model.name)
@@ -217,6 +232,77 @@ def __init__(self, model, tspan=None, initials=None, param_values=None,
# regex for extracting cupSODA reported running time
self._running_time_regex = re.compile(r'Running time:\s+(\d+\.\d+)')

def _run_chunk(self, gpus, outdir, chunk_idx, cmtx, sims, trajectories,
tout):
_indirs = {}
_outdirs = {}
p = {}

# Path to cupSODA executable
bin_path = get_path('cupsoda')

# Start simulations
for gpu in gpus:
_indirs[gpu] = os.path.join(outdir, "INPUT_GPU{}_{}".format(
gpu, chunk_idx))
os.mkdir(_indirs[gpu])
_outdirs[gpu] = os.path.join(outdir, "OUTPUT_GPU{}_{}".format(
gpu, chunk_idx))

# Create cupSODA input files
self._create_input_files(_indirs[gpu], sims[gpu], cmtx)

# Build command
# ./cupSODA input_model_folder blocks output_folder simulation_
# file_prefix gpu_number fitness_calculation memory_use dump
command = [bin_path, _indirs[gpu], str(self.n_blocks),
_outdirs[gpu], self._prefix, str(gpu),
'0', self._memory_usage, str(self._cupsoda_verbose)]

self._logger.info("Running cupSODA: " + ' '.join(command))

# Run simulation and return trajectories
p[gpu] = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

# Read results
for gpu in gpus:
(p_out, p_err) = p[gpu].communicate()
p_out = p_out.decode('utf-8')
p_err = p_err.decode('utf-8')
logger_level = self._logger.logger.getEffectiveLevel()
if logger_level <= logging.INFO:
run_time_match = self._running_time_regex.search(p_out)
if run_time_match:
self._logger.info('cupSODA GPU {} chunk {} reported '
'time: {} seconds'.format(
gpu,
chunk_idx,
run_time_match.group(1)))
self._logger.debug('cupSODA GPU {} chunk {} stdout:\n{}'.format(
gpu, chunk_idx, p_out))
if p_err:
self._logger.error('cupSODA GPU {} chunk {} '
'stderr:\n{}'.format(
gpu, chunk_idx, p_err))
if p[gpu].returncode:
raise SimulatorException(
"cupSODA GPU {} chunk {} exception:\n{}\n{}".format(
gpu, chunk_idx, p_out.rstip("at line"), p_err.rstrip()
)
)
tout_run, trajectories_run = self._load_trajectories(
_outdirs[gpu], sims[gpu])
if trajectories is None:
tout = tout_run
trajectories = trajectories_run
else:
tout = np.concatenate((tout, tout_run))
trajectories = np.concatenate(
(trajectories, trajectories_run))

return tout, trajectories

def run(self, tspan=None, initials=None, param_values=None):
"""Perform a set of integrations.
@@ -253,57 +339,51 @@ def run(self, tspan=None, initials=None, param_values=None):
_run_kwargs=[])

# Create directories for cupSODA input and output files
self.outdir = tempfile.mkdtemp(prefix=self._prefix + '_',
dir=self._base_dir)
_outdirs = {}
_indirs = {}

self._logger.debug("Output directory is %s" % self.outdir)
_cupsoda_infiles_dir = os.path.join(self.outdir, "INPUT")
os.mkdir(_cupsoda_infiles_dir)
self._cupsoda_outfiles_dir = os.path.join(self.outdir, "OUTPUT")
start_time = time.time()

# Path to cupSODA executable
bin_path = get_path('cupsoda')
cmtx = self._get_cmatrix()

# Create cupSODA input files
self._create_input_files(_cupsoda_infiles_dir)
outdir = tempfile.mkdtemp(prefix=self._prefix + '_',
dir=self._base_dir)
self._logger.debug("Output directory is %s" % outdir)

# Build command
# ./cupSODA input_model_folder blocks output_folder simulation_
# file_prefix gpu_number fitness_calculation memory_use dump
command = [bin_path, _cupsoda_infiles_dir, str(self.n_blocks),
self._cupsoda_outfiles_dir, self._prefix, str(self.gpu),
'0', self._memory_usage, str(self._cupsoda_verbose)]
# Set up chunking (enforce max # sims per GPU per run)
n_sims = len(self.param_values)

chunksize_gpu = self.opts.get('chunksize', None)
if chunksize_gpu is None:
chunksize_gpu = n_sims

chunksize_total = chunksize_gpu * len(self.gpu)

tout = None
trajectories = None

chunks = np.array_split(range(n_sims),
np.ceil(n_sims / chunksize_total))

try:
for chunk_idx, chunk in enumerate(chunks):
self._logger.debug('cupSODA chunk {} of {}'.format(
(chunk_idx + 1), len(chunks)))

# Split chunk equally between GPUs
sims = dict(zip(self.gpu, np.array_split(chunk,
len(self.gpu))))

tout, trajectories = self._run_chunk(
self.gpu, outdir, chunk_idx, cmtx, sims,
trajectories, tout)
finally:
if self._cleanup:
shutil.rmtree(outdir)

self._logger.info("Running cupSODA: " + ' '.join(command))
start_time = time.time()
# Run simulation and return trajectories
p = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# for line in iter(p.stdout.readline, b''):
# if 'Running time' in line:
# self._logger.info(line[:-1])
(p_out, p_err) = p.communicate()
p_out = p_out.decode('utf-8')
p_err = p_err.decode('utf-8')
logger_level = self._logger.logger.getEffectiveLevel()
if logger_level <= logging.INFO:
run_time_match = self._running_time_regex.search(p_out)
if run_time_match:
self._logger.info('cupSODA reported time: {} '
'seconds'.format(run_time_match.group(1)))
self._logger.debug('cupSODA stdout:\n' + p_out)
if p_err:
self._logger.error('cupsoda strerr:\n' + p_err)
if p.returncode:
raise SimulatorException(
p_out.rstrip("at line") + "\n" + p_err.rstrip())
tout, trajectories = self._load_trajectories(
self._cupsoda_outfiles_dir)
if self._cleanup:
shutil.rmtree(self.outdir)
end_time = time.time()
self._logger.info("cupSODA + I/O time: {} seconds".format(end_time -
start_time))
self._logger.info("cupSODA + I/O time: {} seconds".format(
end_time - start_time))
return SimulationResult(self, tout, trajectories)

@property
@@ -334,7 +414,7 @@ def n_blocks(self):
threads_per_block = default_threads_per_block
else:
cuda.init()
device = cuda.Device(self.gpu)
device = cuda.Device(self.gpu[0])
attrs = device.get_attributes()
shared_memory_per_block = attrs[
cuda.device_attribute.MAX_SHARED_MEMORY_PER_BLOCK]
@@ -361,10 +441,7 @@ def n_blocks(self, n_blocks):
raise ValueError("n_blocks must be greater than 0")
self.opts['n_blocks'] = n_blocks

def _create_input_files(self, directory):

n_sims = len(self.param_values)

def _create_input_files(self, directory, sims, cmtx):
# atol_vector
with open(os.path.join(directory, "atol_vector"), 'w') as atol_vector:
for i in range(self._len_species):
@@ -374,15 +451,14 @@ def _create_input_files(self, directory):

# c_matrix
with open(os.path.join(directory, "c_matrix"), 'w') as c_matrix:
cmtx = self._get_cmatrix()
for i in range(n_sims):
for i in sims:
line = ""
for j in range(self._len_rxns):
if j > 0:
line += "\t"
line += str(cmtx[i][j])
c_matrix.write(line)
if i < n_sims - 1:
if i != sims[-1]:
c_matrix.write("\n")

# cs_vector
@@ -436,14 +512,14 @@ def _create_input_files(self, directory):
if self.vol:
mx0 = mx0.copy()
mx0 /= (N_A * self.vol)
for i in range(n_sims):
for i in sims:
line = ""
for j in range(self._len_species):
if j > 0:
line += "\t"
line += str(mx0[i][j])
MX_0.write(line)
if i < n_sims - 1:
if i != sims[-1]:
MX_0.write("\n")

# right_side
@@ -513,7 +589,7 @@ def _get_cmatrix(self):
self._logger.debug("100%")
return c_matrix

def _load_trajectories(self, directory):
def _load_trajectories(self, directory, sims):
"""Read simulation results from output files.
Returns `tout` and `trajectories` arrays.
@@ -523,26 +599,26 @@ def _load_trajectories(self, directory):
if len(files) == 0:
raise SimulatorException(
"Cannot find any output files to load data from.")
if len(files) != len(self.param_values):
if len(files) != len(sims):
raise SimulatorException(
"Number of input files (%d) does not match number "
"Number of output files (%d) does not match number "
"of requested simulations (%d)." % (
len(files), len(self.param_values)))
len(files), len(sims)))
n_sims = len(files)
trajectories = [None] * n_sims
tout = [None] * n_sims
traj_n = np.ones((len(self.tspan), self._len_species)) * float('nan')
tout_n = np.ones(len(self.tspan)) * float('nan')
# load the data
indir_prefix = os.path.join(directory, self._prefix)
for n in range(n_sims):
trajectories[n] = traj_n.copy()
tout[n] = tout_n.copy()
filename = indir_prefix + "_" + str(n)
for idx, n in enumerate(sims):
trajectories[idx] = traj_n.copy()
tout[idx] = tout_n.copy()
filename = indir_prefix + "_" + str(idx)
if not os.path.isfile(filename):
raise Exception("Cannot find input file " + filename)
# determine optimal loading method
if n == 0:
if idx == 0:
(data, use_pandas) = self._test_pandas(filename)
# load data
else:
@@ -551,11 +627,11 @@ def _load_trajectories(self, directory):
else:
data = self._load_with_openfile(filename)
# store data
tout[n] = data[:, 0]
trajectories[n][:, self._out_species] = data[:, 1:]
tout[idx] = data[:, 0]
trajectories[idx][:, self._out_species] = data[:, 1:]
# volume correction
if self.vol:
trajectories[n][:, self._out_species] *= (N_A * self.vol)
trajectories[idx][:, self._out_species] *= (N_A * self.vol)
return np.array(tout), np.array(trajectories)

def _test_pandas(self, filename):

0 comments on commit 91fefad

Please sign in to comment.
You can’t perform that action at this time.