Skip to content

Commit

Permalink
[oauth2] Added test for resource owner password grant flow
Browse files Browse the repository at this point in the history
  • Loading branch information
elpaso authored and dakcarto committed Oct 27, 2017
1 parent 244e886 commit d259279
Show file tree
Hide file tree
Showing 5 changed files with 544 additions and 23 deletions.
1 change: 1 addition & 0 deletions .ci/travis/linux/before_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@


#pip3 install termcolor
pip install psycopg2 numpy nose2 pyyaml mock future termcolor oauthlib
3 changes: 2 additions & 1 deletion .ci/travis/macos/before_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pip3 install \
pyyaml \
mock \
future \
termcolor
termcolor \
oauthlib

brew install \
qscintilla2 \
Expand Down
3 changes: 3 additions & 0 deletions tests/src/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ IF (WITH_SERVER)
ADD_PYTHON_TEST(PyQgsAuthManagerPasswordOWSTest test_authmanager_password_ows.py)
ADD_PYTHON_TEST(PyQgsAuthManagerPKIOWSTest test_authmanager_pki_ows.py)
ADD_PYTHON_TEST(PyQgsAuthManagerPKIPostgresTest test_authmanager_pki_postgres.py)
IF(WITH_OAUTH2_PLUGIN)
ADD_PYTHON_TEST(PyQgsAuthManagerOAuth2OWSTest test_authmanager_oauth2_ows.py)
ENDIF()
ADD_PYTHON_TEST(PyQgsServerServices test_qgsserver_services.py)
ADD_PYTHON_TEST(PyQgsServerModules test_qgsserver_modules.py)
ADD_PYTHON_TEST(PyQgsServerRequest test_qgsserver_request.py)
Expand Down
287 changes: 265 additions & 22 deletions tests/src/python/qgis_wrapped_server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,39 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
QGIS Server HTTP wrapper
QGIS Server HTTP wrapper for testing purposes
================================================================================
This script launches a QGIS Server listening on port 8081 or on the port
specified on the environment variable QGIS_SERVER_PORT.
QGIS_SERVER_HOST (defaults to 127.0.0.1)
Hostname is set by environment variable QGIS_SERVER_HOST (defaults to 127.0.0.1)
The server can be configured to support any of the following auth systems
(mutually exclusive):
* PKI
* HTTP Basic
* OAuth2 (requires python package oauthlib, installable with:
with "pip install oauthlib")
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
SECURITY WARNING:
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
This script was developed for testing purposes and was not meant to be secure,
please do not use in a production server any of the authentication systems
implemented here.
HTTPS
--------------------------------------------------------------------------------
HTTPS is automatically enabled for PKI and OAuth2
HTTP Basic
--------------------------------------------------------------------------------
For testing purposes, HTTP Basic can be enabled by setting the following
environment variables:
Expand All @@ -13,21 +42,61 @@
* QGIS_SERVER_USERNAME (default ="username")
* QGIS_SERVER_PASSWORD (default ="password")
PKI
--------------------------------------------------------------------------------
PKI authentication with HTTPS can be enabled with:
* QGIS_SERVER_PKI_CERTIFICATE (server certificate)
* QGIS_SERVER_PKI_KEY (server private key)
* QGIS_SERVER_PKI_AUTHORITY (root CA)
* QGIS_SERVER_PKI_USERNAME (valid username)
Sample run:
OAuth2 Resource Owner Grant Flow
--------------------------------------------------------------------------------
OAuth2 Resource Owner Grant Flow with HTTPS can be enabled with:
* QGIS_SERVER_OAUTH2_AUTHORITY (no default)
* QGIS_SERVER_OAUTH2_KEY (server private key)
* QGIS_SERVER_OAUTH2_CERTIFICATE (server certificate)
* QGIS_SERVER_OAUTH2_USERNAME (default ="username")
* QGIS_SERVER_OAUTH2_PASSWORD (default ="password")
* QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN (default = 3600)
Available endpoints:
- /token (returns a new access_token),
optionally specify an expiration time in seconds with ?ttl=<int>
- /refresh (returns a new access_token from a refresh token),
optionally specify an expiration time in seconds with ?ttl=<int>
- /result (check the Bearer token and returns a short sentence if it validates)
Sample runs
--------------------------------------------------------------------------------
PKI:
QGIS_SERVER_PKI_USERNAME=Gerardus QGIS_SERVER_PORT=47547 QGIS_SERVER_HOST=localhost \
QGIS_SERVER_PKI_KEY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
QGIS_SERVER_PKI_CERTIFICATE=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
QGIS_SERVER_PKI_AUTHORITY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/chains_subissuer-issuer-root_issuer2-root2.pem \
python3 /home/$USER/dev/QGIS/tests/src/python/qgis_wrapped_server.py
OAuth2:
QGIS_SERVER_PORT=8443 \
QGIS_SERVER_HOST=localhost \
QGIS_SERVER_OAUTH2_AUTHORITY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/chain_subissuer-issuer-root.pem \
QGIS_SERVER_OAUTH2_CERTIFICATE=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
QGIS_SERVER_OAUTH2_KEY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
python3 /home/$USER/dev/QGIS/tests/src/python/qgis_wrapped_server.py
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
Expand All @@ -49,42 +118,70 @@
import sys
import signal
import ssl
import copy
import urllib.parse
from http.server import BaseHTTPRequestHandler, HTTPServer
from qgis.core import QgsApplication
from qgis.server import QgsServer, QgsServerRequest, QgsBufferServerRequest, QgsBufferServerResponse

