Skip to content

Commit

Permalink
Add support for pairing
Browse files Browse the repository at this point in the history
Should work by using atvremote:

    atvremote pair

And then add the remote on the Apple TV. Default pin is 1234 (which
must be correctly entered) and name is "pyatv remote". These settings
can be changed with parameters to atvremote. Currently a pairing
timer is set to 60 seconds, so the process must be finished within
that time.
  • Loading branch information
postlund committed Feb 4, 2017
1 parent 0c4d1c1 commit a0ad21d
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 9 deletions.
6 changes: 3 additions & 3 deletions pyatv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from collections import namedtuple
from zeroconf import ServiceBrowser, Zeroconf

from pyatv.pairing import PairingHandler
from pyatv.daap import (DaapSession, DaapRequester)
from pyatv.internal.apple_tv import AppleTVInternal

Expand Down Expand Up @@ -71,7 +72,6 @@ def connect_to_apple_tv(details, loop):
return AppleTVInternal(session, requester)


# TODO: API not determined for this yet, might change when implemented
def pair_with_apple_tv():
def pair_with_apple_tv(loop, timeout, pin_code, name):
"""Initiate pairing process with an Apple TV."""
raise NotImplementedError
return PairingHandler(loop, timeout, name, pin_code).run()
9 changes: 8 additions & 1 deletion pyatv/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,16 @@ def cli_handler(loop):
const=logging.DEBUG, default=logging.WARNING)
parser.add_argument('--name', help='apple tv name',
dest='name', default='Apple TV')
parser.add_argument('--remote-name', help='remote pairing name',
dest='remote_name', default='pyatv')
parser.add_argument('--address', help='device ip address or hostname',
dest='address', default=None)
parser.add_argument('-t', '--scan-timeout', help='timeout when scanning',
dest='scan_timeout', type=_in_range(1, 10),
metavar='TIMEOUT', default=3)
parser.add_argument('-p', '--pin', help='pairing pin code',
dest='pin_code', type=_in_range(0, 9999),
metavar='PIN', default=1234)
ident = parser.add_mutually_exclusive_group()

