Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: Use FCM HTTPv1 protocol with twisted async
Browse files Browse the repository at this point in the history
With the coming switch to FCM, it makes sense to also switch to the new
FCM HTTPv1 protocol.

Closes #1291
  • Loading branch information
jrconlin committed Dec 13, 2018
1 parent 991b5d5 commit e111ec1
Show file tree
Hide file tree
Showing 10 changed files with 816 additions and 15 deletions.
32 changes: 21 additions & 11 deletions autopush/config.py
Expand Up @@ -283,17 +283,27 @@ def from_argparse(cls, ns, **kwargs):
client_certs[sig] = name

if ns.fcm_enabled:
# Create a common gcmclient
if not ns.fcm_auth:
raise InvalidConfig("No Authorization Key found for FCM")
if not ns.fcm_senderid:
raise InvalidConfig("No SenderID found for FCM")
router_conf["fcm"] = {"ttl": ns.fcm_ttl,
"dryrun": ns.fcm_dryrun,
"max_data": ns.max_data,
"collapsekey": ns.fcm_collapsekey,
"auth": ns.fcm_auth,
"senderid": ns.fcm_senderid}
fcm_core = {
"ttl": ns.fcm_ttl,
"dryrun": ns.fcm_dryrun,
"max_data": ns.max_data,
"collapsekey": ns.fcm_collapsekey}
if len(ns.fcm_auth) > 0:
if not ns.fcm_senderid:
raise InvalidConfig("No SenderID found for FCM")
fcm_core.update({
"version": 0,
"auth": ns.fcm_auth,
"senderID": ns.fcm_senderid})
if len(ns.fcm_service_cred_path) > 0:
fcm_core.update({
"ttl": ns.fcm_ttl,
"version": 1,
"service_cred_path": ns.fcm_service_cred_path,
"senderID": ns.fcm_project_id})
if "version" not in fcm_core:
raise InvalidConfig("No credential info found for FCM")
router_conf["fcm"] = fcm_core

if ns.adm_creds:
# Create a common admclient
Expand Down
8 changes: 8 additions & 0 deletions autopush/main_argparse.py
Expand Up @@ -180,6 +180,14 @@ def _add_external_router_args(parser):
type=str, default="", env_var="FCM_AUTH")
parser.add_argument('--fcm_senderid', help='SenderID for FCM',
type=str, default="", env_var="FCM_SENDERID")
# FCM v1 HTTP API
parser.add_argument('--fcm_project_id', help="FCM Project identifier",
type=str, default="", env_var="FCM_PROJECT_ID")
parser.add_argument('--fcm_service_cred_path',
help="Path to FCM Service Credentials",
type=str, default="",
env_var="FCM_SERVICE_CRED_PATH")

# Apple Push Notification system (APNs) for iOS
# credentials consist of JSON struct containing a channel type
# followed by the settings,
Expand Down
13 changes: 11 additions & 2 deletions autopush/router/__init__.py
Expand Up @@ -15,10 +15,11 @@
from autopush.router.interface import IRouter # noqa
from autopush.router.webpush import WebPushRouter
from autopush.router.fcm import FCMRouter
from autopush.router.fcm_v1 import FCMv1Router
from autopush.router.adm import ADMRouter

__all__ = ["APNSRouter", "FCMRouter", "GCMRouter", "WebPushRouter",
"ADMRouter"]
__all__ = ["APNSRouter", "FCMRouter", "FCMv1Router", "GCMRouter",
"WebPushRouter", "ADMRouter"]


def routers_from_config(conf, db, agent):
Expand All @@ -34,4 +35,12 @@ def routers_from_config(conf, db, agent):
routers["gcm"] = GCMRouter(conf, router_conf["gcm"], db.metrics)
if 'adm' in router_conf:
routers["adm"] = ADMRouter(conf, router_conf["adm"], db.metrics)
if 'fcm' in router_conf:
# There are two forms of FCM that can be used. "Legacy" and "v1".
# While possible, they really should not be used in parallel, and only
# one form should be defined.
if router_conf['fcm']['version'] == 0:
routers["fcm"] = FCMRouter(conf, router_conf["fcm"], db.metrics)
if router_conf['fcm']['version'] == 1:
routers["fcm"] = FCMv1Router(conf, router_conf["fcm"], db.metrics)
return routers
2 changes: 1 addition & 1 deletion autopush/router/fcm.py
@@ -1,4 +1,4 @@
"""FCM Router"""
"""FCM legacy HTTP Router"""
from typing import Any # noqa

