Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cupSODA multi-GPU and chunk support #409

Merged
merged 5 commits into from Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions pysb/simulator/base.py
Expand Up @@ -108,15 +108,15 @@ def __init__(self, model, tspan=None, initials=None,
self.tout = None
# Per-run initial conditions/parameter/tspan override
self._tspan = tspan
# Base initials and param values
self._initials = None
self.initials = initials
self._params = None
self.param_values = param_values
# Per-run tspan, initials and param_values
self._run_tspan = None
self._run_initials = None
self._run_params = None
# Base initials and param values
self._params = None
self.param_values = param_values
self._initials = None
self.initials = initials
# Store init kwargs and run kwargs if needed for saving results
self._init_kwargs = kwargs
self._run_kwargs = None
Expand Down
237 changes: 158 additions & 79 deletions pysb/simulator/cupsoda.py
Expand Up @@ -14,6 +14,7 @@
import shutil
from pysb.pathfinder import get_path
import sympy
import collections

try:
import pandas as pd
Expand Down Expand Up @@ -92,21 +93,29 @@ class CupSodaSimulator(Simulator):
verbose: bool or int
Verbosity setting. See the base class
:class:`pysb.simulator.base.Simulator` for further details.
gpu : int
Index of GPU being run on
vol : float or None
System volume
n_blocks: int
Number of GPU blocks used by the simulator.
gpu : int or list
Index of GPU being run on, or a list of integers to use multiple GPUs.
Simulations will be split equally among the of GPUs.
outdir : str
Directory where cupSODA output files are placed. Input files are
also placed here.
opts: dict
Dictionary of options for the integrator in use.
Dictionary of options for the integrator, which can include the
following:

* vol (float or None): System volume
* n_blocks (int or None): Number of GPU blocks used by the simulator
* atol (float): Absolute integrator tolerance
* rtol (float): Relative integrator tolerance
* chunksize (int or None): The maximum number of simulations to run
per GPU at one time. Set this option if your GPU is running out of
memory.
* memory_usage ('global', 'shared', or 'sharedconstant'): The type of
GPU memory to use
* max_steps (int): The maximum number of internal integrator iterations
(equivalent to LSODA's mxstep)
integrator : str
Name of the integrator in use.
default_integrator_options : dict
Nested dictionary of default options for all supported integrators.
Name of the integrator in use (only "cupsoda" is supported).

Notes
-----
Expand All @@ -124,10 +133,13 @@ class CupSodaSimulator(Simulator):
References
----------

1. Nobile M.S., Cazzaniga P., Besozzi D., Mauri G., 2014. GPU-accelerated
1. Harris, L.A., Nobile, M.S., Pino, J.C., Lubbock, A.L.R., Besozzi, D.,
Mauri, G., Cazzaniga, P., and Lopez, C.F. 2017. GPU-powered model
analysis with PySB/cupSODA. Bioinformatics 33, pp.3492-3494.
2. Nobile M.S., Cazzaniga P., Besozzi D., Mauri G., 2014. GPU-accelerated
simulations of mass-action kinetics models with cupSODA, Journal of
Supercomputing, 69(1), pp.17-24.
2. Petzold, L., 1983. Automatic selection of methods for solving stiff and
3. Petzold, L., 1983. Automatic selection of methods for solving stiff and
nonstiff systems of ordinary differential equations. SIAM journal on
scientific and statistical computing, 4(1), pp.136-148.

Expand All @@ -143,11 +155,12 @@ class CupSodaSimulator(Simulator):
'max_steps': 20000, # max # of internal iterations (LSODA's MXSTEP)
'atol': 1e-8, # absolute tolerance
'rtol': 1e-8, # relative tolerance
'chunksize': None, # Max number of simulations per GPU per run
'n_blocks': None, # number of GPU blocks
'memory_usage': 'sharedconstant'}} # see _memory_options dict

_integrator_options_allowed = {'max_steps', 'atol', 'rtol', 'n_blocks',
'memory_usage', 'vol'}
'memory_usage', 'vol', 'chunksize'}