ident.add_argument('-a', '--autodiscover',
Expand All @@ -67,7 +72,9 @@ def cli_handler(loop):
if args.command == 'scan':
yield from _handle_scan(args, loop)
elif args.command == 'pair':
pyatv.pair_with_apple_tv()
# TODO: hardcoded timeout
yield from pyatv.pair_with_apple_tv(
loop, 60, args.pin_code, args.remote_name)
elif args.autodiscover:
return (yield from _handle_autodiscover(args, loop))
elif args.login_id:
Expand Down
94 changes: 94 additions & 0 deletions pyatv/pairing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Module used for pairing pyatv with a device."""

import socket
import asyncio
import hashlib
import logging
from io import StringIO

from aiohttp import web
from zeroconf import Zeroconf, ServiceInfo
from pyatv import tags

_LOGGER = logging.getLogger(__name__)

PAIR = '0000000000000001'


class PairingHandler:
"""Handle the pairing process.
This class will publish a bonjour service and configure a webserver
that responds to pairing requests.
"""

def __init__(self, loop, timeout, name, pairing_code):
"""Initialize a new instance."""
self.loop = loop
self.timeout = timeout
self.name = name
self.pairing_code = pairing_code
self.zeroconf = Zeroconf()

@asyncio.coroutine
def run(self):
"""Start the pairing server and publish service."""
web_server = web.Server(self.handle_request, loop=self.loop)
server = yield from self.loop.create_server(web_server, '0.0.0.0')
allocated_port = server.sockets[0].getsockname()[1]
service = self._setup_zeroconf(allocated_port)
print("lol: " + str(self.zeroconf))
print("timeout: " + str(self.timeout))
try:
yield from asyncio.sleep(self.timeout, loop=self.loop)
finally:
# TODO: should close here but that makes it hard to test
# Use have a stop method instead?
# server.close()
self.zeroconf.unregister_service(service)
self.zeroconf.close()

def _setup_zeroconf(self, port):
props = {
'DvNm': self.name,
'RemV': '10000',
'DvTy': 'iPod',
'RemN': 'Remote',
'txtvers': '1',
'Pair': PAIR
}

local_ip = socket.inet_aton(socket.gethostbyname(socket.gethostname()))
service = ServiceInfo('_touch-remote._tcp.local.',
'0'*39 + '1._touch-remote._tcp.local.',
local_ip, port, 0, 0, props)
self.zeroconf.register_service(service)
return service

@asyncio.coroutine
def handle_request(self, request):
"""Respond to request if PIN is correct."""
received_code = request.rel_url.query['pairingcode'].lower()

if self._verify_pin(received_code):
cmpg = tags.uint64_tag('cmpg', 1)
cmnm = tags.string_tag('cmnm', self.name)
cmty = tags.string_tag('cmty', 'ipod')
response = tags.container_tag('cmpa', cmpg + cmnm + cmty)
# TODO: take down web server and cancel delay when done?
return web.Response(body=response)

# Code did not match, generate an error
return web.Response(status=500)

def _verify_pin(self, received_code):
merged = StringIO()
merged.write(PAIR)
for char in str(self.pairing_code):
merged.write(char)
merged.write("\x00")

expected_code = hashlib.md5(merged.getvalue().encode()).hexdigest()
_LOGGER.debug('Got code %s, expects %s', received_code, expected_code)

return received_code == expected_code
4 changes: 4 additions & 0 deletions pyatv/tag_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ def _read_unknown(data, start, length):
'ceQR': DmapTag('container', 'com.apple.itunes.playqueue-contents-response'), # NOQA
'cmcp': DmapTag('container', 'dmcp.controlprompt'),
'cmmk': DmapTag(read_uint, 'dmcp.mediakind'),
'cmnm': DmapTag(read_str, 'dacp.devicename'),
'cmpa': DmapTag('container', 'dacp.pairinganswer'),
'cmpg': DmapTag(read_uint, 'dacp.pairingguid'),
'cmpr': DmapTag(read_uint, 'dmcp.protocolversion'),
'cmsr': DmapTag(read_uint, 'dmcp.serverrevision'),
'cmst': DmapTag('container', 'dmcp.playstatus'),
'cmty': DmapTag(read_str, 'dacp.devicetype'),
'mdcl': DmapTag('container', 'dmap.dictionary'),
'miid': DmapTag(read_uint, 'dmap.itemid'),
'minm': DmapTag(read_str, 'dmap.itemname'),
Expand Down
7 changes: 7 additions & 0 deletions pyatv/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ def uint32_tag(name, value):
value.to_bytes(4, byteorder='big')


def uint64_tag(name, value):
"""Create a DMAP tag with uint64 data."""
return name.encode('utf-8') + \
b'\x00\x00\x00\x08' + \
value.to_bytes(8, byteorder='big')


def bool_tag(name, value):
"""Create a DMAP tag with boolean data."""
return name.encode('utf-8') + \
Expand Down
7 changes: 5 additions & 2 deletions tests/test_dmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
'uuu8': dmap.DmapTag(tags.read_uint, 'uint8'),
'uu16': dmap.DmapTag(tags.read_uint, 'uint16'),
'uu32': dmap.DmapTag(tags.read_uint, 'uint32'),
'uu64': dmap.DmapTag(tags.read_uint, 'uint64'),
'bola': dmap.DmapTag(tags.read_bool, 'bool'),
'bolb': dmap.DmapTag(tags.read_bool, 'bool'),
'stra': dmap.DmapTag(tags.read_str, 'string'),
Expand All @@ -28,12 +29,14 @@ class DmapTest(unittest.TestCase):
def test_parse_uint_of_various_lengths(self):
in_data = tags.uint8_tag('uuu8', 12) + \
tags.uint16_tag('uu16', 37888) + \
tags.uint32_tag('uu32', 305419896)
tags.uint32_tag('uu32', 305419896) + \
tags.uint64_tag('uu64', 8982983289232)
parsed = dmap.parse(in_data, lookup_tag)
self.assertEqual(3, len(parsed))
self.assertEqual(4, len(parsed))
self.assertEqual(12, dmap.first(parsed, 'uuu8'))
self.assertEqual(37888, dmap.first(parsed, 'uu16'))
self.assertEqual(305419896, dmap.first(parsed, 'uu32'))
self.assertEqual(8982983289232, dmap.first(parsed, 'uu64'))

def test_parse_bool(self):
in_data = tags.bool_tag('bola', True) + \
Expand Down
45 changes: 43 additions & 2 deletions tests/test_functional.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
"""Functional tests using the API with a fake Apple TV."""

import pyatv
import aiohttp
import ipaddress

from tests.log_output_handler import LogOutputHandler
from aiohttp.test_utils import (AioHTTPTestCase, unittest_run_loop)

from pyatv import (AppleTVDevice, connect_to_apple_tv, const, exceptions)
from pyatv import (AppleTVDevice, connect_to_apple_tv, const,
exceptions, dmap, tag_definitions, pairing)
from tests.fake_apple_tv import (FakeAppleTV, AppleTVUseCases)
from tests import zeroconf_stub

HSGID = '12345-6789-0'
PAIRING_GUID = '0x0000000000000001'
SESSION_ID = 55555

# This is valid for the PAIR in the pairing module and pin 1234
# (extracted form a real device)
PAIRINGCODE = '690E6FF61E0D7C747654A42AED17047D'


class FunctionalTest(AioHTTPTestCase):

Expand Down Expand Up @@ -45,7 +52,6 @@ def get_connected_device(self, identifier):

@unittest_run_loop
def test_scan_for_apple_tvs(self):
import pyatv
zeroconf_stub.stub(pyatv, zeroconf_stub.homesharing_service(
'AAAA', b'Apple TV', '10.0.0.1', b'aaaa'))

Expand All @@ -56,6 +62,41 @@ def test_scan_for_apple_tvs(self):
self.assertEqual(atvs[0].login_id, 'aaaa')
self.assertEqual(atvs[0].port, 3689)

@unittest_run_loop
def test_pairing_with_device(self):
zeroconf_stub.stub(pairing)

# Start pairing process
yield from pyatv.pair_with_apple_tv(self.loop, 0, 1234, 'pyatv remote')

# Verify that bonjour service was published
zeroconf = zeroconf_stub.instance
self.assertEqual(len(zeroconf.registered_services), 1,
msg='no zeroconf service registered')

service = zeroconf.registered_services[0]
self.assertEqual(service.properties['DvNm'], 'pyatv remote',
msg='remote name does not match')

# Extract port from service (as it is randomized) and request pairing
# with the web server.
url = 'http://127.0.0.1:{}/pairing?pairingcode={}'.format(
service.port, PAIRINGCODE)
session = aiohttp.ClientSession(loop=self.loop)
response = yield from session.request('GET', url)
self.assertEqual(response.status, 200)

# Verify content returned in pairingresponse
data = yield from response.content.read()
parsed = dmap.parse(data, tag_definitions.lookup_tag)
self.assertEqual(dmap.first(parsed, 'cmpa', 'cmpg'), 1)
self.assertEqual(dmap.first(parsed, 'cmpa', 'cmnm'), 'pyatv remote')
self.assertEqual(dmap.first(parsed, 'cmpa', 'cmty'), 'ipod')

response.close()
yield from session.close()
yield from self.atv.logout()

@unittest_run_loop
def test_login_failed(self):
self.usecase.make_login_fail()
Expand Down
16 changes: 15 additions & 1 deletion tests/zeroconf_stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,36 @@ class ZeroconfStub:
def __init__(self, services):
"""Create a new instance of Zeroconf."""
self.services = services
self.registered_services = []

def get_service_info(self, service_type, service_name):
"""Look up service information."""
for service in self.services:
if service.name == service_name:
return service

def register_service(self, service):
"""Save services registered services."""
self.registered_services.append(service)

def unregister_service(self, service):
"""Stub for unregistering services (does nothing)."""
pass

def close(self):
"""Stub for closing zeroconf (does nothing)."""
pass


instance = None


def stub(module, *services):
"""Stub a module using zeroconf."""
def _zeroconf():
return ZeroconfStub(services)
global instance
instance = ZeroconfStub(list(services))
return instance

module.Zeroconf = _zeroconf
module.ServiceBrowser = ServiceBrowserStub

0 comments on commit a0ad21d

Please sign in to comment.