Skip to content

Commit

Permalink
add mara-pipelines click command group
Browse files Browse the repository at this point in the history
  • Loading branch information
leo-schick committed Nov 22, 2023
1 parent 4837cdf commit 3640eb7
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 117 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Here is a pipeline "demo" consisting of three nodes that depend on each other: t
```python
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively
from mara_pipelines.cli import run_pipeline, run_interactively

pipeline = Pipeline(
id='demo',
Expand Down Expand Up @@ -115,7 +115,7 @@ CREATE TABLE data_integration_file_dependency (
This runs a pipeline with output to stdout:

```python
from mara_pipelines.ui.cli import run_pipeline
from mara_pipelines.cli import run_pipeline

run_pipeline(pipeline)
```
Expand All @@ -138,7 +138,7 @@ run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstr
And finally, there is some sort of menu based on [pythondialog](http://pythondialog.sourceforge.net/) that allows to navigate and run pipelines like this:

```python
from mara_pipelines.ui.cli import run_interactively
from mara_pipelines.cli import run_interactively

run_interactively()
```
Expand Down
4 changes: 2 additions & 2 deletions docs/example.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Here is a pipeline "demo" consisting of three nodes that depend on each other: t
```python
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively
from mara_pipelines.cli import run_pipeline, run_interactively

