Skip to content

Commit

Permalink
refactor lbrynet-daemon into modular components (#1164)
Browse files Browse the repository at this point in the history
* add daemon Component and ComponentManager classes

* convert directory and SQLiteStorage setup to be a Component

* support callbacks to component setups

* Fixed typo in ComponentManager

* convert wallet to be Component

* Use storage from session.

* Remove create_session internal function and PEP8

* Starting to convert session to its own component. Removed ref to `self.storage` from Daemon.py

* Making DHT component(broken)

* Refactored classes to reduce redundancy in getting config setting

* DHT is now it's own component

* Fixed `test_streamify` test

* Fixed regression caused by removing `peer_manager` from session

* refactor ComponentManager and Component to use instance instead of class methods

* Hash announcer, file manager, stream identifier components

* Query Handler and server components

* Reflector Component

* Fixed test_streamify(well Jack did, but ¯\_(ツ)_/¯)

* All tests now passing

* Pylint fixes

* Oops(That's all you're gonna get :-P)

* Making decorators(WIP, commit so that I don't lose work)

* Decorator made and decorating of functions done(some other changes)

* import fixes and removed temporary test function

* Fixed new broken tests from daemon refactor

* Sanitization of modules

* Reworded errors

* wallet unlock condition checks, fixed breaking changes

* Rebased on amster and other crazy stuff

* Started writing tests

* Tests for component manager

* Fix Daemon Tests

* Fixed passing mutable args in init

* Using constants instead of strings. Added CHANGELOG.md

* Now components can be skipped by setting relevant config in file.

* P-Y-L-I-N-T #angry_emoji
  • Loading branch information
jackrobison authored and eukreign committed Jul 5, 2018
1 parent 148cc96 commit 75a6ff2
Show file tree
Hide file tree
Showing 19 changed files with 1,326 additions and 662 deletions.
11 changes: 8 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ at anytime.
### Changed
*
*
* `publish` to accept bid as a decimal string
* all JSONRPC API commands are now registered asynchronously and are available to be called as soon as they are ready

### Added
*
* Component Manager for managing the dependencies, startup and stopping of components
* `requires` decorator to register the components required by a `jsonrpc_` command, to facilitate commands registering asynchronously
* added unittests for Component Manager
*

### Removed
*
*
* `STARTUP_STAGES` from `status` API and CLI call, it instead returns a dictionary of components along with their running status(this is a **potentially breaking change** if `STARTUP_STAGES` is relied upon)
* all component startup code from `Daemon.py`
* wallet, upnp and dht startup code from `session.py`, the code now resides in `Components.py`


## [0.20.3] - 2018-07-03
Expand Down
5 changes: 4 additions & 1 deletion lbrynet/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,11 @@ def server_port(server_and_port):
def server_list(servers):
return [server_port(server) for server in servers]


def server_list_reverse(servers):
return ["%s:%s" % (server, port) for server, port in servers]


class Env(envparse.Env):
"""An Env parser that automatically namespaces the variables with LBRY"""

Expand Down Expand Up @@ -299,7 +301,8 @@ def _convert_value(value):
'blockchain_name': (str, 'lbrycrd_main'),
'lbryum_servers': (list, [('lbryumx1.lbry.io', 50001), ('lbryumx2.lbry.io',
50001)], server_list, server_list_reverse),
's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind
's3_headers_depth': (int, 96 * 10), # download headers from s3 when the local height is more than 10 chunks behind
'components_to_skip': (list, []) # components which will be skipped during start-up of daemon
}


Expand Down
10 changes: 10 additions & 0 deletions lbrynet/core/Error.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,23 @@ class InvalidAuthenticationToken(Exception):
class NegotiationError(Exception):
pass


class InvalidCurrencyError(Exception):
def __init__(self, currency):
self.currency = currency
Exception.__init__(
self, 'Invalid currency: {} is not a supported currency.'.format(currency))


class NoSuchDirectoryError(Exception):
def __init__(self, directory):
self.directory = directory
Exception.__init__(self, 'No such directory {}'.format(directory))


class ComponentStartConditionNotMet(Exception):
pass


class ComponentsNotStarted(Exception):
pass
140 changes: 15 additions & 125 deletions lbrynet/core/Session.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import logging
import miniupnpc
from twisted.internet import threads, defer
from twisted.internet import defer
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.dht import node, hashannouncer
from lbrynet.database.storage import SQLiteStorage
from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.utils import generate_id
from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager

log = logging.getLogger(__name__)
Expand All @@ -32,11 +29,11 @@ class Session(object):
peers can connect to this peer.
"""

def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manager=None, dht_node_port=None,
def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True, external_ip=None,
storage=None):
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, blob_tracker_class=None,
payment_rate_manager_class=None, is_generous=True, external_ip=None, storage=None,
dht_node=None, peer_manager=None):
"""@param blob_data_payment_rate: The default payment rate for blob data
@param db_dir: The directory in which levelDB files should be stored
Expand Down Expand Up @@ -111,8 +108,7 @@ def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manag
self.external_ip = external_ip
self.upnp_redirects = []
self.wallet = wallet
self.dht_node_class = dht_node_class
self.dht_node = None
self.dht_node = dht_node
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = OnlyFreePaymentsManager()
# self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
Expand All @@ -124,113 +120,31 @@ def setup(self):

log.debug("Starting session.")

if self.node_id is None:
self.node_id = generate_id()
if self.dht_node is not None:
if self.peer_manager is None:
self.peer_manager = self.dht_node.peer_manager

if self.peer_finder is None:
self.peer_finder = self.dht_node.peer_finder

if self.use_upnp is True:
d = self._try_upnp()
else:
d = defer.succeed(True)
d.addCallback(lambda _: self.storage.setup())
d.addCallback(lambda _: self._setup_dht())
d = self.storage.setup()
d.addCallback(lambda _: self._setup_other_components())
return d

def shut_down(self):
"""Stop all services"""
log.info('Stopping session.')
ds = []
if self.hash_announcer:
self.hash_announcer.stop()
# if self.blob_tracker is not None:
# ds.append(defer.maybeDeferred(self.blob_tracker.stop))
if self.dht_node is not None:
ds.append(defer.maybeDeferred(self.dht_node.stop))
if self.rate_limiter is not None:
ds.append(defer.maybeDeferred(self.rate_limiter.stop))
if self.wallet is not None:
ds.append(defer.maybeDeferred(self.wallet.stop))
if self.blob_manager is not None:
ds.append(defer.maybeDeferred(self.blob_manager.stop))
if self.use_upnp is True:
ds.append(defer.maybeDeferred(self._unset_upnp))
# if self.use_upnp is True:
# ds.append(defer.maybeDeferred(self._unset_upnp))
return defer.DeferredList(ds)

def _try_upnp(self):

log.debug("In _try_upnp")

def get_free_port(upnp, port, protocol):
# returns an existing mapping if it exists
mapping = upnp.getspecificportmapping(port, protocol)
if not mapping:
return port
if upnp.lanaddr == mapping[0]:
return mapping[1]
return get_free_port(upnp, port + 1, protocol)

def get_port_mapping(upnp, port, protocol, description):
# try to map to the requested port, if there is already a mapping use the next external
# port available
if protocol not in ['UDP', 'TCP']:
raise Exception("invalid protocol")
port = get_free_port(upnp, port, protocol)
if isinstance(port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, port, protocol, upnp.lanaddr, port)
return port
upnp.addportmapping(port, protocol, upnp.lanaddr, port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port,
protocol, upnp.lanaddr, port)
return port

def threaded_try_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
external_ip = u.externalipaddress()
if external_ip != '0.0.0.0' and not self.external_ip:
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port:
self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port')
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port:
self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
return True
return False

def upnp_failed(err):
log.warning("UPnP failed. Reason: %s", err.getErrorMessage())
return False

d = threads.deferToThread(threaded_try_upnp)
d.addErrback(upnp_failed)
return d

def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary
self.dht_node = self.dht_node_class(
node_id=self.node_id,
udpPort=self.dht_node_port,
externalIP=self.external_ip,
peerPort=self.peer_port,
peer_manager=self.peer_manager,
peer_finder=self.peer_finder,
)
if not self.hash_announcer:
self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage)
self.peer_manager = self.dht_node.peer_manager
self.peer_finder = self.dht_node.peer_finder
d = self.dht_node.start(self.known_dht_nodes)
d.addCallback(lambda _: log.info("Joined the dht"))
d.addCallback(lambda _: self.hash_announcer.start())

