diff --git a/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy b/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy index b6cbf2d315..0761b37834 100644 --- a/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy +++ b/plugins/nf-k8s/src/main/nextflow/k8s/K8sConfig.groovy @@ -18,11 +18,8 @@ package nextflow.k8s import nextflow.k8s.client.K8sRetryConfig -import java.util.concurrent.TimeUnit import javax.annotation.Nullable -import com.google.common.cache.Cache -import com.google.common.cache.CacheBuilder import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.util.logging.Slf4j @@ -57,8 +54,6 @@ class K8sConfig implements ConfigScope { static final private Map DEFAULT_FUSE_PLUGIN = Map.of('nextflow.io/fuse', 1) - private Cache clientCache - @ConfigOption @Description(""" Automatically mount host paths into the task pods (default: `false`). Only intended for development purposes when using a single node. @@ -226,9 +221,6 @@ class K8sConfig implements ConfigScope { cleanup = opts.cleanup as Boolean client = opts.client as Map clientRefreshInterval = opts.clientRefreshInterval as Duration ?: Duration.of('50m') - clientCache = CacheBuilder.newBuilder() - .expireAfterWrite(clientRefreshInterval.toMillis(), TimeUnit.MILLISECONDS) - .build() computeResourceType = opts.computeResourceType context = opts.context cpuLimits = opts.cpuLimits as boolean @@ -366,10 +358,6 @@ class K8sConfig implements ConfigScope { } ClientConfig getClient() { - return clientCache.get('client', this::getClient0) - } - - private ClientConfig getClient0() { final result = client != null ? clientFromNextflow(client, namespace, serviceAccount) : clientDiscovery(context, namespace, serviceAccount) diff --git a/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy b/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy index 32659722d2..f03eb88742 100644 --- a/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy +++ b/plugins/nf-k8s/src/main/nextflow/k8s/K8sExecutor.groovy @@ -16,6 +16,10 @@ package nextflow.k8s +import java.util.concurrent.TimeUnit + +import com.google.common.cache.Cache +import com.google.common.cache.CacheBuilder import groovy.transform.CompileStatic import groovy.transform.Memoized import groovy.util.logging.Slf4j @@ -41,12 +45,17 @@ import org.pf4j.ExtensionPoint class K8sExecutor extends Executor implements ExtensionPoint { /** - * The Kubernetes HTTP client + * Cache for the Kubernetes HTTP client. The client is refreshed periodically + * so that the service account token is re-read when it expires. */ - private K8sClient client + private Cache clientCache + /** + * @return The Kubernetes HTTP client. Delegates to a Guava cache that refreshes + * the client (including the service account token) when the configured interval expires. + */ protected K8sClient getClient() { - client + clientCache.get('client', () -> new K8sClient(k8sConfig.getClient())) } /** @@ -64,9 +73,12 @@ class K8sExecutor extends Executor implements ExtensionPoint { protected void register() { super.register() final k8sConfig = getK8sConfig() - final clientConfig = k8sConfig.getClient() - this.client = new K8sClient(clientConfig) - log.debug "[K8s] config=$k8sConfig; API client config=$clientConfig" + final refreshInterval = k8sConfig.clientRefreshInterval + this.clientCache = CacheBuilder.newBuilder() + .expireAfterWrite(refreshInterval.toMillis(), TimeUnit.MILLISECONDS) + .build() + final client = getClient() + log.debug "[K8s] config=$k8sConfig; API client config=$client.config" } /** diff --git a/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy b/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy index 57bc2e4a57..ae035ee78a 100644 --- a/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy +++ b/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy @@ -93,7 +93,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { K8sTaskHandler( TaskRun task, K8sExecutor executor ) { super(task) this.executor = executor - this.client = executor.client + this.client = executor.getClient() this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE) this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE) this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT) diff --git a/plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy b/plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy index cf589695f9..f1eb2e2fee 100644 --- a/plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy +++ b/plugins/nf-k8s/src/test/nextflow/k8s/K8sConfigTest.groovy @@ -484,33 +484,4 @@ class K8sConfigTest extends Specification { cfg.clientRefreshInterval == Duration.of('1h') } - def 'should cache client config and refresh after expiration' () { - given: - def CONFIG = [ - namespace: 'test-ns', - serviceAccount: 'test-sa', - client: [server: 'http://k8s-server'], - clientRefreshInterval: '100ms' - ] - K8sConfig config = Spy(K8sConfig, constructorArgs: [CONFIG]) - - when: 'first call to getClient' - def client1 = config.getClient() - then: 'client is created via clientFromNextflow' - 1 * config.clientFromNextflow(_, _, _) >> new ClientConfig(server: 'http://k8s-server', namespace: 'test-ns') - client1.server == 'http://k8s-server' - - when: 'second call within cache interval' - def client2 = config.getClient() - then: 'returns cached client without calling clientFromNextflow again' - 0 * config.clientFromNextflow(_, _, _) - client2.is(client1) - - when: 'call after cache expiration' - sleep(150) // wait for cache to expire - def client3 = config.getClient() - then: 'client is recreated' - 1 * config.clientFromNextflow(_, _, _) >> new ClientConfig(server: 'http://k8s-server', namespace: 'test-ns') - !client3.is(client1) - } } diff --git a/plugins/nf-k8s/src/test/nextflow/k8s/K8sExecutorTest.groovy b/plugins/nf-k8s/src/test/nextflow/k8s/K8sExecutorTest.groovy new file mode 100644 index 0000000000..9a0630cc0c --- /dev/null +++ b/plugins/nf-k8s/src/test/nextflow/k8s/K8sExecutorTest.groovy @@ -0,0 +1,66 @@ +/* + * Copyright 2013-2026, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.k8s + +import java.util.concurrent.TimeUnit + +import com.google.common.cache.CacheBuilder +import nextflow.k8s.client.ClientConfig +import nextflow.k8s.client.K8sClient +import spock.lang.Specification + +/** + * @author Paolo Di Tommaso + */ +class K8sExecutorTest extends Specification { + + def 'should cache k8s client and refresh after expiration' () { + given: + def CONFIG = new K8sConfig( + client: [server: 'http://k8s-server'], + namespace: 'test-ns', + serviceAccount: 'test-sa', + clientRefreshInterval: '100ms' + ) + and: + def executor = Spy(K8sExecutor) + executor.getK8sConfig() >> CONFIG + // use a short-lived cache for the test + executor.@clientCache = CacheBuilder.newBuilder() + .expireAfterWrite(100, TimeUnit.MILLISECONDS) + .build() + + when: 'first call to getClient' + def client1 = executor.getClient() + then: 'a new K8sClient is created' + client1 instanceof K8sClient + client1.config.server == 'http://k8s-server' + + when: 'second call within cache interval' + def client2 = executor.getClient() + then: 'returns the same cached instance' + client2.is(client1) + + when: 'call after cache expiration' + sleep(150) + def client3 = executor.getClient() + then: 'a new K8sClient instance is created' + client3 instanceof K8sClient + !client3.is(client1) + } + +}