Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple job id/hash, add order in CPReplOrderedDict #3

Merged
merged 6 commits into from
Sep 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ python:
install:
- pip install -e .[tests,dev]
script:
- cd peerprint && python -m unittest discover -s peerprint -p "*_test.py"
- python -m unittest discover -p "*_test.py"
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,10 @@ Type `help` to see a list of all commands, or see how they're implemented in `pe
See `.travis.yml` for CI testing configuration

All tests can also be run manually via `docker-compose run test`

## Deploying

```
python3 -m build
python3 -m twine upload --repository testpypi dist/*
```
1 change: 0 additions & 1 deletion peerprint/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
__version__ = "0.0.9"
2 changes: 1 addition & 1 deletion peerprint/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
import threading

from peerprint import __version__ as version
from .version import __version__ as version

DISCOVERY_PORT = 37020

Expand Down
8 changes: 5 additions & 3 deletions peerprint/filesharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import re
import tempfile
from pathlib import Path
from peerprint import __version__ as version
from .version import __version__ as version
import http.server
import socketserver
import threading
Expand Down Expand Up @@ -62,9 +62,11 @@ def pack_job(manifest: dict, filepaths: dict, dest: str):
s["path"] = s["path"].split("/")[-1]
if filepaths.get(s["path"]) is None:
raise ValueError(f"Job contains set with path={s['path']}, but filepaths has no matching short name")
for k in ("id", "remaining", "rank", "sd"):
# BUG FIX: ("sd") is just the string "sd", so the loop iterated its
# characters and popped s["s"] / s["d"] instead of s["sd"]. A single-element
# tuple needs a trailing comma.
for k in ("sd",):
    # Note: we leave ID and rank around here as it's useful for ordering/referral to set items
    # Also leave "remaining" so that dragging between local and LAN queues is consistent.
    s.pop(k, None)
for k in ("acquired", "draft", "id", "remaining"):
for k in ("acquired", "id", "queue"):
manifest.pop(k, None)


Expand Down
114 changes: 62 additions & 52 deletions peerprint/lan_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,22 @@
import logging
import random
import time
from enum import Enum
from collections import defaultdict

from .discovery import P2PDiscovery
from .filesharing import pack_job
from .sync_objects import CPReplDict, CPReplLockManager


# Peer dict is keyed by the peer addr. Value is tuple(last_update_ts, state)
# where state is an opaque dict.
class PeerDict(CPReplDict):
def _item_changed(self, prev, nxt):
if prev is None:
return True
for k in ('status', 'run'):
if prev[1].get(k) != nxt[1].get(k):
return True
return False

# Job dict is keyed by the hash of the .gjob file. Value is tuple(submitting_peer_addr, manifest)
# where manifest is an opaque dict.
class JobDict(CPReplDict):
def _item_changed(self, prev, nxt):
return True # always trigger callback
from .sync_objects import CPOrderedReplDict, CPReplLockManager

class ChangeType(Enum):
    """Tag passed as the first argument of update_cb to identify which
    replicated structure produced the change event."""
    PEER = "peer"    # peer state dict added/updated/removed
    JOB = "job"      # job manifest added/updated/removed
    LOCK = "lock"    # job lock state changed
    QUEUE = "queue"  # queue readiness changed (emitted from on_ready)

# This queue is shared with other printers on the local network which are configured with the same namespace.
# Actual scheduling and printing is done by the object owner.
# Job data should be considered opaque at this level -
class LANPrintQueueBase():
PEER_TIMEOUT = 60

Expand All @@ -43,10 +33,26 @@ def __init__(self, ns, addr, update_cb, logger):
self.update_cb = update_cb
self._syncobj = None

def _tagged_cb(self, changetype):
def tagcb(prev, nxt):
# Unwrap metadata tuples and return end value (user provided)
if prev is not None and type(prev) is tuple:
prev = prev[-1]
if nxt is not None and type(nxt) is tuple:
nxt = nxt[-1]
self.update_cb(changetype, prev, nxt)
return tagcb

