add execution contexts #69

Draft. Wants to merge 21 commits into base branch 4.0.x.

Commits (21):
1b6ab89  use mara_storage for abstraction of local storage interactions (ice1e0, Oct 21, 2020)
506c4ec  several build fixes (ice1e0, Oct 27, 2020)
a530b4c  improve version handling + set expected release version to 3.2.0 (ice1e0, Oct 28, 2020)
7278b0a  upgrade to mara-storage 1.0.0 (leo-schick, Jun 19, 2022)
ff187b8  fix _ParallelRead since merge of mara_storage #55 (leo-schick, Jun 19, 2022)
a0de9d4  improve version handling + set expected release version to 3.2.0 (ice1e0, Oct 28, 2020)
f3f177a  add execution context (leo-schick, Apr 25, 2022)
367d9f0  ssh add password option and fix pipefail option execution (leo-schick, Apr 26, 2022)
161edac  fix ssh execution context + move to contexts.bash module (leo-schick, Apr 26, 2022)
c53cc52  add docker execution context (leo-schick, Apr 26, 2022)
21daa2e  fix Copy command execution context (leo-schick, Apr 27, 2022)
39b1d56  use task.run(context=context) where implemented + use optimistic patt… (leo-schick, May 4, 2022)
21fb892  fix passing args/kargs to task/command.run (leo-schick, May 5, 2022)
1e1609a  clean code (leo-schick, May 5, 2022)
a939b8d  use cache for contexts.context(alias) function (leo-schick, May 5, 2022)
a7272c4  fix run (leo-schick, May 17, 2022)
ce2e06a  fix support legacy commands run (leo-schick, May 20, 2022)
fbc6f95  support executing scripts through docker context V2 (leo-schick, Jun 13, 2022)
99e11d8  fix duplicated arg. -c (leo-schick, Jun 15, 2022)
b926b32  fix pipeline hang when context is not configured (leo-schick, Jul 11, 2022)
cfd89fd  always use logger, not print (leo-schick, Feb 24, 2023)
44 changes: 38 additions & 6 deletions mara_pipelines/commands/files.py
@@ -1,5 +1,6 @@
"""Commands for reading files"""

import deprecation
import json
import pathlib
import shlex
@@ -9,19 +10,28 @@

import mara_db.dbs
import mara_db.shell
import mara_storage.storages
from mara_storage.shell import read_file_command
from . import sql
from mara_page import _, html
from .. import config, pipelines
import mara_pipelines


class Compression(enum.Enum):
@deprecation.deprecated(deprecated_in='3.2.0', removed_in='4.0.0',
current_version=mara_pipelines.__version__,
details='Use mara_storage.compression.Compression instead')
class Compression(enum.EnumMeta):
"""Different compression formats that are understood by file readers"""
NONE = 'none'
GZIP = 'gzip'
TAR_GZIP = 'tar.gzip'
ZIP = 'zip'


@deprecation.deprecated(deprecated_in='3.2.0', removed_in='4.0.0',
current_version=mara_pipelines.__version__,
details='Use mara_storage.compression.uncompressor instead')
def uncompressor(compression: Compression) -> str:
"""Maps compression methods to command line programs that can unpack the respective files"""
return {Compression.NONE: 'cat',
@@ -35,7 +45,8 @@ class ReadFile(pipelines.Command):

def __init__(self, file_name: str, compression: Compression, target_table: str,
mapper_script_file_name: str = None, make_unique: bool = False,
db_alias: str = None, csv_format: bool = False, skip_header: bool = False,
db_alias: str = None, storage_alias: str = None,
csv_format: bool = False, skip_header: bool = False,
delimiter_char: str = None, quote_char: str = None,
null_value_string: str = None, timezone: str = None) -> None:
super().__init__()
@@ -48,6 +59,7 @@ def __init__(self, file_name: str, compression: Compression, target_table: str,
self.csv_format = csv_format
self.skip_header = skip_header
self._db_alias = db_alias
self._storage_alias = storage_alias
self.delimiter_char = delimiter_char
self.quote_char = quote_char
self.null_value_string = null_value_string
@@ -56,6 +68,10 @@
def db_alias(self):
return self._db_alias or config.default_db_alias()

@property
def storage_alias(self):
return self._storage_alias or config.default_storage_alias()

def shell_command(self):
copy_from_stdin_command = mara_db.shell.copy_from_stdin_command(
self.db_alias(), csv_format=self.csv_format, target_table=self.target_table,
@@ -64,14 +80,17 @@ def shell_command(self):
null_value_string=self.null_value_string, timezone=self.timezone)
if not isinstance(mara_db.dbs.db(self.db_alias()), mara_db.dbs.BigQueryDB):
return \
f'{uncompressor(self.compression)} "{pathlib.Path(config.data_dir()) / self.file_name}" \\\n' \
f'{read_file_command(self.storage_alias, file_name=self.file_name, compression=self.compression)} \\\n' \
+ (f' | {shlex.quote(sys.executable)} "{self.mapper_file_path()}" \\\n'
if self.mapper_script_file_name else '') \
+ (' | sort -u \\\n' if self.make_unique else '') \
+ ' | ' + copy_from_stdin_command
else:
# Bigquery loading does not support streaming data through pipes
return copy_from_stdin_command + f" {pathlib.Path(config.data_dir()) / self.file_name}"
storage = mara_storage.storages.storage(self.storage_alias)
if not isinstance(storage, mara_storage.storages.LocalStorage):
raise ValueError('The ReadFile to a BigQuery database can only be used from a storage alias of type LocalStorage')
return copy_from_stdin_command + f' {shlex.quote(str( (storage.base_path / self.file_name).absolute() ))}'

def mapper_file_path(self):
return self.parent.parent.base_path() / self.mapper_script_file_name
@@ -86,6 +105,7 @@ def html_doc_items(self) -> [(str, str)]:
('make unique', _.tt[self.make_unique]),
('target_table', _.tt[self.target_table]),
('db alias', _.tt[self.db_alias()]),
('storage alias', _.tt[self.storage_alias]),
('csv format', _.tt[self.csv_format]),
('skip header', _.tt[self.skip_header]),
('delimiter char',
@@ -100,29 +120,41 @@
class ReadSQLite(sql._SQLCommand):
def __init__(self, sqlite_file_name: str, target_table: str,
sql_statement: str = None, sql_file_name: str = None, replace: {str: str} = None,
db_alias: str = None, timezone: str = None) -> None:
db_alias: str = None, storage_alias: str = None, timezone: str = None) -> None:
if not isinstance(mara_storage.storages.storage(storage_alias), mara_storage.storages.LocalStorage):
raise ValueError('The ReadSQLite task can only be used from a storage alias of type LocalStorage')
sql._SQLCommand.__init__(self, sql_statement, sql_file_name, replace)
self.sqlite_file_name = sqlite_file_name

self.target_table = target_table
self._db_alias = db_alias
self._storage_alias = storage_alias
self.timezone = timezone

@property
def db_alias(self):
return self._db_alias or config.default_db_alias()

@property
def storage_alias(self):
return self._storage_alias or config.default_storage_alias()

def shell_command(self):
storage = mara_storage.storages.storage(self.storage_alias)
if not isinstance(storage, mara_storage.storages.LocalStorage):
raise ValueError('The ReadSQLite task can only be used from a storage alias of type LocalStorage')

return (sql._SQLCommand.shell_command(self)
+ ' | ' + mara_db.shell.copy_command(
mara_db.dbs.SQLiteDB(file_name=config.data_dir().absolute() / self.sqlite_file_name),
mara_db.dbs.SQLiteDB(file_name=(storage.base_path / self.sqlite_file_name).absolute()),
self.db_alias, self.target_table, timezone=self.timezone))

def html_doc_items(self) -> [(str, str)]:
return [('sqlite file name', _.i[self.sqlite_file_name])] \
+ sql._SQLCommand.html_doc_items(self, None) \
+ [('target_table', _.tt[self.target_table]),
('db alias', _.tt[self.db_alias]),
('storage alias', _.tt[self.storage_alias]),
('time zone', _.tt[self.timezone]),
(_.i['shell command'], html.highlight_syntax(self.shell_command(), 'bash'))]

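Taken together, the files.py changes route local file access through mara_storage and add a storage_alias parameter to ReadFile and ReadSQLite, defaulting to the new config.default_storage_alias(). A minimal usage sketch follows; the pipeline, table, and file names are illustrative, and Compression.GZIP is assumed to exist in mara_storage.compression, as the deprecation notices above imply.

from mara_storage.compression import Compression  # replaces the enum deprecated above
from mara_pipelines.commands.files import ReadFile
from mara_pipelines.pipelines import Pipeline, Task

pipeline = Pipeline(id='read_file_demo',
                    description='Loads a compressed CSV from the configured storage')

pipeline.add(Task(
    id='read_events',
    description='Copies events.csv.gz into the events table',
    commands=[ReadFile(file_name='events.csv.gz',
                       compression=Compression.GZIP,
                       target_table='public.events',
                       storage_alias='data',  # omit to use config.default_storage_alias()
                       csv_format=True,
                       skip_header=True)]))

For non-BigQuery targets the file is streamed through mara_storage.shell.read_file_command into copy_from_stdin_command; for BigQuery the storage alias must resolve to a LocalStorage, which the new ValueError above enforces.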
6 changes: 3 additions & 3 deletions mara_pipelines/commands/python.py
@@ -29,7 +29,7 @@ def __init__(self, function: Callable = None, args: [str] = None, file_dependenc
self.args = args or []
self.file_dependencies = file_dependencies or []

def run(self) -> bool:
def run(self, *args, **kargs) -> bool:
dependency_type = 'RunFunction ' + self.function.__name__
if self.file_dependencies:
assert (self.parent)
@@ -78,7 +78,7 @@ def file_name(self):
def args(self):
return self._args() if callable(self._args) else self._args

def run(self) -> bool:
def run(self, *args, **kargs) -> bool:
dependency_type = 'ExecutePython ' + self.file_name
if self.file_dependencies:
assert (self.parent)
@@ -89,7 +89,7 @@ def run(self) -> bool:
logger.log('no changes')
return True

if not super().run():
if not super().run(*args, **kargs):
return False

if self.file_dependencies:
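python.py only widens run(self) to run(self, *args, **kargs) and forwards the arguments to super().run(), so that an execution context can be threaded through without breaking commands written before this branch. A sketch of the "optimistic patt…" approach that commit 39b1d56 refers to; the run_command helper is hypothetical and not part of this diff.

def run_command(command, context) -> bool:
    # Optimistically pass the context; commands on this branch accept and
    # forward *args/**kargs, so the keyword reaches whoever needs it.
    try:
        return command.run(context=context)
    except TypeError:
        # Legacy command: its run() still has the old zero-argument signature.
        # (A real implementation would need to distinguish this case from a
        # TypeError raised inside run() itself.)
        return command.run()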
36 changes: 19 additions & 17 deletions mara_pipelines/commands/sql.py
@@ -102,7 +102,7 @@ def __init__(self, sql_statement: str = None, sql_file_name: Union[str, Callable
def db_alias(self):
return self._db_alias or config.default_db_alias()

def run(self) -> bool:
def run(self, *args, **kargs) -> bool:
if self.sql_file_name:
logger.log(self.sql_file_name, logger.Format.ITALICS)

@@ -124,7 +124,7 @@ def run(self) -> bool:
# probably not be there (usually the first step is a DROP).
file_dependencies.delete(self.node_path(), dependency_type)

if not super().run():
if not super().run(*args, **kargs):
return False

if self.file_dependencies:
@@ -167,7 +167,7 @@ def target_db_alias(self):
def file_path(self) -> pathlib.Path:
return self.parent.parent.base_path() / self.sql_file_name

def run(self) -> bool:
def run(self, *args, **kargs) -> bool:
if self.sql_file_name:
logger.log(self.sql_file_name, logger.Format.ITALICS)

@@ -187,7 +187,7 @@ def run(self) -> bool:
# (see also above in ExecuteSQL)
file_dependencies.delete(self.node_path(), dependency_type)

if not super().run():
if not super().run(*args, **kargs):
return False

if self.file_dependencies:
@@ -263,15 +263,17 @@ def __init__(self, source_db_alias: str, source_table: str,
def target_db_alias(self):
return self._target_db_alias or config.default_db_alias()

def run(self) -> bool:
def run(self, *args, **kargs) -> bool:
run_shell_command = kargs['context'].run_shell_command if 'context' in kargs else shell.run_shell_command

# retrieve the highest current value for the modification comparison (e.g.: the highest timestamp)
# We intentionally use the command line here (rather than sqlalchemy) to avoid forcing people python drivers,
# which can be hard for example in the case of SQL Server
logger.log(f'Get new max modification comparison value...', format=logger.Format.ITALICS)
max_value_query = f'SELECT max({self.modification_comparison}) AS maxval FROM {self.source_table}'
logger.log(max_value_query, format=logger.Format.VERBATIM)
result = shell.run_shell_command(f'echo {shlex.quote(max_value_query)} \\\n | '
+ mara_db.shell.copy_to_stdout_command(self.source_db_alias))
result = run_shell_command(f'echo {shlex.quote(max_value_query)} \\\n | '
+ mara_db.shell.copy_to_stdout_command(self.source_db_alias))

if not result:
return False
@@ -323,7 +325,7 @@ def run(self) -> bool:
# overwrite the comparison criteria to get everything
replace = {self.comparison_value_placeholder: '(1=1)'}
complete_copy_command = self._copy_command(self.target_table, replace)
if not shell.run_shell_command(complete_copy_command):
if not run_shell_command(complete_copy_command):
return False

else:
@@ -332,16 +334,16 @@
create_upsert_table_query = (f'DROP TABLE IF EXISTS {self.target_table}_upsert;\n'
+ f'CREATE TABLE {self.target_table}_upsert AS SELECT * from {self.target_table} WHERE FALSE')

if not shell.run_shell_command(f'echo {shlex.quote(create_upsert_table_query)} \\\n | '
+ mara_db.shell.query_command(self.target_db_alias)):
if not run_shell_command(f'echo {shlex.quote(create_upsert_table_query)} \\\n | '
+ mara_db.shell.query_command(self.target_db_alias)):
return False

# perform the actual copy replacing the placeholder
# with the comparison value from the latest successful execution
modification_comparison_type = self.modification_comparison_type or ''
replace = {self.comparison_value_placeholder:
f'({self.modification_comparison} >= {modification_comparison_type} \'{last_comparison_value}\')'}
if not shell.run_shell_command(self._copy_command(self.target_table + '_upsert', replace)):
if not run_shell_command(self._copy_command(self.target_table + '_upsert', replace)):
return False

# now the upsert table has to be merged with the target one
@@ -370,11 +372,11 @@ def run(self) -> bool:
SELECT src.*
FROM {self.target_table}_upsert src
WHERE NOT EXISTS (SELECT 1 FROM {self.target_table} dst WHERE {key_definition})"""
if not shell.run_shell_command(f'echo {shlex.quote(update_query)} \\\n | '
+ mara_db.shell.query_command(self.target_db_alias)):
if not run_shell_command(f'echo {shlex.quote(update_query)} \\\n | '
+ mara_db.shell.query_command(self.target_db_alias)):
return False
elif not shell.run_shell_command(f'echo {shlex.quote(insert_query)} \\\n | '
+ mara_db.shell.query_command(self.target_db_alias)):
elif not run_shell_command(f'echo {shlex.quote(insert_query)} \\\n | '
+ mara_db.shell.query_command(self.target_db_alias)):
return False
else:
upsery_query = f"""
@@ -383,8 +385,8 @@ def run(self) -> bool:
FROM {self.target_table}_upsert
ON CONFLICT ({key_definition})
DO UPDATE SET {set_clause}"""
if not shell.run_shell_command(f'echo {shlex.quote(upsery_query)} \\\n | '
+ mara_db.shell.query_command(self.target_db_alias)):
if not run_shell_command(f'echo {shlex.quote(upsery_query)} \\\n | '
+ mara_db.shell.query_command(self.target_db_alias)):
return False

# update data_integration_incremental_copy_status
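The dispatch line at the top of Copy.run() is the core idiom of the execution-context mechanism: a command no longer calls shell.run_shell_command directly but takes the runner from the context handed in through kargs. Isolated as a sketch below; only the run_shell_command attribute of a context is evidenced by this diff, and the command class itself is illustrative.

from mara_pipelines import shell

class ContextAwareCommand:
    def run(self, *args, **kargs) -> bool:
        # Use the execution context's shell runner when a context was passed,
        # otherwise fall back to running the command locally.
        run_shell_command = (kargs['context'].run_shell_command
                             if 'context' in kargs
                             else shell.run_shell_command)
        return bool(run_shell_command('echo "runs in the configured context"'))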
30 changes: 30 additions & 0 deletions mara_pipelines/config.py
@@ -1,19 +1,30 @@
"""Configuration of data integration pipelines and how to run them"""

import datetime
import deprecation
import functools
import multiprocessing
import pathlib
import typing

from mara_app.monkey_patch import patch
import mara_storage.config
import mara_storage.storages

from . import pipelines, events
from .contexts import ExecutionContext
from .contexts.bash import BashExecutionContext
import mara_pipelines


def root_pipeline() -> 'pipelines.Pipeline':
"""A pipeline that contains all other pipelines of the project"""
return pipelines.demo_pipeline()


@deprecation.deprecated(deprecated_in='3.2.0', removed_in='4.0.0',
current_version=mara_pipelines.__version__,
details='Use mara_storage.config.storages instead')
def data_dir() -> str:
"""Where to find local data files"""
return str(pathlib.Path('data').absolute())
@@ -24,6 +35,15 @@ def default_db_alias() -> str:
return 'dwh-etl'


def default_storage_alias() -> str:
"""The alias of the storage that should be used when not specified otherwise"""
return 'data'

@patch(mara_storage.config.storages)
def storages() -> {str: mara_storage.storages.Storage}:
return {'data': mara_storage.storages.LocalStorage(base_path=pathlib.Path(data_dir()))}


def default_task_max_retries():
"""How many times a task is retried when it fails by default """
return 0
@@ -49,6 +69,16 @@ def bash_command_string() -> str:
return '/usr/bin/env bash -o pipefail'


def default_execution_context() -> str:
"""Sets the default execution context"""
return 'bash'


def execution_contexts() -> {str: ExecutionContext}:
"""The available execution contexts"""
return {'bash': BashExecutionContext()}


def system_statistics_collection_period() -> typing.Union[float, None]:
"""
How often should system statistics be collected in seconds.
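config.py wires the feature up: default_execution_context() names the default alias and execution_contexts() maps aliases to ExecutionContext instances, with only the bash context registered out of the box. A project would override both with mara_app.monkey_patch.patch, the same mechanism this diff uses for mara_storage.config.storages. In the sketch below, DockerExecutionContext, its module path, and its constructor argument are guesses based on the commit messages ("add docker execution context"), not on anything shown in this diff.

from mara_app.monkey_patch import patch

import mara_pipelines.config
from mara_pipelines.contexts.bash import BashExecutionContext
from mara_pipelines.contexts.docker import DockerExecutionContext  # assumed module path


@patch(mara_pipelines.config.execution_contexts)
def execution_contexts():
    # Register an additional context next to the default bash one.
    return {'bash': BashExecutionContext(),
            'etl': DockerExecutionContext(container_name='etl')}  # assumed signature


@patch(mara_pipelines.config.default_execution_context)
def default_execution_context() -> str:
    return 'etl'

Aliases are then resolved (and cached) through the contexts.context(alias) function added in commit a939b8d; commit b926b32 suggests that an alias missing from execution_contexts() originally made the pipeline hang instead of failing fast.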