-
Notifications
You must be signed in to change notification settings - Fork 374
Expand file tree
/
Copy pathtasks.py
More file actions
289 lines (246 loc) · 10.9 KB
/
tasks.py
File metadata and controls
289 lines (246 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
import re
from typing import TYPE_CHECKING, Union
import click
import gevent
import requests
import structlog
from eth_utils import to_hex
from gevent.event import AsyncResult
from packaging.version import LegacyVersion, Version, parse as parse_version
from web3 import Web3
from web3.types import BlockData
from raiden.api.objects import Notification
from raiden.constants import (
BLOCK_ID_LATEST,
CHECK_CHAIN_ID_INTERVAL,
CHECK_GAS_RESERVE_INTERVAL,
CHECK_RDN_MIN_DEPOSIT_INTERVAL,
CHECK_VERSION_INTERVAL,
LATEST,
RELEASE_PAGE,
SECURITY_EXPRESSION,
NotificationIDs,
)
from raiden.network.proxies.proxy_manager import ProxyManager
from raiden.network.proxies.user_deposit import UserDeposit
from raiden.settings import MIN_REI_THRESHOLD
from raiden.utils import gas_reserve
from raiden.utils.formatting import to_checksum_address
from raiden.utils.runnable import Runnable
from raiden.utils.transfers import to_rdn
from raiden.utils.typing import Any, BlockNumber, Callable, ChainID, List, Optional
if TYPE_CHECKING:
from raiden.raiden_service import RaidenService
REMOVE_CALLBACK = object()
log = structlog.get_logger(__name__)
def _do_check_version(
current_version: Union[Version, LegacyVersion], raiden: "RaidenService"
) -> bool:
content = requests.get(LATEST).json()
if "tag_name" not in content:
# probably API rate limit exceeded
click.secho(
"Error while contacting github for latest version. API rate limit exceeded?", fg="red"
)
return False
# getting the latest release version
latest_release = parse_version(content["tag_name"])
security_message = re.search(SECURITY_EXPRESSION, content["body"])
if security_message:
notification = Notification(
id=NotificationIDs.VERSION_SECURITY_WARNING.value,
summary="Security Warning",
body=security_message.group(0),
urgency="high",
)
raiden.add_notification(notification, click_opts={"fg": "red"})
# comparing it to the user's application
if current_version < latest_release:
msg = (
f"You're running version {current_version}. The latest version is {latest_release}"
f"It's time to update! Releases: {RELEASE_PAGE}"
)
notification = Notification(
id=NotificationIDs.VERSION_OUTDATED.value,
summary="Your version is outdated",
body=msg,
urgency="normal",
)
raiden.add_notification(notification, click_opts={"fg": "red"})
return False
return True
def check_version(current_version: str, raiden: "RaidenService") -> None: # pragma: no unittest
"""Check periodically for a new release"""
app_version = parse_version(current_version)
while True:
try:
_do_check_version(app_version, raiden)
except (requests.exceptions.HTTPError, ValueError) as err:
click.secho("Error while checking for version", fg="red")
print(err)
# repeat the process once every 3h
gevent.sleep(CHECK_VERSION_INTERVAL)
def check_gas_reserve(raiden: "RaidenService") -> None: # pragma: no unittest
"""Check periodically for gas reserve in the account"""
while True:
has_enough_balance, estimated_required_balance = gas_reserve.has_enough_gas_reserve(
raiden, channels_to_open=1
)
estimated_required_balance_eth = Web3.fromWei(estimated_required_balance, "ether")
if not has_enough_balance:
notification_body = (
"WARNING\n"
"Your account's balance is below the estimated gas reserve of "
f"{estimated_required_balance_eth} eth. This may lead to a loss of "
"of funds because your account will be unable to perform on-chain "
"transactions. Please add funds to your account as soon as possible."
)
notification = Notification(
id=NotificationIDs.MISSING_GAS_RESERVE.value,
summary="Missing gas reserve",
body=notification_body,
urgency="normal",
)
raiden.add_notification(
notification,
log_opts={"required_wei": estimated_required_balance},
click_opts={"fg": "red"},
)
gevent.sleep(CHECK_GAS_RESERVE_INTERVAL)
def check_rdn_deposits(
raiden: "RaidenService", user_deposit_proxy: UserDeposit
) -> None: # pragma: no unittest
"""Check periodically for RDN deposits in the user-deposits contract"""
while True:
rei_balance = user_deposit_proxy.effective_balance(raiden.address, BLOCK_ID_LATEST)
rdn_balance = to_rdn(rei_balance)
if rei_balance < MIN_REI_THRESHOLD:
notification_body = (
f"WARNING\n"
f"Your account's RDN balance deposited in the UserDepositContract of "
f"{rdn_balance} is below the minimum threshold {to_rdn(MIN_REI_THRESHOLD)}. "
f"Provided that you have either a monitoring service or a path "
f"finding service activated, your node is not going to be able to "
f"pay those services which may lead to denial of service or loss of funds."
)
notification = Notification(
id=NotificationIDs.LOW_RDN.value,
summary="RDN balance too low",
body=notification_body,
urgency="normal",
)
raiden.add_notification(notification, click_opts={"fg": "red"})
gevent.sleep(CHECK_RDN_MIN_DEPOSIT_INTERVAL)
def check_chain_id(chain_id: ChainID, web3: Web3) -> None: # pragma: no unittest
"""Check periodically if the underlying ethereum client's network id has changed"""
while True:
try:
current_id = web3.eth.chain_id
except requests.exceptions.ConnectionError:
raise RuntimeError(
"Could not reach ethereum RPC. "
"Please check that your ethereum node is running and accessible."
)
if chain_id != current_id:
raise RuntimeError(
f"Raiden was running on network with id {chain_id} and it detected "
f"that the underlying ethereum client network id changed to {current_id}."
f" Changing the underlying blockchain while the Raiden node is running "
f"is not supported."
)
gevent.sleep(CHECK_CHAIN_ID_INTERVAL)
class AlarmTask(Runnable):
"""Task to notify when a block is mined."""
def __init__(self, proxy_manager: ProxyManager, sleep_time: float) -> None:
super().__init__()
self.callbacks: List[Callable] = []
self.proxy_manager = proxy_manager
self.rpc_client = proxy_manager.client
self.known_block_number: Optional[BlockNumber] = None
self._stop_event: Optional[AsyncResult] = None
# TODO: Start with a larger sleep_time and decrease it as the
# probability of a new block increases.
self.sleep_time = sleep_time
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} node:" f"{to_checksum_address(self.rpc_client.address)}>"
)
def start(self) -> None:
log.debug("Alarm task started", node=to_checksum_address(self.rpc_client.address))
self._stop_event = AsyncResult()
super().start()
def _run(self, *args: Any, **kwargs: Any) -> None: # pylint: disable=method-hidden
self.greenlet.name = f"AlarmTask._run node:{to_checksum_address(self.rpc_client.address)}"
try:
self.loop_until_stop()
finally:
self.callbacks = []
def register_callback(self, callback: Callable) -> None:
"""Register a new callback.
Note:
The callback will be executed in the AlarmTask context and for
this reason it should not block, otherwise we can miss block
changes.
"""
if not callable(callback):
raise ValueError("callback is not a callable")
self.callbacks.append(callback)
def remove_callback(self, callback: Callable) -> None:
"""Remove callback from the list of callbacks if it exists"""
if callback in self.callbacks:
self.callbacks.remove(callback)
def loop_until_stop(self) -> None:
sleep_time = self.sleep_time
while self._stop_event and self._stop_event.wait(sleep_time) is not True:
latest_block = self.rpc_client.get_block(block_identifier=BLOCK_ID_LATEST)
self._maybe_run_callbacks(latest_block)
def _maybe_run_callbacks(self, latest_block: BlockData) -> None:
"""Run the callbacks if there is at least one new block.
The callbacks are executed only if there is a new block, otherwise the
filters may try to poll for an inexisting block number and the Ethereum
client can return an JSON-RPC error.
"""
latest_block_number = latest_block["number"]
# First run, set the block and run the callbacks
if self.known_block_number is None:
self.known_block_number = latest_block_number
missed_blocks = 1
else:
missed_blocks = latest_block_number - self.known_block_number
if missed_blocks < 0:
log.critical(
"Block number decreased",
chain_id=self.rpc_client.chain_id,
known_block_number=self.known_block_number,
old_block_number=latest_block["number"],
old_gas_limit=latest_block["gasLimit"],
old_block_hash=to_hex(latest_block["hash"]),
node=to_checksum_address(self.rpc_client.address),
)
elif missed_blocks > 0:
log_details = dict(
known_block_number=self.known_block_number,
latest_block_number=latest_block_number,
latest_block_hash=to_hex(latest_block["hash"]),
latest_block_gas_limit=latest_block["gasLimit"],
node=to_checksum_address(self.rpc_client.address),
)
if missed_blocks > 1:
log_details["num_missed_blocks"] = missed_blocks - 1
log.debug("Received new block", **log_details)
remove = []
for callback in self.callbacks:
result = callback(latest_block)
if result is REMOVE_CALLBACK:
remove.append(callback)
for callback in remove:
self.callbacks.remove(callback)
self.known_block_number = latest_block_number
def stop(self) -> Any:
if self._stop_event:
self._stop_event.set(True)
log.debug("Alarm task stopped", node=to_checksum_address(self.rpc_client.address))
result = self.greenlet.join()
# Callbacks should be cleaned after join
self.callbacks = []
return result