def _setup_other_components(self):
log.debug("Setting up the rest of the components")

Expand All @@ -255,28 +169,4 @@ def _setup_other_components(self):

self.rate_limiter.start()
d = self.blob_manager.setup()
d.addCallback(lambda _: self.wallet.start())
# d.addCallback(lambda _: self.blob_tracker.start())
return d

def _unset_upnp(self):
log.info("Unsetting upnp for session")

def threaded_unset_upnp():
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
for port, protocol in self.upnp_redirects:
if u.getspecificportmapping(port, protocol) is None:
log.warning(
"UPnP redirect for %s %d was removed by something else.",
protocol, port)
else:
u.deleteportmapping(port, protocol)
log.info("Removed UPnP redirect for %s %d.", protocol, port)
self.upnp_redirects = []

d = threads.deferToThread(threaded_unset_upnp)
d.addErrback(lambda err: str(err))
return d
19 changes: 10 additions & 9 deletions lbrynet/core/Wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,9 +938,7 @@ def __init__(self, storage, config=None):
self._lag_counter = 0
self.blocks_behind = 0
self.catchup_progress = 0

# fired when the wallet actually unlocks (wallet_unlocked_d can be called multiple times)
self.wallet_unlock_success = defer.Deferred()
self.is_wallet_unlocked = None

