Skip to content

Commit

Permalink
Move waiting out from pair_with_apple_tv
Browse files Browse the repository at this point in the history
By doing this and using a handler, it is much easier to test and the
application is also responsible of aborting stopping the pairing
process. Later, maybe a callback based API can be added so the user
can be notified when pairing is done.
  • Loading branch information
postlund committed Feb 4, 2017
1 parent a0ad21d commit e471961
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 50 deletions.
5 changes: 3 additions & 2 deletions pyatv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

_LOGGER = logging.getLogger(__name__)


HOMESHARING_SERVICE = '_appletv-v2._tcp.local.'


Expand Down Expand Up @@ -72,6 +73,6 @@ def connect_to_apple_tv(details, loop):
return AppleTVInternal(session, requester)


def pair_with_apple_tv(loop, timeout, pin_code, name):
def pair_with_apple_tv(loop, pin_code, name):
"""Initiate pairing process with an Apple TV."""
return PairingHandler(loop, timeout, name, pin_code).run()
return PairingHandler(loop, name, pin_code)
61 changes: 41 additions & 20 deletions pyatv/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from argparse import ArgumentTypeError

import pyatv
import pyatv.pairing
from pyatv import (dmap, exceptions, tag_definitions)
from pyatv.interface import retrieve_commands

Expand All @@ -29,35 +30,42 @@ def cli_handler(loop):
parser = argparse.ArgumentParser()

parser.add_argument('command')
parser.add_argument('-v', '--verbose', help='increase output verbosity',
action='store_const', dest='loglevel',
const=logging.INFO)
parser.add_argument('--developer', help='show developer commands',
action='store_true', dest='developer',
default=False)
parser.add_argument('--debug', help='print debug information',
action='store_const', dest='loglevel',
const=logging.DEBUG, default=logging.WARNING)
parser.add_argument('--name', help='apple tv name',
dest='name', default='Apple TV')
parser.add_argument('--remote-name', help='remote pairing name',
dest='remote_name', default='pyatv')
parser.add_argument('--address', help='device ip address or hostname',
dest='address', default=None)
parser.add_argument('-t', '--scan-timeout', help='timeout when scanning',
dest='scan_timeout', type=_in_range(1, 10),
metavar='TIMEOUT', default=3)
parser.add_argument('-p', '--pin', help='pairing pin code',
dest='pin_code', type=_in_range(0, 9999),
metavar='PIN', default=1234)
ident = parser.add_mutually_exclusive_group()

pairing = parser.add_argument_group('pairing')
pairing.add_argument('--remote-name', help='remote pairing name',
dest='remote_name', default='pyatv')
pairing.add_argument('-p', '--pin', help='pairing pin code',
dest='pin_code', type=_in_range(0, 9999),
metavar='PIN', default=1234)
pairing.add_argument('--pairing-timeout', help='timeout when pairing',
dest='pairing_timeout', type=int,
metavar='TIMEOUT', default=60)

ident = parser.add_mutually_exclusive_group()
ident.add_argument('-a', '--autodiscover',
help='automatically find a device',
action='store_true', dest='autodiscover', default=False)
ident.add_argument('--login_id', help='home sharing id or pairing guid',
dest='login_id', default=None)

debug = parser.add_argument_group('debugging')
debug.add_argument('-v', '--verbose', help='increase output verbosity',
action='store_const', dest='loglevel',
const=logging.INFO)
debug.add_argument('--developer', help='show developer commands',
action='store_true', dest='developer',
default=False)
debug.add_argument('--debug', help='print debug information',
action='store_const', dest='loglevel',
const=logging.DEBUG, default=logging.WARNING)

args = parser.parse_args()

