Skip to content

Commit

Permalink
Merge 8120585 into 6607187
Browse files Browse the repository at this point in the history
  • Loading branch information
postlund committed Dec 1, 2019
2 parents 6607187 + 8120585 commit 5583880
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 43 deletions.
30 changes: 29 additions & 1 deletion docs/development/listeners.md
Expand Up @@ -37,4 +37,32 @@ continue to be delivered after an error has happened. The paramater
`initial_delay` to `start` specifies the delay that should be used before
"trying to deliver updates again", but it might also be ignored if it is
deemed not necessary. The reason for its existence is purly to provide a
way to not hammer the device in case of errors.
way to not hammer the device in case of errors.

## Device Updates

It is possible to get callbacks whenever a device loses its connection. Two methods
are used: one for expected loss, e.g. manually disconnecting and one for unexpected
loss, e.g. a crash or network problem. The API is defined by the
`interface.DeviceLister` interface and works similarily to how push updates works.

Here is a simple example:

```python
class DeviceListener:

def connection_lost(self, exception):
print("Lost connection:", str(exception))

def connection_closed(self):
print("Connection closed!")


atv.listener = DeviceListener()
```

A small note here about this API. For `MRP` this works fine as that protocol
is connection oriented. It's another case for `DMAP`, since that protocol is
request based. For now, this interface is implemented by the push updates
API (to be clear: for `DMAP`). So when the push updates API failes to establish
a connection, the callbacks in this interface will be called.
24 changes: 18 additions & 6 deletions pyatv/__main__.py
Expand Up @@ -7,7 +7,6 @@
import asyncio

import argparse
from argparse import ArgumentTypeError

from pyatv import (const, exceptions, interface, scan, connect, pair)
from pyatv.conf import (
Expand Down Expand Up @@ -136,7 +135,7 @@ async def pair(self):

try:
await self._perform_pairing(pairing)
except: # pylint: disable=bare-except # noqa
except Exception: # pylint: disable=broad-except # noqa
logging.exception('Pairing failed')
return 3
finally:
Expand Down Expand Up @@ -235,10 +234,21 @@ def playstatus_update(_, playstatus):
print(20*'-', flush=True)

@staticmethod
def playstatus_error(updater, exception):
def playstatus_error(_, exception):
"""Inform about an error and restart push updates."""
print("An error occurred (restarting): {0}".format(exception))
updater.start(initial_delay=1)


class DeviceListener(interface.DeviceListener):
"""Internal listener for generic device updates."""

def connection_lost(self, exception):
"""Call when unexpectedly being disconnected from device."""
print("Connection lost with error:", str(exception), file=sys.stderr)

def connection_closed(self):
"""Call when connection was (intentionally) closed."""
logging.debug("Connection was closed properly")


def _in_range(lower, upper, allow_none=False):
Expand All @@ -265,7 +275,8 @@ def __call__(self, parser, namespace, values, option_string=None):
elif values == 'airplay':
setattr(namespace, self.dest, const.PROTOCOL_AIRPLAY)
else:
raise ArgumentTypeError('Valid protocols are: mrp, dmap, airplay')
raise argparse.ArgumentTypeError(
'Valid protocols are: mrp, dmap, airplay')


async def cli_handler(loop):
Expand Down Expand Up @@ -429,6 +440,7 @@ async def _handle_commands(args, loop):
args.id, credentials=args.airplay_credentials))

atv = await connect(config, loop, protocol=args.protocol)
atv.listener = DeviceListener()
atv.push_updater.listener = PushListener()

try:
Expand Down Expand Up @@ -526,7 +538,7 @@ async def _run_application(loop):
except SystemExit:
pass # sys.exit() was used - do nothing

except: # pylint: disable=bare-except # noqa
except Exception: # pylint: disable=broad-except # noqa
import traceback

traceback.print_exc(file=sys.stderr)
Expand Down
50 changes: 33 additions & 17 deletions pyatv/dmap/__init__.py
Expand Up @@ -3,6 +3,8 @@
import logging
import asyncio

from aiohttp.client_exceptions import ClientError

from pyatv import (const, exceptions, convert, net)
from pyatv.dmap import (parser, tags)
from pyatv.dmap.daap import DaapRequester
Expand Down Expand Up @@ -47,10 +49,6 @@ async def artwork(self):
art = await self.daap.get(_ARTWORK_CMD, daap_data=False)
return art if art != b'' else None

