Skip to content

Commit

Permalink
Merge pull request #3540 from elpaso/auth-integration-test
Browse files Browse the repository at this point in the history
Auth integration test + more reliable server tests
  • Loading branch information
elpaso authored Sep 29, 2016
2 parents 67d5e19 + 49ae020 commit 949241d
Show file tree
Hide file tree
Showing 12 changed files with 488 additions and 199 deletions.
1 change: 1 addition & 0 deletions tests/src/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,5 @@ IF (WITH_SERVER)
ADD_PYTHON_TEST(PyQgsServerAccessControl test_qgsserver_accesscontrol.py)
ADD_PYTHON_TEST(PyQgsServerWFST test_qgsserver_wfst.py)
ADD_PYTHON_TEST(PyQgsOfflineEditingWFS test_offline_editing_wfs.py)
ADD_PYTHON_TEST(PyQgsAuthManagerEnpointTest test_authmanager_endpoint.py)
ENDIF (WITH_SERVER)
46 changes: 43 additions & 3 deletions tests/src/python/qgis_wrapped_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
This script launches a QGIS Server listening on port 8081 or on the port
specified on the environment variable QGIS_SERVER_DEFAULT_PORT
For testing purposes, HTTP Basic can be enabled by setting the following
environment variables:
* QGIS_SERVER_HTTP_BASIC_AUTH (default not set, set to anything to enable)
* QGIS_SERVER_USERNAME (default ="username")
* QGIS_SERVER_PASSWORD (default ="password")
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
Expand All @@ -22,6 +29,8 @@


import os
import sys
import signal
import urllib.parse
from http.server import BaseHTTPRequestHandler, HTTPServer
from qgis.core import QgsApplication
Expand All @@ -36,11 +45,35 @@
qgs_server = QgsServer()


if os.environ.get('QGIS_SERVER_HTTP_BASIC_AUTH') is not None:
from qgis.server import QgsServerFilter
import base64

class HTTPBasicFilter(QgsServerFilter):

def responseComplete(self):
request = self.serverInterface().requestHandler()
if self.serverInterface().getEnv('HTTP_AUTHORIZATION'):
username, password = base64.b64decode(self.serverInterface().getEnv('HTTP_AUTHORIZATION')[6:]).split(b':')
if (username.decode('utf-8') == os.environ.get('QGIS_SERVER_USERNAME', 'username')
and password.decode('utf-8') == os.environ.get('QGIS_SERVER_PASSWORD', 'password')):
return
# No auth ...
request.clearHeaders()
request.setHeader('Status', '401 Authorization required')
request.setHeader('WWW-Authenticate', 'Basic realm="QGIS Server"')
request.clearBody()
request.appendBody(b'<h1>Authorization required</h1>')

filter = HTTPBasicFilter(qgs_server.serverInterface())
qgs_server.serverInterface().registerFilter(filter)


class Handler(BaseHTTPRequestHandler):

def do_GET(self):
# CGI vars:
for k, v in list(self.headers.items()):
for k, v in self.headers.items():
qgs_server.putenv('HTTP_%s' % k.replace(' ', '-').replace('-', '_').replace(' ', '-').upper(), v)
qgs_server.putenv('SERVER_PORT', str(self.server.server_port))
qgs_server.putenv('SERVER_NAME', self.server.server_name)
Expand All @@ -52,7 +85,7 @@ def do_GET(self):
self.send_response(int(headers_dict['Status'].split(' ')[0]))
except:
self.send_response(200)
for k, v in list(headers_dict.items()):
for k, v in headers_dict.items():
self.send_header(k, v)
self.end_headers()
self.wfile.write(body)
Expand All @@ -71,5 +104,12 @@ def do_POST(self):
server = HTTPServer(('localhost', QGIS_SERVER_DEFAULT_PORT), Handler)
print('Starting server on localhost:%s, use <Ctrl-C> to stop' %
QGIS_SERVER_DEFAULT_PORT)

