diff --git a/.vscode/settings.json b/.vscode/settings.json index 5fdb0a53..ae2bd35c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -15,7 +15,7 @@ // Formatting with black "python.formatting.provider": "black", - "python.formatting.blackPath": "${workspaceFolder}/.venv_dev/Lib", + "python.formatting.blackPath": "${workspaceFolder}\\.venv_dev\\Scripts\\black.exe", "python.formatting.blackArgs": ["--target-version=py37",], // Replicate the QGIS environment diff --git a/__init__.py b/__init__.py index 91aeaf56..d030cd7d 100644 --- a/__init__.py +++ b/__init__.py @@ -25,9 +25,10 @@ # noinspection PyPep8Naming def classFactory(iface): # pylint: disable=invalid-name - """Load Isogeo class from file Isogeo. + """Load Isogeo class from file Isogeo. :param iface: A QGIS interface instance. :type iface: QgsInterface """ - from .isogeo import Isogeo - return Isogeo(iface) + from .isogeo import Isogeo + return Isogeo(iface) + diff --git a/isogeo.py b/isogeo.py index e75231af..0e07511a 100644 --- a/isogeo.py +++ b/isogeo.py @@ -73,7 +73,14 @@ from .ui.credits.dlg_credits import IsogeoCredits # Plugin modules -from .modules import Authenticator, ApiRequester, MetadataDisplayer, IsogeoPlgTools, SharesParser, SearchFormManager +from .modules import ( + Authenticator, + ApiRequester, + MetadataDisplayer, + IsogeoPlgTools, + SharesParser, + SearchFormManager, +) # ############################################################################ # ########## Globals ############### diff --git a/modules/__init__.py b/modules/__init__.py index db33271e..043b9b4b 100644 --- a/modules/__init__.py +++ b/modules/__init__.py @@ -1,6 +1,6 @@ from .api import Authenticator, ApiRequester, SharesParser from .metadata_display import MetadataDisplayer -from .results import ResultsManager, CacheManager +from .results import ResultsManager, CacheManager from .tools import IsogeoPlgTools from .quick_search import QuickSearchManager from .search_form import SearchFormManager diff --git a/modules/api/__init__.py b/modules/api/__init__.py index 3654d252..5dbb92db 100644 --- a/modules/api/__init__.py +++ b/modules/api/__init__.py @@ -1,3 +1,3 @@ from .auth import Authenticator from .request import ApiRequester -from .shares import SharesParser \ No newline at end of file +from .shares import SharesParser diff --git a/modules/api/auth.py b/modules/api/auth.py index c1249ef1..8cdf66c9 100644 --- a/modules/api/auth.py +++ b/modules/api/auth.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- - + # Standard library import json import logging @@ -29,7 +29,8 @@ # ########## Classes ############### # ################################## -class Authenticator(): + +class Authenticator: """Basic class to manage user authentication to Isogeo's API : - Getting credentials from oAuth2 file or QGIS Settings - Storing credentials @@ -38,40 +39,39 @@ class Authenticator(): :param str auth_folder: the path to the plugin/_auth subfolder where oAuth2 file is stored. """ + # ui reference - authentication form ui_auth_form = IsogeoAuthentication() # api parameters api_params = { - "app_id" : "", - "app_secret" : "", - "url_base" : "https://v1.api.isogeo.com/", - "url_auth" : "https://id.api.isogeo.com/oauth/authorize", - "url_token" : "https://id.api.isogeo.com/oauth/token", - "url_redirect" : "http://localhost:5000/callback" - } - + "app_id": "", + "app_secret": "", + "url_base": "https://v1.api.isogeo.com/", + "url_auth": "https://id.api.isogeo.com/oauth/authorize", + "url_token": "https://id.api.isogeo.com/oauth/token", + "url_redirect": "http://localhost:5000/callback", + } + # plugin credentials storage parameters - credentials_location = { - "QSettings": 0, - "oAuth2_file": 0, - } + credentials_location = {"QSettings": 0, "oAuth2_file": 0} + + def __init__(self, auth_folder: str = r"../_auth"): - def __init__(self, auth_folder: str=r"../_auth"): - # API URLs - Prod - self.platform, self.api_url, self.app_url, self.csw_url,\ - self.mng_url, self.oc_url, self.ssl = plg_tools.set_base_url("prod") + self.platform, self.api_url, self.app_url, self.csw_url, self.mng_url, self.oc_url, self.ssl = plg_tools.set_base_url( + "prod" + ) # credentials storage folder if isinstance(auth_folder, Path): self.auth_folder = auth_folder - self.cred_filepath = self.auth_folder/"client_secrets.json" + self.cred_filepath = self.auth_folder / "client_secrets.json" elif auth_folder == None: self.auth_folder = None else: raise ValueError("pathlib.Path expected") - + # translation self.tr = object self.lang = str @@ -112,8 +112,10 @@ def credentials_check_qsettings(self): """ if "isogeo-plugin" in qsettings.childGroups(): - logger.warning("Old credentials found and removed in QGIS QSettings: isogeo-plugin") - + logger.warning( + "Old credentials found and removed in QGIS QSettings: isogeo-plugin" + ) + qsettings.remove("isogeo-plugin") return False elif "isogeo" in qsettings.childGroups(): @@ -127,7 +129,9 @@ def credentials_check_qsettings(self): qsettings.remove("isogeo/api_auth") logger.debug("QSettings clean up - api_auth") pass - if "auth" in qsettings.childGroups() and not qsettings.contains("auth/app_id"): + if "auth" in qsettings.childGroups() and not qsettings.contains( + "auth/app_id" + ): qsettings.remove("isogeo/auth") logger.debug("QSettings clean up - bad formatted auth") pass @@ -153,10 +157,9 @@ def credentials_check_file(self): credentials_filepath = self.cred_filepath # check if a client_secrets.json file is stored inside the _auth subfolder if not credentials_filepath.is_file(): - logger.debug("No credential files found: {}" - .format(credentials_filepath)) + logger.debug("No credential files found: {}".format(credentials_filepath)) return False - else : + else: pass # check file structure try: @@ -169,7 +172,7 @@ def credentials_check_file(self): return True # CREDENTIALS SAVER ------------------------------------------------------- - def credentials_storer(self, store_location: str="QSettings"): + def credentials_storer(self, store_location: str = "QSettings"): """Store class attributes (API parameters) into the specified store_location. :param str store_location: name of targetted store location. Options: @@ -181,12 +184,14 @@ def credentials_storer(self, store_location: str="QSettings"): qsettings.setValue("isogeo/auth/url_base", self.api_params["url_base"]) qsettings.setValue("isogeo/auth/url_auth", self.api_params["url_auth"]) qsettings.setValue("isogeo/auth/url_token", self.api_params["url_token"]) - qsettings.setValue("isogeo/auth/url_redirect", self.api_params["url_redirect"]) + qsettings.setValue( + "isogeo/auth/url_redirect", self.api_params["url_redirect"] + ) else: pass logger.debug("Credentials stored into: {}".format(store_location)) - def credentials_update(self, credentials_source: str="QSettings"): + def credentials_update(self, credentials_source: str = "QSettings"): """Update class attributes (API parameters) from specified credentials source. :param str credentials_source: name of targetted credentials source. Options: @@ -196,11 +201,21 @@ def credentials_update(self, credentials_source: str="QSettings"): # update class attributes if credentials_source == "QSettings": self.api_params["app_id"] = qsettings.value("isogeo/auth/app_id", "") - self.api_params["app_secret"] = qsettings.value("isogeo/auth/app_secret", "") - self.api_params["url_base"] = qsettings.value("isogeo/auth/url_base", "https://v1.api.isogeo.com/") - self.api_params["url_auth"] = qsettings.value("isogeo/auth/url_auth", "https://id.api.isogeo.com/oauth/authorize") - self.api_params["url_token"] = qsettings.value("isogeo/auth/url_token", "https://id.api.isogeo.com/oauth/token") - self.api_params["url_redirect"] = qsettings.value("isogeo/auth/url_redirect", "http://localhost:5000/callback") + self.api_params["app_secret"] = qsettings.value( + "isogeo/auth/app_secret", "" + ) + self.api_params["url_base"] = qsettings.value( + "isogeo/auth/url_base", "https://v1.api.isogeo.com/" + ) + self.api_params["url_auth"] = qsettings.value( + "isogeo/auth/url_auth", "https://id.api.isogeo.com/oauth/authorize" + ) + self.api_params["url_token"] = qsettings.value( + "isogeo/auth/url_token", "https://id.api.isogeo.com/oauth/token" + ) + self.api_params["url_redirect"] = qsettings.value( + "isogeo/auth/url_redirect", "http://localhost:5000/callback" + ) elif credentials_source == "oAuth2_file": creds = plg_tools.credentials_loader(self.cred_filepath) self.api_params["app_id"] = creds.get("client_id") @@ -209,12 +224,15 @@ def credentials_update(self, credentials_source: str="QSettings"): self.api_params["url_auth"] = creds.get("uri_auth") self.api_params["url_token"] = creds.get("uri_token") self.api_params["url_redirect"] = creds.get("uri_redirect") - #self.credentials_storer(store_location="QSettings") + # self.credentials_storer(store_location="QSettings") else: pass - logger.debug("Credentials updated from: {}. Application ID used: {}" - .format(credentials_source, self.api_params["app_id"])) + logger.debug( + "Credentials updated from: {}. Application ID used: {}".format( + credentials_source, self.api_params["app_id"] + ) + ) # AUTHENTICATION FORM ----------------------------------------------------- def display_auth_form(self): @@ -224,19 +242,25 @@ def display_auth_form(self): # connecting widgets self.ui_auth_form.chb_isogeo_editor.stateChanged.connect( lambda: qsettings.setValue( - "isogeo/user/editor", int(self.ui_auth_form.chb_isogeo_editor.isChecked()) + "isogeo/user/editor", + int(self.ui_auth_form.chb_isogeo_editor.isChecked()), ) ) - self.ui_auth_form.btn_account_new.pressed.connect(partial(plg_tools.mail_to_isogeo, lang=self.lang)) - self.ui_auth_form.btn_browse_credentials.fileChanged.connect(self.credentials_uploader) + self.ui_auth_form.btn_account_new.pressed.connect( + partial(plg_tools.mail_to_isogeo, lang=self.lang) + ) + self.ui_auth_form.btn_browse_credentials.fileChanged.connect( + self.credentials_uploader + ) # fillfull auth form fields from stored settings self.ui_auth_form.ent_app_id.setText(self.api_params["app_id"]) self.ui_auth_form.ent_app_secret.setText(self.api_params["app_secret"]) self.ui_auth_form.lbl_api_url_value.setText(self.api_params["url_base"]) - self.ui_auth_form.chb_isogeo_editor.setChecked(qsettings - .value("isogeo/user/editor", 0)) - + self.ui_auth_form.chb_isogeo_editor.setChecked( + qsettings.value("isogeo/user/editor", 0) + ) + # display logger.debug("Authentication form filled and ready to be launched.") self.ui_auth_form.show() @@ -252,19 +276,26 @@ def credentials_uploader(self): api_credentials = plg_tools.credentials_loader(selected_file) except Exception as e: logger.error(e) - QMessageBox.critical(self.ui_auth_form, - self.tr("Alert", "Authenticator"), - self.tr("The selected credentials file is not correct.", - "Authenticator")) + QMessageBox.critical( + self.ui_auth_form, + self.tr("Alert", "Authenticator"), + self.tr( + "The selected credentials file is not correct.", "Authenticator" + ), + ) # move credentials file into the plugin file structure dest_path = self.cred_filepath if dest_path.is_file(): - logger.debug("client_secrets.json already existed. " - "Previous file has been renamed.") + logger.debug( + "client_secrets.json already existed. " + "Previous file has been renamed." + ) - old_file_renamed = self.auth_folder/"old_client_secrets_{}.json".format(int(time.time())) + old_file_renamed = self.auth_folder / "old_client_secrets_{}.json".format( + int(time.time()) + ) dest_path.rename(old_file_renamed) - + else: pass try: @@ -272,8 +303,9 @@ def credentials_uploader(self): except Exception as e: logger.debug("OAuth2 file issue : check path validity.") - logger.debug("Selected credentials file has been moved into plugin" - "_auth subfolder") + logger.debug( + "Selected credentials file has been moved into plugin" "_auth subfolder" + ) # set form self.ui_auth_form.ent_app_id.setText(api_credentials.get("client_id")) @@ -367,25 +399,27 @@ def get_tags(self, tags: dict): pass # storing dicts - tags_parsed = {"actions": actions, - "compliance": compliance, - "contacts": contacts, - "formats": formats, - "inspire": inspire, - "keywords": keywords, - "licenses": licenses, - "owners": owners, - "srs": srs, - "types": md_types, - # "unused": unused - } + tags_parsed = { + "actions": actions, + "compliance": compliance, + "contacts": contacts, + "formats": formats, + "inspire": inspire, + "keywords": keywords, + "licenses": licenses, + "owners": owners, + "srs": srs, + "types": md_types, + # "unused": unused + } # method ending logger.info("Tags retrieved") return tags_parsed + # ############################################################################# # ##### Stand alone program ######## # ################################## -if __name__ == '__main__': +if __name__ == "__main__": """Standalone execution.""" diff --git a/modules/api/request.py b/modules/api/request.py index 296db27e..bea5770d 100644 --- a/modules/api/request.py +++ b/modules/api/request.py @@ -26,6 +26,7 @@ # ########## Classes ############### # ################################## + class ApiRequester(QgsNetworkAccessManager): """Basic class to manage direct interactions with Isogeo's API : - Authentication request for token @@ -71,7 +72,7 @@ def setup_api_params(self, dict_params: dict): :param dict dict_params: a dict containing API parameters provided by Authenticator().manage_api_initialization method. """ - logger.debug("Setting api parameters") + logger.debug("Setting api parameters") self.app_id = dict_params.get("app_id", "") self.app_secret = dict_params.get("app_secret", "") self.api_url_base = dict_params.get("url_base", "") @@ -82,7 +83,7 @@ def setup_api_params(self, dict_params: dict): self.send_request("token") # self.auth_post_get_token() - def create_request(self, request_type:str): + def create_request(self, request_type: str): """Creates a QNetworkRequest() with appropriate headers and URL according to the 'request_type' parameter. @@ -105,20 +106,22 @@ def create_request(self, request_type:str): if request_type == "token": # filling request header with credentials header_value.append("Basic ") - header_value.append(base64.b64encode("{}:{}".format(self.app_id, self.app_secret).encode())) - # creating the QNetworkRequest from oAuth2 authentication URL + header_value.append( + base64.b64encode("{}:{}".format(self.app_id, self.app_secret).encode()) + ) + # creating the QNetworkRequest from oAuth2 authentication URL request = QNetworkRequest(QUrl(self.api_url_token)) - # creating and setting the 'Content-type header' + # creating and setting the 'Content-type header' ct_header_value = QByteArray() ct_header_value.append("application/json") request.setHeader(request.ContentTypeHeader, ct_header_value) # for other request_type, setting appropriate url - else : + else: if request_type == "shares": url = QUrl("{}/shares".format(self.api_url_base)) - elif request_type == "search" or request_type == "details" : + elif request_type == "search" or request_type == "details": url = QUrl(self.currentUrl) - else : + else: logger.debug("Unkown request type asked : {}".format(request_type)) raise ValueError # filling request header with token @@ -138,17 +141,19 @@ def send_request(self, request_type: str = "search"): - 'details' - 'shares' """ - logger.debug("-------------- Sending a '{}' request --------------".format(request_type)) + logger.debug( + "-------------- Sending a '{}' request --------------".format(request_type) + ) # creating the QNetworkRequest appropriate to the request_type request = self.create_request(request_type) logger.debug("to : {}".format(request.url().toString())) - # post request for 'token' request + # post request for 'token' request if request_type == "token": data = QByteArray() data.append(urlencode({"grant_type": "client_credentials"})) reply = self.post(request, data) # get request for other - else : + else: reply = self.get(request) return @@ -184,40 +189,52 @@ def handle_reply(self, reply: QNetworkReply): else: pass return - + url = reply.url().toString() # for token request, one signal is emitted passing a string whose # value depend on the reply content if "token" in url: logger.debug("Handling reply to a 'token' request") logger.debug("(from : {}).".format(url)) - if 'access_token' in parsed_content: + if "access_token" in parsed_content: QgsMessageLog.logMessage("Authentication succeeded", "Isogeo") logger.debug("Access token retrieved.") # storing token - self.token = "Bearer " + parsed_content.get('access_token') + self.token = "Bearer " + parsed_content.get("access_token") self.token_sig.emit("tokenOK") - elif 'error' in parsed_content: - logger.error("The API reply is an error: {}. ID and SECRET must be " - "invalid. Asking for them again." - .format(parsed_content.get('error'))) - msgBar.pushMessage("Isogeo", - self.tr("API authentication failed.Isogeo API answered: {}") - .format(parsed_content.get('error')), - duration=10, - level=1) + elif "error" in parsed_content: + logger.error( + "The API reply is an error: {}. ID and SECRET must be " + "invalid. Asking for them again.".format( + parsed_content.get("error") + ) + ) + msgBar.pushMessage( + "Isogeo", + self.tr( + "API authentication failed.Isogeo API answered: {}" + ).format(parsed_content.get("error")), + duration=10, + level=1, + ) self.token_sig.emit("credIssue") else: - msgBar.pushMessage("Isogeo", - self.tr("API authentication failed.Isogeo API answered: {}") - .format(parsed_content.get('error')), - duration=10, - level=2) - logger.debug("The API reply has an unexpected form: {}." - .format(parsed_content)) + msgBar.pushMessage( + "Isogeo", + self.tr( + "API authentication failed.Isogeo API answered: {}" + ).format(parsed_content.get("error")), + duration=10, + level=2, + ) + logger.debug( + "The API reply has an unexpected form: {}.".format( + parsed_content + ) + ) # for other types of request, a different signal is emitted depending # on the type of request but it always pass the reply's content - else : + else: self.loopCount = 0 if "shares" in url: logger.debug("Handling reply to a 'shares' request") @@ -226,15 +243,19 @@ def handle_reply(self, reply: QNetworkReply): elif "resources/search?" in url: logger.debug("Handling reply to a 'search' request") logger.debug("(from : {}).".format(url)) - self.search_sig.emit(parsed_content, self.get_tags(parsed_content.get("tags"))) + self.search_sig.emit( + parsed_content, self.get_tags(parsed_content.get("tags")) + ) elif "resources/" in reply.url().toString(): logger.debug("Handling reply to a 'details' request") logger.debug("(from : {}).".format(url)) - self.details_sig.emit(parsed_content, self.get_tags(parsed_content.get("tags"))) - else : + self.details_sig.emit( + parsed_content, self.get_tags(parsed_content.get("tags")) + ) + else: logger.debug("Unkown reply type") del parsed_content - + # if replys's content is invalid elif reply.error() == 204: logger.debug("Token expired. Renewing it.") @@ -242,9 +263,11 @@ def handle_reply(self, reply: QNetworkReply): self.send_request("token") elif content == "": - logger.error("Empty reply. Weither no catalog is shared with the " - "plugin, or there is a problem (2 requests sent " - "together)") + logger.error( + "Empty reply. Weither no catalog is shared with the " + "plugin, or there is a problem (2 requests sent " + "together)" + ) if self.loopCount < 3: self.loopCount += 1 reply.abort() @@ -253,20 +276,24 @@ def handle_reply(self, reply: QNetworkReply): else: # self.status_isClear = True msgBar.pushMessage( - self.tr("The script is looping. Make sure you shared a " - "catalog with the plugin. If so, please report " - "this on the bug tracker.")) + self.tr( + "The script is looping. Make sure you shared a " + "catalog with the plugin. If so, please report " + "this on the bug tracker." + ) + ) self.token_sig.emit("NoInternet") return - else : + else: logger.warning("Unknown error : {}".format(str(reply.error()))) # self.status_isClear = True - QMessageBox.information(self.iface.mainWindow(), - self.tr("Error"), - self.tr("You are facing an unknown error. " - "Code: ") + - str(answer.error()) + - "\nPlease report it on the bug tracker.") + QMessageBox.information( + self.iface.mainWindow(), + self.tr("Error"), + self.tr("You are facing an unknown error. " "Code: ") + + str(answer.error()) + + "\nPlease report it on the bug tracker.", + ) return def build_request_url(self, params: dict): @@ -317,16 +344,16 @@ def build_request_url(self, params: dict): # Geographical filter if params.get("geofilter") is not None: if params.get("coord") is not False: - filters += "&box={0}&rel={1}".format(params.get("coord"), - params.get("operation")) + filters += "&box={0}&rel={1}".format( + params.get("coord"), params.get("operation") + ) else: pass else: pass # Sorting order and direction if params.get("show"): - filters += "&ob={0}&od={1}".format(params.get("ob"), - params.get("od")) + filters += "&ob={0}&od={1}".format(params.get("ob"), params.get("od")) filters += "&_include=serviceLayers,layers" limit = 10 else: @@ -339,7 +366,7 @@ def build_request_url(self, params: dict): # BUILDING FINAL URL url += filters # method ending - return url + return url def get_tags(self, tags: dict): """ This parse the tags contained in API_answer[tags] and class them so @@ -421,24 +448,27 @@ def get_tags(self, tags: dict): pass # storing dicts - tags_parsed = {"actions": actions, - "compliance": compliance, - "contacts": contacts, - "formats": formats, - "inspire": inspire, - "keywords": keywords, - "licenses": licenses, - "owners": owners, - "srs": srs, - "types": md_types, - # "unused": unused - } + tags_parsed = { + "actions": actions, + "compliance": compliance, + "contacts": contacts, + "formats": formats, + "inspire": inspire, + "keywords": keywords, + "licenses": licenses, + "owners": owners, + "srs": srs, + "types": md_types, + # "unused": unused + } # method ending logger.info("Tags retrieved") return tags_parsed + + # ############################################################################# # ##### Stand alone program ######## # ################################## -if __name__ == '__main__': +if __name__ == "__main__": """Standalone execution.""" diff --git a/modules/api/shares.py b/modules/api/shares.py index 1f5f86fe..b98e7a86 100644 --- a/modules/api/shares.py +++ b/modules/api/shares.py @@ -27,6 +27,7 @@ class SharesParser(QObject): """Build the string informing the user about the shares feeding his plugin from the Isogeo API's response to a share request. """ + shares_ready = pyqtSignal(str) def __init__(self): @@ -47,35 +48,46 @@ def send_share_info(self, shares: list): logger.debug("Application properties provided by the Isogeo API.") content = shares - text = u"" # opening html content + text = "" # opening html content # Isogeo application authenticated in the plugin app = content[0].get("applications")[0] - text += self.tr(u"