def playqueue(self):
"""Return current playqueue. Must be logged in."""
return self.daap.post('playqueue-contents?[AUTH]')

def ctrl_int_cmd(self, cmd):
"""Perform a "ctrl-int" command."""
cmd_url = 'ctrl-int/1/{}?[AUTH]&prompt-id=0'.format(cmd)
Expand Down Expand Up @@ -304,17 +302,19 @@ async def playing(self):
class DmapPushUpdater(PushUpdater):
"""Implementation of API for handling push update from an Apple TV."""

def __init__(self, loop, apple_tv):
def __init__(self, loop, apple_tv, listener):
"""Initialize a new DmapPushUpdater instance."""
super().__init__()
self._loop = loop
self._atv = apple_tv
self._listener = listener
self._future = None
self._initial_delay = 0

def start(self, initial_delay=0):
"""Wait for push updates from device.
Will throw NoAsyncListenerError if no listner has been set.
Will throw NoAsyncListenerError if no listener has been set.
"""
if self.listener is None:
raise exceptions.NoAsyncListenerError
Expand All @@ -325,26 +325,35 @@ def start(self, initial_delay=0):
# first request
self._atv.playstatus_revision = 0

# Delay before restarting after an error
self._initial_delay = initial_delay

# This for some reason fails on travis but not in other places.
# Why is that (same python version)?
# pylint: disable=deprecated-method
self._future = asyncio.ensure_future(
self._poller(initial_delay), loop=self._loop)
self._future = asyncio.ensure_future(self._poller(), loop=self._loop)
return self._future

def stop(self):
"""No longer wait for push updates."""
"""No longer forward updates to listener."""
if self._future is not None:
self._future.cancel()
self._future = None

async def _poller(self, initial_delay):
# Sleep some time before waiting for updates
if initial_delay > 0:
_LOGGER.debug('Initial delay set to %d', initial_delay)
await asyncio.sleep(initial_delay, loop=self._loop)
# Let listener know that we disconnected
if self._listener and self._listener.listener:
self._listener.listener.connection_closed()

async def _poller(self):
first_call = True

while True:
# Sleep some time before waiting for updates
if not first_call and self._initial_delay > 0:
_LOGGER.debug('Initial delay set to %d', self._initial_delay)
await asyncio.sleep(self._initial_delay, loop=self._loop)
first_call = False

