local executor cannot resume due to thread pool limit (1000) reached while checking cache #1871

Closed
gpertea opened this issue Jan 27, 2021 · 9 comments

@gpertea

gpertea commented Jan 27, 2021

Bug report

On a server with 144 cores, when resuming a bioinformatics workflow that processes 800 samples through multiple processes/stages, the local executor hits the upper limit of the thread pool (1000, hard-coded in GPars) while checking the cached tasks in parallel for multiple processes at once.

Expected behavior and actual behavior

Resuming (using the -resume option) should be possible with the local executor. Checking the caches for the 800 * (number of outputs from multiple processes) tasks should be throttled/scheduled/limited to a lower number of threads, to avoid hitting the hard-coded thread pool limit of 1000.

Steps to reproduce the problem

Run the SPEAQeasy pipeline with the -profile local or -profile docker_local option (both invoke the local executor in Nextflow) and an input of 800 samples in the samples.manifest file. After the pipeline crashes or is interrupted at the final stage, when most of the previous steps and outputs have already been generated, rerun it with the -resume flag.

Program output

Here is the output of SPEAQeasy, showing the progress of the cached tasks when the pipeline fails:

[skipped  ] process > PullAssemblyFasta (Downloading Assembly FA File: GRCh38.primary_assembly.genome.fa) [100%] 1 of 1, stored: 1 ✔
[skipped  ] process > buildHISATindex (Building HISAT2 Index: hisat2_assembly_hg38_gencode_v32_main)      [100%] 1 of 1, stored: 1 ✔
[skipped  ] process > PullGtf (Downloading GTF File: gencode.v32.annotation.gtf)                          [100%] 1 of 1, stored: 1 ✔
[skipped  ] process > BuildAnnotationObjects                                                              [100%] 1 of 1, stored: 1 ✔
[skipped  ] process > PullTranscriptFasta (Downloading TX FA File: gencode.v32.transcripts.fa)            [100%] 1 of 1, stored: 1 ✔
[skipped  ] process > BuildKallistoIndex                                                                  [100%] 1 of 1, stored: 1 ✔
[8c/03fc16] process > PreprocessInputs                                                                    [100%] 1 of 1, cached: 1 ✔
[b1/09d4be] process > InferStrandness (SRR3542080)                                                        [100%] 800 of 800, cached: 800 ✔
[c7/f00ab0] process > CompleteManifest                                                                    [100%] 1 of 1, cached: 1 ✔
[e2/c97f6c] process > QualityUntrimmed (SRR3542077)                                                       [100%] 800 of 800, cached: 800 ✔
[58/da1f20] process > Trimming (SRR3542099)                                                               [100%] 800 of 800, cached: 800 ✔
[1f/0f1230] process > QualityTrimmed (SRR3542099)                                                         [100%] 800 of 800, cached: 800 ✔
[cf/ca9370] process > SingleEndHISAT (SRR3541745)                                                         [100%] 448 of 448, cached: 448
[5d/f8f499] process > SamtoBam (SRR3541769)                                                               [100%] 312 of 312, cached: 312
[b0/9fde63] process > FeatureCounts (SRR3541550)                                                          [100%] 163 of 163, cached: 163
[f4/e9316e] process > PrimaryAlignments (SRR3541769)                                                      [100%] 311 of 311, cached: 311
[cf/6c69ab] process > Junctions (SRR3541369)                                                              [100%] 131 of 131, cached: 131
[c2/06efc0] process > TXQuantKallisto (SRR3541888)                                                        [100%] 587 of 587, cached: 587
[-        ] process > CountObjects                                                                        -
[ef/b5403a] process > VariantCalls (SRR3541556)                                                           [100%] 160 of 160, cached: 160
[-        ] process > VariantsMerge                                                                       -
[skipping] Stored process > PullGtf (Downloading GTF File: gencode.v32.annotation.gtf)
[skipping] Stored process > PullTranscriptFasta (Downloading TX FA File: gencode.v32.transcripts.fa)
[skipping] Stored process > PullAssemblyFasta (Downloading Assembly FA File: GRCh38.primary_assembly.genome.fa)
[skipping] Stored process > BuildKallistoIndex
[skipping] Stored process > buildHISATindex (Building HISAT2 Index: hisat2_assembly_hg38_gencode_v32_main)
[skipping] Stored process > BuildAnnotationObjects
Error executing process > 'Junctions'

Caused by:
  The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000

nextflow.log

Environment

  • Nextflow version: [20.10.0 build 5430]
  • Java version: [Groovy 3.0.5 on OpenJDK 64-Bit Server VM 1.8.0_275-8u275-b01-0ubuntu1~18.04-b01]
  • Operating system: [Ubuntu 18.04.5 LTS on a server with 4 x Intel Xeon Gold 6154 CPU (72 physical cores, 144 threads)]
  • Bash version: (4.4.20(1)-release)
@abhi18av
Member

Hi @gpertea ,

Thanks for reporting this issue!

This could be because maxThreads is set to 432, as shown at line 130 of the shared nextflow.log file:

Jan-27 08:54:37.514 [Actor Thread 3] DEBUG n.util.BlockingThreadExecutorFactory - Thread pool name=FileTransfer; maxThreads=432; maxQueueSize=1296; keepAlive=1m

A quick search shows that a couple of possible solutions were mentioned earlier in a similar thread, #92.

@gpertea
Author

gpertea commented Jan 27, 2021

I checked that thread before posting this bug report and tried the user-side solutions suggested there. Using -Dnxf.pool.maxThreads=5000 results in exactly the same behavior and the same error message about reaching the 1000-thread pool limit. Also, my problem is strictly related to cache checking during the resume of the pipeline (not quite the situation described there). I do not seem to have much control over how those cache verification tasks are scheduled, at least with the local executor, do I?
And issue #92 itself seems to have been fixed in the Nextflow code in 2017.

It seems to me that the 1000 thread limit is hard-coded in the GPars scheduler package:
https://github.com/codehaus/gpars/blob/d9a3097ae831ad963ed5615e0afd72207ababbfe/src/main/groovy/groovyx/gpars/scheduler/ResizeablePool.java#L68

Also, I could not find a way to specify the maximum number of threads in the Nextflow documentation. I have no idea where that maxThreads value of 432 comes from, and I do not think it is controllable user-side. My general understanding is that a dynamically sized thread pool is being used, so the hard-coded limit of 1000 is hit by the local executor due to the lack of (proper?) task scheduling. This can be seen in the submitted nextflow.log at line 5149, where the error is triggered when Actor Thread 1006 is to be spawned:

Jan-27 08:54:56.673 [Actor Thread 232] INFO  nextflow.processor.TaskProcessor - [66/97c3d3] Cached process > SingleEndHISAT (SRR3541761)
Jan-27 08:54:56.787 [Actor Thread 1006] ERROR nextflow.processor.TaskProcessor - Error executing process > 'Junctions'

Caused by:
  The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000

java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000
	at groovyx.gpars.scheduler.ResizeablePool$2.rejectedExecution(ResizeablePool.java:87)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at groovyx.gpars.scheduler.DefaultPool.execute(DefaultPool.java:155)
	at groovyx.gpars.dataflow.operator.ForkingDataflowOperatorActor.startTask(ForkingDataflowOperatorActor.java:54)
	at groovyx.gpars.dataflow.operator.DataflowOperatorActor.onMessage(DataflowOperatorActor.java:108)
	at groovyx.gpars.actor.impl.SDAClosure$1.call(SDAClosure.java:43)
	at groovyx.gpars.actor.AbstractLoopingActor.runEnhancedWithoutRepliesOnMessages(AbstractLoopingActor.java:293)
	at groovyx.gpars.actor.AbstractLoopingActor.access$400(AbstractLoopingActor.java:30)
	at groovyx.gpars.actor.AbstractLoopingActor$1.handleMessage(AbstractLoopingActor.java:93)
	at groovyx.gpars.util.AsyncMessagingCore.run(AsyncMessagingCore.java:132)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
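
For illustration, the rejection in the trace above can be reproduced with a plain java.util.concurrent.ThreadPoolExecutor. This is only a sketch under assumptions: it assumes the pool hands tasks over through a SynchronousQueue (no buffering) and uses a rejection handler that throws IllegalStateException, which is what the ResizeablePool frame in the trace suggests. The class name PoolLimitDemo, the method firstRejection and the cap of 4 threads (instead of 1000) are hypothetical and chosen to keep the example small.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolLimitDemo {

    /**
     * Submits `tasks` blocking jobs to a pool capped at `maxThreads` and returns
     * the message of the first rejection, or null if every job was accepted.
     */
    static String firstRejection(int maxThreads, int tasks) {
        CountDownLatch release = new CountDownLatch(1);

        // Assumption: mimic a handler that throws once the pool cannot grow any further.
        RejectedExecutionHandler handler = (task, executor) -> {
            throw new IllegalStateException(
                    "Current pool size: " + executor.getPoolSize()
                    + " Maximum pool size: " + executor.getMaximumPoolSize());
        };

        // SynchronousQueue has no capacity: a task is either handed to an idle
        // worker, or a new worker is started, or the task is rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, maxThreads, 10L, TimeUnit.SECONDS,
                new SynchronousQueue<>(), handler);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        } finally {
            release.countDown(); // let the blocked workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstRejection(4, 4)); // all four jobs get a thread
        System.out.println(firstRejection(4, 5)); // the fifth job is rejected
    }
}
```

With nothing queuing the extra work, the number of threads simply tracks the number of tasks in flight until the cap is reached, which matches the pattern in the log above.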

@gpertea
Author

gpertea commented Jan 27, 2021

The limit of 1000 local threads is obviously a reasonable maximum (at least until we get servers with >1000 cores). The problem seems to be the lack of throttling/scheduling, i.e. of proper queuing of the tasks, in the case of the local executor, so that the number of Actor Threads is kept under control.

I also tried limiting cpus, submitRateLimit and queueSize in the executor configuration scope, with no different outcome -- as if they do not affect the local executor.
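
To make concrete what I mean by throttling, here is a generic sketch, not Nextflow or GPars code: a fixed-size pool whose extra work waits in a queue instead of spawning new threads. The class name ThrottledChecksDemo, the method peakConcurrency, and the figures of 8 threads and 2400 checks are hypothetical, and the 1 ms sleep merely stands in for one cache lookup.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottledChecksDemo {

    /** Runs `checks` short jobs on at most `maxThreads` workers and returns the peak number running at once. */
    static int peakConcurrency(int maxThreads, int checks) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        // Fixed number of workers; jobs beyond that wait in the pool's internal queue.
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        for (int i = 0; i < checks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(1); // stand-in for one cache lookup
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("checks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 800 samples x 3 hypothetical processes, but never more than 8 threads.
        System.out.println(peakConcurrency(8, 2400));
    }
}
```

The trade-off is that the queue grows instead of the thread count, which is harmless for a few thousand small cache checks.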

@abhi18av
Member

@gpertea , unfortunately this is beyond my knowledge of the dark art of threading.

However, based on the following comment

paolo: you may want to try nxf.pool.type=bounded or nxf.pool.type=unbounded

... from an earlier Gitter chat, I believe that NF does provide a mechanism to tweak the threading behavior. On further investigation, I came across the NXF_POOL_TYPE env var, which could provide a way forward:

def type = property(NXF_POOL_TYPE, 'default')

@gpertea
Author

gpertea commented Jan 27, 2021

It looks like -Dnxf.pool.type=sync might actually address this issue: with it, the pipeline was able to get past the cache verification. But that was only one run, and the server was quite busy when I tried it, so it's still possible that the resume was heavily throttled by external factors. That option was discussed in #92, but in a context where it was causing further problems (hanging the pipeline), so it was not clear to me that such an option MUST be used to prevent the local executor from spawning threads uncontrollably (and if so, why is this not the default for the local executor?).

I'll perform further testing and I'll close the issue if I get persistent positive results with this option.

@gpertea
Author

gpertea commented Jan 29, 2021

After multiple tests I can confirm that -Dnxf.pool.type=sync prevents this issue for the local executor.

@gpertea gpertea closed this as completed Jan 29, 2021
@abhi18av
Member

Great to know that this resolved your issue :)

@abhi18av
Member

Regarding what you mentioned about sync being a sensible default for the local executor, perhaps @pditommaso could shed some light on the design choice overall.

@rjpbonnal

I had a similar problem with the thread pool limit (1000) when running nf-core/sarek (@maxulysse). The node running Nextflow has 128 cores/256 threads, and the jobs are deployed through our SGE executor.
I can confirm that running Nextflow with -Dnxf.pool.type=sync solved the issue.

      N E X T F L O W
      version 20.10.0 build 5430
      created 01-11-2020 15:14 UTC (16:14 CEST)
      cite doi:10.1038/nbt.3820
      http://nextflow.io
openjdk 11.0.9.1-internal 2020-11-04
OpenJDK Runtime Environment (build 11.0.9.1-internal+0-adhoc..src)
OpenJDK 64-Bit Server VM (build 11.0.9.1-internal+0-adhoc..src, mixed mode)
