Skip to content

Commit

Permalink
Add a function to ad-hoc materialize a table for interactive debugging (
Browse files Browse the repository at this point in the history
#117)

* Add pipedag.debug.materialize_table, a function to ad-hoc materialize a table from a task for interactive debugging
  • Loading branch information
nicolasmueller committed Nov 14, 2023
1 parent a20dc1f commit c5051cd
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 4 deletions.
4 changes: 4 additions & 0 deletions src/pydiverse/pipedag/core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ def __init__(
# Compute input tasks
self.input_tasks: dict[int, Task] = {}

# Set to True when a table of this task was materialized ad-hoc for debugging.
# A task with this flag set raises a RuntimeError instead of materializing its result.
self.debug_tainted = False

def visitor(x):
if isinstance(x, Task):
self.input_tasks[x.id] = x
Expand Down
5 changes: 5 additions & 0 deletions src/pydiverse/pipedag/debug/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from pydiverse.pipedag.materialize.debug import materialize_table

__all__ = ["materialize_table"]
5 changes: 5 additions & 0 deletions src/pydiverse/pipedag/materialize/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ def _input_tables_visitor(x):
)

result = self.fn(*args, **kwargs)
if task.debug_tainted:
raise RuntimeError(
f"The task {task.name} has been tainted by interactive debugging."
f" Aborting."
)
result = store.materialize_task(task, task_cache_info, result)

# Delete underlying objects from result (after materializing them)
Expand Down
76 changes: 76 additions & 0 deletions src/pydiverse/pipedag/materialize/debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import annotations

import random

import structlog

from pydiverse.pipedag import ConfigContext, Table
from pydiverse.pipedag.backend import SQLTableStore
from pydiverse.pipedag.backend.table.sql.ddl import DropTable
from pydiverse.pipedag.context import TaskContext
from pydiverse.pipedag.materialize.core import MaterializingTask
from pydiverse.pipedag.materialize.store import mangle_table_name
from pydiverse.pipedag.util.hashing import stable_hash


def materialize_table(
    table: Table,
    debug_suffix: str | None = None,
    flag_task_debug_tainted: bool = True,
    keep_table_name: bool = True,
    drop_if_exists: bool = True,
):
    """
    Materialize a table ad-hoc from within a running task.

    This function can be called whenever a TaskContext is defined, which
    makes it useful during interactive debugging.

    If table.name is not given, it is derived from the task name and a suffix.
    If the table name ends in %%, the %% are replaced by the suffix.
    The suffix is a random hash followed by "_0000" if no debug_suffix is given.
    If debug_suffix is given, it is always appended to the resulting table name
    (also when the table has an explicit name without %%).

    As a side effect, table.stage is set to the stage of the current task.

    :param table: The table to be materialized.
    :param debug_suffix: A suffix to be appended to the table name
        for debugging purposes. Default: None
    :param flag_task_debug_tainted: Whether to flag the task as tainted
        by this debug materialization. The flag will cause an exception if
        execution is continued. Default: True
    :param keep_table_name: if False, the table.name will be equal to the debug
        name after return. Otherwise, the original name of the table is
        preserved, even if dropping or storing the table fails. Default: True
    :param drop_if_exists: If True, try to drop the table (if exists) before
        recreating. This is only supported for SQL table stores; for other
        table stores a warning is logged and nothing is dropped.
    :return: The name under which the table was materialized.
    """
    config_context = ConfigContext.get()
    table_store = config_context.store.table_store

    task_context = TaskContext.get()
    task: MaterializingTask = task_context.task  # type: ignore

    table.stage = task.stage

    # Without an explicit debug suffix, use a random one so that repeated
    # debug materializations do not collide with each other.
    if debug_suffix is None:
        suffix = stable_hash(str(random.randbytes(8))) + "_0000"
    else:
        suffix = ""

    old_table_name = table.name
    new_table_name = mangle_table_name(old_table_name, task.name, suffix)
    if debug_suffix is not None:
        new_table_name += debug_suffix

    if flag_task_debug_tainted:
        task.debug_tainted = True

    table.name = new_table_name
    try:
        if drop_if_exists:
            if isinstance(table_store, SQLTableStore):
                schema = table_store.get_schema(task.stage.transaction_name)
                table_store.execute(DropTable(table.name, schema, if_exists=True))
            else:
                logger = structlog.get_logger(logger_name="Debug materialize_table")
                logger.warning(
                    "drop_if_exists not supported for non SQLTableStore table stores."
                )
        table_store.store_table(table, task)
    finally:
        # Restore the caller's table name even if dropping or storing failed,
        # so a failed debug materialization does not leave the debug name behind.
        if keep_table_name:
            table.name = old_table_name

    return new_table_name
13 changes: 9 additions & 4 deletions src/pydiverse/pipedag/materialize/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,7 @@ def preparation_mutator(x):
object_number = next(auto_suffix_counter)
auto_suffix = f"{task_cache_info.cache_key}" f"_{object_number:04d}"

if x.name is None:
x.name = task.name + "_" + auto_suffix
elif x.name.endswith("%%"):
x.name = x.name[:-2] + auto_suffix
x.name = mangle_table_name(x.name, task.name, auto_suffix)

if isinstance(x, Table):
if x.obj is None:
Expand Down Expand Up @@ -572,3 +569,11 @@ def dematerialize_output_from_store(
item, as_type=as_type, ctx=run_context
),
)


