Skip to content

Commit

Permalink
feat: verify signature from event webhook (#901)
Browse files Browse the repository at this point in the history
When enabling the "Signed Event Webhook Requests" feature in Mail Settings, Twilio SendGrid will generate a private and public key pair using the Elliptic Curve Digital Signature Algorithm (ECDSA). Once that is successfully enabled, all new event posts will have two new headers: X-Twilio-Email-Event-Webhook-Signature and X-Twilio-Email-Event-Webhook-Timestamp, which can be used to validate your events.

This SDK update will make it easier to verify signatures from signed event webhook requests by using the VerifySignature method. Pass in the public key, event payload, signature, and timestamp to validate. Note: You will need to convert your public key string to an elliptic public key object in order to use the VerifySignature method.
  • Loading branch information
eshanholtz committed Jun 15, 2020
1 parent 75cc2d1 commit cb58667
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -1,6 +1,6 @@
.PHONY: venv install test-install test test-integ test-docker clean nopyc

venv:
venv: clean
@python --version || (echo "Python is not installed, please install Python 2 or Python 3"; exit 1);
virtualenv --python=python venv

Expand Down
14 changes: 14 additions & 0 deletions examples/helpers/eventwebhook/eventwebhook_example.py
@@ -0,0 +1,14 @@
from sendgrid.helpers.eventwebhook import EventWebhook, EventWebhookHeader

def is_valid_signature(request):
public_key = 'base64-encoded public key'

event_webhook = EventWebhook()
ec_public_key = event_webhook.convert_public_key_to_ecdsa(public_key)

return event_webhook.verify_signature(
request.text,
request.headers[EventWebhookHeader.SIGNATURE],
request.headers[EventWebhookHeader.TIMESTAMP],
ec_public_key
)
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -3,3 +3,4 @@ PyYAML>=4.2b1
python-http-client>=3.2.1
six==1.11.0
pytest==3.8.2
starkbank-ecdsa>=1.0.0
1 change: 1 addition & 0 deletions sendgrid/__init__.py
Expand Up @@ -18,6 +18,7 @@
from .helpers.endpoints import * # noqa
from .helpers.mail import * # noqa
from .helpers.stats import * # noqa
from .helpers.eventwebhook import * # noqa
from .sendgrid import SendGridAPIClient # noqa
from .twilio_email import TwilioEmailAPIClient # noqa
from .version import __version__
50 changes: 50 additions & 0 deletions sendgrid/helpers/eventwebhook/__init__.py
@@ -0,0 +1,50 @@
from ellipticcurve.ecdsa import Ecdsa
from ellipticcurve.publicKey import PublicKey
from ellipticcurve.signature import Signature

from .eventwebhook_header import EventWebhookHeader

class EventWebhook:
"""
This class allows you to use the Event Webhook feature. Read the docs for
more details: https://sendgrid.com/docs/for-developers/tracking-events/event
"""

def __init__(self, public_key=None):
"""
Construct the Event Webhook verifier object
:param public_key: verification key under Mail Settings
:type public_key: string
"""
self.public_key = self.convert_public_key_to_ecdsa(public_key) if public_key else public_key

def convert_public_key_to_ecdsa(self, public_key):
"""
Convert the public key string to a ECPublicKey.
:param public_key: verification key under Mail Settings
:type public_key string
:return: public key using the ECDSA algorithm
:rtype PublicKey
"""
return PublicKey.fromPem(public_key)

def verify_signature(self, payload, signature, timestamp, public_key=None):
"""
Verify signed event webhook requests.
:param payload: event payload in the request body
:type payload: string
:param signature: value obtained from the 'X-Twilio-Email-Event-Webhook-Signature' header
:type signature: string
:param timestamp: value obtained from the 'X-Twilio-Email-Event-Webhook-Timestamp' header
:type timestamp: string
:param public_key: elliptic curve public key
:type public_key: PublicKey
:return: true or false if signature is valid
"""
timestamped_payload = timestamp + payload
decoded_signature = Signature.fromBase64(signature)

key = public_key or self.public_key
return Ecdsa.verify(timestamped_payload, decoded_signature, key)
10 changes: 10 additions & 0 deletions sendgrid/helpers/eventwebhook/eventwebhook_header.py
@@ -0,0 +1,10 @@
class EventWebhookHeader:
"""
This class lists headers that get posted to the webhook. Read the docs for
more details: https://sendgrid.com/docs/for-developers/tracking-events/event
"""
SIGNATURE = 'X-Twilio-Email-Event-Webhook-Signature'
TIMESTAMP = 'X-Twilio-Email-Event-Webhook-Timestamp'

def __init__(self):
pass
2 changes: 1 addition & 1 deletion sendgrid/helpers/inbound/config.py
Expand Up @@ -16,7 +16,7 @@ def __init__(self, **opts):
'path', os.path.abspath(os.path.dirname(__file__))
)
with open('{0}/config.yml'.format(self.path)) as stream:
config = yaml.load(stream)
config = yaml.load(stream, Loader=yaml.FullLoader)
self._debug_mode = config['debug_mode']
self._endpoint = config['endpoint']
self._host = config['host']
Expand Down
42 changes: 42 additions & 0 deletions test/test_eventwebhook.py
@@ -0,0 +1,42 @@
import json
import unittest

from sendgrid import EventWebhook


class UnitTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.PUBLIC_KEY = 'MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEDr2LjtURuePQzplybdC+u4CwrqDqBaWjcMMsTbhdbcwHBcepxo7yAQGhHPTnlvFYPAZFceEu/1FwCM/QmGUhA=='
cls.SIGNATURE = 'MEUCIQCtIHJeH93Y+qpYeWrySphQgpNGNr/U+UyUlBkU6n7RAwIgJTz2C+8a8xonZGi6BpSzoQsbVRamr2nlxFDWYNH2j/0='
cls.TIMESTAMP = '1588788367'
cls.PAYLOAD = json.dumps({
'event': 'test_event',
'category': 'example_payload',
'message_id': 'message_id',
}, sort_keys=True, separators=(',', ':'))

def test_verify_valid_signature(self):
ew = EventWebhook()
key = ew.convert_public_key_to_ecdsa(self.PUBLIC_KEY)
self.assertTrue(ew.verify_signature(self.PAYLOAD, self.SIGNATURE, self.TIMESTAMP, key))

def test_verify_bad_key(self):
ew = EventWebhook('MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEqTxd43gyp8IOEto2LdIfjRQrIbsd4SXZkLW6jDutdhXSJCWHw8REntlo7aNDthvj+y7GjUuFDb/R1NGe1OPzpA==')
self.assertFalse(ew.verify_signature(self.PAYLOAD, self.SIGNATURE, self.TIMESTAMP))

def test_verify_bad_payload(self):
ew = EventWebhook(self.PUBLIC_KEY)
self.assertFalse(ew.verify_signature('payload', self.SIGNATURE, self.TIMESTAMP))

def test_verify_bad_signature(self):
ew = EventWebhook(self.PUBLIC_KEY)
self.assertFalse(ew.verify_signature(
self.PAYLOAD,
'MEUCIQCtIHJeH93Y+qpYeWrySphQgpNGNr/U+UyUlBkU6n7RAwIgJTz2C+8a8xonZGi6BpSzoQsbVRamr2nlxFDWYNH3j/0=',
self.TIMESTAMP
))

def test_verify_bad_timestamp(self):
ew = EventWebhook(self.PUBLIC_KEY)
self.assertFalse(ew.verify_signature(self.PAYLOAD, self.SIGNATURE, 'timestamp'))

0 comments on commit cb58667

Please sign in to comment.