[DF] Crash with distributed RDataFrame on dask with dask_jobqueue #9429

Closed
1 task done
swertz opened this issue Dec 13, 2021 · 9 comments · Fixed by #9431

swertz commented Dec 13, 2021

  • Checked for duplicates

Describe the bug

I've been trying out the new RDF.Experimental.Distributed.Dask.RDataFrame in ROOT master, which is a great addition. It seems to work fine when using a single-machine cluster of workers (dask.distributed.LocalCluster), but it fails when using a batch cluster (either dask_jobqueue.HTCondorCluster or dask_jobqueue.SLURMCluster):

Traceback (most recent call last):
File "/afs/cern.ch/user/s/swertz/testDistRDF/testCondor.py", line 81, in <module>
val = prod.GetValue()
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 130, in GetValue
self.execute_graph()
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 121, in execute_graph
headnode.backend.execute(generator)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 134, in execute
ranges = headnode.build_ranges()
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/HeadNode.py", line 91, in build_ranges
return Ranges.get_balanced_ranges(self.nentries, self.npartitions)
File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Ranges.py", line 276, in get_balanced_ranges
partition_size = nentries // npartitions
ZeroDivisionError: integer division or modulo by zero

To Reproduce

Minimal example, run on lxplus/lxbatch:

import ROOT
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
distributed = ROOT.RDF.Experimental.Distributed
from dask.distributed import Client, LocalCluster
from dask_jobqueue import HTCondorCluster

cluster = HTCondorCluster(cores=1, processes=1, memory="1GB", disk="0.1GB", job_extra={"jobflavour": "espresso"})
cluster.scale(jobs=1)
# Works fine with:
# cluster = LocalCluster(n_workers=1, threads_per_worker=1)
client = Client(cluster)
df = RDataFrame(100, daskclient=client)
df = df.Define("myCol", "10")
prod = df.Mean("myCol")
val = prod.GetValue()
print(f"Value: {val}")

Setup

  • ROOT from LCG dev3: /cvmfs/sft.cern.ch/lcg/views/dev3/latest/x86_64-centos7-gcc11-opt/setup.sh
  • need to pip install dask-jobqueue

swertz commented Dec 13, 2021

I should add that when running the RDF on an existing file and TTree, there is a different exception:

Traceback (most recent call last):
  File "/afs/cern.ch/user/s/swertz/testDistRDF/testCondorTree.py", line 35, in <module>
    val = prod.GetValue()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 130, in GetValue
    self.execute_graph()
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Proxy.py", line 121, in execute_graph
    headnode.backend.execute(generator)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Base.py", line 219, in execute
    values = self.ProcessAndMerge(ranges, mapper, reducer)
  File "/cvmfs/sft-nightlies.cern.ch/lcg/views/dev3/Sat/x86_64-centos7-gcc11-opt/lib/DistRDF/Backends/Dask/Backend.py", line 128, in ProcessAndMerge
    final_results = mergeables_lists.pop().persist()
IndexError: pop from empty list

With the code:

import ROOT
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
from dask.distributed import Client
from dask_jobqueue import HTCondorCluster

cluster = HTCondorCluster(cores=1, processes=1, memory="1GB", disk="0.1GB", job_extra={"jobflavour": "espresso"})
cluster.scale(jobs=1)
client = Client(cluster)
df = RDataFrame("Events", "tree.root", daskclient=client)
prod = df.Mean("myCol")
val = prod.GetValue()
print(f"Value: {val}")

And with the file tree.root attached:

tree.root.zip

@vepadulano (Member)

Hi @swertz,
Thanks a lot for trying it out and for the reproducer!
The issue is a bit silly: it has to do with how we compute a reasonable default value for the npartitions attribute when the user doesn't provide one. The idea, for the moment at least, is to find out how many cores are available in the distributed cluster connected to the user application and submit one task per core.

In the case of Dask, this is done by accessing the client.scheduler_info() dictionary, which in turn contains a workers key whose value is another dictionary with information about the workers. The current approach assumed that if this key was present, then the Dask client must already know how many cores are available. Unfortunately, as you just demonstrated, this is not the case.

In particular, the dask-jobqueue interface has a rather annoying quirk: even after you have called the code to set up the cluster and the client objects, nothing is really "ready to run" yet. Instead, your Dask application will still be waiting for the batch system (HTCondor in this case) to actually grant it resources. This has the side effect that the dictionary I mentioned above contains the workers key, but its value is an empty dictionary. You can check this by adding a

print(client.scheduler_info())

call after you create your client in the application.

This effect can be mitigated if you first call client.wait_for_workers(N_WORKERS) before calling anything else RDF-related. According to the docs, this should make sure that HTCondor has actually released resources to the Dask cluster.
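
For example, a quick sanity check before building the RDataFrame could look like this (just a sketch using the public Client API):

# See how many workers the scheduler knows about at this point
workers = client.scheduler_info().get("workers", {})
print(f"Workers registered so far: {len(workers)}")
# Block until HTCondor has actually granted at least one worker
client.wait_for_workers(1)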

With that being said, the optimize_npartitions function in the Dask backend should not be sensitive to this kind of effect, so in the linked PR I made the check a bit stronger: with it, npartitions in your reproducer above should correctly default to two instead of zero.

@vepadulano (Member)

As for your second comment

IndexError: pop from empty list

I think you are still seeing the effect of HTCondor not having granted resources to Dask yet. Try again after adding client.wait_for_workers(N_WORKERS) before calling the RDF API. N_WORKERS should be the number of Dask workers that the HTCondor jobs should give you back, so I guess one in your case. The Dask docs are a bit scarce on this, so I can't give you many more details.

swertz commented Dec 13, 2021

Thanks for the quick feedback!

I've tried with client.wait_for_workers(1) before creating the RDF, but it doesn't have the expected effect: it just waits forever after printing DEBUG:Starting job: 12241446.0 (debugging activated with import logging; logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)). I can see the job in condor_q come and go, but the blocking call is never released for some reason (note that I'm not a Dask expert by any means).

I'll try again once #9431 is merged; in particular I'm interested in using Dask's adaptive worker management, so that the exact number of submitted jobs is not fixed a priori but automatically adapts to the workload...

vepadulano commented Dec 13, 2021

The fact that wait_for_workers waits forever suggests to me that the Dask job never actually starts. You should double-check that a simple Dask application with the same configuration, doing something unrelated to RDF, works. Something like this should suffice (inspired by the Dask docs and your reproducer):

import time
import random

from dask import delayed
from dask.distributed import Client
from dask_jobqueue import HTCondorCluster

cluster = HTCondorCluster(cores=1, processes=1, memory="1GB", disk="0.1GB", job_extra={"jobflavour": "espresso"})
cluster.scale(jobs=1)
client = Client(cluster)

# Try with and without this
client.wait_for_workers(1)

def inc(x):
    time.sleep(random.random())
    return x + 1

def dec(x):
    time.sleep(random.random())
    return x - 1

def add(x, y):
    time.sleep(random.random())
    return x + y

inc = delayed(inc)
dec = delayed(dec)
add = delayed(add)

x = inc(1)
y = dec(2)
z = add(x, y)

print(f"Result is: {z.compute()}")

Coming back to RDF, the npartitions parameter corresponds to how many distributed tasks will be sent to the Dask scheduler. Supposing you know something about how many resources you have or the layout of your ROOT dataset, you could set this manually in the RDataFrame constructor like

df = RDataFrame("treename","filename.root",daskclient=client,npartitions=NPARTITIONS)

A good parallelisation can often be obtained when the number of partitions is roughly 3x the number of cores you can use.
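For instance, a rough sketch (assuming workers have already registered and that each worker entry in scheduler_info() exposes an nthreads field, as recent Dask versions do):

# Count the cores currently known to the scheduler and aim for ~3 partitions per core
ncores = sum(w["nthreads"] for w in client.scheduler_info()["workers"].values())
df = RDataFrame("treename", "filename.root", daskclient=client, npartitions=3 * ncores)
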
UPDATE:

in particular I'm interested in using Dask's adaptive worker management, so that the exact number of submitted jobs is not fixed a priori but automatically adapts to the workload...

Sounds good; in that case the npartitions parameter could be adjusted based on how many TTree clusters you have in your dataset. Roughly one task every 4-5 clusters should be a good starting point.

swertz commented Dec 13, 2021

Strange, I've tried the dask-only example (with lxbatch) but in both cases (with or without the wait) it waits forever. I see the job come and go in condor_q but nothing happens. Perhaps we're missing something to correctly use dask_jobqueue 🤔

About the choice of npartitions: thanks, that's useful information. I should check how many clusters per file we have on average for nanoAOD files. But since I'm feeding in a TChain with (in some cases) several hundred files, I suppose I should choose npartitions ~ n_files * n_clusters_per_file? Although this would quickly amount to much more than "3x the number of cores I can use" (I have O(400) RDFs, each with O(200) input files, but only a total of O(500) cores to run on)...

@vepadulano (Member)

Deciding how many tasks to submit to a distributed scheduler is a fine art in its own right :) From what I understand of your case, I would first test with npartitions ~ n_files * (n_clusters_per_file / 5), so that each task has at least 100 MB of data to process and you don't incur too many small tasks, which bring more overhead than actual gain. In the future it would be nice to have some clever algorithm that figures out a better default number of partitions; we'll see.
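To make that concrete with made-up numbers (the actual cluster count depends on how your files were written):

n_files = 200             # order of magnitude of files per TChain from your message
n_clusters_per_file = 20  # made-up value, check your actual nanoAOD files
npartitions = n_files * n_clusters_per_file // 5  # -> 800 tasks for this one RDF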

I'm not sure about the details of the HTCondorCluster interface; I will get back to you if I manage to understand what's missing.

vepadulano added this to Issues in the Fixed in 6.26/00 project via automation on Dec 13, 2021

swertz commented Dec 14, 2021

I managed to get it to work on HTCondor/lxbatch thanks to the instructions here. The distributed RDF case (with the wait_for_workers call) works fine provided the cluster is configured as:

import socket
from dask_jobqueue import HTCondorCluster

cluster = HTCondorCluster(cores=1, processes=1, memory="1GB", disk="0.1GB",
                          job_extra={
                              "+JobFlavour": '"espresso"',
                              'should_transfer_files': 'Yes',
                              'when_to_transfer_output': 'ON_EXIT',
                              'getenv': 'True'
                          },
                          death_timeout='60',
                          scheduler_options={
                              'port': 8786,
                              'host': socket.gethostname()
                          },
                          extra=['--worker-port 10000:10100'])

Note that for the dask-only example I had to use futures instead of delayed, and submit explicitly to the cluster (and this works both with and without the wait call, as expected):

x = client.submit(inc, 1)
y = client.submit(inc, 2)
z = client.submit(add, x, y)
print(f"Result is: {z.result()}")

The dask documentation is not very enlightening about the use of delayed with clusters...

As for using the SLURMCluster, just adding the call to wait_for_workers worked fine with the config I had before 👍

If you'd like, I'll try again with the next nightly build including your merged PR, without the explicit call to wait_for_workers.

I would first test with npartitions ~ n_files * (n_clusters_per_file / 5), so that each task has at least 100 MB of data to process and you don't incur too many small tasks, which bring more overhead than actual gain

If I may abuse the thread with a naive question: how do you get the number of clusters in a file?

@vepadulano (Member)

Thanks for all the insights! We are still learning how to cope with all the different interfaces. It is possible that at some point all this extra configuration will be collected in a single place, so that new users can enable it directly from distributed RDataFrame.
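
As for your question on counting clusters: a quick sketch using TTree::GetClusterIterator (standard ROOT, but untested on your file) would look like this:

import ROOT

f = ROOT.TFile.Open("tree.root")
t = f.Get("Events")
it = t.GetClusterIterator(0)  # iterate over cluster start entries
nclusters = 0
while it() < t.GetEntries():
    nclusters += 1
print(f"Number of clusters: {nclusters}")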

It would be amazing if you could try your reproducer again with the next nightlies, if you have time. Thank you so much 😄!
Let's continue the discussion in private for the other topics.
