Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Jan 9, 2024
1 parent f722bc2 commit 61ef69d
Show file tree
Hide file tree
Showing 16 changed files with 238 additions and 393 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Description: In computationally demanding analysis projects,
'clustermq' by Schubert (2019) <doi:10.1093/bioinformatics/btz284>),
and 'batchtools' by Lang, Bischl, and Surmann (2017).
<doi:10.21105/joss.00135>.
Version: 0.0.3.9000
Version: 0.0.4
License: MIT + file LICENSE
URL: https://wlandau.github.io/crew.aws.batch/,
https://github.com/wlandau/crew.aws.batch
Expand Down
6 changes: 2 additions & 4 deletions NAMESPACE
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
# Generated by roxygen2: do not edit by hand

S3method(crew_aws_batch_client,crew_aws_batch_debug)
S3method(crew_aws_batch_client,default)
export(crew_aws_batch_launch)
export(crew_aws_batch_terminate)
export(crew_class_definition_aws_batch)
export(crew_class_launcher_aws_batch)
export(crew_class_monitor_aws_batch)
export(crew_controller_aws_batch)
export(crew_definition_aws_batch)
export(crew_launcher_aws_batch)
export(crew_launcher_aws_batch_launch)
export(crew_launcher_aws_batch_terminate)
export(crew_monitor_aws_batch)
importFrom(R6,R6Class)
importFrom(cli,cli_progress_bar)
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# crew.aws.batch 0.0.3.9000 (development)

# crew.aws.batch 0.0.4

* Move the `args_client()` and `args_submit()` launcher methods to the `private` list.

# crew.aws.batch 0.0.3

