Skip to content
Permalink
Browse files

[Server][Feature][needs-docs] Update cache manager tests for images (…

…tiles)
  • Loading branch information
rldhont committed Aug 1, 2018
1 parent 05e9ab9 commit 9463d04d5452ddd151ae74479fc390437279d8c7
Showing with 1,667 additions and 1,428 deletions.
  1. +175 −2 tests/src/python/test_qgsserver_cachemanager.py
  2. +1,492 −1,426 tests/testdata/qgis_server_accesscontrol/project.qgs
@@ -27,7 +27,8 @@
from utilities import unitTestDataPath
from qgis.server import QgsServer, QgsServerCacheFilter, QgsServerRequest, QgsBufferServerRequest, QgsBufferServerResponse
from qgis.core import QgsApplication, QgsFontUtils, QgsProject
from qgis.PyQt.QtCore import QFile, QByteArray
from qgis.PyQt.QtCore import QIODevice, QFile, QByteArray, QBuffer
from qgis.PyQt.QtGui import QImage
from qgis.PyQt.QtXml import QDomDocument


@@ -40,10 +41,15 @@ class PyServerCache(QgsServerCacheFilter):

def __init__(self, server_iface):
super(QgsServerCacheFilter, self).__init__(server_iface)

self._cache_dir = os.path.join(tempfile.gettempdir(), "qgs_server_cache")
if not os.path.exists(self._cache_dir):
os.mkdir(self._cache_dir)

self._tile_cache_dir = os.path.join(self._cache_dir, 'tiles')
if not os.path.exists(self._tile_cache_dir):
os.mkdir(self._tile_cache_dir)

def getCachedDocument(self, project, request, key):
m = hashlib.md5()
paramMap = request.parameters()
@@ -87,6 +93,53 @@ def deleteCachedDocuments(self, project):
filelist = [f for f in os.listdir(self._cache_dir) if f.endswith(".xml")]
return len(filelist) == 0

def getCachedImage(self, project, request, key):
m = hashlib.md5()
paramMap = request.parameters()
urlParam = "&".join(["%s=%s" % (k, paramMap[k]) for k in paramMap.keys()])
m.update(urlParam.encode('utf8'))

if not os.path.exists(os.path.join(self._tile_cache_dir, m.hexdigest() + ".png")):
return QByteArray()

img = QImage(m.hexdigest() + ".png")
with open(os.path.join(self._tile_cache_dir, m.hexdigest() + ".png"), "rb") as f:
statusOK = img.loadFromData(f.read())
if not statusOK:
print("Could not read or find the contents document. Error at line %d, column %d:\n%s" % (errorLine, errorColumn, errorStr))
return QByteArray()

ba = QByteArray()
buff = QBuffer(ba)
buff.open(QIODevice.WriteOnly)
img.save(buff, 'PNG')
return ba

def setCachedImage(self, img, project, request, key):
m = hashlib.md5()
paramMap = request.parameters()
urlParam = "&".join(["%s=%s" % (k, paramMap[k]) for k in paramMap.keys()])
m.update(urlParam.encode('utf8'))
with open(os.path.join(self._tile_cache_dir, m.hexdigest() + ".png"), "wb") as f:
f.write(img)
return os.path.exists(os.path.join(self._tile_cache_dir, m.hexdigest() + ".png"))

def deleteCachedImage(self, project, request, key):
m = hashlib.md5()
paramMap = request.parameters()
urlParam = "&".join(["%s=%s" % (k, paramMap[k]) for k in paramMap.keys()])
m.update(urlParam.encode('utf8'))
if os.path.exists(os.path.join(self._tile_cache_dir, m.hexdigest() + ".png")):
os.remove(os.path.join(self._tile_cache_dir, m.hexdigest() + ".png"))
return not os.path.exists(os.path.join(self._tile_cache_dir, m.hexdigest() + ".png"))

def deleteCachedImages(self, project):
filelist = [f for f in os.listdir(self._tile_cache_dir) if f.endswith(".png")]
for f in filelist:
os.remove(os.path.join(self._tile_cache_dir, f))
filelist = [f for f in os.listdir(self._tile_cache_dir) if f.endswith(".png")]
return len(filelist) == 0


class TestQgsServerCacheManager(unittest.TestCase):

@@ -186,8 +239,14 @@ def test_getcapabilities(self):
# with cache
header, body = self._execute_request(query_string)

# without cache
query_string = '?MAP=%s&SERVICE=WMTS&VERSION=1.0.0&REQUEST=%s' % (urllib.parse.quote(project), 'GetCapabilities')
header, body = self._execute_request(query_string)
# with cache
header, body = self._execute_request(query_string)

