Skip to content
This repository has been archived by the owner on Sep 26, 2022. It is now read-only.

Commit

Permalink
Added a constant for the ChainMonitor's "END" message; fixed a few mo…
Browse files Browse the repository at this point in the history
…re typos in docs
  • Loading branch information
bigspider committed Sep 15, 2020
1 parent 4784ce4 commit aeddc7a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 16 deletions.
17 changes: 10 additions & 7 deletions teos/chain_monitor.py
Expand Up @@ -33,7 +33,8 @@ class ChainMonitor:
detected.
Finally, once the ``terminate`` method is called, the ``status`` is changed to ``ChainMonitorStatus.TERMINATED``,
the chain monitor stops monitoring the chain and no receiving queue will be notified about new blocks (including
any block that is currently in the internal queue). A final ``"END"`` message is sent to all the subscribers.
any block that is currently in the internal queue). A final ``ChainMonitor.END_MESSAGE`` is sent to all the
subscribers.
Args:
receiving_queues (:obj:`list`): a list of :obj:`Queue` objects that will be notified when the chain_monitor is
Expand All @@ -54,6 +55,8 @@ class ChainMonitor:
``ChainMonitorStatus.LISTENING``, ``ChainMonitorStatus.ACTIVE`` or ``ChainMonitorStatus.TERMINATED``.
"""

END_MESSAGE = "END"

def __init__(self, receiving_queues, block_processor, bitcoind_feed_params):
self.logger = get_logger(component=ChainMonitor.__name__)
self.last_tips = []
Expand Down Expand Up @@ -152,7 +155,7 @@ def notify_subscribers(self):

while self.status != ChainMonitorStatus.TERMINATED:
message = self.queue.get()
# A special "END" message is added to the queue after the status is set to TERMINATED
# A special ChainMonitor.END_MESSAGE is added to the queue after the status is set to TERMINATED
# In all the other cases, message is a block_hash
with self.lock:
for rec_queue in self.receiving_queues:
Expand All @@ -165,7 +168,7 @@ def monitor_chain(self):
and creates two threads, one per each monitoring approach (``zmq`` and ``polling``).
Raises:
:obj:`RuntimeError`: if the ``status`` was not ``ChainMonitor.IDLE`` when the method was called.
:obj:`RuntimeError`: if the ``status`` was not ``ChainMonitorStatus.IDLE`` when the method was called.
"""

if self.status != ChainMonitorStatus.IDLE:
Expand All @@ -184,7 +187,7 @@ def activate(self):
is added to the internal queue.
Raises:
:obj:`RuntimeError`: if the ``status`` was not ``ChainMonitor.LISTENING`` when the method was called.
:obj:`RuntimeError`: if the ``status`` was not ``ChainMonitorStatus.LISTENING`` when the method was called.
"""

if self.status != ChainMonitorStatus.LISTENING:
Expand All @@ -196,9 +199,9 @@ def activate(self):

def terminate(self):
"""
Changes the ``status`` of the :obj:`ChainMonitor` to terminated and sends the "END" message to the internal
queue. All the threads will stop as soon as possible.
Changes the ``status`` of the :obj:`ChainMonitor` to terminated and sends the ``ChainMonitor.END_MESSAGE``
message to the internal queue. All the threads will stop as soon as possible.
"""

self.status = ChainMonitorStatus.TERMINATED
self.queue.put("END")
self.queue.put(ChainMonitor.END_MESSAGE)
8 changes: 4 additions & 4 deletions teos/responder.py
Expand Up @@ -2,7 +2,7 @@
from threading import Thread

from teos.cleaner import Cleaner

from teos.chain_monitor import ChainMonitor
from teos.logger import get_logger
from common.constants import IRREVOCABLY_RESOLVED

Expand Down Expand Up @@ -148,7 +148,7 @@ def __init__(self, db_manager, gatekeeper, carrier, block_processor):
def awake(self):
"""
Starts a new thread to monitor the blockchain to make sure triggered appointments get enough depth.
The thread will run until the :obj:`ChainMonitor` adds the ``"END"`` message to the queue.
The thread will run until the :obj:`ChainMonitor` adds the ``ChainMonitor.END_MESSAGE`` to the queue.
Returns:
:obj:`Thread <multithreading.Thread>`: The thread object that was just created and is already running.
Expand Down Expand Up @@ -273,8 +273,8 @@ def do_watch(self):
while True:
block_hash = self.block_queue.get()

# When the ChainMonitor is stopped, a final "END" message is sent
if block_hash == "END":
# When the ChainMonitor is stopped, a final ChainMonitor.END_MESSAGE is sent
if block_hash == ChainMonitor.END_MESSAGE:
break

block = self.block_processor.get_block(block_hash)
Expand Down
7 changes: 4 additions & 3 deletions teos/watcher.py
Expand Up @@ -11,6 +11,7 @@
from common.cryptographer import Cryptographer, hash_160

from teos.cleaner import Cleaner
from teos.chain_monitor import ChainMonitor
from teos.extended_appointment import ExtendedAppointment
from teos.block_processor import InvalidTransactionFormat

Expand Down Expand Up @@ -250,7 +251,7 @@ def n_responder_trackers(self):
def awake(self):
"""
Starts a new thread to monitor the blockchain for channel breaches. The thread will run until the
:obj:`ChainMonitor` adds the ``"END"`` message to the queue.
:obj:`ChainMonitor` adds ``ChainMonitor.END_MESSAGE`` to the queue.
Returns:
:obj:`Thread <multithreading.Thread>`: The thread object that was just created and is already running.
Expand Down Expand Up @@ -451,8 +452,8 @@ def do_watch(self):
while True:
block_hash = self.block_queue.get()

# When the ChainMonitor is stopped, a final "END" message is sent
if block_hash == "END":
# When the ChainMonitor is stopped, a final ChainMonitor.END_MESSAGE message is sent
if block_hash == ChainMonitor.END_MESSAGE:
break

block = self.block_processor.get_block(block_hash)
Expand Down
5 changes: 3 additions & 2 deletions test/teos/unit/test_chain_monitor.py
Expand Up @@ -288,9 +288,10 @@ def test_terminate(block_processor):
generate_blocks(1)
time.sleep(0.11) # wait longer than the polling_delta

# there should be only the "END" message in the receiving queue, as the new block was generated after terminating
# there should be only the ChainMonitor.END_MESSAGE message in the receiving queue, as the new block was generated
# after terminating
assert queue.qsize() == 1
assert queue.get() == "END"
assert queue.get() == ChainMonitor.END_MESSAGE


@pytest.mark.timeout(5)
Expand Down

0 comments on commit aeddc7a

Please sign in to comment.