QGIS_SERVER_PORT = int(os.environ.get('QGIS_SERVER_PORT', '8081'))
QGIS_SERVER_HOST = os.environ.get('QGIS_SERVER_HOST', '127.0.0.1')

# HTTP Basic
QGIS_SERVER_HTTP_BASIC_AUTH = os.environ.get('QGIS_SERVER_HTTP_BASIC_AUTH', False)
QGIS_SERVER_USERNAME = os.environ.get('QGIS_SERVER_USERNAME', 'username')
QGIS_SERVER_PASSWORD = os.environ.get('QGIS_SERVER_PASSWORD', 'password')

# PKI authentication
QGIS_SERVER_PKI_CERTIFICATE = os.environ.get('QGIS_SERVER_PKI_CERTIFICATE')
QGIS_SERVER_PKI_KEY = os.environ.get('QGIS_SERVER_PKI_KEY')
QGIS_SERVER_PKI_AUTHORITY = os.environ.get('QGIS_SERVER_PKI_AUTHORITY')
QGIS_SERVER_PKI_USERNAME = os.environ.get('QGIS_SERVER_PKI_USERNAME')

# Check if PKI - https is enabled
https = (QGIS_SERVER_PKI_CERTIFICATE is not None and
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
QGIS_SERVER_PKI_KEY is not None and
os.path.isfile(QGIS_SERVER_PKI_KEY) and
QGIS_SERVER_PKI_AUTHORITY is not None and
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
QGIS_SERVER_PKI_USERNAME)
# OAuth2 authentication
QGIS_SERVER_OAUTH2_CERTIFICATE = os.environ.get('QGIS_SERVER_OAUTH2_CERTIFICATE')
QGIS_SERVER_OAUTH2_KEY = os.environ.get('QGIS_SERVER_OAUTH2_KEY')
QGIS_SERVER_OAUTH2_AUTHORITY = os.environ.get('QGIS_SERVER_OAUTH2_AUTHORITY')
QGIS_SERVER_OAUTH2_USERNAME = os.environ.get('QGIS_SERVER_OAUTH2_USERNAME', 'username')
QGIS_SERVER_OAUTH2_PASSWORD = os.environ.get('QGIS_SERVER_OAUTH2_PASSWORD', 'password')
QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN = os.environ.get('QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN', 3600)

# Check if PKI is enabled
QGIS_SERVER_PKI_AUTH = (
QGIS_SERVER_PKI_CERTIFICATE is not None and
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
QGIS_SERVER_PKI_KEY is not None and
os.path.isfile(QGIS_SERVER_PKI_KEY) and
QGIS_SERVER_PKI_AUTHORITY is not None and
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
QGIS_SERVER_PKI_USERNAME)

