-
-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #3724 from elpaso/auth_tests_2_14
[tests] Authmanager tests for username/pwd and PKI
- Loading branch information
Showing
11 changed files
with
973 additions
and
111 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
QGIS Server HTTP wrapper | ||
This script launches a QGIS Server listening on port 8081 or on the port | ||
specified on the environment variable QGIS_SERVER_PORT. | ||
QGIS_SERVER_HOST (defaults to 127.0.0.1) | ||
For testing purposes, HTTP Basic can be enabled by setting the following | ||
environment variables: | ||
* QGIS_SERVER_HTTP_BASIC_AUTH (default not set, set to anything to enable) | ||
* QGIS_SERVER_USERNAME (default ="username") | ||
* QGIS_SERVER_PASSWORD (default ="password") | ||
PKI authentication with HTTPS can be enabled with: | ||
* QGIS_SERVER_PKI_CERTIFICATE (server certificate) | ||
* QGIS_SERVER_PKI_KEY (server private key) | ||
* QGIS_SERVER_PKI_AUTHORITY (root CA) | ||
* QGIS_SERVER_PKI_USERNAME (valid username) | ||
Sample run: | ||
QGIS_SERVER_PKI_USERNAME=Gerardus QGIS_SERVER_PORT=47547 QGIS_SERVER_HOST=localhost \ | ||
QGIS_SERVER_PKI_KEY=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \ | ||
QGIS_SERVER_PKI_CERTIFICATE=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \ | ||
QGIS_SERVER_PKI_AUTHORITY=/home/dev/QGIS/tests/testdata/auth_system/certs_keys/chains_subissuer-issuer-root_issuer2-root2.pem \ | ||
python /home/dev/QGIS/tests/src/python/qgis_wrapped_server.py | ||
.. note:: This program is free software; you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation; either version 2 of the License, or | ||
(at your option) any later version. | ||
""" | ||
from __future__ import print_function | ||
from future import standard_library | ||
standard_library.install_aliases() | ||
|
||
__author__ = 'Alessandro Pasotti' | ||
__date__ = '05/15/2016' | ||
__copyright__ = 'Copyright 2016, The QGIS Project' | ||
# This will get replaced with a git SHA1 when you do a git archive | ||
__revision__ = '$Format:%H$' | ||
|
||
|
||
import os | ||
import sys | ||
import ssl | ||
import urllib.parse | ||
from http.server import BaseHTTPRequestHandler, HTTPServer | ||
from qgis.server import QgsServer, QgsServerFilter | ||
|
||
QGIS_SERVER_PORT = int(os.environ.get('QGIS_SERVER_PORT', '8081')) | ||
QGIS_SERVER_HOST = os.environ.get('QGIS_SERVER_HOST', '127.0.0.1') | ||
# PKI authentication | ||
QGIS_SERVER_PKI_CERTIFICATE = os.environ.get('QGIS_SERVER_PKI_CERTIFICATE') | ||
QGIS_SERVER_PKI_KEY = os.environ.get('QGIS_SERVER_PKI_KEY') | ||
QGIS_SERVER_PKI_AUTHORITY = os.environ.get('QGIS_SERVER_PKI_AUTHORITY') | ||
QGIS_SERVER_PKI_USERNAME = os.environ.get('QGIS_SERVER_PKI_USERNAME') | ||
|
||
# Check if PKI - https is enabled | ||
https = (QGIS_SERVER_PKI_CERTIFICATE is not None and | ||
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and | ||
QGIS_SERVER_PKI_KEY is not None and | ||
os.path.isfile(QGIS_SERVER_PKI_KEY) and | ||
QGIS_SERVER_PKI_AUTHORITY is not None and | ||
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and | ||
QGIS_SERVER_PKI_USERNAME) | ||
|
||
qgs_server = QgsServer() | ||
|
||
if os.environ.get('QGIS_SERVER_HTTP_BASIC_AUTH') is not None: | ||
import base64 | ||
|
||
class HTTPBasicFilter(QgsServerFilter): | ||
|
||
def responseComplete(self): | ||
request = self.serverInterface().requestHandler() | ||
if self.serverInterface().getEnv('HTTP_AUTHORIZATION'): | ||
username, password = base64.b64decode(self.serverInterface().getEnv('HTTP_AUTHORIZATION')[6:]).split(':') | ||
if (username == os.environ.get('QGIS_SERVER_USERNAME', 'username') | ||
and password == os.environ.get('QGIS_SERVER_PASSWORD', 'password')): | ||
return | ||
# No auth ... | ||
request.clearHeaders() | ||
request.setHeader('Status', '401 Authorization required') | ||
request.setHeader('WWW-Authenticate', 'Basic realm="QGIS Server"') | ||
request.clearBody() | ||
request.appendBody('<h1>Authorization required</h1>') | ||
|
||
filter = HTTPBasicFilter(qgs_server.serverInterface()) | ||
qgs_server.serverInterface().registerFilter(filter) | ||
|
||
|
||
class Handler(BaseHTTPRequestHandler): | ||
|
||
def do_GET(self): | ||
# For PKI: check the username from client certificate | ||
if https: | ||
try: | ||
ssl.match_hostname(self.connection.getpeercert(), QGIS_SERVER_PKI_USERNAME) | ||
except Exception as ex: | ||
print("SSL Exception %s" % ex) | ||
self.send_response(401) | ||
self.end_headers() | ||
self.wfile.write('UNAUTHORIZED') | ||
return | ||
# CGI vars: | ||
for k, v in self.headers.items(): | ||
# Uncomment to print debug info about env vars passed into QGIS Server env | ||
#print('Setting ENV var %s to %s' % ('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v)) | ||
qgs_server.putenv('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v) | ||
qgs_server.putenv('SERVER_PORT', str(self.server.server_port)) | ||
qgs_server.putenv('SERVER_NAME', self.server.server_name) | ||
qgs_server.putenv('REQUEST_URI', self.path) | ||
parsed_path = urllib.parse.urlparse(self.path) | ||
headers, body = qgs_server.handleRequest(parsed_path.query) | ||
headers_dict = dict(h.split(': ', 1) for h in headers.decode().split('\n') if h) | ||
try: | ||
self.send_response(int(headers_dict['Status'].split(' ')[0])) | ||
except: | ||
self.send_response(200) | ||
for k, v in headers_dict.items(): | ||
self.send_header(k, v) | ||
self.end_headers() | ||
self.wfile.write(body) | ||
return | ||
|
||
def do_POST(self): | ||
content_len = int(self.headers.get('content-length', 0)) | ||
post_body = self.rfile.read(content_len).decode() | ||
request = post_body[1:post_body.find(' ')] | ||
self.path = self.path + '&REQUEST_BODY=' + \ | ||
post_body.replace('&', '') + '&REQUEST=' + request | ||
return self.do_GET() | ||
|
||
|
||
if __name__ == '__main__': | ||
server = HTTPServer((QGIS_SERVER_HOST, QGIS_SERVER_PORT), Handler) | ||
if https: | ||
server.socket = ssl.wrap_socket(server.socket, | ||
certfile=QGIS_SERVER_PKI_CERTIFICATE, | ||
keyfile=QGIS_SERVER_PKI_KEY, | ||
ca_certs=QGIS_SERVER_PKI_AUTHORITY, | ||
cert_reqs=ssl.CERT_REQUIRED, | ||
server_side=True, | ||
ssl_version=ssl.PROTOCOL_TLSv1) | ||
message = 'Starting server on %s://%s:%s, use <Ctrl-C> to stop' % \ | ||
('https' if https else 'http', QGIS_SERVER_HOST, server.server_port) | ||
try: | ||
print(message, flush=True) | ||
except: | ||
print(message) | ||
sys.stdout.flush() | ||
server.serve_forever() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
# -*- coding: utf-8 -*- | ||
""" | ||
Tests for auth manager WMS/WFS using QGIS Server through HTTP Basic | ||
enabled qgis_wrapped_server.py. | ||
This is an integration test for QGIS Desktop Auth Manager WFS and WMS provider | ||
and QGIS Server WFS/WMS that check if QGIS can use a stored auth manager auth | ||
configuration to access an HTTP Basic protected endpoint. | ||
From build dir, run: ctest -R PyQgsAuthManagerPasswordOWSTest -V | ||
.. note:: This program is free software; you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation; either version 2 of the License, or | ||
(at your option) any later version. | ||
""" | ||
import os | ||
import sys | ||
import re | ||
import subprocess | ||
import tempfile | ||
import random | ||
import string | ||
try: | ||
from urllib.parse import quote | ||
except: | ||
from urllib import quote | ||
|
||
__author__ = 'Alessandro Pasotti' | ||
__date__ = '18/09/2016' | ||
__copyright__ = 'Copyright 2016, The QGIS Project' | ||
# This will get replaced with a git SHA1 when you do a git archive | ||
__revision__ = '$Format:%H$' | ||
|
||
from shutil import rmtree | ||
|
||
from utilities import unitTestDataPath, waitServer | ||
from qgis.core import ( | ||
QgsAuthManager, | ||
QgsAuthMethodConfig, | ||
QgsVectorLayer, | ||
QgsRasterLayer, | ||
) | ||
from qgis.testing import ( | ||
start_app, | ||
unittest, | ||
) | ||
|
||
try: | ||
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT'] | ||
except: | ||
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto | ||
|
||
|
||
QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp() | ||
|
||
os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH | ||
|
||
qgis_app = start_app() | ||
|
||
|
||
class TestAuthManager(unittest.TestCase): | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
"""Run before all tests: | ||
Creates an auth configuration""" | ||
cls.port = QGIS_SERVER_ENDPOINT_PORT | ||
# Clean env just to be sure | ||
env_vars = ['QUERY_STRING', 'QGIS_PROJECT_FILE'] | ||
for ev in env_vars: | ||
try: | ||
del os.environ[ev] | ||
except KeyError: | ||
pass | ||
cls.testdata_path = unitTestDataPath('qgis_server') + '/' | ||
cls.project_path = cls.testdata_path + "test_project.qgs" | ||
# Enable auth | ||
#os.environ['QGIS_AUTH_PASSWORD_FILE'] = QGIS_AUTH_PASSWORD_FILE | ||
authm = QgsAuthManager.instance() | ||
assert (authm.setMasterPassword('masterpassword', True)) | ||
cls.auth_config = QgsAuthMethodConfig('Basic') | ||
cls.auth_config.setName('test_auth_config') | ||
cls.username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) | ||
cls.password = cls.username[::-1] # reversed | ||
cls.auth_config.setConfig('username', cls.username) | ||
cls.auth_config.setConfig('password', cls.password) | ||
assert (authm.storeAuthenticationConfig(cls.auth_config)[0]) | ||
cls.hostname = '127.0.0.1' | ||
cls.protocol = 'http' | ||
|
||
os.environ['QGIS_SERVER_HTTP_BASIC_AUTH'] = '1' | ||
os.environ['QGIS_SERVER_USERNAME'] = cls.username | ||
os.environ['QGIS_SERVER_PASSWORD'] = cls.password | ||
os.environ['QGIS_SERVER_PORT'] = str(cls.port) | ||
os.environ['QGIS_SERVER_HOST'] = cls.hostname | ||
server_path = os.path.dirname(os.path.realpath(__file__)) + \ | ||
'/qgis_wrapped_server.py' | ||
cls.server = subprocess.Popen([sys.executable, server_path], | ||
env=os.environ, stdout=subprocess.PIPE) | ||
|
||
line = cls.server.stdout.readline() | ||
cls.port = int(re.findall(b':(\d+)', line)[0]) | ||
assert cls.port != 0 | ||
# Wait for the server process to start | ||
assert waitServer('%s://%s:%s' % (cls.protocol, cls.hostname, cls.port)), "Server is not responding! '%s://%s:%s" % (cls.protocol, cls.hostname, cls.port) | ||
|
||
@classmethod | ||
def tearDownClass(cls): | ||
"""Run after all tests""" | ||
cls.server.terminate() | ||
rmtree(QGIS_AUTH_DB_DIR_PATH) | ||
del cls.server | ||
|
||
def setUp(self): | ||
"""Run before each test.""" | ||
pass | ||
|
||
def tearDown(self): | ||
"""Run after each test.""" | ||
pass | ||
|
||
@classmethod | ||
def _getWFSLayer(cls, type_name, layer_name=None, authcfg=None): | ||
""" | ||
WFS layer factory | ||
""" | ||
if layer_name is None: | ||
layer_name = 'wfs_' + type_name | ||
parms = { | ||
'srsname': 'EPSG:4326', | ||
'typename': type_name.decode('utf-8').replace(' ', '_'), | ||
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path), | ||
'version': '1.0.0', | ||
'table': '', | ||
} | ||
uri = u'%(url)s&SERVICE=WFS&VERSION=%(version)s&REQUEST=GetFeature&TYPENAME=%(typename)s&SRSNAME=%(srsname)s' % parms | ||
if authcfg is not None: | ||
uri = uri + "&authcfg=%s" % authcfg | ||
wfs_layer = QgsVectorLayer(uri, layer_name, 'WFS') | ||
return wfs_layer | ||
|
||
@classmethod | ||
def _getWMSLayer(cls, layers, layer_name=None, authcfg=None): | ||
""" | ||
WMS layer factory | ||
""" | ||
if layer_name is None: | ||
layer_name = 'wms_' + layers.replace(',', '') | ||
parms = { | ||
'crs': 'EPSG:4326', | ||
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path), | ||
'format': 'image/png', | ||
'layers': quote(layers), | ||
'styles': '', | ||
'version': 'auto', | ||
#'sql': '', | ||
} | ||
if authcfg is not None: | ||
parms.update({'authcfg': authcfg}) | ||
uri = '&'.join([("%s=%s" % (k, v.replace('=', '%3D'))) for k, v in list(parms.items())]) | ||
wms_layer = QgsRasterLayer(uri, layer_name, 'wms') | ||
return wms_layer | ||
|
||
def testValidAuthAccess(self): | ||
""" | ||
Access the HTTP Basic protected layer with valid credentials | ||
""" | ||
wfs_layer = self._getWFSLayer('testlayer èé', authcfg=self.auth_config.id()) | ||
self.assertTrue(wfs_layer.isValid()) | ||
wms_layer = self._getWMSLayer('testlayer èé', authcfg=self.auth_config.id()) | ||
self.assertTrue(wms_layer.isValid()) | ||
|
||
def testInvalidAuthAccess(self): | ||
""" | ||
Access the HTTP Basic protected layer with no credentials | ||
""" | ||
wfs_layer = self._getWFSLayer('testlayer èé') | ||
self.assertFalse(wfs_layer.isValid()) | ||
wms_layer = self._getWMSLayer('testlayer èé') | ||
self.assertFalse(wms_layer.isValid()) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
Oops, something went wrong.