Skip to content

Commit

Permalink
Merge d5360ad into a0a6e5c
Browse files Browse the repository at this point in the history
  • Loading branch information
guidow committed Jun 26, 2015
2 parents a0a6e5c + d5360ad commit 5ca2b37
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 7 deletions.
56 changes: 51 additions & 5 deletions pyfarm/master/api/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from pyfarm.models.job import Job, JobNotifiedUser
from pyfarm.models.software import (
Software, SoftwareVersion, JobSoftwareRequirement)
from pyfarm.models.tag import Tag
from pyfarm.models.tag import Tag, JobTagRequirement
from pyfarm.models.jobqueue import JobQueue
from pyfarm.master.application import db
from pyfarm.master.utility import (
Expand Down Expand Up @@ -222,7 +222,7 @@ class JobIndexAPI(MethodView):
type_checks={"by": lambda x: isinstance(
x, RANGE_TYPES)},
ignore=["start", "end", "jobtype", "jobtype_version",
"user", "jobqueue"],
"user", "jobqueue", "tag_requirements"],
disallow=["jobtype_version_id", "time_submitted",
"time_started", "time_finished",
"job_queue_id"])
Expand Down Expand Up @@ -299,6 +299,12 @@ def post(self):
"software": "blender"
}
],
"tag_requirements": [
{
"tag": "workstation",
"negate": true
}
],
"id": 2,
"ram": 32,
"cpus": 1,
Expand Down Expand Up @@ -389,6 +395,7 @@ def post(self):
NOT_FOUND)

notified_usernames = g.json.pop("notified_users", None)
tag_requirements = g.json.pop("tag_requirements", None)

g.json.pop("start", None)
g.json.pop("end", None)
Expand Down Expand Up @@ -427,6 +434,17 @@ def post(self):
notified_user.on_deletion = entry["on_deletion"]
db.session.add(notified_user)

if tag_requirements:
for entry in tag_requirements:
tag = Tag.query.filter_by(tag=entry["tag"]).first()
if not tag:
tag = Tag(tag=entry["tag"])
db.session.add(tag)
tag_requirement = JobTagRequirement(job=job, tag=tag)
if entry["negate"]:
tag_requirement.negate = True
db.session.add(tag_requirement)

custom_json = loads(request.data.decode(), parse_float=Decimal)
if "end" in custom_json and "start" not in custom_json:
return (jsonify(error="`end` is specified while `start` is not"),
Expand Down Expand Up @@ -480,7 +498,8 @@ def post(self):
"software_requirements",
"parents",
"children",
"notified_users"])
"notified_users",
"tag_requirements"])
job_data["start"] = start
job_data["end"] = min(current_frame, end)
del job_data["jobtype_version_id"]
Expand Down Expand Up @@ -632,6 +651,12 @@ def get(self, job_name):
"max_version_id": null
}
],
"tag_requirements": [
{
"tag": "workstation",
"negate": true
}
],
"batch": 1,
"time_started": null,
"time_submitted": "2014-03-06T15:40:58.335259",
Expand Down Expand Up @@ -661,7 +686,8 @@ def get(self, job_name):
"software_requirements",
"parents",
"children",
"notified_users"])
"notified_users",
"tag_requirements"])

first_task = Task.query.filter_by(job=job).order_by("frame asc").first()
last_task = Task.query.filter_by(job=job).order_by("frame desc").first()
Expand Down Expand Up @@ -752,6 +778,12 @@ def post(self, job_name):
"data": {
"foo": "bar"
},
"tag_requirements": [
{
"tag": "workstation",
"negate": true
}
],
"ram": 32,
"parents": [],
"hidden": false,
Expand Down Expand Up @@ -905,6 +937,19 @@ def post(self, job_name):
return jsonify(error=e.args), NOT_FOUND
del g.json["software_requirements"]

tag_requirements = g.json.pop("tag_requirements", None)
if tag_requirements:
job.tag_requirements = []
for entry in tag_requirements:
tag = Tag.query.filter_by(tag=entry["tag"]).first()
if not tag:
tag = Tag(tag=entry["tag"])
db.session.add(tag)
tag_requirement = JobTagRequirement(job=job, tag=tag)
if entry["negate"]:
tag_requirement.negate = True
db.session.add(tag_requirement)

