diff --git a/electroncash/slp_dagging.py b/electroncash/slp_dagging.py index 08e140c46018..9dae75b6cd25 100644 --- a/electroncash/slp_dagging.py +++ b/electroncash/slp_dagging.py @@ -26,6 +26,7 @@ import weakref import collections import random +from itertools import count import codecs import base64 from abc import ABC, abstractmethod @@ -36,6 +37,7 @@ INF_DEPTH=2147483646 # 'infinity' value for node depths. 2**31 - 2 +unique = count() class hardref: # a proper reference that mimics weakref interface @@ -176,6 +178,7 @@ def __init__(self, graph, txid, network, fetch_hook=None, validitycache=None, download_limit=None, depth_limit=None, + height=-1, debug=False, ref=None): """ graph should be a TokenGraph instance with the appropriate validator. @@ -211,10 +214,12 @@ def __init__(self, graph, txid, network, self.fetch_hook = fetch_hook self.validitycache = {} if validitycache is None else validitycache self.download_limit = download_limit + self.removed = False if depth_limit is None: self.depth_limit = INF_DEPTH - 1 else: self.depth_limit = depth_limit + self.height = height self.callbacks = [] self.debug = debug @@ -593,7 +598,7 @@ def __init__(self, threadname="ValidationJobManager", graph_context=None, exit_w self.graph_context = graph_context self.jobs_lock = threading.Lock() self.job_current = None - self.jobs_pending = [] # list of jobs waiting to run. + self.jobs_pending = queue.PriorityQueue() self.jobs_finished = weakref.WeakSet() # set of jobs finished normally. self.jobs_stopped = weakref.WeakSet() # set of jobs stopped by calling .stop(), or that terminated abnormally with an error and/or crash self.jobs_paused = [] # list of jobs that stopped by calling .pause() @@ -622,7 +627,7 @@ def add_job(self, job): if job in self.all_jobs: raise ValueError self.all_jobs.add(job) - self.jobs_pending.append(job) + self.jobs_pending.put((job.height, next(unique), job)) self.wakeup.set() def _stop_all_common(self, job): @@ -635,7 +640,7 @@ def _stop_all_common(self, job): # Job wasn't running -- try and remove it from the # pending and paused lists try: - self.jobs_pending.remove(job) + job.removed = True return True except ValueError: pass @@ -679,7 +684,7 @@ def pause_job(self, job): return False else: try: - self.jobs_pending.remove(job) + job.removed = True except ValueError: return False else: @@ -692,7 +697,7 @@ def unpause_job(self, job): Throws ValueError if job is not in paused list. """ with self.jobs_lock: self.jobs_paused.remove(job) - self.jobs_pending.append(job) + self.jobs_pending.put((job.height, next(unique), job)) self.wakeup.set() def kill(self, ): @@ -718,8 +723,10 @@ def mainloop(self,): self.wakeup.clear() has_paused_jobs = bool(len(self.jobs_paused)) try: - self.job_current = self.jobs_pending.pop(0) - except IndexError: + _, _, self.job_current = self.jobs_pending.get(block=False) + if self.job_current.removed: + continue + except (IndexError, queue.Empty): # prepare to sleep, outside lock self.job_current = None if self.job_current is None: @@ -750,7 +757,7 @@ def mainloop(self,): self.job_current.graph.reset() except KeyError: pass - self.jobs_pending.append(self.job_current) + self.jobs_pending.put((job.height, next(unique), self.job_current)) elif retval == 'paused': self.jobs_paused.append(self.job_current) else: diff --git a/electroncash/slp_validator_0x01.py b/electroncash/slp_validator_0x01.py index 19979a8c4669..65076d6af005 100644 --- a/electroncash/slp_validator_0x01.py +++ b/electroncash/slp_validator_0x01.py @@ -232,12 +232,15 @@ def done_callback(job): if val != 0: wallet_ref().slpv1_validity[t] = val + # get transaction block height + height = wallet.verified_tx.get(txid, (-1,None,None))[0] job = ValidationJob(graph, txid, network, fetch_hook=fetch_hook, validitycache=wallet.slpv1_validity, download_limit=limit_dls, depth_limit=limit_depth, + height=height, debug=debug, ref=wallet, **kwargs) job.add_callback(done_callback) diff --git a/electroncash/slp_validator_0x01_nft1.py b/electroncash/slp_validator_0x01_nft1.py index 370ec33564d9..b48116236ab5 100644 --- a/electroncash/slp_validator_0x01_nft1.py +++ b/electroncash/slp_validator_0x01_nft1.py @@ -170,12 +170,16 @@ def done_callback(job): if val != 0: wallet.slpv1_validity[t] = val + # get transaction block height + height = wallet.verified_tx.get(txid, (-1,None,None))[0] + if nft_type == 'SLP65': job = ValidationJobNFT1Child(graph, txid, network, fetch_hook=fetch_hook, validitycache=wallet.slpv1_validity, download_limit=limit_dls, depth_limit=limit_depth, + height=height, debug=debug, was_reset=reset, ref=wallet, @@ -186,6 +190,7 @@ def done_callback(job): validitycache=wallet.slpv1_validity, download_limit=limit_dls, depth_limit=limit_depth, + height=height, debug=debug, ref=wallet, **kwargs) @@ -201,13 +206,14 @@ def __init__(self, graph, txids, network, fetch_hook=None, validitycache=None, download_limit=None, depth_limit=None, + height=-1, debug=False, was_reset=False, ref=None): self.was_reset = was_reset self.genesis_tx = None self.nft_parent_tx = None self.nft_parent_validity = 0 self.forced_failure_val = None - super().__init__(graph, txids, network, fetch_hook, validitycache, download_limit, depth_limit, debug, ref) + super().__init__(graph, txids, network, fetch_hook, validitycache, download_limit, depth_limit, height, debug, ref) # App-wide instance. Wallets share the results of the DAG lookups. # This instance is shared so that we don't redundantly verify tokens for each