Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: Switch to new http2 based APNS protocol
Browse files Browse the repository at this point in the history
Existing libraries either lack tests or are not good matches for
twisted. Created a very light weight apns2 library that does connection
pooling and sends out the new notification. Connection pooling is
strongly suggested by APNs docs.

closes: #662
  • Loading branch information
jrconlin committed Sep 22, 2016
1 parent f47b743 commit 15fea87
Show file tree
Hide file tree
Showing 8 changed files with 318 additions and 251 deletions.
4 changes: 0 additions & 4 deletions autopush/main.py
Expand Up @@ -569,9 +569,5 @@ def endpoint_main(sysargs=None, use_files=True):
# Start the table rotation checker/updater
l = task.LoopingCall(settings.update_rotating_tables)
l.start(60)
if settings.routers.get('apns'):
l = task.LoopingCall(settings.routers['apns']._cleanup)
l.start(10)

reactor.suggestThreadPoolSize(50)
reactor.run()
147 changes: 147 additions & 0 deletions autopush/router/apns2.py
@@ -0,0 +1,147 @@
import json
from collections import deque

import hyper.tls
from hyper import HTTP20Connection
from hyper.http20.exceptions import HTTP20Error

from autopush.router.interface import RouterException

SANDBOX = 'api.development.push.apple.com'
SERVER = 'api.push.apple.com'

APNS_PRIORITY_IMMEDIATE = 10
APNS_PRIORITY_LOW = 5
APNS_MAX_CONNECTIONS = 20


class APNSException(Exception):
pass


class APNSClient(object):
def __init__(self, cert_file, key_file, topic,
alt=False, use_sandbox=False,
max_connections=APNS_MAX_CONNECTIONS,
logger=None, metrics=None):
"""Create the APNS client connector.
The cert_file and key_file can be derived from the exported `.p12`
**Apple Push Services: *bundleID*** key contained in the **Keychain
Access** application. To extract the proper PEM formatted data, you
can use the following commands:
```
openssl pkcs12 -in file.p12 -out apns_cert.pem -clcerts -nokeys
openssl pkcs12 -in file.p12 -out apns_key.pem -nocerts -nodes
```
The *topic* is the Bundle ID of the bridge recipient iOS application.
Since the cert needs to be tied directly to an application, the topic
is usually similar to "com.example.MyApplication".
:param cert_file: Path to the PEM formatted APNs certification file.
:type cert_file: str
:param key_file: Path to the PEM formatted APNs key file.
:type key_file: str
:param topic: The *Bundle ID* that identifies the assoc. iOS app.
:type topic: str
:param alt: Use the alternate APNs publication port (if 443 is blocked)
:type alt: bool
:param use_sandbox: Use the development sandbox
:type use_sandbox: bool
:param max_connections: Max number of pooled connections to use
:type max_connections: int
:param logger: Status logger
:type logger: logger
:param metrics: Metric recorder
:type metrics: autopush.metrics.IMetric
"""
self.server = SANDBOX if use_sandbox else SERVER
self.port = 2197 if alt else 443
self.ssl_context = hyper.tls.init_context(cert=(cert_file, key_file))
self.log = logger
self.metrics = metrics
self.topic = topic
self._max_connections = max_connections
self.connections = deque(maxlen=max_connections)
if self.log:
self.log.info("Starting APNS connection")

self.connections.extendleft((HTTP20Connection(
self.server,
self.port,
ssl_context=self.ssl_context,
force_proto='h2') for x in range(0, max_connections)))

def send(self, router_token, payload, apns_id,
priority=True, topic=None, exp=None):
"""Send the dict of values to the remote bridge
This sends the raw data to the remote bridge application using the
APNS2 HTTP2 API.
:param router_token: APNs provided hex token identifying recipient
:type router_token: str
:param payload: Data to send to recipient
:type payload: dict
:param priority: True is high priority, false is low priority
:type priority: bool
:param topic: BundleID for the recipient application (overides default)
:type topic: str
:param exp: Message expiration timestamp
:type exp: timestamp
"""
body = json.dumps(payload)
priority = APNS_PRIORITY_IMMEDIATE if priority else APNS_PRIORITY_LOW
headers = {
'apns-id': apns_id,
'apns-priority': priority,
'apns-topic': topic or self.topic,
}
if exp:
headers['apns-expiration'] = exp
url = '/3/device/' + router_token
connection = self._get_connection()
try:
# request auto-opens closed connections, so if a connection
# has timed out or failed for other reasons, it's automatically
# re-established.
stream_id = connection.request(
'POST', url=url, body=body, headers=headers)
response = connection.get_response(stream_id)
if response.status != 200:
reason = json.loads(response.read().decode('utf-8'))['reason']
raise RouterException(
"APNS Transmit Error {}:{}".format(response.status,
reason),
status_code=500,
response_body="APNS could not process "
"your message {}".format(reason))
except HTTP20Error as ex:
connection.close()
raise RouterException(
"APNS Processing error: {}".format(repr(ex)),
status_code=503,
response_body="APNS returned an error processing request",
)
finally:
# Returning a closed connection to the pool is ok.
# hyper will reconnect on .request()
self._return_connection(connection)

