Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] JobTypeLoader Enhancement #334

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
95a4054
starting initial implementation of JobTypeLoader
opalmer Sep 27, 2015
12ba1d6
adding some missing doc strings
opalmer Sep 27, 2015
d878a6f
removing TODO assertion, fixing statement in __init__
opalmer Sep 27, 2015
9096d0f
tests for cache_path(), download_source() and cache_directory
opalmer Sep 27, 2015
a15c62c
include farm_name in cache_directory path if set
opalmer Sep 29, 2015
f47e3b2
Merge branch 'master' of github.com:pyfarm/pyfarm-agent into jobtype_…
opalmer Sep 29, 2015
e8a49c6
check for None in farm_name
opalmer Sep 29, 2015
c4f736f
add cache_directory test for farm_name
opalmer Sep 29, 2015
702b46b
adding tests for compile_()
opalmer Sep 29, 2015
5ae3597
added _compile() static method, integrated it into load()
opalmer Sep 29, 2015
b5fea59
get code from correct url and with proper headers
opalmer Sep 30, 2015
0f3f85d
updating tests to act more like the master would
opalmer Sep 30, 2015
0853a80
tests for JobTypeLoader.load
opalmer Sep 30, 2015
62c189a
use inlineCallbacks for test_schema, otherwise JobType.load never runs
opalmer Oct 1, 2015
9753c18
refactor to use/cache job type dictionary rather than just the source…
opalmer Oct 1, 2015
5b2a53d
skip caching in a few cases rather than breaking
opalmer Oct 2, 2015
de45385
removing old code
opalmer Oct 2, 2015
5cdf641
rename _compile to create_module
opalmer Oct 2, 2015
256d5c3
updating tests to account for json response from master in JobTypeLoader
opalmer Oct 2, 2015
d70ad16
WIP update to test_accept
opalmer Oct 2, 2015
5b8cd76
starting to convert assign.py to inlineCallbacks so it's easier to m…
opalmer Oct 7, 2015
3e1b737
fixing test_restarting
opalmer Oct 14, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
348 changes: 172 additions & 176 deletions pyfarm/agent/http/api/assign.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from twisted.web.server import NOT_DONE_YET
from twisted.internet import reactor
from twisted.internet.defer import DeferredList
from twisted.internet.defer import DeferredList, inlineCallbacks
from voluptuous import Schema, Required

from pyfarm.core.enums import WorkState, AgentState
Expand Down Expand Up @@ -62,169 +62,15 @@ class Assign(APIResource):

def __init__(self, agent):
    """Store ``agent`` and start with an empty list of scheduled calls."""
    # Holds the objects returned by reactor.callLater when post()
    # schedules the handling of an accepted assignment.
    self.tasks = []
    self.agent = agent

def post(self, **kwargs):
request = kwargs["request"]
request_data = kwargs["data"]

if request_from_master(request):
config.master_contacted()

if ("agent_id" in request_data and
request_data["agent_id"] != config["agent_id"]):
logger.error("Wrong agent_id in assignment: %s. Our id is %s",
request_data["agent_id"], config["agent_id"])
request.setResponseCode(BAD_REQUEST)
request.write(dumps(
{"error": "You have the wrong agent. I am %s." %
config["agent_id"],
"agent_id": config["agent_id"]}))
request.finish()
return NOT_DONE_YET

if self.agent.reannounce_lock.locked:
logger.warning("Temporarily rejecting assignment because we "
"are in the middle of a reannounce.")
request.setResponseCode(SERVICE_UNAVAILABLE)
request.write(
dumps({"error": "Agent cannot accept assignments because of a "
"reannounce in progress. Try again shortly."}))
request.finish()
return NOT_DONE_YET

# First, get the resources we have *right now*. In some cases
# this means using the functions in pyfarm.core.sysinfo because
# entries in `config` could be slightly out of sync with the system.
memory_free = free_ram()
cpus = config["agent_cpus"]
requires_ram = request_data["job"].get("ram")
requires_cpus = request_data["job"].get("cpus")

if self.agent.shutting_down:
logger.error("Rejecting assignment because the agent is in the "
"process of shutting down.")
request.setResponseCode(SERVICE_UNAVAILABLE)
request.write(
dumps({"error": "Agent cannot accept assignments because it is "
"shutting down"}))
request.finish()
return NOT_DONE_YET

