Skip to content

Commit

Permalink
[tests] Authmanager Postgres PKI test
Browse files Browse the repository at this point in the history
  • Loading branch information
elpaso committed Nov 4, 2016
1 parent 74f49dd commit f582849
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 19 deletions.
4 changes: 3 additions & 1 deletion tests/src/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,7 @@ IF (WITH_SERVER)
ADD_PYTHON_TEST(PyQgsServerAccessControl test_qgsserver_accesscontrol.py)
ADD_PYTHON_TEST(PyQgsServerWFST test_qgsserver_wfst.py)
ADD_PYTHON_TEST(PyQgsOfflineEditingWFS test_offline_editing_wfs.py)
ADD_PYTHON_TEST(PyQgsAuthManagerEndpointTest test_authmanager_endpoint.py)
ADD_PYTHON_TEST(PyQgsAuthManagerPasswordOWSTest test_authmanager_password_ows.py)
#ADD_PYTHON_TEST(PyQgsAuthManagerPKIOWSTest test_authmanager_pki_ows.py)
ADD_PYTHON_TEST(PyQgsAuthManagerPKIPostgresTest test_authmanager_pki_postgres.py)
ENDIF (WITH_SERVER)
47 changes: 44 additions & 3 deletions tests/src/python/qgis_wrapped_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
QGIS Server HTTP wrapper
This script launches a QGIS Server listening on port 8081 or on the port
specified on the environment variable QGIS_SERVER_PORT
specified on the environment variable QGIS_SERVER_PORT.
QGIS_SERVER_HOST (defaults to 127.0.0.1)
For testing purposes, HTTP Basic can be enabled by setting the following
Expand All @@ -13,6 +13,21 @@
* QGIS_SERVER_USERNAME (default ="username")
* QGIS_SERVER_PASSWORD (default ="password")
PKI authentication with HTTPS can be enabled with:
* QGIS_SERVER_PKI_CERTIFICATE (server certificate)
* QGIS_SERVER_PKI_KEY (server private key)
* QGIS_SERVER_PKI_AUTHORITY (root CA)
* QGIS_SERVER_PKI_USERNAME (valid username)
Sample run:
QGIS_SERVER_PKI_USERNAME=Gerardus QGIS_SERVER_PORT=47547 QGIS_SERVER_HOST=localhost \
QGIS_SERVER_PKI_KEY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_key.pem \
QGIS_SERVER_PKI_CERTIFICATE=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/localhost_ssl_cert.pem \
QGIS_SERVER_PKI_AUTHORITY=/home/$USER/dev/QGIS/tests/testdata/auth_system/certs_keys/chains_subissuer-issuer-root_issuer2-root2.pem \
python3 /home/$USER/dev/QGIS/tests/src/python/qgis_wrapped_server.py
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
Expand All @@ -32,13 +47,29 @@
import os
import sys
import signal
import ssl
import urllib.parse
from http.server import BaseHTTPRequestHandler, HTTPServer
from qgis.core import QgsApplication
from qgis.server import QgsServer

QGIS_SERVER_PORT = int(os.environ.get('QGIS_SERVER_PORT', '8081'))
QGIS_SERVER_HOST = os.environ.get('QGIS_SERVER_HOST', '127.0.0.1')
# PKI authentication
QGIS_SERVER_PKI_CERTIFICATE = os.environ.get('QGIS_SERVER_PKI_CERTIFICATE')
QGIS_SERVER_PKI_KEY = os.environ.get('QGIS_SERVER_PKI_KEY')
QGIS_SERVER_PKI_AUTHORITY = os.environ.get('QGIS_SERVER_PKI_AUTHORITY')
QGIS_SERVER_PKI_USERNAME = os.environ.get('QGIS_SERVER_PKI_USERNAME')

# Check if PKI - https is enabled
https = (QGIS_SERVER_PKI_CERTIFICATE is not None and
os.path.isfile(QGIS_SERVER_PKI_CERTIFICATE) and
QGIS_SERVER_PKI_KEY is not None and
os.path.isfile(QGIS_SERVER_PKI_KEY) and
QGIS_SERVER_PKI_AUTHORITY is not None and
os.path.isfile(QGIS_SERVER_PKI_AUTHORITY) and
QGIS_SERVER_PKI_USERNAME)


