From d10e564b574c96529e1493841c2fd749b3b97cf1 Mon Sep 17 00:00:00 2001 From: Even Rouault Date: Tue, 18 Oct 2016 14:28:11 +0200 Subject: [PATCH] [DBManager] Add tests for GPKG plugin --- tests/src/python/CMakeLists.txt | 1 + tests/src/python/test_db_manager_gpkg.py | 393 +++++++++++++++++++++++ 2 files changed, 394 insertions(+) create mode 100644 tests/src/python/test_db_manager_gpkg.py diff --git a/tests/src/python/CMakeLists.txt b/tests/src/python/CMakeLists.txt index db31d0a6b2ca..b20cbb774d26 100644 --- a/tests/src/python/CMakeLists.txt +++ b/tests/src/python/CMakeLists.txt @@ -118,6 +118,7 @@ ADD_PYTHON_TEST(PyQgsWFSProviderGUI test_provider_wfs_gui.py) ADD_PYTHON_TEST(PyQgsConsole test_console.py) ADD_PYTHON_TEST(PyQgsLayerDependencies test_layer_dependencies.py) ADD_PYTHON_TEST(PyQgsVersionCompare test_versioncompare.py) +ADD_PYTHON_TEST(PyQgsDBManagerGpkg test_db_manager_gpkg.py) IF (NOT WIN32) ADD_PYTHON_TEST(PyQgsLogger test_qgslogger.py) diff --git a/tests/src/python/test_db_manager_gpkg.py b/tests/src/python/test_db_manager_gpkg.py new file mode 100644 index 000000000000..da86e4fc86d4 --- /dev/null +++ b/tests/src/python/test_db_manager_gpkg.py @@ -0,0 +1,393 @@ +# -*- coding: utf-8 -*- +"""QGIS Unit tests for the DBManager GPKG plugin + +.. note:: This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 2 of the License, or +(at your option) any later version. +""" +__author__ = 'Even Rouault' +__date__ = '2016-10-17' +__copyright__ = 'Copyright 2016, Even Rouault' +# This will get replaced with a git SHA1 when you do a git archive +__revision__ = '$Format:%H$' + +import qgis # NOQA + +import os +import tempfile +import shutil +from osgeo import gdal, ogr, osr + +from qgis.core import QgsDataSourceUri +from qgis.PyQt.QtCore import QCoreApplication, QSettings +from qgis.testing import start_app, unittest + +from plugins.db_manager.db_plugins import supportedDbTypes, createDbPlugin +from plugins.db_manager.db_plugins.plugin import TableField + + +def GDAL_COMPUTE_VERSION(maj, min, rev): + return ((maj) * 1000000 + (min) * 10000 + (rev) * 100) + + +class TestPyQgsDBManagerGpkg(unittest.TestCase): + + @classmethod + def setUpClass(cls): + """Run before all tests""" + + QCoreApplication.setOrganizationName("QGIS_Test") + QCoreApplication.setOrganizationDomain("TestPyQgsDBManagerGpkg.com") + QCoreApplication.setApplicationName("TestPyQgsDBManagerGpkg") + QSettings().clear() + start_app() + + cls.basetestpath = tempfile.mkdtemp() + + cls.test_gpkg = os.path.join(cls.basetestpath, 'TestPyQgsDBManagerGpkg.gpkg') + ds = ogr.GetDriverByName('GPKG').CreateDataSource(cls.test_gpkg) + lyr = ds.CreateLayer('testLayer', geom_type=ogr.wkbLineString, options=['SPATIAL_INDEX=NO']) + cls.supportsAlterFieldDefn = lyr.TestCapability(ogr.OLCAlterFieldDefn) == 1 + lyr.CreateField(ogr.FieldDefn('text_field', ogr.OFTString)) + f = ogr.Feature(lyr.GetLayerDefn()) + f['text_field'] = 'foo' + f.SetGeometry(ogr.CreateGeometryFromWkt('LINESTRING(1 2,3 4)')) + lyr.CreateFeature(f) + f = None + ds = None + + @classmethod + def tearDownClass(cls): + """Run after all tests""" + + QSettings().clear() + shutil.rmtree(cls.basetestpath, True) + + def testSupportedDbTypes(self): + self.assertIn('gpkg', supportedDbTypes()) + + def testCreateDbPlugin(self): + plugin = createDbPlugin('gpkg') + self.assertIsNotNone(plugin) + + def testConnect(self): + connection_name = 'testConnect' + plugin = createDbPlugin('gpkg') + + uri = QgsDataSourceUri() + uri.setDatabase(self.test_gpkg) + self.assertTrue(plugin.addConnection(connection_name, uri)) + + connections = plugin.connections() + self.assertEqual(len(connections), 1) + + connection = createDbPlugin('gpkg', connection_name + '_does_not_exist') + connection_succeeded = False + try: + connection.connect() + connection_succeeded = True + except: + pass + self.assertFalse(connection_succeeded, 'exception should have been raised') + + connection = connections[0] + connection.connect() + + connection.reconnect() + + connection.remove() + + self.assertEqual(len(plugin.connections()), 0) + + connection = createDbPlugin('gpkg', connection_name) + connection_succeeded = False + try: + connection.connect() + connection_succeeded = True + except: + pass + self.assertFalse(connection_succeeded, 'exception should have been raised') + + def testListLayer(self): + connection_name = 'testListLayer' + plugin = createDbPlugin('gpkg') + uri = QgsDataSourceUri() + uri.setDatabase(self.test_gpkg) + self.assertTrue(plugin.addConnection(connection_name, uri)) + + connection = createDbPlugin('gpkg', connection_name) + connection.connect() + + db = connection.database() + self.assertIsNotNone(db) + + tables = db.tables() + self.assertEqual(len(tables), 1) + table = tables[0] + self.assertEqual(table.name, 'testLayer') + info = table.info() + expected_html = """