import pyfcm
Expand Down
168 changes: 168 additions & 0 deletions autopush/router/fcm_v1.py
@@ -0,0 +1,168 @@
"""FCM v1 HTTP Router"""
from typing import Any # noqa

from twisted.internet.error import ConnectError, TimeoutError
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.router.fcm import FCMRouter
from autopush.router.fcmv1client import (FCMv1, FCMAuthenticationError)
from autopush.types import JSONDict # noqa


class FCMv1Router(FCMRouter):
"""FCM v1 HTTP Router Implementation
Note: FCM v1 is a newer version of the FCM HTTP API.
"""

def __init__(self, conf, router_conf, metrics):
"""Create a new FCM router and connect to FCM"""
self.conf = conf
self.router_conf = router_conf
self.metrics = metrics
self.min_ttl = router_conf.get("ttl", 60)
self.dryRun = router_conf.get("dryrun", False)
self.collapseKey = router_conf.get("collapseKey", "webpush")
self.senderID = router_conf.get("senderID")
self.version = router_conf["version"]
self.log = Logger()
self.fcm = FCMv1(project_id=self.senderID,
service_cred_path=router_conf['service_cred_path'],
logger=self.log,
metrics=self.metrics)
self._base_tags = ["platform:fcmv1"]
self.log.debug("Starting FCMv1 router...")

def amend_endpoint_response(self, response, router_data):
# type: (JSONDict, JSONDict) -> None
response["senderid"] = self.senderID

def register(self, uaid, router_data, app_id, *args, **kwargs):
# type: (str, JSONDict, str, *Any, **Any) -> None
"""Validate that the FCM Instance Token is in the ``router_data``"""
senderid = app_id
# "token" is the FCM token generated by the client.
if "token" not in router_data:
raise self._error("connect info missing FCM Instance 'token'",
status=401,
uri=kwargs.get('uri'),
senderid=repr(senderid))
if not (senderid == self.senderID):
raise self._error("Invalid SenderID", status=410, errno=105)
# Assign a senderid
router_data["creds"] = {"senderID": self.senderID}

def route_notification(self, notification, uaid_data):
"""Start the FCM notification routing, returns a deferred"""
router_data = uaid_data["router_data"]
# Kick the entire notification routing off to a thread
return self._route(notification, router_data)

def _route(self, notification, router_data):
"""Blocking FCM call to route the notification"""
# THIS MUST MATCH THE CHANNELID GENERATED BY THE REGISTRATION SERVICE
# Currently this value is in hex form.
data = {"chid": notification.channel_id.hex}
if not router_data.get("token"):
raise self._error("No registration token found. "
"Rejecting message.",
410, errno=106, log_exception=False)
# Payload data is optional. The endpoint handler validates that the
# correct encryption headers are included with the data.
if notification.data:
mdata = self.router_conf.get('max_data', 4096)
if notification.data_length > mdata:
raise self._error("This message is intended for a " +
"constrained device and is limited " +
"to 3070 bytes. Converted buffer too " +
"long by %d bytes" %
(notification.data_length - mdata),
413, errno=104, log_exception=False)

data['body'] = notification.data
data['con'] = notification.headers['encoding']

if 'encryption' in notification.headers:
data['enc'] = notification.headers['encryption']
if 'crypto_key' in notification.headers:
data['cryptokey'] = notification.headers['crypto_key']
elif 'encryption_key' in notification.headers:
data['enckey'] = notification.headers['encryption_key']

# registration_ids are the FCM instance tokens (specified during
# registration.
router_ttl = min(self.MAX_TTL,
max(self.min_ttl, notification.ttl or 0))
d = self.fcm.send(
token=router_data.get("token"),
payload={
"collapse_key": self.collapseKey,
"data_message": data,
"dry_run": self.dryRun or ('dryrun' in router_data),
"ttl": router_ttl
})
d.addCallback(
self._process_reply, notification, router_data, router_ttl
)
d.addErrback(
self._process_error
)
return d