This plugin is authenticated as " - u"{} and ").format(app.get("url", "https://isogeo.gitbooks.io/app-plugin-qgis/content"), - app.get("name", "Isogeo plugin for QGIS")) + text += self.tr( + "

This plugin is authenticated as " "{} and " + ).format( + app.get("url", "https://isogeo.gitbooks.io/app-plugin-qgis/content"), + app.get("name", "Isogeo plugin for QGIS"), + ) # shares feeding the application if len(content) == 1: - text += self.tr(u" powered by 1 share:


") + text += self.tr(" powered by 1 share:


") else: - text += self.tr(u" powered by {} shares:


").format(len(content)) + text += self.tr(" powered by {} shares:


").format(len(content)) # shares details for share in content: # share variables creator_name = share.get("_creator").get("contact").get("name") creator_email = share.get("_creator").get("contact").get("email") creator_id = share.get("_creator").get("_tag")[6:] - share_url = "https://app.isogeo.com/groups/{}/admin/shares/{}".format(creator_id, share.get("_id")) + share_url = "https://app.isogeo.com/groups/{}/admin/shares/{}".format( + creator_id, share.get("_id") + ) # formatting text - text += u"

{}

".format(share_url, - share.get("name")) - text += self.tr(u"

Updated: {}

").format(plg_tools.handle_date(share.get("_modified"))) - text += self.tr(u"

Contact: {} - {}

").format(creator_name, creator_email) - text += u"


" - text += u"" + text += "

{}

".format( + share_url, share.get("name") + ) + text += self.tr("

Updated: {}

").format( + plg_tools.handle_date(share.get("_modified")) + ) + text += self.tr("

Contact: {} - {}

").format( + creator_name, creator_email + ) + text += "


