From 05465fc7b3a3cbd953d80b966ef4d1050dfe853f Mon Sep 17 00:00:00 2001 From: kalibera Date: Fri, 10 May 2024 12:58:09 +0000 Subject: [PATCH] Make stopCluster() of a SOCK cluster wait a little bit for the workers to shut down (PR#18133). git-svn-id: https://svn.r-project.org/R/trunk@86529 00db46b3-68df-0310-9c12-caf00c1e9a41 --- src/library/parallel/NAMESPACE | 1 + src/library/parallel/R/snowSOCK.R | 28 ++++++++++++++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/library/parallel/NAMESPACE b/src/library/parallel/NAMESPACE index 800fcc75eff..bbad65dc633 100644 --- a/src/library/parallel/NAMESPACE +++ b/src/library/parallel/NAMESPACE @@ -29,6 +29,7 @@ S3method(recvOneData, SOCKcluster) S3method(sendData, SOCKnode) S3method(sendData, SOCK0node) S3method(stopCluster, default) +S3method(stopCluster, SOCKcluster) ## To support snow clusters #S3method(closeNode, NWSnode) diff --git a/src/library/parallel/R/snowSOCK.R b/src/library/parallel/R/snowSOCK.R index 49d845a8907..e6efef3ff75 100644 --- a/src/library/parallel/R/snowSOCK.R +++ b/src/library/parallel/R/snowSOCK.R @@ -1,7 +1,7 @@ # File src/library/parallel/R/snowSOCK.R # Part of the R package, https://www.R-project.org # -# Copyright (C) 1995-2023 The R Core Team +# Copyright (C) 1995-2024 The R Core Team # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -125,7 +125,8 @@ newPSOCKnode <- function(machine = "localhost", ..., class = if(useXDR) "SOCKnode" else "SOCK0node") } -closeNode.SOCKnode <- closeNode.SOCK0node <- function(node) close(node$con) +## Let the OS close the connection (see stopCluster). +closeNode.SOCKnode <- closeNode.SOCK0node <- function(node) {} sendData.SOCKnode <- function(node, data) serialize(data, node$con) sendData.SOCK0node <- function(node, data) serialize(data, node$con, xdr = FALSE) @@ -269,6 +270,29 @@ print.SOCKnode <- print.SOCK0node <- function(x, ...) invisible(x) } +stopCluster.SOCKcluster <- function(cl = NULL) +{ + for (n in cl) postNode(n, "DONE") + cons <- lapply(cl, function(x) x$con) + + ## Wait (with a timeout) for the worker connection to be closed by the + ## OS, so that the cleanup of the worker's R session has a chance to run + ## before stopCluster() finishes (PR#18133). + + t0 <- Sys.time() + cleanup_timeout <- 5 + while(length(cons) > 0) { + done <- socketSelect(cons, write = FALSE, timeout = cleanup_timeout) + for(con in cons[done]) close(con) + cons <- cons[!done] + if (difftime(Sys.time(), t0, units="secs") > cleanup_timeout) + break + } + + ## Close the remaining worker connections unconditionally. + for(con in cons) close(con) +} + .workRSOCK <- function() { makeSOCKmaster <- function(master, port, setup_timeout, timeout, useXDR,