def __init__(self, model, tspan=None, initials=None, param_values=None,
verbose=False, **kwargs):
Expand Down Expand Up @@ -215,6 +228,77 @@ def __init__(self, model, tspan=None, initials=None, param_values=None,
# regex for extracting cupSODA reported running time
self._running_time_regex = re.compile(r'Running time:\s+(\d+\.\d+)')

def _run_chunk(self, gpus, outdir, chunk_idx, cmtx, sims, trajectories,
tout):
_indirs = {}
_outdirs = {}
p = {}

# Path to cupSODA executable
bin_path = get_path('cupsoda')

# Start simulations
for gpu in gpus:
_indirs[gpu] = os.path.join(outdir, "INPUT_GPU{}_{}".format(
gpu, chunk_idx))
os.mkdir(_indirs[gpu])
_outdirs[gpu] = os.path.join(outdir, "OUTPUT_GPU{}_{}".format(
gpu, chunk_idx))

# Create cupSODA input files
self._create_input_files(_indirs[gpu], sims[gpu], cmtx)

# Build command
# ./cupSODA input_model_folder blocks output_folder simulation_
# file_prefix gpu_number fitness_calculation memory_use dump
command = [bin_path, _indirs[gpu], str(self.n_blocks),
_outdirs[gpu], self._prefix, str(gpu),
'0', self._memory_usage, str(self._cupsoda_verbose)]

self._logger.info("Running cupSODA: " + ' '.join(command))

# Run simulation and return trajectories
p[gpu] = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

# Read results
for gpu in gpus:
(p_out, p_err) = p[gpu].communicate()
p_out = p_out.decode('utf-8')
p_err = p_err.decode('utf-8')
logger_level = self._logger.logger.getEffectiveLevel()
if logger_level <= logging.INFO:
run_time_match = self._running_time_regex.search(p_out)
if run_time_match:
self._logger.info('cupSODA GPU {} chunk {} reported '
'time: {} seconds'.format(
gpu,
chunk_idx,
run_time_match.group(1)))
self._logger.debug('cupSODA GPU {} chunk {} stdout:\n{}'.format(
gpu, chunk_idx, p_out))
if p_err:
self._logger.error('cupSODA GPU {} chunk {} '
'stderr:\n{}'.format(
gpu, chunk_idx, p_err))
if p[gpu].returncode:
raise SimulatorException(
"cupSODA GPU {} chunk {} exception:\n{}\n{}".format(
gpu, chunk_idx, p_out.rstip("at line"), p_err.rstrip()
)
)
tout_run, trajectories_run = self._load_trajectories(
_outdirs[gpu], sims[gpu])
if trajectories is None:
tout = tout_run
trajectories = trajectories_run
else:
tout = np.concatenate((tout, tout_run))
trajectories = np.concatenate(
(trajectories, trajectories_run))

return tout, trajectories

def run(self, tspan=None, initials=None, param_values=None):
"""Perform a set of integrations.

Expand Down Expand Up @@ -250,58 +334,57 @@ def run(self, tspan=None, initials=None, param_values=None):
param_values=param_values,
_run_kwargs=[])

if isinstance(self.gpu, collections.Iterable):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of normalization probably belongs in __init__ -- can you just apply this directly to self.gpu so it's always a list by the time the code gets to run?

gpus = self.gpu
else:
gpus = [self.gpu]

# Create directories for cupSODA input and output files
self.outdir = tempfile.mkdtemp(prefix=self._prefix + '_',
dir=self._base_dir)
_outdirs = {}
_indirs = {}

self._logger.debug("Output directory is %s" % self.outdir)
_cupsoda_infiles_dir = os.path.join(self.outdir, "INPUT")
os.mkdir(_cupsoda_infiles_dir)
self._cupsoda_outfiles_dir = os.path.join(self.outdir, "OUTPUT")
start_time = time.time()

# Path to cupSODA executable
bin_path = get_path('cupsoda')
cmtx = self._get_cmatrix()

outdir = tempfile.mkdtemp(prefix=self._prefix + '_',
dir=self._base_dir)
self._logger.debug("Output directory is %s" % outdir)

# Create cupSODA input files
self._create_input_files(_cupsoda_infiles_dir)
# Set up chunking (enforce max # sims per GPU per run)
n_sims = len(self.param_values)

# Build command
# ./cupSODA input_model_folder blocks output_folder simulation_
# file_prefix gpu_number fitness_calculation memory_use dump
command = [bin_path, _cupsoda_infiles_dir, str(self.n_blocks),
self._cupsoda_outfiles_dir, self._prefix, str(self.gpu),
'0', self._memory_usage, str(self._cupsoda_verbose)]
chunksize_gpu = self.opts.get('chunksize', None)
if chunksize_gpu is None:
chunksize_gpu = n_sims

chunksize_total = chunksize_gpu * len(gpus)

tout = None
trajectories = None

chunks = np.array_split(range(n_sims),
np.ceil(n_sims / chunksize_total))

try:
for chunk_idx, chunk in enumerate(chunks):
self._logger.debug('cupSODA chunk {} of {}'.format(
(chunk_idx + 1), len(chunks)))

# Split chunk equally between GPUs
sims = dict(zip(gpus, np.array_split(chunk,
len(gpus))))

tout, trajectories = self._run_chunk(
gpus, outdir, chunk_idx, cmtx, sims,
trajectories, tout)
finally:
if self._cleanup:
shutil.rmtree(outdir)

self._logger.info("Running cupSODA: " + ' '.join(command))
start_time = time.time()
# Run simulation and return trajectories
p = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# for line in iter(p.stdout.readline, b''):
# if 'Running time' in line:
# self._logger.info(line[:-1])
(p_out, p_err) = p.communicate()
p_out = p_out.decode('utf-8')
p_err = p_err.decode('utf-8')
logger_level = self._logger.logger.getEffectiveLevel()
if logger_level <= logging.INFO:
run_time_match = self._running_time_regex.search(p_out)
if run_time_match:
self._logger.info('cupSODA reported time: {} '
'seconds'.format(run_time_match.group(1)))
self._logger.debug('cupSODA stdout:\n' + p_out)
if p_err:
self._logger.error('cupsoda strerr:\n' + p_err)
if p.returncode:
raise SimulatorException(
p_out.rstrip("at line") + "\n" + p_err.rstrip())
tout, trajectories = self._load_trajectories(
self._cupsoda_outfiles_dir)
if self._cleanup:
shutil.rmtree(self.outdir)
end_time = time.time()
self._logger.info("cupSODA + I/O time: {} seconds".format(end_time -
start_time))
self._logger.info("cupSODA + I/O time: {} seconds".format(
end_time - start_time))
return SimulationResult(self, tout, trajectories)

@property
Expand Down Expand Up @@ -359,10 +442,7 @@ def n_blocks(self, n_blocks):
raise ValueError("n_blocks must be greater than 0")
self.opts['n_blocks'] = n_blocks

def _create_input_files(self, directory):

n_sims = len(self.param_values)

def _create_input_files(self, directory, sims, cmtx):
# atol_vector
with open(os.path.join(directory, "atol_vector"), 'w') as atol_vector:
for i in range(self._len_species):
Expand All @@ -372,15 +452,14 @@ def _create_input_files(self, directory):

# c_matrix
with open(os.path.join(directory, "c_matrix"), 'w') as c_matrix:
cmtx = self._get_cmatrix()
for i in range(n_sims):
for i in sims:
line = ""
for j in range(self._len_rxns):
if j > 0:
line += "\t"
line += str(cmtx[i][j])
c_matrix.write(line)
if i < n_sims - 1:
if i != sims[-1]:
c_matrix.write("\n")

# cs_vector
Expand Down Expand Up @@ -434,14 +513,14 @@ def _create_input_files(self, directory):
if self.vol:
mx0 = mx0.copy()
mx0 /= (N_A * self.vol)
for i in range(n_sims):
for i in sims:
line = ""
for j in range(self._len_species):
if j > 0:
line += "\t"
line += str(mx0[i][j])
MX_0.write(line)
if i < n_sims - 1:
if i != sims[-1]:
MX_0.write("\n")

# right_side
Expand Down Expand Up @@ -511,7 +590,7 @@ def _get_cmatrix(self):
self._logger.debug("100%")
return c_matrix

def _load_trajectories(self, directory):
def _load_trajectories(self, directory, sims):
"""Read simulation results from output files.

Returns `tout` and `trajectories` arrays.
Expand All @@ -521,26 +600,26 @@ def _load_trajectories(self, directory):
if len(files) == 0:
raise SimulatorException(
"Cannot find any output files to load data from.")
if len(files) != len(self.param_values):
if len(files) != len(sims):
raise SimulatorException(
"Number of input files (%d) does not match number "
"Number of output files (%d) does not match number "
"of requested simulations (%d)." % (
len(files), len(self.param_values)))
len(files), len(sims)))
n_sims = len(files)
trajectories = [None] * n_sims
tout = [None] * n_sims
traj_n = np.ones((len(self.tspan), self._len_species)) * float('nan')
tout_n = np.ones(len(self.tspan)) * float('nan')
# load the data
indir_prefix = os.path.join(directory, self._prefix)
for n in range(n_sims):
trajectories[n] = traj_n.copy()
tout[n] = tout_n.copy()
filename = indir_prefix + "_" + str(n)
for idx, n in enumerate(sims):
trajectories[idx] = traj_n.copy()
tout[idx] = tout_n.copy()
filename = indir_prefix + "_" + str(idx)
if not os.path.isfile(filename):
raise Exception("Cannot find input file " + filename)
# determine optimal loading method
if n == 0:
if idx == 0:
(data, use_pandas) = self._test_pandas(filename)
# load data
else:
Expand All @@ -549,11 +628,11 @@ def _load_trajectories(self, directory):
else:
data = self._load_with_openfile(filename)
# store data
tout[n] = data[:, 0]
trajectories[n][:, self._out_species] = data[:, 1:]
tout[idx] = data[:, 0]
trajectories[idx][:, self._out_species] = data[:, 1:]
# volume correction
if self.vol:
trajectories[n][:, self._out_species] *= (N_A * self.vol)
trajectories[idx][:, self._out_species] *= (N_A * self.vol)
return np.array(tout), np.array(trajectories)

def _test_pandas(self, filename):
Expand Down