diff --git a/NAMESPACE b/NAMESPACE index 282d57aa..415d16ac 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -55,7 +55,7 @@ S3method(summary,eval_sensitivity) S3method(summary,run_model) S3method(summary,updated_models) S3method(update,run_model) -export(cluster_for_heemod) +export(close_cluster) export(define_correlation) export(define_distrib) export(define_distrib_) @@ -101,6 +101,8 @@ export(run_models_) export(run_probabilistic) export(run_psa) export(run_sensitivity) +export(status_cluster) +export(use_cluster) importFrom(Hmisc,wtd.mean) importFrom(Hmisc,wtd.quantile) importFrom(Hmisc,wtd.var) diff --git a/NEWS.md b/NEWS.md index 6d8c7374..87f290da 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,9 @@ +# heemod devel + +## New features + + * Parallel computing with `use_cluster()`. + # heemod 0.5.1 ## Bugfixes @@ -9,7 +15,7 @@ ## Breaking changes * Some functions were renamed for clarification: - * `define_strategy()` <- `define_model()` + * `define_strategy()` <= `define_model()` * `run_model()` <- `run_models()` * `define_transition()` <- `define_matrix()` * `define_dsa()` <- `define_sensitivity()` diff --git a/R/cluster.R b/R/cluster.R new file mode 100644 index 00000000..f1c22d6f --- /dev/null +++ b/R/cluster.R @@ -0,0 +1,114 @@ +reach_cluster <- local({ + heemod_cluster <- NULL + + function(operation = c("get", "set", "close"), value) { + operation <- match.arg(operation) + + switch( + operation, + "get" = { + heemod_cluster + }, + "set" = { + heemod_cluster <<- value + }, + "close" = { + parallel::stopCluster(heemod_cluster) + heemod_cluster <<- NULL + } + ) + } +}) + +get_cluster <- function() + reach_cluster(operation = "get") + +set_cluster <- function(x) + reach_cluster(operation = "set", value = x) + +#' Run \code{heemod} on a Cluster +#' +#' These functions create or delete a cluster for +#' \code{heemod}. When the cluster is created it is +#' automagically used by \code{heemod} functions. +#' +#' The usual wokflow is to create the cluster with +#' \code{use_cluster}, then run functions such as +#' \code{\link{run_psa}} that make use of the cluster. To +#' stop using the cluster run \link{close_cluster}. +#' +#' The cluster status is given by \code{status_cluster}. +#' +#' A custom cluster can be passed to \code{use_cluster} with +#' the \code{cluster} argument. This custom custer needs to +#' work with \code{\link{parallel}{parLapply}}. +#' +#' @name cluster +#' @param num_cores Number of core. +#' @param cluster A custom cluster. See details. +#' @param verbose Print cluster info. +#' +#' @return \code{use_cluster} and \code{close_cluster} +#' return \code{TRUE} invisibly in case of success. +#' \code{status_cluster} returns \code{TRUE} if a cluster +#' is defined, \code{FALSE} otherwise. +#' +#' @export +use_cluster <- function(num_cores, cluster = NULL) { + if (status_cluster(verbose = FALSE)) { + stop("A cluster is already defined, use 'close_cluster()' before defining a new cluster.") + } + + if (! is.null(cluster)) { + set_cluster(cluster) + + } else { + if (! requireNamespace("parallel")) + stop("'parallel' package required to define a cluster.") + + if (! is.wholenumber(num_cores)) + stop("'num_cores' is not a whole number.") + + cl <- parallel::makeCluster(num_cores) + parallel::clusterEvalQ(cl, library(heemod)) + parallel::clusterEvalQ(cl, library(dplyr)) + + set_cluster(cl) + + message(paste("Using a cluster with", num_cores, "cores.")) + } + + invisible(TRUE) +} + +#' @rdname cluster +#' @export +status_cluster <- function(verbose = TRUE) { + sc <- ! is.null(get_cluster()) + + if (verbose) { + if (sc) { + message(sprintf( + "Running on a %i-cores cluster.", + length(get_cluster()) + )) + } else { + message("No cluster defined.") + } + } + + invisible(sc) +} + +#' @rdname cluster +#' @export +close_cluster <- function() { + catch <- try(reach_cluster(operation = "close")) + + if (inherits(catch, "try-error")) { + stop("Failed to close the cluster.") + } else { + message("Cluster closed.") + invisible(TRUE) + } +} diff --git a/R/cluster_for_heemod.R b/R/cluster_for_heemod.R deleted file mode 100644 index 54771fe0..00000000 --- a/R/cluster_for_heemod.R +++ /dev/null @@ -1,24 +0,0 @@ -#' Prepare a cluster to use with heemod -#' -#' @param num_cores The number of cores desired -#' -#' @return If num_cores is 1 or 0, NULL. If num_cores is a -#' larger integer, a cluster loaded with the appropriate -#' packages. -#' @export -cluster_for_heemod <- function(num_cores) { - cl <- NULL - if (length(num_cores)) { - stopifnot(round(num_cores) == num_cores) - if (num_cores > 1) { - if (!requireNamespace("parallel")) - stop("parallel package required to use a cluster") - cl <- parallel::makeCluster(num_cores) - if (options()$heemod.verbose) - message(paste("Using a cluster with", length(cl), "cores")) - parallel::clusterEvalQ(cl, library(heemod)) - parallel::clusterEvalQ(cl, library(dplyr)) - cl - } - } -} \ No newline at end of file diff --git a/R/newdata.R b/R/newdata.R index 5b227fd4..a02f7a9a 100644 --- a/R/newdata.R +++ b/R/newdata.R @@ -13,7 +13,6 @@ #' @param newdata a data.frame whose names match parameters #' names. \code{model} will be evaluated iteratively, #' taking successive values from each row. -#' @param cl A cluster for computations. #' #' @return A data.frame containing the values of #' \code{newdata} and each Markov Model evaluation in @@ -22,9 +21,7 @@ #' @example inst/examples/example_eval_model_newdata.R #' #' @keywords internal -eval_model_newdata <- function(x, model = 1, newdata, - cl = NULL) { - num_cores <- length(cl) +eval_model_newdata <- function(x, model = 1, newdata) { check_model_index(x = x, i = model) cycles <- attr(x, "cycles") @@ -32,41 +29,47 @@ eval_model_newdata <- function(x, model = 1, newdata, method <- attr(x, "method") old_parameters <- attr(x, "parameters") uneval_model <- attr(x, "uneval_model_list")[[model]] - - - if(!is.null(cl)){ + + if (status_cluster(verbose = FALSE)) { + cl <- get_cluster() + + num_cores <- length(cl) + + message(paste("Using a cluster with", num_cores, "cores.")) + split_vec <- rep(1:num_cores, each = nrow(newdata) %/% num_cores) split_vec <- c(split_vec, rep(num_cores, nrow(newdata) %% num_cores)) - + pnewdata <- split(newdata, split_vec) - parallel::clusterExport(cl, - c("uneval_model", "old_parameters", "pnewdata", - "cycles", "init", "method"), - env = environment()) + parallel::clusterExport( + cl, + c("uneval_model", "old_parameters", "pnewdata", + "cycles", "init", "method"), + envir = environment() + ) pieces <- parallel::parLapply(cl, pnewdata, function(newdata){ - + newdata %>% dplyr::rowwise() %>% - dplyr::do_( - .mod = ~ eval_newdata( - ., - model = uneval_model, - old_parameters = old_parameters, - cycles = cycles, - init = init, - method = method - ) - ) %>% + dplyr::do_( + .mod = ~ eval_newdata( + ., + model = uneval_model, + old_parameters = old_parameters, + cycles = cycles, + init = init, + method = method + ) + ) %>% dplyr::ungroup() %>% dplyr::bind_cols( newdata ) - }) + }) res <- do.call("rbind", pieces) rownames(res) <- NULL - } - else{ + } else { res <- newdata %>% dplyr::rowwise() %>% dplyr::do_( @@ -86,7 +89,7 @@ eval_model_newdata <- function(x, model = 1, newdata, } res - } +} eval_newdata <- function(new_parameters, model, old_parameters, cycles, init, method) { diff --git a/R/resamp_eval.R b/R/resamp_eval.R index c9bd11fa..915b88ca 100644 --- a/R/resamp_eval.R +++ b/R/resamp_eval.R @@ -4,14 +4,13 @@ #' @param resample Resampling distribution for parameters #' defined by \code{\link{define_psa}}. #' @param N > 0. Number of simulation to run. -#' @param cl A cluster for computations. +#' #' @return A list with one \code{data.frame} per model. #' @export #' #' @example inst/examples/example_run_psa.R #' -run_psa <- function(model, resample, N, - cl = NULL) { +run_psa <- function(model, resample, N) { stopifnot( N > 0, ! is.null(N) @@ -33,8 +32,7 @@ run_psa <- function(model, resample, N, eval_model_newdata( x = model, model = n, - newdata = newdata, - cl = cl) %>% + newdata = newdata) %>% dplyr::rowwise() %>% dplyr::do_(~ get_total_state_values(.$.mod)) %>% dplyr::bind_cols(newdata) %>% diff --git a/R/run_model_define.R b/R/run_model_define.R index 82477602..db5c8592 100644 --- a/R/run_model_define.R +++ b/R/run_model_define.R @@ -46,7 +46,6 @@ #' the cost-effectiveness plane. #' @param base_model Name of base model used as reference. #' By default the model with the lowest effectiveness. -#' @param cl A cluster for computations. #' @param method Counting method. #' @param list_models List of models, only used by #' \code{run_model_} to avoid using \code{...}. @@ -66,7 +65,6 @@ run_model <- function(..., "half-cycle"), cost = NULL, effect = NULL, base_model = NULL, - cl = NULL, state_cycle_limit = NULL) { list_models <- list(...) @@ -82,7 +80,6 @@ run_model <- function(..., cost = lazyeval::lazy_(substitute(cost), env = parent.frame()), effect = lazyeval::lazy_(substitute(effect), env = parent.frame()), base_model = base_model, - cl = cl, state_cycle_limit = state_cycle_limit ) } @@ -94,7 +91,7 @@ run_model_ <- function(list_models, init, cycles, method, - cost, effect, base_model, cl, + cost, effect, base_model, state_cycle_limit) { if (! is.wholenumber(cycles)) { diff --git a/R/sensitivity_eval.R b/R/sensitivity_eval.R index 637cab7d..fc429c89 100644 --- a/R/sensitivity_eval.R +++ b/R/sensitivity_eval.R @@ -9,8 +9,7 @@ #' @export #' #' @example inst/examples/example_run_dsa.R -run_dsa <- function(model, sensitivity, - cl = NULL) { +run_dsa <- function(model, sensitivity) { if (! all(c(".cost", ".effect") %in% names(model))) { stop("No cost and/or effect defined, sensitivity analysis unavailable.") @@ -27,8 +26,7 @@ run_dsa <- function(model, sensitivity, tab <- eval_model_newdata( model, model = n, - newdata = sensitivity, - cl = cl + newdata = sensitivity ) tab %>% dplyr::mutate_if( diff --git a/R/tabular_input.R b/R/tabular_input.R index db3727a1..a45a1750 100644 --- a/R/tabular_input.R +++ b/R/tabular_input.R @@ -171,8 +171,6 @@ eval_models_from_tabular <- function(inputs, if (options()$heemod.verbose) message("* Running files...") - cl <- cluster_for_heemod(inputs$model_options$num_cores) - list_args <- c( inputs$models, list( @@ -182,8 +180,7 @@ eval_models_from_tabular <- function(inputs, effect = inputs$model_options$effect, base_model = inputs$model_options$base_model, method = inputs$model_options$method, - cycles = inputs$model_options$cycles, - cl = cl + cycles = inputs$model_options$cycles ) ) @@ -192,6 +189,11 @@ eval_models_from_tabular <- function(inputs, list_args ) + + if (! is.null(inputs$model_options$num_cores)) { + use_cluster(inputs$model_options$num_cores) + } + if (options()$heemod.verbose) message("** Running models...") model_runs <- do.call( run_model, @@ -203,8 +205,7 @@ eval_models_from_tabular <- function(inputs, if (options()$heemod.verbose) message("** Running DSA...") model_dsa <- run_dsa( model_runs, - inputs$param_info$dsa_params, - cl = cl + inputs$param_info$dsa_params ) } @@ -214,18 +215,20 @@ eval_models_from_tabular <- function(inputs, model_psa <- run_psa( model_runs, resample = inputs$param_info$psa_params, - N = inputs$model_options$n, - cl = cl + N = inputs$model_options$n ) } demo_res <- NULL if (! is.null(inputs$demographic_file) & run_demo) { if (options()$heemod.verbose) message("** Running demographic analysis...") - demo_res <- stats::update(model_runs, inputs$demographic_file, - cl = cl) + demo_res <- stats::update(model_runs, inputs$demographic_file) } - if(!is.null(cl)) parallel::stopCluster(cl) + + if(! is.null(inputs$model_options$num_cores)) { + close_cluster() + } + list( models = inputs$models, model_runs = model_runs, diff --git a/R/update.R b/R/update.R index e6b347f3..edb8855d 100644 --- a/R/update.R +++ b/R/update.R @@ -26,7 +26,6 @@ #' one column per parameter and one row per parameter set. #' An optional \code{.weights} column can be included for #' a weighted analysis. -#' @param cl a cluster on which to perform computations. #' @param x Updated model to plot. #' @param model A model index, character or numeric. #' @param type The type of plot to return (see details). @@ -44,8 +43,7 @@ #' #' @example inst/examples/example_update.R #' -update.run_model <- function(object, newdata, - cl = NULL, ...) { +update.run_model <- function(object, newdata, ...) { if (! any(class(object) %in% "run_model")) { stop("'object' must be the result of 'run_model()'.") @@ -70,8 +68,7 @@ update.run_model <- function(object, newdata, list_res <- c( list_res, list(eval_model_newdata(object, model = n, - newdata = newdata, - cl = cl)) + newdata = newdata)) ) }) } diff --git a/inst/tabular/test/.gitignore b/inst/tabular/test/.gitignore new file mode 100644 index 00000000..f5cf7d3d --- /dev/null +++ b/inst/tabular/test/.gitignore @@ -0,0 +1 @@ +edited_ref.csv diff --git a/inst/tabular/test/edited_ref.csv b/inst/tabular/test/edited_ref.csv deleted file mode 100644 index c01d820a..00000000 --- a/inst/tabular/test/edited_ref.csv +++ /dev/null @@ -1,8 +0,0 @@ -"data","file","absolute_path" -"state","C:/Users/aa0113/Documents/github-heemod/inst/tabular/thr/THR_states.csv",1 -"tm","C:/Users/aa0113/Documents/github-heemod/inst/tabular/thr/THR_transition_probs.csv",1 -"parameters","C:/Users/aa0113/Documents/github-heemod/inst/tabular/thr/THR_parameters.csv",1 -"demographics","C:/Users/aa0113/Documents/github-heemod/inst/tabular/thr/THR_demographic_table.csv",1 -"data","C:/Users/aa0113/Documents/github-heemod/inst/tabular/thr/input_dataframes",1 -"output","C:/Users/aa0113/Documents/github-heemod/inst/tabular/thr/output",1 -"options","THR_options.csv",NA diff --git a/man/cluster.Rd b/man/cluster.Rd new file mode 100644 index 00000000..33edede5 --- /dev/null +++ b/man/cluster.Rd @@ -0,0 +1,46 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/cluster.R +\name{cluster} +\alias{close_cluster} +\alias{cluster} +\alias{status_cluster} +\alias{use_cluster} +\title{Run \code{heemod} on a Cluster} +\usage{ +use_cluster(num_cores, cluster = NULL) + +status_cluster(verbose = TRUE) + +close_cluster() +} +\arguments{ +\item{num_cores}{Number of core.} + +\item{cluster}{A custom cluster. See details.} + +\item{verbose}{Print cluster info.} +} +\value{ +\code{use_cluster} and \code{close_cluster} + return \code{TRUE} invisibly in case of success. + \code{status_cluster} returns \code{TRUE} if a cluster + is defined, \code{FALSE} otherwise. +} +\description{ +These functions create or delete a cluster for +\code{heemod}. When the cluster is created it is +automagically used by \code{heemod} functions. +} +\details{ +The usual wokflow is to create the cluster with +\code{use_cluster}, then run functions such as +\code{\link{run_psa}} that make use of the cluster. To +stop using the cluster run \link{close_cluster}. + +The cluster status is given by \code{status_cluster}. + +A custom cluster can be passed to \code{use_cluster} with +the \code{cluster} argument. This custom custer needs to +work with \code{\link{parallel}{parLapply}}. +} + diff --git a/man/cluster_for_heemod.Rd b/man/cluster_for_heemod.Rd deleted file mode 100644 index cae45611..00000000 --- a/man/cluster_for_heemod.Rd +++ /dev/null @@ -1,20 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/cluster_for_heemod.R -\name{cluster_for_heemod} -\alias{cluster_for_heemod} -\title{Prepare a cluster to use with heemod} -\usage{ -cluster_for_heemod(num_cores) -} -\arguments{ -\item{num_cores}{The number of cores desired} -} -\value{ -If num_cores is 1 or 0, NULL. If num_cores is a - larger integer, a cluster loaded with the appropriate - packages. -} -\description{ -Prepare a cluster to use with heemod -} - diff --git a/man/eval_model_newdata.Rd b/man/eval_model_newdata.Rd index 943f8cf5..e6cfd320 100644 --- a/man/eval_model_newdata.Rd +++ b/man/eval_model_newdata.Rd @@ -5,7 +5,7 @@ \title{Iteratively Evaluate a Markov Model With New Parameter Values} \usage{ -eval_model_newdata(x, model = 1, newdata, cl = NULL) +eval_model_newdata(x, model = 1, newdata) } \arguments{ \item{x}{Result from \code{\link{run_model}}.} @@ -15,8 +15,6 @@ eval_model_newdata(x, model = 1, newdata, cl = NULL) \item{newdata}{a data.frame whose names match parameters names. \code{model} will be evaluated iteratively, taking successive values from each row.} - -\item{cl}{A cluster for computations.} } \value{ A data.frame containing the values of diff --git a/man/run_dsa.Rd b/man/run_dsa.Rd index deb26a88..f9471c34 100644 --- a/man/run_dsa.Rd +++ b/man/run_dsa.Rd @@ -7,7 +7,7 @@ \usage{ run_sensitivity(model, sensitivity) -run_dsa(model, sensitivity, cl = NULL) +run_dsa(model, sensitivity) } \arguments{ \item{model}{An evaluated Markov model} diff --git a/man/run_model.Rd b/man/run_model.Rd index 4e89f4ca..265933c6 100644 --- a/man/run_model.Rd +++ b/man/run_model.Rd @@ -14,10 +14,10 @@ run_models_(...) run_model(..., parameters = define_parameters(), init = c(1000L, rep(0L, get_state_number(get_states(list(...)[[1]])) - 1)), cycles = 1, method = c("life-table", "beginning", "end", "half-cycle"), cost = NULL, - effect = NULL, base_model = NULL, cl = NULL, state_cycle_limit = NULL) + effect = NULL, base_model = NULL, state_cycle_limit = NULL) run_model_(list_models, parameters, init, cycles, method, cost, effect, - base_model, cl, state_cycle_limit) + base_model, state_cycle_limit) } \arguments{ \item{...}{One or more \code{uneval_model} object.} @@ -43,8 +43,6 @@ the cost-effectiveness plane.} \item{base_model}{Name of base model used as reference. By default the model with the lowest effectiveness.} -\item{cl}{A cluster for computations.} - \item{state_cycle_limit}{Optional expansion limit for \code{state_cycle}, see details.} diff --git a/man/run_psa.Rd b/man/run_psa.Rd index 468735b6..9b276534 100644 --- a/man/run_psa.Rd +++ b/man/run_psa.Rd @@ -7,7 +7,7 @@ \usage{ run_probabilistic(model, resample, N) -run_psa(model, resample, N, cl = NULL) +run_psa(model, resample, N) } \arguments{ \item{model}{The result of \code{\link{run_model}}.} @@ -16,8 +16,6 @@ run_psa(model, resample, N, cl = NULL) defined by \code{\link{define_psa}}.} \item{N}{> 0. Number of simulation to run.} - -\item{cl}{A cluster for computations.} } \value{ A list with one \code{data.frame} per model. diff --git a/man/update-model.Rd b/man/update-model.Rd index 67f9e9a2..12c19882 100644 --- a/man/update-model.Rd +++ b/man/update-model.Rd @@ -6,7 +6,7 @@ \alias{update.run_model} \title{Run Model on New Data} \usage{ -\method{update}{run_model}(object, newdata, cl = NULL, ...) +\method{update}{run_model}(object, newdata, ...) \method{plot}{updated_models}(x, model, type = c("cost", "effect", "icer", "counts", "ce", "values"), ...) @@ -19,8 +19,6 @@ one column per parameter and one row per parameter set. An optional \code{.weights} column can be included for a weighted analysis.} -\item{cl}{a cluster on which to perform computations.} - \item{...}{Additional arguments passed to \code{geom_histogram}. Especially usefull to specify \code{binwidth}.} diff --git a/tests/testthat/test_parallel.R b/tests/testthat/test_parallel.R index 1fcea184..89a4ff5d 100644 --- a/tests/testthat/test_parallel.R +++ b/tests/testthat/test_parallel.R @@ -1,15 +1,15 @@ -context("running with multiple cores") +context("Multiple cores") library(parallel) test_that( "Same results using 1 core or 2.", { - result_1core <- run_models_tabular( + result_1core <- run_model_tabular( location = system.file("tabular/thr", package = "heemod"), save = FALSE, overwrite = FALSE, run_psa = FALSE ) - result_2core <- run_models_tabular( + result_2core <- run_model_tabular( location = system.file("tabular/thr", package = "heemod"), reference = "REFERENCE_2core.csv", save = FALSE, overwrite = FALSE, run_psa = FALSE @@ -31,4 +31,4 @@ test_that( data.frame(result_2core$demographics)) } -) \ No newline at end of file +) diff --git a/vignettes/a-introduction.Rmd b/vignettes/a-introduction.Rmd index eea41f74..c0bb7f59 100644 --- a/vignettes/a-introduction.Rmd +++ b/vignettes/a-introduction.Rmd @@ -162,6 +162,14 @@ In order to compare different strategies it is possible to run a model with mult Time varying transition probabilities and state values are available through the `markov_cycle` and `state_cycle` variables. Time-varying probabilities and values are explained in `vignette("b-time-dependency", "heemod")`. +## Parallel computing + +Some operation such as PSA can be significantly sped up using parallel computing. This can be done in the folowwing way: + + * Define a cluster with the `use_cluster()` functions (i.e. `use_cluster(4)` to use 4 cores). + * Run the analysis as usual. + * To stop using parallel computing use the `close_cluster()` function. + ## Going further Probabilistic uncertainty analysis `vignette("e-probabilistic", "heemod")` and deterministic sensitivity analysis `vignette("f-sensitivity", "heemod")` can be performed. Population-level estimates and heterogeneity can be computed : `vignette("g-heterogeneity", "heemod")`. diff --git a/vignettes/e-probabilistic.Rmd b/vignettes/e-probabilistic.Rmd index 19751a3d..5908000d 100644 --- a/vignettes/e-probabilistic.Rmd +++ b/vignettes/e-probabilistic.Rmd @@ -246,3 +246,12 @@ plot(pm, type = "ce") + theme_minimal() ``` +## Parallel computing + +Resampling can be significantly sped up by using parallel computing. This can be done in the folowwing way: + + * Define a cluster with the `use_cluster()` functions (i.e. `use_cluster(4)` to use 4 cores). + * Run the analysis as usual. + * To stop using parallel computing use the `close_cluster()` function. + +Results may vary depending on the machine, but we found speed gains to be quite limited beyond 4 cores. diff --git a/vignettes/h-tabular.Rmd b/vignettes/h-tabular.Rmd index ebf9405e..f139eecd 100644 --- a/vignettes/h-tabular.Rmd +++ b/vignettes/h-tabular.Rmd @@ -120,6 +120,7 @@ Models specified by tabular input will run with defaults options. The following * `base`: name of base model. * `cycles`: run the model for how many cycles? * `n`: number of resample for PSA. + * `num_core`: number of cluster cores. ```{r echo = FALSE} heemod:::read_file(system.file("tabular/thr/THR_options.csv", package = "heemod")) %>%