try:
_LOGGER.debug('Waiting for playstatus updates')
playstatus = await self._atv.playstatus(
Expand All @@ -355,12 +364,19 @@ async def _poller(self, initial_delay):
except asyncio.CancelledError:
break

except ClientError as ex:
_LOGGER.exception('A communication error happened')
if self._listener and self._listener.listener:
self._loop.call_soon(
self._listener.listener.connection_lost, ex)

break

# It is not pretty to disable pylint here, but we must catch _all_
# exceptions to keep the API.
except Exception as ex: # pylint: disable=broad-except
_LOGGER.debug('Playstatus error occurred: %s', ex)
self._loop.call_soon(self.listener.playstatus_error, self, ex)
break

self._future = None

Expand All @@ -385,13 +401,13 @@ def __init__(self, loop, session, config, airplay):
self._apple_tv = BaseDmapAppleTV(self._requester)
self._dmap_remote = DmapRemoteControl(self._apple_tv)
self._dmap_metadata = DmapMetadata(config.identifier, self._apple_tv)
self._dmap_push_updater = DmapPushUpdater(loop, self._apple_tv)
self._dmap_push_updater = DmapPushUpdater(loop, self._apple_tv, self)
self._airplay = airplay

def connect(self):
"""Initiate connection to device.
Not needed as it is performed automatically.
No need to call it yourself, it's done automatically.
"""
return self._requester.login()

Expand Down
44 changes: 36 additions & 8 deletions pyatv/interface.py
Expand Up @@ -348,10 +348,7 @@ def listener(self, listener):
- playstatus_error(updater, exception)
The first method is called when a new update happens and the second one
is called if an error occurs. Please note that if an error happens,
push updates will be stopped. So they will need to be enabled again,
e.g. from the error method. A delay should preferably be passed to
start() to avoid an infinite error-loop.
is called if an error occurs.
"""
self.__listener = listener

Expand All @@ -365,7 +362,7 @@ def start(self, initial_delay=0):

@abstractmethod
def stop(self):
"""No longer listen for updates."""
"""No longer forward updates to listener."""
raise exceptions.NotSupportedError


Expand All @@ -380,21 +377,52 @@ def play_url(self, url, **kwargs):
raise exceptions.NotSupportedError


class DeviceListener:
"""Listener interface for generic device updates."""

@abstractmethod
def connection_lost(self, exception):
"""Device was unexpectedly disconnected."""
raise NotImplementedError()

@abstractmethod
def connection_closed(self):
"""Device connection was (intentionally) closed."""
raise NotImplementedError()


class AppleTV:
"""Base class representing an Apple TV."""

__metaclass__ = ABCMeta

def __init__(self):
"""Initialize a new AppleTV."""
self.__listener = None

@property
def listener(self):
"""Object receiving generic device updates.
Must be an object conforming to DeviceListener.
"""
return self.__listener

@listener.setter
def listener(self, target):
"""Change object receiving generic device updates."""
self.__listener = target

@abstractmethod
def connect(self):
async def connect(self):
"""Initiate connection to device.
Not needed as it is performed automatically.
No need to call it yourself, it's done automatically.
"""
raise exceptions.NotSupportedError

@abstractmethod
def close(self):
async def close(self):
"""Close connection and release allocated resources."""
raise exceptions.NotSupportedError

Expand Down
18 changes: 11 additions & 7 deletions pyatv/mrp/__init__.py
Expand Up @@ -278,22 +278,26 @@ def __init__(self, loop, metadata, psm):
def start(self, initial_delay=0):
"""Wait for push updates from device.
Will throw NoAsyncListenerError if no listner has been set.
Will throw NoAsyncListenerError if no listener has been set.
"""
if self.listener is None:
raise exceptions.NoAsyncListenerError

self.psm.listener = self

def stop(self):
"""No longer wait for push updates."""
"""No longer forward updates to listener."""
self.psm.listener = None

async def state_updated(self):
"""State was updated for active player."""
playstatus = await self.metadata.playing()
self.loop.call_soon(
self.listener.playstatus_update, self, playstatus)
try:
playstatus = await self.metadata.playing()
self.loop.call_soon(
self.listener.playstatus_update, self, playstatus)
except Exception as ex: # pylint: disable=broad-except
_LOGGER.debug('Playstatus error occurred: %s', ex)
self.loop.call_soon(self.listener.playstatus_error, self, ex)


class MrpAppleTV(AppleTV):
Expand All @@ -309,7 +313,7 @@ def __init__(self, loop, session, config, airplay):
self._mrp_service = config.get_service(const.PROTOCOL_MRP)

self._connection = MrpConnection(
config.address, self._mrp_service.port, loop)
config.address, self._mrp_service.port, loop, atv=self)
self._srp = SRPAuthHandler()
self._protocol = MrpProtocol(
loop, self._connection, self._srp, self._mrp_service)
Expand All @@ -324,7 +328,7 @@ def __init__(self, loop, session, config, airplay):
async def connect(self):
"""Initiate connection to device.
Not needed as it is performed automatically.
No need to call it yourself, it's done automatically.
"""
await self._protocol.start()

Expand Down
13 changes: 10 additions & 3 deletions pyatv/mrp/connection.py
Expand Up @@ -11,13 +11,14 @@
_LOGGER = logging.getLogger(__name__)


class MrpConnection(asyncio.Protocol):
class MrpConnection(asyncio.Protocol): # pylint: disable=too-many-instance-attributes # noqa
"""Network layer that encryptes/decryptes and (de)serializes messages."""

def __init__(self, host, port, loop):
def __init__(self, host, port, loop, atv=None):
"""Initialize a new MrpConnection."""
self.host = host
self.host = str(host)
self.port = port
self.atv = atv
self.loop = loop
self.listener = None
self._buffer = b''
Expand All @@ -34,6 +35,12 @@ def connection_lost(self, exc):
_LOGGER.debug('Disconnected from device: %s', exc)
self._transport = None

if self.atv and self.atv.listener:
if exc is None:
self.atv.listener.connection_closed()
else:
self.atv.listener.connection_lost(exc)

def enable_encryption(self, output_key, input_key):
"""Enable encryption with the specified keys."""
self._chacha = chacha20.Chacha20Cipher(output_key, input_key)
Expand Down

0 comments on commit 5583880

Please sign in to comment.