Skip to content
Permalink
Browse files

More work

  • Loading branch information
NathanW2 committed Nov 30, 2018
1 parent 4a504ab commit cf4927770ea3143682f7b4260762026d78d857a3
Showing with 214 additions and 59 deletions.
  1. +3 −3 python/processing/__init__.py
  2. +181 −56 python/processing/algfactory.py
  3. +30 −0 tests/src/python/test_processing_alg.py
@@ -1,3 +1,3 @@
from algfactory import ProcessingAlgFactory

alg = ProcessingAlgFactory()
from .algfactory import ProcessingAlgFactory

alg = ProcessingAlgFactory()
@@ -1,36 +1,83 @@
from collections import OrderedDict
from functools import partial
from qgis.core import (QgsProcessing,
QgsFeatureSink,
QgsProcessingException,
QgsProcessingParameterDefinition,
QgsProcessingAlgorithm,
QgsProcessingParameterFeatureSource,
QgsProcessingParameterBand,
QgsProcessingParameterString,
QgsProcessingParameterNumber,
QgsProcessingParameterDistance,
QgsProcessingParameterString,
QgsProcessingParameterFeatureSource,
QgsProcessingParameterFeatureSink)

from qgis.core import (QgsProcessingOutputString,
QgsProcessingOutputFile,
QgsProcessingOutputFolder,
QgsProcessingOutputHtml,
QgsProcessingOutputLayerDefinition,
QgsProcessingOutputMapLayer,
QgsProcessingOutputMultipleLayers,
QgsProcessingOutputNumber,
QgsProcessingOutputRasterLayer,
QgsProcessingOutputVectorLayer)

def make_output(**args):
cls = args['cls']
del args['cls']
newargs = {
"name": args['name'],
"description": args['description'],
}
return cls(**newargs)

def make_string(**args):
return QgsProcessingParameterString(**args)

def make_distance(**args):
return QgsProcessingParameterDistance(**args)

def make_sink(**args):
return QgsProcessingParameterFeatureSink(**args)

def make_source(**args):
return QgsProcessingParameterFeatureSource(**args)

def make_number(**args):
return QgsProcessingParameterNumber(**args)

def safe_del(data, *keys):
for key in keys:
try:
del data[key]
except KeyError:
pass

class ProcessingAlgFactoryException(Exception):
def __init__(self, message):
super(ProcessingAlgFactoryException, self).__init__(message)


class AlgWrapper(QgsProcessingAlgorithm):
class _Args():
def __getattribute__(self, item):
return item
class AlgWarpper(QgsProcessingAlgorithm):

def __init__(self, name=None, display=None,
group=None, group_id=None, inputs=None,
outputs=None, func=None):
super(AlgWarpper, self).__init__()
self._inputs = OrderedDict(inputs or {})
self._outputs = OrderedDict(outputs or {})
self._name = name
self._group = group
self._group_id = group_id
self._display = display
self._func = func

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._inputs = {}
self._outputs = {}
self._name = None
self._group = None
self._group_id = None
self._display = None
self._func = None

self.NAMES = AlgWrapper._Args()
def _get_parent_id(self, parent):
if isinstance(parent, str):
return parent
else:
raise NotImplementedError()

# Wrapper logic
def define(self, name, label, group, groupid):
@@ -47,37 +94,53 @@ def end(self):
"At least one is required. Use @alg.input"
"to define one.".format(self.name()))
if not self.has_outputs:
raise ProcessingAlgFactoryException("No outputs defined for '{}' alg. "
"At least one is required. Use @alg.output to set one.".format(
self.name()))

def add_input(self, type, name, label, parent=None):
print("Input: {} as {}".format(name, str(type)))
if name in self._inputs:
raise ProcessingAlgFactoryException("Input {} already defined".format(name))
raise ProcessingAlgFactoryException("No outputs defined for '{}' alg"
"At least one must be defined. Use @alg.output")

def add_output(self, type, **kwargs):
parm = self._add_parm(type, output=True, **kwargs)
self._outputs[parm.name()] = parm
print("Output: {} as {}".format(parm.name(), str(parm)))

def add_input(self, type, **kwargs):
parm = self._add_parm(type, **kwargs)
self._inputs[parm.name()] = parm
print("Input: {} as {}".format(parm.name(), str(parm)))

def _add_parm(self, type, output=False, **kwargs):
name = kwargs['name']
if name in self._inputs or name in self._outputs:
raise ProcessingAlgFactoryException("{} already defined".format(name))