def connect(self, peers):
self.peers = PeerDict(self.update_cb)
self.jobs = JobDict(self.update_cb)
self.locks = CPReplLockManager(selfID=self.addr, autoUnlockTime=60, cb=self.update_cb)
# Peer dict is keyed by the peer addr. Value is tuple(last_update_ts, state)
# where state is an opaque dict.
self.peers = CPOrderedReplDict(self._tagged_cb(ChangeType.PEER))

# Job dict is keyed by the ID of the job. Value is tuple(submitting_peer_addr, manifest)
# where manifest is an opaque dict.
self.jobs = CPOrderedReplDict(self._tagged_cb(ChangeType.JOB))

self.locks = CPReplLockManager(selfID=self.addr, autoUnlockTime=60, cb=self._tagged_cb(ChangeType.LOCK))
conf = SyncObjConf(
onReady=self.on_ready,
dynamicMembershipChange=True,
Expand All @@ -60,7 +66,7 @@ def on_ready(self):
# Set ready state on all objects, enabling callbacks now
# that they've been fast-forwarded
self._logger.info("LANPrintQueueBase.on_ready")
self.update_cb()
self.update_cb(ChangeType.QUEUE, False, True)

# ==== Network methods ====

Expand Down Expand Up @@ -96,44 +102,48 @@ def getPeers(self):
result = {}
peerlocks = self.locks.getPeerLocks()
for k, v in self.peers.items():
result[k] = dict(**v[1], acquired=peerlocks.get(k, []))
result[k] = dict(**v[1]) # Exclude peer update timestamp
return result

def setJob(self, hash_, manifest: dict, addr=None):
def getPeer(self, peer):
    """Return a copy of *peer*'s state dict, or None if unknown.

    Peer entries are (last_update_ts, state) tuples; the timestamp is
    stripped so callers only see the opaque state dict.
    """
    entry = self.peers.get(peer)
    if entry is None:
        return None
    return dict(**entry[-1])

def hasJob(self, jid) -> bool:
    """Return True when a job with ID *jid* exists in the job dict."""
    return jid in self.jobs

def setJob(self, jid, manifest: dict, addr=None):
    """Insert or replace job *jid* in the replicated job dict.

    Args:
        jid: job ID used as the dict key.
        manifest: opaque job manifest dict.
        addr: submitting peer's address; defaults to our own address.

    Performed synchronously to prevent race conditions when quickly
    writing, then reading job information.
    """
    # Fixed: use identity comparison with None (was `addr == None`).
    if addr is None:
        addr = self.addr
    self.jobs.set(jid, (addr, manifest), sync=True, timeout=5.0)

def getJobs(self):
jobs = []
def getLocks(self):
joblocks = {}
for (peer, locks) in self.locks.getPeerLocks().items():
for lock in locks:
joblocks[lock] = peer
for (hash_, v) in self.jobs.items():
(peer, manifest) = v
job = dict(**manifest, peer_=peer, acquired_by_=joblocks.get(hash_))
# Ensure IDs are up to date
job['id'] = hash_
for i, s in enumerate(job['sets']):
s['id'] = f"{hash_}_{i}"
jobs.append(job)
# Note that jobs are returned unordered; caller can sort it after the fact.
return jobs

def removeJob(self, hash_: str):
self.jobs.pop(hash_, None)

def acquireJob(self, hash_: str):
try:
return self.locks.tryAcquire(hash_, sync=True, timeout=self.acquire_timeout)
except SyncObjException: # timeout
return False

def releaseJob(self, hash_: str):
self.locks.release(hash_)
return joblocks

def getJobs(self):
    """Return (jid, (peer_addr, manifest)) pairs in the order maintained
    by the replicated ordered dict."""
    return self.jobs.ordered_items()

def getJob(self, jid):
    """Return the stored (peer_addr, manifest) tuple for *jid*, or None."""
    return self.jobs.get(jid)

def removeJob(self, jid: str):
    """Delete job *jid*, returning its value, or None if it was absent."""
    return self.jobs.pop(jid, None)

def acquireJob(self, jid: str):
    """Attempt to take the distributed lock for job *jid*.

    Returns the lock manager's result, or False if the synchronous
    acquire attempt times out (SyncObjException) instead of raising.
    """
    try:
        acquired = self.locks.tryAcquire(
            jid, sync=True, timeout=self.acquire_timeout)
    except SyncObjException:
        # Consensus did not complete within the timeout window
        return False
    return acquired

def releaseJob(self, jid: str):
    """Release our hold on job *jid* so other peers may acquire it."""
    self.locks.release(jid)


# Wrap LANPrintQueueBase in a discovery class, allowing for dynamic membership based
Expand Down Expand Up @@ -172,11 +182,11 @@ def _on_host_removed(self, host):
self._logger.info(f"Host removed: {host}")
self.q.removePeer(host)

def _init_base(self, results):
def _init_base(self):
    # Construct the underlying queue object only; joining the cluster
    # happens separately via q.connect() once discovery completes.
    self.q = LANPrintQueueBase(self.ns, self.addr, self.update_cb, self._logger)

def _on_startup_complete(self, results):
self._logger.info(f"Discover end: {results}; initializing queue")
self._init_base(results)
self._init_base()
self.q.connect(results.keys())

85 changes: 54 additions & 31 deletions peerprint/lan_queue_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,46 @@
import logging
from unittest.mock import MagicMock, ANY
from .lan_queue import LANPrintQueue
from .sync_objects_test import TestReplDict

logging.basicConfig(level=logging.DEBUG)

class MockJobList(dict):
def set(self, k, v, **kwargs):
self[k] = v
class MockLockManager():
    """In-memory stand-in for CPReplLockManager — no networking/consensus.

    Tracks a mapping of peer ID -> list of lock IDs that peer holds.
    """

    def __init__(self, selfID=None):
        self.id = selfID
        self.locks = {}  # peer ID -> [lock IDs held by that peer]

    def tryAcquire(self, lockid, sync=False, timeout=0):
        """Acquire *lockid* for our own peer ID; False if any peer holds it.

        Fixes two bugs in the original: `lockid in self.locks` compared a
        lock ID against *peer* keys (so re-acquiring a held lock succeeded),
        and `self.locks[self.id] = [lockid]` clobbered previously held locks.
        """
        if any(lockid in held for held in self.locks.values()):
            return False
        self.locks.setdefault(self.id, []).append(lockid)
        return True

    def release(self, lockid):
        """Remove *lockid* from whichever peer holds it; no-op if unheld.

        (Original called locks.remove() unconditionally, raising ValueError
        for peers not holding the lock.)
        """
        for locks in self.locks.values():
            if lockid in locks:
                locks.remove(lockid)

    def getPeerLocks(self):
        """Return the peer -> held-locks mapping."""
        return self.locks

class LANQueueLocalTest():
    """Shared test fixture: builds a LANPrintQueue wired to in-memory
    fakes so tests run without networking or pysyncobj consensus."""
    def setUp(self):
        self.addr = "localhost:6789"
        self.manifest = {"man": "ifest"}
        cb = MagicMock()
        self.q = LANPrintQueue("ns", self.addr, cb, logging.getLogger())
        self.q._init_base()
        # Swap the replicated pysyncobj-backed objects for local equivalents
        # so operations complete synchronously in-process.
        self.q.q._syncobj = MagicMock()
        self.q.q.peers = TestReplDict(cb)
        self.q.q.jobs = TestReplDict(cb)
        self.q.q.locks = MockLockManager(self.addr)

    def tearDown(self):
        self.q.destroy()



class TestLanQueueInitExceptions(unittest.TestCase):
def test_init_privileged_port(self):
with self.assertRaises(ValueError):
LANPrintQueue("ns", "locahost:80", None, logging.getLogger())
Expand All @@ -18,56 +50,47 @@ def test_init_bad_addr(self):
with self.assertRaises(ValueError):
LANPrintQueue("ns", "hi", None, logging.getLogger())

class TestLANQueuePeers(unittest.TestCase):
class TestLanQueuePreStartup(unittest.TestCase):
def setUp(self):
self.q = LANPrintQueue("ns", "localhost:6789", MagicMock(), logging.getLogger())

def tearDown(self):
self.q.destroy()

def test_startup_with_no_peers(self):
self.q._init_base({}) # Don't call on_startup_complete as it actually does networking
self.q._init_base() # Don't call on_startup_complete as it actually does networking
self.assertNotEqual(self.q.q, None)

def test_startup_with_discovered_peers(self):
def test_startup_host_added(self):
self.q._on_host_added("doesnothing") # Verifies no errors due to queue not initialized
self.q._init_base({"peer1": True, "peer2": True})
self.q._init_base()
self.assertNotEqual(self.q.q, None)

class TestLanQueueOperations(LANQueueLocalTest, unittest.TestCase):
def test_peer_added_after_startup(self):
self.q._init_base({})
self.q.q.peers = {}
self.q.q._syncobj = MagicMock()
self.q._on_host_added("peer1")
self.q.q._syncobj.addNodeToCluster.assert_called_with("peer1", callback=ANY)

def test_peer_removed_after_startup(self):
self.q._init_base({"peer1": True})
self.q.q.peers = {}
self.q.q._syncobj = MagicMock()
self.q._on_host_removed("peer1")
self.q.q._syncobj.removeNodeFromCluster.assert_called_with("peer1", callback=ANY)

class TestLanQueueOperations(unittest.TestCase):
def setUp(self):
self.addr = "localhost:6789"
self.manifest = {"man": "ifest"}
self.q = LANPrintQueue("ns", self.addr, MagicMock(), logging.getLogger())
self.q._init_base({})
# Replace pysyncobj objects with non-network equivalents
self.q.q.jobs = MockJobList()
self.q.q.peers = {}
self.q.q.locks = MagicMock()

def tearDown(self):
self.q.destroy()

def test_job_operations(self):
self.q.q.setJob("hash", self.manifest)
self.assertEqual(self.q.q.jobs["hash"], (self.addr, self.manifest))
self.q.q.acquireJob("hash")
self.q.q.locks.tryAcquire.assert_called_with("hash", sync=True, timeout=ANY)
self.assertEqual(self.q.q.locks.getPeerLocks()[self.addr][0], 'hash')
self.q.q.releaseJob("hash")
self.q.q.locks.release.assert_called_with("hash")
self.assertEqual(len(self.q.q.locks.getPeerLocks()[self.addr]), 0)
self.q.q.removeJob("hash")
self.assertEqual(self.q.q.jobs, {})
self.assertEqual(len(self.q.q.jobs), 0)

def testPeerGettersHideTimestamp(self):
    # Peer entries are stored as (last_update_ts, state) tuples; both
    # getters must return only the state dict, never the timestamp.
    self.q.q.syncPeer(dict(a=1), "addr1")
    self.assertEqual(self.q.q.getPeers(), {"addr1": {"a": 1}})
    self.assertEqual(self.q.q.getPeer("addr1"), {"a": 1})

def testJobGettersIncludePeer(self):
    # Job entries are stored as (submitting_peer_addr, manifest) tuples;
    # getters must surface the submitting peer alongside the manifest.
    self.q.q.setJob("job1", dict(a=1), addr="1.2.3.4")
    self.assertEqual(list(self.q.q.getJobs()), [("job1", ("1.2.3.4", {"a": 1}))])
    self.assertEqual(self.q.q.getJob("job1"), ("1.2.3.4", {"a": 1}))
Loading