" + text += "" self.shares_ready.emit(text) + # ############################################################################# # ##### Stand alone program ######## # ################################## -if __name__ == '__main__': +if __name__ == "__main__": """Standalone execution.""" diff --git a/modules/layer/__init__.py b/modules/layer/__init__.py index b8d051b5..16b56c54 100644 --- a/modules/layer/__init__.py +++ b/modules/layer/__init__.py @@ -1 +1 @@ -from .add_layer import LayerAdder \ No newline at end of file +from .add_layer import LayerAdder diff --git a/modules/layer/add_layer.py b/modules/layer/add_layer.py index e4ffb621..dc960127 100644 --- a/modules/layer/add_layer.py +++ b/modules/layer/add_layer.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals # Standard library import logging @@ -15,8 +14,14 @@ # PyQGIS import db_manager.db_plugins.postgis.connector as pgis_con -from qgis.core import QgsDataSourceUri, QgsProject, QgsVectorLayer, QgsRasterLayer, QgsMessageLog -from qgis.utils import iface +from qgis.core import ( + QgsDataSourceUri, + QgsProject, + QgsVectorLayer, + QgsRasterLayer, + QgsMessageLog, +) +from qgis.utils import iface # Plugin modules from ..tools import IsogeoPlgTools @@ -30,11 +35,15 @@ logger = logging.getLogger("IsogeoQgisPlugin") plg_tools = IsogeoPlgTools() -qgis_wms_formats = ('image/png', 'image/png8', - 'image/jpeg', - 'image/svg', - 'image/gif', - 'image/geotiff', 'image/tiff') +qgis_wms_formats = ( + "image/png", + "image/png8", + "image/jpeg", + "image/svg", + "image/gif", + "image/geotiff", + "image/tiff", +) # ############################################################################ # ##### Conditional imports ######## @@ -45,28 +54,31 @@ from owslib.wmts import WebMapTileService from owslib.util import ServiceException import owslib - logging.info("Depencencies - owslib version: {}" - .format(owslib.__version__)) + + logging.info("Depencencies - owslib version: {}".format(owslib.__version__)) except ImportError as e: logger.warning("Depencencies - owslib is not present") try: from owslib.wfs import WebFeatureService except ImportError as e: - logger.warning("Depencencies - owslib WFS issue: {}" - .format(e)) + logger.warning("Depencencies - owslib WFS issue: {}".format(e)) try: from owslib.util import HTTPError + logger.info("Depencencies - HTTPError within owslib") except ImportError as e: from urllib.error import HTTPError - logger.warning("Depencencies - HTTPError not within owslib." - " Directly imported from urllib.error") + + logger.warning( + "Depencencies - HTTPError not within owslib." + " Directly imported from urllib.error" + ) try: import requests - logger.info("Depencencies - Requests version: {}" - .format(requests.__version__)) + + logger.info("Depencencies - Requests version: {}".format(requests.__version__)) except ImportError as e: logger.warning("Depencencies - Requests not available") @@ -75,7 +87,7 @@ # ################################## -class LayerAdder(): +class LayerAdder: """Basic class that holds utilitary methods for the plugin.""" def __init__(self): @@ -89,7 +101,9 @@ def __init__(self): self.tr = None self.md_sync = MetadataSynchronizer() - def build_efs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete"): + def build_efs_url( + self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete" + ): """Reformat the input Esri Feature Service url so it fits QGIS criterias. Tests weither all the needed information is provided in the url, and @@ -107,10 +121,11 @@ def build_efs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= efs_uri.setParam("restrictToRequestBBOX", "1") btn_lbl = "EFS : {}".format(efs_lyr_title) - return ["EFS", efs_lyr_title, efs_uri.uri(), - api_layer, srv_details, btn_lbl] + return ["EFS", efs_lyr_title, efs_uri.uri(), api_layer, srv_details, btn_lbl] - def build_ems_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete"): + def build_ems_url( + self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete" + ): """Reformat the input Esri Map Service url so it fits QGIS criterias. Tests weither all the needed information is provided in the url, and @@ -128,10 +143,11 @@ def build_ems_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= # ems_uri.setParam("restrictToRequestBBOX", "1") btn_lbl = "EMS : {}".format(ems_lyr_title) - return ["EMS", ems_lyr_title, ems_uri.uri(), - api_layer, srv_details, btn_lbl] + return ["EMS", ems_lyr_title, ems_uri.uri(), api_layer, srv_details, btn_lbl] - def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete"): + def build_wfs_url( + self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete" + ): """Reformat the input WMS url so it fits QGIS criterias. Tests weither all the needed information is provided in the url, and @@ -139,14 +155,15 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= """ # local variables layer_title = api_layer.get("titles")[0].get("value", "WFS Layer") - wfs_url_getcap = srv_details.get("path")\ - + "?request=GetCapabilities&service=WFS" + wfs_url_getcap = ( + srv_details.get("path") + "?request=GetCapabilities&service=WFS" + ) geoserver = "geoserver" in wfs_url_getcap layer_id = api_layer.get("id") - layer_name = re.sub('\{.*?}', "", layer_id) + layer_name = re.sub("\{.*?}", "", layer_id) # handling WFS namespaces if "{" in layer_id: - namespace = layer_id[layer_id.find("{") + 1:layer_id.find("}")] + namespace = layer_id[layer_id.find("{") + 1 : layer_id.find("}")] logging.debug("WFS - Namespace: " + namespace) else: namespace = "" @@ -164,8 +181,7 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= wfs_url_quicky = uri.uri() btn_lbl = "WFS : {}".format(layer_title) - return ["WFS", layer_title, wfs_url_quicky, - api_layer, srv_details, btn_lbl] + return ["WFS", layer_title, wfs_url_quicky, api_layer, srv_details, btn_lbl] elif mode == "complete": # Clean, complete but slower way - OWSLib ------------------------- if srv_details.get("path") == self.cached_wfs.get("srv_path"): @@ -187,9 +203,14 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= return 0, e # check if GetFeature and DescribeFeatureType operation are available - if not hasattr(wfs, "getfeature") or "GetFeature" not in [op.name for op in wfs.operations]: + if not hasattr(wfs, "getfeature") or "GetFeature" not in [ + op.name for op in wfs.operations + ]: self.cached_wfs["GetFeature"] = 0 - return 0, "Required GetFeature operation not available in: " + wfs_url_getcap + return ( + 0, + "Required GetFeature operation not available in: " + wfs_url_getcap, + ) else: self.cached_wfs["GetFeature"] = 1 logger.info("GetFeature available") @@ -197,7 +218,11 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= if "DescribeFeatureType" not in [op.name for op in wfs.operations]: self.cached_wfs["DescribeFeatureType"] = 0 - return 0, "Required DescribeFeatureType operation not available in: " + wfs_url_getcap + return ( + 0, + "Required DescribeFeatureType operation not available in: " + + wfs_url_getcap, + ) else: self.cached_wfs["DescribeFeatureType"] = 1 logger.info("DescribeFeatureType available") @@ -207,26 +232,35 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= try: wfs_lyr = wfs[layer_name] except KeyError as e: - logger.error("Layer {} not found in WFS service: {}" - .format(layer_name, - wfs_url_getcap)) - if geoserver and layer_name in [l.split(":")[1] for l in list(wfs.contents)]: - layer_name = list(wfs.contents)[[l.split(":")[1] - for l in list(wfs.contents)].index(layer_name)] + logger.error( + "Layer {} not found in WFS service: {}".format( + layer_name, wfs_url_getcap + ) + ) + if geoserver and layer_name in [ + l.split(":")[1] for l in list(wfs.contents) + ]: + layer_name = list(wfs.contents)[ + [l.split(":")[1] for l in list(wfs.contents)].index(layer_name) + ] try: wfs_lyr = wfs[layer_name] except KeyError as e: - return (0, - "Layer {} not found in WFS service: {}" - .format(layer_name, - wfs_url_getcap), - e) + return ( + 0, + "Layer {} not found in WFS service: {}".format( + layer_name, wfs_url_getcap + ), + e, + ) else: - return (0, - "Layer {} not found in WFS service: {}" - .format(layer_name, - wfs_url_getcap), - e) + return ( + 0, + "Layer {} not found in WFS service: {}".format( + layer_name, wfs_url_getcap + ), + e, + ) # SRS definition srs_map = plg_tools.get_map_crs() @@ -244,16 +278,21 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= # "OTF smart enabled: " + srs_qgs_otf_auto, # "Map canvas SRS:" + plg_tools.get_map_crs()) - wfs_lyr_crs_epsg = ["{}:{}".format(srs.authority, srs.code) - for srs in wfs_lyr.crsOptions] + wfs_lyr_crs_epsg = [ + "{}:{}".format(srs.authority, srs.code) for srs in wfs_lyr.crsOptions + ] self.cached_wfs["CRS"] = wfs_lyr_crs_epsg if srs_map in wfs_lyr_crs_epsg: logger.debug("It's a SRS match! With map canvas: " + srs_map) srs = srs_map - elif srs_qgs_new in wfs_lyr_crs_epsg\ - and srs_qgs_otf_on == "false"\ - and srs_qgs_otf_auto == "false": - logger.debug("It's a SRS match! With default new project: " + srs_qgs_new) + elif ( + srs_qgs_new in wfs_lyr_crs_epsg + and srs_qgs_otf_on == "false" + and srs_qgs_otf_auto == "false" + ): + logger.debug( + "It's a SRS match! With default new project: " + srs_qgs_new + ) srs = srs_qgs_new elif srs_lyr_crs in wfs_lyr_crs_epsg and srs_lyr_new == "useGlobal": logger.debug("It's a SRS match! With default new layer: " + srs_lyr_crs) @@ -271,7 +310,7 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= # print(lyr_style) # GetFeature URL - wfs_lyr_url = wfs.getOperationByName('GetFeature').methods + wfs_lyr_url = wfs.getOperationByName("GetFeature").methods wfs_lyr_url = wfs_lyr_url[0].get("url") if wfs_lyr_url[-1] != "&": wfs_lyr_url = wfs_lyr_url + "&" @@ -281,18 +320,20 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= # url construction try: - wfs_url_params = {"SERVICE": "WFS", - "VERSION": "1.0.0", - "TYPENAME": layer_name, - "SRSNAME": srs, - } + wfs_url_params = { + "SERVICE": "WFS", + "VERSION": "1.0.0", + "TYPENAME": layer_name, + "SRSNAME": srs, + } wfs_url_final = wfs_lyr_url + unquote(urlencode(wfs_url_params, "utf8")) except UnicodeEncodeError: - wfs_url_params = {"SERVICE": "WFS", - "VERSION": "1.0.0", - "TYPENAME": layer_name.decode("latin1"), - "SRSNAME": srs, - } + wfs_url_params = { + "SERVICE": "WFS", + "VERSION": "1.0.0", + "TYPENAME": layer_name.decode("latin1"), + "SRSNAME": srs, + } wfs_url_final = wfs_lyr_url + unquote(urlencode(wfs_url_params)) # method ending logger.debug(wfs_url_final) @@ -302,7 +343,9 @@ def build_wfs_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= else: return None - def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete"): + def build_wms_url( + self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete" + ): """Reformat the input WMS url so it fits QGIS criterias. Tests weither all the needed information is provided in the url, and @@ -311,8 +354,9 @@ def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= # local variables layer_name = api_layer.get("id") layer_title = api_layer.get("titles")[0].get("value", "WMS Layer") - wms_url_getcap = srv_details.get("path")\ - + "?request=GetCapabilities&service=WMS" + wms_url_getcap = ( + srv_details.get("path") + "?request=GetCapabilities&service=WMS" + ) geoserver = "geoserver" in wms_url_getcap if mode == "quicky": @@ -324,26 +368,39 @@ def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= else: pass # url construction - wms_url_params = {"SERVICE": "WMS", - "VERSION": srv_details.get("formatVersion", "1.3.0"), - "REQUEST": "GetMap", - "layers": layer_name, - "crs": srs_map, - "format": "image/png", - "styles": "", - "url": wms_url_base, - } + wms_url_params = { + "SERVICE": "WMS", + "VERSION": srv_details.get("formatVersion", "1.3.0"), + "REQUEST": "GetMap", + "layers": layer_name, + "crs": srs_map, + "format": "image/png", + "styles": "", + "url": wms_url_base, + } wms_url_quicky = unquote(urlencode(wms_url_params, "utf8")) # prevent encoding errors (#102) try: btn_lbl = "WMS : {}".format(layer_title) - return ["WMS", layer_title, wms_url_quicky, - api_layer, srv_details, btn_lbl] + return [ + "WMS", + layer_title, + wms_url_quicky, + api_layer, + srv_details, + btn_lbl, + ] except UnicodeEncodeError as e: btn_lbl = "WMS : {}".format(layer_name) logger.debug(e) - return ["WMS", layer_title, wms_url_quicky, - api_layer, srv_details, btn_lbl] + return [ + "WMS", + layer_title, + wms_url_quicky, + api_layer, + srv_details, + btn_lbl, + ] elif mode == "complete": # Clean, complete but slower way - OWSLib ------------------------- @@ -371,9 +428,14 @@ def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= return 0, e # check if GetMap operation is available - if not hasattr(wms, "getmap") or "GetMap" not in [op.name for op in wms.operations]: + if not hasattr(wms, "getmap") or "GetMap" not in [ + op.name for op in wms.operations + ]: self.cached_wms["GetMap"] = 1 - return 0, "Required GetMap operation not available in: "+ wms_url_getcap + return ( + 0, + "Required GetMap operation not available in: " + wms_url_getcap, + ) else: self.cached_wms["GetMap"] = 0 pass @@ -381,13 +443,18 @@ def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= try: wms_lyr = wms[layer_name] except KeyError as e: - logger.error("Layer {} not found in WMS service: {}" - .format(layer_name, - wms_url_getcap)) - return (0, - "Layer {} not found in WMS service: {}" - .format(layer_name, - wms_url_getcap), e) + logger.error( + "Layer {} not found in WMS service: {}".format( + layer_name, wms_url_getcap + ) + ) + return ( + 0, + "Layer {} not found in WMS service: {}".format( + layer_name, wms_url_getcap + ), + e, + ) # SRS definition srs_map = plg_tools.get_map_crs() @@ -408,8 +475,14 @@ def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= if srs_map in wms_lyr.crsOptions: logger.debug("It's a SRS match! With map canvas: " + srs_map) srs = srs_map - elif srs_qgs_new in wms_lyr.crsOptions and srs_qgs_otf_on == "false" and srs_qgs_otf_auto == "false": - logger.debug("It's a SRS match! With default new project: " + srs_qgs_new) + elif ( + srs_qgs_new in wms_lyr.crsOptions + and srs_qgs_otf_on == "false" + and srs_qgs_otf_auto == "false" + ): + logger.debug( + "It's a SRS match! With default new project: " + srs_qgs_new + ) srs = srs_qgs_new elif srs_lyr_crs in wms_lyr.crsOptions and srs_lyr_new == "useGlobal": logger.debug("It's a SRS match! With default new layer: " + srs_lyr_crs) @@ -422,9 +495,10 @@ def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= srs = wms_lyr.crsOptions[0] # Format definition - wms_lyr_formats = wms.getOperationByName('GetMap').formatOptions - formats_image = [f.split(" ", 1)[0] for f in wms_lyr_formats - if f in qgis_wms_formats] + wms_lyr_formats = wms.getOperationByName("GetMap").formatOptions + formats_image = [ + f.split(" ", 1)[0] for f in wms_lyr_formats if f in qgis_wms_formats + ] self.cached_wms["formats"] = formats_image if "image/png" in formats_image: layer_format = "image/png" @@ -437,7 +511,7 @@ def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= lyr_style = list(wms_lyr.styles.keys())[0] # GetMap URL - wms_lyr_url = wms.getOperationByName('GetMap').methods + wms_lyr_url = wms.getOperationByName("GetMap").methods wms_lyr_url = wms_lyr_url[0].get("url") if wms_lyr_url[-1] == "&": wms_lyr_url = wms_lyr_url[:-1] @@ -447,37 +521,41 @@ def build_wms_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode= # url construction try: - wms_url_params = {"SERVICE": "WMS", - "VERSION": srv_details.get("formatVersion", "1.3.0"), - "REQUEST": "GetMap", - "layers": layer_name, - "crs": srs, - "format": layer_format, - "styles": "", - # "styles": lyr_style, - # "url": srv_details.get("path"), - "url": wms_lyr_url, - } + wms_url_params = { + "SERVICE": "WMS", + "VERSION": srv_details.get("formatVersion", "1.3.0"), + "REQUEST": "GetMap", + "layers": layer_name, + "crs": srs, + "format": layer_format, + "styles": "", + # "styles": lyr_style, + # "url": srv_details.get("path"), + "url": wms_lyr_url, + } wms_url_final = unquote(urlencode(wms_url_params, "utf8")) except UnicodeEncodeError: - wms_url_params = {"SERVICE": "WMS", - "VERSION": srv_details.get("formatVersion", "1.3.0"), - "REQUEST": "GetMap", - "layers": layer_name.decode("latin1"), - "crs": srs, - "format": layer_format, - "styles": "", - # "styles": lyr_style, - # "url": srv_details.get("path"), - "url": wms_lyr_url, - } + wms_url_params = { + "SERVICE": "WMS", + "VERSION": srv_details.get("formatVersion", "1.3.0"), + "REQUEST": "GetMap", + "layers": layer_name.decode("latin1"), + "crs": srs, + "format": layer_format, + "styles": "", + # "styles": lyr_style, + # "url": srv_details.get("path"), + "url": wms_lyr_url, + } wms_url_final = unquote(urlencode(wms_url_params, "utf8")) # method ending return ["WMS", layer_title, wms_url_final] else: return None - def build_wmts_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete"): + def build_wmts_url( + self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode="complete" + ): """Format the input WMTS URL to fit QGIS criterias. Retrieve GetCapabilities from information transmitted by Isogeo API @@ -486,8 +564,9 @@ def build_wmts_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode # local variables layer_name = api_layer.get("id") layer_title = api_layer.get("titles")[0].get("value", "WMTS Layer") - wmts_url_getcap = srv_details.get("path")\ - + "?request=GetCapabilities&service=WMTS" + wmts_url_getcap = ( + srv_details.get("path") + "?request=GetCapabilities&service=WMTS" + ) geoserver = "geoserver" in wmts_url_getcap # basic checks on service url try: @@ -506,7 +585,9 @@ def build_wmts_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode return 0, "WMTS - Service not reached: " + wmts_url_getcap, e # check if GetTile operation is available - if not hasattr(wmts, "gettile") or "GetTile" not in [op.name for op in wmts.operations]: + if not hasattr(wmts, "gettile") or "GetTile" not in [ + op.name for op in wmts.operations + ]: return 0, "Required GetTile operation not available in: " + wmts_url_getcap else: logger.debug("GetTile available") @@ -518,13 +599,18 @@ def build_wmts_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode layer_title = wmts_lyr.title layer_id = wmts_lyr.id except KeyError as e: - logger.error("Layer {} not found in WMTS service: {}" - .format(layer_name, - wmts_url_getcap)) - return (0, - "Layer {} not found in WMS service: {}" - .format(layer_name, - wmts_url_getcap), e) + logger.error( + "Layer {} not found in WMTS service: {}".format( + layer_name, wmts_url_getcap + ) + ) + return ( + 0, + "Layer {} not found in WMS service: {}".format( + layer_name, wmts_url_getcap + ), + e, + ) # Tile Matrix Set & SRS srs_map = plg_tools.get_map_crs() @@ -547,9 +633,10 @@ def build_wmts_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode srs = tile_matrix_set # Format definition - wmts_lyr_formats = wmts.getOperationByName('GetTile').formatOptions - formats_image = [f.split(" ", 1)[0] for f in wmts_lyr_formats - if f in qgis_wms_formats] + wmts_lyr_formats = wmts.getOperationByName("GetTile").formatOptions + formats_image = [ + f.split(" ", 1)[0] for f in wmts_lyr_formats if f in qgis_wms_formats + ] if len(formats_image): if "image/png" in formats_image: layer_format = "image/png" @@ -565,7 +652,7 @@ def build_wmts_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode lyr_style = wmts_lyr.styles.keys()[0] # GetTile URL - wmts_lyr_url = wmts.getOperationByName('GetTile').methods + wmts_lyr_url = wmts.getOperationByName("GetTile").methods wmts_lyr_url = wmts_lyr_url[0].get("url") if wmts_lyr_url[-1] == "&": wmts_lyr_url = wmts_lyr_url[:-1] @@ -573,16 +660,17 @@ def build_wmts_url(self, api_layer, srv_details, rsc_type="ds_dyn_lyr_srv", mode pass # construct URL - wmts_url_params = {"SERVICE": "WMTS", - "VERSION": "1.0.0", - "REQUEST": "GetCapabilities", - "layers": layer_id, - "crs": srs, - "format": layer_format, - "styles": "", - "tileMatrixSet": tile_matrix_set, - "url": wmts_lyr_url, - } + wmts_url_params = { + "SERVICE": "WMTS", + "VERSION": "1.0.0", + "REQUEST": "GetCapabilities", + "layers": layer_id, + "crs": srs, + "format": layer_format, + "styles": "", + "tileMatrixSet": tile_matrix_set, + "url": wmts_lyr_url, + } wmts_url_final = unquote(urlencode(wmts_url_params, "utf8")) logger.debug(wmts_url_final) @@ -614,29 +702,36 @@ def adding(self, layer_info): if layer_info[0] == "vector": path = layer_info[1] name = os.path.basename(path).split(".")[0] - layer = QgsVectorLayer(path, layer_info[2], 'ogr') + layer = QgsVectorLayer(path, layer_info[2], "ogr") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) try: - QgsMessageLog.logMessage("Data layer added: {}" - .format(name), - "Isogeo") + QgsMessageLog.logMessage( + "Data layer added: {}".format(name), "Isogeo" + ) logger.debug("Vector layer added: {}".format(path)) except UnicodeEncodeError: QgsMessageLog.logMessage( - "Vector layer added:: {}".format( - name.decode("latin1")), "Isogeo") - logger.debug("Vector layer added: {}" - .format(name.decode("latin1"))) + "Vector layer added:: {}".format(name.decode("latin1")), + "Isogeo", + ) + logger.debug( + "Vector layer added: {}".format(name.decode("latin1")) + ) else: error_msg = layer.error().message() - logger.warning("Invalid vector layer: {}. QGIS says: {}" - .format(path, - error_msg)) - QMessageBox.information(iface.mainWindow(), - self.tr('Error'), - self.tr("Vector not valid {}. QGIS says: {}") - .format(path, error_msg)) + logger.warning( + "Invalid vector layer: {}. QGIS says: {}".format( + path, error_msg + ) + ) + QMessageBox.information( + iface.mainWindow(), + self.tr("Error"), + self.tr("Vector not valid {}. QGIS says: {}").format( + path, error_msg + ), + ) # If raster file elif layer_info[0] == "raster": path = layer_info[1] @@ -645,135 +740,160 @@ def adding(self, layer_info): if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) try: - QgsMessageLog.logMessage("Data layer added: {}" - .format(name), - "Isogeo") + QgsMessageLog.logMessage( + "Data layer added: {}".format(name), "Isogeo" + ) logger.debug("Raster layer added: {}".format(path)) except UnicodeEncodeError: QgsMessageLog.logMessage( - "Raster layer added:: {}".format( - name.decode("latin1")), "Isogeo") - logger.debug("Raster layer added: {}" - .format(name.decode("latin1"))) + "Raster layer added:: {}".format(name.decode("latin1")), + "Isogeo", + ) + logger.debug( + "Raster layer added: {}".format(name.decode("latin1")) + ) else: error_msg = layer.error().message() - logger.warning("Invalid raster layer: {}. QGIS says: {}" - .format(path, - error_msg)) - QMessageBox.information(iface.mainWindow(), - self.tr('Error'), - self.tr("Raster not valid {}. QGIS says: {}") - .format(path, error_msg)) + logger.warning( + "Invalid raster layer: {}. QGIS says: {}".format( + path, error_msg + ) + ) + QMessageBox.information( + iface.mainWindow(), + self.tr("Error"), + self.tr("Raster not valid {}. QGIS says: {}").format( + path, error_msg + ), + ) # If EFS link - elif layer_info[0] == 'EFS': + elif layer_info[0] == "EFS": name = layer_info[1] uri = layer_info[2] - layer = QgsVectorLayer(uri, - name, - 'arcgisfeatureserver') + layer = QgsVectorLayer(uri, name, "arcgisfeatureserver") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) logger.debug("EFS layer added: {0}".format(uri)) else: error_msg = layer.error().message() - logger.warning("Invalid service: {0}. QGIS says: {}" - .format(uri, error_msg.encode("latin1"))) - QMessageBox.information(iface.mainWindow(), - self.tr('Error'), - self.tr("EFS not valid. QGIS says: {}") - .format(error_msg)) + logger.warning( + "Invalid service: {0}. QGIS says: {}".format( + uri, error_msg.encode("latin1") + ) + ) + QMessageBox.information( + iface.mainWindow(), + self.tr("Error"), + self.tr("EFS not valid. QGIS says: {}").format(error_msg), + ) # If EMS link - elif layer_info[0] == 'EMS': + elif layer_info[0] == "EMS": name = layer_info[1] uri = layer_info[2] - layer = QgsRasterLayer(uri,name,"arcgismapserver") + layer = QgsRasterLayer(uri, name, "arcgismapserver") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) logger.debug("EMS layer added: {0}".format(uri)) else: error_msg = layer.error().message() - logger.warning("Invalid service: {0}. QGIS says: {}" - .format(uri, error_msg.encode("latin1"))) - QMessageBox.information(iface.mainWindow(), - self.tr('Error'), - self.tr("EMS not valid. QGIS says: {}") - .format(error_msg)) + logger.warning( + "Invalid service: {0}. QGIS says: {}".format( + uri, error_msg.encode("latin1") + ) + ) + QMessageBox.information( + iface.mainWindow(), + self.tr("Error"), + self.tr("EMS not valid. QGIS says: {}").format(error_msg), + ) # If WFS link - elif layer_info[0] == 'WFS': + elif layer_info[0] == "WFS": url = layer_info[2] name = layer_info[1] - layer = QgsVectorLayer(url, name, 'WFS') + layer = QgsVectorLayer(url, name, "WFS") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) logger.debug("WFS layer added: {0}".format(url)) else: error_msg = layer.error().message() - name_url = self.build_wfs_url(layer_info[3], - layer_info[4], - mode="complete") + name_url = self.build_wfs_url( + layer_info[3], layer_info[4], mode="complete" + ) if name_url[0] != 0: - layer = QgsVectorLayer(name_url[2], name_url[1], 'WFS') + layer = QgsVectorLayer(name_url[2], name_url[1], "WFS") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) logger.debug("WFS layer added: {0}".format(url)) else: error_msg = layer.error().message() - logger.warning("Invalid service: {0}. QGIS says: {}" - .format(url, error_msg.encode("latin1"))) + logger.warning( + "Invalid service: {0}. QGIS says: {}".format( + url, error_msg.encode("latin1") + ) + ) else: QMessageBox.information( iface.mainWindow(), - self.tr('Error'), - self.tr("WFS is not valid. QGIS says: {}") - .format(error_msg)) + self.tr("Error"), + self.tr("WFS is not valid. QGIS says: {}").format( + error_msg + ), + ) pass # If WMS link - elif layer_info[0] == 'WMS': + elif layer_info[0] == "WMS": url = layer_info[2] name = layer_info[1] - layer = QgsRasterLayer(url, name, 'wms') + layer = QgsRasterLayer(url, name, "wms") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) logger.debug("WMS layer added: {0}".format(url)) else: error_msg = layer.error().message() - name_url = self.build_wms_url(layer_info[3], - layer_info[4], - mode="complete") + name_url = self.build_wms_url( + layer_info[3], layer_info[4], mode="complete" + ) if name_url[0] != 0: - layer = QgsRasterLayer(name_url[2], name_url[1], 'wms') + layer = QgsRasterLayer(name_url[2], name_url[1], "wms") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) logger.debug("WMS layer added: {0}".format(url)) else: error_msg = layer.error().message() - logger.warning("Invalid service: {0}. QGIS says: {1}".format(url, error_msg.encode("latin1"))) + logger.warning( + "Invalid service: {0}. QGIS says: {1}".format( + url, error_msg.encode("latin1") + ) + ) else: QMessageBox.information( iface.mainWindow(), - self.tr('Error'), - self.tr("WMS is not valid. QGIS says: {}") - .format(error_msg)) + self.tr("Error"), + self.tr("WMS is not valid. QGIS says: {}").format( + error_msg + ), + ) # If WMTS link - elif layer_info[0] == 'WMTS': + elif layer_info[0] == "WMTS": url = layer_info[2] name = layer_info[1] - layer = QgsRasterLayer(url, name, 'wms') + layer = QgsRasterLayer(url, name, "wms") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) logger.debug("WMTS service layer added: {0}".format(url)) else: error_msg = layer.error().message() - logger.warning("Invalid service: {}. QGIS says: {}" - .format(url, error_msg)) + logger.warning( + "Invalid service: {}. QGIS says: {}".format(url, error_msg) + ) QMessageBox.information( iface.mainWindow(), - self.tr('Error'), - self.tr("WMTS is not valid. QGIS says: {}") - .format(error_msg)) + self.tr("Error"), + self.tr("WMTS is not valid. QGIS says: {}").format(error_msg), + ) else: pass - + # If the data is a PostGIS table elif type(layer_info) == dict: logger.debug("Data type: PostGIS") @@ -783,10 +903,10 @@ def adding(self, layer_info): table = layer_info.get("table", "") # Retrieve the database information stored in the PostGISdict uri = QgsDataSourceURI() - host = self.PostGISdict[base_name]['host'] - port = self.PostGISdict[base_name]['port'] - user = self.PostGISdict[base_name]['username'] - password = self.PostGISdict[base_name]['password'] + host = self.PostGISdict[base_name]["host"] + port = self.PostGISdict[base_name]["port"] + user = self.PostGISdict[base_name]["username"] + password = self.PostGISdict[base_name]["password"] # set host name, port, database name, username and password uri.setConnection(host, port, base_name, user, password) # Get the geometry column name from the database connexion & table @@ -803,10 +923,16 @@ def adding(self, layer_info): if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) logger.debug("Data added: {}".format(table)) - elif not layer.isValid() and plg_tools.last_error[0] == "postgis" and "prim" in plg_tools.last_error[1]: - logger.debug("PostGIS layer may be a view, " - "so key column is missing. " - "Trying to automatically set one...") + elif ( + not layer.isValid() + and plg_tools.last_error[0] == "postgis" + and "prim" in plg_tools.last_error[1] + ): + logger.debug( + "PostGIS layer may be a view, " + "so key column is missing. " + "Trying to automatically set one..." + ) # get layer fields to set as key column fields = layer.dataProvider().fields() fields_names = [i.name() for i in fields] @@ -817,10 +943,13 @@ def adding(self, layer_info): layer = QgsVectorLayer(uri.uri(True), table, "postgres") if layer.isValid(): lyr = QgsProject.instance().addMapLayer(layer) - logger.debug("PostGIS view layer added with [{}] as key column" - .format(field)) + logger.debug( + "PostGIS view layer added with [{}] as key column".format( + field + ) + ) # filling 'QGIS Server' tab of layer Properties - self.md_sync.basic_sync(layer = lyr, info = layer_info) + self.md_sync.basic_sync(layer=lyr, info=layer_info) return 1 else: continue @@ -829,15 +958,21 @@ def adding(self, layer_info): QMessageBox.information( iface.mainWindow(), self.tr("Error"), - self.tr("The PostGIS layer is not valid." - " Reason: {}".format(plg_tools.last_error))) + self.tr( + "The PostGIS layer is not valid." + " Reason: {}".format(plg_tools.last_error) + ), + ) return 0 # filling 'QGIS Server' tab of layer Properties if layer.isValid(): try: - self.md_sync.basic_sync(layer = lyr, info = layer_info) + self.md_sync.basic_sync(layer=lyr, info=layer_info) except IndexError as e: - logger.debug("Not supported 'layer_info' format causes this error : {}".format(e)) - else : + logger.debug( + "Not supported 'layer_info' format causes this error : {}".format(e) + ) + else: pass - return 1 + return 1 + diff --git a/modules/layer/metadata_sync.py b/modules/layer/metadata_sync.py index df2d2b8a..bfb3ecdb 100644 --- a/modules/layer/metadata_sync.py +++ b/modules/layer/metadata_sync.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals # Standard library import logging @@ -20,7 +19,7 @@ # ################################## -class MetadataSynchronizer(): +class MetadataSynchronizer: """Basic class that allow to fill some tabs of 'Layer's Properties' with metadatas""" def __init__(self): @@ -28,15 +27,16 @@ def __init__(self): self.tr = None def basic_sync(self, layer, info): - logger.debug("Filling {} layer's properties from : {}".format( - layer.name, info - )) + logger.debug("Filling {} layer's properties from : {}".format(layer.name, info)) layer_type = info[0] # If the data is a PostGIS table if type(info) == dict: - self.filling_field(layer, info.get("title", "notitle"), - info.get("abstract", ""), - info.get("keywords", ())) + self.filling_field( + layer, + info.get("title", "notitle"), + info.get("abstract", ""), + info.get("keywords", ()), + ) # If the data is a file or a service else: # vector or raster file @@ -50,13 +50,15 @@ def basic_sync(self, layer, info): # WMTS else: self.filling_field(layer, info[3][0], info[3][1], info[3][2]) - else : + else: pass - return + return def filling_field(self, layer, title, abstract, kw_list): layer.setTitle(title) layer.setAbstract(abstract) layer.setKeywordList(",".join(kw_list)) - logger.debug("'QGIS Server' tab from 'Layer's Properties' filled with basic info") - return \ No newline at end of file + logger.debug( + "'QGIS Server' tab from 'Layer's Properties' filled with basic info" + ) + return diff --git a/modules/metadata_display.py b/modules/metadata_display.py index 32d57c2c..3e2a17a4 100644 --- a/modules/metadata_display.py +++ b/modules/metadata_display.py @@ -1,14 +1,22 @@ # -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals # Standard library import logging from datetime import datetime # PyQGIS -from qgis.core import (QgsProject, QgsMessageLog, QgsVectorLayer, QgsPointXY, - QgsRectangle, QgsFeature, QgsGeometry, QgsRasterLayer, QgsRenderContext) +from qgis.core import ( + QgsProject, + QgsMessageLog, + QgsVectorLayer, + QgsPointXY, + QgsRectangle, + QgsFeature, + QgsGeometry, + QgsRasterLayer, + QgsRenderContext, +) # PyQT from qgis.PyQt.QtCore import QSettings, Qt @@ -21,7 +29,7 @@ # Plugin modules from .tools import IsogeoPlgTools -# UI module +# UI module from ..ui.metadata.dlg_md_details import IsogeoMdDetails # ############################################################################ @@ -38,9 +46,9 @@ blue_marble = "contextualWMSLegend=0&crs=EPSG:4326&dpiMode=7&featureCount=10&format=image/jpeg&layers=BlueMarble_ShadedRelief_Bathymetry&styles=default&tileMatrixSet=500m&url=https://gibs.earthdata.nasa.gov/wmts/epsg4326/best/1.0.0/WMTSCapabilities.xml" li_lyrs_refs = [ - QgsRasterLayer(osm_lbls, "Labels", 'wms'), - QgsRasterLayer(osm_refs, "Refs", 'wms'), - QgsRasterLayer(blue_marble, "Base", 'wms') + QgsRasterLayer(osm_lbls, "Labels", "wms"), + QgsRasterLayer(osm_refs, "Refs", "wms"), + QgsRasterLayer(blue_marble, "Base", "wms"), ] # ############################################################################ @@ -48,8 +56,9 @@ # ################################## -class MetadataDisplayer(): +class MetadataDisplayer: """Manage metadata displaying in QGIS UI.""" + url_edition = "https://app.isogeo.com" def __init__(self): @@ -61,7 +70,9 @@ def __init__(self): self.complete_md.wid_bbox.setCanvasColor(Qt.white) self.complete_md.wid_bbox.enableAntiAliasing(True) - self.complete_md.btn_md_edit.pressed.connect(lambda: plg_tools.open_webpage(link=self.url_edition)) + self.complete_md.btn_md_edit.pressed.connect( + lambda: plg_tools.open_webpage(link=self.url_edition) + ) def show_complete_md(self, md: dict, tags: dict): """Open the pop up window that shows the metadata sheet details. @@ -69,19 +80,19 @@ def show_complete_md(self, md: dict, tags: dict): :param md dict: Isogeo metadata dict """ logger.info("Displaying the whole metadata sheet.") - isogeo_tr = IsogeoTranslator(qsettings.value('locale/userLocale')[0:2]) + isogeo_tr = IsogeoTranslator(qsettings.value("locale/userLocale")[0:2]) # clean map canvas vec_lyr = [i.id() for i in self.complete_md.wid_bbox.layers() if i.type() == 0] QgsProject.instance().removeMapLayers(vec_lyr) self.complete_md.wid_bbox.refresh() - + # -- GENERAL --------------------------------------------------------- title = md.get("title", "NR") self.complete_md.lbl_title.setText(md.get("title", "NR")) - self.complete_md.val_owner.setText(md.get("_creator") - .get("contact") - .get("name", "NR")) + self.complete_md.val_owner.setText( + md.get("_creator").get("contact").get("name", "NR") + ) # Keywords kwords = tags.get("keywords", {"none": "NR"}) self.complete_md.val_keywords.setText(" ; ".join(kwords.keys())) @@ -91,15 +102,13 @@ def show_complete_md(self, md: dict, tags: dict): if tags.get("compliance"): self.complete_md.ico_inspire_conformity.setEnabled(1) self.complete_md.ico_inspire_conformity.setToolTip( - isogeo_tr.tr("quality", - "isConform")\ - + " INSPIRE") + isogeo_tr.tr("quality", "isConform") + " INSPIRE" + ) else: self.complete_md.ico_inspire_conformity.setDisabled(1) self.complete_md.ico_inspire_conformity.setToolTip( - isogeo_tr.tr("quality", - "isNotConform")\ - + " INSPIRE") + isogeo_tr.tr("quality", "isNotConform") + " INSPIRE" + ) # Abstract self.complete_md.val_abstract.setText(md.get("abstract", "NR")) @@ -135,39 +144,37 @@ def show_complete_md(self, md: dict, tags: dict): contacts_pt_cct = [] contacts_other_cct = [] - for ctact in sorted(contacts, key = lambda i: i.get("contact").get("name")): + for ctact in sorted(contacts, key=lambda i: i.get("contact").get("name")): item = ctact.get("contact") if ctact.get("role", "NR") == "pointOfContact": - content = "{1} ({2})
{3}
{4}
{5} {6}
{7} {8}
{8}
{9}"\ - .format( - isogeo_tr.tr("roles", ctact.get("role")), - item.get("name", "NR"), - item.get("organization", "NR"), - item.get("email", "NR"), - item.get("phone", "NR"), - item.get("addressLine1", ""), - item.get("addressLine2", ""), - item.get("zipCode", ""), - item.get("city", ""), - item.get("country", "") - ) + content = "{1} ({2})
{3}
{4}
{5} {6}
{7} {8}
{8}
{9}".format( + isogeo_tr.tr("roles", ctact.get("role")), + item.get("name", "NR"), + item.get("organization", "NR"), + item.get("email", "NR"), + item.get("phone", "NR"), + item.get("addressLine1", ""), + item.get("addressLine2", ""), + item.get("zipCode", ""), + item.get("city", ""), + item.get("country", ""), + ) contacts_pt_cct.append(content) else: - content = "{0} - {1} ({2})
{3}
{4}
{5} {6}
{7} {8}
{9}"\ - .format( - isogeo_tr.tr("roles", ctact.get("role")), - item.get("name", "NR"), - item.get("organization", "NR"), - item.get("email", "NR"), - item.get("phone", ""), - item.get("addressLine1", ""), - item.get("addressLine2", ""), - item.get("zipCode", ""), - item.get("city", ""), - item.get("country", "") - ) + content = "{0} - {1} ({2})
{3}
{4}
{5} {6}
{7} {8}
{9}".format( + isogeo_tr.tr("roles", ctact.get("role")), + item.get("name", "NR"), + item.get("organization", "NR"), + item.get("email", "NR"), + item.get("phone", ""), + item.get("addressLine1", ""), + item.get("addressLine2", ""), + item.get("zipCode", ""), + item.get("city", ""), + item.get("country", ""), + ) contacts_other_cct.append(content) # write @@ -176,26 +183,30 @@ def show_complete_md(self, md: dict, tags: dict): # -- HISTORY --------------------------------------------------------- # Data creation and last update dates - self.complete_md.val_data_crea.setText(plg_tools.handle_date( - md.get("_created", "NR"))) - self.complete_md.val_data_update.setText(plg_tools.handle_date( - md.get("_modified", "NR"))) + self.complete_md.val_data_crea.setText( + plg_tools.handle_date(md.get("_created", "NR")) + ) + self.complete_md.val_data_update.setText( + plg_tools.handle_date(md.get("_modified", "NR")) + ) # Update frequency information if md.get("updateFrequency", None): freq = md.get("updateFrequency") - frequency_info = "{}{} {}"\ - .format(isogeo_tr.tr("frequencyTypes", "frequencyUpdateHelp"), - ''.join(i for i in freq if i.isdigit()), - isogeo_tr.tr("frequencyShortTypes", - freq[-1])) + frequency_info = "{}{} {}".format( + isogeo_tr.tr("frequencyTypes", "frequencyUpdateHelp"), + "".join(i for i in freq if i.isdigit()), + isogeo_tr.tr("frequencyShortTypes", freq[-1]), + ) self.complete_md.val_frequency.setText(frequency_info) else: self.complete_md.val_frequency.setText("NR") # Validity - self.complete_md.val_valid_start.setText(plg_tools.handle_date( - md.get("validFrom", "NR"))) - self.complete_md.val_valid_end.setText(plg_tools.handle_date( - md.get("validTo", "NR"))) + self.complete_md.val_valid_start.setText( + plg_tools.handle_date(md.get("validFrom", "NR")) + ) + self.complete_md.val_valid_end.setText( + plg_tools.handle_date(md.get("validTo", "NR")) + ) self.complete_md.val_valid_comment.setText(md.get("validityComment", "NR")) # Collect information self.complete_md.val_method.setText(md.get("collectionMethod", "NR")) @@ -208,7 +219,9 @@ def show_complete_md(self, md: dict, tags: dict): idx = 0 for e in events: if e.get("kind") == "update": - tbl_events.setItem(idx, 0, QTableWidgetItem(plg_tools.handle_date(e.get("date", "NR")))) + tbl_events.setItem( + idx, 0, QTableWidgetItem(plg_tools.handle_date(e.get("date", "NR"))) + ) tbl_events.setItem(idx, 1, QTableWidgetItem(e.get("description", ""))) idx += 1 else: @@ -221,15 +234,16 @@ def show_complete_md(self, md: dict, tags: dict): # -- TECHNICAL ------------------------------------------------------- # SRS coord_sys = md.get("coordinate-system", {"None": "NR"}) - self.complete_md.val_srs.setText("{} (EPSG:{})" - .format(coord_sys.get("name", "NR"), - coord_sys.get("code", "NR"))) + self.complete_md.val_srs.setText( + "{} (EPSG:{})".format( + coord_sys.get("name", "NR"), coord_sys.get("code", "NR") + ) + ) # Set the data format - if tags.get('formats') != {}: - self.complete_md.val_format.setText( - list(tags.get('formats').values())[0]) + if tags.get("formats") != {}: + self.complete_md.val_format.setText(list(tags.get("formats").values())[0]) else: - self.complete_md.val_format.setText('NR') + self.complete_md.val_format.setText("NR") # feature info self.complete_md.val_feat_count.setText(str(md.get("features", "/"))) @@ -249,15 +263,17 @@ def show_complete_md(self, md: dict, tags: dict): else: s_conformity = isogeo_tr.tr("quality", "isNotConform") # make data human readable - s_date = datetime.strptime(s_in.get("specification").get("published"), - "%Y-%m-%dT%H:%M:%S") + s_date = datetime.strptime( + s_in.get("specification").get("published"), "%Y-%m-%dT%H:%M:%S" + ) s_date = s_date.strftime("%Y-%m-%d") # prepare text - spec_text = "{0} ({2}): {3}"\ - .format(s_in.get("specification").get("name", "NR"), - s_in.get("specification").get("link", ""), - s_date, - s_conformity) + spec_text = "{0} ({2}): {3}".format( + s_in.get("specification").get("name", "NR"), + s_in.get("specification").get("link", ""), + s_date, + s_conformity, + ) # store into the final list specs_out.append(spec_text) # write @@ -271,12 +287,16 @@ def show_complete_md(self, md: dict, tags: dict): # get convex hull coordinates and create the polygon md_lyr = self.envelope2layer(md.get("envelope")) # add layers - qgs_prj.addMapLayers([md_lyr, li_lyrs_refs[0], li_lyrs_refs[1], li_lyrs_refs[2]], 0) + qgs_prj.addMapLayers( + [md_lyr, li_lyrs_refs[0], li_lyrs_refs[1], li_lyrs_refs[2]], 0 + ) - map_canvas_layer_list = [qgs_prj.mapLayer(md_lyr.id()), - qgs_prj.mapLayer(li_lyrs_refs[0].id()), - qgs_prj.mapLayer(li_lyrs_refs[1].id()), - qgs_prj.mapLayer(li_lyrs_refs[2].id())] + map_canvas_layer_list = [ + qgs_prj.mapLayer(md_lyr.id()), + qgs_prj.mapLayer(li_lyrs_refs[0].id()), + qgs_prj.mapLayer(li_lyrs_refs[1].id()), + qgs_prj.mapLayer(li_lyrs_refs[2].id()), + ] self.complete_md.wid_bbox.setLayers(map_canvas_layer_list) self.complete_md.wid_bbox.setExtent(md_lyr.extent()) @@ -290,13 +310,16 @@ def show_complete_md(self, md: dict, tags: dict): cgus_out = [] for c_in in cgus_in: if "license" in c_in: - cgu_text = "{0}
{2}
{3}".format(c_in.get("license").get("name", "NR"), - c_in.get("license").get("link", ""), - c_in.get("description", ""), - c_in.get("license").get("content", "")) + cgu_text = "{0}
{2}
{3}".format( + c_in.get("license").get("name", "NR"), + c_in.get("license").get("link", ""), + c_in.get("description", ""), + c_in.get("license").get("content", ""), + ) else: - cgu_text = "{0}
{1}".format(isogeo_tr.tr("conditions", "noLicense"), - c_in.get("description", "")) + cgu_text = "{0}
{1}".format( + isogeo_tr.tr("conditions", "noLicense"), c_in.get("description", "") + ) # store into the final list cgus_out.append(cgu_text) @@ -308,17 +331,23 @@ def show_complete_md(self, md: dict, tags: dict): lims_in = md.get("limitations", dict()) lims_out = [] for l_in in lims_in: - lim_text = "{0}
{1}".format(isogeo_tr.tr("limitations", l_in.get("type")), - l_in.get("description", "")) + lim_text = "{0}
{1}".format( + isogeo_tr.tr("limitations", l_in.get("type")), + l_in.get("description", ""), + ) # legal type if l_in.get("type") == "legal": - lim_text += "
" + isogeo_tr.tr("restrictions", l_in.get("restriction")) + lim_text += "
" + isogeo_tr.tr( + "restrictions", l_in.get("restriction") + ) else: pass # INSPIRE precision if "directive" in l_in: - lim_text += "
INSPIRE
".format(l_in.get("directive").get("name"), - l_in.get("directive").get("description")) + lim_text += "
INSPIRE
".format( + l_in.get("directive").get("name"), + l_in.get("directive").get("description"), + ) else: pass @@ -335,27 +364,32 @@ def show_complete_md(self, md: dict, tags: dict): self.complete_md.val_owner_name.setText(wg_contact.get("name", "")) self.complete_md.val_owner_email.setText(wg_contact.get("email", "")) self.complete_md.val_owner_phone.setText(wg_contact.get("phone", "")) - self.complete_md.val_owner_address.setText("{}
{}" - .format(wg_contact.get("addressLine1", "NR"), - wg_contact.get("addressLine2", ""))) + self.complete_md.val_owner_address.setText( + "{}
{}".format( + wg_contact.get("addressLine1", "NR"), wg_contact.get("addressLine2", "") + ) + ) self.complete_md.val_owner_city.setText(wg_contact.get("zipCode", "")) self.complete_md.val_owner_country.setText(wg_contact.get("countryCode", "")) # Metadata self.complete_md.val_md_lang.setText(md.get("language", "NR")) - self.complete_md.val_md_date_crea.setText(plg_tools.handle_date( - md.get("_modified")[:19])) - self.complete_md.val_md_date_update.setText(plg_tools.handle_date( - md.get("_created")[:19])) + self.complete_md.val_md_date_crea.setText( + plg_tools.handle_date(md.get("_modified")[:19]) + ) + self.complete_md.val_md_date_update.setText( + plg_tools.handle_date(md.get("_created")[:19]) + ) # -- EDIT LINK ------------------------------------------------------- - self.url_edition = plg_tools.get_edit_url(md_id=md.get("_id"), - md_type=md.get("type"), - owner_id=wg_id) + self.url_edition = plg_tools.get_edit_url( + md_id=md.get("_id"), md_type=md.get("type"), owner_id=wg_id + ) # only if user declared himself as Isogeo editor in authentication form - self.complete_md.btn_md_edit.setEnabled(qsettings - .value("isogeo/user/editor", 1)) + self.complete_md.btn_md_edit.setEnabled( + qsettings.value("isogeo/user/editor", 1) + ) # -- ADD OPTIONS ------------------------------------------------------ self.complete_md.btn_addtomap.setHidden(1) @@ -366,31 +400,27 @@ def show_complete_md(self, md: dict, tags: dict): # Finally open the damned window self.complete_md.show() try: - QgsMessageLog.logMessage("Detailed metadata displayed: {}" - .format(title), - "Isogeo") + QgsMessageLog.logMessage( + "Detailed metadata displayed: {}".format(title), "Isogeo" + ) except UnicodeEncodeError: - QgsMessageLog.logMessage("Detailed metadata displayed: {}" - .format(title), - "Isogeo") + QgsMessageLog.logMessage( + "Detailed metadata displayed: {}".format(title), "Isogeo" + ) def envelope2layer(self, envelope): """Transform metadata envelope into a QGIS layer.""" # layer - md_lyr = QgsVectorLayer("Polygon?crs=epsg:4326", - "Metadata envelope", - "memory") + md_lyr = QgsVectorLayer("Polygon?crs=epsg:4326", "Metadata envelope", "memory") symbols = md_lyr.renderer().symbols(QgsRenderContext()) symbol = symbols[0] - symbol.setColor(QColor.fromRgb(255,20,147)) + symbol.setColor(QColor.fromRgb(255, 20, 147)) symbol.setOpacity(0.25) if envelope.get("type") == "Polygon": # parse coordinates coords = envelope.get("coordinates")[0] - poly_pts = [QgsPointXY(round(i[0], 3), - round(i[1], 3)) - for i in coords] + poly_pts = [QgsPointXY(round(i[0], 3), round(i[1], 3)) for i in coords] # add geometry to layer poly = QgsFeature() poly.setGeometry(QgsGeometry.fromPolygonXY([poly_pts])) @@ -398,10 +428,12 @@ def envelope2layer(self, envelope): md_lyr.updateExtents() elif envelope.get("type") == "MultiPolygon": coords = envelope.get("bbox") - bbox = QgsRectangle(round(coords[0], 3), - round(coords[1], 3), - round(coords[2], 3), - round(coords[3], 3),) + bbox = QgsRectangle( + round(coords[0], 3), + round(coords[1], 3), + round(coords[2], 3), + round(coords[3], 3), + ) poly = QgsFeature() poly.setGeometry(QgsGeometry.fromWkt(bbox.asWktPolygon())) md_lyr.dataProvider().addFeatures([poly]) @@ -530,8 +562,9 @@ def fields_displayer(self, md_type="vectorDataset", series=0): logger.error("Metadata type not recognized:", md_type) return + # ############################################################################# # ##### Stand alone program ######## # ################################## -if __name__ == '__main__': +if __name__ == "__main__": """Standalone execution.""" diff --git a/modules/quick_search.py b/modules/quick_search.py index 30c5e3ea..012a4878 100644 --- a/modules/quick_search.py +++ b/modules/quick_search.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals # Standard library import logging @@ -29,13 +28,14 @@ msgBar = iface.messageBar() -ico_bolt = QIcon(':/plugins/Isogeo/resources/search/bolt.svg') +ico_bolt = QIcon(":/plugins/Isogeo/resources/search/bolt.svg") # ############################################################################ # ########## Classes ############### # ################################## -class QuickSearchManager(): + +class QuickSearchManager: """ A basic class to manage quick searches : - Create a new quick search by giving it a name and writing its parameters in the JSON file (`_user/quicksearches.json`) - Rename a quick search @@ -43,7 +43,7 @@ class QuickSearchManager(): """ def __init__(self, search_form_manager: object = None): - + if search_form_manager: # Getting wath the class need from Isogeo self.form_mng = search_form_manager @@ -57,56 +57,61 @@ def __init__(self, search_form_manager: object = None): # Connecting ui self.dlg_new.accepted.connect(self.save) self.dlg_rename.accepted.connect(self.rename) - else : + else: pass # path to json storage file for quicksearch - self.json_path = Path(__file__).parents[1]/"_user"/"quicksearches.json" + self.json_path = Path(__file__).parents[1] / "_user" / "quicksearches.json" # Getting wath the class need from ApiRequester to build search URL self.url_builder = object - def write_params(self, search_name: str = '_default', search_kind: str ="Default"): + def write_params(self, search_name: str = "_default", search_kind: str = "Default"): """Write a new element in the json file when a search is saved.""" # Open the saved_search file as a dict. Each key is a search name, # each value is a dict containing the parameters for this search name saved_searches = self.load_file() # If the name already exists, ask for a new one. ================ TO DO if search_kind == "Last": - params = saved_searches.get("_current", - "https://api.isogeo.com/resources/search?&_limit=0") - else : + params = saved_searches.get( + "_current", "https://api.isogeo.com/resources/search?&_limit=0" + ) + else: # Write the current parameters in a dict, and store it in the saved # search dict params = self.form_mng.save_params() # Info for _offset parameter self.page_index = 1 - params['page'] = self.page_index + params["page"] = self.page_index # Info for _limit parameter - params['show'] = True + params["show"] = True # Info for _lang parameter - params['lang'] = self.lang + params["lang"] = self.lang # building request url - params['url'] = self.url_builder(params) + params["url"] = self.url_builder(params) - for i in range(len(params.get('keys'))): - params['keyword_{0}'.format(i)] = params.get('keys')[i] - params.pop('keys', None) + for i in range(len(params.get("keys"))): + params["keyword_{0}".format(i)] = params.get("keys")[i] + params.pop("keys", None) saved_searches[search_name] = params # writing file self.dump_file(saved_searches) # Log and messages - logger.debug("{} search stored: {}. Parameters: {}" - .format(search_kind, search_name, params)) + logger.debug( + "{} search stored: {}. Parameters: {}".format( + search_kind, search_name, params + ) + ) if search_kind != "Current" and search_kind != "Last": - msgBar.pushMessage(self.tr("{} successfully saved: {}") - .format(search_kind, search_name), - duration=3) + msgBar.pushMessage( + self.tr("{} successfully saved: {}").format(search_kind, search_name), + duration=3, + ) else: pass return - + def save(self): """Call the write_search() function and refresh the combobox.""" # retrieve quicksearch given name and store it @@ -116,28 +121,29 @@ def save(self): saved_searches = self.load_file() search_list = list(saved_searches.keys()) # updating quick search widgets - self.form_mng.pop_qs_cbbs(items_list = search_list) + self.form_mng.pop_qs_cbbs(items_list=search_list) # method ending return - + def rename(self): """Modify the json file in order to rename a search.""" old_name = self.form_mng.cbb_quicksearch_edit.currentText() new_name = self.dlg_rename.txt_quicksearch_rename.text() - + saved_searches = self.load_file() saved_searches[new_name] = saved_searches[old_name] saved_searches.pop(old_name) search_list = list(saved_searches.keys()) - self.form_mng.pop_qs_cbbs(items_list = search_list) + self.form_mng.pop_qs_cbbs(items_list=search_list) # Update JSON file - self.dump_file(content = saved_searches) + self.dump_file(content=saved_searches) # inform user - msgBar.pushMessage("Isogeo", - self.tr("Quicksearch renamed: from {} to {}") - .format(old_name, new_name), - level=0, - duration=3) + msgBar.pushMessage( + "Isogeo", + self.tr("Quicksearch renamed: from {} to {}").format(old_name, new_name), + level=0, + duration=3, + ) # method ending logger.debug("'{}' quicksearch renamed '{}'".format(old_name, new_name)) return @@ -149,15 +155,16 @@ def remove(self): saved_searches.pop(to_remove) search_list = list(saved_searches.keys()) # updating quick search widgets - self.form_mng.pop_qs_cbbs(items_list = search_list) + self.form_mng.pop_qs_cbbs(items_list=search_list) # Update JSON file - self.dump_file(content = saved_searches) + self.dump_file(content=saved_searches) # inform user - msgBar.pushMessage("Isogeo", - self.tr("Quicksearch removed: {}") - .format(to_remove), - level=0, - duration=3) + msgBar.pushMessage( + "Isogeo", + self.tr("Quicksearch removed: {}").format(to_remove), + level=0, + duration=3, + ) # method ending logger.debug("'{}' quicksearch removed from JSON file.".format(to_remove)) return @@ -165,10 +172,14 @@ def remove(self): def load_file(self): with open(self.json_path, "r") as saved_searches_file: saved_searches = json.load(saved_searches_file) - logger.debug("{} quicksearche(s) found : {}".format(len(saved_searches.keys()), list(saved_searches.keys()))) + logger.debug( + "{} quicksearche(s) found : {}".format( + len(saved_searches.keys()), list(saved_searches.keys()) + ) + ) return saved_searches - + def dump_file(self, content: dict): - with open(self.json_path, 'w') as outfile: + with open(self.json_path, "w") as outfile: json.dump(content, outfile, sort_keys=True, indent=4) return diff --git a/modules/results/cache.py b/modules/results/cache.py index e3f28f8c..dbb3fa06 100644 --- a/modules/results/cache.py +++ b/modules/results/cache.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals # Standard library import logging @@ -18,14 +17,14 @@ # ################################## -class CacheManager(): +class CacheManager: """Basic class to manage the cache system of the layer addition. """ def __init__(self): # Path to JSON cache file - self.cache_file = Path(__file__).parents[2]/"_user"/"cache.json" - # Objects for storing inaccessible elements + self.cache_file = Path(__file__).parents[2] / "_user" / "cache.json" + # Objects for storing inaccessible elements self.cached_dict = {} self.cached_unreach_paths = [] self.cached_unreach_postgis = [] @@ -39,13 +38,15 @@ def dumper(self): :rtype: dict """ - self.cached_dict = {"files" : list(set(self.cached_unreach_paths)), - "PostGIS" : list(set(self.cached_unreach_postgis)), - "services" : list(set(self.cached_unreach_srv))} - with open(self.cache_file, 'w') as cache: + self.cached_dict = { + "files": list(set(self.cached_unreach_paths)), + "PostGIS": list(set(self.cached_unreach_postgis)), + "services": list(set(self.cached_unreach_srv)), + } + with open(self.cache_file, "w") as cache: json.dump([self.cached_dict], cache, indent=4) logger.debug("Paths cache has been dumped") - + return self.cached_dict def loader(self): @@ -56,11 +57,11 @@ def loader(self): :rtype: dict """ try: - with open(self.cache_file, 'r') as cache: + with open(self.cache_file, "r") as cache: cache_loaded = json.load(cache) if len(cache_loaded) == 0: logger.debug("Empty cache file.") - elif isinstance(cache_loaded[0], dict) : + elif isinstance(cache_loaded[0], dict): self.cached_unreach_paths = cache_loaded[0].get("files") self.cached_unreach_postgis = cache_loaded[0].get("PostGIS") self.cached_unreach_srv = cache_loaded[0].get("services") @@ -73,8 +74,8 @@ def loader(self): except ValueError as e: logger.error("Path JSON corrupted") except IOError: - logger.debug("Paths cache file not found. Maybe because of first launch.") - self.dumper() + logger.debug("Paths cache file not found. Maybe because of first launch.") + self.dumper() def cleaner(self): """Removes the stored elements and empties the JSON cache file.""" @@ -84,8 +85,9 @@ def cleaner(self): self.dumper() logger.debug("Cache has been cleaned") + # ############################################################################# # ##### Stand alone program ######## # ################################## -if __name__ == '__main__': +if __name__ == "__main__": """Standalone execution.""" diff --git a/modules/results/display.py b/modules/results/display.py index 18d7d1ab..5d65f182 100644 --- a/modules/results/display.py +++ b/modules/results/display.py @@ -1,18 +1,25 @@ # -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals # Standard library import logging import json from functools import partial from pathlib import Path + # PyQT # from QByteArray from qgis.PyQt.QtCore import QSettings, QObject, pyqtSignal from qgis.PyQt.QtGui import QIcon, QPixmap -from qgis.PyQt.QtWidgets import (QTableWidgetItem, QComboBox, QPushButton, QLabel, - QProgressBar, QHeaderView) +from qgis.PyQt.QtWidgets import ( + QTableWidgetItem, + QComboBox, + QPushButton, + QLabel, + QProgressBar, + QHeaderView, +) + # PyQGIS from qgis.utils import iface @@ -31,17 +38,35 @@ logger = logging.getLogger("IsogeoQgisPlugin") # Isogeo geometry types -polygon_list = ("CurvePolygon", "MultiPolygon", - "MultiSurface", "Polygon", "PolyhedralSurface") +polygon_list = ( + "CurvePolygon", + "MultiPolygon", + "MultiSurface", + "Polygon", + "PolyhedralSurface", +) point_list = ("Point", "MultiPoint") -line_list = ("CircularString", "CompoundCurve", "Curve", - "LineString", "MultiCurve", "MultiLineString") +line_list = ( + "CircularString", + "CompoundCurve", + "Curve", + "LineString", + "MultiCurve", + "MultiLineString", +) multi_list = ("Geometry", "GeometryCollection") # Isogeo formats li_formats_vect = ("shp", "dxf", "dgn", "filegdb", "tab") -li_formats_rastr = ("esriasciigrid", "geotiff", - "intergraphgdb", "jpeg", "png", "xyz", "ecw") +li_formats_rastr = ( + "esriasciigrid", + "geotiff", + "intergraphgdb", + "jpeg", + "png", + "xyz", + "ecw", +) # Qt icons # see https://github.com/qgis/QGIS/blob/master/images/images.qrc @@ -109,9 +134,11 @@ def show_results(self, api_results, pg_connections=dict()): for i in api_results.get("results"): # get useful metadata md_id = i.get("_id") - md_keywords = [i.get("tags").get(k) - for k in i.get("tags", ["NR", ]) - if k.startswith("keyword:isogeo")] + md_keywords = [ + i.get("tags").get(k) + for k in i.get("tags", ["NR"]) + if k.startswith("keyword:isogeo") + ] md_title = i.get("title", "NR") ds_geometry = i.get("geometry") @@ -119,8 +146,7 @@ def show_results(self, api_results, pg_connections=dict()): # Displaying the metadata title inside a button btn_md_title = QPushButton(plg_tools.format_button_title(md_title)) # Connecting the button to the full metadata popup - btn_md_title.pressed.connect(partial( - self.md_asked.emit, md_id)) + btn_md_title.pressed.connect(partial(self.md_asked.emit, md_id)) # Putting the abstract as a tooltip on this button btn_md_title.setToolTip(i.get("abstract", "")[:300]) # Insert it in column 1 @@ -128,8 +154,8 @@ def show_results(self, api_results, pg_connections=dict()): # COLUMN 2 - Data last update tbl_result.setItem( - count, 1, QTableWidgetItem( - plg_tools.handle_date(i.get("_modified")))) + count, 1, QTableWidgetItem(plg_tools.handle_date(i.get("_modified"))) + ) # COLUMN 3 - Geometry type lbl_geom = QLabel(tbl_result) @@ -147,12 +173,13 @@ def show_results(self, api_results, pg_connections=dict()): lbl_geom.setPixmap(pix_multi) lbl_geom.setToolTip(self.tr("MultiPolygon", "ResultsManager")) elif ds_geometry == "TIN": - tbl_result.setItem( - count, 2, QTableWidgetItem(u"TIN")) + tbl_result.setItem(count, 2, QTableWidgetItem("TIN")) else: tbl_result.setItem( - count, 2, QTableWidgetItem( - self.tr("Unknown geometry", "ResultsManager"))) + count, + 2, + QTableWidgetItem(self.tr("Unknown geometry", "ResultsManager")), + ) else: if "rasterDataset" in i.get("type"): lbl_geom.setPixmap(pix_rastr) @@ -176,22 +203,32 @@ def show_results(self, api_results, pg_connections=dict()): if i.get("format", "NR") in li_formats_vect and "path" in i: add_path = self._filepath_builder(i.get("path")) if add_path: - params = ["vector", add_path, - i.get("title", "NR"), - i.get("abstract", "NR"), - md_keywords] - dico_add_options[self.tr("Data file", "ResultsManager")] = params + params = [ + "vector", + add_path, + i.get("title", "NR"), + i.get("abstract", "NR"), + md_keywords, + ] + dico_add_options[ + self.tr("Data file", "ResultsManager") + ] = params else: pass # Same if the data is a raster elif i.get("format", "NR") in li_formats_rastr and "path" in i: add_path = self._filepath_builder(i.get("path")) if add_path: - params = ["raster", add_path, - i.get("title", "NR"), - i.get("abstract", "NR"), - md_keywords] - dico_add_options[self.tr("Data file", "ResultsManager")] = params + params = [ + "raster", + add_path, + i.get("title", "NR"), + i.get("abstract", "NR"), + md_keywords, + ] + dico_add_options[ + self.tr("Data file", "ResultsManager") + ] = params else: pass # If the data is a postGIS table and the connexion has @@ -208,13 +245,19 @@ def show_results(self, api_results, pg_connections=dict()): params["abstract"] = i.get("abstract", None) params["title"] = i.get("title", None) params["keywords"] = md_keywords - dico_add_options[self.tr("PostGIS table", "ResultsManager")] = params + dico_add_options[ + self.tr("PostGIS table", "ResultsManager") + ] = params else: pass else: pass else: - logger.debug("Metadata {} has a format but it's not recognized or path is missing".format(md_id)) + logger.debug( + "Metadata {} has a format but it's not recognized or path is missing".format( + md_id + ) + ) pass # Associated service layers d_type = i.get("type") @@ -222,40 +265,61 @@ def show_results(self, api_results, pg_connections=dict()): for layer in i.get("serviceLayers"): service = layer.get("service") if service is not None: - srv_details = {"path": service.get("path", "NR"), - "formatVersion": service.get("formatVersion")} + srv_details = { + "path": service.get("path", "NR"), + "formatVersion": service.get("formatVersion"), + } # EFS if service.get("format") == "efs": - params = self.layer_adder.build_efs_url(layer, srv_details, - rsc_type="ds_dyn_lyr_srv", - mode="quicky") + params = self.layer_adder.build_efs_url( + layer, + srv_details, + rsc_type="ds_dyn_lyr_srv", + mode="quicky", + ) # EMS elif service.get("format") == "ems": - params = self.layer_adder.build_ems_url(layer, srv_details, - rsc_type="ds_dyn_lyr_srv", - mode="quicky") + params = self.layer_adder.build_ems_url( + layer, + srv_details, + rsc_type="ds_dyn_lyr_srv", + mode="quicky", + ) # WFS elif service.get("format") == "wfs": - params = self.layer_adder.build_wfs_url(layer, srv_details, - rsc_type="ds_dyn_lyr_srv", - mode="quicky") + params = self.layer_adder.build_wfs_url( + layer, + srv_details, + rsc_type="ds_dyn_lyr_srv", + mode="quicky", + ) # WMS elif service.get("format") == "wms": - params = self.layer_adder.build_wms_url(layer, srv_details, - rsc_type="ds_dyn_lyr_srv", - mode="quicky") + params = self.layer_adder.build_wms_url( + layer, + srv_details, + rsc_type="ds_dyn_lyr_srv", + mode="quicky", + ) # WMTS elif service.get("format") == "wmts": - params = self.layer_adder.build_wmts_url(layer, srv_details, - rsc_type="ds_dyn_lyr_srv") + params = self.layer_adder.build_wmts_url( + layer, srv_details, rsc_type="ds_dyn_lyr_srv" + ) else: pass if params[0] != 0: - basic_md = [i.get("title", "NR"), i.get("abstract", "NR"), md_keywords] + basic_md = [ + i.get("title", "NR"), + i.get("abstract", "NR"), + md_keywords, + ] params.append(basic_md) - dico_add_options["{} : {}".format(params[0], params[1])] = params + dico_add_options[ + "{} : {}".format(params[0], params[1]) + ] = params else: pass else: @@ -264,14 +328,16 @@ def show_results(self, api_results, pg_connections=dict()): # are stored in the purposely named include: "layers". elif i.get("type") == "service": if i.get("layers") is not None: - srv_details = {"path": i.get("path", "NR"), - "formatVersion": i.get("formatVersion")} + srv_details = { + "path": i.get("path", "NR"), + "formatVersion": i.get("formatVersion"), + } # EFS if i.get("format") == "efs": for layer in i.get("layers"): - name_url = self.layer_adder.build_efs_url(layer, srv_details, - rsc_type="service", - mode="quicky") + name_url = self.layer_adder.build_efs_url( + layer, srv_details, rsc_type="service", mode="quicky" + ) if name_url[0] != 0: dico_add_options[name_url[5]] = name_url else: @@ -279,9 +345,9 @@ def show_results(self, api_results, pg_connections=dict()): # EMS if i.get("format") == "ems": for layer in i.get("layers"): - name_url = self.layer_adder.build_ems_url(layer, srv_details, - rsc_type="service", - mode="quicky") + name_url = self.layer_adder.build_ems_url( + layer, srv_details, rsc_type="service", mode="quicky" + ) if name_url[0] != 0: dico_add_options[name_url[5]] = name_url else: @@ -289,9 +355,9 @@ def show_results(self, api_results, pg_connections=dict()): # WFS if i.get("format") == "wfs": for layer in i.get("layers"): - name_url = self.layer_adder.build_wfs_url(layer, srv_details, - rsc_type="service", - mode="quicky") + name_url = self.layer_adder.build_wfs_url( + layer, srv_details, rsc_type="service", mode="quicky" + ) if name_url[0] != 0: dico_add_options[name_url[5]] = name_url else: @@ -299,9 +365,9 @@ def show_results(self, api_results, pg_connections=dict()): # WMS elif i.get("format") == "wms": for layer in i.get("layers"): - name_url = self.layer_adder.build_wms_url(layer, srv_details, - rsc_type="service", - mode="quicky") + name_url = self.layer_adder.build_wms_url( + layer, srv_details, rsc_type="service", mode="quicky" + ) if name_url[0] != 0: dico_add_options[name_url[5]] = name_url else: @@ -309,8 +375,9 @@ def show_results(self, api_results, pg_connections=dict()): # WMTS elif i.get("format") == "wmts": for layer in i.get("layers"): - name_url = self.layer_adder.build_wmts_url(layer, srv_details, - rsc_type="service") + name_url = self.layer_adder.build_wmts_url( + layer, srv_details, rsc_type="service" + ) if name_url[0] != 0: btn_label = "WMTS : {}".format(name_url[1]) dico_add_options[btn_label] = name_url @@ -350,13 +417,13 @@ def show_results(self, api_results, pg_connections=dict()): icon = ico_pgis elif text.startswith(self.tr("Data file", "ResultsManager")): icon = ico_file - else : + else: logger.debug("text : {}".format(text)) add_button = QPushButton(icon, text) add_button.setStyleSheet("text-align: left") - add_button.pressed.connect(partial(self.add_layer, - layer_info=["info", params]) - ) + add_button.pressed.connect( + partial(self.add_layer, layer_info=["info", params]) + ) tbl_result.setCellWidget(count, 3, add_button) # Else, add a combobox, storing all possibilities. else: @@ -377,9 +444,10 @@ def show_results(self, api_results, pg_connections=dict()): elif i.startswith(self.tr("Data file", "ResultsManager")): icon = ico_file combo.addItem(icon, i, dico_add_options.get(i)) - combo.activated.connect(partial(self.add_layer, - layer_info=["index", count])) - combo.model().sort(0) # sort alphabetically on option prefix. see: #113 + combo.activated.connect( + partial(self.add_layer, layer_info=["index", count]) + ) + combo.model().sort(0) # sort alphabetically on option prefix. see: #113 tbl_result.setCellWidget(count, 3, combo) count += 1 @@ -406,7 +474,9 @@ def _filepath_builder(self, metadata_path): return str(filepath) except: self.cache_mng.cached_unreach_paths.append(dir_file) - logger.debug("Path is not reachable and has been cached:{}".format(dir_file)) + logger.debug( + "Path is not reachable and has been cached:{}".format(dir_file) + ) return False else: logger.debug("Path has been ignored because it's cached.") @@ -417,48 +487,45 @@ def build_postgis_dict(self, input_dict): # input_dict.beginGroup("PostgreSQL/connections") final_dict = {} for k in sorted(input_dict.allKeys()): - if k.startswith("PostgreSQL/connections/")\ - and k.endswith("/database"): + if k.startswith("PostgreSQL/connections/") and k.endswith("/database"): if len(k.split("/")) == 4: connection_name = k.split("/")[2] password_saved = input_dict.value( - 'PostgreSQL/connections/' + - connection_name + - '/savePassword') + "PostgreSQL/connections/" + connection_name + "/savePassword" + ) user_saved = input_dict.value( - 'PostgreSQL/connections/' + - connection_name + - '/saveUsername') - if password_saved == 'true' and user_saved == 'true': - dictionary = {'name': - input_dict.value( - 'PostgreSQL/connections/' + - connection_name + - '/database'), - 'host': - input_dict.value( - 'PostgreSQL/connections/' + - connection_name + - '/host'), - 'port': - input_dict.value( - 'PostgreSQL/connections/' + - connection_name + - '/port'), - 'username': - input_dict.value( - 'PostgreSQL/connections/' + - connection_name + - '/username'), - 'password': - input_dict.value( - 'PostgreSQL/connections/' + - connection_name + - '/password')} + "PostgreSQL/connections/" + connection_name + "/saveUsername" + ) + if password_saved == "true" and user_saved == "true": + dictionary = { + "name": input_dict.value( + "PostgreSQL/connections/" + + connection_name + + "/database" + ), + "host": input_dict.value( + "PostgreSQL/connections/" + connection_name + "/host" + ), + "port": input_dict.value( + "PostgreSQL/connections/" + connection_name + "/port" + ), + "username": input_dict.value( + "PostgreSQL/connections/" + + connection_name + + "/username" + ), + "password": input_dict.value( + "PostgreSQL/connections/" + + connection_name + + "/password" + ), + } final_dict[ - input_dict.value('PostgreSQL/connections/' + - connection_name + - '/database') + input_dict.value( + "PostgreSQL/connections/" + + connection_name + + "/database" + ) ] = dictionary else: continue @@ -472,5 +539,5 @@ def build_postgis_dict(self, input_dict): # ############################################################################# # ##### Stand alone program ######## # ################################## -if __name__ == '__main__': +if __name__ == "__main__": """Standalone execution.""" diff --git a/modules/search_form.py b/modules/search_form.py index 54605765..6af14385 100644 --- a/modules/search_form.py +++ b/modules/search_form.py @@ -34,26 +34,27 @@ qsettings = QSettings() # icons -ico_od_asc = QIcon(':/plugins/Isogeo/resources/results/sort-alpha-asc.svg') -ico_od_desc = QIcon(':/plugins/Isogeo/resources/results/sort-alpha-desc.svg') +ico_od_asc = QIcon(":/plugins/Isogeo/resources/results/sort-alpha-asc.svg") +ico_od_desc = QIcon(":/plugins/Isogeo/resources/results/sort-alpha-desc.svg") ico_ob_relev = QIcon(":/plugins/Isogeo/resources/results/star.svg") -ico_ob_alpha = QIcon(':/plugins/Isogeo/resources/metadata/language.svg') -ico_ob_dcrea = QIcon(':/plugins/Isogeo/resources/datacreated.svg') -ico_ob_dupda = QIcon(':/plugins/Isogeo/resources/datamodified.svg') -ico_ob_mcrea = QIcon(':/plugins/Isogeo/resources/calendar-plus-o.svg') -ico_ob_mupda = QIcon(':/plugins/Isogeo/resources/calendar_blue.svg') -ico_bolt = QIcon(':/plugins/Isogeo/resources/search/bolt.svg') -ico_keyw = QIcon(':/plugins/Isogeo/resources/tag.svg') -ico_none = QIcon(':/plugins/Isogeo/resources/none.svg') -ico_line = QIcon(':/images/themes/default/mIconLineLayer.svg') +ico_ob_alpha = QIcon(":/plugins/Isogeo/resources/metadata/language.svg") +ico_ob_dcrea = QIcon(":/plugins/Isogeo/resources/datacreated.svg") +ico_ob_dupda = QIcon(":/plugins/Isogeo/resources/datamodified.svg") +ico_ob_mcrea = QIcon(":/plugins/Isogeo/resources/calendar-plus-o.svg") +ico_ob_mupda = QIcon(":/plugins/Isogeo/resources/calendar_blue.svg") +ico_bolt = QIcon(":/plugins/Isogeo/resources/search/bolt.svg") +ico_keyw = QIcon(":/plugins/Isogeo/resources/tag.svg") +ico_none = QIcon(":/plugins/Isogeo/resources/none.svg") +ico_line = QIcon(":/images/themes/default/mIconLineLayer.svg") ico_log = QIcon(":/images/themes/default/mActionFolder.svg") -ico_poin = QIcon(':/images/themes/default/mIconPointLayer.svg') -ico_poly = QIcon(':/images/themes/default/mIconPolygonLayer.svg') +ico_poin = QIcon(":/images/themes/default/mIconPointLayer.svg") +ico_poly = QIcon(":/images/themes/default/mIconPolygonLayer.svg") # ############################################################################ # ########## Classes ############### # ################################## + class SearchFormManager(IsogeoDockWidget): """ Basic class to manage IsogeoDockwidget UI module (ui/isogeo_dockwidget.py). It performs different tasks : @@ -64,7 +65,8 @@ class SearchFormManager(IsogeoDockWidget): Most of its methods are called by Isogeo.search_slot method which launched after the results of a search request has been parsed and validated. """ - # Simple signal to connect keywords special combobox with Isogeo.search method + + # Simple signal to connect keywords special combobox with Isogeo.search method kw_sig = pyqtSignal() def __init__(self, trad): @@ -72,31 +74,43 @@ def __init__(self, trad): super().__init__() self.tr = trad - # geofilter, type, format, owner, inspire, srs, contact and license + # geofilter, type, format, owner, inspire, srs, contact and license self.cbbs_search_advanced = self.grp_filters.findChildren(QComboBox) # match between widgets and metadata fields self.match_widget_field = { - self.cbb_type : "types", - self.cbb_format : "formats", - self.cbb_owner : "owners", - self.cbb_inspire : "inspire", - self.cbb_srs : "srs", - self.cbb_contact : "contacts", - self.cbb_license : "licenses" - } - + self.cbb_type: "types", + self.cbb_format: "formats", + self.cbb_owner: "owners", + self.cbb_inspire: "inspire", + self.cbb_srs: "srs", + self.cbb_contact: "contacts", + self.cbb_license: "licenses", + } + # Static dictionnaries for filling static widgets - self.dict_operation = OrderedDict([(self.tr('Intersects'), "intersects"), - (self.tr('within'), "within"), - (self.tr('contains'), "contains")]) - self.dict_ob = OrderedDict([(self.tr("Relevance"), (ico_ob_relev, "relevance")), - (self.tr("Alphabetical order"), (ico_ob_alpha, "title")), - (self.tr("Data modified"), (ico_ob_dupda, "modified")), - (self.tr("Data created"), (ico_ob_dcrea, "created")), - (self.tr("Metadata modified"), (ico_ob_mcrea, "_modified")), - (self.tr("Metadata created"), (ico_ob_mupda, "_created"))]) - self.dict_od = OrderedDict([(self.tr("Descending"), (ico_od_desc, "desc")), - (self.tr("Ascending"), (ico_od_asc, "asc"))]) + self.dict_operation = OrderedDict( + [ + (self.tr("Intersects"), "intersects"), + (self.tr("within"), "within"), + (self.tr("contains"), "contains"), + ] + ) + self.dict_ob = OrderedDict( + [ + (self.tr("Relevance"), (ico_ob_relev, "relevance")), + (self.tr("Alphabetical order"), (ico_ob_alpha, "title")), + (self.tr("Data modified"), (ico_ob_dupda, "modified")), + (self.tr("Data created"), (ico_ob_dcrea, "created")), + (self.tr("Metadata modified"), (ico_ob_mcrea, "_modified")), + (self.tr("Metadata created"), (ico_ob_mupda, "_created")), + ] + ) + self.dict_od = OrderedDict( + [ + (self.tr("Descending"), (ico_od_desc, "desc")), + (self.tr("Ascending"), (ico_od_asc, "asc")), + ] + ) # Setting quick search manager self.qs_mng = QuickSearchManager(self) @@ -109,7 +123,9 @@ def __init__(self, trad): # Setting result manager self.results_mng = ResultsManager(self) - def update_cbb_keywords(self, tags_keywords:dict ={}, selected_keywords:list =[]): + def update_cbb_keywords( + self, tags_keywords: dict = {}, selected_keywords: list = [] + ): """Keywords combobox is specific because items are checkable. See: https://github.com/isogeo/isogeo-plugin-qgis/issues/159 @@ -117,12 +133,15 @@ def update_cbb_keywords(self, tags_keywords:dict ={}, selected_keywords:list =[] :param list selected_keywords: keywords (codes) already checked. """ selected_keywords_lbls = self.cbb_chck_kw.checkedItems() # for tooltip - logger.debug("Updating keywords combobox with {} items, including {} selected." - .format(len(tags_keywords), len(selected_keywords_lbls))) + logger.debug( + "Updating keywords combobox with {} items, including {} selected.".format( + len(tags_keywords), len(selected_keywords_lbls) + ) + ) model = QStandardItemModel(5, 1) # 5 rows, 1 col # parse keywords and check selected - i = 0 # row index + i = 0 # row index for tag_label, tag_code in sorted(tags_keywords.items()): i += 1 item = QStandardItem(tag_label) @@ -137,8 +156,7 @@ def update_cbb_keywords(self, tags_keywords:dict ={}, selected_keywords:list =[] else: pass # first item = label for the combobox. - first_item = QStandardItem("---- {} ----" - .format(self.tr('Keywords'))) + first_item = QStandardItem("---- {} ----".format(self.tr("Keywords"))) first_item.setIcon(ico_keyw) first_item.setSelectable(False) model.insertRow(0, first_item) @@ -151,9 +169,11 @@ def update_cbb_keywords(self, tags_keywords:dict ={}, selected_keywords:list =[] # add tooltip with selected keywords. see: #107#issuecomment-341742142 if selected_keywords: - tooltip = "{}\n - {}".format(self.tr("Selected keywords:"), "\n - ".join(selected_keywords_lbls)) + tooltip = "{}\n - {}".format( + self.tr("Selected keywords:"), "\n - ".join(selected_keywords_lbls) + ) else: - tooltip = self.tr("No keyword selected") + tooltip = self.tr("No keyword selected") self.cbb_chck_kw.setToolTip(tooltip) def pop_as_cbbs(self, tags: dict): @@ -194,20 +214,20 @@ def pop_qs_cbbs(self, items_list: list = None): :param list items_list: a list of quick searche's names from _user/quicksearches.json file. """ - logger.debug("Filling quick searches comboboxes") + logger.debug("Filling quick searches comboboxes") # building the list of widgets' items'content if items_list == None: - qs_list =list(self.qs_mng.load_file().keys()) + qs_list = list(self.qs_mng.load_file().keys()) else: qs_list = items_list - qs_list.pop(qs_list.index('_default')) - if '_current' in qs_list: - qs_list.pop(qs_list.index('_current')) + qs_list.pop(qs_list.index("_default")) + if "_current" in qs_list: + qs_list.pop(qs_list.index("_current")) # clear widgets self.cbb_quicksearch_use.clear() self.cbb_quicksearch_edit.clear() # filling widgets from the saved searches list built above - self.cbb_quicksearch_use.addItem(ico_bolt, self.tr('Quicksearches')) + self.cbb_quicksearch_use.addItem(ico_bolt, self.tr("Quicksearches")) for qs in qs_list: self.cbb_quicksearch_use.addItem(qs, qs) self.cbb_quicksearch_edit.addItem(qs, qs) @@ -223,43 +243,46 @@ def set_ccb_index(self, params: dict, quicksearch: str = ""): :param str quicksearch: empty string if no quicksearch performed. Otherwise:the name of the quicksearch performed. """ - logger.debug("Settings widgets statut according to these parameters : \n{}" - .format(params)) + logger.debug( + "Settings widgets statut according to these parameters : \n{}".format( + params + ) + ) # for Advanced search Combobox except geo_filter for cbb in self.match_widget_field.keys(): field_name = self.match_widget_field.get(cbb) dest_index = cbb.findData(params.get(field_name)) cbb.setCurrentIndex(dest_index) # for geo filter, use quick search, and text label if quicksearch is True - if quicksearch == "": + if quicksearch == "": if params.get("geofilter") == "mapcanvas": dest_index = self.cbb_geofilter.findData("mapcanvas") self.cbb_geofilter.setCurrentIndex(dest_index) - else : + else: dest_index = self.cbb_geofilter.findText(params["geofilter"]) self.cbb_geofilter.setCurrentIndex(dest_index) - dest_index = self.cbb_quicksearch_use.findData(params.get('favorite')) + dest_index = self.cbb_quicksearch_use.findData(params.get("favorite")) self.cbb_quicksearch_use.setCurrentIndex(dest_index) else: dest_index = self.cbb_geofilter.findData(params.get("geofilter")) self.cbb_geofilter.setCurrentIndex(dest_index) if quicksearch != "_default": - saved_index = self.cbb_quicksearch_use.findData(quicksearch) - self.cbb_quicksearch_use.setCurrentIndex(saved_index) + saved_index = self.cbb_quicksearch_use.findData(quicksearch) + self.cbb_quicksearch_use.setCurrentIndex(saved_index) else: pass self.txt_input.setText(params.get("text")) - # for geo_op + # for geo_op dest_index = self.cbb_geo_op.findData(params.get("operation")) self.cbb_geo_op.setCurrentIndex(dest_index) # for sorting order and direction - self.cbb_ob.setCurrentIndex(self.cbb_ob.findData(params.get('ob'))) - self.cbb_od.setCurrentIndex(self.cbb_od.findData(params.get('od'))) + self.cbb_ob.setCurrentIndex(self.cbb_ob.findData(params.get("ob"))) + self.cbb_od.setCurrentIndex(self.cbb_od.findData(params.get("od"))) return - - def fill_tbl_result(self, content: dict, page_index:int, results_count:int): + + def fill_tbl_result(self, content: dict, page_index: int, results_count: int): """ Called by Isogeo.search_slot method. It sets some widgets' statuts in order to display results. @@ -279,16 +302,16 @@ def fill_tbl_result(self, content: dict, page_index:int, results_count:int): elif page_index == nb_page: self.btn_next.setEnabled(False) self.btn_previous.setEnabled(True) - else : + else: self.btn_next.setEnabled(True) self.btn_previous.setEnabled(True) - + self.cbb_ob.setEnabled(True) self.cbb_od.setEnabled(True) self.btn_show.setToolTip(self.tr("Display results")) - self.results_mng.show_results(api_results = content) - self.qs_mng.write_params('_current', search_kind="Current") + self.results_mng.show_results(api_results=content) + self.qs_mng.write_params("_current", search_kind="Current") def switch_widgets_on_and_off(self, mode=1): """Disable all the UI widgets when a request is being sent. @@ -305,7 +328,7 @@ def switch_widgets_on_and_off(self, mode=1): else: self.txt_input.setReadOnly(True) self.tab_search.setEnabled(False) - + def init_steps(self): """ Called by Isogeo.search_slot method in case of reset or "_default" quicksearch. It initialise the widgets that don't need to be updated @@ -352,29 +375,28 @@ def save_params(self): """ params = {} # get the data of the item which index is (comboboxes current index) - for cbb in self.match_widget_field : + for cbb in self.match_widget_field: field = self.match_widget_field.get(cbb) item = cbb.itemData(cbb.currentIndex()) params[field] = item if self.cbb_geofilter.currentIndex() < 2: params["geofilter"] = self.cbb_geofilter.itemData( - self.cbb_geofilter.currentIndex()) + self.cbb_geofilter.currentIndex() + ) else: params["geofilter"] = self.cbb_geofilter.currentText() params["favorite"] = self.cbb_quicksearch_use.itemData( - self.cbb_quicksearch_use.currentIndex()) - + self.cbb_quicksearch_use.currentIndex() + ) + # Getting the text in the search line params["text"] = self.txt_input.text() - params["operation"] = self.cbb_geo_op.itemData( - self.cbb_geo_op.currentIndex()) - params["ob"] = self.cbb_ob.itemData( - self.cbb_ob.currentIndex()) - params["od"] = self.cbb_od.itemData( - self.cbb_od.currentIndex()) + params["operation"] = self.cbb_geo_op.itemData(self.cbb_geo_op.currentIndex()) + params["ob"] = self.cbb_ob.itemData(self.cbb_ob.currentIndex()) + params["od"] = self.cbb_od.itemData(self.cbb_od.currentIndex()) # Saving the keywords that are selected : if a keyword state is # selected, he is added to the list key_params = [] @@ -383,22 +405,24 @@ def save_params(self): key_params.append(self.cbb_chck_kw.itemData(item_index, 32)) params["keys"] = key_params # check geographic filter - if params.get('geofilter') == "mapcanvas": + if params.get("geofilter") == "mapcanvas": e = iface.mapCanvas().extent() extent = [e.xMinimum(), e.yMinimum(), e.xMaximum(), e.yMaximum()] - params['extent'] = extent - epsg = int(plg_tools.get_map_crs().split(':')[1]) - params['epsg'] = epsg - params['coord'] = self.get_coords('canvas') - elif params.get('geofilter') in [lyr.name() for lyr in QgsProject.instance().mapLayers().values()]: - params['coord'] = self.get_coords(params.get('geofilter')) + params["extent"] = extent + epsg = int(plg_tools.get_map_crs().split(":")[1]) + params["epsg"] = epsg + params["coord"] = self.get_coords("canvas") + elif params.get("geofilter") in [ + lyr.name() for lyr in QgsProject.instance().mapLayers().values() + ]: + params["coord"] = self.get_coords(params.get("geofilter")) else: pass # saving params in QSettings qsettings.setValue("isogeo/settings/georelation", params.get("operation")) return params - def get_coords(self, filter:str): + def get_coords(self, filter: str): """Get the extent's coordinates of a layer or canvas in the right format and SRS (WGS84). @@ -410,7 +434,7 @@ def get_coords(self, filter:str): :rtype: str """ - if filter == 'canvas': + if filter == "canvas": e = iface.mapCanvas().extent() current_epsg = plg_tools.get_map_crs() else: @@ -418,28 +442,34 @@ def get_coords(self, filter:str): e = layer.extent() current_epsg = layer.crs().authid() # epsg code as integer - current_epsg = int(current_epsg.split(':')[1]) + current_epsg = int(current_epsg.split(":")[1]) if current_epsg == 4326: coord = "{0},{1},{2},{3}".format( - e.xMinimum(), e.yMinimum(), e.xMaximum(), e.yMaximum()) + e.xMinimum(), e.yMinimum(), e.xMaximum(), e.yMaximum() + ) return coord elif type(current_epsg) is int: current_srs = QgsCoordinateReferenceSystem( - current_epsg, QgsCoordinateReferenceSystem.EpsgCrsId) + current_epsg, QgsCoordinateReferenceSystem.EpsgCrsId + ) wgs = QgsCoordinateReferenceSystem( - 4326, QgsCoordinateReferenceSystem.EpsgCrsId) + 4326, QgsCoordinateReferenceSystem.EpsgCrsId + ) xform = QgsCoordinateTransform(current_srs, wgs, QgsProject.instance()) minimum = xform.transform(QgsPointXY(e.xMinimum(), e.yMinimum())) maximum = xform.transform(QgsPointXY(e.xMaximum(), e.yMaximum())) coord = "{0},{1},{2},{3}".format( - minimum[0], minimum[1], maximum[0], maximum[1]) + minimum[0], minimum[1], maximum[0], maximum[1] + ) return coord else: - logger.debug('Wrong EPSG') + logger.debug("Wrong EPSG") return False + + # ############################################################################# # ##### Stand alone program ######## # ################################## -if __name__ == '__main__': +if __name__ == "__main__": """Standalone execution.""" diff --git a/modules/tools.py b/modules/tools.py index 8abaa3a5..b4051728 100644 --- a/modules/tools.py +++ b/modules/tools.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, - print_function, unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals # Standard library import configparser @@ -14,7 +13,7 @@ import webbrowser # PyQGIS -from qgis.core import (QgsDataSourceUri, QgsProject, QgsVectorLayer, QgsRasterLayer) +from qgis.core import QgsDataSourceUri, QgsProject, QgsVectorLayer, QgsRasterLayer from qgis.utils import iface # PyQT @@ -22,9 +21,9 @@ from qgis.PyQt.QtWidgets import QMessageBox # Depending on operating system -if opersys == 'win32': +if opersys == "win32": """ windows """ - from os import startfile # to open a folder/file + from os import startfile # to open a folder/file else: pass @@ -47,24 +46,25 @@ class IsogeoPlgTools(IsogeoUtils): """Inheritance from Isogeo Python SDK utils class. It adds some specific tools for QGIS plugin.""" + last_error = None tr = object def __init__(self, auth_folder=r"../_auth"): - """Check and manage authentication credentials.""" - # authentication - self.auth_folder = auth_folder + """Check and manage authentication credentials.""" + # authentication + self.auth_folder = auth_folder - # instanciate - super(IsogeoPlgTools, self).__init__ () + # instanciate + super(IsogeoPlgTools, self).__init__() def error_catcher(self, msg, tag, level): """Catch QGIS error messages for introspection.""" - if tag == 'WMS' and level != 0: + if tag == "WMS" and level != 0: self.last_error = "wms", msg - elif tag == 'WFS' and level != 0: + elif tag == "WFS" and level != 0: self.last_error = "wfs", msg - elif tag == 'PostGIS' and level != 0: + elif tag == "PostGIS" and level != 0: self.last_error = "postgis", msg else: pass @@ -74,11 +74,11 @@ def format_button_title(self, title): :param str title: title to format """ - words = title.split(' ') - if len(words) == 1 and len(words[0])>33: + words = title.split(" ") + if len(words) == 1 and len(words[0]) > 33: final_text = "\n" + words[0][:30] + "..." return final_text - else: + else: pass line_length = 0 @@ -113,9 +113,9 @@ def handle_date(self, input_date): """ if input_date != "NR": date = input_date.split("T")[0] - year = int(date.split('-')[0]) - month = int(date.split('-')[1]) - day = int(date.split('-')[2]) + year = int(date.split("-")[0]) + month = int(date.split("-")[1]) + day = int(date.split("-")[2]) new_date = datetime.date(year, month, day) # method ending return new_date.strftime("%Y-%m-%d") @@ -128,15 +128,13 @@ def mail_to_isogeo(self, lang): :param str lang: language code. If not fr (French), the English form is displayed. """ if lang == "fr": - webbrowser.open('https://www.isogeo.com/fr/Plugin-QGIS/22', - new=0, - autoraise=True - ) + webbrowser.open( + "https://www.isogeo.com/fr/Plugin-QGIS/22", new=0, autoraise=True + ) else: - webbrowser.open('https://www.isogeo.com/en/QGIS-Plugin/22', - new=0, - autoraise=True - ) + webbrowser.open( + "https://www.isogeo.com/en/QGIS-Plugin/22", new=0, autoraise=True + ) # method ending logger.debug("Bugtracker launched in the default web browser") @@ -147,29 +145,30 @@ def open_dir_file(self, target): """ # check if the file or the directory exists if not path.exists(target): - raise IOError('No such file: {0}'.format(target)) + raise IOError("No such file: {0}".format(target)) # check the read permission if not access(target, R_OK): - raise IOError('Cannot access file: {0}'.format(target)) + raise IOError("Cannot access file: {0}".format(target)) # open the directory or the file according to the os - if opersys == 'win32': # Windows + if opersys == "win32": # Windows proc = startfile(path.realpath(target)) - elif opersys.startswith('linux'): # Linux: - proc = subprocess.Popen(['xdg-open', target], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + elif opersys.startswith("linux"): # Linux: + proc = subprocess.Popen( + ["xdg-open", target], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) - elif opersys == 'darwin': # Mac: - proc = subprocess.Popen(['open', '--', target], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + elif opersys == "darwin": # Mac: + proc = subprocess.Popen( + ["open", "--", target], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) else: raise NotImplementedError( - "Your `%s` isn't a supported operating system`." % opersys) + "Your `%s` isn't a supported operating system`." % opersys + ) # end of function return proc @@ -182,15 +181,14 @@ def open_webpage(self, link): if isinstance(link, QUrl): link = link.toString() - webbrowser.open( - link, - new=0, - autoraise=True) + webbrowser.open(link, new=0, autoraise=True) # method ending logger.debug("Link launched in the default web browser: {}".format(link)) return - def plugin_metadata(self, base_path=path.dirname(__file__), section="general", value="version"): + def plugin_metadata( + self, base_path=path.dirname(__file__), section="general", value="version" + ): """Plugin metadata.txt parser. :param path base_path: directory path whete the metadata txt is stored @@ -208,9 +206,9 @@ def plugin_metadata(self, base_path=path.dirname(__file__), section="general", v * repository """ config = configparser.ConfigParser() - if path.isfile(path.join(base_path, 'metadata.txt')): - config.read(path.join(base_path, 'metadata.txt')) - return config.get('general', value) + if path.isfile(path.join(base_path, "metadata.txt")): + config.read(path.join(base_path, "metadata.txt")) + return config.get("general", value) else: logger.error(path.dirname(__file__)) @@ -224,7 +222,7 @@ def results_pages_counter(self, total=0, page_size=10): count_pages = 1 else: if (total % page_size) == 0: - count_pages = (total / page_size) + count_pages = total / page_size else: count_pages = (total / page_size) + 1 # method ending @@ -241,23 +239,23 @@ def special_search(self, easter_code="isogeo"): canvas = iface.mapCanvas() if easter_code == "isogeo": # WMS - wms_params = {"service": "WMS", - "version": "1.3.0", - "request": "GetMap", - "layers": "Isogeo:isogeo_logo", - "crs": "EPSG:3857", - "format": "image/png", - "styles": "isogeo_logo", - "url": "http://noisy.hq.isogeo.fr:6090/geoserver/Isogeo/ows?" - } + wms_params = { + "service": "WMS", + "version": "1.3.0", + "request": "GetMap", + "layers": "Isogeo:isogeo_logo", + "crs": "EPSG:3857", + "format": "image/png", + "styles": "isogeo_logo", + "url": "http://noisy.hq.isogeo.fr:6090/geoserver/Isogeo/ows?", + } wms_uri = unquote(urlencode(wms_params)) - wms_lyr = QgsRasterLayer(wms_uri, u"Ici c'est Isogeo !", "wms") + wms_lyr = QgsRasterLayer(wms_uri, "Ici c'est Isogeo !", "wms") if wms_lyr.isValid: QgsProject.instance().addMapLayer(wms_lyr) logger.info("Isogeo easter egg used and WMS displayed!") else: - logger.error("WMS layer failed: {}" - .format(wms_lyr.error().message())) + logger.error("WMS layer failed: {}".format(wms_lyr.error().message())) # WFS uri = QgsDataSourceURI() @@ -268,20 +266,24 @@ def special_search(self, easter_code="isogeo"): uri.setParam("srsname", "EPSG:3857") uri.setParam("restrictToRequestBBOX", "0") wfs_uri = uri.uri() - wfs_lyr = QgsVectorLayer(wfs_uri, u"Ici c'est Isogeo !", "WFS") + wfs_lyr = QgsVectorLayer(wfs_uri, "Ici c'est Isogeo !", "WFS") if wfs_lyr.isValid: - wfs_style = path.join(path.dirname(path.realpath(__file__)), - "isogeo.qml") + wfs_style = path.join( + path.dirname(path.realpath(__file__)), "isogeo.qml" + ) wfs_lyr.loadNamedStyle(wfs_style) QgsProject.instance().addMapLayer(wfs_lyr) canvas.setExtent(wfs_lyr.extent()) logger.debug("Isogeo easter egg used") else: - logger.error("Esater egg - WFS layer failed: {}" - .format(wfs_lyr.error().message())) + logger.error( + "Esater egg - WFS layer failed: {}".format( + wfs_lyr.error().message() + ) + ) elif easter_code == "picasa": project = QgsProject.instance() - project.setTitle(u"Isogeo, le Picasa de l'information géographique") + project.setTitle("Isogeo, le Picasa de l'information géographique") logger.debug("Picasa easter egg used") else: pass @@ -302,11 +304,11 @@ def test_proxy_configuration(self): else: qgis_proxy = qsettings.value("proxy/proxyEnabled", "") if str(qgis_proxy) == "true": - http = system_proxy_config.get('http') + http = system_proxy_config.get("http") if http is None: pass else: - elements = http.split(':') + elements = http.split(":") if len(elements) == 2: host = elements[0] port = elements[1] @@ -314,54 +316,89 @@ def test_proxy_configuration(self): qgis_port = qsettings.value("proxy/proxyPort", "") if qgis_host == host: if qgis_port == port: - logger.info("A proxy is set up both in QGIS " - "and the OS and they match => " - "Proxy config : OK") + logger.info( + "A proxy is set up both in QGIS " + "and the OS and they match => " + "Proxy config : OK" + ) pass else: - QMessageBox.information(iface.mainWindow( - ), self.tr("Alert", "Tools"), - self.tr("Proxy issue : \nQGIS and your OS " - "have different proxy set up.", "Tools")) + QMessageBox.information( + iface.mainWindow(), + self.tr("Alert", "Tools"), + self.tr( + "Proxy issue : \nQGIS and your OS " + "have different proxy set up.", + "Tools", + ), + ) else: - QMessageBox.information(iface.mainWindow( - ), self.tr("Alert", "Tools"), - self.tr("Proxy issue : \nQGIS and your OS have" - " different proxy set ups.", "Tools")) - elif len(elements) == 3 and elements[0] == 'http': + QMessageBox.information( + iface.mainWindow(), + self.tr("Alert", "Tools"), + self.tr( + "Proxy issue : \nQGIS and your OS have" + " different proxy set ups.", + "Tools", + ), + ) + elif len(elements) == 3 and elements[0] == "http": host_short = elements[1][2:] - host_long = elements[0] + ':' + elements[1] + host_long = elements[0] + ":" + elements[1] port = elements[2] qgis_host = qsettings.value("proxy/proxyHost", "") qgis_port = qsettings.value("proxy/proxyPort", "") if qgis_host == host_short or qgis_host == host_long: if qgis_port == port: - logger.info("A proxy is set up both in QGIS" - " and the OS and they match " - "=> Proxy config : OK") + logger.info( + "A proxy is set up both in QGIS" + " and the OS and they match " + "=> Proxy config : OK" + ) pass else: - logger.error("OS and QGIS proxy ports do not " - "match. => Proxy config: not OK") - QMessageBox.information(iface.mainWindow( - ), self.tr("Alert", "Tools"), - self.tr("Proxy issue : \nQGIS and your OS" - " have different proxy set ups.", "Tools")) + logger.error( + "OS and QGIS proxy ports do not " + "match. => Proxy config: not OK" + ) + QMessageBox.information( + iface.mainWindow(), + self.tr("Alert", "Tools"), + self.tr( + "Proxy issue : \nQGIS and your OS" + " have different proxy set ups.", + "Tools", + ), + ) else: - logger.error("OS and QGIS proxy hosts do not " - "match. => Proxy config: not OK") - QMessageBox.information(iface.mainWindow( - ), self.tr("Alert", "Tools"), - self.tr("Proxy issue : \nQGIS and your OS have" - " different proxy set ups.", "Tools")) + logger.error( + "OS and QGIS proxy hosts do not " + "match. => Proxy config: not OK" + ) + QMessageBox.information( + iface.mainWindow(), + self.tr("Alert", "Tools"), + self.tr( + "Proxy issue : \nQGIS and your OS have" + " different proxy set ups.", + "Tools", + ), + ) else: - logger.error("OS uses a proxy but it isn't set up in QGIS." - " => Proxy config: not OK") - QMessageBox.information(iface.mainWindow( - ), self.tr("Alert", "Tools"), - self.tr("Proxy issue : \nYou have a proxy set up on your" - " OS but none in QGIS.\nPlease set it up in " - "'Preferences/Options/Network'.", "Tools")) + logger.error( + "OS uses a proxy but it isn't set up in QGIS." + " => Proxy config: not OK" + ) + QMessageBox.information( + iface.mainWindow(), + self.tr("Alert", "Tools"), + self.tr( + "Proxy issue : \nYou have a proxy set up on your" + " OS but none in QGIS.\nPlease set it up in " + "'Preferences/Options/Network'.", + "Tools", + ), + ) def test_qgis_style(self): """ @@ -372,20 +409,24 @@ def test_qgis_style(self): style_qgis = qsettings.value("qgis/style", "Default") if style_qgis in ("macintosh", "cleanlooks"): qsettings.setValue("qgis/style", "Plastique") - msgBar.pushMessage(self.tr("The '{}' QGIS style is not " - "compatible with combobox. It has " - "been changed. Please restart QGIS.") - .format(style_qgis), - duration=0, - level=msgBar.WARNING) - logger.warning("The '{}' QGIS style is not compatible with combobox." - " Isogeo plugin changed it to 'Plastique'." - "Please restart QGIS." - .format(style_qgis)) + msgBar.pushMessage( + self.tr( + "The '{}' QGIS style is not " + "compatible with combobox. It has " + "been changed. Please restart QGIS." + ).format(style_qgis), + duration=0, + level=msgBar.WARNING, + ) + logger.warning( + "The '{}' QGIS style is not compatible with combobox." + " Isogeo plugin changed it to 'Plastique'." + "Please restart QGIS.".format(style_qgis) + ) return False else: return True - + def _to_raw_string(self, in_string): """Basic converter for input string or unicode to raw string. Useful to prevent escaping in Windows paths for example. @@ -396,7 +437,7 @@ def _to_raw_string(self, in_string): """ if isinstance(in_string, str) or isinstance(in_string, unicode): logger.debug(in_string) - return in_string.encode('unicode-escape') + return in_string.encode("unicode-escape") else: raise TypeError @@ -416,8 +457,9 @@ def _ui_tweaker(self, ui_widgets, tweak_type="comboboxes"): logger.debug("Tweak type not recognized.") pass + # ############################################################################# # ##### Stand alone program ######## # ################################## -if __name__ == '__main__': +if __name__ == "__main__": """Standalone execution."""