parent = kwargs.get("parent")
if parent:
parentname = self._get_parent_id(parent)
if parentname == name:
raise ProcessingAlgFactoryException("Input {} can't depend on itself. "
raise ProcessingAlgFactoryException("{} can't depend on itself. "
"We know QGIS is smart but it's not that smart".format(name))
if parentname not in self._inputs:
raise ProcessingAlgFactoryException("No input named {} defined".format(parentname))
if parentname not in self._inputs or parentname not in self._outputs:
raise ProcessingAlgFactoryException("Can't find parent named {}".format(parentname))
else:
print(" <- Depends On: {}".format(parentname))

self._inputs[name] = label

def _get_parent_id(self, parent):
if isinstance(parent, str):
return parent
else:
raise NotImplementedError()
parm = self._create_parm(type, output, **kwargs)
return parm

def add_output(self, type, name, label, parent=None):
print("Output: {} as {}".format(name, str(type)))
if name in self._outputs:
raise ProcessingAlgFactoryException("Output {} already defined".format(name))
self._outputs[name] = label
def _create_parm(self, type,output=False, **kwargs):
kwargs['description'] = kwargs.pop("label", "")
kwargs['defaultValue'] = kwargs.pop("default", "")
advanced = kwargs.pop("advanced", False)
try:
if output:
make_func = ProcessingAlgFactory.output_type_mapping[type]
else:
make_func = ProcessingAlgFactory.input_type_mapping[type]
parm = make_func(**kwargs)
if advanced:
parm.setFlags(parm.flags() | QgsProcessingParameterDefinition.FlagAdvanced)
return parm
except KeyError as ex:
raise NotImplementedError("{} not supported".format(str(type)))

def set_func(self, func):
print("Func: {}".format(func.__name__))
@@ -118,29 +181,87 @@ def groupId(self):

def processAlgorithm(self, parameters, context, feedback):
print("Call: {}".format(self._func.__name__))
output = self._func(self, parameters, context, feedback)
values = {}
for parm in self._inputs.values():
name = parm.name()
if isinstance(parm, QgsProcessingParameterString):
value = self.parameterAsString(parameters, name, context)
values[name] = value
elif isinstance(parm, QgsProcessingParameterNumber):
if parm.dataType() == QgsProcessingParameterNumber.Integer:
value = self.parameterAsInt(parameters, name, context)
values[name] = value
if parm.dataType() == QgsProcessingParameterNumber.Double:
value = self.parameterAsDouble(parameters, name, context)
values[name] = value

output = self._func(self, parameters, context, feedback, values)
if output is None:
return {}
return output

def createInstance(self):
return AlgWarpper(self._name, self._display,
self._group, self._group_id, self._inputs, self._outputs,
self._func)

def initAlgorithm(self, configuration, p_str=None, Any=None, *args, **kwargs):
for parm in self._inputs.values():
self.addParameter(parm)

for parm in self._outputs.values():
self.addOutput(parm)

# TODO Check for define on none main thread.
class ProcessingAlgFactory(object):
STRING = str,
INT = int,
NUMBER = float,
SINK = "SINK",
FEATURE_SOURCE = "FEATURESOURCE"
class ProcessingAlgFactory():
STRING = "STRING",
INT = "INT",
NUMBER = "NUMBER",
DISTANCE = "DISTANCE",
SINK = "SINK"
SOURCE = "SOURCE"
FILE = "FILE",
FOLDER = "FOLDER",
HTML = "HTML",
LAYERDEF = "LAYERDEF",
MAPLAYER = "MAPLAYER",
MULTILAYER = "MULTILAYER",
RASTER_LAYER = "RASTER_LAYER",
VECTOR_LAYER = "VECTOR_LAYER"

input_type_mapping = {
str: make_string,
int: partial(make_number, type=QgsProcessingParameterNumber.Integer),
float: partial(make_number, type=QgsProcessingParameterNumber.Double),
NUMBER: partial(make_number, type=QgsProcessingParameterNumber.Double),
INT: partial(make_number, type=QgsProcessingParameterNumber.Integer),
STRING: make_string,
DISTANCE: make_distance,
SINK: make_sink,
SOURCE: make_source
}

output_type_mapping = {
str: partial(make_output, cls=QgsProcessingOutputString),
int: partial(make_output, cls=QgsProcessingOutputNumber),
float: partial(make_output, cls=QgsProcessingOutputNumber),
NUMBER: partial(make_output, cls=QgsProcessingOutputNumber),
DISTANCE: partial(make_output, cls=QgsProcessingOutputNumber),
INT: partial(make_output, cls=QgsProcessingOutputNumber),
STRING: partial(make_output, cls=QgsProcessingOutputString),
FILE: partial(make_output, cls=QgsProcessingOutputFile),
FOLDER: partial(make_output, cls=QgsProcessingOutputFolder),
HTML: partial(make_output, cls=QgsProcessingOutputHtml),
LAYERDEF: partial(make_output, cls=QgsProcessingOutputLayerDefinition),
MAPLAYER: partial(make_output, cls=QgsProcessingOutputMapLayer),
MULTILAYER: partial(make_output, cls=QgsProcessingOutputMultipleLayers),
RASTER_LAYER: partial(make_output, cls=QgsProcessingOutputRasterLayer),
VECTOR_LAYER: partial(make_output, cls=QgsProcessingOutputVectorLayer),
}

def __init__(self):
self._current = None

def __call__(self, *args, **kwargs):
return self.define(*args, **kwargs)

@property
def NAMES(self):
return self.current.NAMES
self.instances = {}

@property
def current(self):
@@ -150,17 +271,21 @@ def current(self):
def current_defined(self):
return self._current is not None

def __call__(self, *args, **kwargs):
return self._define(*args, **kwargs)

def _initnew(self):
if self.current_defined:
raise ProcessingAlgFactoryException("@alg.define() already called."
"Did you forget @alg.output")

self._current = AlgWrapper()
self._current = AlgWarpper()

def _pop(self):
self.instances[self.current.name()] = self.current
self._current = None

def define(self, *args, **kwargs):
def _define(self, *args, **kwargs):
self._initnew()
self.current.define(*args, **kwargs)

@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
"""QGIS Unit tests for some syntactic sugar in python
.. note:: This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
"""
__author__ = 'Nathan Woodrow'
__date__ = '27.11.2018'
__copyright__ = 'Copyright 2018, The QGIS Project'
# This will get replaced with a git SHA1 when you do a git archive
__revision__ = '$Format:%H$'

import qgis # NOQA

from python.processing import alg

from qgis.testing import unittest, start_app

start_app()

class TestSyntacticSugar(unittest.TestCase):

def testEdit(self):
pass


if __name__ == "__main__":
unittest.main()

0 comments on commit cf49277

Please sign in to comment.
You can’t perform that action at this time.