if "restart_requested" in config \
and config["restart_requested"] is True:
logger.error("Rejecting assignment because of scheduled restart.")
request.setResponseCode(SERVICE_UNAVAILABLE)
request.write(
dumps({"error": "Agent cannot accept assignments because of a "
"pending restart"}))
request.finish()
return NOT_DONE_YET

elif "agent_id" not in config:
logger.error(
"Agent has not yet connected to the master or `agent_id` "
"has not been set yet.")
request.setResponseCode(SERVICE_UNAVAILABLE)
request.write(
dumps({"error": "agent_id has not been set in the config"}))
request.finish()
return NOT_DONE_YET

# Do we have enough ram?
elif requires_ram is not None and requires_ram > memory_free:
logger.error(
"Task %s requires %sMB of ram, this agent has %sMB free. "
"Rejecting Task %s.",
request_data["job"]["id"], requires_ram, memory_free,
request_data["job"]["id"])
request.setResponseCode(BAD_REQUEST)
request.write(
dumps({"error": "Not enough ram",
"agent_ram": memory_free,
"requires_ram": requires_ram}))
request.finish()

# touch the config
config["free_ram"] = memory_free
return NOT_DONE_YET

# Do we have enough cpus (count wise)?
elif requires_cpus is not None and requires_cpus > cpus:
logger.error(
"Task %s requires %s CPUs, this agent has %s CPUs. "
"Rejecting Task %s.",
request_data["job"]["id"], requires_cpus, cpus,
request_data["job"]["id"])
request.setResponseCode(BAD_REQUEST)
request.write(
dumps({"error": "Not enough cpus",
"agent_cpus": cpus,
"requires_cpus": requires_cpus}))
request.finish()
return NOT_DONE_YET

# Check for double assignments
try:
current_assignments = config["current_assignments"].itervalues
except AttributeError: # pragma: no cover
current_assignments = config["current_assignments"].values

new_task_ids = set(task["id"] for task in request_data["tasks"])

for assignment in current_assignments():
existing_task_ids = set(x["id"] for x in assignment["tasks"])

# If the assignment is identical to one we already have
if existing_task_ids == new_task_ids:
logger.debug("Ignoring repeated assignment of the same batch")
request.setResponseCode(ACCEPTED)
request.write(dumps({"id": assignment["id"]}))
request.finish()
return NOT_DONE_YET
# If there is only a partial overlap
elif existing_task_ids & new_task_ids:
logger.error("Rejecting assignment with partial overlap with "
"existing assignment.")
unknown_task_ids = new_task_ids - existing_task_ids
request.setResponseCode(CONFLICT)
request.write(dumps(
{"error": "Partial overlap of tasks",
"rejected_task_ids": list(unknown_task_ids)}))
request.finish()
return NOT_DONE_YET

if not config["agent_allow_sharing"]:
try:
current_jobtypes = config["jobtypes"].itervalues
except AttributeError: # pragma: no cover
current_jobtypes = config["jobtypes"].values
for jobtype in current_jobtypes():
num_finished_tasks = (len(jobtype.finished_tasks) +
len(jobtype.failed_tasks))
if len(jobtype.assignment["tasks"]) > num_finished_tasks:
logger.error("Rejecting an assignment that would require "
"agent sharing")
request.setResponseCode(CONFLICT)
request.write(
dumps({"error":
"Agent does not allow multiple assignments",
"rejected_task_ids": list(new_task_ids)}))
request.finish()
return NOT_DONE_YET

assignment_uuid = uuid4()
request_data.update(id=assignment_uuid)
config["current_assignments"][assignment_uuid] = request_data

# In all other cases we have some work to do inside of
# deferreds so we just have to respond
request.setResponseCode(ACCEPTED)
request.write(dumps({"id": assignment_uuid}))
request.finish()
logger.debug("Accepted assignment %s: %r",
assignment_uuid, request_data)
logger.info("Accept assignment from job %s with %s tasks",
request_data["job"]["title"], len(request_data["tasks"]))
@inlineCallbacks
def handle_assignment(self, request_data, assignment_uuid):
logger.debug(
"Accepting assignment %s: %r", assignment_uuid, request_data)
logger.info(
"Accept assignment from job %s with %s tasks",
request_data["job"]["title"], len(request_data["tasks"]))