def signal_handler(signal, frame):
global qgs_app
print("\nExiting QGIS...")
qgs_app.exitQgis()
sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
server.serve_forever()
qgs_app.exitQgis()
180 changes: 180 additions & 0 deletions tests/src/python/test_authmanager_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
"""
Tests for auth manager WMS/WFS using QGIS Server through HTTP Basic
enabled qgis_wrapped_server.py.
This is an integration test for QGIS Desktop Auth Manager WFS and WMS provider
and QGIS Server WFS/WMS that check if QGIS can use a stored auth manager auth
configuration to access an HTTP Basic protected endpoint.
From build dir, run: ctest -R PyQgsAuthManagerEnpointTest -V
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""
import os
import sys
import subprocess
import tempfile
import random
import string
import urllib

__author__ = 'Alessandro Pasotti'
__date__ = '18/09/2016'
__copyright__ = 'Copyright 2016, The QGIS Project'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'

from time import sleep
from urllib.parse import quote
from shutil import rmtree

from utilities import unitTestDataPath
from qgis.core import (
QgsAuthManager,
QgsAuthMethodConfig,
QgsVectorLayer,
QgsRasterLayer,
)
from qgis.testing import (
start_app,
unittest,
)

try:
QGIS_SERVER_AUTHMANAGER_DEFAULT_PORT = os.environ['QGIS_SERVER_AUTHMANAGER_DEFAULT_PORT']
except:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
QGIS_SERVER_AUTHMANAGER_DEFAULT_PORT = s.getsockname()[1]
s.close()

QGIS_AUTH_DB_DIR_PATH = tempfile.mkdtemp()

os.environ['QGIS_AUTH_DB_DIR_PATH'] = QGIS_AUTH_DB_DIR_PATH

qgis_app = start_app()


class TestAuthManager(unittest.TestCase):

@classmethod
def setUpClass(cls):
"""Run before all tests:
Creates an auth configuration"""
cls.port = QGIS_SERVER_AUTHMANAGER_DEFAULT_PORT
# Clean env just to be sure
env_vars = ['QUERY_STRING', 'QGIS_PROJECT_FILE']
for ev in env_vars:
try:
del os.environ[ev]
except KeyError:
pass
cls.testdata_path = unitTestDataPath('qgis_server') + '/'
cls.project_path = quote(cls.testdata_path + "test_project.qgs")
# Enable auth
#os.environ['QGIS_AUTH_PASSWORD_FILE'] = QGIS_AUTH_PASSWORD_FILE
authm = QgsAuthManager.instance()
assert (authm.setMasterPassword('masterpassword', True))
cls.auth_config = QgsAuthMethodConfig('Basic')
cls.auth_config.setName('test_auth_config')
cls.username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
cls.password = cls.username[::-1] # reversed
cls.auth_config.setConfig('username', cls.username)
cls.auth_config.setConfig('password', cls.password)
assert (authm.storeAuthenticationConfig(cls.auth_config)[0])

os.environ['QGIS_SERVER_HTTP_BASIC_AUTH'] = '1'
os.environ['QGIS_SERVER_USERNAME'] = cls.username
os.environ['QGIS_SERVER_PASSWORD'] = cls.password
os.environ['QGIS_SERVER_DEFAULT_PORT'] = str(cls.port)
server_path = os.path.dirname(os.path.realpath(__file__)) + \
'/qgis_wrapped_server.py'
cls.server = subprocess.Popen([sys.executable, server_path],
env=os.environ)
sleep(2)

@classmethod
def tearDownClass(cls):
"""Run after all tests"""
cls.server.terminate()
rmtree(QGIS_AUTH_DB_DIR_PATH)
del cls.server

def setUp(self):
"""Run before each test."""
pass

def tearDown(self):
"""Run after each test."""
pass

@classmethod
def _getWFSLayer(cls, type_name, layer_name=None, authcfg=None):
"""
WFS layer factory
"""
if layer_name is None:
layer_name = 'wfs_' + type_name
parms = {
'srsname': 'EPSG:4326',
'typename': type_name,
'url': 'http://127.0.0.1:%s/?map=%s' % (cls.port, cls.project_path),
'version': 'auto',
'table': '',
}
if authcfg is not None:
parms.update({'authcfg': authcfg})
uri = ' '.join([("%s='%s'" % (k, v)) for k, v in list(parms.items())])
wfs_layer = QgsVectorLayer(uri, layer_name, 'WFS')
return wfs_layer

@classmethod
def _getWMSLayer(cls, layers, layer_name=None, authcfg=None):
"""
WMS layer factory
"""
if layer_name is None:
layer_name = 'wms_' + layers.replace(',', '')
parms = {
'crs': 'EPSG:4326',
'url': 'http://127.0.0.1:%s/?map=%s' % (cls.port, cls.project_path),
'format': 'image/png',
# This is needed because of a really wierd implementation in QGIS Server, that
# replaces _ in the the real layer name with spaces
'layers': urllib.parse.quote(layers).replace('_', ' '),
'styles': '',
#'sql': '',
}
if authcfg is not None:
parms.update({'authcfg': authcfg})
uri = '&'.join([("%s=%s" % (k, v.replace('=', '%3D'))) for k, v in list(parms.items())])
wms_layer = QgsRasterLayer(uri, layer_name, 'wms')
return wms_layer

def testValidAuthAccess(self):
"""
Access the HTTP Basic protected layer with valid credentials
"""
wfs_layer = self._getWFSLayer('testlayer_èé', authcfg=self.auth_config.id())
self.assertTrue(wfs_layer.isValid())
wms_layer = self._getWMSLayer('testlayer_èé', authcfg=self.auth_config.id())
self.assertTrue(wms_layer.isValid())

def testInvalidAuthAccess(self):
"""
Access the HTTP Basic protected layer with no credentials
"""
wfs_layer = self._getWFSLayer('testlayer_èé')
self.assertFalse(wfs_layer.isValid())
wms_layer = self._getWMSLayer('testlayer_èé')
self.assertFalse(wms_layer.isValid())


if __name__ == '__main__':
unittest.main()
7 changes: 6 additions & 1 deletion tests/src/python/test_offline_editing_wfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@

from offlineditingtestbase import OfflineTestBase


try:
QGIS_SERVER_WFST_DEFAULT_PORT = os.environ['QGIS_SERVER_WFST_DEFAULT_PORT']
except:
QGIS_SERVER_WFST_DEFAULT_PORT = 8081
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
QGIS_SERVER_WFST_DEFAULT_PORT = s.getsockname()[1]
s.close()

qgis_app = start_app()

Expand Down
Loading

0 comments on commit 949241d

Please sign in to comment.