Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MULTIPLE CHANGES to BUGFIX Dropped Futures #67

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

maharjun
Copy link
Contributor

@maharjun maharjun commented Jun 7, 2017

This is a surgical change in SCOOP that fixes many of its previous issues and finally makes it useful along with the communication error robustness.

I have slightly changed the run control loop, sendResult, made the deletes and appends more explicit, and fixed #65 fixed #66 (which was already fixed by #63) and fixed #64.

All tests seem to be passing except the SLURM tests.

Also added some documentation to the code.

This is a surgical change in SCOOP that fixes many of its previous
issues and finally makes it useful along with the communication error
robustness
@maharjun
Copy link
Contributor Author

maharjun commented Jun 8, 2017

This still does not completely fix the problem as i have later found out. I will fix this over the weekend and add a description of the new logic in a file in the scoop root directory.

1.  Made the fault tolerance scheme a heartbeat scheme.

    1.  Each worker has a parallel process (not thread, see below)
        that sends the heartbeat every TIME_BETWEEN_HEARTBEATS.

    2.  The broker checks every TASK_CHECK_INTERVAL seconds for and
        worker with assigned jobs that does has not sent a heartbeat
        in the last TIME_BEFORE_LOSING_WORKER seconds. It then
        considers this worker dead and performs the following:

        1.  Cleans up all futures created by that worker from
            self.assigned_tasks and self.unassigned_tasks.

        2.  Requests a resend of all futures that were assigned to be
            executed on that worker (This could potentially fail if
            there is no connection to the host that spawned these
            processes. Need to fix this case)

        3.  Deletes entry of this future from the heartbeat times
            dict.

1.  Made the local broker a python forked process instead of thread
    because running long C programs from pypthon causes python to be
    unable to switch threads

2.  Made the sending of heartbeat a forked process due to point 2

3.  Changed the scheduling to be FCFS:

    1.  Changed the updateQueue to remove the dependence on
        lowwatermark as that didnt make sense when we only fetch a job
        once one is completed (i.e. local queue is empty and greenlet
        has returned something). Also the time length only changes
        after having fetched a job.

    2.  Changed the container for self.available_workers to a set to
        prevent spamming of futureRequests which may occur if the
        UpdateQueue receives replies instead of futures after sending
        the future request.

4.  Made control loop clearer and added comments explaining some of
    the underlying mechanisms

5.  Made task sending safe against worker loss (sometimes workers are
    lost and before they can be removed from the qeue a task is
    assigned leading to a ZMQError).
@soravux
Copy link
Owner

soravux commented Jun 12, 2017

Thanks for those, I'll review them this week and merge them as soon as possible.

@maharjun
Copy link
Contributor Author

The following is a summary of changes made to scoop

  1. Made deletes from futureDict and execQueue more explicit (i.e. they thow an error if the object to delete doesn't exist.)

  2. Added an isReady Flag which represents the state of having been processed collected by the source worker. A ready future has all references cleared except the one in execQueue.ready. Also, the ready queue now only contains futures for which isReady is set.

  3. Made the append functions more explicit to convey exactly what's going on

  4. Made execQueue.inprogress actually track futures that are in progress

  5. Made control loop clearer and added comments explaining some of the underlying mechanisms

  6. Made the fault tolerance scheme a heartbeat scheme.

    1. Each worker has a parallel process (not thread, see point 7 & 8) that sends the heartbeat every TIME_BETWEEN_HEARTBEATS.

    2. The broker checks every TASK_CHECK_INTERVAL seconds for a worker with assigned jobs that has not sent a heartbeat in the last TIME_BEFORE_LOSING_WORKER seconds. It then considers this worker dead and performs the following:

      1. Cleans up all futures created by that worker from self.assigned_tasks and self.unassigned_tasks.

      2. Requests a resend of all futures that were assigned to be executed on that worker (This could potentially fail if there is no connection to the host that spawned these processes. A warning is issued in this case)

      3. Deletes entry of this future from the self.heartbeat_times dict and the self.assigned_times.

  7. Made the local broker a python forked process instead of thread because running long C programs from pypthon causes python to be unable to switch threads

  8. Made the sending of heartbeat a forked process due to point 7. skipped heartbeats cause unintentionally dropped processes

  9. Changed the scheduling to be FCFS:

    1. Changed the updateQueue to remove the dependence on lowwatermark as that didnt make sense when we only fetch a job once one is completed (i.e. local queue is empty and greenlet has returned something). Also the time length only changes after having fetched a job.

    2. Changed the container for self.available_workers to a set to prevent spamming of futureRequests which may occur if the UpdateQueue receives replies instead of futures after sending the future request.

Made the following changes

1.  Added a new function append_init which is used by futures.submit.
    This function adds newly spawned futures to the broker's task
    queue.

2.  Added new state variable request_in_process to the FutureQueue
    object in order to track the state of the future request. This is
    used to track the state of a future request so that a future
    request is sent iff the previous future request has been answered

3.  Also, replaced function recvFutures with recvIncoming which passes
    the received incoming messages back to updateQueue where they are
    processed. Changed _recv to only perform preprocessing of the
    messages.
Copy link

@nfaguirrec nfaguirrec left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the line 329 of scoop/_types.py the function _delete is still used "sending_future._delete()", however this function was removed in this same commit. Program fails with the error: AttributeError: 'Future' object has no attribute "_delete"

@maharjun
Copy link
Contributor Author

maharjun commented Jul 27, 2017 via email

@maharjun
Copy link
Contributor Author

Hmm.. Its strange, I can't seem to find he part of code that you're talking about. Are you sure you're not using an older version of this?

BTW, If you're running this on linux, you may be interested in trying out the IGITUGraz fork (you should use the branch TUGmaster). It has some system-dependent fixes that improve process cleanup during interruption and exceptions.

@nfaguirrec
Copy link

I found the error in this pull request. Not in a specific branch. I was interested in testing your modifications: "Fixing case where the resend request could fail if source worker is lost" and "Made the local broker a python forked process instead of thread". Thanks!

@maharjun
Copy link
Contributor Author

The thing is there are some commits after that in this pull request. titled as follows

Changed Variable name status_times to heartbeat_times      (528081c)
Simplified FutureQueue.pop                                 (5c3e055)
MULTIPLE CHANGES Fixed Inefficient Scheduling (Issue #68)  (8de3f0c)

search for the commit ID's on this page you should find them. These commits contain the fixes to the issues you mentioned and more.

Enjoy!

@maharjun
Copy link
Contributor Author

maharjun commented Jul 27, 2017

Interestingly, when I check the diff of ea14f32, I see that the very line you mention has been changed, so right now, I really have no clue where it is that you're seeing that patch of code.

This is a link to the file at that commit (you should be able to see the commit ID in the link)
https://github.com/IGITUGraz/scoop/blob/ea14f3258b11b3315dbf10cf885a721a999edad3/scoop/_types.py

If you go to the line you specified (329) you'll see that there is no usage of _delete

PS: I think it would be easier for you if you simply pulled this branch into your local repo and tried it out.
PPS: I have posted a list of known issues here: (#60 (comment))

@nfaguirrec
Copy link

OK. I will try it. Thanks a lot !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

SCOOP Memory Leak 1 SCOOP is Losing Futures SCOOP Memory Leak 2
3 participants