def assignment_failed(result, assign_id):
logger.error(
Expand Down Expand Up @@ -340,7 +186,14 @@ def error_callback(cburl, cbdata, task, failure_reason):
if jobtype_id:
config["jobtypes"].pop(jobtype_id, None)

def loaded_jobtype(jobtype_class, assign_id):
# Load the job type then pass the class along to the
# callback. No errback here because all the errors
# are handled internally in this case.
try:
jobtype_class = yield JobType.load(request_data)
except Exception:
pass # load_jobtype_failed
else:
# TODO: report error to master
if hasattr(jobtype_class, "getTraceback"):
logger.error(jobtype_class.getTraceback())
Expand All @@ -361,17 +214,18 @@ def loaded_jobtype(jobtype_class, assign_id):
# TODO: add callback to stop persistent process
try:
started_deferred, stopped_deferred = instance._start()
started_deferred.addCallback(assignment_started, assign_id)
started_deferred.addErrback(assignment_failed, assign_id)
stopped_deferred.addCallback(assignment_stopped, assign_id)
stopped_deferred.addErrback(assignment_failed, assign_id)
started_deferred.addCallback(assignment_started, assignment_uuid)
started_deferred.addErrback(assignment_failed, assignment_uuid)
stopped_deferred.addCallback(assignment_stopped, assignment_uuid)
stopped_deferred.addErrback(assignment_failed, assignment_uuid)
stopped_deferred.addBoth(restart_if_necessary)
stopped_deferred.addBoth(
lambda *args: instance._remove_tempdirs())
stopped_deferred.addBoth(
lambda *args: instance._close_logs())
stopped_deferred.addBoth(
lambda *args: instance._upload_logfile())

except Exception as e:
logger.error("Error on starting jobtype, stopping it now. "
"Error was: %r. Traceback: %s", e,
Expand All @@ -380,17 +234,159 @@ def loaded_jobtype(jobtype_class, assign_id):
error="Error while loading jobtype: %r. "
"Traceback: %s" %
(e, traceback.format_exc()))
assignment = config["current_assignments"].pop(assign_id)
assignment = config["current_assignments"].pop(assignment_uuid)
if "jobtype" in assignment:
jobtype_id = assignment["jobtype"].pop("id", None)
if jobtype_id:
config["jobtypes"].pop(jobtype_id, None)

# Load the job type then pass the class along to the
# callback. No errback here because all the errors
# are handled internally in this case.
jobtype_loader = JobType.load(request_data)
jobtype_loader.addCallback(loaded_jobtype, assignment_uuid)
jobtype_loader.addErrback(load_jobtype_failed, assignment_uuid)
def post(self, **kwargs):
    """
    Validate an incoming assignment and schedule it for execution.

    The assignment is rejected when it is addressed to a different
    agent, when the agent is reannouncing, shutting down, about to
    restart or has no ``agent_id`` yet, when the job asks for more ram
    or cpus than are available, when it partially overlaps an existing
    assignment, or when sharing is disabled and another job type still
    has unfinished tasks. A repeat of an identical batch is acknowledged
    with the id of the existing assignment and is not scheduled again.

    :param kwargs:
        Must contain ``request`` (the incoming request object) and
        ``data`` (the decoded assignment). ``data`` must provide
        ``job`` and ``tasks`` and may provide ``agent_id``.

    :returns:
        A ``(body, status)`` tuple where ``body`` is produced by
        ``dumps``. On success ``body`` carries the assignment ``id``
        and ``status`` is ``ACCEPTED``.
    """
    request_data = kwargs["data"]

    # First, get the resources we have *right now*. In some cases
    # this means using the functions in pyfarm.core.sysinfo because
    # entries in `config` could be slightly out of sync with the system.
    memory_free = free_ram()
    cpus = config["agent_cpus"]
    requires_ram = request_data["job"].get("ram")
    requires_cpus = request_data["job"].get("cpus")

    if request_from_master(kwargs["request"]):
        config.master_contacted()

    # Only compare ids when the agent actually has one. Without the
    # `"agent_id" in config` guard an unconfigured agent would raise
    # KeyError here and never reach the dedicated branch further down
    # that reports the missing id with SERVICE_UNAVAILABLE.
    if ("agent_id" in request_data and "agent_id" in config and
            request_data["agent_id"] != config["agent_id"]):
        logger.error("Wrong agent_id in assignment: %s. Our id is %s",
                     request_data["agent_id"], config["agent_id"])
        return (
            dumps({"error": "You have the wrong agent. "
                            "I am %s." % config["agent_id"],
                   "agent_id": config["agent_id"]}),
            BAD_REQUEST
        )

    elif self.agent.reannounce_lock.locked:
        logger.warning("Temporarily rejecting assignment because we "
                       "are in the middle of a reannounce.")
        return (
            dumps({"error": "Agent cannot accept assignments because of a "
                            "reannounce in progress. Try again shortly."}),
            SERVICE_UNAVAILABLE
        )

    elif self.agent.shutting_down:
        logger.error("Rejecting assignment because the agent is in the "
                     "process of shutting down.")
        return (
            dumps({"error": "Agent cannot accept assignments because it is "
                            "shutting down."}),
            SERVICE_UNAVAILABLE
        )

    elif "restart_requested" in config \
            and config["restart_requested"] is True:
        logger.error("Rejecting assignment because of scheduled restart.")
        return (
            dumps({"error": "Agent cannot accept assignments because of a "
                            "pending restart."}),
            SERVICE_UNAVAILABLE
        )

    elif "agent_id" not in config:
        logger.error(
            "Agent has not yet connected to the master or `agent_id` "
            "has not been set yet.")
        return (
            dumps({"error": "agent_id has not been set in the config"}),
            SERVICE_UNAVAILABLE
        )

    # Do we have enough ram?
    elif requires_ram is not None and requires_ram > memory_free:
        logger.error(
            "Task %s requires %sMB of ram, this agent has %sMB free. "
            "Rejecting Task %s.",
            request_data["job"]["id"], requires_ram, memory_free,
            request_data["job"]["id"])
        # Record the freshly measured value so `config` catches up.
        config["free_ram"] = memory_free
        return (
            dumps({"error": "Not enough ram",
                   "agent_ram": memory_free,
                   "requires_ram": requires_ram}),
            BAD_REQUEST
        )

    # Do we have enough cpus (count wise)?
    elif requires_cpus is not None and requires_cpus > cpus:
        logger.error(
            "Task %s requires %s CPUs, this agent has %s CPUs. "
            "Rejecting Task %s.",
            request_data["job"]["id"], requires_cpus, cpus,
            request_data["job"]["id"])
        return (
            dumps({"error": "Not enough cpus",
                   "agent_cpus": cpus,
                   "requires_cpus": requires_cpus}),
            BAD_REQUEST
        )

    # Check for double assignments. `itervalues` is preferred when the
    # mapping provides it, otherwise fall back to `values`.
    try:
        current_assignments = config["current_assignments"].itervalues
    except AttributeError:  # pragma: no cover
        current_assignments = config["current_assignments"].values

    new_task_ids = set(task["id"] for task in request_data["tasks"])

    for assignment in current_assignments():
        existing_task_ids = set(x["id"] for x in assignment["tasks"])

        # If the assignment is identical to one we already have
        if existing_task_ids == new_task_ids:
            logger.debug(
                "Ignoring repeated assignment of the same batch")
            return dumps({"id": assignment["id"]}), ACCEPTED

        # If there is only a partial overlap
        elif existing_task_ids & new_task_ids:
            logger.error("Rejecting assignment with partial overlap with "
                         "existing assignment.")
            unknown_task_ids = new_task_ids - existing_task_ids
            return (
                dumps({"error": "Partial overlap of tasks",
                       "rejected_task_ids": list(unknown_task_ids)}),
                CONFLICT
            )

    if not config["agent_allow_sharing"]:
        try:
            current_jobtypes = config["jobtypes"].itervalues
        except AttributeError:  # pragma: no cover
            current_jobtypes = config["jobtypes"].values

        for jobtype in current_jobtypes():
            num_finished_tasks = (len(jobtype.finished_tasks) +
                                  len(jobtype.failed_tasks))
            # Any job type with tasks still outstanding means accepting
            # this assignment would share the agent.
            if len(jobtype.assignment["tasks"]) > num_finished_tasks:
                logger.error("Rejecting an assignment that would require "
                             "agent sharing")
                return (
                    dumps({
                        "error": "Agent does not allow multiple "
                                 "assignments",
                        "rejected_task_ids": list(new_task_ids)}),
                    CONFLICT
                )

    assignment_uuid = uuid4()
    request_data.update(id=assignment_uuid)
    config["current_assignments"][assignment_uuid] = request_data

    # Schedule the method to handle the assignment itself. The arguments
    # must follow handle_assignment(request_data, assignment_uuid);
    # passing them the other way round hands the handler a UUID where it
    # expects the assignment dictionary.
    task = reactor.callLater(
        0, self.handle_assignment, request_data, assignment_uuid)
    self.tasks.append(task)

    # Let the client know we're going to handle the assignment.
    return dumps({"id": assignment_uuid}), ACCEPTED

return NOT_DONE_YET