Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor lbrynet-daemon into modular components #1164

Merged
merged 37 commits into from
Jul 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
75a77ee
add daemon Component and ComponentManager classes
jackrobison Apr 2, 2018
a1cc639
convert directory and SQLiteStorage setup to be a Component
jackrobison Apr 2, 2018
abe2628
support callbacks to component setups
jackrobison Apr 2, 2018
53070a6
Fixed typo in ComponentManager
hackrush01 Apr 20, 2018
635b5d8
convert wallet to be Component
hackrush01 Apr 20, 2018
fac2b2e
Use storage from session.
hackrush01 Apr 22, 2018
ae0145b
Remove create_session internal function and PEP8
hackrush01 Apr 28, 2018
59ce9bd
Starting to convert session to its own component. Removed ref to `sel…
hackrush01 Apr 29, 2018
2e83bf2
Making DHT component(broken)
hackrush01 May 16, 2018
75d8f7b
Refactored classes to reduce redundancy in getting config setting
hackrush01 May 17, 2018
9428990
DHT is now it's own component
hackrush01 May 18, 2018
579214f
Fixed `test_streamify` test
hackrush01 May 18, 2018
49211a7
Fixed regression caused by removing `peer_manager` from session
hackrush01 May 21, 2018
58b9b01
refactor ComponentManager and Component to use instance instead of cl…
jackrobison May 21, 2018
82055b2
Hash announcer, file manager, stream identifier components
hackrush01 May 28, 2018
4edcbc3
Query Handler and server components
hackrush01 May 29, 2018
d5e84bb
Reflector Component
hackrush01 May 30, 2018
69679b5
Fixed test_streamify(well Jack did, but ¯\_(ツ)_/¯)
hackrush01 May 30, 2018
cdba205
All tests now passing
hackrush01 May 31, 2018
c081f25
Pylint fixes
hackrush01 May 31, 2018
b99f7a1
Oops(That's all you're gonna get :-P)
hackrush01 May 31, 2018
5550e94
Making decorators(WIP, commit so that I don't lose work)
hackrush01 Jun 1, 2018
a300b23
Decorator made and decorating of functions done(some other changes)
hackrush01 Jun 4, 2018
c1c7d2f
import fixes and removed temporary test function
hackrush01 Jun 4, 2018
b0a3902
Fixed new broken tests from daemon refactor
hackrush01 Jun 4, 2018
24b48c6
Sanitization of modules
hackrush01 Jun 5, 2018
8093eda
Reworded errors
hackrush01 Jun 6, 2018
2aa07a0
wallet unlock condition checks, fixed breaking changes
hackrush01 Jun 7, 2018
0225ee6
Rebased on amster and other crazy stuff
hackrush01 Jun 8, 2018
d5eae25
Started writing tests
hackrush01 Jun 12, 2018
f5403e2
Tests for component manager
hackrush01 Jun 13, 2018
4e8d3dc
Fix Daemon Tests
hackrush01 Jun 15, 2018
523d074
Fixed passing mutable args in init
hackrush01 Jun 15, 2018
888dc34
Using constants instead of strings. Added CHANGELOG.md
hackrush01 Jun 17, 2018
ab5fd64
Now components can be skipped by setting relevant config in file.
hackrush01 Jun 25, 2018
2609887
P-Y-L-I-N-T #angry_emoji
hackrush01 Jun 25, 2018
0f8cedd
Merge branch 'master' into daemon-components
eukreign Jul 5, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ at anytime.
### Changed
*
*
* `publish` to accept bid as a decimal string
* all JSONRPC API commands are now registered asynchronously and are available to be called as soon as they are ready

### Added
*
* Component Manager for managing the dependencies, startup and stopping of components
* `requires` decorator to register the components required by a `jsonrpc_` command, to facilitate commands registering asynchronously
* added unittests for Component Manager
*

### Removed
*
*
* `STARTUP_STAGES` from `status` API and CLI call, it instead returns a dictionary of components along with their running status(this is a **potentially breaking change** if `STARTUP_STAGES` is relied upon)
* all component startup code from `Daemon.py`
* wallet, upnp and dht startup code from `session.py`, the code now resides in `Components.py`


## [0.20.3] - 2018-07-03
Expand Down
5 changes: 4 additions & 1 deletion lbrynet/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,11 @@ def server_port(server_and_port):
def server_list(servers):
return [server_port(server) for server in servers]


def server_list_reverse(servers):
return ["%s:%s" % (server, port) for server, port in servers]


class Env(envparse.Env):
"""An Env parser that automatically namespaces the variables with LBRY"""

Expand Down Expand Up @@ -299,7 +301,8 @@ def _convert_value(value):
'blockchain_name': (str, 'lbrycrd_main'),
'lbryum_servers': (list, [('lbryumx1.lbry.io', 50001), ('lbryumx2.lbry.io',
50001)], server_list, server_list_reverse),
's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind
's3_headers_depth': (int, 96 * 10), # download headers from s3 when the local height is more than 10 chunks behind
'components_to_skip': (list, []) # components which will be skipped during start-up of daemon
}


Expand Down
10 changes: 10 additions & 0 deletions lbrynet/core/Error.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,23 @@ class InvalidAuthenticationToken(Exception):
class NegotiationError(Exception):
pass


class InvalidCurrencyError(Exception):
def __init__(self, currency):
self.currency = currency
Exception.__init__(
self, 'Invalid currency: {} is not a supported currency.'.format(currency))


class NoSuchDirectoryError(Exception):
def __init__(self, directory):
self.directory = directory
Exception.__init__(self, 'No such directory {}'.format(directory))


class ComponentStartConditionNotMet(Exception):
pass


class ComponentsNotStarted(Exception):
pass
140 changes: 15 additions & 125 deletions lbrynet/core/Session.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import logging
import miniupnpc
from twisted.internet import threads, defer
from twisted.internet import defer
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.dht import node, hashannouncer
from lbrynet.database.storage import SQLiteStorage
from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.utils import generate_id
from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager

log = logging.getLogger(__name__)
Expand All @@ -32,11 +29,11 @@ class Session(object):
peers can connect to this peer.
"""

def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manager=None, dht_node_port=None,
def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True, external_ip=None,
storage=None):
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, blob_tracker_class=None,
payment_rate_manager_class=None, is_generous=True, external_ip=None, storage=None,
dht_node=None, peer_manager=None):
"""@param blob_data_payment_rate: The default payment rate for blob data

@param db_dir: The directory in which levelDB files should be stored
Expand Down Expand Up @@ -111,8 +108,7 @@ def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manag
self.external_ip = external_ip
self.upnp_redirects = []
self.wallet = wallet
self.dht_node_class = dht_node_class
self.dht_node = None
self.dht_node = dht_node
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = OnlyFreePaymentsManager()
# self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
Expand All @@ -124,113 +120,31 @@ def setup(self):

log.debug("Starting session.")

if self.node_id is None:
self.node_id = generate_id()
if self.dht_node is not None:
if self.peer_manager is None:
self.peer_manager = self.dht_node.peer_manager

if self.peer_finder is None:
self.peer_finder = self.dht_node.peer_finder

if self.use_upnp is True:
d = self._try_upnp()
else:
d = defer.succeed(True)
d.addCallback(lambda _: self.storage.setup())
d.addCallback(lambda _: self._setup_dht())
d = self.storage.setup()
d.addCallback(lambda _: self._setup_other_components())
return d

def shut_down(self):
"""Stop all services"""
log.info('Stopping session.')
ds = []
if self.hash_announcer:
self.hash_announcer.stop()
# if self.blob_tracker is not None:
# ds.append(defer.maybeDeferred(self.blob_tracker.stop))
if self.dht_node is not None:
ds.append(defer.maybeDeferred(self.dht_node.stop))
if self.rate_limiter is not None:
ds.append(defer.maybeDeferred(self.rate_limiter.stop))
if self.wallet is not None:
ds.append(defer.maybeDeferred(self.wallet.stop))
if self.blob_manager is not None:
ds.append(defer.maybeDeferred(self.blob_manager.stop))
if self.use_upnp is True:
ds.append(defer.maybeDeferred(self._unset_upnp))
# if self.use_upnp is True:
# ds.append(defer.maybeDeferred(self._unset_upnp))
return defer.DeferredList(ds)

def _try_upnp(self):

log.debug("In _try_upnp")

def get_free_port(upnp, port, protocol):
# returns an existing mapping if it exists
mapping = upnp.getspecificportmapping(port, protocol)
if not mapping:
return port
if upnp.lanaddr == mapping[0]:
return mapping[1]
return get_free_port(upnp, port + 1, protocol)

def get_port_mapping(upnp, port, protocol, description):
# try to map to the requested port, if there is already a mapping use the next external
# port available
if protocol not in ['UDP', 'TCP']:
raise Exception("invalid protocol")
port = get_free_port(upnp, port, protocol)
if isinstance(port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, port, protocol, upnp.lanaddr, port)
return port
upnp.addportmapping(port, protocol, upnp.lanaddr, port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port,
protocol, upnp.lanaddr, port)
return port

def threaded_try_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
external_ip = u.externalipaddress()
if external_ip != '0.0.0.0' and not self.external_ip:
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port:
self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port')
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port:
self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
return True
return False

def upnp_failed(err):
log.warning("UPnP failed. Reason: %s", err.getErrorMessage())
return False

d = threads.deferToThread(threaded_try_upnp)
d.addErrback(upnp_failed)
return d

def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary
self.dht_node = self.dht_node_class(
node_id=self.node_id,
udpPort=self.dht_node_port,
externalIP=self.external_ip,
peerPort=self.peer_port,
peer_manager=self.peer_manager,
peer_finder=self.peer_finder,
)
if not self.hash_announcer:
self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage)
self.peer_manager = self.dht_node.peer_manager
self.peer_finder = self.dht_node.peer_finder
d = self.dht_node.start(self.known_dht_nodes)
d.addCallback(lambda _: log.info("Joined the dht"))
d.addCallback(lambda _: self.hash_announcer.start())

def _setup_other_components(self):
log.debug("Setting up the rest of the components")

Expand All @@ -255,28 +169,4 @@ def _setup_other_components(self):

self.rate_limiter.start()
d = self.blob_manager.setup()
d.addCallback(lambda _: self.wallet.start())
# d.addCallback(lambda _: self.blob_tracker.start())
return d

def _unset_upnp(self):
log.info("Unsetting upnp for session")

def threaded_unset_upnp():
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
for port, protocol in self.upnp_redirects:
if u.getspecificportmapping(port, protocol) is None:
log.warning(
"UPnP redirect for %s %d was removed by something else.",
protocol, port)
else:
u.deleteportmapping(port, protocol)
log.info("Removed UPnP redirect for %s %d.", protocol, port)
self.upnp_redirects = []

d = threads.deferToThread(threaded_unset_upnp)
d.addErrback(lambda err: str(err))
return d
19 changes: 10 additions & 9 deletions lbrynet/core/Wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,9 +938,7 @@ def __init__(self, storage, config=None):
self._lag_counter = 0
self.blocks_behind = 0
self.catchup_progress = 0

# fired when the wallet actually unlocks (wallet_unlocked_d can be called multiple times)
self.wallet_unlock_success = defer.Deferred()
self.is_wallet_unlocked = None

def _is_first_run(self):
return (not self.printed_retrieving_headers and
Expand All @@ -953,21 +951,23 @@ def get_cmd_runner(self):
return self._cmd_runner

def check_locked(self):
if not self.wallet.use_encryption:
log.info("Wallet is not encrypted")
self.wallet_unlock_success.callback(True)
elif not self._cmd_runner:
"""
Checks if the wallet is encrypted(locked) or not

:return: (boolean) indicating whether the wallet is locked or not
"""
if not self._cmd_runner:
raise Exception("Command runner hasn't been initialized yet")
elif self._cmd_runner.locked:
log.info("Waiting for wallet password")
self.wallet_unlocked_d.addCallback(self.unlock)
return self.wallet_unlock_success
return self.is_wallet_unlocked

def unlock(self, password):
if self._cmd_runner and self._cmd_runner.locked:
try:
self._cmd_runner.unlock_wallet(password)
self.wallet_unlock_success.callback(True)
self.is_wallet_unlocked = True
log.info("Unlocked the wallet!")
except InvalidPassword:
log.warning("Incorrect password, try again")
Expand Down Expand Up @@ -1054,6 +1054,7 @@ def _load_wallet(self):
wallet.create_main_account()
wallet.synchronize()
self.wallet = wallet
self.is_wallet_unlocked = not self.wallet.use_encryption
self._check_large_wallet()
return defer.succeed(True)

Expand Down
63 changes: 63 additions & 0 deletions lbrynet/daemon/Component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging
from twisted.internet import defer
from ComponentManager import ComponentManager

log = logging.getLogger(__name__)


class ComponentType(type):
def __new__(mcs, name, bases, newattrs):
klass = type.__new__(mcs, name, bases, newattrs)
if name != "Component":
ComponentManager.default_component_classes[klass.component_name] = klass
return klass


class Component(object):
"""
lbrynet-daemon component helper

Inheriting classes will be automatically registered with the ComponentManager and must implement setup and stop
methods
"""

__metaclass__ = ComponentType
depends_on = []
component_name = None

def __init__(self, component_manager):
self.component_manager = component_manager
self._running = False

@property
def running(self):
return self._running

def start(self):
raise NotImplementedError()

def stop(self):
raise NotImplementedError()

def component(self):
raise NotImplementedError()

@defer.inlineCallbacks
def _setup(self):
try:
result = yield defer.maybeDeferred(self.start)
self._running = True
defer.returnValue(result)
except Exception as err:
log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
raise err

@defer.inlineCallbacks
def _stop(self):
try:
result = yield defer.maybeDeferred(self.stop)
self._running = False
defer.returnValue(result)
except Exception as err:
log.exception("Error stopping %s", self.__class__.__name__)
raise err
Loading