From e1fa8e357eaa77608f26af52b1291e52841e29d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nguy=E1=BB=85n=20Anh=20B=C3=ACnh?= Date: Thu, 8 Jun 2023 18:49:12 +0700 Subject: [PATCH] - wrap xpull function --- evoflow/VERSION | 2 +- evoflow/entities/core/step.py | 1 + evoflow/operators/base_operator.py | 14 +++++++++++- evoflow/operators/python_operator.py | 34 ++++------------------------ 4 files changed, 20 insertions(+), 31 deletions(-) diff --git a/evoflow/VERSION b/evoflow/VERSION index 5f1faa6..3c4e98d 100644 --- a/evoflow/VERSION +++ b/evoflow/VERSION @@ -1 +1 @@ -0.1.6.292 +0.1.7.0 diff --git a/evoflow/entities/core/step.py b/evoflow/entities/core/step.py index 531e25c..85d9b96 100644 --- a/evoflow/entities/core/step.py +++ b/evoflow/entities/core/step.py @@ -121,3 +121,4 @@ def __lshift__(self, other): elif isinstance(other, list): step_list = StepList(other) return step_list.next(self) + raise Exception("Invalid step") diff --git a/evoflow/operators/base_operator.py b/evoflow/operators/base_operator.py index 5e68137..27cfeb9 100644 --- a/evoflow/operators/base_operator.py +++ b/evoflow/operators/base_operator.py @@ -2,13 +2,25 @@ class BaseOperator(Step): - def __init__(self, **kwargs): + def __init__(self, op_args=None, op_kwargs=None, **kwargs): super().__init__(**kwargs) self.task_id = kwargs.get("task_id", None) self.name = self.task_id + self.op_args = op_args or [] + self.__op_kwargs = op_kwargs or {} + + @property + def op_kwargs(self): + return {**self.__op_kwargs, **{"ti": self}} def __str__(self): return f"{self.__class__.__name__} {self.task_id}" def __repr__(self): return f"{self.__class__.__name__} {self.task_id}" + + def xcom_push(self, key, value): + self.params[key] = value + + def xcom_pull(self, key, **kwargs): + return self.params[key] \ No newline at end of file diff --git a/evoflow/operators/python_operator.py b/evoflow/operators/python_operator.py index 535e6dc..9aa6d7d 100644 --- a/evoflow/operators/python_operator.py +++ b/evoflow/operators/python_operator.py @@ -4,39 +4,15 @@ class PythonOperator(BaseOperator): """ Executes a Python callable - :param python_callable: A reference to an object that is callable - :type python_callable: python callable - :param op_args: a list of positional arguments to pass to python_callable - :type op_args: list - :param op_kwargs: a dict of keyword arguments to pass to python_callable - :type op_kwargs: dict - :param templates_dict: a dictionary of templates variables to pass to python_callable - :type templates_dict: dict - :param templates_exts: a list of file extensions to resolve templates - :type templates_exts: list """ - template_fields = ("templates_dict", "op_args", "op_kwargs") - template_ext = tuple() - ui_color = "#ffefeb" - - def __init__( - self, - python_callable, - op_args=None, - op_kwargs=None, - templates_dict=None, - templates_exts=None, - *args, - **kwargs - ): + def __init__(self, python_callable, *args, **kwargs): super().__init__(*args, **kwargs) self.python_callable = python_callable - self.op_args = op_args or [] - self.op_kwargs = op_kwargs or {} - self.templates_dict = templates_dict - self.templates_exts = templates_exts def action(self, **kwargs): kwargs = {**kwargs, **self.op_kwargs} - return self.python_callable(*self.op_args, **kwargs) + result = self.python_callable(*self.op_args, **kwargs) + if isinstance(result, dict): + kwargs = {**kwargs, **result} + return kwargs