filelist = [f for f in os.listdir(self._servercache._cache_dir) if f.endswith(".xml")]
self.assertEqual(len(filelist), 5, 'Not enough file in cache')
self.assertEqual(len(filelist), 6, 'Not enough file in cache')

cacheManager = self._server_iface.cacheManager()

@@ -218,6 +277,120 @@ def test_getcapabilities(self):

self.assertTrue(cacheManager.deleteCachedDocuments(None), 'deleteCachedDocuments does not retrun True')

def test_gettile(self):
project = self._project_path
assert os.path.exists(project), "Project file not found: " + project

qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WMTS",
"VERSION": "1.0.0",
"REQUEST": "GetTile",
"LAYER": "Country",
"STYLE": "",
"TILEMATRIXSET": "EPSG:3857",
"TILEMATRIX": "0",
"TILEROW": "0",
"TILECOL": "0",
"FORMAT": "image/png"
}.items())])

# without cache
r, h = self._result(self._execute_request(qs))
self.assertEqual(
h.get("Content-Type"), "image/png",
"Content type is wrong: %s\n%s" % (h.get("Content-Type"), r))
# with cache
r, h = self._result(self._execute_request(qs))
self.assertEqual(
h.get("Content-Type"), "image/png",
"Content type is wrong: %s\n%s" % (h.get("Content-Type"), r))

qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WMTS",
"VERSION": "1.0.0",
"REQUEST": "GetTile",
"LAYER": "Country",
"STYLE": "",
"TILEMATRIXSET": "EPSG:4326",
"TILEMATRIX": "0",
"TILEROW": "0",
"TILECOL": "0",
"FORMAT": "image/png"
}.items())])

# without cache
r, h = self._result(self._execute_request(qs))
self.assertEqual(
h.get("Content-Type"), "image/png",
"Content type is wrong: %s\n%s" % (h.get("Content-Type"), r))
# with cache
r, h = self._result(self._execute_request(qs))
self.assertEqual(
h.get("Content-Type"), "image/png",
"Content type is wrong: %s\n%s" % (h.get("Content-Type"), r))

qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WMTS",
"VERSION": "1.0.0",
"REQUEST": "GetTile",
"LAYER": "QGIS Server Hello World",
"STYLE": "",
"TILEMATRIXSET": "EPSG:3857",
"TILEMATRIX": "0",
"TILEROW": "0",
"TILECOL": "0",
"FORMAT": "image/png"
}.items())])

# without cache
r, h = self._result(self._execute_request(qs))
self.assertEqual(
h.get("Content-Type"), "image/png",
"Content type is wrong: %s\n%s" % (h.get("Content-Type"), r))
# with cache
r, h = self._result(self._execute_request(qs))
self.assertEqual(
h.get("Content-Type"), "image/png",
"Content type is wrong: %s\n%s" % (h.get("Content-Type"), r))

qs = "?" + "&".join(["%s=%s" % i for i in list({
"MAP": urllib.parse.quote(project),
"SERVICE": "WMTS",
"VERSION": "1.0.0",
"REQUEST": "GetTile",
"LAYER": "QGIS Server Hello World",
"STYLE": "",
"TILEMATRIXSET": "EPSG:4326",
"TILEMATRIX": "0",
"TILEROW": "0",
"TILECOL": "0",
"FORMAT": "image/png"
}.items())])

# without cache
r, h = self._result(self._execute_request(qs))
self.assertEqual(
h.get("Content-Type"), "image/png",
"Content type is wrong: %s\n%s" % (h.get("Content-Type"), r))
# with cache
r, h = self._result(self._execute_request(qs))
self.assertEqual(
h.get("Content-Type"), "image/png",
"Content type is wrong: %s\n%s" % (h.get("Content-Type"), r))

filelist = [f for f in os.listdir(self._servercache._tile_cache_dir) if f.endswith(".png")]
self.assertEqual(len(filelist), 4, 'Not enough image in cache')

cacheManager = self._server_iface.cacheManager()

self.assertTrue(cacheManager.deleteCachedImages(None), 'deleteCachedImages does not retrun True')

filelist = [f for f in os.listdir(self._servercache._tile_cache_dir) if f.endswith(".png")]
self.assertEqual(len(filelist), 0, 'All images in cache are not deleted ')


if __name__ == "__main__":
unittest.main()

0 comments on commit 9463d04

Please sign in to comment.
You can’t perform that action at this time.