|
@@ -29,6 +29,7 @@ |
|
|
import re |
|
|
import tempfile |
|
|
import inspect |
|
|
import sys |
|
|
import time |
|
|
import test_qgsdelimitedtextprovider_wanted as want # NOQA |
|
|
|
|
@@ -57,6 +58,53 @@ |
|
|
geomkey = "#geometry" |
|
|
fidkey = "#fid" |
|
|
|
|
|
try: |
|
|
# Qt 5 |
|
|
from qgis.PyQt.QtCore import QUrlQuery |
|
|
|
|
|
class MyUrl: |
|
|
|
|
|
def __init__(self, url): |
|
|
self.url = url |
|
|
self.query = QUrlQuery() |
|
|
|
|
|
@classmethod |
|
|
def fromLocalFile(cls, filename): |
|
|
return cls(QUrl.fromLocalFile(filename)) |
|
|
|
|
|
def addQueryItem(self, k, v): |
|
|
self.query.addQueryItem(k, v) |
|
|
|
|
|
def toString(self): |
|
|
urlstr = self.url.toString() |
|
|
querystr = self.query.toString(QUrl.FullyDecoded) |
|
|
if querystr != '': |
|
|
urlstr += '?' |
|
|
urlstr += querystr |
|
|
return urlstr |
|
|
except: |
|
|
MyUrl = QUrl |
|
|
|
|
|
|
|
|
def normalize_query_items_order(s): |
|
|
splitted_url = s.split('?') |
|
|
urlstr = splitted_url[0] |
|
|
if len(splitted_url) == 2: |
|
|
items_list = splitted_url[1].split('&') |
|
|
items_map = {} |
|
|
for item in items_list: |
|
|
splitted_item = item.split('=') |
|
|
items_map[splitted_item[0]] = splitted_item[1] |
|
|
first_arg = True |
|
|
for k in sorted(items_map.keys()): |
|
|
if first_arg: |
|
|
urlstr += '?' |
|
|
first_arg = False |
|
|
else: |
|
|
urlstr += '&' |
|
|
urlstr += k + '=' + items_map[k] |
|
|
return urlstr |
|
|
|
|
|
# Thought we could connect to messageReceived signal but doesn't seem to be available |
|
|
# in python :-( Not sure why? |
|
|
|
|
@@ -77,7 +125,10 @@ def __exit__(self, type, value, traceback): |
|
|
|
|
|
def logMessage(self, msg, tag, level): |
|
|
if tag == self.tag or not self.tag: |
|
|
self.log.append(unicode(msg)) |
|
|
if sys.version_info.major == 2: |
|
|
self.log.append(unicode(msg)) |
|
|
else: |
|
|
self.log.append(str(msg)) |
|
|
|
|
|
def messages(self): |
|
|
return self.log |
|
@@ -92,7 +143,7 @@ def setUpClass(cls): |
|
|
srcpath = os.path.join(TEST_DATA_DIR, 'provider') |
|
|
cls.basetestfile = os.path.join(srcpath, 'delimited_xy.csv') |
|
|
|
|
|
url = QUrl.fromLocalFile(cls.basetestfile) |
|
|
url = MyUrl.fromLocalFile(cls.basetestfile) |
|
|
url.addQueryItem("crs", "epsg:4326") |
|
|
url.addQueryItem("type", "csv") |
|
|
url.addQueryItem("xField", "X") |
|
@@ -119,7 +170,7 @@ def setUpClass(cls): |
|
|
srcpath = os.path.join(TEST_DATA_DIR, 'provider') |
|
|
cls.basetestfile = os.path.join(srcpath, 'delimited_wkt.csv') |
|
|
|
|
|
url = QUrl.fromLocalFile(cls.basetestfile) |
|
|
url = MyUrl.fromLocalFile(cls.basetestfile) |
|
|
url.addQueryItem("crs", "epsg:4326") |
|
|
url.addQueryItem("type", "csv") |
|
|
url.addQueryItem("wktField", "wkt") |
|
@@ -133,7 +184,7 @@ def setUpClass(cls): |
|
|
|
|
|
cls.basetestpolyfile = os.path.join(srcpath, 'delimited_wkt_poly.csv') |
|
|
|
|
|
url = QUrl.fromLocalFile(cls.basetestpolyfile) |
|
|
url = MyUrl.fromLocalFile(cls.basetestpolyfile) |
|
|
url.addQueryItem("crs", "epsg:4326") |
|
|
url.addQueryItem("type", "csv") |
|
|
url.addQueryItem("wktField", "wkt") |
|
@@ -188,7 +239,10 @@ def layerData(self, layer, request={}, offset=0): |
|
|
for field in f.fields(): |
|
|
fields.append(str(field.name())) |
|
|
fieldTypes.append(str(field.typeName())) |
|
|
fielddata = dict((name, unicode(f[name])) for name in fields) |
|
|
if sys.version_info.major == 2: |
|
|
fielddata = dict((name, unicode(f[name])) for name in fields) |
|
|
else: |
|
|
fielddata = dict((name, str(f[name])) for name in fields) |
|
|
g = f.constGeometry() |
|
|
if g: |
|
|
fielddata[geomkey] = str(g.exportToWkt()) |
|
@@ -216,7 +270,7 @@ def delimitedTextData(self, testname, filename, requests, verbose, **params): |
|
|
# and return the data for the layer (fields, data) |
|
|
|
|
|
filepath = os.path.join(unitTestDataPath("delimitedtext"), filename) |
|
|
url = QUrl.fromLocalFile(filepath) |
|
|
url = MyUrl.fromLocalFile(filepath) |
|
|
if not requests: |
|
|
requests = [{}] |
|
|
for k in params.keys(): |
|
@@ -227,7 +281,7 @@ def delimitedTextData(self, testname, filename, requests, verbose, **params): |
|
|
if verbose: |
|
|
print(testname) |
|
|
layer = QgsVectorLayer(urlstr, 'test', 'delimitedtext') |
|
|
uri = unicode(layer.dataProvider().dataSourceUri()) |
|
|
uri = layer.dataProvider().dataSourceUri() |
|
|
if verbose: |
|
|
print(uri) |
|
|
basename = os.path.basename(filepath) |
|
@@ -318,20 +372,20 @@ def runTest(self, file, requests, **params): |
|
|
print("Running test:", testname) |
|
|
result = self.delimitedTextData(testname, file, requests, verbose, **params) |
|
|
if rebuildTests: |
|
|
printWanted(testname, result) |
|
|
self.printWanted(testname, result) |
|
|
assert False, "Test not run - being rebuilt" |
|
|
try: |
|
|
wanted = eval('want.{0}()'.format(testname)) |
|
|
except: |
|
|
printWanted(testname, result) |
|
|
self.printWanted(testname, result) |
|
|
assert False, "Test results not available for {0}".format(testname) |
|
|
|
|
|
data = result['data'] |
|
|
log = result['log'] |
|
|
failures = [] |
|
|
if result['uri'] != wanted['uri']: |
|
|
if normalize_query_items_order(result['uri']) != normalize_query_items_order(wanted['uri']): |
|
|
msg = "Layer Uri ({0}) doesn't match expected ({1})".format( |
|
|
result['uri'], wanted['uri']) |
|
|
normalize_query_items_order(result['uri']), normalize_query_items_order(wanted['uri'])) |
|
|
print(' ' + msg) |
|
|
failures.append(msg) |
|
|
if result['fieldTypes'] != wanted['fieldTypes']: |
|
@@ -381,7 +435,7 @@ def runTest(self, file, requests, **params): |
|
|
print(' Message log correct: Passed') |
|
|
|
|
|
if failures: |
|
|
printWanted(testname, result) |
|
|
self.printWanted(testname, result) |
|
|
|
|
|
assert len(failures) == 0, "\n".join(failures) |
|
|
|
|
@@ -604,14 +658,14 @@ def test_029_file_watcher(self): |
|
|
f.write("id,name\n1,rabbit\n2,pooh\n") |
|
|
|
|
|
def appendfile(layer): |
|
|
with file(filename, 'a') as f: |
|
|
with open(filename, 'a') as f: |
|
|
f.write('3,tigger\n') |
|
|
# print "Appended to file - sleeping" |
|
|
time.sleep(1) |
|
|
QCoreApplication.instance().processEvents() |
|
|
|
|
|
def rewritefile(layer): |
|
|
with file(filename, 'w') as f: |
|
|
with open(filename, 'w') as f: |
|
|
f.write("name,size,id\ntoad,small,5\nmole,medium,6\nbadger,big,7\n") |
|
|
# print "Rewritten file - sleeping" |
|
|
time.sleep(1) |
|
|