[zk: localhost:2181(CONNECTED) 39] ls /videoseg/taskqueue/entries
[entry-201-0000059505]
[zk: localhost:2181(CONNECTED) 40] ls /videoseg/taskqueue/taken
[entry-201-0000059505, entry-401-0000059506]
This is my LockingQueue after running for a while. LockingQueue does not clean up the entries in taken:
    def _take(self, id_):
        try:
            self.client.create(
                "{path}/{id}".format(
                    path=self._lock_path,
                    id=id_),
                self.id,
                ephemeral=True)
            value, stat = self.client.retry(
                self.client.get,
                "{path}/{id}".format(path=self._entries_path, id=id_))
        except (NoNodeError, NodeExistsError):
            # Item is already consumed or locked
            return None
        return (id_, value)
This code may be what produces the bug. The ephemeral node should be removed if

    value, stat = self.client.retry(
        self.client.get,
        "{path}/{id}".format(path=self._entries_path, id=id_))

fails.
For example, take two clients, A and B.
step 1: A checks the queue and finds an item.
step 2: A tries to lock the item by creating an ephemeral node, then checks that it actually got the lock.
If B locks and consumes the same item between step 1 and step 2, the ephemeral node created by A will not be removed.
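
A possible fix, as a minimal sketch rather than the actual kazoo patch: create the lock node first, and if reading the entry then fails with NoNodeError, delete the lock node that was just created before returning. So that the sketch runs without a ZooKeeper server, `FakeClient` and the two exception classes below are hypothetical in-memory stand-ins for `KazooClient` and `kazoo.exceptions`. The standalone `take` function and its parameters are also assumptions made for illustration.

```python
class NoNodeError(Exception):
    """Stand-in for kazoo.exceptions.NoNodeError (assumption for this sketch)."""


class NodeExistsError(Exception):
    """Stand-in for kazoo.exceptions.NodeExistsError (assumption for this sketch)."""


class FakeClient:
    """Hypothetical in-memory stand-in for the few client calls used here."""

    def __init__(self):
        self.nodes = {}

    def create(self, path, value=b"", ephemeral=False):
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes[path] = value
        return path

    def get(self, path):
        if path not in self.nodes:
            raise NoNodeError(path)
        return self.nodes[path], None  # (value, stat)

    def delete(self, path):
        if path not in self.nodes:
            raise NoNodeError(path)
        del self.nodes[path]

    def retry(self, func, *args, **kwargs):
        # The real client retries on connection problems; here we just call.
        return func(*args, **kwargs)


def take(client, lock_path, entries_path, id_, owner_id):
    """Lock an entry; release the lock again if the entry is already gone."""
    lock_node = "{path}/{id}".format(path=lock_path, id=id_)
    entry_node = "{path}/{id}".format(path=entries_path, id=id_)
    try:
        client.create(lock_node, owner_id, ephemeral=True)
    except (NoNodeError, NodeExistsError):
        # Item is already locked (or the lock path is missing)
        return None
    try:
        value, stat = client.retry(client.get, entry_node)
    except NoNodeError:
        # Item was consumed by another client after we listed it:
        # remove the lock node we just created so it does not linger.
        try:
            client.delete(lock_node)
        except NoNodeError:
            pass
        return None
    return (id_, value)


client = FakeClient()
LOCK, ENTRIES = "/queue/taken", "/queue/entries"

# Race from the example: B has already locked and consumed entry-1,
# so neither its entry node nor its lock node exists when A calls take.
stale = take(client, LOCK, ENTRIES, "entry-1", b"A")

# Normal case: the entry still exists, so A keeps the lock.
client.create(ENTRIES + "/entry-2", b"payload")
taken = take(client, LOCK, ENTRIES, "entry-2", b"A")
```

In the race case `take` returns None and leaves nothing behind under the lock path. In the normal case the lock node stays in place until the item is consumed.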