Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MULTIPLE CHANGES to BUGFIX Dropped Futures #67

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions scoop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@
# NOTE(review): presumably flipped to True once a shutdown has been
# requested -- confirm against the code that writes it.
SHUTDOWN_REQUESTED = False

# Timing constants. The two that are visibly consumed by the ZMQ communicator
# are passed to time.sleep(), i.e. they are expressed in seconds.
# NOTE(review): the remaining ones presumably are seconds too -- confirm.
TIME_BETWEEN_PARTIALDEBUG = 30
# Seconds between two futures status updates sent to the broker.
TIME_BETWEEN_STATUS_REPORTS = 25
TIME_BETWEEN_STATUS_PRUNING = 60
# Seconds between two heartbeat messages sent by a worker to the broker.
TIME_BETWEEN_HEARTBEATS = 25
TASK_CHECK_INTERVAL = 15
TIME_BEFORE_LOSING_WORKER = 60
23 changes: 23 additions & 0 deletions scoop/_comm/scoopmessages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Message-type tags for SCOOP's ZeroMQ traffic.
#
# The ZMQ communicator pulls every name below into its namespace with
# `from .scoopmessages import *`, sends a tag as the first frame of a
# multipart message and compares incoming `msg[0]` against these tags.

# Worker requests
INIT = b"I"
REQUEST = b"RQ"
# TASK and REPLY messages carry a pickled future in their second frame.
TASK = b"T"
REPLY = b"RP"
SHUTDOWN = b"S"
VARIABLE = b"V"
BROKER_INFO = b"B"
# Sent by a worker together with the pickled id of the future concerned.
STATUS_READY = b"SD"
# Received by a worker with a pickled future id in the second frame.
RESEND_FUTURE = b"RF"
# Sent periodically by a worker together with a pickled time.time() timestamp.
HEARTBEAT = b"HB"
# NOTE(review): the REQUEST_* tags are not referenced in the communicator code
# visible here -- presumably they are handled by the broker; confirm.
REQUEST_STATUS_REQUEST = b"RSR"
REQUEST_STATUS_ANS = b"RSA"
REQUEST_INPROCESS = b"RI"
REQUEST_UNKNOWN = b"RU"

# Task statuses
STATUS_HERE = b"H"
STATUS_GIVEN = b"G"
STATUS_NONE = b"N"

# Broker interconnection
CONNECT = b"C"
179 changes: 82 additions & 97 deletions scoop/_comm/scoopzmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import socket
import copy
import logging
import threading
from multiprocessing import Process
try:
import cPickle as pickle
except ImportError:
Expand All @@ -32,25 +32,7 @@
from .. import shared, encapsulation, utils
from ..shared import SharedElementEncapsulation
from .scoopexceptions import Shutdown, ReferenceBroken

# Worker requests
INIT = b"I"
REQUEST = b"RQ"
TASK = b"T"
REPLY = b"RP"
SHUTDOWN = b"S"
VARIABLE = b"V"
BROKER_INFO = b"B"
STATUS_REQ = b"SR"
STATUS_ANS = b"SA"
STATUS_DONE = b"SD"
STATUS_UPDATE = b"SU"

# Task statuses
STATUS_HERE = b"H"
STATUS_GIVEN = b"G"
STATUS_NONE = b"N"

from .scoopmessages import *

LINGER_TIME = 1000

Expand Down Expand Up @@ -157,9 +139,10 @@ def __init__(self):
self._addBroker(broker)

# Putting futures status reporting in place
self.status_update_thread = threading.Thread(target=self._reportFutures)
self.status_update_thread.daemon = True
self.status_update_thread.start()
self.heartbeat_thread = Process(target=ZMQCommunicator._sendHeartBeat,
args=(self, ("HB_{0}".format(scoop.worker.decode('utf-8'))).encode('utf-8')))
self.heartbeat_thread.daemon = True
self.heartbeat_thread.start()

def createZMQSocket(self, sock_type):
"""Create a socket of the given sock_type and deactivate message dropping"""
Expand All @@ -181,20 +164,34 @@ def createZMQSocket(self, sock_type):
sock.setsockopt(zmq.ROUTER_MANDATORY, 1)
return sock

def _reportFutures(self):
"""Sends futures status updates to broker at intervals of
scoop.TIME_BETWEEN_STATUS_REPORTS seconds. Is intended to be run by a
separate thread."""
def _sendHeartBeat(self, hb_socket_name):
"""Sends heartbeat signal to broker at intervals of
scoop.TIME_BETWEEN_HEARTBEATS seconds. This signals to the broker that
this worker is alive and connected."""
# socket for the heartbeat signal
self.ZMQcontext = zmq.Context()

self.heartbeat_socket = self.createZMQSocket(zmq.DEALER)
self.heartbeat_socket.setsockopt(zmq.IDENTITY, hb_socket_name)

