diff --git a/docs/source/_json/login.json b/docs/source/_json/login.json new file mode 100644 index 0000000000..78031d4d95 --- /dev/null +++ b/docs/source/_json/login.json @@ -0,0 +1,9 @@ +POST /plone/@login +Accept: application/json + +HTTP 200 OK +content-type: application/json + +{ + "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmdWxsbmFtZSI6IiIsInN1YiI6ImFkbWluIn0.SZDnl_baH5M_StJJrzfbj7o-5My30NmSFbMrhpSX5I4" +} \ No newline at end of file diff --git a/docs/source/_json/login_renew.json b/docs/source/_json/login_renew.json new file mode 100644 index 0000000000..fd84949e5a --- /dev/null +++ b/docs/source/_json/login_renew.json @@ -0,0 +1,9 @@ +POST /plone/@login-renew +Accept: application/json + +HTTP 200 OK +content-type: application/json + +{ + "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmdWxsbmFtZSI6IiIsInN1YiI6ImFkbWluIn0.SZDnl_baH5M_StJJrzfbj7o-5My30NmSFbMrhpSX5I4" +} \ No newline at end of file diff --git a/docs/source/_json/logout.json b/docs/source/_json/logout.json new file mode 100644 index 0000000000..740035fa59 --- /dev/null +++ b/docs/source/_json/logout.json @@ -0,0 +1,5 @@ +POST /plone/@logout +Accept: application/json + +HTTP 204 No Content + diff --git a/docs/source/authentication.rst b/docs/source/authentication.rst index 278710390b..8fecffc91d 100644 --- a/docs/source/authentication.rst +++ b/docs/source/authentication.rst @@ -31,4 +31,91 @@ Or the same example using ``curl``: .. code:: python - curl -u username:password -H 'Accept:application/json' $URL \ No newline at end of file + curl -u username:password -H 'Accept:application/json' $URL + + +JSON Web Tokens (JWT) +--------------------- + +``plone.restapi`` includes a Plone PAS plugin for authentication with JWT. The +plugin is installed automatically when installing the product. + +A JWT token can be acquired by posting a users credentials to the ``@login`` +endpoint. + +.. example-code:: + + .. code-block:: http-request + + POST /@login HTTP/1.1 + Accept: application/json + Content-Type: application/json + + { + 'login': 'admin', + 'password': 'admin', + } + + .. code-block:: curl + + curl -i \ + -X POST \ + -H "Accept: application/json" \ + -H "Content-type: application/json" \ + --data-raw '{"login":"admin", "password": "admin"}' \ + http://localhost:8080/Plone/@login + + .. code-block:: python-requests + + requests.post('http://localhost:8080/Plone/@login', + headers={'Accept': 'application/json', 'Content-Type': 'application/json'}, + data={'login': 'admin', 'password': 'admin'}) + + +The server responds with a JSON object containing the token. + +.. literalinclude:: _json/login.json + :language: js + +The token can now be used in subsequent requests by including it in the +``Authorization`` header: + +.. code:: + + GET /Plone HTTP/1.1 + Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmdWxsbmFtZSI6IiIsInN1YiI6ImFkbWluIiwiZXhwIjoxNDY0MDQyMTAzfQ.aOyvMwdcIMV6pzC0GYQ3ZMdGaHR1_W7DxT0W0ok4FxI + Accept: application/json + +By default the token will expire after 12 hours and thus must be renewed before +expiration. To renew the token simply post to the ``@login-renew`` endpoint. + +.. code:: + + POST /@login-renew HTTP/1.1 + Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmdWxsbmFtZSI6IiIsInN1YiI6ImFkbWluIiwiZXhwIjoxNDY0MDQyMTAzfQ.aOyvMwdcIMV6pzC0GYQ3ZMdGaHR1_W7DxT0W0ok4FxI + Accept: application/json + +The server returns a JSON object with a new token: + +.. literalinclude:: _json/login_renew.json + :language: js + + +The ``@logout`` endpoint can be used to invalidate tokens. However by default +tokens are not persisted on the server and thus can not be invalidated. To enable +token invaldiation, activate the ``store_tokes`` option in the PAS plugin. If you +need tokens that are valid indefinitely you should also disable the use of Plone's +keyring in the PAS plugin (option ``use_keyring``). + +The logout request must contain the existing token in the ``Authorization`` header. + +.. code:: + + POST /@logout HTTP/1.1 + Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmdWxsbmFtZSI6IiIsInN1YiI6ImFkbWluIiwiZXhwIjoxNDY0MDQyMTAzfQ.aOyvMwdcIMV6pzC0GYQ3ZMdGaHR1_W7DxT0W0ok4FxI + Accept: application/json + +If invalidation succeeds, the server responds with an empty 204 reponse: + +.. literalinclude:: _json/logout.json + :language: js diff --git a/setup.py b/setup.py index b4aafb9dce..ef0503db5e 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ install_requires=[ 'setuptools', 'plone.rest >= 1.0a6', + 'PyJWT', ], extras_require={'test': [ 'Products.Archetypes', diff --git a/src/plone/restapi/__init__.py b/src/plone/restapi/__init__.py index 99e12b83df..ecd6745988 100644 --- a/src/plone/restapi/__init__.py +++ b/src/plone/restapi/__init__.py @@ -1,9 +1,14 @@ # -*- coding: utf-8 -*- from AccessControl import allow_module -allow_module('json') +from AccessControl.Permissions import add_user_folders +from Products.PluggableAuthService.PluggableAuthService import ( + registerMultiPlugin) +from plone.restapi.pas import plugin import pkg_resources +allow_module('json') + try: pkg_resources.get_distribution('plone.app.testing') except pkg_resources.DistributionNotFound: @@ -21,6 +26,15 @@ def initialize(context): + registerMultiPlugin(plugin.JWTAuthenticationPlugin.meta_type) + context.registerClass( + plugin.JWTAuthenticationPlugin, + permission=add_user_folders, + constructors=(plugin.manage_addJWTAuthenticationPlugin, + plugin.addJWTAuthenticationPlugin), + visibility=None, + ) + if REGISTER_TEST_TYPES: from Products.Archetypes.ArchetypeTool import process_types, listTypes from Products.CMFCore import permissions diff --git a/src/plone/restapi/configure.zcml b/src/plone/restapi/configure.zcml index 747f41d2ab..9b3eaea788 100644 --- a/src/plone/restapi/configure.zcml +++ b/src/plone/restapi/configure.zcml @@ -34,6 +34,14 @@ provides="Products.GenericSetup.interfaces.EXTENSION" /> + + + diff --git a/src/plone/restapi/pas/__init__.py b/src/plone/restapi/pas/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/plone/restapi/pas/add_plugin.zpt b/src/plone/restapi/pas/add_plugin.zpt new file mode 100644 index 0000000000..c960d9ba4c --- /dev/null +++ b/src/plone/restapi/pas/add_plugin.zpt @@ -0,0 +1,33 @@ + + +