# Check if OAuth2 is enabled
QGIS_SERVER_OAUTH2_AUTH = (
QGIS_SERVER_OAUTH2_CERTIFICATE is not None and
os.path.isfile(QGIS_SERVER_OAUTH2_CERTIFICATE) and
QGIS_SERVER_OAUTH2_KEY is not None and
os.path.isfile(QGIS_SERVER_OAUTH2_KEY) and
QGIS_SERVER_OAUTH2_AUTHORITY is not None and
os.path.isfile(QGIS_SERVER_OAUTH2_AUTHORITY) and
QGIS_SERVER_OAUTH2_USERNAME and QGIS_SERVER_OAUTH2_PASSWORD)

HTTPS_ENABLED = QGIS_SERVER_PKI_AUTH or QGIS_SERVER_OAUTH2_AUTH


qgs_app = QgsApplication([], False)
qgs_server = QgsServer()


if os.environ.get('QGIS_SERVER_HTTP_BASIC_AUTH') is not None:
if QGIS_SERVER_HTTP_BASIC_AUTH:
from qgis.server import QgsServerFilter
import base64

class HTTPBasicFilter(QgsServerFilter):

def responseComplete(self):
handler = self.serverInterface().requestHandler()
auth = self.serverInterface().requestHandler().requestHeader('HTTP_AUTHORIZATION')
auth = handler.requestHeader('HTTP_AUTHORIZATION')
if auth:
username, password = base64.b64decode(auth[6:]).split(b':')
if (username.decode('utf-8') == os.environ.get('QGIS_SERVER_USERNAME', 'username') and
Expand All @@ -100,6 +197,139 @@ def responseComplete(self):
qgs_server.serverInterface().registerFilter(filter)


if QGIS_SERVER_OAUTH2_AUTH:
from qgis.server import QgsServerFilter
from oauthlib.oauth2 import RequestValidator, LegacyApplicationServer
import base64
from datetime import datetime

# Naive token storage implementation
_tokens = {}

class SimpleValidator(RequestValidator):
"""Validate username and password
Note: does not support scopes or client_id"""

def validate_client_id(self, client_id, request):
return True

def authenticate_client(self, request, *args, **kwargs):
"""Wide open"""
request.client = type("Client", (), {'client_id': 'my_id'})
return True

def validate_user(self, username, password, client, request, *args, **kwargs):
if username == QGIS_SERVER_OAUTH2_USERNAME and password == QGIS_SERVER_OAUTH2_PASSWORD:
return True
return False

def validate_grant_type(self, client_id, grant_type, client, request, *args, **kwargs):
# Clients should only be allowed to use one type of grant.
return grant_type in ('password', 'refresh_token')

def get_default_scopes(self, client_id, request, *args, **kwargs):
# Scopes a client will authorize for if none are supplied in the
# authorization request.
return ('my_scope', )

def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
"""Wide open"""
return True

def save_bearer_token(self, token, request, *args, **kwargs):
# Remember to associate it with request.scopes, request.user and
# request.client. The two former will be set when you validate
# the authorization code. Don't forget to save both the
# access_token and the refresh_token and set expiration for the
# access_token to now + expires_in seconds.
_tokens[token['access_token']] = copy.copy(token)
_tokens[token['access_token']]['expiration'] = datetime.now().timestamp() + int(token['expires_in'])

def validate_bearer_token(self, token, scopes, request):
"""Check the token"""
return token in _tokens and _tokens[token]['expiration'] > datetime.now().timestamp()

def validate_refresh_token(self, refresh_token, client, request, *args, **kwargs):
"""Ensure the Bearer token is valid and authorized access to scopes."""
for t in _tokens.values():
if t['refresh_token'] == refresh_token:
return True
return False

def get_original_scopes(self, refresh_token, request, *args, **kwargs):
"""Get the list of scopes associated with the refresh token."""
return []

validator = SimpleValidator()
oauth_server = LegacyApplicationServer(validator, token_expires_in=QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN)

class OAuth2Filter(QgsServerFilter):
"""This filter provides testing endpoint for OAuth2 Resource Owner Grant Flow
Available endpoints:
- /token (returns a new access_token),
optionally specify an expiration time in seconds with ?ttl=<int>
- /refresh (returns a new access_token from a refresh token),
optionally specify an expiration time in seconds with ?ttl=<int>
- /result (check the Bearer token and returns a short sentence if it validates)
"""

def responseComplete(self):

handler = self.serverInterface().requestHandler()

def _token(ttl):
"""Common code for new and refresh token"""
handler.clear()
body = bytes(handler.data()).decode('utf8')
old_expires_in = oauth_server.default_token_type.expires_in
# Hacky way to dynamically set token expiration time
oauth_server.default_token_type.expires_in = ttl
headers, payload, code = oauth_server.create_token_response('/token', 'post', body, {})
oauth_server.default_token_type.expires_in = old_expires_in
for k, v in headers.items():
handler.setResponseHeader(k, v)
handler.setStatusCode(code)
handler.appendBody(payload.encode('utf-8'))

# Token expiration
ttl = handler.parameterMap().get('TTL', QGIS_SERVER_OAUTH2_TOKEN_EXPIRES_IN)
# Issue a new token
if handler.url().find('/token') != -1:
_token(ttl)
return

# Refresh token
if handler.url().find('/refresh') != -1:
_token(ttl)
return

# Check for valid token
auth = handler.requestHeader('HTTP_AUTHORIZATION')
if auth:
result, response = oauth_server.verify_request(handler.url(), 'post', '', {'Authorization': auth})
if result:
# This is a test endpoint for OAuth2, it requires a valid token
if handler.url().find('/result') != -1:
handler.clear()
handler.appendBody(b'Valid Token: enjoy OAuth2')
# Standard flow
return
else:
# Wrong token, default response 401
pass

# No auth ...
handler.clear()
handler.setStatusCode(401)
handler.setResponseHeader('Status', '401 Unauthorized')
handler.setResponseHeader('WWW-Authenticate', 'Bearer realm="QGIS Server"')
handler.appendBody(b'Invalid Token: Authorization required.')

filter = OAuth2Filter(qgs_server.serverInterface())
qgs_server.serverInterface().registerFilter(filter)


class Handler(BaseHTTPRequestHandler):

def do_GET(self, post_body=None):
Expand Down Expand Up @@ -130,16 +360,29 @@ def do_POST(self):

if __name__ == '__main__':
server = HTTPServer((QGIS_SERVER_HOST, QGIS_SERVER_PORT), Handler)
if https:
server.socket = ssl.wrap_socket(server.socket,
certfile=QGIS_SERVER_PKI_CERTIFICATE,
keyfile=QGIS_SERVER_PKI_KEY,
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
cert_reqs=ssl.CERT_REQUIRED,
server_side=True,
ssl_version=ssl.PROTOCOL_TLSv1)
# HTTPS is enabled if any of PKI or OAuth2 are enabled too
if HTTPS_ENABLED:
if QGIS_SERVER_OAUTH2_AUTH:
server.socket = ssl.wrap_socket(
server.socket,
certfile=QGIS_SERVER_OAUTH2_CERTIFICATE,
ca_certs=QGIS_SERVER_OAUTH2_AUTHORITY,
keyfile=QGIS_SERVER_OAUTH2_KEY,
server_side=True,
#cert_reqs=ssl.CERT_REQUIRED, # No certs for OAuth2
ssl_version=ssl.PROTOCOL_TLSv1)
else:
server.socket = ssl.wrap_socket(
server.socket,
certfile=QGIS_SERVER_PKI_CERTIFICATE,
keyfile=QGIS_SERVER_PKI_KEY,
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
cert_reqs=ssl.CERT_REQUIRED,
server_side=True,
ssl_version=ssl.PROTOCOL_TLSv1)

print('Starting server on %s://%s:%s, use <Ctrl-C> to stop' %
('https' if https else 'http', QGIS_SERVER_HOST, server.server_port), flush=True)
('https' if HTTPS_ENABLED else 'http', QGIS_SERVER_HOST, server.server_port), flush=True)

def signal_handler(signal, frame):
global qgs_app
Expand Down
Loading

0 comments on commit d259279

Please sign in to comment.