for brokerEntry in self.broker_set:
broker_address = "tcp://{hostname}:{port}".format(
hostname=brokerEntry.hostname,
port=brokerEntry.task_port,
)
# print("WORKER {} CONNECTING TO BROKER: {}".format(hb_socket_name, broker_address))
self.heartbeat_socket.connect(broker_address)
try:
while True:
time.sleep(scoop.TIME_BETWEEN_STATUS_REPORTS)
fids = set(x.id for x in scoop._control.execQueue.movable)
fids.update(set(x.id for x in scoop._control.execQueue.ready))
fids.update(set(x.id for x in scoop._control.execQueue.inprogress))
self.socket.send_multipart([
STATUS_UPDATE,
pickle.dumps(fids),
])
time.sleep(scoop.TIME_BETWEEN_HEARTBEATS)
# print('SENDING HEARTBEAT on worker {} at time {}'.format(scoop.worker, time.time()))
try:
self.heartbeat_socket.send_multipart([
HEARTBEAT,
pickle.dumps(time.time(), pickle.HIGHEST_PROTOCOL)
], zmq.NOBLOCK)
except zmq.error.Again as E:
scoop.logger.warning("FAILED HEARTBEAT IN worker {} at time {}".format(scoop.worker, time.time()))
except AttributeError:
# The process is being shut down.
pass
Expand Down Expand Up @@ -235,59 +232,49 @@ def _recv(self):
else:
msg = self.socket.recv_multipart()

try:
thisFuture = pickle.loads(msg[1])
except (AttributeError, ImportError) as e:
scoop.logger.error(
"An instance could not find its base reference on a worker. "
"Ensure that your objects have their definition available in "
"the root scope of your program.\n{error}".format(
error=e,
if msg[0] == TASK or msg[0] == REPLY:
try:
thisFuture = pickle.loads(msg[1])
except (AttributeError, ImportError) as e:
scoop.logger.error(
"An instance could not find its base reference on a worker. "
"Ensure that your objects have their definition available in "
"the root scope of your program.\n{error}".format(
error=e,
)
)
)
raise ReferenceBroken(e)

if msg[0] == TASK:
# Try to connect directly to this worker to send the result
# afterwards if Future is from a map.
if thisFuture.sendResultBack:
self.addPeer(thisFuture.id[0])

elif msg[0] == STATUS_ANS:
# TODO: This should not be here but in FuturesQueue.
if msg[2] == STATUS_HERE:
# TODO: Don't know why should that be done?
self.sendRequest()
elif msg[2] == STATUS_NONE:
# If a task was requested but is nowhere to be found, resend it
future_id = pickle.loads(msg[1])
raise ReferenceBroken(e)

if msg[0] == TASK:
# Try to connect directly to this worker to send the result
# afterwards if Future is from a map.
if thisFuture.sendResultBack:
self.addPeer(thisFuture.id[0])

isCallable = callable(thisFuture.callable)
isDone = thisFuture._ended()
if not isCallable and not isDone:
# TODO: Also check in root module globals for fully qualified name
try:
scoop.logger.warning(
"Lost track of future {0}. Resending it..."
"".format(scoop._control.futureDict[future_id])
)
self.sendFuture(scoop._control.futureDict[future_id])
except KeyError:
# Future was received and processed meanwhile
pass
return

isCallable = callable(thisFuture.callable)
isDone = thisFuture._ended()
if not isCallable and not isDone:
# TODO: Also check in root module globals for fully qualified name
try:
module_found = hasattr(sys.modules["__main__"],
thisFuture.callable)
except TypeError:
module_found = False
if module_found:
thisFuture.callable = getattr(sys.modules["__main__"],
thisFuture.callable)
else:
raise ReferenceBroken("This element could not be pickled: "
"{0}.".format(thisFuture))
return thisFuture
module_found = hasattr(sys.modules["__main__"],
thisFuture.callable)
except TypeError:
module_found = False
if module_found:
thisFuture.callable = getattr(sys.modules["__main__"],
thisFuture.callable)
else:
raise ReferenceBroken("This element could not be pickled: "
"{0}.".format(thisFuture))
return (msg[0], thisFuture)

elif msg[0] == RESEND_FUTURE:
# TODO: This should not be here but in FuturesQueue.
future_id = pickle.loads(msg[1])
return (RESEND_FUTURE, future_id)

else:
assert False, "Unrecognized incoming message {}".format(msg[0])

def pumpInfoSocket(self):
try:
Expand Down Expand Up @@ -347,7 +334,11 @@ def convertVariable(self, key, varName, varValue):
varName: result,
})

