|
@@ -63,6 +63,7 @@ def __init__(self, uri): |
|
|
try: |
|
|
self.connection = psycopg2.connect(expandedConnInfo) |
|
|
except self.connection_error_types() as e: |
|
|
# get credentials if cached or assking to the user no more than 3 times |
|
|
err = unicode(e) |
|
|
uri = self.uri() |
|
|
conninfo = uri.connectionInfo(False) |
|
@@ -85,44 +86,13 @@ def __init__(self, uri): |
|
|
except self.connection_error_types() as e: |
|
|
if i == 2: |
|
|
raise ConnectionError(e) |
|
|
|
|
|
err = unicode(e) |
|
|
finally: |
|
|
# remove certs (if any) of the expanded connectionInfo |
|
|
expandedUri = QgsDataSourceURI(newExpandedConnInfo) |
|
|
|
|
|
sslCertFile = expandedUri.param("sslcert") |
|
|
if sslCertFile: |
|
|
sslCertFile = sslCertFile.replace("'", "") |
|
|
os.remove(sslCertFile) |
|
|
|
|
|
sslKeyFile = expandedUri.param("sslkey") |
|
|
if sslKeyFile: |
|
|
sslKeyFile = sslKeyFile.replace("'", "") |
|
|
os.remove(sslKeyFile) |
|
|
|
|
|
sslCAFile = expandedUri.param("sslrootcert") |
|
|
if sslCAFile: |
|
|
sslCAFile = sslCAFile.replace("'", "") |
|
|
os.remove(sslCAFile) |
|
|
# clear certs for each time trying to connect |
|
|
self._clearSslTempCertsIfAny(newExpandedConnInfo) |
|
|
finally: |
|
|
# remove certs (if any) of the expanded connectionInfo |
|
|
expandedUri = QgsDataSourceURI(expandedConnInfo) |
|
|
|
|
|
sslCertFile = expandedUri.param("sslcert") |
|
|
if sslCertFile: |
|
|
sslCertFile = sslCertFile.replace("'", "") |
|
|
os.remove(sslCertFile) |
|
|
|
|
|
sslKeyFile = expandedUri.param("sslkey") |
|
|
if sslKeyFile: |
|
|
sslKeyFile = sslKeyFile.replace("'", "") |
|
|
os.remove(sslKeyFile) |
|
|
|
|
|
sslCAFile = expandedUri.param("sslrootcert") |
|
|
if sslCAFile: |
|
|
sslCAFile = sslCAFile.replace("'", "") |
|
|
os.remove(sslCAFile) |
|
|
# clear certs of the first connection try |
|
|
self._clearSslTempCertsIfAny(expandedConnInfo) |
|
|
|
|
|
self.connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) |
|
|
|
|
@@ -138,6 +108,33 @@ def __init__(self, uri): |
|
|
def _connectionInfo(self): |
|
|
return str(self.uri().connectionInfo(True)) |
|
|
|
|
|
def _clearSslTempCertsIfAny(self, connectionInfo): |
|
|
# remove certs (if any) of the connectionInfo |
|
|
expandedUri = QgsDataSourceURI(connectionInfo) |
|
|
|
|
|
def removeCert(certFile): |
|
|
certFile = certFile.replace("'", "") |
|
|
file = QFile(certFile) |
|
|
# set permission to allow removing on Win. |
|
|
# On linux and Mac if file is set with QFile::>ReadUser |
|
|
# does not create problem removin certs |
|
|
if not file.setPermissions(QFile.WriteOwner): |
|
|
raise Exception('Cannot change permissions on {}: error code: {}'.format(file.fileName(), file.error())) |
|
|
if not file.remove(): |
|
|
raise Exception('Cannot remove {}: error code: {}'.format(file.fileName(), file.error())) |
|
|
|
|
|
sslCertFile = expandedUri.param("sslcert") |
|
|
if sslCertFile: |
|
|
removeCert(sslCertFile) |
|
|
|
|
|
sslKeyFile = expandedUri.param("sslkey") |
|
|
if sslKeyFile: |
|
|
removeCert(sslKeyFile) |
|
|
|
|
|
sslCAFile = expandedUri.param("sslrootcert") |
|
|
if sslCAFile: |
|
|
removeCert(sslCAFile) |
|
|
|
|
|
def _checkSpatial(self): |
|
|
""" check whether postgis_version is present in catalog """ |
|
|
c = self._execute(None, u"SELECT COUNT(*) FROM pg_proc WHERE proname = 'postgis_version'") |
|
|