Skip to content

Commit

Permalink
✨parallel computing
Browse files Browse the repository at this point in the history
  • Loading branch information
pierucci committed Oct 30, 2016
1 parent f094b7f commit 854dcda
Show file tree
Hide file tree
Showing 23 changed files with 251 additions and 128 deletions.
4 changes: 3 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ S3method(summary,eval_sensitivity)
S3method(summary,run_model)
S3method(summary,updated_models)
S3method(update,run_model)
export(cluster_for_heemod)
export(close_cluster)
export(define_correlation)
export(define_distrib)
export(define_distrib_)
Expand Down Expand Up @@ -101,6 +101,8 @@ export(run_models_)
export(run_probabilistic)
export(run_psa)
export(run_sensitivity)
export(status_cluster)
export(use_cluster)
importFrom(Hmisc,wtd.mean)
importFrom(Hmisc,wtd.quantile)
importFrom(Hmisc,wtd.var)
Expand Down
8 changes: 7 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# heemod devel

## New features

* Parallel computing with `use_cluster()`.

# heemod 0.5.1

## Bugfixes
Expand All @@ -9,7 +15,7 @@
## Breaking changes

* Some functions were renamed for clarification:
* `define_strategy()` <- `define_model()`
* `define_strategy()` <= `define_model()`
* `run_model()` <- `run_models()`
* `define_transition()` <- `define_matrix()`
* `define_dsa()` <- `define_sensitivity()`
Expand Down
114 changes: 114 additions & 0 deletions R/cluster.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
reach_cluster <- local({
heemod_cluster <- NULL

function(operation = c("get", "set", "close"), value) {
operation <- match.arg(operation)

switch(
operation,
"get" = {
heemod_cluster
},
"set" = {
heemod_cluster <<- value
},
"close" = {
parallel::stopCluster(heemod_cluster)
heemod_cluster <<- NULL
}
)
}
})

get_cluster <- function()
reach_cluster(operation = "get")

set_cluster <- function(x)
reach_cluster(operation = "set", value = x)

#' Run \code{heemod} on a Cluster
#'
#' These functions create or delete a cluster for
#' \code{heemod}. When the cluster is created it is
#' automagically used by \code{heemod} functions.
#'
#' The usual wokflow is to create the cluster with
#' \code{use_cluster}, then run functions such as
#' \code{\link{run_psa}} that make use of the cluster. To
#' stop using the cluster run \link{close_cluster}.
#'
#' The cluster status is given by \code{status_cluster}.
#'
#' A custom cluster can be passed to \code{use_cluster} with
#' the \code{cluster} argument. This custom custer needs to
#' work with \code{\link{parallel}{parLapply}}.
#'
#' @name cluster
#' @param num_cores Number of core.
#' @param cluster A custom cluster. See details.
#' @param verbose Print cluster info.
#'
#' @return \code{use_cluster} and \code{close_cluster}
#' return \code{TRUE} invisibly in case of success.
#' \code{status_cluster} returns \code{TRUE} if a cluster
#' is defined, \code{FALSE} otherwise.
#'
#' @export
use_cluster <- function(num_cores, cluster = NULL) {
if (status_cluster(verbose = FALSE)) {
stop("A cluster is already defined, use 'close_cluster()' before defining a new cluster.")
}

if (! is.null(cluster)) {
set_cluster(cluster)

} else {
if (! requireNamespace("parallel"))
stop("'parallel' package required to define a cluster.")

if (! is.wholenumber(num_cores))
stop("'num_cores' is not a whole number.")

cl <- parallel::makeCluster(num_cores)
parallel::clusterEvalQ(cl, library(heemod))
parallel::clusterEvalQ(cl, library(dplyr))

set_cluster(cl)

message(paste("Using a cluster with", num_cores, "cores."))
}

invisible(TRUE)
}

#' @rdname cluster
#' @export
status_cluster <- function(verbose = TRUE) {
sc <- ! is.null(get_cluster())

if (verbose) {
if (sc) {
message(sprintf(
"Running on a %i-cores cluster.",
length(get_cluster())
))
} else {
message("No cluster defined.")
}
}

invisible(sc)
}

#' @rdname cluster
#' @export
close_cluster <- function() {
catch <- try(reach_cluster(operation = "close"))

if (inherits(catch, "try-error")) {
stop("Failed to close the cluster.")
} else {
message("Cluster closed.")
invisible(TRUE)
}
}
24 changes: 0 additions & 24 deletions R/cluster_for_heemod.R

This file was deleted.

