diff --git a/mara_pipelines/commands/bash.py b/mara_pipelines/commands/bash.py index bf94345..7344582 100644 --- a/mara_pipelines/commands/bash.py +++ b/mara_pipelines/commands/bash.py @@ -1,6 +1,6 @@ """Commands for running bash scripts""" -from typing import Union, Callable +from typing import Union, Callable, List, Tuple from mara_page import html from .. import pipelines @@ -18,13 +18,13 @@ def __init__(self, command: Union[str, Callable]) -> None: self._command = command @property - def command(self): + def command(self) -> str: return (self._command() if callable(self._command) else self._command).strip() - def shell_command(self): + def shell_command(self) -> str: return self.command - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [ ('command', html.highlight_syntax(self.shell_command(), 'bash')) ] diff --git a/mara_pipelines/commands/files.py b/mara_pipelines/commands/files.py index 467e32a..74b8cd8 100644 --- a/mara_pipelines/commands/files.py +++ b/mara_pipelines/commands/files.py @@ -4,6 +4,7 @@ import pathlib import shlex import sys +from typing import List, Tuple, Dict import enum @@ -76,7 +77,7 @@ def shell_command(self): def mapper_file_path(self): return self.parent.parent.base_path() / self.mapper_script_file_name - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [('file name', _.i[self.file_name]), ('compression', _.tt[self.compression]), ('mapper script file name', _.i[self.mapper_script_file_name]), @@ -89,17 +90,17 @@ def html_doc_items(self) -> [(str, str)]: ('csv format', _.tt[self.csv_format]), ('skip header', _.tt[self.skip_header]), ('delimiter char', - _.tt[json.dumps(self.delimiter_char) if self.delimiter_char != None else None]), - ('quote char', _.tt[json.dumps(self.quote_char) if self.quote_char != None else None]), + _.tt[json.dumps(self.delimiter_char) if self.delimiter_char is not None else None]), + ('quote char', 
_.tt[json.dumps(self.quote_char) if self.quote_char is not None else None]), ('null value string', - _.tt[json.dumps(self.null_value_string) if self.null_value_string != None else None]), + _.tt[json.dumps(self.null_value_string) if self.null_value_string is not None else None]), ('time zone', _.tt[self.timezone]), (_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))] class ReadSQLite(sql._SQLCommand): def __init__(self, sqlite_file_name: str, target_table: str, - sql_statement: str = None, sql_file_name: str = None, replace: {str: str} = None, + sql_statement: str = None, sql_file_name: str = None, replace: Dict[str, str] = None, db_alias: str = None, timezone: str = None) -> None: sql._SQLCommand.__init__(self, sql_statement, sql_file_name, replace) self.sqlite_file_name = sqlite_file_name @@ -115,10 +116,10 @@ def db_alias(self): def shell_command(self): return (sql._SQLCommand.shell_command(self) + ' | ' + mara_db.shell.copy_command( - mara_db.dbs.SQLiteDB(file_name=config.data_dir().absolute() / self.sqlite_file_name), + mara_db.dbs.SQLiteDB(file_name=pathlib.Path(config.data_dir()).absolute() / self.sqlite_file_name), self.db_alias, self.target_table, timezone=self.timezone)) - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [('sqlite file name', _.i[self.sqlite_file_name])] \ + sql._SQLCommand.html_doc_items(self, None) \ + [('target_table', _.tt[self.target_table]), @@ -161,7 +162,7 @@ def shell_command(self): def file_path(self): return self.parent.parent.base_path() / self.file_name - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [('file name', _.i[self.file_name]), (_.i['content'], html.highlight_syntax(self.file_path().read_text().strip('\n') if self.file_name and self.file_path().exists() @@ -170,9 +171,9 @@ def html_doc_items(self) -> [(str, str)]: ('target_table', _.tt[self.target_table]), ('db alias', 
_.tt[self.db_alias()]), ('delimiter char', - _.tt[json.dumps(self.delimiter_char) if self.delimiter_char != None else None]), - ('quote char', _.tt[json.dumps(self.quote_char) if self.quote_char != None else None]), + _.tt[json.dumps(self.delimiter_char) if self.delimiter_char is not None else None]), + ('quote char', _.tt[json.dumps(self.quote_char) if self.quote_char is not None else None]), ('null value string', - _.tt[json.dumps(self.null_value_string) if self.null_value_string != None else None]), + _.tt[json.dumps(self.null_value_string) if self.null_value_string is not None else None]), ('time zone', _.tt[self.timezone]), (_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))] diff --git a/mara_pipelines/commands/http.py b/mara_pipelines/commands/http.py index 48c32a3..39c1764 100644 --- a/mara_pipelines/commands/http.py +++ b/mara_pipelines/commands/http.py @@ -1,12 +1,14 @@ """Commands for interacting with HTTP""" +from typing import List, Tuple, Dict + from mara_page import html, _ from .. 
import pipelines from ..shell import http_request_command class HttpRequest(pipelines.Command): - def __init__(self, url: str, headers: {str: str} = None, method: str = None, body: str = None) -> None: + def __init__(self, url: str, headers: Dict[str, str] = None, method: str = None, body: str = None) -> None: """ Executes a HTTP request @@ -25,7 +27,7 @@ def __init__(self, url: str, headers: {str: str} = None, method: str = None, bod def shell_command(self): return http_request_command(self.url, self.headers, self.method, self.body) - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [ ('method', _.tt[self.method or 'GET']), ('url', _.tt[self.url]), diff --git a/mara_pipelines/commands/python.py b/mara_pipelines/commands/python.py index fc36e49..9706869 100644 --- a/mara_pipelines/commands/python.py +++ b/mara_pipelines/commands/python.py @@ -5,7 +5,7 @@ import sys import json from html import escape -from typing import Union, Callable, List +from typing import Union, Callable, List, Optional, Tuple from ..incremental_processing import file_dependencies from ..logging import logger @@ -24,7 +24,7 @@ class RunFunction(pipelines.Command): Note: if you want to pass arguments, then use a lambda function """ - def __init__(self, function: Callable = None, args: [str] = None, file_dependencies: [str] = None) -> None: + def __init__(self, function: Optional[Callable] = None, args: Optional[List[str]] = None, file_dependencies: Optional[List[str]] = None) -> None: self.function = function self.args = args or [] self.file_dependencies = file_dependencies or [] @@ -48,7 +48,7 @@ def run(self) -> bool: return True - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [('function', _.pre[escape(str(self.function))]), ('args', _.tt[repr(self.args)]), (_.i['implementation'], html.highlight_syntax(inspect.getsource(self.function), 'python')), @@ -65,7 +65,7 @@ class 
ExecutePython(pipelines.Command): file_dependencies: Run triggered based on whether a list of files changed since the last pipeline run """ def __init__(self, file_name: Union[Callable, str], - args: Union[Callable, List[str]] = None, file_dependencies: [str] = None) -> None: + args: Optional[Union[Callable, List[str]]] = None, file_dependencies: Optional[List[str]] = None) -> None: self._file_name = file_name self._args = args or [] self.file_dependencies = file_dependencies or [] diff --git a/mara_pipelines/commands/sql.py b/mara_pipelines/commands/sql.py index db04f08..bab387a 100644 --- a/mara_pipelines/commands/sql.py +++ b/mara_pipelines/commands/sql.py @@ -4,7 +4,7 @@ import json import pathlib import shlex -from typing import Callable, Union +from typing import Callable, Union, Dict, Optional, List, Tuple import mara_db.dbs import mara_db.shell @@ -25,8 +25,8 @@ class _SQLCommand(pipelines.Command): sql_file_name: The name of the file to run (relative to the directory of the parent pipeline) replace: A set of replacements to perform against the sql query `{'replace`: 'with', ..}` """ - def __init__(self, sql_statement: Union[Callable, str] = None, sql_file_name: str = None, - replace: {str: str} = None) -> None: + def __init__(self, sql_statement: Optional[Union[Callable, str]] = None, sql_file_name: Optional[str] = None, + replace: Optional[Dict[str, str]] = None) -> None: if (not (sql_statement or sql_file_name)) or (sql_statement and sql_file_name): raise ValueError('Please provide either sql_statement or sql_file_name (but not both)') @@ -88,9 +88,9 @@ class ExecuteSQL(_SQLCommand): sql_file_name: The name of the file to run (relative to the directory of the parent pipeline) replace: A set of replacements to perform against the sql query `{'replace`: 'with', ..}` """ - def __init__(self, sql_statement: str = None, sql_file_name: Union[str, Callable] = None, - replace: {str: str} = None, file_dependencies=None, db_alias: str = None, - echo_queries: bool 
= None, timezone: str = None) -> None: + def __init__(self, sql_statement: Optional[Union[str, Callable]] = None, sql_file_name: Optional[str] = None, + replace: Optional[Dict[str, str]] = None, file_dependencies: Optional[List[str]] = None, db_alias: Optional[str] = None, + echo_queries: Optional[bool] = None, timezone: Optional[str] = None) -> None: _SQLCommand.__init__(self, sql_statement, sql_file_name, replace) self._db_alias = db_alias @@ -147,10 +147,10 @@ def html_doc_items(self): class Copy(_SQLCommand): """Loads data from an external database""" - def __init__(self, source_db_alias: str, target_table: str, target_db_alias: str = None, - sql_statement: str = None, sql_file_name: Union[Callable, str] = None, replace: {str: str} = None, - timezone: str = None, csv_format: bool = None, delimiter_char: str = None, - file_dependencies=None) -> None: + def __init__(self, source_db_alias: str, target_table: str, target_db_alias: Optional[str] = None, + sql_statement: Optional[Union[Callable, str]] = None, sql_file_name: Optional[str] = None, replace: Dict[str, str] = None, + timezone: Optional[str] = None, csv_format: Optional[bool] = None, delimiter_char: Optional[str] = None, + file_dependencies: Optional[List[str]]=None) -> None: _SQLCommand.__init__(self, sql_statement, sql_file_name, replace) self.source_db_alias = source_db_alias self.target_table = target_table @@ -161,7 +161,7 @@ def __init__(self, source_db_alias: str, target_table: str, target_db_alias: str self.file_dependencies = file_dependencies or [] @property - def target_db_alias(self): + def target_db_alias(self) -> str: return self._target_db_alias or config.default_db_alias() def file_path(self) -> pathlib.Path: @@ -194,12 +194,12 @@ def run(self) -> bool: file_dependencies.update(self.node_path(), dependency_type, pipeline_base_path, self.file_dependencies) return True - def shell_command(self): + def shell_command(self) -> str: return _SQLCommand.shell_command(self) \ + ' | ' + 
mara_db.shell.copy_command(self.source_db_alias, self.target_db_alias, self.target_table, self.timezone, self.csv_format, self.delimiter_char) - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [('source db', _.tt[self.source_db_alias])] \ + _SQLCommand.html_doc_items(self, self.source_db_alias) \ + [('target db', _.tt[self.target_db_alias]), @@ -238,12 +238,12 @@ class CopyIncrementally(_SQLCommand): """ def __init__(self, source_db_alias: str, source_table: str, modification_comparison: str, comparison_value_placeholder: str, - target_table: str, primary_keys: [str], - sql_file_name: Union[str, Callable] = None, sql_statement: Union[str, Callable] = None, - target_db_alias: str = None, timezone: str = None, replace: {str: str} = None, + target_table: str, primary_keys: List[str], + sql_file_name: Optional[str] = None, sql_statement: Optional[Union[str, Callable]] = None, + target_db_alias: Optional[str] = None, timezone: Optional[str] = None, replace: Dict[str, str] = None, use_explicit_upsert: bool = False, - csv_format: bool = None, delimiter_char: str = None, - modification_comparison_type: str = None) -> None: + csv_format: Optional[bool] = None, delimiter_char: Optional[str] = None, + modification_comparison_type: Optional[str] = None) -> None: _SQLCommand.__init__(self, sql_statement, sql_file_name, replace) self.source_db_alias = source_db_alias self.source_table = source_table @@ -260,7 +260,7 @@ def __init__(self, source_db_alias: str, source_table: str, self.delimiter_char = delimiter_char @property - def target_db_alias(self): + def target_db_alias(self) -> str: return self._target_db_alias or config.default_db_alias() def run(self) -> bool: @@ -400,7 +400,7 @@ def _copy_command(self, target_table, replace): target_table, timezone=self.timezone, csv_format=self.csv_format, delimiter_char=self.delimiter_char)) - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, 
str]]: return [('source db', _.tt[self.source_db_alias]), ('source table', _.tt[self.source_table]), ('modification comparison', _.tt[self.modification_comparison])] \ @@ -416,13 +416,13 @@ def html_doc_items(self) -> [(str, str)]: ('use explicit upsert', _.tt[repr(self.use_explicit_upsert)])] -def _expand_pattern_substitution(replace: {str: str}) -> {str: str}: +def _expand_pattern_substitution(replace: Dict[str, str]) -> Dict[str, str]: """Helper function for replacing callables with their value in a dictionary""" return {k: (str(v()) if callable(v) else str(v)) for k, v in replace.items()} @functools.singledispatch -def _sql_syntax_higlighting_lexter(db): +def _sql_syntax_higlighting_lexter(db) -> str: """Returns the best lexer from http://pygments.org/docs/lexers/ for a database dialect""" return 'sql' diff --git a/mara_pipelines/config.py b/mara_pipelines/config.py index b482eb4..72075d2 100644 --- a/mara_pipelines/config.py +++ b/mara_pipelines/config.py @@ -39,7 +39,7 @@ def last_date() -> datetime.date: return datetime.date(3000, 1, 1) -def max_number_of_parallel_tasks(): +def max_number_of_parallel_tasks() -> int: """How many tasks can run in parallel at maximum""" return multiprocessing.cpu_count() @@ -90,7 +90,7 @@ def slack_token() -> typing.Optional[str]: @functools.lru_cache(maxsize=None) -def event_handlers() -> [events.EventHandler]: +def event_handlers() -> typing.List[events.EventHandler]: """ Configure additional event handlers that listen to pipeline events, e.g. chat bots that announce failed runs diff --git a/mara_pipelines/execution.py b/mara_pipelines/execution.py index 57b6294..bd7d5fb 100644 --- a/mara_pipelines/execution.py +++ b/mara_pipelines/execution.py @@ -16,16 +16,17 @@ from multiprocessing import queues from multiprocessing.context import BaseContext from queue import Empty +from typing import Set, List, Dict, Optional from . 
import pipelines, config from .logging import logger, pipeline_events, system_statistics, run_log, node_cost from . import events -def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None, +def run_pipeline(pipeline: pipelines.Pipeline, nodes: Optional[Set[pipelines.Node]] = None, with_upstreams: bool = False, interactively_started: bool = False - ) -> [events.Event]: + ) -> List[events.Event]: """ Runs a pipeline in a forked sub process. Acts as a generator that yields events from the sub process. @@ -72,20 +73,20 @@ def run(): logger.redirect_output(event_queue, pipeline.path()) # all nodes that have not run yet, ordered by priority - node_queue: [pipelines.Node] = [] + node_queue: List[pipelines.Node] = [] # data needed for computing cost node_durations_and_run_times = node_cost.node_durations_and_run_times(pipeline) if use_historical_node_cost else {} # Putting nodes into the node queue - def queue(nodes: [pipelines.Node]): + def queue(nodes: List[pipelines.Node]): for node in nodes: node_cost.compute_cost(node, node_durations_and_run_times) node_queue.append(node) node_queue.sort(key=lambda node: node.cost, reverse=True) if nodes: # only run a set of child nodes - def with_all_upstreams(nodes: {pipelines.Node}): + def with_all_upstreams(nodes: Set[pipelines.Node]): """recursively find all upstreams of a list of nodes""" return functools.reduce(set.union, [with_all_upstreams(node.upstreams) for node in nodes], nodes) @@ -110,11 +111,11 @@ def with_all_upstreams(nodes: {pipelines.Node}): # book keeping run_start_time = datetime.datetime.now(tz.utc) # all nodes that already ran or that won't be run anymore - processed_nodes: {pipelines.Node} = set() + processed_nodes: Set[pipelines.Node] = set() # running pipelines with start times and number of running children - running_pipelines: {pipelines.Pipeline: [datetime.datetime, int]} = {} - failed_pipelines: {pipelines.Pipeline} = set() # pipelines with failed tasks - running_task_processes: 
{pipelines.Task: TaskProcess} = {} + running_pipelines: Dict[pipelines.Pipeline, List] = {} + failed_pipelines: Set[pipelines.Pipeline] = set() # pipelines with failed tasks + running_task_processes: Dict[pipelines.Task, TaskProcess] = {} # make sure any running tasks are killed when this executor process is shutdown executor_pid = os.getpid() diff --git a/mara_pipelines/incremental_processing/file_dependencies.py b/mara_pipelines/incremental_processing/file_dependencies.py index 38cf8f2..599f3ba 100644 --- a/mara_pipelines/incremental_processing/file_dependencies.py +++ b/mara_pipelines/incremental_processing/file_dependencies.py @@ -2,6 +2,7 @@ import datetime import hashlib import pathlib +from typing import List import sqlalchemy from sqlalchemy.ext.declarative import declarative_base @@ -22,7 +23,7 @@ class FileDependency(Base): timestamp = sqlalchemy.Column(sqlalchemy.TIMESTAMP(timezone=True)) -def update(node_path: [str], dependency_type: str, pipeline_base_path: str, file_dependencies: [str]): +def update(node_path: List[str], dependency_type: str, pipeline_base_path: str, file_dependencies: List[str]): """ Stores the combined hash of a list of files @@ -40,7 +41,7 @@ def update(node_path: [str], dependency_type: str, pipeline_base_path: str, file DO UPDATE SET timestamp = EXCLUDED.timestamp, hash = EXCLUDED.hash """, (node_path, dependency_type, hash(pipeline_base_path, file_dependencies), datetime.datetime.utcnow())) -def delete(node_path: [str], dependency_type: str): +def delete(node_path: List[str], dependency_type: str): """ Delets the combined hash of a list of files for that node and dependency type @@ -55,7 +56,7 @@ def delete(node_path: [str], dependency_type: str): """, (node_path, dependency_type)) -def is_modified(node_path: [str], dependency_type: str, pipeline_base_path: str, file_dependencies: [str]): +def is_modified(node_path: List[str], dependency_type: str, pipeline_base_path: str, file_dependencies: List[str]): """ 
Checks whether a list of files have been modified since the last pipeline run @@ -77,7 +78,7 @@ def is_modified(node_path: [str], dependency_type: str, pipeline_base_path: str, return False if cursor.fetchone() else True -def hash(pipeline_base_path: pathlib.Path, file_dependencies: [str]) -> str: +def hash(pipeline_base_path: pathlib.Path, file_dependencies: List[str]) -> str: """ Creates a combined hash of the content of a list of files diff --git a/mara_pipelines/incremental_processing/incremental_copy_status.py b/mara_pipelines/incremental_processing/incremental_copy_status.py index 8841803..64f3701 100644 --- a/mara_pipelines/incremental_processing/incremental_copy_status.py +++ b/mara_pipelines/incremental_processing/incremental_copy_status.py @@ -1,5 +1,7 @@ """Tracks the last comparison value of an incremental copy""" +from typing import List + import sqlalchemy from sqlalchemy.ext.declarative import declarative_base @@ -19,7 +21,7 @@ class IncrementalCopyStatus(Base): last_comparison_value = sqlalchemy.Column(sqlalchemy.Text) -def update(node_path: [str], source_db_alias: str, source_table: str, last_comparison_value): +def update(node_path: List[str], source_db_alias: str, source_table: str, last_comparison_value): """ Updates the last_comparison_value for a pipeline node and table Args: @@ -40,7 +42,7 @@ def update(node_path: [str], source_db_alias: str, source_table: str, last_compa ''', (node_path, f'{source_db_alias}.{source_table}', last_comparison_value)) -def delete(node_path: [str], source_db_alias: str, source_table: str): +def delete(node_path: List[str], source_db_alias: str, source_table: str): """ Deletes the last_comparison_value for a pipeline node and table Args: @@ -58,7 +60,7 @@ def delete(node_path: [str], source_db_alias: str, source_table: str): -def get_last_comparison_value(node_path: [str], source_db_alias: str, source_table: str): +def get_last_comparison_value(node_path: List[str], source_db_alias: str, source_table: str): """ 
Returns the last comparison value for a pipeline node and table Args: diff --git a/mara_pipelines/incremental_processing/processed_files.py b/mara_pipelines/incremental_processing/processed_files.py index ed29b6d..029c901 100644 --- a/mara_pipelines/incremental_processing/processed_files.py +++ b/mara_pipelines/incremental_processing/processed_files.py @@ -1,6 +1,7 @@ """Functions for keeping track whether an input file has already been 'processed' """ -import datetime +from datetime import datetime +from typing import Dict import sqlalchemy from sqlalchemy.ext.declarative import declarative_base @@ -42,7 +43,7 @@ def track_processed_file(node_path: str, file_name: str, last_modified_timestamp return True -def already_processed_files(node_path: str) -> {str: datetime}: +def already_processed_files(node_path: str) -> Dict[str, datetime]: """ Returns all files that already have been processed by a node Args: diff --git a/mara_pipelines/incremental_processing/reset.py b/mara_pipelines/incremental_processing/reset.py index f9f6506..1de119b 100644 --- a/mara_pipelines/incremental_processing/reset.py +++ b/mara_pipelines/incremental_processing/reset.py @@ -1,10 +1,12 @@ """Resetting incremental copy status""" +from typing import List + import mara_db.config import mara_db.postgresql -def reset_incremental_processing(node_path: [str]): +def reset_incremental_processing(node_path: List[str]): """ Recursively resets all incremental processing status information that is stored in the mara db Args: diff --git a/mara_pipelines/logging/logger.py b/mara_pipelines/logging/logger.py index 19b3643..fe51882 100644 --- a/mara_pipelines/logging/logger.py +++ b/mara_pipelines/logging/logger.py @@ -1,8 +1,9 @@ """Text output logging with redirection to queues""" +from datetime import datetime import multiprocessing import sys -from datetime import datetime +from typing import List from ..logging import pipeline_events import mara_pipelines.config @@ -46,7 +47,7 @@ def log(message: str, 
format: pipeline_events.Output.Format = Format.STANDARD, """When running in a forked task process, this will be bound to the path of the currently running pipeline node""" -def redirect_output(event_queue: multiprocessing.Queue, node_path: [str]): +def redirect_output(event_queue: multiprocessing.Queue, node_path: List[str]): """ Redirects the output of the `log` function as well as `sys.stdout` and `sys.stderr` to `event_queue` Args: diff --git a/mara_pipelines/logging/node_cost.py b/mara_pipelines/logging/node_cost.py index 465ef2a..6d65f62 100644 --- a/mara_pipelines/logging/node_cost.py +++ b/mara_pipelines/logging/node_cost.py @@ -2,13 +2,14 @@ import functools import math +from typing import Dict, Tuple import mara_db.config import mara_db.postgresql from .. import pipelines -def node_durations_and_run_times(node: pipelines.Node) -> {tuple: [float, float]}: +def node_durations_and_run_times(node: pipelines.Node) -> Dict[tuple, Tuple[float, float]]: """ Returns for children of `node` the average duration and run time (sum of average duration of all leaf nodes) @@ -43,7 +44,7 @@ def node_durations_and_run_times(node: pipelines.Node) -> {tuple: [float, float] return {tuple(row[0]): row[1:] for row in cursor.fetchall()} -def compute_cost(node: pipelines.Node, node_durations_and_run_times: {tuple: [float, float]}) -> float: +def compute_cost(node: pipelines.Node, node_durations_and_run_times: Dict[tuple, Tuple[float, float]]) -> float: """ Computes the cost of a node as maximum cumulative run time of a node and all its downstreams. 
Stores the result in `node` and also returns it diff --git a/mara_pipelines/logging/run_log.py b/mara_pipelines/logging/run_log.py index d40efd5..f8ea1a3 100644 --- a/mara_pipelines/logging/run_log.py +++ b/mara_pipelines/logging/run_log.py @@ -1,5 +1,7 @@ """Logging pipeline runs, node output and status information in mara database""" +from typing import List, Dict + import sqlalchemy.orm from sqlalchemy.ext.declarative import declarative_base @@ -116,7 +118,7 @@ def handle_event(self, event: events.Event): class RunLogger(events.EventHandler): """A run logger saving the pipeline events to the 'mara' database alias""" run_id: int = None - node_output: {tuple: [pipeline_events.Output]} = None + node_output: Dict[tuple, List[pipeline_events.Output]] = None def handle_event(self, event: events.Event): import psycopg2.extensions diff --git a/mara_pipelines/notification/notifier.py b/mara_pipelines/notification/notifier.py index 7220661..02714dc 100644 --- a/mara_pipelines/notification/notifier.py +++ b/mara_pipelines/notification/notifier.py @@ -1,4 +1,5 @@ import abc +from typing import Dict, List from .. import events from ..logging import pipeline_events @@ -10,7 +11,7 @@ def __init__(self): """ Abstract class for sending notifications to chat bots when pipeline errors occur""" # keep a list of log messages and error log messages for each node - self.node_output: {tuple: {bool: [events.Event]}} = None + self.node_output: Dict[tuple, Dict[bool, List[events.Event]]] = None def handle_event(self, event: events.Event): diff --git a/mara_pipelines/notification/slack.py b/mara_pipelines/notification/slack.py index a4f989d..49d9287 100644 --- a/mara_pipelines/notification/slack.py +++ b/mara_pipelines/notification/slack.py @@ -1,4 +1,5 @@ import requests +from typing import List from .. 
import config from ..logging import pipeline_events from ..notification.notifier import ChatNotifier @@ -60,7 +61,7 @@ def _send_message(self, message): if response.status_code != 200: raise ValueError(f'Could not send message. Status {response.status_code}, response "{response.text}"') - def _format_output(self, output_events: [pipeline_events.Output]): + def _format_output(self, output_events: List[pipeline_events.Output]): output, last_format = '', '' for event in output_events: if event.format == pipeline_events.Output.Format.VERBATIM: diff --git a/mara_pipelines/notification/teams.py b/mara_pipelines/notification/teams.py index 12da449..f95ee1c 100644 --- a/mara_pipelines/notification/teams.py +++ b/mara_pipelines/notification/teams.py @@ -1,4 +1,5 @@ import requests +from typing import List from .. import config from ..logging import pipeline_events from .notifier import ChatNotifier @@ -56,7 +57,7 @@ def _send_message(self, message): if response.status_code != 200: raise ValueError(f'Could not send message. 
Status {response.status_code}, response "{response.text}"') - def _format_output(self, output_events: [pipeline_events.Output]): + def _format_output(self, output_events: List[pipeline_events.Output]): output, last_format = '', '' for event in output_events: if event.format == pipeline_events.Output.Format.VERBATIM: diff --git a/mara_pipelines/parallel_tasks/files.py b/mara_pipelines/parallel_tasks/files.py index 5228ee2..16460e1 100644 --- a/mara_pipelines/parallel_tasks/files.py +++ b/mara_pipelines/parallel_tasks/files.py @@ -7,6 +7,7 @@ import pathlib import re from html import escape +from typing import List, Tuple, Optional import mara_db.config import mara_db.dbs @@ -31,10 +32,10 @@ class ReadMode(enum.EnumMeta): class _ParallelRead(pipelines.ParallelTask): def __init__(self, id: str, description: str, file_pattern: str, read_mode: ReadMode, target_table: str, - max_number_of_parallel_tasks: int = None, file_dependencies: [str] = None, date_regex: str = None, + max_number_of_parallel_tasks: Optional[int] = None, file_dependencies: Optional[List[str]] = None, date_regex: Optional[str] = None, partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False, - commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None, - db_alias: str = None, timezone: str = None) -> None: + commands_before: Optional[List[pipelines.Command]] = None, commands_after: Optional[List[pipelines.Command]] = None, + db_alias: Optional[str] = None, timezone: Optional[str] = None) -> None: pipelines.ParallelTask.__init__(self, id=id, description=description, max_number_of_parallel_tasks=max_number_of_parallel_tasks, commands_before=commands_before, commands_after=commands_after) @@ -157,7 +158,7 @@ def update_file_dependencies(): pipelines.Task(id=str(n), description=f'Reads {len(chunk)} files', commands=sum([self.parallel_commands(x[0]) for x in chunk], []))) - def parallel_commands(self, file_name: str) -> [pipelines.Command]: + def 
parallel_commands(self, file_name: str) -> List[pipelines.Command]: return [self.read_command(file_name)] + ( [python.RunFunction(function=lambda: _processed_files.track_processed_file( self.path(), file_name, self._last_modification_timestamp(file_name)))] @@ -173,14 +174,14 @@ def _last_modification_timestamp(self, file_name): class ParallelReadFile(_ParallelRead): def __init__(self, id: str, description: str, file_pattern: str, read_mode: ReadMode, - compression: files.Compression, target_table: str, file_dependencies: [str] = None, - date_regex: str = None, partition_target_table_by_day_id: bool = False, + compression: files.Compression, target_table: str, file_dependencies: Optional[List[str]] = None, + date_regex: Optional[str] = None, partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False, - commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None, - mapper_script_file_name: str = None, make_unique: bool = False, db_alias: str = None, - delimiter_char: str = None, quote_char: str = None, null_value_string: str = None, - skip_header: bool = None, csv_format: bool = False, - timezone: str = None, max_number_of_parallel_tasks: int = None) -> None: + commands_before: Optional[List[pipelines.Command]] = None, commands_after: Optional[List[pipelines.Command]] = None, + mapper_script_file_name: Optional[str] = None, make_unique: bool = False, db_alias: Optional[str] = None, + delimiter_char: Optional[str] = None, quote_char: Optional[str] = None, null_value_string: Optional[str] = None, + skip_header: Optional[bool] = None, csv_format: bool = False, + timezone: Optional[str] = None, max_number_of_parallel_tasks: Optional[int] = None) -> None: _ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern, read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies, date_regex=date_regex, partition_target_table_by_day_id=partition_target_table_by_day_id, @@ 
-204,8 +205,8 @@ def read_command(self, file_name: str) -> pipelines.Command: quote_char=self.quote_char, null_value_string=self.null_value_string, csv_format=self.csv_format, timezone=self.timezone) - def html_doc_items(self) -> [(str, str)]: - path = self.parent.base_path() / self.mapper_script_file_name if self.mapper_script_file_name else '' + def html_doc_items(self) -> List[Tuple[str, str]]: + path = self.parent.base_path() / self.mapper_script_file_name if self.mapper_script_file_name else pathlib.Path() return [('file pattern', _.i[self.file_pattern]), ('compression', _.tt[self.compression]), ('read mode', _.tt[self.read_mode]), @@ -231,9 +232,9 @@ def html_doc_items(self) -> [(str, str)]: class ParallelReadSqlite(_ParallelRead): def __init__(self, id: str, description: str, file_pattern: str, read_mode: ReadMode, sql_file_name: str, - target_table: str, file_dependencies: [str] = None, date_regex: str = None, + target_table: str, file_dependencies: List[str] = None, date_regex: str = None, partition_target_table_by_day_id: bool = False, truncate_partitions: bool = False, - commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None, + commands_before: List[pipelines.Command] = None, commands_after: List[pipelines.Command] = None, db_alias: str = None, timezone=None, max_number_of_parallel_tasks: int = None) -> None: _ParallelRead.__init__(self, id=id, description=description, file_pattern=file_pattern, read_mode=read_mode, target_table=target_table, file_dependencies=file_dependencies, @@ -243,14 +244,14 @@ def __init__(self, id: str, description: str, file_pattern: str, read_mode: Read timezone=timezone, max_number_of_parallel_tasks=max_number_of_parallel_tasks) self.sql_file_name = sql_file_name - def read_command(self, file_name: str) -> [pipelines.Command]: + def read_command(self, file_name: str) -> List[pipelines.Command]: return files.ReadSQLite(sqlite_file_name=file_name, sql_file_name=self.sql_file_name, 
target_table=self.target_table, db_alias=self.db_alias, timezone=self.timezone) def sql_file_path(self): return self.parent.base_path() / self.sql_file_name - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: path = self.sql_file_path() return [('file pattern', _.i[self.file_pattern]), ('read mode', _.tt[self.read_mode]), diff --git a/mara_pipelines/parallel_tasks/python.py b/mara_pipelines/parallel_tasks/python.py index dd4b8b6..b274f77 100644 --- a/mara_pipelines/parallel_tasks/python.py +++ b/mara_pipelines/parallel_tasks/python.py @@ -1,6 +1,6 @@ import inspect import re -import typing +from typing import List, Optional, Tuple, Callable from html import escape from mara_page import _, html @@ -9,9 +9,9 @@ class ParallelExecutePython(pipelines.ParallelTask): - def __init__(self, id: str, description: str, file_name: str, parameter_function: typing.Callable, - max_number_of_parallel_tasks: int = None, commands_before: [pipelines.Command] = None, - commands_after: [pipelines.Command] = None) -> None: + def __init__(self, id: str, description: str, file_name: str, parameter_function: Callable, + max_number_of_parallel_tasks: Optional[int] = None, commands_before: Optional[List[pipelines.Command]] = None, + commands_after: Optional[List[pipelines.Command]] = None) -> None: super().__init__(id=id, description=description, max_number_of_parallel_tasks=max_number_of_parallel_tasks, commands_before=commands_before, commands_after=commands_after) self.file_name = file_name @@ -29,7 +29,7 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None: description=f'Runs the script with parameters {repr(parameter_tuple)}', commands=[python.ExecutePython(file_name=self.file_name, args=list(parameter_tuple))])) - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: path = self.parent.base_path() / self.file_name return [('parameter function', 
html.highlight_syntax(inspect.getsource(self.parameter_function), 'python')), @@ -40,9 +40,9 @@ def html_doc_items(self) -> [(str, str)]: class ParallelRunFunction(pipelines.ParallelTask): - def __init__(self, id: str, description: str, function: typing.Callable, parameter_function: typing.Callable, - max_number_of_parallel_tasks: int = None, commands_before: [pipelines.Command] = None, - commands_after: [pipelines.Command] = None) -> None: + def __init__(self, id: str, description: str, function: Callable, parameter_function: Callable, + max_number_of_parallel_tasks: Optional[int] = None, commands_before: Optional[List[pipelines.Command]] = None, + commands_after: Optional[List[pipelines.Command]] = None) -> None: super().__init__(id=id, description=description, max_number_of_parallel_tasks=max_number_of_parallel_tasks, commands_before=commands_before, commands_after=commands_after) self.function = function @@ -60,7 +60,7 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None: description=f'Runs the function with parameters {repr(parameter)}', commands=[python.RunFunction(lambda args=parameter: self.function(args))])) - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [('function', _.pre[escape(str(self.function))]), ('parameter function', html.highlight_syntax(inspect.getsource(self.parameter_function), 'python')), diff --git a/mara_pipelines/parallel_tasks/sql.py b/mara_pipelines/parallel_tasks/sql.py index adb70ea..9ec8f84 100644 --- a/mara_pipelines/parallel_tasks/sql.py +++ b/mara_pipelines/parallel_tasks/sql.py @@ -1,6 +1,6 @@ import inspect import re -import typing +from typing import Callable, List, Optional, Dict, Tuple from mara_page import _, html from .. 
import config, pipelines @@ -8,11 +8,11 @@ class ParallelExecuteSQL(pipelines.ParallelTask, sql._SQLCommand): - def __init__(self, id: str, description: str, parameter_function: typing.Callable, parameter_placeholders: [str], - max_number_of_parallel_tasks: int = None, sql_statement: str = None, file_name: str = None, - commands_before: [pipelines.Command] = None, commands_after: [pipelines.Command] = None, - db_alias: str = None, echo_queries: bool = None, timezone: str = None, - replace: {str: str} = None) -> None: + def __init__(self, id: str, description: str, parameter_function: Callable, parameter_placeholders: List[str], + max_number_of_parallel_tasks: Optional[int] = None, sql_statement: Optional[str] = None, file_name: Optional[str] = None, + commands_before: Optional[List[pipelines.Command]] = None, commands_after: Optional[List[pipelines.Command]] = None, + db_alias: Optional[str] = None, echo_queries: Optional[bool] = None, timezone: Optional[str] = None, + replace: Optional[Dict[str, str]] = None) -> None: if (not (sql_statement or file_name)) or (sql_statement and file_name): raise ValueError('Please provide either sql_statement or file_name (but not both)') @@ -53,7 +53,7 @@ def add_parallel_tasks(self, sub_pipeline: 'pipelines.Pipeline') -> None: sql.ExecuteSQL(sql_statement=self.sql_statement, db_alias=self.db_alias, echo_queries=self.echo_queries, timezone=self.timezone, replace=replace)])) - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: return [('db', _.tt[self.db_alias])] \ + sql._SQLCommand.html_doc_items(self, self.db_alias) \ + [('parameter function', html.highlight_syntax(inspect.getsource(self.parameter_function), 'python')), diff --git a/mara_pipelines/pipelines.py b/mara_pipelines/pipelines.py index aec1272..b303a93 100644 --- a/mara_pipelines/pipelines.py +++ b/mara_pipelines/pipelines.py @@ -1,7 +1,7 @@ import copy import pathlib import re -import typing +from typing import Optional, Dict, Set,
List, Tuple, Union from . import config @@ -9,18 +9,18 @@ class Node(): """Base class for pipeline elements""" - def __init__(self, id: str, description: str, labels: {str: str} = None) -> None: + def __init__(self, id: str, description: str, labels: Optional[Dict[str, str]] = None) -> None: if not re.match('^[a-z0-9_]+$', id): raise ValueError(f'Invalid id "{id}". Should only contain lowercase letters, numbers and "_".') self.id: str = id self.description: str = description - self.labels: {str: str} = labels or {} + self.labels: Dict[str, str] = labels or {} - self.upstreams: {'Node'} = set() - self.downstreams: {'Node'} = set() + self.upstreams: Set[Node] = set() + self.downstreams: Set[Node] = set() - self.parent: typing.Optional['Pipeline'] = None - self.cost: typing.Optional[float] = None + self.parent: Optional['Pipeline'] = None + self.cost: Optional[float] = None def parents(self): @@ -30,7 +30,7 @@ def parents(self): else: return [self] - def path(self) -> [str]: + def path(self) -> List[str]: """Returns a list of ids that identify the node across all pipelines, from top to bottom""" return [node.id for node in self.parents()[1:]] @@ -64,7 +64,7 @@ def run(self) -> bool: # logger.log(f'{config.bash_command_string()} -c {shlex.quote(shell_command)}', format=logger.Format.ITALICS) return shell.run_shell_command(shell_command) - def shell_command(self): + def shell_command(self) -> str: """A bash command string that that runs the command""" raise NotImplementedError() @@ -72,7 +72,7 @@ def node_path(self): """The path of the parent node""" return self.parent.path() if self.parent else '' - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: """ Things to display in the documentation of a command. 
Can contain html Example: `[('filename','/tmp/foo.txt'), ('max-retries', 15)]` @@ -81,7 +81,7 @@ def html_doc_items(self) -> [(str, str)]: class Task(Node): - def __init__(self, id: str, description: str, commands: [Command] = None, max_retries: int = None) -> None: + def __init__(self, id: str, description: str, commands: Optional[List[Command]] = None, max_retries: Optional[int] = None) -> None: super().__init__(id, description) self.commands = [] self.max_retries = max_retries @@ -96,7 +96,7 @@ def add_command(self, command: Command, prepend=False): self.commands.append(command) command.parent = self - def add_commands(self, commands: [Command]): + def add_commands(self, commands: List[Command]): for command in commands: self.add_command(command) @@ -108,8 +108,8 @@ def run(self): class ParallelTask(Node): - def __init__(self, id: str, description: str, max_number_of_parallel_tasks: int = None, - commands_before: [Command] = None, commands_after: [Command] = None) -> None: + def __init__(self, id: str, description: str, max_number_of_parallel_tasks: Optional[int] = None, + commands_before: Optional[List[Command]] = None, commands_after: Optional[List[Command]] = None) -> None: super().__init__(id, description) self.commands_before = [] for command in commands_before or []: @@ -140,7 +140,7 @@ def launch(self) -> 'Pipeline': return sub_pipeline - def html_doc_items(self) -> [(str, str)]: + def html_doc_items(self) -> List[Tuple[str, str]]: """ Things to display in the documentation of the parallel task. Can contain html. 
Example: `[('filename','/tmp/foo.txt'), ('max-retries', 15)]` @@ -161,7 +161,7 @@ class Pipeline(Node): ignore_errors: When true, then the pipeline execution will not fail when a child node fails force_run_all_children: When true, child nodes will run even when their upstreams failed """ - nodes: {str: Node} = None + nodes: Optional[Dict[str, Node]] = None initial_node: Node = None final_node: Node = None @@ -169,7 +169,7 @@ def __init__(self, id: str, description: str, max_number_of_parallel_tasks: int = None, base_path: pathlib.Path = None, - labels: {str: str} = None, + labels: Optional[Dict[str, str]] = None, ignore_errors: bool = False, force_run_all_children: bool = False) -> None: super().__init__(id, description, labels) @@ -179,7 +179,7 @@ def __init__(self, id: str, self.force_run_all_children = force_run_all_children self.ignore_errors = ignore_errors - def add(self, node: Node, upstreams: [typing.Union[str, Node]] = None) -> 'Pipeline': + def add(self, node: Node, upstreams: Optional[List[Union[str, Node]]] = None) -> 'Pipeline': if node.id in self.nodes: raise ValueError(f'A node with id "{node.id}" already exists in pipeline "{self.id}"') @@ -240,7 +240,7 @@ def replace(self, node: Node, new_node) -> Node: self.remove(node) self.add(new_node) - def add_dependency(self, upstream: typing.Union[Node, str], downstream: typing.Union[Node, str]): + def add_dependency(self, upstream: Union[Node, str], downstream: Union[Node, str]): if isinstance(upstream, str): if not upstream in self.nodes: raise KeyError(f'Node "{upstream}" not found in pipeline "{self.id}"') @@ -282,7 +282,7 @@ def base_path(self): return self._base_path or (self.parent.base_path() if self.parent else pathlib.Path('.')) -def find_node(path: [str]) -> (Node, bool): +def find_node(path: List[str]) -> Tuple[Node, bool]: """ Retrieves a node by the the path from its parents Args: diff --git a/mara_pipelines/shell.py b/mara_pipelines/shell.py index be54c7f..90607bd 100644 --- a/mara_pipelines/shell.py +++
b/mara_pipelines/shell.py @@ -2,12 +2,13 @@ import time import shlex +from typing import Dict, List, Optional, Union from . import config from .logging import logger -def run_shell_command(command: str, log_command: bool = True): +def run_shell_command(command: str, log_command: bool = True) -> Union[List[str], bool]: """ Runs a command in a bash shell and logs the output of the command in (near)real-time. @@ -66,7 +67,7 @@ def read_process_stderr(): return output_lines or True -def sed_command(replace: {str: str}) -> str: +def sed_command(replace: Dict[str, str]) -> str: """ Creates a sed command string from a dictionary of replacements @@ -85,7 +86,7 @@ def quote(s): + '"' -def http_request_command(url: str, headers: {str: str} = None, method: str = 'GET', body: str = None, body_from_stdin: bool = False) -> str: +def http_request_command(url: str, headers: Optional[Dict[str, str]] = None, method: str = 'GET', body: Optional[str] = None, body_from_stdin: bool = False) -> str: """ Creates a curl command sending a HTTP request diff --git a/mara_pipelines/ui/cli.py b/mara_pipelines/ui/cli.py index 8d93a4e..15a3b48 100644 --- a/mara_pipelines/ui/cli.py +++ b/mara_pipelines/ui/cli.py @@ -1,13 +1,14 @@ """Command line interface for running data pipelines""" import sys +from typing import Set import click from .. 
import config, pipelines -def run_pipeline(pipeline: pipelines.Pipeline, nodes: {pipelines.Node} = None, +def run_pipeline(pipeline: pipelines.Pipeline, nodes: Set[pipelines.Node] = None, with_upstreams: bool = False, interactively_started: bool = False, disable_colors: bool = False) -> bool: diff --git a/mara_pipelines/ui/dependency_graph.py b/mara_pipelines/ui/dependency_graph.py index 0f9df33..899d3de 100644 --- a/mara_pipelines/ui/dependency_graph.py +++ b/mara_pipelines/ui/dependency_graph.py @@ -1,6 +1,7 @@ """Visualization of pipelines using graphviz""" import functools +from typing import Dict import flask @@ -30,7 +31,7 @@ def dependency_graph(path: str): @functools.singledispatch -def dependency_graph(nodes: {str: pipelines.Node}, +def dependency_graph(nodes: Dict[str, pipelines.Node], current_node: pipelines.Node = None) -> str: """ Draws a list of pipeline nodes and the dependencies between them using graphviz diff --git a/mara_pipelines/ui/last_runs.py b/mara_pipelines/ui/last_runs.py index 21ef10b..f79c3d2 100644 --- a/mara_pipelines/ui/last_runs.py +++ b/mara_pipelines/ui/last_runs.py @@ -2,6 +2,7 @@ import datetime import json +from typing import List import flask import psycopg2.extensions @@ -201,7 +202,7 @@ def timeline_chart(path: str, run_id: int): return '' -def _latest_run_id(node_path: [str]): +def _latest_run_id(node_path: List[str]): with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor cursor.execute('SELECT max(run_id) FROM data_integration_node_run WHERE node_path=%s', (node_path,)) return cursor.fetchone()[0]