Skip to content

Commit

Permalink
Use a priority queue for the validation job manager
Browse files Browse the repository at this point in the history
Change to a priority queue so that jobs added after a new token type are
validated in order of block height. Previously, jobs ran in alphabetical
order of their txid string keys, which was suboptimal because the
validation cache from newer jobs is not useful for older jobs.
  • Loading branch information
jcramer committed Mar 24, 2021
1 parent a6d701f commit 6542a35
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
23 changes: 15 additions & 8 deletions electroncash/slp_dagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import weakref
import collections
import random
from itertools import count
import codecs
import base64
from abc import ABC, abstractmethod
Expand All @@ -36,6 +37,7 @@

INF_DEPTH=2147483646 # 'infinity' value for node depths. 2**31 - 2

unique = count()

class hardref:
# a proper reference that mimics weakref interface
Expand Down Expand Up @@ -176,6 +178,7 @@ def __init__(self, graph, txid, network,
fetch_hook=None,
validitycache=None,
download_limit=None, depth_limit=None,
height=-1,
debug=False, ref=None):
"""
graph should be a TokenGraph instance with the appropriate validator.
Expand Down Expand Up @@ -211,10 +214,12 @@ def __init__(self, graph, txid, network,
self.fetch_hook = fetch_hook
self.validitycache = {} if validitycache is None else validitycache
self.download_limit = download_limit
self.removed = False
if depth_limit is None:
self.depth_limit = INF_DEPTH - 1
else:
self.depth_limit = depth_limit
self.height = height
self.callbacks = []

self.debug = debug
Expand Down Expand Up @@ -593,7 +598,7 @@ def __init__(self, threadname="ValidationJobManager", graph_context=None, exit_w
self.graph_context = graph_context
self.jobs_lock = threading.Lock()
self.job_current = None
self.jobs_pending = [] # list of jobs waiting to run.
self.jobs_pending = queue.PriorityQueue()
self.jobs_finished = weakref.WeakSet() # set of jobs finished normally.
self.jobs_stopped = weakref.WeakSet() # set of jobs stopped by calling .stop(), or that terminated abnormally with an error and/or crash
self.jobs_paused = [] # list of jobs that stopped by calling .pause()
Expand Down Expand Up @@ -622,7 +627,7 @@ def add_job(self, job):
if job in self.all_jobs:
raise ValueError
self.all_jobs.add(job)
self.jobs_pending.append(job)
self.jobs_pending.put((job.height, next(unique), job))
self.wakeup.set()

def _stop_all_common(self, job):
Expand All @@ -635,7 +640,7 @@ def _stop_all_common(self, job):
# Job wasn't running -- try and remove it from the
# pending and paused lists
try:
self.jobs_pending.remove(job)
job.removed = True
return True
except ValueError:
pass
Expand Down Expand Up @@ -679,7 +684,7 @@ def pause_job(self, job):
return False
else:
try:
self.jobs_pending.remove(job)
job.removed = True
except ValueError:
return False
else:
Expand All @@ -692,7 +697,7 @@ def unpause_job(self, job):
Throws ValueError if job is not in paused list. """
with self.jobs_lock:
self.jobs_paused.remove(job)
self.jobs_pending.append(job)
self.jobs_pending.put((job.height, next(unique), job))
self.wakeup.set()

def kill(self, ):
Expand All @@ -718,8 +723,10 @@ def mainloop(self,):
self.wakeup.clear()
has_paused_jobs = bool(len(self.jobs_paused))
try:
self.job_current = self.jobs_pending.pop(0)
except IndexError:
_, _, self.job_current = self.jobs_pending.get(block=False)
if self.job_current.removed:
continue
except (IndexError, queue.Empty):
# prepare to sleep, outside lock
self.job_current = None
if self.job_current is None:
Expand Down Expand Up @@ -750,7 +757,7 @@ def mainloop(self,):
self.job_current.graph.reset()
except KeyError:
pass
self.jobs_pending.append(self.job_current)
self.jobs_pending.put((job.height, next(unique), self.job_current))
elif retval == 'paused':
self.jobs_paused.append(self.job_current)
else:
Expand Down
3 changes: 3 additions & 0 deletions electroncash/slp_validator_0x01.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,15 @@ def done_callback(job):
if val != 0:
wallet_ref().slpv1_validity[t] = val

# get transaction block height
height = wallet.verified_tx.get(txid, (-1,None,None))[0]

job = ValidationJob(graph, txid, network,
fetch_hook=fetch_hook,
validitycache=wallet.slpv1_validity,
download_limit=limit_dls,
depth_limit=limit_depth,
height=height,
debug=debug, ref=wallet,
**kwargs)
job.add_callback(done_callback)
Expand Down
8 changes: 7 additions & 1 deletion electroncash/slp_validator_0x01_nft1.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,16 @@ def done_callback(job):
if val != 0:
wallet.slpv1_validity[t] = val

# get transaction block height
height = wallet.verified_tx.get(txid, (-1,None,None))[0]

if nft_type == 'SLP65':
job = ValidationJobNFT1Child(graph, txid, network,
fetch_hook=fetch_hook,
validitycache=wallet.slpv1_validity,
download_limit=limit_dls,
depth_limit=limit_depth,
height=height,
debug=debug,
was_reset=reset,
ref=wallet,
Expand All @@ -186,6 +190,7 @@ def done_callback(job):
validitycache=wallet.slpv1_validity,
download_limit=limit_dls,
depth_limit=limit_depth,
height=height,
debug=debug,
ref=wallet,
**kwargs)
Expand All @@ -201,13 +206,14 @@ def __init__(self, graph, txids, network,
fetch_hook=None,
validitycache=None,
download_limit=None, depth_limit=None,
height=-1,
debug=False, was_reset=False, ref=None):
self.was_reset = was_reset
self.genesis_tx = None
self.nft_parent_tx = None
self.nft_parent_validity = 0
self.forced_failure_val = None
super().__init__(graph, txids, network, fetch_hook, validitycache, download_limit, depth_limit, debug, ref)
super().__init__(graph, txids, network, fetch_hook, validitycache, download_limit, depth_limit, height, debug, ref)

# App-wide instance. Wallets share the results of the DAG lookups.
# This instance is shared so that we don't redundantly verify tokens for each
Expand Down

0 comments on commit 6542a35

Please sign in to comment.