57 changes: 30 additions & 27 deletions R/newdata.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#' @param newdata a data.frame whose names match parameters
#' names. \code{model} will be evaluated iteratively,
#' taking successive values from each row.
#' @param cl A cluster for computations.
#'
#' @return A data.frame containing the values of
#' \code{newdata} and each Markov Model evaluation in
Expand All @@ -22,51 +21,55 @@
#' @example inst/examples/example_eval_model_newdata.R
#'
#' @keywords internal
eval_model_newdata <- function(x, model = 1, newdata,
cl = NULL) {
num_cores <- length(cl)
eval_model_newdata <- function(x, model = 1, newdata) {
check_model_index(x = x, i = model)

cycles <- attr(x, "cycles")
init <- attr(x, "init")
method <- attr(x, "method")
old_parameters <- attr(x, "parameters")
uneval_model <- attr(x, "uneval_model_list")[[model]]


if(!is.null(cl)){

if (status_cluster(verbose = FALSE)) {
cl <- get_cluster()

num_cores <- length(cl)

message(paste("Using a cluster with", num_cores, "cores."))

split_vec <- rep(1:num_cores, each = nrow(newdata) %/% num_cores)
split_vec <- c(split_vec, rep(num_cores, nrow(newdata) %% num_cores))

pnewdata <- split(newdata, split_vec)
parallel::clusterExport(cl,
c("uneval_model", "old_parameters", "pnewdata",
"cycles", "init", "method"),
env = environment())
parallel::clusterExport(
cl,
c("uneval_model", "old_parameters", "pnewdata",
"cycles", "init", "method"),
envir = environment()
)

pieces <- parallel::parLapply(cl, pnewdata, function(newdata){

newdata %>%
dplyr::rowwise() %>%
dplyr::do_(
.mod = ~ eval_newdata(
.,
model = uneval_model,
old_parameters = old_parameters,
cycles = cycles,
init = init,
method = method
)
) %>%
dplyr::do_(
.mod = ~ eval_newdata(
.,
model = uneval_model,
old_parameters = old_parameters,
cycles = cycles,
init = init,
method = method
)
) %>%
dplyr::ungroup() %>%
dplyr::bind_cols(
newdata
)
})
})
res <- do.call("rbind", pieces)
rownames(res) <- NULL
}
else{
} else {
res <- newdata %>%
dplyr::rowwise() %>%
dplyr::do_(
Expand All @@ -86,7 +89,7 @@ eval_model_newdata <- function(x, model = 1, newdata,

}
res
}
}

eval_newdata <- function(new_parameters, model, old_parameters,
cycles, init, method) {
Expand Down
8 changes: 3 additions & 5 deletions R/resamp_eval.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
#' @param resample Resampling distribution for parameters
#' defined by \code{\link{define_psa}}.
#' @param N > 0. Number of simulation to run.
#' @param cl A cluster for computations.
#'
#' @return A list with one \code{data.frame} per model.
#' @export
#'
#' @example inst/examples/example_run_psa.R
#'
run_psa <- function(model, resample, N,
cl = NULL) {
run_psa <- function(model, resample, N) {
stopifnot(
N > 0,
! is.null(N)
Expand All @@ -33,8 +32,7 @@ run_psa <- function(model, resample, N,
eval_model_newdata(
x = model,
model = n,
newdata = newdata,
cl = cl) %>%
newdata = newdata) %>%
dplyr::rowwise() %>%
dplyr::do_(~ get_total_state_values(.$.mod)) %>%
dplyr::bind_cols(newdata) %>%
Expand Down
5 changes: 1 addition & 4 deletions R/run_model_define.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
#' the cost-effectiveness plane.
#' @param base_model Name of base model used as reference.
#' By default the model with the lowest effectiveness.
#' @param cl A cluster for computations.
#' @param method Counting method.
#' @param list_models List of models, only used by
#' \code{run_model_} to avoid using \code{...}.
Expand All @@ -66,7 +65,6 @@ run_model <- function(...,
"half-cycle"),
cost = NULL, effect = NULL,
base_model = NULL,
cl = NULL,
state_cycle_limit = NULL) {

list_models <- list(...)
Expand All @@ -82,7 +80,6 @@ run_model <- function(...,
cost = lazyeval::lazy_(substitute(cost), env = parent.frame()),
effect = lazyeval::lazy_(substitute(effect), env = parent.frame()),
base_model = base_model,
cl = cl,
state_cycle_limit = state_cycle_limit
)
}
Expand All @@ -94,7 +91,7 @@ run_model_ <- function(list_models,
init,
cycles,
method,
cost, effect, base_model, cl,
cost, effect, base_model,
state_cycle_limit) {

if (! is.wholenumber(cycles)) {
Expand Down
6 changes: 2 additions & 4 deletions R/sensitivity_eval.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
#' @export
#'
#' @example inst/examples/example_run_dsa.R
run_dsa <- function(model, sensitivity,
cl = NULL) {
run_dsa <- function(model, sensitivity) {

if (! all(c(".cost", ".effect") %in% names(model))) {
stop("No cost and/or effect defined, sensitivity analysis unavailable.")
Expand All @@ -27,8 +26,7 @@ run_dsa <- function(model, sensitivity,
tab <- eval_model_newdata(
model,
model = n,
newdata = sensitivity,
cl = cl
newdata = sensitivity
)
tab %>%
dplyr::mutate_if(
Expand Down

0 comments on commit 854dcda

Please sign in to comment.