Header

+ +

Form Title

+ +

+ Plone PAS plugin for authentication with JSON web tokens (JWT) +

+ +
+ + + + + + + + + + + + +
Id
Title
+
+ +
+
+
+ + \ No newline at end of file diff --git a/src/plone/restapi/pas/config.zpt b/src/plone/restapi/pas/config.zpt new file mode 100644 index 0000000000..46e5fd6196 --- /dev/null +++ b/src/plone/restapi/pas/config.zpt @@ -0,0 +1,47 @@ +

PAGE HEADER

+

PAGE HEADER

+ +

JWT Authentication

+ +

+ Plone PAS plugin for authentication with JSON web tokens (JWT). +

+ +
+ + + + + + + + + + + + + + +
Token Validity Timeout (in seconds)
+
After this, the token is invalid and the user + must login again. Set to 0 for the token to remain valid indefinitely.
+
+   +
If enabled, tokens are signed with a secret from + Plone's keyring. If you want tokens that remain valid indefinitely you should disable this.
+
+   +
By default tokens are not stored on the server and + thus can't be invalidated. If enabled, tokens that don't expire can be invalidated.
+
+
+ +
+
+
+ + +

PAGE FOOTER

\ No newline at end of file diff --git a/src/plone/restapi/pas/plugin.py b/src/plone/restapi/pas/plugin.py new file mode 100644 index 0000000000..b3b193ecb1 --- /dev/null +++ b/src/plone/restapi/pas/plugin.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +from AccessControl.SecurityInfo import ClassSecurityInfo +from AccessControl.requestmethod import postonly +from BTrees.OIBTree import OIBTree +from BTrees.OOBTree import OOBTree +from Products.CMFCore.permissions import ManagePortal +from Products.PageTemplates.PageTemplateFile import PageTemplateFile +from Products.PluggableAuthService.interfaces.plugins import ( + IAuthenticationPlugin) +from Products.PluggableAuthService.interfaces.plugins import IChallengePlugin +from Products.PluggableAuthService.interfaces.plugins import IExtractionPlugin +from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin +from datetime import datetime +from datetime import timedelta +from plone.keyring.interfaces import IKeyManager +from plone.keyring.keyring import GenerateSecret +from zope.component import getUtility +from zope.interface import implements + +import jwt +import time + + +manage_addJWTAuthenticationPlugin = PageTemplateFile( + "add_plugin", globals(), __name__="manage_addJWTAuthenticationPlugin") + + +def addJWTAuthenticationPlugin(self, id_, title=None, REQUEST=None): + """Add a JWT authentication plugin + """ + plugin = JWTAuthenticationPlugin(id_, title) + self._setObject(plugin.getId(), plugin) + + if REQUEST is not None: + REQUEST["RESPONSE"].redirect( + "%s/manage_workspace" + "?manage_tabs_message=JWT+authentication+plugin+added." % + self.absolute_url() + ) + + +class JWTAuthenticationPlugin(BasePlugin): + """Plone PAS plugin for authentication with JSON web tokens (JWT). + """ + implements( + IAuthenticationPlugin, + IChallengePlugin, + IExtractionPlugin, + ) + meta_type = "JWT Authentication Plugin" + security = ClassSecurityInfo() + + token_timeout = 60 * 60 * 12 # 12 hours + use_keyring = True + store_tokens = False + _secret = None + _tokens = None + + # ZMI tab for configuration page + manage_options = ( + ({'label': 'Configuration', + 'action': 'manage_config'},) + + BasePlugin.manage_options + ) + security.declareProtected(ManagePortal, 'manage_config') + manage_config = PageTemplateFile('config', globals(), + __name__='manage_config') + + def __init__(self, id_, title=None): + self._setId(id_) + self.title = title + + security.declarePrivate('challenge') + + # Initiate a challenge to the user to provide credentials. + def challenge(self, request, response, **kw): + + realm = response.realm + if realm: + response.setHeader('WWW-Authenticate', + 'Bearer realm="%s"' % realm) + m = "You are not authorized to access this resource." + + response.setBody(m, is_error=1) + response.setStatus(401) + return True + + security.declarePrivate('extractCredentials') + + # IExtractionPlugin implementation + # Extracts a JSON web token from the request. + def extractCredentials(self, request): + creds = {} + auth = request._auth + if auth is None: + return None + if auth[:7].lower() == 'bearer ': + creds['token'] = auth.split()[-1] + else: + return None + + return creds + + security.declarePrivate('authenticateCredentials') + + # IAuthenticationPlugin implementation + def authenticateCredentials(self, credentials): + # Ignore credentials that are not from our extractor + extractor = credentials.get('extractor') + if extractor != self.getId(): + return None + + payload = self._decode_token(credentials['token']) + if not payload: + return None + + if 'sub' not in payload: + return None + + userid = payload['sub'] + + if self.store_tokens: + if userid not in self._tokens: + return None + if credentials['token'] not in self._tokens[userid]: + return None + + return (userid, userid) + + security.declareProtected(ManagePortal, 'manage_updateConfig') + + @postonly + def manage_updateConfig(self, REQUEST): + """Update configuration of JWT Authentication Plugin. + """ + response = REQUEST.response + + self.token_timeout = int(REQUEST.form.get('token_timeout', + self.token_timeout)) + self.use_keyring = bool(REQUEST.form.get('use_keyring', True)) + self.store_tokens = bool(REQUEST.form.get('store_tokens', False)) + if self.store_tokens and self._tokens is None: + self._tokens = OOBTree() + + response.redirect('%s/manage_config?manage_tabs_message=%s' % + (self.absolute_url(), 'Configuration+updated.')) + + def _decode_token(self, token, verify=True): + payload = None + if self.use_keyring: + manager = getUtility(IKeyManager) + for secret in manager[u"_system"]: + if secret is None: + continue + try: + payload = jwt.decode(token, secret) + except jwt.DecodeError: + pass + else: + break + else: + try: + payload = jwt.decode(token, self._secret, verify=verify) + except jwt.DecodeError: + pass + return payload + + def _signing_secret(self): + if self.use_keyring: + manager = getUtility(IKeyManager) + return manager.secret() + if not self._secret: + self._secret = GenerateSecret() + return self._secret + + def delete_token(self, token): + payload = self._decode_token(token, verify=False) + if 'sub' not in payload: + return False + userid = payload['sub'] + if userid in self._tokens and token in self._tokens[userid]: + del self._tokens[userid][token] + return True + + def create_token(self, userid, timeout=None, data=None): + payload = {} + payload['sub'] = userid + if timeout is None: + timeout = self.token_timeout + if timeout: + payload['exp'] = datetime.utcnow() + timedelta(seconds=timeout) + if data is not None: + payload.update(data) + token = jwt.encode(payload, self._signing_secret(), algorithm='HS256') + if self.store_tokens: + if self._tokens is None: + self._tokens = OOBTree() + if userid not in self._tokens: + self._tokens[userid] = OIBTree() + self._tokens[userid][token] = int(time.time()) + return token diff --git a/src/plone/restapi/profiles/default/plone.restapi_various.txt b/src/plone/restapi/profiles/default/plone.restapi_various.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/plone/restapi/services/auth/__init__.py b/src/plone/restapi/services/auth/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/plone/restapi/services/auth/configure.zcml b/src/plone/restapi/services/auth/configure.zcml new file mode 100644 index 0000000000..ab0440f72a --- /dev/null +++ b/src/plone/restapi/services/auth/configure.zcml @@ -0,0 +1,30 @@ + + + + + + + + + diff --git a/src/plone/restapi/services/auth/login.py b/src/plone/restapi/services/auth/login.py new file mode 100644 index 0000000000..2cfea7d0e8 --- /dev/null +++ b/src/plone/restapi/services/auth/login.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +from Acquisition import aq_inner +from Acquisition import aq_parent +from Products.CMFCore.utils import getToolByName +from Products.PluggableAuthService.interfaces.plugins import ( + IAuthenticationPlugin) +from plone.restapi.deserializer import json_body +from plone.restapi.services import Service + + +class Login(Service): + """Handles login and returns a JSON web token (JWT). + """ + def reply(self): + plugin = None + acl_users = getToolByName(self, "acl_users") + plugins = acl_users._getOb('plugins') + authenticators = plugins.listPlugins(IAuthenticationPlugin) + for id_, authenticator in authenticators: + if authenticator.meta_type == "JWT Authentication Plugin": + plugin = authenticator + break + + if plugin is None: + self.request.response.setStatus(501) + return dict(error=dict( + type='Login failed', + message='JWT authentication plugin not installed.')) + + data = json_body(self.request) + if 'login' not in data or 'password' not in data: + self.request.response.setStatus(400) + return dict(error=dict( + type='Missing credentials', + message='Login and password must be provided in body.')) + + userid = data['login'].encode('utf8') + password = data['password'].encode('utf8') + uf = self._find_userfolder(userid) + if uf is not None: + user = uf.authenticate( + userid, password, self.request) + else: + user = None + + if not user: + self.request.response.setStatus(401) + return dict(error=dict( + type='Invalid credentials', + message='Wrong login and/or password.')) + + payload = {} + payload['fullname'] = user.getProperty('fullname') + return { + 'token': plugin.create_token(user.getId(), data=payload) + } + + def _find_userfolder(self, userid): + """Try to find a user folder that contains a user with the given + userid. + """ + uf_parent = aq_inner(self.context) + info = None + + while not info: + uf = getToolByName(uf_parent, 'acl_users') + if uf: + info = uf._verifyUser(uf.plugins, login=userid) + if uf_parent is self.context.getPhysicalRoot(): + break + uf_parent = aq_parent(uf_parent) + + if info: + return uf + return None diff --git a/src/plone/restapi/services/auth/logout.py b/src/plone/restapi/services/auth/logout.py new file mode 100644 index 0000000000..fb868723a8 --- /dev/null +++ b/src/plone/restapi/services/auth/logout.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +from Products.CMFCore.utils import getToolByName +from Products.PluggableAuthService.interfaces.plugins import ( + IAuthenticationPlugin) +from plone.restapi.services import Service + + +class Logout(Service): + """Handles logout by invalidating the JWT + """ + def reply(self): + plugin = None + acl_users = getToolByName(self, "acl_users") + plugins = acl_users._getOb('plugins') + authenticators = plugins.listPlugins(IAuthenticationPlugin) + for id_, authenticator in authenticators: + if authenticator.meta_type == "JWT Authentication Plugin": + plugin = authenticator + break + + if plugin is None: + self.request.response.setStatus(501) + return dict(error=dict( + type='Logout failed', + message='JWT authentication plugin not installed.')) + + if not plugin.store_tokens: + self.request.response.setStatus(501) + return dict(error=dict( + type='Logout failed', + message="Token can't be invalidated")) + + creds = plugin.extractCredentials(self.request) + if creds and 'token' in creds and plugin.delete_token(creds['token']): + self.request.response.setStatus(200) + return super(Logout, self).reply() + + self.request.response.setStatus(400) + return dict(error=dict(type='Logout failed', message="Unknown token")) diff --git a/src/plone/restapi/services/auth/renew.py b/src/plone/restapi/services/auth/renew.py new file mode 100644 index 0000000000..91698f8599 --- /dev/null +++ b/src/plone/restapi/services/auth/renew.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +from Products.CMFCore.utils import getToolByName +from Products.PluggableAuthService.interfaces.plugins import ( + IAuthenticationPlugin) +from plone.restapi.services import Service + + +class Renew(Service): + """Renew authentication token + """ + def reply(self): + plugin = None + acl_users = getToolByName(self, "acl_users") + plugins = acl_users._getOb('plugins') + authenticators = plugins.listPlugins(IAuthenticationPlugin) + for id_, authenticator in authenticators: + if authenticator.meta_type == "JWT Authentication Plugin": + plugin = authenticator + break + + if plugin is None: + self.request.response.setStatus(501) + return dict(error=dict( + type='Renew failed', + message='JWT authentication plugin not installed.')) + + mtool = getToolByName(self.context, 'portal_membership') + user = mtool.getAuthenticatedMember() + payload = {} + payload['fullname'] = user.getProperty('fullname') + new_token = plugin.create_token(user.getId(), data=payload) + if plugin.store_tokens and self.request._auth: + old_token = self.request._auth[7:] + plugin.delete_token(old_token) + return { + 'token': new_token + } diff --git a/src/plone/restapi/services/configure.zcml b/src/plone/restapi/services/configure.zcml index ae6ae0eb9e..e73b3e726b 100644 --- a/src/plone/restapi/services/configure.zcml +++ b/src/plone/restapi/services/configure.zcml @@ -1,6 +1,7 @@ + diff --git a/src/plone/restapi/setuphandlers.py b/src/plone/restapi/setuphandlers.py new file mode 100644 index 0000000000..a8eaf14065 --- /dev/null +++ b/src/plone/restapi/setuphandlers.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +from Products.CMFCore.utils import getToolByName +from plone.restapi.pas.plugin import JWTAuthenticationPlugin + + +def install_pas_plugin(context): + uf = getToolByName(context, 'acl_users') + plugin = JWTAuthenticationPlugin('jwt_auth') + uf._setObject(plugin.getId(), plugin) + plugin = uf['jwt_auth'] + plugin.manage_activateInterfaces([ + 'IAuthenticationPlugin', + 'IExtractionPlugin', + ]) + + +def import_various(context): + """Miscellanous steps import handle + """ + if context.readDataFile('plone.restapi_various.txt') is None: + return + + site = context.getSite() + install_pas_plugin(site) diff --git a/src/plone/restapi/tests/test_auth.py b/src/plone/restapi/tests/test_auth.py new file mode 100644 index 0000000000..2c6405dbf4 --- /dev/null +++ b/src/plone/restapi/tests/test_auth.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +from ZPublisher.pubevents import PubStart +from plone.app.testing import SITE_OWNER_NAME +from plone.app.testing import SITE_OWNER_PASSWORD +from plone.restapi.testing import PLONE_RESTAPI_DX_INTEGRATION_TESTING +from unittest2 import TestCase +from zope.event import notify + + +class TestLogin(TestCase): + + layer = PLONE_RESTAPI_DX_INTEGRATION_TESTING + + def setUp(self): + self.portal = self.layer['portal'] + self.request = self.layer['request'] + + def traverse(self, path='/plone/@login', accept='application/json', + method='POST'): + request = self.layer['request'] + request.environ['PATH_INFO'] = path + request.environ['PATH_TRANSLATED'] = path + request.environ['HTTP_ACCEPT'] = accept + request.environ['REQUEST_METHOD'] = method + notify(PubStart(request)) + return request.traverse(path) + + def test_login_without_pas_plugin_fails(self): + self.portal.acl_users._delOb('jwt_auth') + service = self.traverse() + res = service.reply() + self.assertIn('error', res) + self.assertNotIn('token', res) + + def test_login_without_credentials_fails(self): + service = self.traverse() + res = service.reply() + self.assertIn('error', res) + self.assertNotIn('token', res) + + def test_login_with_invalid_credentials_fails(self): + self.request['BODY'] = '{"login": "admin", "password": "admin"}' + service = self.traverse() + res = service.reply() + self.assertIn('error', res) + self.assertNotIn('token', res) + + def test_successful_login_returns_token(self): + self.request['BODY'] = '{"login": "%s", "password": "%s"}' % ( + SITE_OWNER_NAME, SITE_OWNER_PASSWORD) + service = self.traverse() + res = service.reply() + self.assertIn('token', res) + + +class TestLogout(TestCase): + + layer = PLONE_RESTAPI_DX_INTEGRATION_TESTING + + def setUp(self): + self.portal = self.layer['portal'] + self.request = self.layer['request'] + + def traverse(self, path='/plone/@logout', accept='application/json', + method='POST'): + request = self.layer['request'] + request.environ['PATH_INFO'] = path + request.environ['PATH_TRANSLATED'] = path + request.environ['HTTP_ACCEPT'] = accept + request.environ['REQUEST_METHOD'] = method + notify(PubStart(request)) + return request.traverse(path) + + def test_logout_without_pas_plugin_fails(self): + self.portal.acl_users._delOb('jwt_auth') + service = self.traverse() + res = service.reply() + self.assertIn('error', res) + + def test_logout_with_not_stored_token_fails(self): + self.portal.acl_users.jwt_auth.store_tokens = False + service = self.traverse() + res = service.reply() + self.assertEqual(501, self.request.response.getStatus()) + self.assertEqual("Token can't be invalidated", res['error']['message']) + + def test_logout_with_without_credentials_fails(self): + self.portal.acl_users.jwt_auth.store_tokens = True + service = self.traverse() + res = service.reply() + self.assertEqual(400, self.request.response.getStatus()) + self.assertEqual("Unknown token", res['error']['message']) + + def test_logout_succeeds(self): + self.portal.acl_users.jwt_auth.store_tokens = True + token = self.portal.acl_users.jwt_auth.create_token('admin') + self.request._auth = 'Bearer {}'.format(token) + service = self.traverse() + service.reply() + self.assertEqual(200, self.request.response.getStatus()) + + +class TestRenew(TestCase): + + layer = PLONE_RESTAPI_DX_INTEGRATION_TESTING + + def setUp(self): + self.portal = self.layer['portal'] + self.request = self.layer['request'] + + def traverse(self, path='/plone/@login-renew', accept='application/json', + method='POST'): + request = self.layer['request'] + request.environ['PATH_INFO'] = path + request.environ['PATH_TRANSLATED'] = path + request.environ['HTTP_ACCEPT'] = accept + request.environ['REQUEST_METHOD'] = method + notify(PubStart(request)) + return request.traverse(path) + + def test_renew_without_pas_plugin_fails(self): + self.portal.acl_users._delOb('jwt_auth') + service = self.traverse() + res = service.reply() + self.assertIn('error', res) + + def test_renew_returns_token(self): + service = self.traverse() + res = service.reply() + self.assertIn('token', res) + + def test_renew_deletes_old_token(self): + self.portal.acl_users.jwt_auth.store_tokens = True + token = self.portal.acl_users.jwt_auth.create_token('admin') + self.request._auth = 'Bearer {}'.format(token) + service = self.traverse() + res = service.reply() + self.assertIn('token', res) + self.assertEqual( + 1, len(self.portal.acl_users.jwt_auth._tokens['admin'])) diff --git a/src/plone/restapi/tests/test_documentation.py b/src/plone/restapi/tests/test_documentation.py index 2b4f2e41ed..a67a4216d6 100644 --- a/src/plone/restapi/tests/test_documentation.py +++ b/src/plone/restapi/tests/test_documentation.py @@ -17,9 +17,9 @@ from zope.component import getUtility from zope.intid.interfaces import IIntIds -import unittest2 as unittest - +import json import os +import unittest2 as unittest REQUEST_HEADER_KEYS = [ 'accept' @@ -300,3 +300,48 @@ def test_documentation_types(self): def test_documentation_types_document(self): response = self.api_session.get('@types/Document') save_response_for_documentation('types_document.json', response) + + def test_documentation_login(self): + self.portal.acl_users.jwt_auth._secret = 'secret' + self.portal.acl_users.jwt_auth.use_keyring = False + self.portal.acl_users.jwt_auth.token_timeout = 0 + import transaction + transaction.commit() + self.api_session.auth = None + response = self.api_session.post( + '{}/@login'.format(self.portal.absolute_url()), + json={'login': SITE_OWNER_NAME, 'password': SITE_OWNER_PASSWORD}) + save_response_for_documentation('login.json', response) + + def test_documentation_login_renew(self): + self.portal.acl_users.jwt_auth._secret = 'secret' + self.portal.acl_users.jwt_auth.use_keyring = False + self.portal.acl_users.jwt_auth.token_timeout = 0 + import transaction + transaction.commit() + self.api_session.auth = None + response = self.api_session.post( + '{}/@login'.format(self.portal.absolute_url()), + json={'login': SITE_OWNER_NAME, 'password': SITE_OWNER_PASSWORD}) + token = json.loads(response.content)['token'] + response = self.api_session.post( + '{}/@login-renew'.format(self.portal.absolute_url()), + headers={'Authorization': 'Bearer {}'.format(token)}) + save_response_for_documentation('login_renew.json', response) + + def test_documentation_logout(self): + self.portal.acl_users.jwt_auth._secret = 'secret' + self.portal.acl_users.jwt_auth.use_keyring = False + self.portal.acl_users.jwt_auth.token_timeout = 0 + self.portal.acl_users.jwt_auth.store_tokens = True + import transaction + transaction.commit() + self.api_session.auth = None + response = self.api_session.post( + '{}/@login'.format(self.portal.absolute_url()), + json={'login': SITE_OWNER_NAME, 'password': SITE_OWNER_PASSWORD}) + token = json.loads(response.content)['token'] + response = self.api_session.post( + '{}/@logout'.format(self.portal.absolute_url()), + headers={'Authorization': 'Bearer {}'.format(token)}) + save_response_for_documentation('logout.json', response) diff --git a/src/plone/restapi/tests/test_pas.py b/src/plone/restapi/tests/test_pas.py new file mode 100644 index 0000000000..87ba659b48 --- /dev/null +++ b/src/plone/restapi/tests/test_pas.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +from Products.CMFCore.utils import getToolByName +from plone.keyring.interfaces import IKeyManager +from plone.restapi.testing import PLONE_RESTAPI_DX_INTEGRATION_TESTING +from zope.component import getUtility + +import unittest + + +class TestJWTAuthenticationPlugin(unittest.TestCase): + + layer = PLONE_RESTAPI_DX_INTEGRATION_TESTING + + def setUp(self): + + self.portal = self.layer['portal'] + uf = getToolByName(self.portal, 'acl_users') + self.plugin = uf['jwt_auth'] + + def test_challenge(self): + request = self.layer['request'] + response = request.response + self.plugin.challenge(request, request.response) + self.assertEqual(401, response.getStatus()) + self.assertEqual( + 'Bearer realm="Zope"', response.getHeader('WWW-Authenticate')) + + def test_extract_credentials_without_authorization_header(self): + request = self.layer['request'] + request._auth = '' + self.assertEqual(None, self.plugin.extractCredentials(request)) + + def test_extract_credentials_with_other_authorization_header(self): + request = self.layer['request'] + request._auth = 'Basic YWRtaW46YWRtaW4=' + self.assertEqual(None, self.plugin.extractCredentials(request)) + + def test_extract_credentials_with_bearer_authorization_header(self): + request = self.layer['request'] + request._auth = ( + 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiJ9.' + 'PGnRccPTXeaxA8nzfytWewWRkizJa_ihI_3H6ec-Zbw') + self.assertEqual( + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiJ9.PGnRccP' + 'TXeaxA8nzfytWewWRkizJa_ihI_3H6ec-Zbw', + self.plugin.extractCredentials(request)['token']) + + def test_authenticate_credentials_from_unknown_extractor(self): + creds = {} + creds['extractor'] = 'credentials_basic_auth' + self.assertEqual(None, self.plugin.authenticateCredentials(creds)) + + def test_authenticate_credentials_with_invalid_token(self): + creds = {} + creds['extractor'] = 'jwt_auth' + creds['token'] = 'invalid' + self.assertEqual(None, self.plugin.authenticateCredentials(creds)) + + def test_authenticate_credentials_without_subject(self): + creds = {} + creds['extractor'] = 'jwt_auth' + creds['token'] = ( + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe' + '3iZaWbvV5XKSTbuAn0M') + self.assertEqual(None, self.plugin.authenticateCredentials(creds)) + + def test_authenticate_credentials_with_valid_token(self): + creds = {} + creds['extractor'] = 'jwt_auth' + creds['token'] = self.plugin.create_token('admin') + self.assertEqual( + ('admin', 'admin'), + self.plugin.authenticateCredentials(creds)) + + def test_decode_token_after_key_rotation(self): + token = self.plugin.create_token('admin', timeout=0) + key_manager = getUtility(IKeyManager) + key_manager.rotate() + self.assertEqual({'sub': 'admin'}, self.plugin._decode_token(token)) + + def test_decode_with_static_secret(self): + self.plugin.use_keyring = False + token = self.plugin.create_token('admin', timeout=0) + self.assertEqual({'sub': 'admin'}, self.plugin._decode_token(token)) + + def test_authenticate_credentials_with_stored_token(self): + self.plugin.store_tokens = True + creds = {} + creds['extractor'] = 'jwt_auth' + creds['token'] = self.plugin.create_token('admin') + self.assertEqual( + ('admin', 'admin'), + self.plugin.authenticateCredentials(creds)) + + def test_authenticate_credentials_with_deleted_token_fails(self): + self.plugin.store_tokens = True + creds = {} + creds['extractor'] = 'jwt_auth' + creds['token'] = self.plugin.create_token('admin') + self.plugin.delete_token(creds['token']) + self.assertEqual( + None, + self.plugin.authenticateCredentials(creds))