pipeline = Pipeline(
id='demo',
Expand Down Expand Up @@ -68,7 +68,7 @@ CREATE TABLE data_integration_file_dependency (
This runs a pipeline with output to stdout:

```python
from mara_pipelines.ui.cli import run_pipeline
from mara_pipelines.cli import run_pipeline

run_pipeline(pipeline)
```
Expand Down
6 changes: 4 additions & 2 deletions mara_pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ def MARA_ACL_RESOURCES():


def MARA_CLICK_COMMANDS():
from .ui import cli
return [cli.run, cli.run_interactively, cli.reset_incremental_processing]
from . import cli
from .ui import cli as old_cli
return [cli.mara_pipelines,
old_cli._run, old_cli._run_interactively, old_cli._reset_incremental_processing]


def MARA_NAVIGATION_ENTRIES():
Expand Down
153 changes: 153 additions & 0 deletions mara_pipelines/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""Auto-migrate command line interface"""

import click
import sys
from typing import Set

from . import config, pipelines


def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None,
with_upstreams: bool = False,
interactively_started: bool = False,
disable_colors: bool = False) -> bool:
"""
Runs a pipeline or parts of it with output printed to stdout
Args:
pipeline: The pipeline to run
nodes: A list of pipeline children that should run
with_upstreams: When true and `nodes` are provided, then all upstreams of `nodes` in `pipeline` are also run
interactively_started: Whether or not this run was started interactively, passed on in RunStarted and
RunFinished events.
disable_colors: If true, don't use escape sequences to make the log colorful (default: colorful logging)
Return:
True when the pipeline run succeeded
"""
from ..logging import logger, pipeline_events
from .. import execution

RESET_ALL = 'reset_all'
PATH_COLOR = 'path_color'
ERROR_COLOR = 'error_color'

# https://godoc.org/github.com/whitedevops/colors
colorful = {logger.Format.STANDARD: '\033[01m', # bold
logger.Format.ITALICS: '\033[02m', # dim
logger.Format.VERBATIM: '',
PATH_COLOR: '\033[36m', # cyan
ERROR_COLOR: '\033[91m', # light red
RESET_ALL: '\033[0m', # reset all
}
plain = {key: '' for key in colorful.keys()}

theme = plain if disable_colors else colorful

succeeded = False
for event in execution.run_pipeline(pipeline, nodes, with_upstreams, interactively_started=interactively_started):
if isinstance(event, pipeline_events.Output):
print(f'{theme[PATH_COLOR]}{" / ".join(event.node_path)}{":" if event.node_path else ""}{theme[RESET_ALL]} '
+ theme[event.format] + (theme[ERROR_COLOR] if event.is_error else '')
+ event.message + theme[RESET_ALL])
elif isinstance(event, pipeline_events.RunFinished):
if event.succeeded:
succeeded = True

return succeeded


# -----------------------------------------------------------------------------


@click.group()
def mara_pipelines():
"""Mara pipelines commands"""
pass


@mara_pipelines.command()
@click.option('--path', default='',
help='The id of of the pipeline to run. Example: "pipeline-id"; "" (default) is the root pipeline.')
@click.option('--nodes',
help='IDs of sub-nodes of the pipeline to run, separated by comma. When provided, then only these nodes are run. Example: "do-this,do-that".')
@click.option('--with_upstreams', default=False, is_flag=True,
help='Also run all upstreams of --nodes within the pipeline.')
@click.option('--disable-colors', default=False, is_flag=True,
help='Output logs without coloring them.')
def run(path, nodes, with_upstreams, disable_colors: bool = False):
"""Runs a pipeline or a sub-set of its nodes"""

# the pipeline to run
path = path.split(',')
pipeline, found = pipelines.find_node(path)
if not found:
print(f'Pipeline {path} not found', file=sys.stderr)
sys.exit(-1)
if not isinstance(pipeline, pipelines.Pipeline):
print(f'Node {path} is not a pipeline, but a {pipeline.__class__.__name__}', file=sys.stderr)
sys.exit(-1)

# a list of nodes to run selectively in the pipeline
_nodes = set()
for id in (nodes.split(',') if nodes else []):
node = pipeline.nodes.get(id)
if not node:
print(f'Node "{id}" not found in pipeline {path}', file=sys.stderr)
sys.exit(-1)
else:
_nodes.add(node)

if not run_pipeline(pipeline, _nodes, with_upstreams, interactively_started=False, disable_colors=disable_colors):
sys.exit(-1)


@mara_pipelines.command()
def run_interactively():
"""Select and run data pipelines"""
from dialog import Dialog

d = Dialog(dialog="dialog", autowidgetsize=True) # see http://pythondialog.sourceforge.net/doc/widgets.html

def run_pipeline_and_notify(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None):
if not run_pipeline(pipeline, nodes, interactively_started=True):
sys.exit(-1)

def menu(node: pipelines.Node):
if isinstance(node, pipelines.Pipeline):

code, choice = d.menu(
text='Pipeline ' + '.'.join(node.path()) if node.parent else 'Root pipeline',
choices=[('▶ ', 'Run'), ('>> ', 'Run selected')]
+ [(child.id, '→' if isinstance(child, pipelines.Pipeline) else 'Run')
for child in node.nodes.values()])
if code == d.CANCEL:
return

if choice == '▶ ':
run_pipeline_and_notify(node)
elif choice == '>> ':
code, node_ids = d.checklist('Select sub-nodes to run. If you want to run all, then select none.',
choices=[(node_id, '', False) for node_id in node.nodes.keys()])
if code == d.OK:
run_pipeline_and_notify(node, {node.nodes[id] for id in node_ids})
else:
menu(node.nodes[choice])
return
else:
run_pipeline_and_notify(pipeline=node.parent, nodes=[node])

menu(config.root_pipeline())


@mara_pipelines.command()
@click.option('--path', default='',
help='The parent ids of of the node to reset. Example: "pipeline-id,sub-pipeline-id".')
def reset_incremental_processing(path):
"""Reset status of incremental processing for a node"""
from ..incremental_processing import reset

path = path.split(',') if path else []
node, found = pipelines.find_node(path)
if not found:
print(f'Node {path} not found', file=sys.stderr)
sys.exit(-1)
reset.reset_incremental_processing(path)
124 changes: 19 additions & 105 deletions mara_pipelines/ui/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Command line interface for running data pipelines"""

import sys
from typing import Set
"""(Deprecated) Command line interface for running data pipelines"""

import click
from warnings import warn
from typing import Set

from .. import config, pipelines
from . import pipelines
from .. import cli


def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None,
Expand All @@ -24,39 +24,11 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None
Return:
True when the pipeline run succeeded
"""
from ..logging import logger, pipeline_events
from .. import execution

RESET_ALL = 'reset_all'
PATH_COLOR = 'path_color'
ERROR_COLOR = 'error_color'

# https://godoc.org/github.com/whitedevops/colors
colorful = {logger.Format.STANDARD: '\033[01m', # bold
logger.Format.ITALICS: '\033[02m', # dim
logger.Format.VERBATIM: '',
PATH_COLOR: '\033[36m', # cyan
ERROR_COLOR: '\033[91m', # light red
RESET_ALL: '\033[0m', # reset all
}
plain = {key: '' for key in colorful.keys()}
warn("This method is deprecated. Please use `mara_pipelines.cli.run_pipeline` instead.")
return cli.run_pipeline(pipeline, nodes, with_upstreams, interactively_started, disable_colors)

theme = plain if disable_colors else colorful

succeeded = False
for event in execution.run_pipeline(pipeline, nodes, with_upstreams, interactively_started=interactively_started):
if isinstance(event, pipeline_events.Output):
print(f'{theme[PATH_COLOR]}{" / ".join(event.node_path)}{":" if event.node_path else ""}{theme[RESET_ALL]} '
+ theme[event.format] + (theme[ERROR_COLOR] if event.is_error else '')
+ event.message + theme[RESET_ALL])
elif isinstance(event, pipeline_events.RunFinished):
if event.succeeded:
succeeded = True

return succeeded


@click.command()
@click.command("run")
@click.option('--path', default='',
help='The id of of the pipeline to run. Example: "pipeline-id"; "" (default) is the root pipeline.')
@click.option('--nodes',
Expand All @@ -65,81 +37,23 @@ def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None
help='Also run all upstreams of --nodes within the pipeline.')
@click.option('--disable-colors', default=False, is_flag=True,
help='Output logs without coloring them.')
def run(path, nodes, with_upstreams, disable_colors: bool = False):
def _run(path, nodes, with_upstreams, disable_colors: bool = False):
"""Runs a pipeline or a sub-set of its nodes"""
warn("CLI command `<app> mara_pipelines.ui.run` will be dropped in 4.0. Please use `<app> mara-pipelines run` instead.")
cli.run(path, nodes, with_upstreams, disable_colors)

# the pipeline to run
path = path.split(',')
pipeline, found = pipelines.find_node(path)
if not found:
print(f'Pipeline {path} not found', file=sys.stderr)
sys.exit(-1)
if not isinstance(pipeline, pipelines.Pipeline):
print(f'Node {path} is not a pipeline, but a {pipeline.__class__.__name__}', file=sys.stderr)
sys.exit(-1)

# a list of nodes to run selectively in the pipeline
_nodes = set()
for id in (nodes.split(',') if nodes else []):
node = pipeline.nodes.get(id)
if not node:
print(f'Node "{id}" not found in pipeline {path}', file=sys.stderr)
sys.exit(-1)
else:
_nodes.add(node)

if not run_pipeline(pipeline, _nodes, with_upstreams, interactively_started=False, disable_colors=disable_colors):
sys.exit(-1)


@click.command()
def run_interactively():
@click.command("run_interactively")
def _run_interactively():
"""Select and run data pipelines"""
from dialog import Dialog

d = Dialog(dialog="dialog", autowidgetsize=True) # see http://pythondialog.sourceforge.net/doc/widgets.html

def run_pipeline_and_notify(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None):
if not run_pipeline(pipeline, nodes, interactively_started=True):
sys.exit(-1)

def menu(node: pipelines.Node):
if isinstance(node, pipelines.Pipeline):
warn("CLI command `<pp> mara_pipelines.ui.run_interactively` will be dropped in 4.0. Please use `<app> mara-pipelines run_interactively` instead.")
cli.run_interactively()

code, choice = d.menu(
text='Pipeline ' + '.'.join(node.path()) if node.parent else 'Root pipeline',
choices=[('▶ ', 'Run'), ('>> ', 'Run selected')]
+ [(child.id, '→' if isinstance(child, pipelines.Pipeline) else 'Run')
for child in node.nodes.values()])
if code == d.CANCEL:
return

if choice == '▶ ':
run_pipeline_and_notify(node)
elif choice == '>> ':
code, node_ids = d.checklist('Select sub-nodes to run. If you want to run all, then select none.',
choices=[(node_id, '', False) for node_id in node.nodes.keys()])
if code == d.OK:
run_pipeline_and_notify(node, {node.nodes[id] for id in node_ids})
else:
menu(node.nodes[choice])
return
else:
run_pipeline_and_notify(pipeline=node.parent, nodes=[node])

menu(config.root_pipeline())


@click.command()
@click.command("reset_incremental_processing")
@click.option('--path', default='',
help='The parent ids of of the node to reset. Example: "pipeline-id,sub-pipeline-id".')
def reset_incremental_processing(path):
def _reset_incremental_processing(path):
"""Reset status of incremental processing for a node"""
from ..incremental_processing import reset

path = path.split(',') if path else []
node, found = pipelines.find_node(path)
if not found:
print(f'Node {path} not found', file=sys.stderr)
sys.exit(-1)
reset.reset_incremental_processing(path)
warn("CLI command `<pp> mara_pipelines.ui.reset_incremental_processing` will be dropped in 4.0. Please use `<app> mara-pipelines reset_incremental_processing` instead.")
cli.reset_incremental_processing(path)
2 changes: 1 addition & 1 deletion tests/mssql/test_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from mara_pipelines.commands.files import WriteFile
from mara_pipelines.commands.sql import ExecuteSQL
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline
from mara_pipelines.cli import run_pipeline

from tests.db_test_helper import db_is_responsive, db_replace_placeholders
from tests.local_config import MSSQL_SQLCMD_DB
Expand Down
2 changes: 1 addition & 1 deletion tests/postgres/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from mara_pipelines.commands.files import WriteFile
from mara_pipelines.commands.sql import ExecuteSQL
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline
from mara_pipelines.cli import run_pipeline

from tests.db_test_helper import db_is_responsive, db_replace_placeholders
from tests.local_config import POSTGRES_DB
Expand Down
Loading

0 comments on commit 3640eb7

Please sign in to comment.