def _get_connection(self):
try:
connection = self.connections.pop()
return connection
except IndexError:
raise RouterException(
"Too many APNS requests, increase pool from {}".format(
self._max_connections
),
status_code=503,
response_body="APNS busy, please retry")

def _return_connection(self, connection):
self.connections.appendleft(connection)
126 changes: 33 additions & 93 deletions autopush/router/apnsrouter.py
@@ -1,8 +1,10 @@
"""APNS Router"""
import time
import uuid

import apns
from autopush.router.apns2 import (
APNSClient,
APNS_MAX_CONNECTIONS,
)
from twisted.logger import Logger
from twisted.internet.threads import deferToThread
from autopush.router.interface import RouterException, RouterResponse
Expand All @@ -13,49 +15,37 @@ class APNSRouter(object):
"""APNS Router Implementation"""
log = Logger()
apns = None
messages = {}
errors = {0: 'No error',
1: 'Processing error',
2: 'Missing device token',
3: 'Missing topic',
4: 'Missing payload',
5: 'Invalid token size',
6: 'Invalid topic size',
7: 'Invalid payload size',
8: 'Invalid token',
10: 'Shutdown',
255: 'Unknown',
}

def _connect(self, cert_info):

def _connect(self, rel_channel):
"""Connect to APNS
:param cert_info: APNS certificate configuration info
:type cert_info: dict
:param rel_channel: Release channel name (e.g. Firefox. FirefoxBeta,..)
:type rel_channel: str
:returns: APNs to be stored under the proper release channel name.
:rtype: apns.APNs
"""
# Do I still need to call this in _error?
return apns.APNs(
use_sandbox=cert_info.get("sandbox", False),
default_topic = "com.mozilla.org." + rel_channel
cert_info = self._config[rel_channel]
return APNSClient(
cert_file=cert_info.get("cert"),
key_file=cert_info.get("key"),
enhanced=True)
use_sandbox=cert_info.get("sandbox", False),
max_connections=cert_info.get("max_connections",
APNS_MAX_CONNECTIONS),
topic=cert_info.get("topic", default_topic),
logger=self.log,
metrics=self.ap_settings.metrics)

def __init__(self, ap_settings, router_conf):
"""Create a new APNS router and connect to APNS"""
self.ap_settings = ap_settings
self._base_tags = []
self.apns = dict()
self.messages = dict()
self._config = router_conf
self._max_messages = self._config.pop('max_messages', 100)
for rel_channel in self._config:
self.apns[rel_channel] = self._connect(self._config[rel_channel])
self.apns[rel_channel].gateway_server.register_response_listener(
self._error)
self.apns[rel_channel] = self._connect(rel_channel)
self.ap_settings = ap_settings
self.log.debug("Starting APNS router...")