def _is_first_run(self):
return (not self.printed_retrieving_headers and
Expand All @@ -953,21 +951,23 @@ def get_cmd_runner(self):
return self._cmd_runner

def check_locked(self):
if not self.wallet.use_encryption:
log.info("Wallet is not encrypted")
self.wallet_unlock_success.callback(True)
elif not self._cmd_runner:
"""
Checks if the wallet is encrypted(locked) or not
:return: (boolean) indicating whether the wallet is locked or not
"""
if not self._cmd_runner:
raise Exception("Command runner hasn't been initialized yet")
elif self._cmd_runner.locked:
log.info("Waiting for wallet password")
self.wallet_unlocked_d.addCallback(self.unlock)
return self.wallet_unlock_success
return self.is_wallet_unlocked

def unlock(self, password):
if self._cmd_runner and self._cmd_runner.locked:
try:
self._cmd_runner.unlock_wallet(password)
self.wallet_unlock_success.callback(True)
self.is_wallet_unlocked = True
log.info("Unlocked the wallet!")
except InvalidPassword:
log.warning("Incorrect password, try again")
Expand Down Expand Up @@ -1054,6 +1054,7 @@ def _load_wallet(self):
wallet.create_main_account()
wallet.synchronize()
self.wallet = wallet
self.is_wallet_unlocked = not self.wallet.use_encryption
self._check_large_wallet()
return defer.succeed(True)

Expand Down
63 changes: 63 additions & 0 deletions lbrynet/daemon/Component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import logging
from twisted.internet import defer
from ComponentManager import ComponentManager

log = logging.getLogger(__name__)


class ComponentType(type):
def __new__(mcs, name, bases, newattrs):
klass = type.__new__(mcs, name, bases, newattrs)
if name != "Component":
ComponentManager.default_component_classes[klass.component_name] = klass
return klass


class Component(object):
"""
lbrynet-daemon component helper
Inheriting classes will be automatically registered with the ComponentManager and must implement setup and stop
methods
"""

__metaclass__ = ComponentType
depends_on = []
component_name = None

def __init__(self, component_manager):
self.component_manager = component_manager
self._running = False

@property
def running(self):
return self._running

def start(self):
raise NotImplementedError()

def stop(self):
raise NotImplementedError()

def component(self):
raise NotImplementedError()

@defer.inlineCallbacks
def _setup(self):
try:
result = yield defer.maybeDeferred(self.start)
self._running = True
defer.returnValue(result)
except Exception as err:
log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
raise err

@defer.inlineCallbacks
def _stop(self):
try:
result = yield defer.maybeDeferred(self.stop)
self._running = False
defer.returnValue(result)
except Exception as err:
log.exception("Error stopping %s", self.__class__.__name__)
raise err
Loading

0 comments on commit 75a6ff2

Please sign in to comment.