Skip to content

Commit

Permalink
use data.frame to store job_data
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Dec 8, 2016
1 parent f9ae691 commit 6ed048b
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 18 deletions.
42 changes: 25 additions & 17 deletions R/master.r
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,30 @@ Q = function(fun, ..., const=list(), expand_grid=FALSE, seed=128965,
if (memory < 500)
stop("Worker needs about 230 MB overhead, set memory>=500")

# set up function to call, data, and index
fun = match.fun(fun)
job_data = process_args(fun, iter=list(...), const=const,
expand_grid=expand_grid,
split_array_by=split_array_by)
names(job_data) = 1:length(job_data)

n_jobs = min(ceiling(length(job_data) / job_size), n_jobs)

process_args(fun, iter=list(...), const=const,
expand_grid=expand_grid,
split_array_by=split_array_by)
job_data = do.call(data.frame, c(list(...), stringsAsFactors=FALSE, check.names=FALSE))
n_calls = nrow(job_data)
rownames(job_data) = 1:n_calls
n_jobs = min(ceiling(n_calls / job_size), n_jobs)

# use heuristic for wait and chunk size
if (is.na(wait_time))
wait_time = ifelse(length(job_data) < 5e5, 1/sqrt(length(job_data)), 0)
wait_time = ifelse(n_calls < 5e5, 1/sqrt(n_calls), 0)
if (is.na(chunk_size))
chunk_size = ceiling(min(
length(job_data) / n_jobs / 100,
5e5 * length(job_data) / object.size(job_data)[[1]]
n_calls / n_jobs / 100,
5e5 * n_calls / object.size(job_data)[[1]]
))

qsys = qsys$new(fun=fun, const=const, seed=seed)
on.exit(qsys$cleanup())

# do the submissions
message("Submitting ", n_jobs, " worker jobs for ", length(job_data),
message("Submitting ", n_jobs, " worker jobs for ", n_calls,
" function calls (ID: ", qsys$id, ") ...")
pb = txtProgressBar(min=0, max=n_jobs, style=3)
for (j in 1:n_jobs) {
Expand All @@ -68,17 +71,19 @@ Q = function(fun, ..., const=list(), expand_grid=FALSE, seed=128965,
}
close(pb)

job_result = rep(list(NULL), length(job_data))
# prepare empty variables for managing results
job_result = rep(list(NULL), n_calls)
submit_index = 1:chunk_size
jobs_running = list()
workers_running = list()
worker_stats = list()

message("Running calculations (", chunk_size, " calls/chunk) ...")
pb = txtProgressBar(min=0, max=length(job_data), style=3)
pb = txtProgressBar(min=0, max=n_calls, style=3)

# main event loop
start_time = proc.time()
while(submit_index[1] <= length(job_data) || length(workers_running) > 0) {
while(submit_index[1] <= n_calls || length(workers_running) > 0) {
msg = qsys$receive_data()
if (msg$id[1] == 0) { # worker ready, send common data
qsys$send_common_data()
Expand All @@ -92,9 +97,10 @@ Q = function(fun, ..., const=list(), expand_grid=FALSE, seed=128965,
setTxtProgressBar(pb, submit_index[1] - length(jobs_running) - 1)
}

if (submit_index[1] <= length(job_data)) { # send iterated data to worker
submit_index = submit_index[submit_index <= length(job_data)]
qsys$send_job_data(id=submit_index, iter=as.list(job_data[submit_index]))
if (submit_index[1] <= n_calls) { # send iterated data to worker
submit_index = submit_index[submit_index <= n_calls]
iter = job_data[submit_index, , drop=FALSE]
qsys$send_job_data(id=submit_index, iter=iter)
jobs_running[as.character(submit_index)] = TRUE
submit_index = submit_index + chunk_size
} else # send shutdown signal to worker
Expand All @@ -108,13 +114,15 @@ Q = function(fun, ..., const=list(), expand_grid=FALSE, seed=128965,

on.exit(NULL)

# check for failed jobs
failed = sapply(job_result, class) == "try-error"
if (any(failed)) {
warning(job_result[failed])
if (fail_on_error)
stop("errors occurred, stopping")
}

# compute summary statistics for workers
wt = Reduce(`+`, worker_stats) / length(worker_stats)
message(sprintf("Master: [%.1fs %.1f%% CPU]; Worker average: [%.1f%% CPU]",
rt[[3]], 100*(rt[[1]]+rt[[2]])/rt[[3]],
Expand Down
2 changes: 2 additions & 0 deletions R/process_args.r
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ process_args = function(fun, iter, const=list(), expand_grid=FALSE, split_array_
if (any(dups))
stop(paste("Argument duplicated:", paste(provided[[dups]], collapse=" ")))

return(NULL) #TODO: adjust processing below

# convert matrices to lists so they can be vectorised over
split_arrays = function(x) {
if (is.array(x))
Expand Down
2 changes: 1 addition & 1 deletion R/worker.r
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ worker = function(worker_id, master, memlimit) {

one_id = function(seq_num) {
set.seed(seed + msg$id[seq_num])
result = try(do.call(fun, c(const, msg$iter[[seq_num]])))
result = try(do.call(fun, c(const, msg$iter[seq_num,])))
}
result = lapply(seq_along(msg$id), one_id)
counter = counter + length(msg$id)
Expand Down
3 changes: 3 additions & 0 deletions man/Q.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6ed048b

Please sign in to comment.