General info

Relation type: Table 
Rows: 

GeoPackage

Column: geom 
Geometry: LINE STRING 
Dimension: XY 
Spatial ref: Undefined (-1) 
Extent: 1.00000, 2.00000 - 3.00000, 4.00000 

No spatial index defined (create it)

Fields

Name Type Null Default 
fid INTEGER  
geom LINESTRING  
text_field TEXT  
""" + self.assertEqual(info.toHtml(), expected_html, info.toHtml()) + + connection.remove() + + def testCreateRenameDeleteTable(self): + connection_name = 'testCreateRenameDeleteTable' + plugin = createDbPlugin('gpkg') + uri = QgsDataSourceUri() + + test_gpkg_new = os.path.join(self.basetestpath, 'testCreateRenameDeleteTable.gpkg') + shutil.copy(self.test_gpkg, test_gpkg_new) + + uri.setDatabase(test_gpkg_new) + self.assertTrue(plugin.addConnection(connection_name, uri)) + + connection = createDbPlugin('gpkg', connection_name) + connection.connect() + + db = connection.database() + self.assertIsNotNone(db) + + tables = db.tables() + self.assertEqual(len(tables), 1) + table = tables[0] + self.assertTrue(table.rename('newName')) + self.assertEqual(table.name, 'newName') + + connection.reconnect() + + db = connection.database() + tables = db.tables() + self.assertEqual(len(tables), 1) + table = tables[0] + self.assertEqual(table.name, 'newName') + + fields = [] + geom = ['geometry', 'POINT', 4326, 3] + field1 = TableField(table) + field1.name = 'fid' + field1.dataType = 'INTEGER' + field1.notNull = True + field1.primaryKey = True + + field2 = TableField(table) + field2.name = 'str_field' + field2.dataType = 'TEXT' + field2.modifier = 20 + + fields = [field1, field2] + self.assertTrue(db.createVectorTable('newName2', fields, geom)) + + tables = db.tables() + self.assertEqual(len(tables), 2) + new_table = tables[1] + self.assertEqual(new_table.name, 'newName2') + fields = new_table.fields() + self.assertEqual(len(fields), 3) + self.assertFalse(new_table.hasSpatialIndex()) + + self.assertTrue(new_table.createSpatialIndex()) + self.assertTrue(new_table.hasSpatialIndex()) + + self.assertTrue(new_table.delete()) + + tables = db.tables() + self.assertEqual(len(tables), 1) + + connection.remove() + + def testCreateRenameDeleteFields(self): + + if not self.supportsAlterFieldDefn: + return + + connection_name = 'testCreateRenameDeleteFields' + plugin = createDbPlugin('gpkg') + uri = QgsDataSourceUri() + + test_gpkg_new = os.path.join(self.basetestpath, 'testCreateRenameDeleteFields.gpkg') + shutil.copy(self.test_gpkg, test_gpkg_new) + + uri.setDatabase(test_gpkg_new) + self.assertTrue(plugin.addConnection(connection_name, uri)) + + connection = createDbPlugin('gpkg', connection_name) + connection.connect() + + db = connection.database() + self.assertIsNotNone(db) + + tables = db.tables() + self.assertEqual(len(tables), 1) + table = tables[0] + + field_before_count = len(table.fields()) + + field = TableField(table) + field.name = 'real_field' + field.dataType = 'DOUBLE' + self.assertTrue(table.addField(field)) + + self.assertEqual(len(table.fields()), field_before_count + 1) + + self.assertTrue(field.update('real_field2', new_type_str='TEXT (30)', new_not_null=True, new_default_str='foo')) + + field = table.fields()[field_before_count] + self.assertEqual(field.name, 'real_field2') + self.assertEqual(field.dataType, 'TEXT(30)') + self.assertEqual(field.notNull, 1) + self.assertEqual(field.default, "'foo'") + + self.assertTrue(table.deleteField(field)) + + self.assertEqual(len(table.fields()), field_before_count) + + connection.remove() + + def testTableDataModel(self): + connection_name = 'testTableDataModel' + plugin = createDbPlugin('gpkg') + uri = QgsDataSourceUri() + uri.setDatabase(self.test_gpkg) + self.assertTrue(plugin.addConnection(connection_name, uri)) + + connection = createDbPlugin('gpkg', connection_name) + connection.connect() + + db = connection.database() + self.assertIsNotNone(db) + + tables = db.tables() + self.assertEqual(len(tables), 1) + table = tables[0] + self.assertEqual(table.name, 'testLayer') + model = table.tableDataModel(None) + self.assertEqual(model.rowCount(), 1) + self.assertEqual(model.getData(0, 0), 1) # fid + self.assertEqual(model.getData(0, 1), 'LINESTRING (1 2,3 4)') + self.assertEqual(model.getData(0, 2), 'foo') + + connection.remove() + + def testRaster(self): + + if int(gdal.VersionInfo('VERSION_NUM')) < GDAL_COMPUTE_VERSION(2, 0, 2): + return + + connection_name = 'testRaster' + plugin = createDbPlugin('gpkg') + uri = QgsDataSourceUri() + + test_gpkg_new = os.path.join(self.basetestpath, 'testRaster.gpkg') + shutil.copy(self.test_gpkg, test_gpkg_new) + mem_ds = gdal.GetDriverByName('MEM').Create('', 20, 20) + mem_ds.SetGeoTransform([2, 0.01, 0, 49, 0, -0.01]) + sr = osr.SpatialReference() + sr.ImportFromEPSG(4326) + mem_ds.SetProjection(sr.ExportToWkt()) + mem_ds.GetRasterBand(1).Fill(255) + gdal.GetDriverByName('GPKG').CreateCopy(test_gpkg_new, mem_ds, options=['APPEND_SUBDATASET=YES', 'RASTER_TABLE=raster_table']) + mem_ds = None + + uri.setDatabase(test_gpkg_new) + self.assertTrue(plugin.addConnection(connection_name, uri)) + + connection = createDbPlugin('gpkg', connection_name) + connection.connect() + + db = connection.database() + self.assertIsNotNone(db) + + tables = db.tables() + self.assertEqual(len(tables), 2) + table = None + for i in range(2): + if tables[i].name == 'raster_table': + table = tables[i] + break + self.assertIsNotNone(table) + info = table.info() + expected_html = """