g.json.pop("start", None)
g.json.pop("end", None)
if g.json:
Expand All @@ -916,7 +961,8 @@ def post(self, job_name):
"data",
"software_requirements",
"parents",
"children"])
"children",
"tag_requirements"])
job_data["start"] = start
job_data["end"] = end
del job_data["jobtype_version_id"]
Expand Down
5 changes: 4 additions & 1 deletion pyfarm/master/entrypoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def load_user_interface(app_instance):
rerun_failed_in_job, alter_autodeletion_for_job, rerun_multiple_jobs,
rerun_failed_in_multiple_jobs, pause_multiple_jobs,
unpause_multiple_jobs, delete_multiple_jobs, move_multiple_jobs,
set_prio_weight_on_jobs)
set_prio_weight_on_jobs, update_tag_requirements_in_job)
from pyfarm.master.user_interface.jobqueues import (
jobqueues, jobqueue_create, jobqueue, delete_jobqueue)
from pyfarm.master.user_interface.jobtypes import (
Expand Down Expand Up @@ -249,6 +249,9 @@ def load_user_interface(app_instance):
app_instance.add_url_rule("/jobs/<int:job_id>/update_tags",
"update_job_tags_ui", update_tags_in_job,
methods=("POST", ))
app_instance.add_url_rule("/jobs/<int:job_id>/update_tag_requirements",
"update_job_tag_requirements_ui",
update_tag_requirements_in_job, methods=("POST", ))
app_instance.add_url_rule("/jobs/<int:job_id>/upgrade_jobtype",
"upgrade_jobtype_for_job",
upgrade_job_to_latest_jobtype_version,
Expand Down
8 changes: 8 additions & 0 deletions pyfarm/master/templates/pyfarm/user_interface/job.html
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ <h4>Tags</h4>
<input type="submit" class="btn" value="Update Tags"/>
</form>

<h4 title="This job will only run on nodes that have all the required tags">Required Tags in Agent</h4>
<form method="POST" action="{{ url_for('update_job_tag_requirements_ui', job_id=job.id) }}">
<p>
<textarea name="tag_requirements" class="form-control" title="This job will only run on nodes that have all the required tags">{% for requirement in job.tag_requirements %}{{ '-' if requirement.negate }}{{ requirement.tag.tag }} {% endfor %}</textarea>
</p>
<input type="submit" class="btn" value="Update Tag Requirements"/>
</form>

{% if job.parents %}
<h4>Parent jobs</h4>
<table class="table table-striped table-bordered model-list">
Expand Down
5 changes: 5 additions & 0 deletions pyfarm/master/templates/pyfarm/user_interface/jobs.html
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@
</a>
{% endif %}
{% endfor %}
{% for tag_requirement in job[0].tag_requirements %}
<span class="label label-warning" style="margin:3px;">
{{ "-" if tag_requirement.negate }}{{ tag_requirement.tag.tag }}
</span>
{% endfor %}
</td>
<td><a href="{{ url_for('single_jobtype_ui', jobtype_id=job['jobtype_id']) }}">{{ job["jobtype_name"] }}</a></td>
<td>
Expand Down
34 changes: 33 additions & 1 deletion pyfarm/master/user_interface/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from pyfarm.scheduler.tasks import delete_job, stop_task, assign_tasks
from pyfarm.models.job import (
Job, JobDependency, JobTagAssociation, JobNotifiedUser)
from pyfarm.models.tag import Tag
from pyfarm.models.tag import Tag, JobTagRequirement
from pyfarm.models.task import Task
from pyfarm.models.agent import Agent
from pyfarm.models.jobqueue import JobQueue
Expand Down Expand Up @@ -875,6 +875,38 @@ def update_tags_in_job(job_id):

return redirect(url_for("single_job_ui", job_id=job.id), SEE_OTHER)

def update_tag_requirements_in_job(job_id):
job = Job.query.filter_by(id=job_id).first()
if not job:
return (render_template(
"pyfarm/error.html", error="Job %s not found" % job_id),
NOT_FOUND)

tagnames = request.form["tag_requirements"].split(" ")
tagnames = [x.strip() for x in tagnames if not x == ""]
job.tag_requirements = []
for name in tagnames:
negate = False
if name.startswith("-"):
if len(name) < 2:
return (render_template(
"pyfarm/error.html", error="Tag too short: " % name),
NOT_FOUND)
negate = True
name = name[1:]
tag = Tag.query.filter_by(tag=name).first()
if not tag:
tag = Tag(tag=name)
db.session.add(tag)
tag_requirement = JobTagRequirement(job=job, tag=tag, negate=negate)
db.session.add(tag_requirement)

db.session.commit()

flash("Tag Requirements for job %s have been updated." % job.title)

return redirect(url_for("single_job_ui", job_id=job.id), SEE_OTHER)

def rerun_single_task(job_id, task_id):
job = Job.query.filter_by(id=job_id).first()
if not job:
Expand Down
20 changes: 20 additions & 0 deletions pyfarm/models/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,26 @@ def satisfies_jobtype_requirements(self, jobtype_version):

return len(requirements_to_satisfy) == 0

def satisfies_job_requirements(self, job):
if not self.satisfies_jobtype_requirements( job.jobtype_version):
return False

if self.cpus < job.cpus:
return False

if self.free_ram < job.ram:
return False

for tag_requirement in job.tag_requirements:
if (not tag_requirement.negate and
tag_requirement.tag not in self.tags):
return False
if (tag_requirement.negate and
tag_requirement.tag in self.tags):
return False

return True

@classmethod
def validate_hostname(cls, key, value):
"""
Expand Down
3 changes: 3 additions & 0 deletions pyfarm/models/core/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ def _to_dict_relationship(self, name):
elif name == "children":
out.append({"id": relationship.id,
"title": relationship.title})
elif name == "tag_requirements":
out.append({"tag": relationship.tag.tag,
"negate": relationship.negate})
else:
raise NotImplementedError(
"don't know how to unpack relationships for `%s`" % name)
Expand Down
3 changes: 3 additions & 0 deletions pyfarm/models/etc/models.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ table_job_type_version: ${table_prefix}jobtype_versions
# The name of the table which assoicates jobs and tags.
table_job_tag_assoc: ${table_prefix}job_tag_associations

# The name of the table which associates job and tag requirements
table_job_tag_req: ${table_prefix}job_tag_requirements

# The name of the table which assoicates inter-job dependencies
table_job_dependency: ${table_prefix}job_dependencies

Expand Down
2 changes: 2 additions & 0 deletions pyfarm/models/jobqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ def get_job_for_agent(self, agent):
Job.jobtype_version_id.in_(
supported_types),
Job.ram <= agent.free_ram).all()
child_jobs = [x for x in child_jobs if
agent.satisfies_job_requirements(x)]
child_queues = JobQueue.query.filter(
JobQueue.parent_jobqueue_id == self.id).all()

Expand Down
38 changes: 38 additions & 0 deletions pyfarm/models/tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyfarm.master.config import config
from pyfarm.models.core.types import id_column
from pyfarm.models.core.mixins import UtilityMixins
from pyfarm.models.core.types import IDTypeWork

__all__ = ("Tag", )

Expand All @@ -44,3 +45,40 @@ class Tag(db.Model, UtilityMixins):
tag = db.Column(
db.String(config.get("max_tag_length")),
nullable=False, doc="The actual value of the tag")


class JobTagRequirement(db.Model, UtilityMixins):
"""
Model representing a dependency of a job on a tag
If a job has a tag requirement, it will only run on agents that have that
tag.
"""
__tablename__ = config.get("table_job_tag_req")
__table_args__ = (UniqueConstraint("tag_id", "job_id"), )

id = id_column()

tag_id = db.Column(
db.Integer,
db.ForeignKey("%s.id" % config.get("table_tag")),
nullable=False, doc="Reference to the required tag")

job_id = db.Column(
IDTypeWork,
db.ForeignKey("%s.id" % config.get("table_job")),
nullable=False, doc="Foreign key to :class:`Job.id`")

negate = db.Column(
db.Boolean,
nullable=False, default=False,
doc="If true, an agent that has this tag can not work on this job")

job = db.relationship(
"Job",
backref=db.backref(
"tag_requirements",
lazy="dynamic",
cascade="all, delete-orphan"))

tag = db.relationship("Tag")
6 changes: 6 additions & 0 deletions tests/test_master/test_jobs_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def test_job_post(self):
"software": "foo"
}
],
"tag_requirements": [],
"ram": 32,
"cpus": 1,
"children": [],
Expand Down Expand Up @@ -264,6 +265,7 @@ def test_job_post_with_notified_users(self):
"environ": None,
"requeue": 3,
"software_requirements": [],
"tag_requirements": [],
"ram": 32,
"cpus": 1,
"children": [],
Expand Down Expand Up @@ -535,6 +537,7 @@ def test_job_post_with_jobtype_version(self):
"environ": None,
"requeue": 3,
"software_requirements": [],
"tag_requirements": [],
"ram": 32,
"cpus": 1,
"children": [],
Expand Down Expand Up @@ -652,6 +655,7 @@ def test_job_get(self):
"hidden": False,
"data": {"foo": "bar"},
"software_requirements": [],
"tag_requirements": [],
"batch": 1,
"time_started": None,
"time_submitted": time_submitted,
Expand Down Expand Up @@ -697,6 +701,7 @@ def test_job_get(self):
"hidden": False,
"data": {"foo": "bar"},
"software_requirements": [],
"tag_requirements": [],
"batch": 1,
"time_started": None,
"time_submitted": time_submitted,
Expand Down Expand Up @@ -780,6 +785,7 @@ def test_job_update(self):
"hidden": False,
"data": {"foo": "bar"},
"software_requirements": [],
"tag_requirements": [],
"batch": 1,
"time_started": None,
"time_submitted": time_submitted,
Expand Down

0 comments on commit 5ca2b37

Please sign in to comment.