uproot stalls in concurrent.futures when xrootd_glob is used #224

Closed
asnaylor opened this issue Dec 10, 2020 · 3 comments
Labels
wontfix This will not be worked on

Comments

@asnaylor

asnaylor commented Dec 10, 2020

When I try to use uproot in parallel tasks with ProcessPoolExecutor from concurrent.futures, it hangs if I have previously called glob from XRootD.client.glob_funcs.

It hangs inside ProcessPoolExecutor when I try this:

from concurrent import futures
from XRootD.client.glob_funcs import glob
import uproot

def simple_func(input_path, tree):
    return [uproot.open(input_path)[tree].num_entries]

input_data = glob("root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root")
tree = 'Events'

print("Input Data: {}".format(input_data))
print("Starting ProcessPoolExecutor")
with futures.ProcessPoolExecutor(max_workers=1) as executor:
    tasks = set()
    tasks.update(executor.submit(simple_func, path, tree) for path in input_data)
    complete_tasks = futures.as_completed(tasks)

    for job in complete_tasks:
        print(job.result())

If I just replace the glob call with a plain list (['root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root']) for input_data, this works. If I use the XRootD glob and open the file in uproot without ProcessPoolExecutor, it works too. I wonder if there is some CPU locking issue when using XRootD, uproot and concurrent.futures together?

Versions:

Python 3.8.2
awkward==1.0.0
uproot==4.0.0
xrootd===5.0.3-
@asnaylor asnaylor added the bug (unverified) The problem described would be a bug, but needs to be triaged label Dec 10, 2020
@jpivarski
Member

Hanging is not good, so this needs to be addressed, but note also that XRootD with wildcards has not been implemented in Uproot: #193. You're hitting behavior that I didn't define in Uproot, because one of my assumptions about what XRootD does is broken by its wildcard processing. I'll need to reproduce your issue and adjust.

@jpivarski jpivarski added this to Sources in November bug-fixes Dec 10, 2020
@jpivarski jpivarski added wontfix This will not be worked on and removed bug (unverified) The problem described would be a bug, but needs to be triaged labels Feb 18, 2021
@jpivarski
Member

I've tried this out, and it is as you say:

input_data = glob("root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root")

makes it hang (I waited about a minute) and

input_data = ["root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"]

works normally. However, if we inspect the return type of XRootD's glob,

>>> type(input_data)
<class 'list'>
>>> type(input_data[0])
<class 'str'>
>>> type(input_data) is list
True
>>> type(input_data[0]) is str
True

it's a totally normal list of Python strings. There's nothing in this data object that could cause a difference in Uproot.

By process of elimination, then, simply calling glob is putting XRootD into a state that determines whether it hangs. This function evidently has a side-effect. To test that, I ran your code with one small change:

from concurrent import futures
from XRootD.client.glob_funcs import glob
import uproot

def simple_func(input_path, tree):
    print("starting", input_path, tree)
    return [uproot.open(input_path)[tree].num_entries]

# Call it and forget about it.
glob("root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root")

input_data = ["root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"]
tree = 'Events'

print("Input Data: {}".format(input_data))
print("Starting ProcessPoolExecutor")
with futures.ProcessPoolExecutor(max_workers=1) as executor:
    tasks = set()
    tasks.update(executor.submit(simple_func, path, tree) for path in input_data)
    complete_tasks = futures.as_completed(tasks)

    for job in complete_tasks:
        print(job.result())

This hangs, too. In fact, I can toggle it by commenting out the completely unused glob call.
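The thread does not establish what state glob leaves behind. One common way a call in the parent can make a forked child hang is a lock held by a background thread: fork copies the lock in its held state, but not the thread that would release it. The sketch below reproduces that pattern in pure Python as an illustration only; `_library_lock`, `start_background_thread` and the other names are hypothetical stand-ins, not XRootD internals, and the `fork` start method assumes a POSIX system such as Linux.

```python
import multiprocessing
import threading
import time

# Hypothetical stand-in for a library's internal state (NOT taken from XRootD).
_library_lock = threading.Lock()
_lock_taken = threading.Event()


def _background_worker():
    # Take the lock and keep it for a while, like a long-lived helper thread.
    with _library_lock:
        _lock_taken.set()
        time.sleep(5)


def start_background_thread():
    """Hypothetical analogue of a call (such as glob) with a hidden side effect."""
    threading.Thread(target=_background_worker, daemon=True).start()
    _lock_taken.wait()  # make sure the lock is held before anyone forks


def _child_task(results):
    # The forked child sees the lock as held, but the thread holding it
    # was not copied into the child, so nothing will ever release it here.
    results.put(_library_lock.acquire(timeout=1.0))


def lock_available_in_forked_child():
    """Return True if a forked child can take the lock within one second."""
    ctx = multiprocessing.get_context("fork")
    results = ctx.Queue()
    proc = ctx.Process(target=_child_task, args=(results,))
    proc.start()
    acquired = results.get(timeout=10)  # read before join to avoid blocking on the queue
    proc.join()
    return acquired
```

Calling `lock_available_in_forked_child()` before `start_background_thread()` gives `True`; calling it afterwards gives `False`. Code that waits on such a lock without a timeout would simply block, which from the outside looks like the hang described here.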

Here's a minimal reproducing example to report to the pyxrootd group: it fails if glob is called in the main process and the file is read from a child process. Of the eight cases you can test below (four values of which_test, each with and without the glob call), only the two that combine a child process (ProcessPoolExecutor or multiprocessing) with the glob call actually hang.

import XRootD.client
from concurrent import futures
import threading
import multiprocessing
from XRootD.client.glob_funcs import glob

def simple_func(input_path, tree):
    file = XRootD.client.File()
    status, _ = file.open(input_path)
    print(status)

    status, some_raw_bytes = file.read(0, 100)
    print(status)
    print(some_raw_bytes)

    status, some_raw_bytes = file.read(100, 200)
    print(status)
    print(some_raw_bytes)

    return "yay"

# Call it and forget about it. (Comment this out to compare with the case that does not hang.)
glob("root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root")

input_data = ["root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root"]
tree = 'Events'

which_test = "multiprocessing"  # or "ProcessPoolExecutor", "ThreadPoolExecutor"; any other value uses threading.Thread

if which_test == "ProcessPoolExecutor":
    with futures.ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(simple_func, input_data[0], tree)
        print(future.result())

elif which_test == "ThreadPoolExecutor":
    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(simple_func, input_data[0], tree)
        print(future.result())

elif which_test == "multiprocessing":
    class MyProcess(multiprocessing.Process):
        def run(self):
            print("before")
            print(simple_func(input_data[0], tree))
            print("after")
    myprocess = MyProcess()
    myprocess.start()
    myprocess.join()

else:
    class MyThread(threading.Thread):
        def run(self):
            print("before")
            print(simple_func(input_data[0], tree))
            print("after")
    mythread = MyThread()
    mythread.start()
    mythread.join()
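Waiting about a minute to decide that a case hangs gets slow across eight cases. A minimal sketch, assuming the `fork` start method (the Linux default for the Python 3.8 used in this report), runs a function in a child process and reports `"hang"` if it has not finished within a timeout. `run_in_child_with_timeout` is a hypothetical helper, not part of concurrent.futures or XRootD, and it only covers the child-process cases, since a stuck thread cannot be killed from outside.

```python
import multiprocessing


def run_in_child_with_timeout(func, args=(), timeout=30.0):
    """Run func(*args) in a forked child process.

    Returns "ok" if the child exits cleanly, "hang" if it is still running
    after `timeout` seconds (it is then terminated), and "failed (...)"
    if it exits with a nonzero code.
    """
    ctx = multiprocessing.get_context("fork")
    proc = ctx.Process(target=func, args=args)
    proc.start()
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()  # stop the stuck child so the script can move on
        proc.join()
        return "hang"
    if proc.exitcode == 0:
        return "ok"
    return f"failed (exit code {proc.exitcode})"
```

With the reproducer above, `run_in_child_with_timeout(simple_func, (input_data[0], tree), timeout=60)` would be expected, going by the results reported here, to return `"hang"` with the glob call active and `"ok"` with it commented out.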

Since this is not an Uproot bug or an Uproot failure mode triggered by a pyxrootd behavior, I'm closing it. (I hadn't noticed before that you are only asking for the number of entries, which only does an XRootD.client.File.read, not any fancy vector-reads, like the TBasket-fetching code does. Thinking it was the latter, I had some ideas of how Uproot might be hanging, but it's just failing in XRootD.client.File.open.)
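Since the failure needs glob to run in the main process before a child process reads the file, one possible workaround (an untested assumption, not something confirmed in this thread) is to run the glob itself in a short-lived child process and pass only the resulting list of strings back to the parent. `call_in_fresh_process` is a hypothetical helper; it assumes the `fork` start method and a picklable function and return value.

```python
import multiprocessing
from concurrent import futures


def call_in_fresh_process(func, *args):
    """Run func(*args) in a single-use worker process and return its result.

    Side effects of func (threads started, connections opened) happen in the
    worker, which is shut down when the `with` block exits, so they are not
    left behind in the calling process.
    """
    ctx = multiprocessing.get_context("fork")
    with futures.ProcessPoolExecutor(max_workers=1, mp_context=ctx) as executor:
        return executor.submit(func, *args).result()
```

In the original script this would mean `input_data = call_in_fresh_process(glob, "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root")` before creating the pool that runs simple_func, so the main process itself never calls glob.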

November bug-fixes automation moved this from Sources to Done Feb 18, 2021
@asnaylor
Author

Thanks, Jim, for looking into this. I'll let the pyxrootd group know about the issue.