qgs_app = QgsApplication([], False)
qgs_server = QgsServer()
Expand Down Expand Up @@ -75,6 +106,8 @@ def do_GET(self):
for k, v in self.headers.items():
qgs_server.putenv('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v)
qgs_server.putenv('SERVER_PORT', str(self.server.server_port))
if https:
qgs_server.putenv('HTTPS', 'ON')
qgs_server.putenv('SERVER_NAME', self.server.server_name)
qgs_server.putenv('REQUEST_URI', self.path)
parsed_path = urllib.parse.urlparse(self.path)
Expand All @@ -101,8 +134,16 @@ def do_POST(self):

if __name__ == '__main__':
server = HTTPServer((QGIS_SERVER_HOST, QGIS_SERVER_PORT), Handler)
print('Starting server on %s:%s, use <Ctrl-C> to stop' %
(QGIS_SERVER_HOST, server.server_port), flush=True)
if https:
server.socket = ssl.wrap_socket(server.socket,
certfile=QGIS_SERVER_PKI_CERTIFICATE,
keyfile=QGIS_SERVER_PKI_KEY,
ca_certs=QGIS_SERVER_PKI_AUTHORITY,
cert_reqs=ssl.CERT_REQUIRED,
server_side=True,
ssl_version=ssl.PROTOCOL_TLSv1)
print('Starting server on %s://%s:%s, use <Ctrl-C> to stop' %
('https' if https else 'http', QGIS_SERVER_HOST, server.server_port), flush=True)

def signal_handler(signal, frame):
global qgs_app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
configuration to access an HTTP Basic protected endpoint.
From build dir, run: ctest -R PyQgsAuthManagerEndpointTest -V
From build dir, run: ctest -R PyQgsAuthManagerPasswordOWSTest -V
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand All @@ -30,7 +30,6 @@
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'

from urllib.parse import quote
from shutil import rmtree

from utilities import unitTestDataPath, waitServer
Expand All @@ -45,11 +44,10 @@
unittest,
)


try:
QGIS_SERVER_ENDPOINT_PORT = os.environ['QGIS_SERVER_ENDPOINT_PORT']
except:
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto
QGIS_SERVER_ENDPOINT_PORT = '0' # Auto


QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()
Expand All @@ -74,7 +72,7 @@ def setUpClass(cls):
except KeyError:
pass
cls.testdata_path = unitTestDataPath('qgis_server') + '/'
cls.project_path = quote(cls.testdata_path + "test_project.qgs")
cls.project_path = cls.testdata_path + "test_project.qgs"
# Enable auth
#os.environ['QGIS_AUTH_PASSWORD_FILE'] = QGIS_AUTH_PASSWORD_FILE
authm = QgsAuthManager.instance()
Expand All @@ -86,26 +84,30 @@ def setUpClass(cls):
cls.auth_config.setConfig('username', cls.username)
cls.auth_config.setConfig('password', cls.password)
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])
cls.hostname = '127.0.0.1'
cls.protocol = 'http'

os.environ['QGIS_SERVER_HTTP_BASIC_AUTH'] = '1'
os.environ['QGIS_SERVER_USERNAME'] = cls.username
os.environ['QGIS_SERVER_PASSWORD'] = cls.password
os.environ['QGIS_SERVER_PORT'] = str(cls.port)
os.environ['QGIS_SERVER_HOST'] = cls.hostname

server_path = os.path.dirname(os.path.realpath(__file__)) + \
'/qgis_wrapped_server.py'
cls.server = subprocess.Popen([sys.executable, server_path],
env=os.environ, stdout=subprocess.PIPE)

line = cls.server.stdout.readline()
cls.port = int(re.findall(b':(\d+)', line)[0])
assert cls.port != 0
# Wait for the server process to start
assert waitServer('http://127.0.0.1:%s' % cls.port), "Server is not responding!"
assert waitServer('%s://%s:%s' % (cls.protocol, cls.hostname, cls.port)), "Server is not responding! %s://%s:%s" % (cls.protocol, cls.hostname, cls.port)

@classmethod
def tearDownClass(cls):
"""Run after all tests"""
cls.server.terminate()
cls.server.wait()
rmtree(QGIS_AUTH_DB_DIR_PATH)
del cls.server

Expand All @@ -127,7 +129,7 @@ def _getWFSLayer(cls, type_name, layer_name=None, authcfg=None):
parms = {
'srsname': 'EPSG:4326',
'typename': type_name,
'url': 'http://127.0.0.1:%s/?map=%s' % (cls.port, cls.project_path),
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
'version': 'auto',
'table': '',
}
Expand All @@ -146,12 +148,12 @@ def _getWMSLayer(cls, layers, layer_name=None, authcfg=None):
layer_name = 'wms_' + layers.replace(',', '')
parms = {
'crs': 'EPSG:4326',
'url': 'http://127.0.0.1:%s/?map=%s' % (cls.port, cls.project_path),
'format': 'image/png',
'url': '%s://%s:%s/?map=%s' % (cls.protocol, cls.hostname, cls.port, cls.project_path),
# This is needed because of a really weird implementation in QGIS Server, that
# replaces _ in the the real layer name with spaces
'layers': urllib.parse.quote(layers).replace('_', ' '),
'layers': urllib.parse.quote(layers.replace('_', ' ')),
'styles': '',
'version': 'auto',
#'sql': '',
}
if authcfg is not None:
Expand All @@ -173,7 +175,7 @@ def testInvalidAuthAccess(self):
"""
Access the HTTP Basic protected layer with no credentials
"""
wfs_layer = self._getWFSLayer('testlayer_èé')
wfs_layer = self._getWFSLayer('testlayer èé')
self.assertFalse(wfs_layer.isValid())
wms_layer = self._getWMSLayer('testlayer_èé')
self.assertFalse(wms_layer.isValid())
Expand Down
Loading

0 comments on commit f582849

Please sign in to comment.