Expand Down Expand Up @@ -95,7 +85,7 @@ def route_notification(self, notification, uaid_data):
"""Start the APNS notification routing, returns a deferred
:param notification: Notification data to send
:type notification: dict
:type notification: autopush.endpoint.Notification
:param uaid_data: User Agent specific data
:type uaid_data: dict
Expand All @@ -116,43 +106,28 @@ def _route(self, notification, router_data):
router_token = router_data["token"]
rel_channel = router_data["rel_channel"]
config = self._config[rel_channel]
if len(self.messages) >= self._max_messages:
raise RouterException("Too many messages in pending queue",
status_code=503,
response_body="Pending buffer full",
)
apns_client = self.apns[rel_channel]
custom = {
payload = {
"chid": notification.channel_id,
"ver": notification.version,
}
if notification.data:
custom["body"] = notification.data
custom["con"] = notification.headers["content-encoding"]
custom["enc"] = notification.headers["encryption"]
payload["body"] = notification.data
payload["con"] = notification.headers["content-encoding"]
payload["enc"] = notification.headers["encryption"]

if "crypto-key" in notification.headers:
custom["cryptokey"] = notification.headers["crypto-key"]
payload["cryptokey"] = notification.headers["crypto-key"]
elif "encryption-key" in notification.headers:
custom["enckey"] = notification.headers["encryption-key"]

payload = apns.Payload(
alert=router_data.get("title", config.get('default_title',
'Mozilla Push')),
content_available=1,
custom=custom)
now = time.time()

# "apns-id"
msg_id = str(uuid.uuid4())
self.messages[msg_id] = {
"time_sent": now,
"rel_channel": router_data["rel_channel"],
"router_token": router_token,
"payload": payload}

apns_client.gateway_server.send_notification(router_token, payload,
msg_id)
payload["enckey"] = notification.headers["encryption-key"]
payload['aps'] = dict(
alert=router_data.get("title", config.get('default_title',
'Mozilla Push')),
content_available=1)
apns_id = str(uuid.uuid4()).lower()

apns_client.send(router_token=router_token, payload=payload,
apns_id=apns_id)
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
self.ap_settings.metrics.increment(
Expand All @@ -163,38 +138,3 @@ def _route(self, notification, router_data):
headers={"TTL": notification.ttl,
"Location": location},
logged_status=200)

def _cleanup(self):
"""clean up pending, but expired messages.
APNs may not always respond with a status code, this will clean out
pending retryable messages.
"""
for msg_id in self.messages.keys():
message = self.messages[msg_id]
expry = self._config[message['rel_channel']].get("expry", 10)
if message["time_sent"] < time.time() - expry:
try:
del self.messages[msg_id]
except KeyError: # pragma nocover
pass

def _error(self, err):
"""Error handler"""
if err['status'] == 0:
self.log.debug("Success")
del self.messages[err['identifier']]
return
self.log.debug("APNs Error encountered: {status}",
status=self.errors[err['status']])
if err['status'] in [1, 255]:
self.log.debug("Retrying...")
resend = self.messages.get(err.get('identifier'))
if resend is None:
return
apns_client = self.apns[resend["rel_channel"]]
apns_client.gateway_server.send_notification(resend['token'],
resend['payload'],
err['identifier'],
)
23 changes: 12 additions & 11 deletions autopush/tests/test_main.py
Expand Up @@ -7,6 +7,8 @@
from nose.tools import eq_, ok_
from twisted.internet.defer import Deferred
from twisted.trial import unittest as trialtest
import hyper
import hyper.tls

from autopush.db import get_rotating_message_table
from autopush.main import (
Expand Down Expand Up @@ -282,14 +284,17 @@ def test_bad_apnsconf(self):
"--apns_creds='[Invalid'"
], False)

def test_ping_settings(self):
@patch('autopush.router.apns2.HTTP20Connection',
spec=hyper.HTTP20Connection)
@patch('hyper.tls', spec=hyper.tls)
def test_ping_settings(self, *args):
ap = make_settings(self.TestArg)
# verify that the hostname is what we said.
eq_(ap.hostname, self.TestArg.hostname)
# gcm isn't created until later since we may have to pull
# config info from s3
eq_(ap.routers["apns"].apns["firefox"].cert_file, "cert.file")
eq_(ap.routers["apns"].apns["firefox"].key_file, "key.file")
eq_(ap.routers["apns"]._config['firefox']['cert'], "cert.file")
eq_(ap.routers["apns"]._config['firefox']['key'], "key.file")
eq_(ap.wake_timeout, 10)

def test_bad_senders(self):
Expand Down Expand Up @@ -317,15 +322,11 @@ def test_gcm_start(self):
"""--senderid_list={"123":{"auth":"abcd"}}""",
], False)

def test_apns_loop(self):
endpoint_main([
"""--apns_creds={"firefox":{"cert":"foo.cert","key":"foo.key"}}"""
], False)
ok_('APNSRouter._cleanup' in
repr(self.mocks.get('autopush.main.task').method_calls[1][1]))

@patch('autopush.router.apns2.HTTP20Connection',
spec=hyper.HTTP20Connection)
@patch('hyper.tls', spec=hyper.tls)
@patch("requests.get")
def test_aws_ami_id(self, request_mock):
def test_aws_ami_id(self, request_mock, mt, mc):
class MockReply:
content = "ami_123"

Expand Down

0 comments on commit 15fea87

Please sign in to comment.