diff --git a/continuousprint/api.py b/continuousprint/api.py index f045cf4..055d284 100644 --- a/continuousprint/api.py +++ b/continuousprint/api.py @@ -2,6 +2,7 @@ from enum import Enum from octoprint.access.permissions import Permissions, ADMIN_GROUP from octoprint.server.util.flask import restricted_access +from .queues.lan import ValidationError import flask import json from .storage import queries @@ -174,50 +175,32 @@ def add_job(self): self._get_queue(DEFAULT_QUEUE).add_job(data.get("name")).as_dict() ) - # PRIVATE API METHOD - may change without warning. - @octoprint.plugin.BlueprintPlugin.route("/set/mv", methods=["POST"]) - @restricted_access - @cpq_permission(Permission.EDITJOB) - def mv_set(self): - self._get_queue(DEFAULT_QUEUE).mv_set( - int(flask.request.form["id"]), - int( - flask.request.form["after_id"] - ), # Move to after this set (-1 for beginning of job) - int( - flask.request.form["dest_job"] - ), # Move to this job (null for new job at end) - ) - return json.dumps("ok") - # PRIVATE API METHOD - may change without warning. @octoprint.plugin.BlueprintPlugin.route("/job/mv", methods=["POST"]) @restricted_access @cpq_permission(Permission.EDITJOB) def mv_job(self): - self._get_queue(DEFAULT_QUEUE).mv_job( - int(flask.request.form["id"]), - int( - flask.request.form["after_id"] - ), # Move to after this job (-1 for beginning of queue) - ) - return json.dumps("ok") + src_id = flask.request.form["id"] + after_id = flask.request.form["after_id"] + if after_id == "": # Treat empty string as 'none' i.e. front of queue + after_id = None + sq = self._get_queue(flask.request.form["src_queue"]) + dq = self._get_queue(flask.request.form.get("dest_queue")) + + # Transfer into dest queue first + if dq != sq: + try: + new_id = dq.import_job_from_view(sq.get_job_view(src_id)) + except ValidationError as e: + return json.dumps(dict(error=str(e))) - # PRIVATE API METHOD - may change without warning. - @octoprint.plugin.BlueprintPlugin.route("/job/submit", methods=["POST"]) - @restricted_access - @cpq_permission(Permission.ADDJOB) - def submit_job(self): - j = queries.getJob(int(flask.request.form["id"])) - # Submit to the queue and remove from its origin - err = self._get_queue(flask.request.form["queue"]).submit_job(j) - if err is None: - self._logger.debug( - self._get_queue(DEFAULT_QUEUE).remove_jobs(job_ids=[j.id]) - ) - return self._state_json() - else: - return json.dumps(dict(error=str(err))) + print("Imported job from view") + sq.remove_jobs([src_id]) + src_id = new_id + + # Finally, move the job + dq.mv_job(src_id, after_id) + return json.dumps("OK") # PRIVATE API METHOD - may change without warning. @octoprint.plugin.BlueprintPlugin.route("/job/edit", methods=["POST"]) @@ -225,7 +208,8 @@ def submit_job(self): @cpq_permission(Permission.EDITJOB) def edit_job(self): data = json.loads(flask.request.form.get("json")) - return json.dumps(self._get_queue(DEFAULT_QUEUE).edit_job(data["id"], data)) + q = self._get_queue(data["queue"]) + return json.dumps(q.edit_job(data["id"], data)) # PRIVATE API METHOD - may change without warning. @octoprint.plugin.BlueprintPlugin.route("/job/import", methods=["POST"]) @@ -270,17 +254,6 @@ def rm_job(self): ) ) - # PRIVATE API METHOD - may change without warning. - @octoprint.plugin.BlueprintPlugin.route("/set/rm", methods=["POST"]) - @restricted_access - @cpq_permission(Permission.EDITJOB) - def rm_set(self): - return json.dumps( - self._get_queue(DEFAULT_QUEUE).rm_multi( - set_ids=flask.request.form.getlist("set_ids[]") - ) - ) - # PRIVATE API METHOD - may change without warning. @octoprint.plugin.BlueprintPlugin.route("/job/reset", methods=["POST"]) @restricted_access diff --git a/continuousprint/driver.py b/continuousprint/driver.py index 3d5b721..b837f87 100644 --- a/continuousprint/driver.py +++ b/continuousprint/driver.py @@ -78,12 +78,19 @@ def action( # Given that some calls to action() come from a watchdog timer, we hold a mutex when performing the action # so the state is updated in a thread safe way. with self.mutex: - self._logger.debug( - f"{a.name}, {p.name}, path={path}, materials={materials}, bed_temp={bed_temp}" - ) + now = time.time() + if self.idle_start_ts is None or self.idle_start_ts + 15 > now: + extra = ( + f"(idle logs hidden after {self.idle_start_ts+15})" + if self.idle_start_ts is not None + else "" + ) + self._logger.debug( + f"{a.name}, {p.name}, path={path}, materials={materials}, bed_temp={bed_temp} {extra}" + ) if p == Printer.IDLE and self.idle_start_ts is None: - self.idle_start_ts = time.time() + self.idle_start_ts = now elif p != Printer.IDLE and self.idle_start_ts is not None: self.idle_start_ts = None diff --git a/continuousprint/integration_test.py b/continuousprint/integration_test.py index 7d71327..dfc89bc 100644 --- a/continuousprint/integration_test.py +++ b/continuousprint/integration_test.py @@ -15,6 +15,7 @@ from peewee import SqliteDatabase from collections import defaultdict from peerprint.lan_queue import LANPrintQueueBase +from peerprint.sync_objects_test import TestReplDict # logging.basicConfig(level=logging.DEBUG) @@ -183,11 +184,6 @@ def release(self, k): self.locks.pop(k, None) -class LocalJobDict(dict): - def set(self, k, v, **kwargs): - self[k] = v - - class TestLANQueue(IntegrationTest): """A simple in-memory integration test between DB storage layer, queuing layer, and driver.""" @@ -214,13 +210,14 @@ def setUp(self): self.lq.ns, self.lq.addr, MagicMock(), logging.getLogger("lantestbase") ) self.lq.lan.q.locks = LocalLockManager(dict(), "lq") - self.lq.lan.q.jobs = LocalJobDict() + self.lq.lan.q.jobs = TestReplDict(lambda a, b: None) self.lq.lan.q.peers = dict() def test_completes_job_in_order(self): self.lq.lan.q.setJob( - "bsdf", + "uuid1", dict( + id="uuid1", name="j1", created=0, sets=[ @@ -238,8 +235,9 @@ def test_completes_job_in_order(self): def test_multi_job(self): for name in ("j1", "j2"): self.lq.lan.q.setJob( - f"{name}_hash", + f"{name}_id", dict( + id=f"{name}_id", name=name, created=0, sets=[dict(path=f"{name}.gcode", count=1, remaining=1)], @@ -288,7 +286,7 @@ def onupdate(): lq.ns, lq.addr, MagicMock(), logging.getLogger("lantestbase") ) lq.lan.q.locks = LocalLockManager(self.locks, f"peer{i}") - lq.lan.q.jobs = LocalJobDict() + lq.lan.q.jobs = TestReplDict(lambda a, b: None) lq.lan.q.peers = self.peers if i > 0: lq.lan.q.peers = self.peers[0][2].lan.q.peers @@ -304,6 +302,7 @@ def test_ordered_acquisition(self): lq1.lan.q.setJob( f"{name}_hash", dict( + id=f"{name}_hash", name=name, created=0, sets=[ diff --git a/continuousprint/queues/abstract.py b/continuousprint/queues/abstract.py index 56a4462..dae1e6f 100644 --- a/continuousprint/queues/abstract.py +++ b/continuousprint/queues/abstract.py @@ -67,40 +67,36 @@ def reset_jobs(self, job_ids) -> dict: pass -class AbstractJobQueue(AbstractQueue): - """LAN queues (potentially others in the future) act on whole jobs and do not allow - edits to inner data""" - - @abstractmethod - def submit_job(self, j: JobView) -> bool: - pass - - class AbstractEditableQueue(AbstractQueue): - """Some queues (e.g. local to a single printer) are directly editable.""" + """Use for queues that are directly editable.""" @abstractmethod - def add_job(self, name="") -> JobView: + def mv_job(self, job_id, after_id): pass @abstractmethod - def add_set(self, job_id, data) -> SetView: + def edit_job(self, job_id, data): pass @abstractmethod - def mv_set(self, set_id, after_id, dest_job) -> SetView: + def get_job_view(self, job_id): pass @abstractmethod - def mv_job(self, job_id, after_id): + def import_job_from_view(self, job_view): + """Imports a JobView into storage. Returns ID of the imported job""" pass + +class AbstractFactoryQueue(AbstractEditableQueue): + """Use for queues where you can construct new jobs/sets""" + @abstractmethod - def edit_job(self, job_id, data): + def add_job(self, name="") -> JobView: pass @abstractmethod - def rm_multi(self, job_ids, set_ids) -> dict: + def add_set(self, job_id, data) -> SetView: pass @abstractmethod diff --git a/continuousprint/queues/abstract_test.py b/continuousprint/queues/abstract_test.py new file mode 100644 index 0000000..ab287cf --- /dev/null +++ b/continuousprint/queues/abstract_test.py @@ -0,0 +1,157 @@ +from ..storage.database import JobView, SetView + + +class DummyQueue: + name = "foo" + + +def testJob(inst): + s = SetView() + s.id = inst + s.path = f"set{inst}.gcode" + s.count = 2 + s.remaining = 2 + s.rank = 0 + s.sd = False + s.material_keys = "" + s.profile_keys = "profile" + s.completed = 0 + s.save = lambda: True + j = JobView() + s.job = j + j.id = inst + j.acquired = False + j.name = f"job{inst}" + j.count = 2 + j.remaining = 2 + j.sets = [s] + j.draft = False + j.rank = 0 + j.queue = DummyQueue() + j.created = 5 + j.save = lambda: True + return j + + +class JobEqualityTests: + def _strip(self, d, ks): + for k in ks: + del d[k] + + def assertJobsEqual(self, v1, v2, ignore=[]): + d1 = v1.as_dict() + d2 = v2.as_dict() + for d in (d1, d2): + self._strip(d, [*ignore, "id", "queue"]) + for s in d1["sets"]: + self._strip(s, ("id", "rank")) + for s in d2["sets"]: + self._strip(s, ("id", "rank")) + self.assertEqual(d1, d2) + + def assertSetsEqual(self, s1, s2): + d1 = s1.as_dict() + d2 = s2.as_dict() + for d in (d1, d2): + self._strip(d, ("id", "rank")) + self.assertEqual(d1, d2) + + +class AbstractQueueTests(JobEqualityTests): + def setUp(self): + raise NotImplementedError("Must create queue as self.q with testJob() inserted") + + def test_acquire_get_release(self): + j = testJob(0) + self.assertEqual(self.q.acquire(), True) + self.assertEqual(self.q.get_job().acquired, True) + self.assertJobsEqual(self.q.get_job(), j, ignore=["acquired"]) + self.assertSetsEqual(self.q.get_set(), j.sets[0]) + self.q.release() + self.assertEqual(self.q.get_job(), None) + self.assertEqual(self.q.get_set(), None) + + def test_decrement_and_reset(self): + self.assertEqual(self.q.acquire(), True) + self.assertEqual(self.q.decrement(), True) # Work remains + got = self.q.get_set() + self.assertEqual(got.remaining, 1) + self.assertEqual(got.completed, 1) + self.q.reset_jobs([self.jid]) + got = self.q.as_dict()["jobs"][0]["sets"][0] + self.assertEqual(got["remaining"], 2) + self.assertEqual(got["completed"], 0) + + def test_remove_jobs(self): + self.assertEqual(self.q.remove_jobs([self.jid])["jobs_deleted"], 1) + self.assertEqual(len(self.q.as_dict()["jobs"]), 0) + + def test_as_dict(self): + d = self.q.as_dict() + self.assertNotEqual(d.get("name"), None) + self.assertNotEqual(d.get("jobs"), None) + self.assertNotEqual(d.get("strategy"), None) + + +class EditableQueueTests(JobEqualityTests): + NUM_TEST_JOBS = 4 + + def setUp(self): + raise NotImplementedError( + "Must create queue as self.q with testJob() inserted (inst=0..3)" + ) + + def test_mv_job_exchange(self): + self.q.mv_job(self.jids[1], self.jids[2]) + jids = [j["id"] for j in self.q.as_dict()["jobs"]] + self.assertEqual(jids, [self.jids[i] for i in (0, 2, 1, 3)]) + + def test_mv_to_front(self): + self.q.mv_job(self.jids[2], None) + jids = [j["id"] for j in self.q.as_dict()["jobs"]] + self.assertEqual(jids, [self.jids[i] for i in (2, 0, 1, 3)]) + + def test_mv_to_back(self): + self.q.mv_job(self.jids[2], self.jids[3]) + jids = [j["id"] for j in self.q.as_dict()["jobs"]] + self.assertEqual(jids, [self.jids[i] for i in (0, 1, 3, 2)]) + + def test_edit_job(self): + result = self.q.edit_job(self.jids[0], dict(draft=True)) + self.assertEqual(result, self.q.as_dict()["jobs"][0]) + self.assertEqual(self.q.as_dict()["jobs"][0]["draft"], True) + + def test_edit_job_then_decrement_persists_changes(self): + self.assertEqual(self.q.acquire(), True) + self.assertEqual(self.q.as_dict()["jobs"][0]["acquired"], True) + self.assertEqual(len(self.q.as_dict()["jobs"][0]["sets"]), 1) + + # Edit the acquired job, adding a new set + newsets = [testJob(0).sets[0].as_dict()] # Same as existing + newsets.append(testJob(100).sets[0].as_dict()) # New set + self.q.edit_job(self.jids[0], dict(sets=newsets)) + + # Value after decrement should be consistent, i.e. not regress to prior acquired-job value + self.q.decrement() + self.assertEqual(len(self.q.as_dict()["jobs"][0]["sets"]), 2) + + def test_get_job_view(self): + self.assertJobsEqual(self.q.get_job_view(self.jids[0]), testJob(0)) + + def test_import_job_from_view(self): + j = testJob(10) + jid = self.q.import_job_from_view(j) + self.assertJobsEqual(self.q.get_job_view(jid), j) + + def test_import_job_from_view_persists_completion_and_remaining(self): + j = testJob(10) + j.sets[0].completed = 3 + j.sets[0].remaining = 5 + jid = self.q.import_job_from_view(j) + got = self.q.get_job_view(jid).sets[0] + self.assertEqual(got.completed, j.sets[0].completed) + self.assertEqual(got.remaining, j.sets[0].remaining) + + +class AbstractFactoryQueueTests(JobEqualityTests): + pass # TODO diff --git a/continuousprint/queues/lan.py b/continuousprint/queues/lan.py index fb752c6..4340421 100644 --- a/continuousprint/queues/lan.py +++ b/continuousprint/queues/lan.py @@ -1,12 +1,19 @@ -from peerprint.lan_queue import LANPrintQueue -from ..storage.lan import LANJobView -from ..storage.database import JobView +import uuid +from typing import Optional +from bisect import bisect_left +from peerprint.lan_queue import LANPrintQueue, ChangeType +from ..storage.lan import LANJobView, LANSetView +from ..storage.database import JobView, SetView from pathlib import Path -from .abstract import AbstractJobQueue, QueueData, Strategy +from .abstract import AbstractEditableQueue, QueueData, Strategy import dataclasses -class LANQueue(AbstractJobQueue): +class ValidationError(Exception): + pass + + +class LANQueue(AbstractEditableQueue): def __init__( self, ns, @@ -25,6 +32,8 @@ def __init__( self.ns = ns self.addr = addr self.lan = None + self.job_id = None + self.set_id = None self.update_cb = update_cb self._fileshare = fileshare self._path_on_disk = path_on_disk_fn @@ -38,7 +47,26 @@ def is_ready(self) -> bool: def connect(self): self.lan.connect() - def _on_update(self): + def _compare_peer(self, prev, nxt): + if prev is None and nxt is not None: + return True + if prev is not None and nxt is None: + return True + if prev is None and nxt is None: + return False + for k in ("status", "run"): + if prev.get(k) != nxt.get(k): + return True + return False + + def _compare_job(self, prev, nxt): + return True # Always trigger callback - TODO make this more sophisticated + + def _on_update(self, changetype, prev, nxt): + if changetype == ChangeType.PEER and not self._compare_peer(prev, nxt): + return + elif changetype == ChangeType.JOB and not self._compare_job(prev, nxt): + return self.update_cb(self) def destroy(self): @@ -62,9 +90,9 @@ def set_job(self, hash_: str, manifest: dict): def resolve_set(self, peer, hash_, path) -> str: # Get fileshare address from the peer - peerstate = self.lan.q.getPeers().get(peer) + peerstate = self._get_peers().get(peer) if peerstate is None: - raise Exception( + raise ValidationError( "Cannot resolve set {path} within job hash {hash_}; peer state is None" ) @@ -72,70 +100,59 @@ def resolve_set(self, peer, hash_, path) -> str: gjob_dirpath = self._fileshare.fetch(peerstate["fs_addr"], hash_, unpack=True) return str(Path(gjob_dirpath) / path) - # --------- AbstractJobQueue implementation ------ - - def _validate_job(self, j: JobView) -> str: - peer_profiles = set( - [ - p.get("profile", dict()).get("name", "UNKNOWN") - for p in self.lan.q.getPeers().values() - ] - ) + # -------- Wrappers around LANQueue to add/remove metadata ------ - for s in j.sets: - sprof = set(s.profiles()) - # All sets in the job *must* have an assigned profile - if len(sprof) == 0: - return f"validation for job {j.name} failed - set {s.path} has no assigned profile" + def _annotate_job(self, peer_and_manifest, acquired_by): + (peer, manifest) = peer_and_manifest + m = dict(**manifest) + m["peer_"] = peer + m["acquired"] = True if acquired_by is not None else False + m["acquired_by_"] = acquired_by + return m - # At least one printer in the queue must have a compatible proile - if len(peer_profiles.intersection(sprof)) == 0: - return f"validation for job {j.name} failed - no match for set {s.path} with profiles {sprof} (connected printer profiles: {peer_profiles})" + def _normalize_job(self, data): + del m["peer_"] + del m["acquired_by_"] - # All set paths must resolve to actual files - fullpath = self._path_on_disk(s.path, s.sd) - if fullpath is None or not Path(fullpath).exists(): - return f"validation for job {j.name} failed - file not found at {s.path} (is it stored on disk and not SD?)" + def _get_jobs(self) -> list: + joblocks = self.lan.q.getLocks() + jobs = [] + for (jid, v) in self.lan.q.getJobs(): + jobs.append(self._annotate_job(v, joblocks.get(jid))) + return jobs - def submit_job(self, j: JobView) -> bool: - err = self._validate_job(j) - if err is not None: - self._logger.warning(err) - return Exception(err) - filepaths = dict([(s.path, self._path_on_disk(s.path, s.sd)) for s in j.sets]) - manifest = j.as_dict() - if manifest.get("created") is None: - manifest["created"] = int(time.time()) - # Note: postJob strips fields from manifest in-place - hash_ = self._fileshare.post(manifest, filepaths) - self.lan.q.setJob(hash_, manifest) + def _get_job(self, jid) -> dict: + j = self.lan.q.getJob(jid) + if j is not None: + joblocks = self.lan.q.getLocks() + return self._annotate_job(j, joblocks.get(jid)) - def reset_jobs(self, job_ids) -> dict: - for jid in job_ids: - j = self.lan.q.jobs.get(jid) - if j is None: - continue - (addr, manifest) = j + def _get_peers(self) -> list: + result = {} + # Locks are given by job:peer, so reverse this + peerlocks = dict([(v, k) for k, v in self.lan.q.getLocks().items()]) + for k, v in self.lan.q.getPeers().items(): + result[k] = dict(**v, acquired=peerlocks.get(k, [])) + return result - manifest["remaining"] = manifest["count"] - for s in manifest.get("sets", []): - s["remaining"] = s["count"] - self.lan.q.setJob(jid, manifest, addr=addr) + # --------- begin AbstractQueue -------- - def remove_jobs(self, job_ids) -> dict: - for jid in job_ids: - self.lan.q.removeJob(jid) + def get_job(self) -> Optional[JobView]: + # Override to ensure the latest data is received + return self.get_job_view(self.job_id) - # --------- AbstractQueue implementation -------- + def get_set(self) -> Optional[SetView]: + if self.job_id is not None and self.set_id is not None: + # Linear search through sets isn't efficient, but it works. + j = self.get_job_view(self.job_id) + for s in j.sets: + if s.id == self.set_id: + return s def _peek(self): if self.lan is None or self.lan.q is None: return (None, None) - jobs = self.lan.q.getJobs() - jobs.sort( - key=lambda j: j["created"] - ) # Always creation order - there is no reordering in lan queue - for data in jobs: + for data in self._get_jobs(): acq = data.get("acquired_by_") if acq is not None and acq != self.addr: continue # Acquired by somebody else, so don't consider for scheduling @@ -152,8 +169,8 @@ def acquire(self) -> bool: if job is not None and s is not None: if self.lan.q.acquireJob(job.id): self._logger.debug(f"acquire() candidate:\n{job}\n{s}") - self.job = job - self.set = s + self.job_id = job.id + self.set_id = s.id self._logger.debug("acquire() success") return True else: @@ -163,22 +180,24 @@ def acquire(self) -> bool: return False def release(self) -> None: - if self.job is not None: - self.lan.q.releaseJob(self.job.id) - self.job = None - self.set = None + if self.job_id is not None: + self.lan.q.releaseJob(self.job_id) + self.job_id = None + self.set_id = None def decrement(self) -> None: - if self.job is not None: - next_set = self.set.decrement(self._profile) + if self.job_id is not None: + next_set = self.get_set().decrement(self._profile) if next_set: self._logger.debug("Still has work, going for next set") - self.set = next_set + self.set_id = next_set.id return True else: self._logger.debug("No more work; releasing") self.release() return False + else: + raise Exception("Cannot decrement; no job acquired") def _active_set(self): assigned = self.get_set() @@ -190,11 +209,10 @@ def as_dict(self) -> dict: jobs = [] peers = {} if self.lan.q is not None: - jobs = self.lan.q.getJobs() - jobs.sort( - key=lambda j: j["created"] - ) # Always creation order - there is no reordering in lan queue - peers = self.lan.q.getPeers() + jobs = self._get_jobs() + peers = self._get_peers() + for j in jobs: + j["queue"] = self.ns return dataclasses.asdict( QueueData( @@ -206,3 +224,99 @@ def as_dict(self) -> dict: active_set=self._active_set(), ) ) + + def reset_jobs(self, job_ids) -> dict: + for jid in job_ids: + j = self._get_job(jid) + if j is None: + continue + + j["remaining"] = j["count"] + for s in j.get("sets", []): + s["remaining"] = s["count"] + s["completed"] = 0 + self.lan.q.setJob(jid, j, addr=j["peer_"]) + + def remove_jobs(self, job_ids) -> dict: + n = 0 + for jid in job_ids: + if self.lan.q.removeJob(jid) is not None: + n += 1 + return dict(jobs_deleted=n) + + # --------- end AbstractQueue ------ + + # --------- AbstractEditableQueue implementation ------ + + def get_job_view(self, job_id): + j = self._get_job(job_id) + if j is not None: + return LANJobView(j, self) + + def import_job_from_view(self, j, jid=None): + err = self._validate_job(j) + if err is not None: + raise ValidationError(err) + filepaths = dict([(s.path, self._path_on_disk(s.path, s.sd)) for s in j.sets]) + manifest = j.as_dict() + if manifest.get("created") is None: + manifest["created"] = int(time.time()) + # Note: post mutates manifest by stripping fields + manifest["hash"] = self._fileshare.post(manifest, filepaths) + manifest["id"] = jid if jid is not None else self._gen_uuid() + self.lan.q.setJob(manifest["id"], manifest) + return manifest["id"] + + def mv_job(self, job_id, after_id): + self.lan.q.jobs.mv(job_id, after_id) + + def _path_exists(self, fullpath): + return Path(fullpath).exists() + + def _validate_job(self, j: JobView) -> str: + peer_profiles = set( + [ + p.get("profile", dict()).get("name", "UNKNOWN") + for p in self._get_peers().values() + ] + ) + + for s in j.sets: + sprof = set(s.profiles()) + # All sets in the job *must* have an assigned profile + if len(sprof) == 0: + return f"validation for job {j.name} failed - set {s.path} has no assigned profile" + + # At least one printer in the queue must have a compatible proile + if len(peer_profiles.intersection(sprof)) == 0: + return f"validation for job {j.name} failed - no match for set {s.path} with profiles {sprof} (connected printer profiles: {peer_profiles})" + + # All set paths must resolve to actual files + fullpath = self._path_on_disk(s.path, s.sd) + if fullpath is None or not self._path_exists(fullpath): + return f"validation for job {j.name} failed - file not found at {s.path} (is it stored on disk and not SD?)" + + def _gen_uuid(self) -> str: + for i in range(100): + result = uuid.uuid4() + if not self.lan.q.hasJob(result): + return str(result) + raise Exception("UUID generation failed - too many ID collisions") + + def edit_job(self, job_id, data) -> bool: + # For lan queues, "editing" a job is basically resubmission of the whole thing. + # This is because the backing .gjob format is a single file containing the full manifest. + j = self.get_job_view(job_id) + for (k, v) in data.items(): + if k in ("id", "peer_", "queue"): + continue + if k == "sets": + j.updateSets( + v + ) # Set data must be translated into views, done by updateSets() + else: + setattr(j, k, v) + + # Exchange the old job for the new job (reuse job ID) + jid = self.import_job_from_view(j, j.id) + return self._get_job(jid) diff --git a/continuousprint/queues/lan_test.py b/continuousprint/queues/lan_test.py index 3d3a47f..270ad5d 100644 --- a/continuousprint/queues/lan_test.py +++ b/continuousprint/queues/lan_test.py @@ -4,14 +4,26 @@ from datetime import datetime from unittest.mock import MagicMock from .abstract import Strategy -from .lan import LANQueue +from .abstract_test import ( + AbstractQueueTests, + EditableQueueTests, + testJob as makeAbstractTestJob, +) +from .lan import LANQueue, ValidationError from ..storage.database import JobView, SetView +from peerprint.lan_queue_test import LANQueueLocalTest as PeerPrintLANTest # logging.basicConfig(level=logging.DEBUG) -class LANQueueTest(unittest.TestCase): +class LANQueueTest(unittest.TestCase, PeerPrintLANTest): def setUp(self): + PeerPrintLANTest.setUp(self) # Generate peerprint LANQueue as self.q + self.q.q.syncPeer( + dict(profile=dict(name="profile")), addr=self.q.q.addr + ) # Helps pass validation + ppq = self.q # Rename to make way for CPQ LANQueue + self.ucb = MagicMock() self.fs = MagicMock() self.q = LANQueue( @@ -24,6 +36,23 @@ def setUp(self): dict(name="profile"), lambda path, sd: path, ) + self.q.lan = ppq + self.q._path_exists = lambda p: True # Override path check for validation + + +class TestAbstractImpl(AbstractQueueTests, LANQueueTest): + def setUp(self): + LANQueueTest.setUp(self) + self.jid = self.q.import_job_from_view(makeAbstractTestJob(0)) + + +class TestEditableImpl(EditableQueueTests, LANQueueTest): + def setUp(self): + LANQueueTest.setUp(self) + self.jids = [ + self.q.import_job_from_view(makeAbstractTestJob(i)) + for i in range(EditableQueueTests.NUM_TEST_JOBS) + ] class TestLANQueueNoConnection(LANQueueTest): @@ -31,11 +60,16 @@ def test_update_peer_state(self): self.q.update_peer_state("HI", {}, {}, {}) # No explosions? Good +class DummyQueue: + name = "lantest" + + class TestLANQueueConnected(LANQueueTest): def setUp(self): super().setUp() self.q.lan = MagicMock() self.q.lan.q = MagicMock() + self.q.lan.q.hasJob.return_value = False # For UUID generation self.q.lan.q.getPeers.return_value = { "a": dict(fs_addr="123", profile=dict(name="abc")), } @@ -53,6 +87,7 @@ def _jbase(self, path="a.gcode"): j = JobView() j.id = 1 j.name = "j1" + j.queue = DummyQueue() s = SetView() s.path = path s.id = 2 @@ -71,58 +106,26 @@ def _jbase(self, path="a.gcode"): j.acquired = False return j - def test_submit_job_file_missing(self): + def test_validation_file_missing(self): j = self._jbase() j.sets[0].profile_keys = "def,abc" - result = self.q.submit_job(j) - self.assertRegex(str(result), "file not found") + self.q._path_exists = lambda p: False # Override path check for validation + with self.assertRaisesRegex(ValidationError, "file not found"): + self.q.import_job_from_view(j) self.fs.post.assert_not_called() - def test_submit_job_no_profile(self): - result = self.q.submit_job(self._jbase()) - self.assertRegex(str(result), "no assigned profile") + def test_validation_no_profile(self): + with self.assertRaisesRegex(ValidationError, "no assigned profile"): + self.q.import_job_from_view(self._jbase()) self.fs.post.assert_not_called() - def test_submit_job_no_match(self): + def test_validation_no_match(self): j = self._jbase() j.sets[0].profile_keys = "def" - result = self.q.submit_job(j) - self.assertRegex(str(result), "no match for set") + with self.assertRaisesRegex(ValidationError, "no match for set"): + self.q.import_job_from_view(j) self.fs.post.assert_not_called() - def test_submit_job(self): - with tempfile.NamedTemporaryFile(suffix=".gcode") as f: - self.fs.post.return_value = "hash" - j = self._jbase(f.name) - j.sets[0].profile_keys = "def,abc" - self.q.submit_job(j) - self.fs.post.assert_called() - self.q.lan.q.setJob.assert_called_with( - "hash", - { - "name": "j1", - "count": 1, - "draft": False, - "sets": [ - { - "path": f.name, - "count": 1, - "materials": [], - "profiles": ["def", "abc"], - "id": 2, - "rank": 1, - "sd": False, - "remaining": 1, - "completed": 0, - } - ], - "created": 100, - "id": 1, - "remaining": 1, - "acquired": False, - }, - ) - class TestLANQueueWithJob(LANQueueTest): def setUp(self): diff --git a/continuousprint/queues/local.py b/continuousprint/queues/local.py index 33d0d12..a9e5de7 100644 --- a/continuousprint/queues/local.py +++ b/continuousprint/queues/local.py @@ -1,4 +1,4 @@ -from .abstract import Strategy, QueueData, AbstractEditableQueue +from .abstract import Strategy, QueueData, AbstractFactoryQueue import tempfile import os from ..storage.database import JobView, SetView @@ -7,7 +7,7 @@ import dataclasses -class LocalQueue(AbstractEditableQueue): +class LocalQueue(AbstractFactoryQueue): def __init__( self, queries, @@ -52,7 +52,7 @@ def acquire(self) -> bool: self.ns, self._profile, self._set_path_exists ) if p is not None and self.queries.acquireJob(p): - self.job = p + self.job = self.queries.getJob(p.id) # Refetch job to get acquired state self.set = p.next_set(self._profile, self._set_path_exists) return True return False @@ -98,7 +98,7 @@ def as_dict(self) -> dict: ) def remove_jobs(self, job_ids): - return self.rm_multi(job_ids=job_ids) + return self.queries.remove(job_ids=job_ids) def reset_jobs(self, job_ids): return self.queries.resetJobs(job_ids) @@ -107,14 +107,22 @@ def reset_jobs(self, job_ids): # -------------- begin AbstractEditableQueue ----------- - def add_job(self, name="") -> JobView: - return self.queries.newEmptyJob(self.ns, name) - - def add_set(self, job_id, data) -> SetView: - return self.queries.appendSet(self.ns, job_id, data) - - def mv_set(self, set_id, after_id, dest_job): - return self.queries.moveSet(set_id, after_id, dest_job) + def get_job_view(self, job_id): + return self.queries.getJob(job_id) + + def import_job_from_view(self, v): + manifest = v.as_dict() + # TODO make transaction, move to storage/queries.py + j = self.add_job() + for (k, v) in manifest.items(): + if k in ("peer_", "sets", "id", "acquired", "queue"): + continue + setattr(j, k, v) + j.save() + for s in manifest["sets"]: + del s["id"] + self.add_set(j.id, s) + return j.id def mv_job(self, job_id, after_id): return self.queries.moveJob(job_id, after_id) @@ -122,8 +130,15 @@ def mv_job(self, job_id, after_id): def edit_job(self, job_id, data): return self.queries.updateJob(job_id, data) - def rm_multi(self, job_ids=[], set_ids=[]) -> dict: - return self.queries.remove(job_ids=job_ids, set_ids=set_ids) + # ------------------- end AbstractEditableQueue --------------- + + # ------------ begin AbstractFactoryQueue ------ + + def add_job(self, name="") -> JobView: + return self.queries.newEmptyJob(self.ns, name) + + def add_set(self, job_id, data) -> SetView: + return self.queries.appendSet(self.ns, job_id, data) def import_job(self, gjob_path: str, draft=True) -> dict: out_dir = str(Path(gjob_path).stem) @@ -150,4 +165,4 @@ def export_job(self, job_id: int, dest_dir: str) -> str: os.rename(tf.name, path) return path - # ------------------- end AbstractEditableQueue --------------- + # ------------------- end AbstractFactoryQueue --------------- diff --git a/continuousprint/queues/local_test.py b/continuousprint/queues/local_test.py index 7e27306..ee07e77 100644 --- a/continuousprint/queues/local_test.py +++ b/continuousprint/queues/local_test.py @@ -1,11 +1,53 @@ import unittest import logging +from ..storage.database_test import DBTest +from ..storage import queries from unittest.mock import MagicMock from .abstract import Strategy, QueueData +from .abstract_test import ( + AbstractQueueTests, + EditableQueueTests, + testJob as makeAbstractTestJob, +) from .local import LocalQueue from dataclasses import dataclass, asdict -# logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(level=logging.DEBUG) + + +class TestAbstractImpl(AbstractQueueTests, DBTest): + # See abstract_test.py for actual test cases + def setUp(self): + DBTest.setUp(self) + self.q = LocalQueue( + queries, + "local", + Strategy.IN_ORDER, + dict(name="profile"), + MagicMock(), + MagicMock(), + ) + self.jid = self.q.import_job_from_view(makeAbstractTestJob(0)) + self.q._set_path_exists = lambda p: True + + +class TestEditableImpl(EditableQueueTests, DBTest): + # See abstract_test.py for actual test cases + def setUp(self): + DBTest.setUp(self) + self.q = LocalQueue( + queries, + "local", + Strategy.IN_ORDER, + dict(name="profile"), + MagicMock(), + MagicMock(), + ) + self.jids = [ + self.q.import_job_from_view(makeAbstractTestJob(i)) + for i in range(EditableQueueTests.NUM_TEST_JOBS) + ] + self.q._set_path_exists = lambda p: True class TestLocalQueueInOrderNoInitialJob(unittest.TestCase): @@ -21,16 +63,6 @@ def setUp(self): MagicMock(), ) - def test_acquire_success(self): - j = MagicMock() - s = MagicMock() - j.next_set.return_value = s - self.q.queries.getNextJobInQueue.return_value = j - self.q.queries.acquireJob.return_value = True - self.assertEqual(self.q.acquire(), True) - self.assertEqual(self.q.get_job(), j) - self.assertEqual(self.q.get_set(), s) - def test_acquire_failed(self): self.q.queries.getNextJobInQueue.return_value = "doesntmatter" self.q.queries.acquireJob.return_value = False @@ -41,19 +73,6 @@ def test_acquire_failed_no_jobs(self): self.q.queries.getNextJobInQueue.return_value = None self.assertEqual(self.q.acquire(), False) - def test_as_dict(self): - self.assertEqual( - self.q.as_dict(), - dict( - name="testQueue", - strategy="IN_ORDER", - jobs=[], - active_set=None, - addr=None, - peers=[], - ), - ) - class TestLocalQueueInOrderInitial(unittest.TestCase): def setUp(self): @@ -115,6 +134,8 @@ def as_dict(self): ) +# TODO test mv_job + # TODO test SD card behavior on importing/exporting and printing # class TestSD(unittest.TestCase): # def testSDExport(self): diff --git a/continuousprint/static/css/continuousprint.css b/continuousprint/static/css/continuousprint.css index 77eeb32..be263b3 100644 --- a/continuousprint/static/css/continuousprint.css +++ b/continuousprint/static/css/continuousprint.css @@ -186,7 +186,7 @@ padding: 2px; border: 1px transparent solid; } -#tab_plugin_continuousprint .loading { +#tab_plugin_continuousprint .cp-queue .loading { opacity: 0.3; cursor: default !important; } @@ -422,44 +422,3 @@ #tab_plugin_continuousprint .header > *, #tab_plugin_continuousprint .entries > .entry > * { flex: 1; - text-align: center; -} - -#tab_plugin_continuousprint .timelapse_thumbnail { - position: relative; - display: inline-block; -} - -#tab_plugin_continuousprint .timelapse_thumbnail > img { - display: none; - cursor: pointer; - position: absolute; - z-index: 1000; - top: 0; - max-width: 100px; - left: 18px; - border: 1px black solid; - padding: 5px; - background-color: white; -} -#tab_plugin_continuousprint .timelapse_thumbnail > i { - padding: 2px; /* So hover is maintained when mousing over to image */ -} -#tab_plugin_continuousprint .timelapse_thumbnail:hover > img { - display: block; -} - -#tab_plugin_continuousprint .separator { - display: flex; - justify-content: space-between; - align-items: center; -} -#tab_plugin_continuousprint .separator .line { - content: ''; - flex: 1; - border-bottom: 1px solid #000; - margin-left: 5px; - margin-right: 5px; -} - -/* ======== */ diff --git a/continuousprint/static/js/continuousprint_job.js b/continuousprint/static/js/continuousprint_job.js index 962e256..f696c59 100644 --- a/continuousprint/static/js/continuousprint_job.js +++ b/continuousprint/static/js/continuousprint_job.js @@ -20,7 +20,7 @@ function CPJob(obj, peers, api, profile) { } var self = this; - obj = {...{sets: [], name: "", draft: false, count: 1, queue: "default", id: -1}, ...obj}; + obj = {...{sets: [], name: "", draft: false, count: 1, id: -1}, ...obj}; if (obj.remaining === undefined) { obj.remaining = obj.count; } @@ -82,7 +82,7 @@ function CPJob(obj, peers, api, profile) { } self.editStart = function() { - api.edit(api.JOB, {id: self.id(), draft: true}, () => { + api.edit(api.JOB, {queue: obj.queue, id: self.id(), draft: true}, () => { self.draft(true); }); } @@ -96,7 +96,7 @@ function CPJob(obj, peers, api, profile) { self.sets.push(newqs); } self.editCancel = function() { - api.edit(api.JOB, {id: self.id(), draft: false}, self._update); + api.edit(api.JOB, {queue: obj.queue, id: self.id(), draft: false}, self._update); } self.onBlur = function(vm, e) { let cl = e.target.classList; @@ -109,6 +109,7 @@ function CPJob(obj, peers, api, profile) { self.editEnd = function() { let data = self.as_object(); data.draft = false; + data.queue = obj.queue; api.edit(api.JOB, data, self._update); } diff --git a/continuousprint/static/js/continuousprint_queue.js b/continuousprint/static/js/continuousprint_queue.js index 3a1f94e..1b94631 100644 --- a/continuousprint/static/js/continuousprint_queue.js +++ b/continuousprint/static/js/continuousprint_queue.js @@ -31,6 +31,7 @@ function CPQueue(data, api, files, profile) { self.shiftsel = ko.observable(-1); self.details = ko.observable(""); self.fullDetails = ko.observable(""); + self.ready = ko.observable(data.name === 'local' || Object.keys(data.peers).length > 0); if (self.addr !== null && data.peers !== undefined) { let pkeys = Object.keys(data.peers); if (pkeys.length === 0) { @@ -252,6 +253,15 @@ function CPQueue(data, api, files, profile) { }); } + self.hasDraftJobs = function() { + for (let j of self.jobs()) { + if (j.draft()) { + return true; + } + } + return false; + } + self.addFile = function(data, infer_profile=false) { if (data.path.endsWith('.gjob')) { // .gjob import has a different API path @@ -291,7 +301,7 @@ function CPQueue(data, api, files, profile) { set_data['job'] = null; // Invoking API causes a new job to be created self.api.add(self.api.SET, set_data, (response) => { - return self._pushJob({id: response.job_id, name: set_data['jobName'], draft: true, count: 1, sets: [response.set_]}); + return self._pushJob({queue: self.name, id: response.job_id, name: set_data['jobName'], draft: true, count: 1, sets: [response.set_]}); }); }; diff --git a/continuousprint/static/js/continuousprint_queue.test.js b/continuousprint/static/js/continuousprint_queue.test.js index d4b3765..a3ec702 100644 --- a/continuousprint/static/js/continuousprint_queue.test.js +++ b/continuousprint/static/js/continuousprint_queue.test.js @@ -33,7 +33,9 @@ function items(njobs = 1, nsets = 2) { } function init(njobs = 1) { - return new VM({name:"test", jobs:items(njobs)}, mocks()); + return new VM({name:"test", jobs:items(njobs), peers:[ + {name: "localhost", profile: {name: "profile"}, status: "IDLE"} + ]}, mocks()); } test('newEmptyJob', () => { diff --git a/continuousprint/static/js/continuousprint_viewmodel.js b/continuousprint/static/js/continuousprint_viewmodel.js index df956db..fb69225 100644 --- a/continuousprint/static/js/continuousprint_viewmodel.js +++ b/continuousprint/static/js/continuousprint_viewmodel.js @@ -68,7 +68,16 @@ function CPViewModel(parameters) { // Patch the files panel to allow for adding to queue self.files.add = function(data) { - self.defaultQueue.addFile(data, self.settings.settings.plugins.continuousprint.cp_infer_profile() || false); + // We first look for any queues with draft jobs - add the file here if so + // Otherwise it goes into the default queue. + let fq = self.defaultQueue; + for (let q of self.queues()) { + if (q.hasDraftJobs()) { + fq = q; + break; + } + } + fq.addFile(data, self.settings.settings.plugins.continuousprint.cp_infer_profile() || false); }; // Also patch file deletion, to show a modal if the file is in the queue let oldRemove = self.files.removeFile; @@ -221,42 +230,33 @@ function CPViewModel(parameters) { self.draggingJob(vm.constructor.name === "CPJob"); }; - self.sortEnd = function(evt, vm, src) { + self.sortEnd = function(evt, vm, src, dataFor=ko.dataFor) { // Re-enable default drag and drop behavior self.files.onServerConnect(); self.draggingSet(false); self.draggingJob(false); - // If we're dragging a job out of the local queue and into a network queue, - // we must warn the user about the irreversable action before making the change. - // This fully replaces the default sort action - if (evt.from.classList.contains("local") && !evt.to.classList.contains("local")) { - let targetQueue = ko.dataFor(evt.to); - // Undo the move done by CPSortable and trigger updates - // This is inefficient (i.e. could instead prevent the transfer) but that - // would require substantial edits to the CPSortable library. - targetQueue.jobs.splice(evt.newIndex, 1); - src.jobs.splice(evt.oldIndex, 0, vm); - src.jobs.valueHasMutated(); - targetQueue.jobs.valueHasMutated(); - - return self.showSubmitJobDialog(vm, targetQueue); - } - // Sadly there's no "destination job" information, so we have to // infer the index of the job based on the rendered HTML given by evt.to if (vm.constructor.name === "CPJob") { let jobs = self.defaultQueue.jobs(); - let dest_idx = jobs.indexOf(vm); + let destq = dataFor(evt.to); + let dest_idx = destq.jobs().indexOf(vm); let ids = [] for (let j of jobs) { ids.push(j.id()); } self.api.mv(self.api.JOB, { + src_queue: src.name, + dest_queue: destq.name, id: vm.id(), - after_id: (dest_idx > 0) ? jobs[dest_idx-1].id() : -1 - }, (result) => {}); + after_id: (dest_idx > 0) ? destq.jobs()[dest_idx-1].id() : null + }, (result) => { + if (result.error) { + self.onDataUpdaterPluginMessage("continuousprint", {type: "error", msg: result.error}); + } + }); } }; @@ -269,6 +269,10 @@ function CPViewModel(parameters) { if (evt.from.id === "queue_sets" && !evt.to.classList.contains("draft")) { return false; } + // No dragging items in non-ready queues + if (evt.to.classList.contains("loading")) { + return false; + } return true; }; @@ -343,35 +347,6 @@ function CPViewModel(parameters) { self.hasSpoolManager(statusCode !== 404); }); - - self.dialog = $("#cpq_submitDialog"); - self.jobSendDetails = ko.observable(); - self.jobSendTitle = ko.computed(function() { - let details = self.jobSendDetails(); - if (details === undefined) { - return ""; - } - return `Send ${details[0]._name()} to ${details[1].name}?`; - }); - self.submitJob = function() { - let details = self.jobSendDetails(); - self.api.submit(self.api.JOB, {id: details[0].id(), queue: details[1].name}, (result) => { - if (result.error) { - self.onDataUpdaterPluginMessage("continuousprint", {type: "error", msg: result.error}); - } else { - self._setState(result); - } - }); - self.dialog.modal('hide'); - } - self.showSubmitJobDialog = function(job, queue) { - self.jobSendDetails([job, queue]); - self.dialog.modal({}).css({ - width: 'auto', - 'margin-left': function() { return -($(this).width() /2); } - }); - } - self.humanize = function(num) { // Humanizes numbers by condensing and adding units if (num < 1000) { diff --git a/continuousprint/static/js/continuousprint_viewmodel.test.js b/continuousprint/static/js/continuousprint_viewmodel.test.js index 36fcac8..4d3ff87 100644 --- a/continuousprint/static/js/continuousprint_viewmodel.test.js +++ b/continuousprint/static/js/continuousprint_viewmodel.test.js @@ -159,12 +159,12 @@ test('sortEnd job to start', () => { expect(data.after_id).toEqual(-1); }); -test('sortEnd job to end', () => { +test.only('sortEnd job to end', () => { let v = init(njobs=2); let ccont = {classList: {contains: () => true}}; let evt = {from: ccont, to: ccont}; let j = v.defaultQueue.jobs()[1]; - v.sortEnd(evt, j, null); + v.sortEnd(evt, j, v.defaultQueue, dataFor=function(elem) {return v.defaultQueue}); expect(v.files.onServerConnect).toHaveBeenCalled(); expect(v.api.mv).toHaveBeenCalled(); let data = v.api.mv.mock.calls[0][1]; diff --git a/continuousprint/storage/database.py b/continuousprint/storage/database.py index a83d771..dc1cbbd 100644 --- a/continuousprint/storage/database.py +++ b/continuousprint/storage/database.py @@ -113,6 +113,7 @@ def as_dict(self): sets.sort(key=lambda s: s.rank) sets = [s.as_dict() for s in sets] d = dict( + queue=self.queue.name, name=self.name, count=self.count, draft=self.draft, @@ -338,7 +339,6 @@ class TempSet(Set): for f in Set._meta.sorted_field_names: attrs[f] = getattr(s, f) attrs["completed"] = max(0, attrs["count"] - attrs["remaining"]) - print(attrs) TempSet.create(**attrs) if logger is not None: logger.warning(f"Migrating set {s.path} to schema v0.0.3") diff --git a/continuousprint/storage/lan.py b/continuousprint/storage/lan.py index f71e594..a633761 100644 --- a/continuousprint/storage/lan.py +++ b/continuousprint/storage/lan.py @@ -1,4 +1,5 @@ from .database import JobView, SetView +from .queries import getint from requests.exceptions import HTTPError @@ -10,16 +11,21 @@ def __init__(self, lq): class LANJobView(JobView): def __init__(self, manifest, lq): - for attr in ("name", "count", "created"): - setattr(self, attr, manifest[attr]) - self.remaining = manifest.get("remaining", self.count) + self.name = manifest.get("name", "") + self.created = getint(manifest, "created") + self.count = getint(manifest, "count") + self.remaining = getint(manifest, "remaining", default=self.count) self.queue = LANQueueView(lq) self.id = manifest["id"] self.peer = manifest["peer_"] self.sets = [] - self.draft = False - self.acquired = None - self.sets = [LANSetView(s, self, i) for i, s in enumerate(manifest["sets"])] + self.draft = manifest.get("draft", False) + self.acquired = manifest.get("acquired", False) + self.updateSets(manifest["sets"]) + self.hash = manifest.get("hash") + + def updateSets(self, sets_list): + self.sets = [LANSetView(s, self, i) for i, s in enumerate(sets_list)] def save(self): self.queue.lq.set_job(self.id, self.as_dict()) @@ -38,12 +44,12 @@ class LANSetView(SetView): def __init__(self, data, job, rank): self.job = job self.sd = False - self.rank = rank + self.rank = int(rank) self.id = f"{job.id}_{rank}" for attr in ("path", "count"): setattr(self, attr, data[attr]) - self.remaining = data.get("remaining", self.count) - self.completed = data.get("completed", 0) + self.remaining = getint(data, "remaining", default=self.count) + self.completed = getint(data, "completed") self.material_keys = ",".join(data.get("materials", [])) self.profile_keys = ",".join(data.get("profiles", [])) self._resolved = None @@ -52,7 +58,7 @@ def resolve(self) -> str: if self._resolved is None: try: self._resolved = self.job.queue.lq.resolve_set( - self.job.peer, self.job.id, self.path + self.job.peer, self.job.hash, self.path ) except HTTPError as e: raise ResolveError(f"Failed to resolve {self.path}") from e diff --git a/continuousprint/storage/queries.py b/continuousprint/storage/queries.py index f1c9908..e803c2f 100644 --- a/continuousprint/storage/queries.py +++ b/continuousprint/storage/queries.py @@ -11,6 +11,13 @@ MAX_COUNT = 999999 +def getint(d, k, default=0): + v = d.get(k, default) + if type(v) == str: + v = int(v) + return v + + def clearOldState(): # On init, scrub the local DB for any state that may have been left around # due to an improper shutdown @@ -188,6 +195,7 @@ def updateJob(job_id, data, queue=DEFAULT_QUEUE): if k in ( "id", "sets", + "queue", ): # ignored or handled separately continue @@ -234,18 +242,24 @@ def _rankEnd(): return time.time() -def _moveImpl(cls, src, dest_id: int, retried=False): - if dest_id == -1: +def _moveImpl(src, dest_id, retried=False): + if dest_id is None: destRank = 0 else: - destRank = cls.get(id=dest_id).rank + dest_id = int(dest_id) + destRank = Job.get(id=dest_id).rank # Get the next object having a rank beyond the destination rank, # so we can then split the difference # Note the unary '&' operator and the expressions wrapped in parens (a limitation of peewee) postRank = ( - cls.select(cls.rank) - .where((cls.rank > destRank) & (cls.id != src.id)) + Job.select(Job.rank) + .where( + (Job.rank > destRank) + & (Job.id != src.id) + & (Job.queue.name != ARCHIVE_QUEUE) + ) + .order_by(Job.rank) .limit(1) .execute() ) @@ -255,13 +269,16 @@ def _moveImpl(cls, src, dest_id: int, retried=False): postRank = MAX_RANK # Pick the target value as the midpoint between the two ranks candidate = abs(postRank - destRank) / 2 + min(postRank, destRank) + # print( + # f"_moveImpl abs({postRank} - {destRank})/2 + min({postRank}, {destRank}) = {candidate}" + # ) # We may end up with an invalid candidate if we hit a singularity - in this case, rebalance all the # rows and try again if candidate <= destRank or candidate >= postRank: if not retried: - _rankBalance(cls) - _moveImpl(cls, src, dest_id, retried=True) + _rankBalance(Job) + _moveImpl(src, dest_id, retried=True) else: raise Exception("Could not rebalance job rank to move job") else: @@ -271,18 +288,7 @@ def _moveImpl(cls, src, dest_id: int, retried=False): def moveJob(src_id: int, dest_id: int): j = Job.get(id=src_id) - return _moveImpl(Job, j, dest_id) - - -def moveSet(src_id: int, dest_id: int, dest_job: int): - s = Set.get(id=src_id) - if dest_job == -1: - j = newEmptyJob(s.job.queue) - else: - j = Job.get(id=dest_job) - s.job = j - s.save() - _moveImpl(Set, s, dest_id) + return _moveImpl(j, dest_id) def newEmptyJob(q, name="", rank=_rankEnd): @@ -315,7 +321,7 @@ def appendSet(queue: str, jid, data: dict, rank=_rankEnd): if j.is_dirty(): j.save() - count = int(data["count"]) + count = getint(data, "count") sd = data.get("sd", "false") s = Set.create( path=data["path"], @@ -324,7 +330,8 @@ def appendSet(queue: str, jid, data: dict, rank=_rankEnd): material_keys=",".join(data.get("materials", "")), profile_keys=",".join(data.get("profiles", "")), count=count, - remaining=count, + remaining=getint(data, "remaining", count), + completed=getint(data, "completed"), job=j, ) @@ -364,7 +371,11 @@ def resetJobs(job_ids: list): updated += ( Job.update(remaining=Job.count).where(Job.id.in_(job_ids)).execute() ) - updated += Set.update(remaining=Set.count).where(Set.job.in_(job_ids)).execute() + updated += ( + Set.update(remaining=Set.count, completed=0) + .where(Set.job.in_(job_ids)) + .execute() + ) return dict(num_updated=updated) diff --git a/continuousprint/storage/queries_test.py b/continuousprint/storage/queries_test.py index 3ced028..8f53b64 100644 --- a/continuousprint/storage/queries_test.py +++ b/continuousprint/storage/queries_test.py @@ -219,12 +219,14 @@ def testResetJob(self): j = Job.get(id=1) s = j.sets[0] s.remaining = 0 + s.completed = 3 j.remaining = 0 s.save() j.save() q.resetJobs([j.id]) # Replenishing the job replenishes all sets self.assertEqual(Set.get(id=s.id).remaining, 1) + self.assertEqual(Set.get(id=s.id).completed, 0) self.assertEqual(Job.get(id=j.id).remaining, 1) def testUpdateJobCount(self): @@ -295,27 +297,11 @@ def rank(): ) def testMoveJob(self): - for (moveArgs, want) in [((1, 2), [2, 1]), ((2, -1), [2, 1])]: + for (moveArgs, want) in [((1, 2), [2, 1]), ((2, None), [2, 1])]: with self.subTest(f"moveJob({moveArgs}) -> want {want}"): q.moveJob(*moveArgs) self.assertEqual([j.id for j in q.getJobsAndSets(DEFAULT_QUEUE)], want) - def testMoveSet(self): - for (desc, moveArgs, want) in [ - ("FirstToLast", (1, 2, 1), [2, 1, 3, 4]), - ("LastToFirst", (2, -1, 1), [2, 1, 3, 4]), - ("DiffJob", (1, 3, 2), [2, 3, 1, 4]), - ("NewJob", (1, -1, -1), [2, 3, 4, 1]), - ]: - with self.subTest(f"{desc}: moveSet({moveArgs})"): - q.moveSet(*moveArgs) - set_order = [ - (s["id"], s["rank"]) - for j in q.getJobsAndSets(DEFAULT_QUEUE) - for s in j.as_dict()["sets"] - ] - self.assertEqual(set_order, [(w, ANY) for w in want]) - def testGetNextJobAfterDecrement(self): j = q.getNextJobInQueue(DEFAULT_QUEUE, PROFILE) s = j.sets[0] diff --git a/continuousprint/templates/continuousprint_tab.jinja2 b/continuousprint/templates/continuousprint_tab.jinja2 index 1ef1937..b37cdb1 100644 --- a/continuousprint/templates/continuousprint_tab.jinja2 +++ b/continuousprint/templates/continuousprint_tab.jinja2 @@ -1,31 +1,3 @@ -