### Step 1 - Run a flow (`restore_state_test_flow.py`)

This flow is the `ds-projen` sample flow with minor changes, so it is representative of our production flows.

It:

1. executes SQL queries in Snowflake using the `publish()` function
2. accesses a secret (`outerbounds.brain-api-pattern-nlp`), which is used to call the Brain API via our internal Brain SDK

In [3]:
import subprocess
import sys
import uuid


def execute_with_output(cmd):
    """Execute a command and yield output lines as they are produced."""
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # Merge stderr into stdout
        text=True,  # Decode output as str; same effect as universal_newlines=True
        bufsize=1,
    )

    for line in iter(process.stdout.readline, ""):
        yield line

    process.stdout.close()
    return_code = process.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd)


cmd = [
    sys.executable,
    "restore_state_test_flow.py",
    "--environment=fast-bakery",
    "--with=card",
    "run",
    f"--random_param={uuid.uuid4()}",
]

print("\n=== Metaflow Output ===")
for line in execute_with_output(cmd):
    print(line, end="")


=== Metaflow Output ===
  warn_incompatible_dep(
Metaflow 2.15.21.3+obcheckpoint(0.2.4);ob(v1) executing TestRestoreFlowState for user:eric.riddoch@pattern.com
Project: example, Branch: user.eric.riddoch@pattern.com
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
Bootstrapping virtual environment(s) ...
Virtual environment(s) bootstrapped!
2025-07-23 19:27:19.741 Workflow starting (run-id 20704), see it in the UI at https://ui.pattern.obp.outerbounds.com/p/default/TestRestoreFlowState/20704
2025-07-23 19:27:21.996 [20704/start/494578 (pid 76924)] Task is starting.
2025-07-23 19:27:24.662 [20704/start/494578 (pid 76924)] self.config.n_rows=10
2025-07-23 19:27:30.543 [20704/start/494578 (pid 76924)] warn_incompatible_dep(
2025-07-23 19:27:31.065 [20704/start/494578 (pid 76924)] Task finished successfully.
2025-07-23 19:27:31.642 [20704/execute_sql/494579 (pid 77004)] Task is starting.
2025-07-23 19:27:34.681 [20704/execute_sql/494579 (pid 77004)]
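
One detail of `execute_with_output` is easy to miss: because it is a generator, the `CalledProcessError` is raised only after every output line has been yielded. The sketch below repeats the helper so that it runs on its own (using `text=True`, the newer alias of `universal_newlines=True`). It swaps the Metaflow command for a hypothetical one-line Python command that prints a line and exits with code 3; that command is an illustration only and is not part of the flow.

```python
import subprocess
import sys


def execute_with_output(cmd):
    """Yield merged stdout/stderr lines, then raise if the exit code is non-zero."""
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # Merge stderr into stdout
        text=True,
        bufsize=1,
    )
    for line in iter(process.stdout.readline, ""):
        yield line
    process.stdout.close()
    return_code = process.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd)


# Hypothetical stand-in for the Metaflow command: print one line, then fail.
failing_cmd = [sys.executable, "-c", "print('partial output'); raise SystemExit(3)"]

collected = []
exit_code = 0
try:
    for line in execute_with_output(failing_cmd):
        collected.append(line)  # Lines arrive before the failure is reported
except subprocess.CalledProcessError as err:
    exit_code = err.returncode

print(collected, exit_code)
```

In a notebook this means a failing run still shows its full log before the cell errors out, which is usually what you want when debugging a flow.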

### Step 2 - Restore the state

✨ These are loaded automatically:

1. Flow parameters
2. Flow configs
3. Flow attributes/artifacts, e.g. `self.df`

✨ The `current` singleton (`from metaflow import current`) is mocked, so the `publish()` function works automatically.

✨ Optionally, pass `secrets=[...]` to export those secrets as environment variables, equivalent to using the `@secrets` decorator.

✨ Autocompletion is available in your editor for ALL attributes of `self`, even the ones specific to your flow, e.g. `self.config.n_rows`, `self.df`, etc.
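
For intuition on the autocompletion point, here is one plausible way a helper can give editors full knowledge of flow-specific attributes: annotate it to return an instance of the same class it receives. This is a sketch under assumptions, not the actual implementation of `restore_step_state`; `ExampleFlow` and `restore_state_sketch` are hypothetical names, and the saved values are passed in as a plain dict for illustration.

```python
from typing import Any, Dict, Type, TypeVar

T = TypeVar("T")


class ExampleFlow:
    """Hypothetical stand-in for a flow class; not a real Metaflow FlowSpec."""

    random_param: str
    n_rows: int


def restore_state_sketch(flow_cls: Type[T], saved: Dict[str, Any]) -> T:
    """Hypothetical helper: create an instance of flow_cls without calling
    __init__ and attach the saved values as attributes."""
    instance = flow_cls.__new__(flow_cls)
    for name, value in saved.items():
        setattr(instance, name, value)
    return instance


# Because the return type is T, an editor infers `self: ExampleFlow`
# and can complete `self.random_param` and `self.n_rows`.
self = restore_state_sketch(ExampleFlow, {"random_param": "abc", "n_rows": 10})
print(self.n_rows)
```

The key design choice is the `Type[T] -> T` signature: type checkers and editors resolve `T` to whichever class you pass in, so no per-flow stubs are needed.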

In [None]:
import os

from restore_state_test_flow import TestRestoreFlowState

from ds_platform_utils.metaflow import restore_step_state

self = restore_step_state(TestRestoreFlowState, secrets=["outerbounds.brain-api-pattern-nlp"])

print(f"{self.config=}")
print(f"{self.config.n_rows=}")
print(f"{self.random_param=}")
print(self.df)

assert "BRAIN_API_KEY" in os.environ

self.config={'n_rows': 10, 'table_name': 'test_delete_me'}
self.config.n_rows=10
self.random_param='a972246f-9183-498e-9650-56c7f042b53e'
   id                                          message
0   1  Hello from Snowflake! - Processed from Metaflow
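
The `assert` in the cell above checks that the requested secret ended up in the process environment. As a rough picture of what "export to the environment" means, the sketch below copies key/value pairs into `os.environ`. The helper `export_to_environment` is hypothetical and the value is a placeholder; how `restore_step_state` actually fetches the secret is not shown here.

```python
import os
from typing import Dict, List


def export_to_environment(secret_values: Dict[str, str]) -> List[str]:
    """Hypothetical helper: copy each key/value pair into os.environ and
    return the sorted list of variable names that were set."""
    for name, value in secret_values.items():
        os.environ[name] = value
    return sorted(secret_values)


# Placeholder value; a real secret would come from the secrets backend.
exported = export_to_environment({"BRAIN_API_KEY": "placeholder-not-a-real-key"})

assert "BRAIN_API_KEY" in os.environ
print(exported)
```

Code that reads `os.environ["BRAIN_API_KEY"]` then behaves the same in the notebook as it does inside a step.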


### Step 3 - Copy/paste step code and iterate!

The following code is copied directly from one of the `@step`-decorated functions in the flow.

In [2]:
import os
from textwrap import dedent

from ds_platform_utils.metaflow import publish

os.environ["DEBUG_QUERY"] = "1"

publish(
    table_name=self.config.table_name,
    query=dedent("""\
    -- Create a test table
    CREATE OR REPLACE TABLE PATTERN_DB.{{schema}}.{{table_name}} (
        id INT,
        message STRING
    );

    -- Insert a test row
    INSERT INTO PATTERN_DB.{{schema}}.{{table_name}} (id, message)
    VALUES (1, 'Hello from Snowflake!');

    -- Read the row
    SELECT * FROM PATTERN_DB.{{schema}}.{{table_name}};
    """),
)


=== DEBUG SQL QUERY ===
ALTER SESSION SET TIMEZONE = 'UTC';
ALTER SESSION SET QUERY_TAG = 'project_name not implemented';


=== DEBUG SQL QUERY ===
-- Create a test table
CREATE OR REPLACE TABLE PATTERN_DB.DATA_SCIENCE_STAGE.test_delete_me (
    id INT,
    message STRING
);

-- Insert a test row
INSERT INTO PATTERN_DB.DATA_SCIENCE_STAGE.test_delete_me (id, message)
VALUES (1, 'Hello from Snowflake!');

-- Read the row
SELECT * FROM PATTERN_DB.DATA_SCIENCE_STAGE.test_delete_me;
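
The debug output above shows `{{schema}}` and `{{table_name}}` replaced with concrete values before the query is sent to Snowflake. To illustrate only that substitution step, here is a minimal sketch using a regular expression. `render_placeholders` is a hypothetical function, the real `publish()` may use a different templating mechanism, and the values are copied from the debug output above.

```python
import re
from typing import Dict


def render_placeholders(query: str, values: Dict[str, str]) -> str:
    """Hypothetical renderer: replace each {{name}} with values[name].

    Raises KeyError if the query uses a placeholder that has no value."""

    def substitute(match: "re.Match[str]") -> str:
        return values[match.group(1)]

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, query)


template = "SELECT * FROM PATTERN_DB.{{schema}}.{{table_name}};"

rendered = render_placeholders(
    template,
    {"schema": "DATA_SCIENCE_STAGE", "table_name": "test_delete_me"},
)
print(rendered)
```

Setting `DEBUG_QUERY` as in the cell above is a handy way to confirm which values were substituted before you iterate further on the SQL.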