def recvFuture(self):
def recvIncoming(self):
"""
This function repeatedly reads incoming messages from the socket,
processes each of them with _recv() and returns the result.
"""
while self._poll(0):
received = self._recv()
if received:
Expand Down Expand Up @@ -392,11 +383,10 @@ def sendResult(self, future):

self._sendReply(
future.id[0],
pickle.dumps(future.id, pickle.HIGHEST_PROTOCOL),
pickle.dumps(future, pickle.HIGHEST_PROTOCOL),
)

def _sendReply(self, destination, fid, *args):
def _sendReply(self, destination, *args):
"""Send a REPLY directly to its destination. If it doesn't work, launch
it back to the broker."""
# Try to send the result directly to its parent
Expand All @@ -420,15 +410,10 @@ def _sendReply(self, destination, fid, *args):
destination,
])

def sendReadyStatus(self, future):
self.socket.send_multipart([
STATUS_DONE,
fid,
])

def sendStatusRequest(self, future):
self.socket.send_multipart([
STATUS_REQ,
pickle.dumps(future.id, pickle.HIGHEST_PROTOCOL),
STATUS_READY,
pickle.dumps(future.id)
])

def sendVariable(self, key, value):
Expand Down
82 changes: 52 additions & 30 deletions scoop/_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import greenlet

from ._types import Future, FutureQueue, CallbackType
from ._types import Future, FutureQueue, CallbackType, UnrecognizedFuture
import scoop

# Backporting collection features
Expand Down Expand Up @@ -124,11 +124,18 @@ def delFuture(afuture):
try:
del futureDict[afuture.id]
except KeyError:
pass
try:
del futureDict[afuture.parentId].children[afuture]
except KeyError:
pass
raise UnrecognizedFuture(
"The future ID {0} was unavailable in the "
"futureDict of worker {1}".format(afuture.id, scoop.worker))
if afuture.id[0] == scoop.worker and afuture.parentId != (-1, 0):
try:
del futureDict[afuture.parentId].children[afuture]
except KeyError:
# This does not raise an exception as this happens when a future is
# resent after being lost
scoop.logger.warning(
"Orphan future {0} being deleted from worker {1}"
.format(afuture.id, scoop.worker))


def runFuture(future):
Expand Down Expand Up @@ -181,9 +188,6 @@ def runFuture(future):
# Run callback (see http://www.python.org/dev/peps/pep-3148/#future-objects)
future._execute_callbacks(CallbackType.universal)

# Delete references to the future
future._delete()

return future


Expand Down Expand Up @@ -249,37 +253,55 @@ def runController(callable_, *args, **kargs):
)
)
lastDebugTs = time.time()
# process future
if future._ended():
# future is finished
if future.id[0] != scoop.worker:
# future is not local
execQueue.sendResult(future)
future = execQueue.pop()
else:
# future is local, parent is waiting

# At this point, the future is either completed or in progress
# (in progress meaning that its runFuture greenlet is still in progress).

# process future and get new future / result future
waiting_parent = False
if future.isDone:
if future.isReady:
# The future is completely processed and ready to be returned. This return
# is executed only if the parent is waiting for this particular future.
# TODO: document what causes orphaned futures.
execQueue.sendReadyStatus(future)
if future.index is not None:
try:
# This means that this particular future is being waited upon by a
# parent (see futures._waitAny())
if future.parentId in futureDict:
parent = futureDict[future.parentId]
except KeyError:
# Job has no parent here (probably children restart)
future = execQueue.pop()
else:
if parent.exceptionValue is None:
future = parent._switch(future)
else:
future = execQueue.pop()
else:
future = execQueue.pop()
waiting_parent = True
else:
execQueue.finalizeFuture(future)

if future.isReady and waiting_parent:
# This future must be returned to the parent future's greenlet. When that
# is done, one of two things happens: either the parent completes execution
# and returns itself along with its results and isDone set to True, or, if
# it has spawned a sub-future or needs to wait on another future, it returns
# itself in an in-progress state (isDone = False).
future = parent._switch(future)
else:
# future is in progress; run next future from pending execution queue.
# Either the future is ready and the parent is not waiting, or the future is
# in progress. The latter happens when the future is a parent future whose
# greenlet has returned itself before completing (see the comment in the
# branch above).
#
# In both cases, the future can safely be dropped/ignored and the next
# future picked up from the queue for processing.
future = execQueue.pop()

if not future._ended() and future.greenlet is None:
# initialize if the future hasn't started
# Handle the case of a future returned from the queue whose execution has
# not started yet (this can only happen if execQueue.pop is called), and
# start its execution.
future.greenlet = greenlet.greenlet(runFuture)
future = future._switch(future)

# Special case of removing the root future from the futureDict
scoop._control.delFuture(future)

execQueue.shutdown()
if future.exceptionValue:
print(future.exceptionTraceback)
Expand Down
Loading