def mangle_table_name(table_name: str, task_name: str, suffix: str):
    """
    Derive the final name of a table from its (possibly templated) name.

    :param table_name: The name given to the table. May be None, in which case
        the name is built from the task name and the suffix.
    :param task_name: Name of the task; only used if table_name is None.
    :param suffix: Suffix used to complete the name.
    :return: ``task_name + "_" + suffix`` if table_name is None; table_name with
        a trailing ``%%`` replaced by suffix if it ends in ``%%``; otherwise
        table_name unchanged.
    """
    # No explicit name: fall back to a name derived from the task.
    if table_name is None:
        return task_name + "_" + suffix

    # A trailing "%%" is a placeholder for the suffix.
    if table_name.endswith("%%"):
        return table_name[:-2] + suffix

    return table_name
19 changes: 19 additions & 0 deletions tests/test_materialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,25 @@ def test_materialize_table_twice():
assert f.run().successful


def test_debug_materialize_table_no_taint():
    """A debug materialization that does not taint the task lets the flow succeed."""
    with Flow("flow") as flow, Stage("stage"):
        table = m.simple_dataframe_debug_materialize_no_taint()
        m.assert_table_equal(table, table)

    run_result = flow.run()
    assert run_result.successful


def test_debug_materialize_table_twice():
    """A task tainted by debug materialization makes the flow run raise."""
    with Flow("flow") as flow, Stage("stage"):
        table = m.simple_dataframe_debug_materialize_twice()
        m.assert_table_equal(table, table)

    # The task flags itself as tainted, so running the flow is expected to
    # raise the "interactive debugging" RuntimeError.
    with pytest.raises(RuntimeError, match="interactive debugging"):
        run_result = flow.run()
        assert run_result.successful


def test_materialize_blob():
with Flow("flow") as f:
with Stage("stage_0"):
Expand Down
32 changes: 32 additions & 0 deletions tests/util/tasks_library.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sqlalchemy.dialects

from pydiverse.pipedag import Blob, RawSql, Table, materialize
from pydiverse.pipedag.debug import materialize_table


@materialize(input_type=pd.DataFrame, version="1.0")
Expand Down Expand Up @@ -71,6 +72,37 @@ def simple_dataframe():
return Table(df)


@materialize(version="1.0")
def simple_dataframe_debug_materialize_no_taint():
    """Return a small named table after debug-materializing it without tainting."""
    columns = {
        "col1": [0, 1, 2, 3],
        "col2": ["0", "1", "2", "3"],
    }
    table = Table(pd.DataFrame(columns), name="test_table")

    # Debug-materialize without flagging the task, so the task can still finish.
    materialize_table(table, debug_suffix="debug", flag_task_debug_tainted=False)
    return table


@materialize(version="1.0")
def simple_dataframe_debug_materialize_twice():
    """Debug-materialize two unnamed tables with the same suffix, tainting the task."""
    frame = pd.DataFrame(
        {
            "col1": [0, 1, 2, 3],
            "col2": ["0", "1", "2", "3"],
        }
    )

    # First debug materialization of the unmodified data.
    table = Table(frame)
    materialize_table(table, debug_suffix="debug", flag_task_debug_tainted=True)

    # Change the last row and debug-materialize again with the same suffix.
    frame.iloc[3] = [4, "4"]
    table = Table(frame)
    materialize_table(table, debug_suffix="debug", flag_task_debug_tainted=True)

    return table


@materialize(version="1.0")
def simple_dataframe_with_pk():
df = pd.DataFrame(
Expand Down

0 comments on commit c5051cd

Please sign in to comment.