Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ from dagster_sqlmesh import sqlmesh_assets, SQLMeshContextConfig, SQLMeshResourc

sqlmesh_config = SQLMeshContextConfig(path="/home/foo/sqlmesh_project", gateway="name-of-your-gateway")

@sqlmesh_assets(config=sqlmesh_config)
@sqlmesh_assets(environment="dev", config=sqlmesh_config)
def sqlmesh_project(context: AssetExecutionContext, sqlmesh: SQLMeshResource):
yield from sqlmesh.run(context)

Expand Down
7 changes: 4 additions & 3 deletions dagster_sqlmesh/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
RetryPolicy,
)

from dagster_sqlmesh.controller.dagster import DagsterSQLMeshController
from dagster_sqlmesh.controller import DagsterSQLMeshController
from dagster_sqlmesh.translator import SQLMeshDagsterTranslator

from .config import SQLMeshContextConfig
Expand All @@ -17,6 +17,7 @@
# Define a SQLMesh Asset
def sqlmesh_assets(
*,
environment: str,
config: SQLMeshContextConfig,
name: t.Optional[str] = None,
dagster_sqlmesh_translator: t.Optional[SQLMeshDagsterTranslator] = None,
Expand All @@ -25,10 +26,10 @@ def sqlmesh_assets(
required_resource_keys: t.Optional[t.Set[str]] = None,
retry_policy: t.Optional[RetryPolicy] = None,
):
controller = DagsterSQLMeshController.setup(config)
controller = DagsterSQLMeshController.setup_with_config(config)
if not dagster_sqlmesh_translator:
dagster_sqlmesh_translator = SQLMeshDagsterTranslator()
conversion = controller.to_asset_outs(dagster_sqlmesh_translator)
conversion = controller.to_asset_outs(environment, dagster_sqlmesh_translator)

return multi_asset(
name=name,
Expand Down
26 changes: 22 additions & 4 deletions dagster_sqlmesh/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ def sample_sqlmesh_project():

@dataclass
class SQLMeshTestContext:
"""A test context for running SQLMesh"""

db_path: str
context_config: SQLMeshContextConfig

def create_controller(self, enable_debug_console: bool = False):
console = None
if enable_debug_console:
console = get_console()
return DagsterSQLMeshController.setup(
return DagsterSQLMeshController.setup_with_config(
self.context_config, debug_console=console
)

Expand Down Expand Up @@ -96,17 +98,33 @@ def append_to_test_source(self, df: polars.DataFrame):
"""
)

def run(
def plan_and_run(
self,
*,
environment: str,
apply: bool = False,
execution_time: t.Optional[TimeLike] = None,
enable_debug_console: bool = False,
start: t.Optional[TimeLike] = None,
end: t.Optional[TimeLike] = None,
restate_models: t.Optional[t.List[str]] = None,
):
"""Runs plan and run on SQLMesh with the given configuration and record all of the generated events.

Args:
environment (str): The environment to run SQLMesh in.
execution_time (TimeLike, optional): The execution timestamp for the run. Defaults to None.
enable_debug_console (bool, optional): Flag to enable debug console. Defaults to False.
start (TimeLike, optional): Start time for the run interval. Defaults to None.
end (TimeLike, optional): End time for the run interval. Defaults to None.
restate_models (List[str], optional): List of models to restate. Defaults to None.

Returns:
None: The function records events to a debug console but doesn't return anything.

Note:
TimeLike can be any time-like object that SQLMesh accepts (datetime, str, etc.).
The function creates a controller and recorder to capture all SQLMesh events during execution.
"""
controller = self.create_controller(enable_debug_console=enable_debug_console)
recorder = ConsoleRecorder()
# controller.add_event_handler(ConsoleRecorder())
Expand All @@ -126,7 +144,7 @@ def run(
plan_options["end"] = end
run_options["end"] = end

for _context, event in controller.plan_and_run(
for event in controller.plan_and_run(
environment,
plan_options=plan_options,
run_options=run_options,
Expand Down
14 changes: 14 additions & 0 deletions dagster_sqlmesh/console.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,18 @@ class ConsoleException:


class EventConsole(Console):
"""
A console implementation that manages and publishes events related to
SQLMesh operations. The sqlmesh console implementation is mostly for it's
CLI application and doesn't take into account using sqlmesh as a library.
This event pub/sub interface allows us to capture events and choose how we
wish to handle it with N number of handlers.

This class extends the Console class and provides functionality to handle
various events during SQLMesh processes such as plan evaluation, creation,
promotion, migration, and testing.
"""

categorizer: t.Optional[SnapshotCategorizer] = None

def __init__(self, log_override: t.Optional[logging.Logger] = None):
Expand Down Expand Up @@ -475,6 +487,8 @@ def exception(self, exc: Exception):


class DebugEventConsole(EventConsole):
"""A console that wraps an existing console and logs all events to a logger"""

def __init__(self, console: Console):
super().__init__()
self._console = console
Expand Down
3 changes: 3 additions & 0 deletions dagster_sqlmesh/controller/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# ruff: noqa: F403 F401
from .base import SQLMeshController, SQLMeshInstance, PlanOptions, RunOptions
from .dagster import DagsterSQLMeshController
Loading
Loading