def _process_error(self, failure):
err = failure.value
if isinstance(err, FCMAuthenticationError):
self.log.error("FCM Authentication Error: {}".format(err))
raise RouterException("Server error", status_code=500, errno=901)
if isinstance(err, TimeoutError):
self.log.warn("FCM Timeout: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="timeout"))
raise RouterException("Server error", status_code=502,
errno=903,
log_exception=False)
if isinstance(err, ConnectError):
self.log.warn("FCM Unavailable: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
errno=902,
log_exception=False)
if isinstance(err, RouterException):
self.log.warn("FCM Error: {}".format(err))
self.metrics.increment("notification.bridge.error",
tags=make_tags(
self._base_tags,
reason="server_error"))
return failure

def _error(self, err, status, **kwargs):
"""Error handler that raises the RouterException"""
self.log.debug(err, **kwargs)
return RouterException(err, status_code=status, response_body=err,
**kwargs)

def _process_reply(self, reply, notification, router_data, ttl):
"""Process FCM send reply"""
# acks:
# for reg_id, msg_id in reply.success.items():
# updates
# Failures are returned as non-200 messages (404, 410, etc.)
self.metrics.increment("notification.bridge.sent",
tags=self._base_tags)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.conf.endpoint_url, notification.version)
return RouterResponse(status_code=201, response_body="",
headers={"TTL": ttl,
"Location": location},
logged_status=200)
142 changes: 142 additions & 0 deletions autopush/router/fcmv1client.py
@@ -0,0 +1,142 @@
import json

import treq
from oauth2client.service_account import ServiceAccountCredentials
from twisted.logger import Logger
from twisted.internet.error import (ConnectError, TimeoutError)

from autopush.exceptions import RouterException


class FCMAuthenticationError(Exception):
pass


class Result(object):

def __init__(self, response):
self.code = response.code
self.success = 0
self.retry_message = None
self.retry_after = (
response.headers.getRawHeaders('Retry-After') or [None])[0]

def parse_response(self, content):
# 400 will return an error message indicating what's wrong with the
# javascript message you sent.
# 403 is an error indicating that the client app is missing the
# FCM Cloud Messaging permission (and a URL to set it)
# Successful content body
# { "name": "projects/.../messages/0:..."}
# Failures:
# { "error":
# { "status": str
# "message": str
# "code": u64
# "details: [
# {"errorCode": str,
# "@type": str},
# {"fieldViolations": [
# {"field": str,
# "description": str}
# ],
# "type", str
# }
# ]
# }
# }
# (Failures are a tad more verbose)
if 500 <= self.code <= 599:
self.retry_message = content
return self
try:
data = json.loads(content)
if self.code in (400, 403, 404) or data.get('error'):
# Having a hard time finding information about how some
# things are handled in FCM, e.g. retransmit requests.
# For now, catalog them as errors and provide back-pressure.
err = data.get("error")
raise RouterException("{}: {}".format(err.get("status"),
err.get("message")))
if "name" in data:
self.success = 1
except (TypeError, ValueError, KeyError, AttributeError):
raise RouterException(
"Unknown error response: {}".format(content))
return self


class FCMv1(object):
def __init__(self,
project_id,
service_cred_path=None,
logger=None,
metrics=None,
**options):
self.project_id = project_id
self.endpoint = ("https://fcm.googleapis.com/v1/"
"projects/{}/messages:send".format(self.project_id))

self.token = None
self.metrics = metrics
self.logger = logger or Logger()
self._options = options
if service_cred_path:
self.svc_cred = ServiceAccountCredentials.from_json_keyfile_name(
service_cred_path,
["https://www.googleapis.com/auth/firebase.messaging"])
self._sender = treq.post

def _get_access_token(self):
return self.svc_cred.get_access_token()

def _build_message(self, token, notif):
msg = {
"token": token,
# Specify the various formats (we use android only)
"android": {
# TTL is a duration string e.g. ("60s")
"ttl": str(int(notif.get("ttl", 0)))+"s",
"data": notif.get("data_message")
},
}
# Wrap up the whole thing in a "message" tag.
return {"message": msg}

def process(self, response, payload=None):
if response.code == 401:
raise FCMAuthenticationError("Authentication Error")

result = Result(response)

d = response.text()
d.addCallback(result.parse_response)
return d

def error(self, failure):
if isinstance(failure.value, FCMAuthenticationError) or \
isinstance(failure.value, TimeoutError) or \
isinstance(failure.value, ConnectError):
raise failure.value
self.logger.error("FCMv1Client failure: {}".format(failure.value))
raise RouterException("Server error: {}".format(failure.value))

def send(self, token, payload):
atoken = self._get_access_token()
headers = {
'Authorization': 'Bearer {}'.format(atoken.access_token),
'Content-Type': 'application/json; UTF-8'
}
message = self._build_message(token, payload)
if 'timeout' not in self._options:
self._options['timeout'] = 3

d = self._sender(
url=self.endpoint,
headers=headers,
data=json.dumps(message),
**self._options
)
d.addCallback(self.process, payload)
d.addErrback(self.error)
return d

0 comments on commit e111ec1

Please sign in to comment.