Expand Down
128 changes: 87 additions & 41 deletions R/crew_launcher_aws_batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,35 @@ crew_class_launcher_aws_batch <- R6::R6Class(
.aws_batch_propagate_tags = NULL,
.aws_batch_timeout = NULL,
.aws_batch_tags = NULL,
.aws_batch_eks_properties_override = NULL
.aws_batch_eks_properties_override = NULL,
.args_client = function() {
list(
config = private$.aws_batch_config,
credentials = private$.aws_batch_credentials,
endpoint = private$.aws_batch_endpoint,
region = private$.aws_batch_region
)
},
.args_submit = function(call, name) {
container_overrides <- as.list(private$.aws_batch_parameters)
container_overrides$command <- list("R", "-e", call)
list(
jobName = name,
jobQueue = private$.aws_batch_job_queue,
shareIdentifier = private$.aws_batch_share_identifier,
schedulingPriorityOverride =
private$.aws_batch_scheduling_priority_override,
jobDefinition = private$.aws_batch_job_definition,
parameters = private$.aws_batch_parameters,
containerOverrides = container_overrides,
nodeOverrides = private$.aws_batch_node_overrides,
retryStrategy = private$.aws_batch_retry_strategy,
propagateTags = private$.aws_batch_propagate_tags,
timeout = private$.aws_batch_timeout,
tags = private$.aws_batch_tags,
eksPropertiesOverride = private$.aws_batch_eks_properties_override
)
}
),
active = list(
#' @field aws_batch_config See [crew_launcher_aws_batch()].
Expand Down Expand Up @@ -439,41 +467,6 @@ crew_class_launcher_aws_batch <- R6::R6Class(
}
invisible()
},
#' @description Argument list for `paws.compute::batch()`.
#' @return Argument list for `paws.compute::batch()`.
args_client = function() {
list(
config = private$.aws_batch_config,
credentials = private$.aws_batch_credentials,
endpoint = private$.aws_batch_endpoint,
region = private$.aws_batch_region
)
},
#' @description Argument list for `paws.compute::batch()$submit_job()`.
#' @return Argument list for `paws.compute::batch()$submit_job()`.
#' @param call Character of length 1, a namespaced call to [crew_worker()]
#' which will run in the worker and accept tasks.
#' @param name Character of length 1, an informative worker name.
args_submit = function(call, name) {
container_overrides <- as.list(private$.aws_batch_parameters)
container_overrides$command <- list("R", "-e", call)
list(
jobName = name,
jobQueue = private$.aws_batch_job_queue,
shareIdentifier = private$.aws_batch_share_identifier,
schedulingPriorityOverride =
private$.aws_batch_scheduling_priority_override,
jobDefinition = private$.aws_batch_job_definition,
parameters = private$.aws_batch_parameters,
containerOverrides = container_overrides,
nodeOverrides = private$.aws_batch_node_overrides,
retryStrategy = private$.aws_batch_retry_strategy,
propagateTags = private$.aws_batch_propagate_tags,
timeout = private$.aws_batch_timeout,
tags = private$.aws_batch_tags,
eksPropertiesOverride = private$.aws_batch_eks_properties_override
)
},
#' @description Launch a local process worker which will
#' dial into a socket.
#' @details The `call` argument is R code that will run to
Expand All @@ -491,32 +484,85 @@ crew_class_launcher_aws_batch <- R6::R6Class(
#' @param instance Character of length 1 to uniquely identify
#' the current instance of the worker.
launch_worker = function(call, name, launcher, worker, instance) {
# Tested in tests/controller/persistent.R
# nocov start
self$async$eval(
command = crew.aws.batch::crew_aws_batch_launch(
command = crew.aws.batch::crew_launcher_aws_batch_launch(
args_client = args_client,
args_submit = args_submit
),
data = list(
args_client = self$args_client(),
args_submit = self$args_submit(call = call, name = name)
args_client = private$.args_client(),
args_submit = private$.args_submit(call = call, name = name)
)
)
# nocov end
},
#' @description Terminate a local process worker.
#' @return `NULL` (invisibly).
#' @param handle A process handle object previously
#' returned by `launch_worker()`.
terminate_worker = function(handle) {
# Tested in tests/controller/minimal.R
# nocov start
self$async$eval(
crew.aws.batch::crew_aws_batch_terminate(
crew.aws.batch::crew_launcher_aws_batch_terminate(
args_client = args_client,
job_id = job_id
),
data = list(
args_client = self$args_client(),
args_client = private$.args_client(),
job_id = handle$data$jobId
)
)
# nocov end
}
)
)

#' @title Submit an AWS Batch job.
#' @export
#' @keywords internal
#' @description Not a user-side function. For internal use only.
#' @details This utility is its own separate exported function specific to
#' the launcher and not shared with the job definition or monitor classes.
#' It generates the `paws.compute::batch()` client within itself
#' instead of a method inside the class.
#' This is all because it needs to run on a separate local worker process
#' and it needs to accept exportable arguments.
#' @return HTTP response from submitting the job.
#' @param args_client Named list of arguments to `paws.compute::batch()`.
#' @param args_submit Named list of arguments to
#' `paws.compute::batch()$submit_job()`.
crew_launcher_aws_batch_launch <- function(args_client, args_submit) {
# Tested in tests/controller/persistent.R
# nocov start
client <- do.call(what = paws.compute::batch, args = args_client)
do.call(what = client$submit_job, args = args_submit)
# nocov end
}

#' @title Terminate an AWS Batch job.
#' @export
#' @keywords internal
#' @description Not a user-side function. For internal use only.
#' @details This utility is its own separate exported function specific to
#' the launcher and not shared with the job definition or monitor classes.
#' It generates the `paws.compute::batch()` client within itself
#' instead of a method inside the class.
#' This is all because it needs to run on a separate local worker process
#' and it needs to accept exportable arguments.
#' @return HTTP response from submitting the job.
#' @param args_client Named list of arguments to `paws.compute::batch()`.
#' @param job_id Character of length 1, ID of the AWS Batch job to
#' terminate.
crew_launcher_aws_batch_terminate <- function(args_client, job_id) {
# nocov start
# Tested in tests/controller/minimal.R
client <- do.call(what = paws.compute::batch, args = args_client)
client$terminate_job(
jobId = job_id,
reason = "terminated by crew controller"
)
# nocov end
}
87 changes: 0 additions & 87 deletions R/utils_aws_batch.R
Original file line number Diff line number Diff line change
@@ -1,87 +0,0 @@
#' @title Submit an AWS Batch job.
#' @export
#' @keywords internal
#' @description Not a user-side function. For internal use only.
#' @return HTTP response from submitting the job.
#' @param args_client Named list of arguments to `paws.compute::batch()`.
#' @param args_submit Named list of arguments to
#' `paws.compute::batch()$submit_job()`.
crew_aws_batch_launch <- function(args_client, args_submit) {
client <- crew_aws_batch_client(
config = args_client$config,
args = args_client
)
do.call(what = client$submit_job, args = args_submit)
}

#' @title Terminate an AWS Batch job.
#' @export
#' @keywords internal
#' @description Not a user-side function. For internal use only.
#' @return HTTP response from submitting the job.
#' @param args_client Named list of arguments to `paws.compute::batch()`.
#' @param job_id Character of length 1, ID of the AWS Batch job to
#' terminate.
crew_aws_batch_terminate <- function(args_client, job_id) {
client <- crew_aws_batch_client(
config = args_client$config,
args = args_client
)
client$terminate_job(
jobId = job_id,
reason = "terminated by crew controller"
)
}

crew_aws_batch_client <- function(config, args) {
UseMethod("crew_aws_batch_client")
}

#' @export
crew_aws_batch_client.default <- function(config, args) {
do.call(what = paws.compute::batch, args = args)
}

#' @export
crew_aws_batch_client.crew_aws_batch_debug <- function(config, args) {
list(
submit_job = function(
jobName,
jobQueue,
shareIdentifier,
schedulingPriorityOverride,
jobDefinition,
parameters,
containerOverrides,
nodeOverrides,
retryStrategy,
propagateTags,
timeout,
tags,
eksPropertiesOverride
) {
envir <- environment()
lapply(X = names(formals()), FUN = function(arg) {
force(get(x = arg, envir = envir))
})
for (arg in list(jobName, jobQueue, jobDefinition)) {
crew::crew_assert(
arg,
length(.) == 1L,
is.character(.),
!anyNA(.),
nzchar(.),
nchar(.) < 100L
)
}
list(jobId = jobName)
},
terminate_job = function(jobId, reason) {
envir <- environment()
lapply(X = names(formals()), FUN = function(arg) {
force(get(x = arg, envir = envir))
})
list(value = sample.int(n = 1e9L, size = 1L))
}
)
}
21 changes: 0 additions & 21 deletions man/crew_aws_batch_launch.Rd

This file was deleted.

21 changes: 0 additions & 21 deletions man/crew_aws_batch_terminate.Rd

This file was deleted.

38 changes: 0 additions & 38 deletions man/crew_class_launcher_aws_batch.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 61ef69d

Please sign in to comment.