Skip to content

Commit 9bb7681

Browse files
authored
Merge pull request #4124 from pblottiere/security
add unit tests for sql injection
2 parents b9a0ba1 + 3807936 commit 9bb7681

File tree

4 files changed

+873
-0
lines changed

4 files changed

+873
-0
lines changed

tests/src/python/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ IF (WITH_SERVER)
183183
ADD_PYTHON_TEST(PyQgsServer test_qgsserver.py)
184184
ADD_PYTHON_TEST(PyQgsServerSettings test_qgsserver_settings.py)
185185
ADD_PYTHON_TEST(PyQgsServerProjectUtils test_qgsserver_projectutils.py)
186+
ADD_PYTHON_TEST(PyQgsServerSecurity test_qgsserver_security.py)
186187
ADD_PYTHON_TEST(PyQgsServerAccessControl test_qgsserver_accesscontrol.py)
187188
ADD_PYTHON_TEST(PyQgsServerWFST test_qgsserver_wfst.py)
188189
ADD_PYTHON_TEST(PyQgsOfflineEditingWFS test_offline_editing_wfs.py)
Lines changed: 382 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,382 @@
1+
# -*- coding: utf-8 -*-
2+
"""QGIS Unit tests for server security.
3+
4+
.. note:: This program is free software; you can redistribute it and/or modify
5+
it under the terms of the GNU General Public License as published by
6+
the Free Software Foundation; either version 2 of the License, or
7+
(at your option) any later version.
8+
9+
"""
10+
__author__ = 'Paul Blottiere'
11+
__date__ = '31/01/2017'
12+
__copyright__ = 'Copyright 2017, The QGIS Project'
13+
# This will get replaced with a git SHA1 when you do a git archive
14+
__revision__ = '$Format:%H$'
15+
16+
from qgis.utils import spatialite_connect
17+
import os
18+
import time
19+
import urllib.parse
20+
from shutil import copyfile
21+
from qgis.core import QgsApplication
22+
from qgis.server import QgsServer
23+
from qgis.testing import unittest
24+
from utilities import unitTestDataPath
25+
26+
27+
class TestQgsServerSecurity(unittest.TestCase):
28+
29+
@classmethod
30+
def setUpClass(cls):
31+
cls.testdatapath = unitTestDataPath('qgis_server_security') + '/'
32+
cls.db = os.path.join(cls.testdatapath, 'db.sqlite')
33+
cls.db_clone = os.path.join(cls.testdatapath, 'db_clone.sqlite')
34+
cls.project = os.path.join(cls.testdatapath, 'project.qgs')
35+
cls.app = QgsApplication([], False)
36+
37+
@classmethod
38+
def tearDownClass(cls):
39+
cls.app.exitQgis()
40+
try:
41+
os.remove(cls.db_clone)
42+
except OSError:
43+
pass
44+
45+
def setUp(self):
46+
self.server = QgsServer()
47+
copyfile(self.db, self.db_clone)
48+
49+
def test_wms_getfeatureinfo_filter_and_based_blind(self):
50+
"""
51+
And-based blind attack to check the kind of database currently used (if
52+
the result is valid for the point nammed 'b', then sqlite_version()
53+
function exist).
54+
55+
But does not work because of the whitelist.
56+
57+
If you remove the safety check, this is a valid injection.
58+
"""
59+
60+
filter_sql = "point:\"name\" = 'b'"
61+
injection_sql = ") and (select sqlite_version()"
62+
63+
query = "{0} {1}".format(filter_sql, injection_sql)
64+
d, h = self.handle_request_wms_getfeatureinfo(query)
65+
66+
self.assertFalse(b"name = 'b'" in d)
67+
68+
def test_wms_getfeatureinfo_filter_time_based_blind(self):
69+
"""
70+
Time-based blind to check the current version of database. If the
71+
server is too long to respond, then we have the answer!
72+
73+
But it does not work because of the whitelist.
74+
75+
If you remove the safety check, this is a valid injection.
76+
"""
77+
78+
# first step, retrieve the version of sqlite by a regular way
79+
conn = spatialite_connect(self.db_clone)
80+
cur = conn.cursor()
81+
sql = "select sqlite_version()"
82+
sqlite_version = ''
83+
for row in cur.execute(sql):
84+
sqlite_version = row[0]
85+
conn.close()
86+
87+
# second step, check the time of response for an invalid version
88+
filter_sql = "point:\"name\" = 'b'"
89+
injection_sql = ") and (select case sqlite_version() when '0.0.0' then substr(upper(hex(randomblob(99999999))),0,1) end)--"
90+
91+
query = "{0} {1}".format(filter_sql, injection_sql)
92+
start = time.time()
93+
d, h = self.handle_request_wms_getfeatureinfo(query)
94+
duration_invalid_version = time.time() - start
95+
96+
# third step, check the time of response for a valid version
97+
# maximum: several seconds
98+
injection_sql = ") and (select case sqlite_version() when '{0}' then substr(upper(hex(randomblob(99999999))),0,1) end)--".format(sqlite_version)
99+
100+
query = "{0} {1}".format(filter_sql, injection_sql)
101+
start = time.time()
102+
d, h = self.handle_request_wms_getfeatureinfo(query)
103+
duration_valid_version = time.time() - start
104+
105+
# compare duration. On my computer when safety check is deactivated:
106+
# duration_invalid_version: 0.012360334396362305
107+
# duration_valid_version: 2.8810460567474365
108+
self.assertAlmostEqual(duration_valid_version, duration_invalid_version, delta=0.5)
109+
110+
def test_wms_getfeatureinfo_filter_stacked(self):
111+
"""
112+
The aim is to execute some staked queries. Here the 'drop' function is
113+
used but it could be done with create, insert, ...
114+
115+
But the filter string is split thanks to the semicolon so it seems
116+
totally ignored whatever the query is (even without the safety check).
117+
"""
118+
119+
filter_sql = "point:\"name\" = 'fake'"
120+
injection_sql = "); drop table point"
121+
122+
query = "{0} {1}".format(filter_sql, injection_sql)
123+
d, h = self.handle_request_wms_getfeatureinfo(query)
124+
125+
self.assertTrue(self.is_point_table_still_exist())
126+
127+
def test_wms_getfeatureinfo_filter_union_0(self):
128+
"""
129+
The aim is to retrieve name of tables within the database (like
130+
'SpatialIndex').
131+
132+
But the whitelist blocks this request because of invalid tokens.
133+
134+
If you remove the safety check, this is a valid injection.
135+
"""
136+
137+
filter_sql = "point:\"name\" = 'fake'"
138+
injection_sql = ") union select 1,1,name,1,1 from sqlite_master where type = \"table\" order by name--"
139+
140+
query = "{0} {1}".format(filter_sql, injection_sql)
141+
d, h = self.handle_request_wms_getfeatureinfo(query)
142+
143+
self.assertFalse(b'SpatialIndex' in d)
144+
145+
def test_wms_getfeatureinfo_filter_union_1(self):
146+
"""
147+
The aim is to retrieve data from an excluded layer.
148+
149+
But the whitelist blocks this request because of invalid tokens.
150+
151+
If you remove the safety check, this is a valid injection.
152+
"""
153+
154+
filter_sql = "point:\"name\" = 'fake'"
155+
injection_sql = ") union select 1,1,* from aoi--"
156+
157+
query = "{0} {1}".format(filter_sql, injection_sql)
158+
d, h = self.handle_request_wms_getfeatureinfo(query)
159+
160+
self.assertFalse(b'private_value' in d)
161+
162+
def test_wms_getfeatureinfo_filter_unicode(self):
163+
"""
164+
The aim is to send some invalid token in unicode to bypass the
165+
whitelist.
166+
167+
But unicode is interpreted and checked by the safety function.
168+
"""
169+
170+
# %3B -> semicolon
171+
filter_sql = "point:\"name\" = 'fake %3B'"
172+
d, h = self.handle_request_wms_getfeatureinfo(filter_sql)
173+
self.assertTrue(self.check_service_exception_report(d))
174+
175+
def test_wms_getfeatureinfo_filter_patternmatching(self):
176+
"""
177+
The aim is to retrieve the table's name thanks to pattern matching.
178+
179+
If you remove the safety check, this is a valid injection.
180+
"""
181+
182+
filter_sql = "point:\"name\" = 'b'"
183+
injection_sql = "or ( select name from sqlite_master where type='table' and name like '{0}') != ''"
184+
query = "{0} {1}".format(filter_sql, injection_sql)
185+
186+
# there's no table named as 'az%'
187+
name = "az%"
188+
sql = query.format(name)
189+
d, h = self.handle_request_wms_getfeatureinfo(sql)
190+
# self.assertTrue(b"name = 'b'" in d) #true if sanity check deactivated
191+
self.assertTrue(self.check_service_exception_report(d))
192+
193+
# a table named as 'ao%' exist
194+
name = "ao%"
195+
sql = query.format(name)
196+
d, h = self.handle_request_wms_getfeatureinfo(sql)
197+
# self.assertTrue(b"name = 'a'" in d) #true if sanity check deactivated
198+
self.assertTrue(self.check_service_exception_report(d))
199+
200+
# a table named as 'aoi' exist
201+
name = "aoi"
202+
sql = query.format(name)
203+
d, h = self.handle_request_wms_getfeatureinfo(sql)
204+
# self.assertTrue(b"name = 'a'" in d) #true if sanity check deactivated
205+
self.assertTrue(self.check_service_exception_report(d))
206+
207+
def test_wms_getfeatureinfo_filter_whitelist(self):
208+
"""
209+
The aim is to check that some tokens cannot pass the safety check
210+
whatever their positions in the filter string.
211+
"""
212+
213+
# create
214+
filter_sql = "point:\"name\" = 'a'create"
215+
d, h = self.handle_request_wms_getfeatureinfo(filter_sql)
216+
self.assertTrue(self.check_service_exception_report(d))
217+
218+
filter_sql = "point:\"name\" = 'a' create"
219+
d, h = self.handle_request_wms_getfeatureinfo(filter_sql)
220+
self.assertTrue(self.check_service_exception_report(d))
221+
222+
# invalid token and escape single quote
223+
filter_sql = "point:\"name\" = 'a\\'create"
224+
d, h = self.handle_request_wms_getfeatureinfo(filter_sql)
225+
self.assertTrue(self.check_service_exception_report(d))
226+
227+
# drop
228+
filter_sql = "point:\"name\" = 'a' drop"
229+
d, h = self.handle_request_wms_getfeatureinfo(filter_sql)
230+
self.assertTrue(self.check_service_exception_report(d))
231+
232+
# select
233+
filter_sql = "point:\"name\" = 'a' select"
234+
d, h = self.handle_request_wms_getfeatureinfo(filter_sql)
235+
self.assertTrue(self.check_service_exception_report(d))
236+
237+
# comments
238+
filter_sql = "point:\"name\" = 'a' #"
239+
d, h = self.handle_request_wms_getfeatureinfo(filter_sql)
240+
self.assertTrue(self.check_service_exception_report(d))
241+
242+
filter_sql = "point:\"name\" = 'a' -"
243+
d, h = self.handle_request_wms_getfeatureinfo(filter_sql)
244+
self.assertTrue(self.check_service_exception_report(d))
245+
246+
def test_wfs_getfeature_filter_stacked(self):
247+
"""
248+
The aim is to execute some staked queries within the 'Literal'
249+
and 'PropertyName' field. Here the 'drop' function is used but it
250+
could be done with create, insert, ...
251+
252+
But due to the implementation, these filters are not resolved on
253+
database side but in server side with QgsExpression. So, there's no
254+
'WHERE' clause and the query never reach the database. By the way,
255+
it's exactly the same thing whatever the kind of attacks and for
256+
the EXP_FILTER parameter too (filter described with QgsExpression).
257+
258+
It's typically the kind of SQL injection which has been fixed in
259+
mapserver several years ago:
260+
https://trac.osgeo.org/mapserver/ticket/3874
261+
"""
262+
263+
# ogc:Literal / ogc:PropertyIsEqualTo
264+
literal = "4')); drop table point --"
265+
filter_xml = "<ogc:Filter%20xmlns:ogc=\"http://www.opengis.net/ogc\"><ogc:PropertyIsEqualTo><ogc:PropertyName>pkuid</ogc:PropertyName><ogc:Literal>{0}</ogc:Literal></ogc:PropertyIsEqualTo></ogc:Filter>".format(literal)
266+
self.handle_request_wfs_getfeature_filter(filter_xml)
267+
self.assertTrue(self.is_point_table_still_exist())
268+
269+
# ogc:Literal / ogc:PropertyIsLike
270+
literal = "4')); drop table point --"
271+
filter_xml = "<ogc:Filter%20xmlns:ogc=\"http://www.opengis.net/ogc\"><ogc:PropertyIsLike><ogc:PropertyName>pkuid</ogc:PropertyName><ogc:Literal>{0}</ogc:Literal></ogc:PropertyIsLike></ogc:Filter>".format(literal)
272+
self.handle_request_wfs_getfeature_filter(filter_xml)
273+
self.assertTrue(self.is_point_table_still_exist())
274+
275+
# ogc:PropertyName / ogc:PropertyIsLike
276+
propname = "name = 'a')); drop table point --"
277+
filter_xml = "<ogc:Filter%20xmlns:ogc=\"http://www.opengis.net/ogc\"><ogc:PropertyIsLike><ogc:PropertyName>{0}</ogc:PropertyName><ogc:Literal>4</ogc:Literal></ogc:PropertyIsLike></ogc:Filter>".format(propname)
278+
self.handle_request_wfs_getfeature_filter(filter_xml)
279+
self.assertTrue(self.is_point_table_still_exist())
280+
281+
def test_wms_getmap_sld_stacked(self):
282+
"""
283+
The aim is to execute some staked queries within the 'Literal'
284+
and 'PropertyName' field. Here the 'drop' function is used but it
285+
could be done with create, insert, ...
286+
287+
However it's not working because special characters are duplicated. For
288+
example, with 'Literal' as "4')); drop table point --", the underlying
289+
query is:
290+
SELECT .... AND (("pkuid" = '4'')); drop table point --'))
291+
"""
292+
293+
literal = "4')); drop table point --"
294+
sld = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><StyledLayerDescriptor xmlns=\"http://www.opengis.net/sld\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:ogc=\"http://www.opengis.net/ogc\" xsi:schemaLocation=\"http://www.opengis.net/sld http://schemas.opengis.net/sld/1.1.0/StyledLayerDescriptor.xsd\" version=\"1.1.0\" xmlns:se=\"http://www.opengis.net/se\" xmlns:xlink=\"http://www.w3.org/1999/xlink\"> <NamedLayer> <se:Name>point</se:Name> <UserStyle> <se:Name>point</se:Name> <se:FeatureTypeStyle> <se:Rule> <se:Name>Single symbol</se:Name> <ogc:Filter xmlns:ogc=\"http://www.opengis.net/ogc\"> <ogc:PropertyIsEqualTo> <ogc:PropertyName>pkuid</ogc:PropertyName> <ogc:Literal>{0}</ogc:Literal> </ogc:PropertyIsEqualTo> </ogc:Filter> <se:PointSymbolizer> <se:Graphic> <se:Mark> <se:WellKnownName>circle</se:WellKnownName> <se:Fill><se:SvgParameter name=\"fill\">5e86a1</se:SvgParameter></se:Fill><se:Stroke><se:SvgParameter name=\"stroke\">000000</se:SvgParameter></se:Stroke></se:Mark><se:Size>7</se:Size></se:Graphic></se:PointSymbolizer></se:Rule></se:FeatureTypeStyle></UserStyle></NamedLayer></StyledLayerDescriptor>".format(literal)
295+
self.handle_request_wms_getmap(sld)
296+
self.assertTrue(self.is_point_table_still_exist())
297+
298+
def check_service_exception_report(self, d):
299+
"""
300+
Return True if a ServiceExceptionReport is raised, False otherwise
301+
"""
302+
303+
if b'<ServiceExceptionReport' in d:
304+
return True
305+
else:
306+
return False
307+
308+
def handle_request_wfs_getfeature_filter(self, filter_xml):
309+
qs = "&".join(["%s=%s" % i for i in list({
310+
"MAP": urllib.parse.quote(self.project),
311+
"SERVICE": "WFS",
312+
"VERSION": "1.1.1",
313+
"REQUEST": "GetFeature",
314+
"TYPENAME": "point",
315+
"STYLES": "",
316+
"CRS": "EPSG:32613",
317+
"FILTER": filter_xml}.items())])
318+
319+
return self.server.handleRequest(qs)
320+
321+
def handle_request_wms_getfeatureinfo(self, filter_sql):
322+
qs = "&".join(["%s=%s" % i for i in list({
323+
"MAP": urllib.parse.quote(self.project),
324+
"SERVICE": "WMS",
325+
"VERSION": "1.1.1",
326+
"REQUEST": "GetFeatureInfo",
327+
"QUERY_LAYERS": "point",
328+
"LAYERS": "point",
329+
"STYLES": "",
330+
"FORMAT": "image/png",
331+
"HEIGHT": "500",
332+
"WIDTH": "500",
333+
"BBOX": "606171,4822867,612834,4827375",
334+
"CRS": "EPSG:32613",
335+
"FILTER": filter_sql}.items())])
336+
337+
return self._result(self.server.handleRequest(qs))
338+
339+
def handle_request_wms_getmap(self, sld):
340+
qs = "&".join(["%s=%s" % i for i in list({
341+
"MAP": urllib.parse.quote(self.project),
342+
"SERVICE": "WMS",
343+
"VERSION": "1.0.0",
344+
"REQUEST": "GetMap",
345+
"QUERY_LAYERS": "point",
346+
"LAYERS": "point",
347+
"STYLES": "",
348+
"FORMAT": "image/png",
349+
"HEIGHT": "500",
350+
"WIDTH": "500",
351+
"BBOX": "606171,4822867,612834,4827375",
352+
"CRS": "EPSG:32613",
353+
"SLD": sld}.items())])
354+
355+
return self._result(self.server.handleRequest(qs))
356+
357+
def is_point_table_still_exist(self):
358+
conn = spatialite_connect(self.db_clone)
359+
cur = conn.cursor()
360+
sql = "select * from point"
361+
point_table_exist = True
362+
try:
363+
cur.execute(sql)
364+
except:
365+
point_table_exist = False
366+
conn.close()
367+
368+
return point_table_exist
369+
370+
def _result(self, data):
371+
headers = {}
372+
for line in data[0].decode('UTF-8').split("\n"):
373+
if line != "":
374+
header = line.split(":")
375+
self.assertEqual(len(header), 2, line)
376+
headers[str(header[0])] = str(header[1]).strip()
377+
378+
return data[1], headers
379+
380+
381+
if __name__ == '__main__':
382+
unittest.main()
259 KB
Binary file not shown.

0 commit comments

Comments
 (0)