General info

Relation type: Table 
Rows: Unknown (find out

GeoPackage

Column:  
Geometry: RASTER 
Spatial ref: WGS 84 geodetic (4326) 
Extent: 2.00000, 48.80000 - 2.20000, 49.00000 

Fields

Name Type Null Default 
id INTEGER  
zoom_level INTEGER  
tile_column INTEGER  
tile_row INTEGER  
tile_data BLOB  

Indexes

Name Column(s) 
sqlite_autoindex_raster_table_1 zoom_level
tile_column
tile_row 
""" + self.assertEqual(info.toHtml(), expected_html, info.toHtml()) + + connection.remove() + + def testTwoRaster(self): + + if int(gdal.VersionInfo('VERSION_NUM')) < GDAL_COMPUTE_VERSION(2, 0, 2): + return + + connection_name = 'testTwoRaster' + plugin = createDbPlugin('gpkg') + uri = QgsDataSourceUri() + + test_gpkg_new = os.path.join(self.basetestpath, 'testTwoRaster.gpkg') + shutil.copy(self.test_gpkg, test_gpkg_new) + mem_ds = gdal.GetDriverByName('MEM').Create('', 20, 20) + mem_ds.SetGeoTransform([2, 0.01, 0, 49, 0, -0.01]) + sr = osr.SpatialReference() + sr.ImportFromEPSG(4326) + mem_ds.SetProjection(sr.ExportToWkt()) + mem_ds.GetRasterBand(1).Fill(255) + for i in range(2): + gdal.GetDriverByName('GPKG').CreateCopy(test_gpkg_new, mem_ds, options=['APPEND_SUBDATASET=YES', 'RASTER_TABLE=raster_table%d' % (i + 1)]) + mem_ds = None + + uri.setDatabase(test_gpkg_new) + self.assertTrue(plugin.addConnection(connection_name, uri)) + + connection = createDbPlugin('gpkg', connection_name) + connection.connect() + + db = connection.database() + self.assertIsNotNone(db) + + tables = db.tables() + self.assertEqual(len(tables), 3) + table = None + for i in range(2): + if tables[i].name.startswith('raster_table'): + table = tables[i] + info = table.info() + info.toHtml() + + connection.remove() + + def testNonSpatial(self): + + connection_name = 'testNonSpatial' + plugin = createDbPlugin('gpkg') + uri = QgsDataSourceUri() + + test_gpkg = os.path.join(self.basetestpath, 'testNonSpatial.gpkg') + ds = ogr.GetDriverByName('GPKG').CreateDataSource(test_gpkg) + lyr = ds.CreateLayer('testNonSpatial', geom_type=ogr.wkbNone) + lyr.CreateField(ogr.FieldDefn('text_field', ogr.OFTString)) + f = ogr.Feature(lyr.GetLayerDefn()) + f['text_field'] = 'foo' + lyr.CreateFeature(f) + f = None + ds = None + + uri.setDatabase(test_gpkg) + self.assertTrue(plugin.addConnection(connection_name, uri)) + + connection = createDbPlugin('gpkg', connection_name) + connection.connect() + + db = connection.database() + self.assertIsNotNone(db) + + tables = db.tables() + self.assertEqual(len(tables), 1) + table = tables[0] + self.assertEqual(table.name, 'testNonSpatial') + info = table.info() + expected_html = """

General info

Relation type: Table 
Rows: 

Fields

Name Type Null Default 
fid INTEGER  
text_field TEXT  
""" + self.assertEqual(info.toHtml(), expected_html, info.toHtml()) + + connection.remove() + +if __name__ == '__main__': + unittest.main()