logging.basicConfig(level=args.loglevel,
Expand All @@ -72,9 +80,16 @@ def cli_handler(loop):
if args.command == 'scan':
yield from _handle_scan(args, loop)
elif args.command == 'pair':
# TODO: hardcoded timeout
yield from pyatv.pair_with_apple_tv(
loop, 60, args.pin_code, args.remote_name)
handler = pyatv.pair_with_apple_tv(
loop, args.pin_code, args.remote_name)
print('Use pin {} to pair with "{}" (waiting for {}s)'.format(
args.pin_code, args.remote_name, args.pairing_timeout))
print('After successful pairing, use login id {}'.format(
pyatv.pairing.PAIRING_GUID))
print('Note: If remote does not show up, reboot you Apple TV')
yield from handler.start()
yield from asyncio.sleep(args.pairing_timeout, loop=loop)
yield from handler.stop()
elif args.autodiscover:
return (yield from _handle_autodiscover(args, loop))
elif args.login_id:
Expand Down Expand Up @@ -228,6 +243,9 @@ def main():
def _run_application(loop):
try:
asyncio.wait_for((yield from cli_handler(loop)), timeout=15)
except KeyboardInterrupt:
pass # User pressed Ctrl+C, just ignore it

except SystemExit:
pass # sys.exit() was used - do nothing

Expand All @@ -238,8 +256,11 @@ def _run_application(loop):
sys.stderr.writelines(
'\n>>> An error occurred, full stack trace above\n')

loop = asyncio.get_event_loop()
loop.run_until_complete(_run_application(loop))
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(_run_application(loop))
except KeyboardInterrupt:
pass


if __name__ == '__main__':
Expand Down
42 changes: 19 additions & 23 deletions pyatv/pairing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@

_LOGGER = logging.getLogger(__name__)

PAIR = '0000000000000001'

PAIRING_GUID = '0000000000000001'

class PairingHandler:
"""Handle the pairing process.
Expand All @@ -22,31 +21,27 @@ class PairingHandler:
that responds to pairing requests.
"""

def __init__(self, loop, timeout, name, pairing_code):
def __init__(self, loop, name, pairing_code):
"""Initialize a new instance."""
self.loop = loop
self.timeout = timeout
self.name = name
self.pairing_code = pairing_code
self.zeroconf = Zeroconf()

@asyncio.coroutine
def run(self):
def start(self):
"""Start the pairing server and publish service."""
self.zeroconf = Zeroconf()
web_server = web.Server(self.handle_request, loop=self.loop)
server = yield from self.loop.create_server(web_server, '0.0.0.0')
allocated_port = server.sockets[0].getsockname()[1]
service = self._setup_zeroconf(allocated_port)
print("lol: " + str(self.zeroconf))
print("timeout: " + str(self.timeout))
try:
yield from asyncio.sleep(self.timeout, loop=self.loop)
finally:
# TODO: should close here but that makes it hard to test
# Use have a stop method instead?
# server.close()
self.zeroconf.unregister_service(service)
self.zeroconf.close()
self.server = yield from self.loop.create_server(web_server, '0.0.0.0')
allocated_port = self.server.sockets[0].getsockname()[1]
self._setup_zeroconf(allocated_port)

@asyncio.coroutine
def stop(self):
"""Stop pairing server and unpublish service."""
self.server.close()
yield from self.server.wait_closed()
self.zeroconf.close()

def _setup_zeroconf(self, port):
props = {
Expand All @@ -55,35 +50,36 @@ def _setup_zeroconf(self, port):
'DvTy': 'iPod',
'RemN': 'Remote',
'txtvers': '1',
'Pair': PAIR
'Pair': PAIRING_GUID
}

local_ip = socket.inet_aton(socket.gethostbyname(socket.gethostname()))
service = ServiceInfo('_touch-remote._tcp.local.',
'0'*39 + '1._touch-remote._tcp.local.',
local_ip, port, 0, 0, props)
self.zeroconf.register_service(service)
return service

@asyncio.coroutine
def handle_request(self, request):
"""Respond to request if PIN is correct."""
service_name = request.rel_url.query['servicename']
received_code = request.rel_url.query['pairingcode'].lower()
_LOGGER.info('Got pairing request from %s with code %s',
service_name, received_code)

if self._verify_pin(received_code):
cmpg = tags.uint64_tag('cmpg', 1)
cmnm = tags.string_tag('cmnm', self.name)
cmty = tags.string_tag('cmty', 'ipod')
response = tags.container_tag('cmpa', cmpg + cmnm + cmty)
# TODO: take down web server and cancel delay when done?
return web.Response(body=response)

# Code did not match, generate an error
return web.Response(status=500)

def _verify_pin(self, received_code):
merged = StringIO()
merged.write(PAIR)
merged.write(PAIRING_GUID)
for char in str(self.pairing_code):
merged.write(char)
merged.write("\x00")
Expand Down
18 changes: 13 additions & 5 deletions tests/test_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
HSGID = '12345-6789-0'
PAIRING_GUID = '0x0000000000000001'
SESSION_ID = 55555
REMOTE_NAME = 'pyatv remote'
PIN_CODE = 1234

# This is valid for the PAIR in the pairing module and pin 1234
# (extracted form a real device)
Expand Down Expand Up @@ -62,12 +64,15 @@ def test_scan_for_apple_tvs(self):
self.assertEqual(atvs[0].login_id, 'aaaa')
self.assertEqual(atvs[0].port, 3689)

# This is not a pretty test and it does crazy things. Should probably be
# re-written later but will do for now.
@unittest_run_loop
def test_pairing_with_device(self):
zeroconf_stub.stub(pairing)

# Start pairing process
yield from pyatv.pair_with_apple_tv(self.loop, 0, 1234, 'pyatv remote')
handler = pyatv.pair_with_apple_tv(self.loop, PIN_CODE, REMOTE_NAME)
yield from handler.start()

# Verify that bonjour service was published
zeroconf = zeroconf_stub.instance
Expand All @@ -80,21 +85,24 @@ def test_pairing_with_device(self):

# Extract port from service (as it is randomized) and request pairing
# with the web server.
url = 'http://127.0.0.1:{}/pairing?pairingcode={}'.format(
service.port, PAIRINGCODE)
server = 'http://127.0.0.1:{}'.format(service.port)
url = '{}/pairing?pairingcode={}&servicename=test'.format(
server, PAIRINGCODE)
session = aiohttp.ClientSession(loop=self.loop)
response = yield from session.request('GET', url)
self.assertEqual(response.status, 200)
self.assertEqual(response.status, 200,
msg='pairing failed')

# Verify content returned in pairingresponse
data = yield from response.content.read()
parsed = dmap.parse(data, tag_definitions.lookup_tag)
self.assertEqual(dmap.first(parsed, 'cmpa', 'cmpg'), 1)
self.assertEqual(dmap.first(parsed, 'cmpa', 'cmnm'), 'pyatv remote')
self.assertEqual(dmap.first(parsed, 'cmpa', 'cmnm'), REMOTE_NAME)
self.assertEqual(dmap.first(parsed, 'cmpa', 'cmty'), 'ipod')

response.close()
yield from session.close()
yield from handler.stop()
yield from self.atv.logout()

@unittest_run_loop
Expand Down

0 comments on commit e471961

Please sign in to comment.