Race condition when running lbview.apply() fast multiple times in loop #401

Closed
kaazoo opened this Issue Apr 25, 2011 · 4 comments

Comments

Projects
None yet
2 participants

kaazoo commented Apr 25, 2011

There seems to be a kind of race condition when running lbview.apply() fast multiple times in a loop.

The following example code creates tasks. Sometimes tasks are not being created because the loop runs too fast.

def run_job(self, job):
        # set session name which will be used as job name
        self.ip_client.session.session = job['name']

        task_frames = range(job['startframe'], job['endframe'] + 1, job['blocksize'])
        for x in task_frames:
            # prepare script input
            env_dict = {
            'DRQUEUE_OS' : DrQueue.get_osname(),
            'DRQUEUE_ETC' : os.getenv('DRQUEUE_ROOT') + "/etc",
            'DRQUEUE_FRAME' : x,
            'DRQUEUE_BLOCKSIZE' : job['blocksize'],
            'DRQUEUE_ENDFRAME' : job['endframe'],
            'SCENE' : job['scene'],
            'RENDER_TYPE' : "animation"
            }

            # run task on cluster
            render_script = os.getenv('DRQUEUE_ROOT') + "/etc/" + DrQueue.get_rendertemplate(job['renderer'])
            ar = self.lbview.apply(DrQueue.run_script_with_env, render_script, env_dict)

A "time.sleep(0.5)" after the last line helps to avoid this. I think lbview.apply() should block at least for the time it takes to add a task to the queue (I don't mean blocking for task results).

Owner

minrk commented Apr 25, 2011

I'm intrigued by what you mean by 'not created'. What is the actual symptom? It must get a msg_id, I don't see how it's possible for that to fail. Are there any errors in the output of the Controller?

We do have the option to wait for a message to actually get sent to the scheduler. Just set lbview.track=True, and then after apply, call ar.wait_for_send(). This is optional, because there aren't race conditions unless you are editing buffers in-place after sending them, and it can increase submission latency significantly - one of those 'not a bug, but a feature', but it can certainly be a dangerous feature common to all systems that allow non-copying sends.

I don't actually see where there could be a race condition on a buffer here, though, so there's quite possibly a bug somewhere. I would like some more information on what particular step is failing to happen.

To see if all your tasks were created/submitted, you can do:

records = self.ip_client.db_query({'msg_id' : {'$in' : self.ip_client.history}}, keys=['header', 'completed', 'engine_uuid'])
if len(records) == len(self.ip_client.history), then all the tasks were created and made it to the controller
arrived = filter(lambda rec: rec['engine_uuid'] is not None, records) # the tasks that arrived on an engine
finished = filter(lambda rec: rec['completed'] is not None, records) # the tasks that finished somewhere (success or failure)

If all of those lists are the same, then every task was submitted and finished. If there was a race condition, then the data in some tasks could have been clobbered by the following one. You can check on the message data to see if any of them are actually not what they should be.

kaazoo commented May 6, 2011

Here's a stripped down script which creates tasks in a loop:

from IPython.parallel import Client
import time
import os

client = Client()
lbview = client.load_balanced_view()

def do_nothing(render_script, env_dict):
    os.listdir(".")

num_tasks = 100
task_frames = range(1, num_tasks + 1, 1)
for x in task_frames:
    env_dict = {}
    render_script = "foo"
    ar = lbview.apply(do_nothing, render_script, env_dict)
    # avoid race condition
    #time.sleep(0.2)

records = client.db_query({'msg_id' : {'$in' : client.history}}, keys=['header', 'completed', 'engine_uuid'])
print "records: "+str(len(records))
print "history: "+str(len(client.history))

time.sleep(20)

arrived = filter(lambda rec: rec['engine_uuid'] is not None, records)
print "arrived tasks: "+str(len(arrived))

finished = filter(lambda rec: rec['completed'] is not None, records)
print "finished tasks: "+str(len(finished))

It gives different output, but you can see that not all tasks can be created:

$ python2.6 job_create_race.py 
records: 56
history: 100
arrived tasks: 29
finished tasks: 25

$ python2.6 job_create_race.py 
records: 22
history: 100
arrived tasks: 10
finished tasks: 6

$ python2.6 job_create_race.py 
records: 8
history: 100
arrived tasks: 5
finished tasks: 1

If you wait a short while after each task with "time.sleep(0.2)", then you seem to get all tasks:

$ python2.6 job_create_race.py 
records: 100
history: 100
arrived tasks: 100
finished tasks: 100

@kaazoo kaazoo closed this May 6, 2011

@kaazoo kaazoo reopened this May 6, 2011

Owner

minrk commented May 6, 2011

Ah, okay. I thought you were saying that the jobs weren't happening, but it's actually a problem in the record creation. Have you tried this with the retries branch in #413? I think this is related to the collision detection issues I fixed yesterday.

Note that you are performing the db_query as soon as the jobs are submitted. It should be on the other side of your time.sleep() (or you could use client.wait() to wait for the jobs to actually finish), if you want the information to be up to date.

I can reproduce this in master, but not in my retries branch.

kaazoo commented May 30, 2011

Yes, you are right. I modified my test script like you suggested.
It works with the changes of your 'retries' branch (which is already merged into master).

@kaazoo kaazoo closed this May 30, 2011

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment