Removes the twisted dependency from the analytics file and replaces it
with `asyncio` and `aiohttp`. Also removes the calling loop from the
analytics in favor of a `dict` that stores the names of the methods
and their coroutines that are wrapped inside a looping task. The tasks
are canceled when the analytics manager is asked to shut down.

Signed-off-by: Oleg Silkin <o.silkin98@gmail.com>
osilkin98 committed Jan 9, 2019
1 parent db3e578 commit 24b9263
Showing 5 changed files with 99 additions and 111 deletions.
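The new `Manager.start()` in the analytics.py diff below wraps each tracked coroutine in a task built from a `looping_call` helper imported from `lbrynet.extras.daemon.storage`. That helper's body is not part of this diff; a minimal sketch of what such a helper could look like, assuming it simply awaits the coroutine every `interval` seconds until the wrapping task is cancelled:

```python
import asyncio
import logging

log = logging.getLogger(__name__)


async def looping_call(interval, fun):
    """Hypothetical stand-in for lbrynet.extras.daemon.storage.looping_call."""
    while True:
        try:
            await fun()
        except asyncio.CancelledError:
            raise  # let Manager.shutdown() cancel the wrapping task cleanly
        except Exception:
            log.exception("error in looping call")
        await asyncio.sleep(interval)
```

Cancelling the `asyncio.Task` returned by `asyncio.create_task(looping_call(secs, fn))` raises `CancelledError` inside the coroutine, which is how the shutdown path in this commit stops the heartbeat and metric loops.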
2 changes: 1 addition & 1 deletion lbrynet/extras/daemon/Components.py
@@ -696,7 +696,7 @@ async def start(self):
log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string)
else:
log.error("failed to setup upnp")
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
await self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False))

async def stop(self):
34 changes: 17 additions & 17 deletions lbrynet/extras/daemon/Daemon.py
@@ -412,14 +412,14 @@ async def start_listening(self):
)
log.info('lbrynet API listening on TCP %s:%i', *self.server.sockets[0].getsockname()[:2])
await self.setup()
self.analytics_manager.send_server_startup_success()
await self.analytics_manager.send_server_startup_success()
except OSError:
log.error('lbrynet API failed to bind TCP %s:%i for listening. Daemon is already running or this port is '
'already in use by another application.', conf.settings['api_host'], conf.settings['api_port'])
except defer.CancelledError:
log.info("shutting down before finished starting")
except Exception as err:
self.analytics_manager.send_server_startup_error(str(err))
await self.analytics_manager.send_server_startup_error(str(err))
log.exception('Failed to start lbrynet-daemon')

async def setup(self):
@@ -428,7 +428,7 @@ async def setup(self):

if not self.analytics_manager.is_started:
self.analytics_manager.start()
self.analytics_manager.send_server_startup()
await self.analytics_manager.send_server_startup()
for lc_name, lc_time in self._looping_call_times.items():
self.looping_call_manager.start(lc_name, lc_time)

@@ -459,7 +459,7 @@ async def shutdown(self):
await self.handler.shutdown(60.0)
await self.app.cleanup()
if self.analytics_manager:
self.analytics_manager.shutdown()
await self.analytics_manager.shutdown()
try:
self._component_setup_task.cancel()
except (AttributeError, asyncio.CancelledError):
@@ -643,22 +643,22 @@ async def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=No

async def _download_finished(download_id, name, claim_dict):
report = await self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
self.analytics_manager.send_new_download_success(download_id, name, claim_dict)
await self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
await self.analytics_manager.send_new_download_success(download_id, name, claim_dict)

async def _download_failed(error, download_id, name, claim_dict):
report = await self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_errored(error, download_id, name, claim_dict,
await self.analytics_manager.send_download_errored(error, download_id, name, claim_dict,
report)
self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error)
await self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error)

if sd_hash in self.streams:
downloader = self.streams[sd_hash]
return await d2f(downloader.finished_deferred)
else:
download_id = utils.random_string()
self.analytics_manager.send_download_started(download_id, name, claim_dict)
self.analytics_manager.send_new_download_start(download_id, name, claim_dict)
await self.analytics_manager.send_download_started(download_id, name, claim_dict)
await self.analytics_manager.send_new_download_start(download_id, name, claim_dict)
self.streams[sd_hash] = GetStream(
self.file_manager.sd_identifier, self.wallet_manager, self.exchange_rate_manager, self.blob_manager,
self.component_manager.peer_finder, self.rate_limiter, self.payment_rate_manager, self.storage,
@@ -704,7 +704,7 @@ async def _publish_stream(self, account, name, bid, claim_dict, file_path=None,
d = reupload.reflect_file(publisher.lbry_file)
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
log.exception)
self.analytics_manager.send_claim_action('publish')
await self.analytics_manager.send_claim_action('publish')
nout = 0
txo = tx.outputs[nout]
log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout)
@@ -1365,7 +1365,7 @@ async def jsonrpc_wallet_send(self, amount, address=None, claim_id=None, account
raise InsufficientFundsError()
account = self.get_account_or_default(account_id)
result = await self.wallet_manager.send_points_to_address(reserved_points, amount, account)
self.analytics_manager.send_credits_sent()
await self.analytics_manager.send_credits_sent()
else:
log.info("This command is deprecated for sending tips, please use the newer claim_tip command")
result = await self.jsonrpc_claim_tip(claim_id=claim_id, amount=amount, account_id=account_id)
@@ -1776,7 +1776,7 @@ async def jsonrpc_account_send(self, amount, addresses, account_id=None, broadca

account = self.get_account_or_default(account_id)
result = await account.send_to_addresses(amount, addresses, broadcast)
self.analytics_manager.send_credits_sent()
await self.analytics_manager.send_credits_sent()
return result

@requires(WALLET_COMPONENT)
@@ -2307,7 +2307,7 @@ async def jsonrpc_channel_new(self, channel_name, amount, account_id=None):
channel_name, amount, self.get_account_or_default(account_id)
)
self.default_wallet.save()
self.analytics_manager.send_new_channel()
await self.analytics_manager.send_new_channel()
nout = 0
txo = tx.outputs[nout]
log.info("Claimed a new channel! lbry://%s txid: %s nout: %d", channel_name, tx.id, nout)
@@ -2627,7 +2627,7 @@ async def jsonrpc_claim_abandon(self, claim_id=None, txid=None, nout=None, accou
raise Exception('Must specify nout')

tx = await self.wallet_manager.abandon_claim(claim_id, txid, nout, account)
self.analytics_manager.send_claim_action('abandon')
await self.analytics_manager.send_claim_action('abandon')
if blocking:
await self.ledger.wait(tx)
return {"success": True, "tx": tx}
@@ -2662,7 +2662,7 @@ async def jsonrpc_claim_new_support(self, name, claim_id, amount, account_id=Non
account = self.get_account_or_default(account_id)
amount = self.get_dewies_or_error("amount", amount)
result = await self.wallet_manager.support_claim(name, claim_id, amount, account)
self.analytics_manager.send_claim_action('new_support')
await self.analytics_manager.send_claim_action('new_support')
return result

@requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
Expand Down Expand Up @@ -2695,7 +2695,7 @@ async def jsonrpc_claim_tip(self, claim_id, amount, account_id=None):
amount = self.get_dewies_or_error("amount", amount)
validate_claim_id(claim_id)
result = await self.wallet_manager.tip_claim(amount, claim_id, account)
self.analytics_manager.send_claim_action('new_support')
await self.analytics_manager.send_claim_action('new_support')
return result

@deprecated()
3 changes: 2 additions & 1 deletion lbrynet/extras/daemon/DaemonConsole.py
@@ -295,7 +295,8 @@ def start_server_and_listen(use_auth, analytics_manager, quiet):
logging.getLogger("lbryum").setLevel(logging.CRITICAL)
logging.getLogger("requests").setLevel(logging.CRITICAL)

analytics_manager.send_server_startup()
# TODO: turn this all into async. Until then this routine can't be called
# analytics_manager.send_server_startup()
yield Daemon().start_listening()


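The TODO in this hunk notes that `send_server_startup()` is now a coroutine and can no longer be fired from this yield-based Twisted routine. A hedged sketch of how the call might be restored once the routine itself is converted to asyncio (the conversion is not part of this commit; the `Daemon` import path is assumed):

```python
import logging

# Assumed import path for the Daemon class used by DaemonConsole.py.
from lbrynet.extras.daemon.Daemon import Daemon


async def start_server_and_listen(use_auth, analytics_manager, quiet):
    # Hypothetical asyncio rewrite of the routine above; not part of this commit.
    # (quiet/use_auth handling from the original is omitted for brevity.)
    logging.getLogger("lbryum").setLevel(logging.CRITICAL)
    logging.getLogger("requests").setLevel(logging.CRITICAL)

    await analytics_manager.send_server_startup()
    await Daemon().start_listening()
```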
155 changes: 69 additions & 86 deletions lbrynet/extras/daemon/analytics.py
@@ -1,11 +1,12 @@
import collections
import logging

import treq
from twisted.internet import defer, task
import asyncio
import aiohttp

from lbrynet import conf, utils
from lbrynet.extras import looping_call_manager, system_info
from lbrynet.extras import system_info
from lbrynet.extras.daemon.storage import looping_call

# Things We Track
SERVER_STARTUP = 'Server Startup'
@@ -30,7 +31,7 @@ class Manager:
def __init__(self, analytics_api, context=None, installation_id=None, session_id=None):
self.analytics_api = analytics_api
self._tracked_data = collections.defaultdict(list)
self.looping_call_manager = self._setup_looping_calls()
self.looping_tasks = {}
self.context = context or self._make_context(
system_info.get_platform(), conf.settings['wallet'])
self.installation_id = installation_id or conf.settings.installation_id
@@ -43,20 +44,20 @@ def new_instance(cls, enabled=None):
return cls(api)

# Things We Track
def send_new_download_start(self, download_id, name, claim_dict):
self._send_new_download_stats("start", download_id, name, claim_dict)
async def send_new_download_start(self, download_id, name, claim_dict):
await self._send_new_download_stats("start", download_id, name, claim_dict)

def send_new_download_success(self, download_id, name, claim_dict):
self._send_new_download_stats("success", download_id, name, claim_dict)
async def send_new_download_success(self, download_id, name, claim_dict):
await self._send_new_download_stats("success", download_id, name, claim_dict)

def send_new_download_fail(self, download_id, name, claim_dict, e):
self._send_new_download_stats("failure", download_id, name, claim_dict, {
async def send_new_download_fail(self, download_id, name, claim_dict, e):
await self._send_new_download_stats("failure", download_id, name, claim_dict, {
'name': type(e).__name__ if hasattr(type(e), "__name__") else str(type(e)),
'message': str(e),
})

def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None):
self.analytics_api.track({
async def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None):
await self.analytics_api.track({
'userId': 'lbry', # required, see https://segment.com/docs/sources/server/http/#track
'event': NEW_DOWNLOAD_STAT,
'properties': self._event_properties({
@@ -70,91 +71,81 @@ def _send_new_download_stats(self, action, download_id, name, claim_dict, e=None
'timestamp': utils.isonow(),
})

def send_upnp_setup_success_fail(self, success, status):
self.analytics_api.track(
async def send_upnp_setup_success_fail(self, success, status):
await self.analytics_api.track(
self._event(UPNP_SETUP, {
'success': success,
'status': status,
})
)

def send_server_startup(self):
self.analytics_api.track(self._event(SERVER_STARTUP))
async def send_server_startup(self):
await self.analytics_api.track(self._event(SERVER_STARTUP))

def send_server_startup_success(self):
self.analytics_api.track(self._event(SERVER_STARTUP_SUCCESS))
async def send_server_startup_success(self):
await self.analytics_api.track(self._event(SERVER_STARTUP_SUCCESS))

def send_server_startup_error(self, message):
self.analytics_api.track(self._event(SERVER_STARTUP_ERROR, {'message': message}))
async def send_server_startup_error(self, message):
await self.analytics_api.track(self._event(SERVER_STARTUP_ERROR, {'message': message}))

def send_download_started(self, id_, name, claim_dict=None):
self.analytics_api.track(
async def send_download_started(self, id_, name, claim_dict=None):
await self.analytics_api.track(
self._event(DOWNLOAD_STARTED, self._download_properties(id_, name, claim_dict))
)

def send_download_errored(self, err, id_, name, claim_dict, report):
async def send_download_errored(self, err, id_, name, claim_dict, report):
download_error_properties = self._download_error_properties(err, id_, name, claim_dict,
report)
self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties))
await self.analytics_api.track(self._event(DOWNLOAD_ERRORED, download_error_properties))

def send_download_finished(self, id_, name, report, claim_dict=None):
async def send_download_finished(self, id_, name, report, claim_dict=None):
download_properties = self._download_properties(id_, name, claim_dict, report)
self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties))
await self.analytics_api.track(self._event(DOWNLOAD_FINISHED, download_properties))

def send_claim_action(self, action):
self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action}))
async def send_claim_action(self, action):
await self.analytics_api.track(self._event(CLAIM_ACTION, {'action': action}))

def send_new_channel(self):
self.analytics_api.track(self._event(NEW_CHANNEL))
async def send_new_channel(self):
await self.analytics_api.track(self._event(NEW_CHANNEL))

def send_credits_sent(self):
self.analytics_api.track(self._event(CREDITS_SENT))
async def send_credits_sent(self):
await self.analytics_api.track(self._event(CREDITS_SENT))

def _send_heartbeat(self):
self.analytics_api.track(self._event(HEARTBEAT))
async def _send_heartbeat(self):
await self.analytics_api.track(self._event(HEARTBEAT))

def _update_tracked_metrics(self):
async def _update_tracked_metrics(self):
should_send, value = self.summarize_and_reset(BLOB_BYTES_UPLOADED)
if should_send:
self.analytics_api.track(self._metric_event(BLOB_BYTES_UPLOADED, value))
await self.analytics_api.track(self._metric_event(BLOB_BYTES_UPLOADED, value))

# Setup / Shutdown

def start(self):
if not self.is_started:
for name, _, interval in self._get_looping_calls():
self.looping_call_manager.start(name, interval)
for name, fn, secs in self._get_looping_calls():
self.looping_tasks[name] = asyncio.create_task(looping_call(secs, fn))
self.is_started = True
log.info("Start")

def shutdown(self):
self.looping_call_manager.shutdown()

def register_repeating_metric(self, event_name, value_generator, frequency=300):
lcall = task.LoopingCall(self._send_repeating_metric, event_name, value_generator)
self.looping_call_manager.register_looping_call(event_name, lcall)
lcall.start(frequency)

def _get_looping_calls(self):
if self.is_started:
try:
for name, task in self.looping_tasks.items():
if task:
task.cancel()
self.looping_tasks[name] = None
log.info("Stopped analytics looping calls")
self.is_started = False
except Exception as e:
log.exception('Got exception when trying to cancel tasks in analytics: ', exc_info=e)

def _get_looping_calls(self) -> list:
return [
('send_heartbeat', self._send_heartbeat, 300),
('update_tracked_metrics', self._update_tracked_metrics, 600),
]

def _setup_looping_calls(self):
call_manager = looping_call_manager.LoopingCallManager()
for name, fn, _ in self._get_looping_calls():
call_manager.register_looping_call(name, task.LoopingCall(fn))
return call_manager

def _send_repeating_metric(self, event_name, value_generator):
result = value_generator()
self._if_deferred(result, self._send_repeating_metric_value, event_name)

def _send_repeating_metric_value(self, result, event_name):
should_send, value = result
if should_send:
self.analytics_api.track(self._metric_event(event_name, value))

def add_observation(self, metric, value):
self._tracked_data[metric].append(value)

@@ -239,13 +230,6 @@ def _make_context(platform, wallet):
context['os']['distro'] = platform['distro']
return context

@staticmethod
def _if_deferred(maybe_deferred, callback, *args, **kwargs):
if isinstance(maybe_deferred, defer.Deferred):
maybe_deferred.addCallback(callback, *args, **kwargs)
else:
callback(maybe_deferred, *args, **kwargs)


class Api:
def __init__(self, cookies, url, write_key, enabled):
@@ -254,7 +238,7 @@ def __init__(self, cookies, url, write_key, enabled):
self._write_key = write_key
self._enabled = enabled

def _post(self, endpoint, data):
async def _post(self, endpoint, data):
# there is an issue with a timing condition with keep-alive
# that is best explained here: https://github.com/mikem23/keepalive-race
#
@@ -265,29 +249,28 @@ def _post(self, endpoint, data):
#
# by forcing the connection to close, we will disable the keep-alive.

def update_cookies(response):
self.cookies.update(response.cookies())
return response

assert endpoint[0] == '/'
headers = {b"Connection": b"close"}
d = treq.post(self.url + endpoint, auth=(self._write_key, ''), json=data,
headers=headers, cookies=self.cookies)
d.addCallback(update_cookies)
return d
request_kwargs = {
'method': 'POST',
'url': self.url + endpoint,
'headers': {'Connection': 'Close'},
'auth': aiohttp.BasicAuth(self._write_key, ''),
'json': data,
'cookies': self.cookies
}
try:
async with aiohttp.request(**request_kwargs) as response:
self.cookies.update(response.cookies)
except Exception as e:
log.exception('Encountered an exception while POSTing to %s: ', self.url + endpoint, exc_info=e)

def track(self, event):
async def track(self, event):
"""Send a single tracking event"""
if not self._enabled:
return defer.succeed('Analytics disabled')

def _log_error(failure, event):
log.warning('Failed to send track event. %s (%s)', failure.getTraceback(), str(event))
return 'Analytics disabled'

log.debug('Sending track event: %s', event)
d = self._post('/track', event)
d.addErrback(_log_error, event)
return d
await self._post('/track', event)

@classmethod
def new_instance(cls, enabled=None):
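With treq and Twisted's Deferreds removed, every tracking call on `Manager` and `Api` is a coroutine that must be awaited, and the heartbeat/metric loops are plain asyncio tasks. A rough usage sketch under those assumptions (the event-loop and configuration setup here are illustrative, not taken from this commit):

```python
import asyncio

from lbrynet.extras.daemon import analytics


async def main():
    # Assumes lbrynet's conf.settings has already been initialized elsewhere.
    manager = analytics.Manager.new_instance()
    manager.start()                      # schedules the heartbeat/metric looping tasks
    await manager.send_server_startup()  # tracking calls are coroutines now and must be awaited
    await asyncio.sleep(600)             # ... daemon does its work ...
    manager.shutdown()                   # cancels the looping tasks (synchronous as defined in this diff)


asyncio.run(main())
```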
