# dlt & SQLMesh Runner

## Set Environment Vars

In [None]:
import os
# Static OneLake storage settings, exported as CREDENTIALS__* environment variables
os.environ.update(
    {
        "CREDENTIALS__AZURE_STORAGE_ACCOUNT_NAME": "onelake",
        "CREDENTIALS__AZURE_ACCOUNT_HOST": "onelake.blob.fabric.microsoft.com",
    }
)

### Load From KeyVault

In [None]:
import os

# Copy service-principal credentials from Azure Key Vault into environment variables.
keyvault = "mattiasthalen-fabric"
keyvault_url = f"https://{keyvault}.vault.azure.net/"

credentials = [
    "CREDENTIALS__AZURE_TENANT_ID",
    "CREDENTIALS__AZURE_CLIENT_ID",
    "CREDENTIALS__AZURE_CLIENT_SECRET",
]

for credential in credentials:
    # The secret is looked up under the env var name with underscores swapped for dashes
    secret = credential.replace("_", "-")
    # `notebookutils` is not imported in this cell; presumably provided by the Fabric runtime
    value = notebookutils.credentials.getSecret(keyvault_url, secret)
    os.environ[credential] = value

    # Confirm the variable was set without echoing the secret into the cell output
    print(f"{credential} = ********")

### Load From Workspace

In [None]:
import sempy.fabric as fabric
import os
import pandas as pd

# Resolve workspace-specific endpoints and paths through the Fabric REST API
# and export them as environment variables.
state__item_name = "sqlmesh__state"
lakehouse__item_name = "landing_zone"


def _find_item_id(items, display_name, item_type):
    """Return the `id` of the first workspace item matching name and type.

    Raises:
        LookupError: if no item with that display name and type is listed.
    """
    matches = items[(items["displayName"] == display_name) & (items["type"] == item_type)]
    if matches.empty:
        raise LookupError(f"No {item_type} named {display_name!r} found in the workspace")
    # Select the id by column name instead of relying on it being the first column
    return matches.iloc[0]["id"]


client = fabric.FabricRestClient()

# Get workspace id and items
workspace_id = fabric.get_workspace_id()
workspace__response = client.get(f"/v1/workspaces/{workspace_id}/items")
# NOTE(review): only the first response page is read; confirm the workspace item list is not paginated
workspace__items = pd.json_normalize(workspace__response.json()["value"])

# Get state endpoint and database name
state__item_id = _find_item_id(workspace__items, state__item_name, "SQLDatabase")

state__response = client.get(f"/v1/workspaces/{workspace_id}/sqlDatabases/{state__item_id}")
state__json = state__response.json()
state__database = state__json["properties"]["databaseName"]
# Keep only the part before the first comma of serverFqdn
state__endpoint = state__json["properties"]["serverFqdn"].split(",")[0]

# Get warehouse endpoint, derived from the state endpoint by swapping the host segment
warehouse__endpoint = state__endpoint.replace(".database.", ".datawarehouse.")

# Get lakehouse abfss path
lakehouse__item_id = _find_item_id(workspace__items, lakehouse__item_name, "Lakehouse")
bucket_url = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse__item_id}/Tables"

# Set env vars (named `env_vars` so the builtin `vars` is not shadowed)
env_vars = {
    "FABRIC__WORKSPACE_ID": workspace_id,
    "FABRIC__WAREHOUSE_ENDPOINT": warehouse__endpoint,
    "FABRIC__STATE_ENDPOINT": state__endpoint,
    "FABRIC__STATE_DATABASE": state__database,
    "DESTINATION__BUCKET_URL": bucket_url,
}

for name, value in env_vars.items():
    if not value:
        # Report empty values and leave the variable unset
        print(f"Null value for {name}")
        continue

    os.environ[name] = value
    print(f"{name} = {os.getenv(name)}")

## Run Project

In [None]:
def _write_captured_output(log, stdout, stderr):
    """Write a command's captured stdout/stderr to *log*, or a placeholder if both are blank."""
    wrote_any = False
    for label, text in (("STDOUT", stdout), ("STDERR", stderr)):
        if text and text.strip():
            # Terminate the section so the following log line starts on its own line
            terminator = "" if text.endswith("\n") else "\n"
            log.write(f"{label}:\n{text}{terminator}")
            wrote_any = True
    if not wrote_any:
        log.write("(no output)\n")


def run_commands(commands, log_path):
    """Run shell commands sequentially in a scratch directory and log the results.

    Every command is executed with ``shell=True`` inside one shared temporary
    directory, so the command strings must come from a trusted source. A failing
    command does not stop the run; it is recorded and the next command starts.

    Args:
        commands: Iterable of shell command strings, executed in order.
        log_path: Directory for the log file; created if it does not exist.

    Returns:
        A tuple ``(full_log_path, failed_commands)`` where ``failed_commands`` is
        a list of ``(1-based position, command)`` pairs for commands that exited
        with a non-zero status.

    Raises:
        OSError: If the log directory or the log file cannot be created.
    """
    import datetime
    import os
    import subprocess
    import tempfile
    import time

    def utc_now():
        # Naive UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # Materialise once so any iterable is accepted and can be counted
    commands = list(commands)
    n_commands = len(commands)

    # Timestamped file name avoids clashes between runs and sorts chronologically
    timestamp = utc_now().strftime("%Y%m%d_%H%M%S")
    log_file = f"commands__{timestamp}.log"
    full_log_path = f"{log_path}/{log_file}"

    # Create the directory without a shell so paths with spaces or
    # metacharacters are handled literally
    os.makedirs(log_path, exist_ok=True)

    failed_commands = []
    # Monotonic clock: durations are unaffected by wall-clock adjustments
    start_time = time.monotonic()

    # Explicit UTF-8 because the log contains emoji regardless of the platform default
    with open(full_log_path, "w", encoding="utf-8") as log:
        log.write(f"=== Run started at {utc_now()} UTC ===\n\n")

        # All commands share one temporary working directory that is removed afterwards
        with tempfile.TemporaryDirectory() as tmpdir:
            message = f"🚀 Starting command execution in {tmpdir}..."
            print(message)
            log.write(f"{message}\n")

            for position, cmd in enumerate(commands, start=1):
                cmd_start = time.monotonic()

                print(f"⚙️ Executing ({position}/{n_commands}): {cmd}")
                log.write(f"⚙️ Executing: {cmd}\n")
                log.write(f"Started at: {utc_now().strftime('%H:%M:%S')} UTC\n")

                # check=False: the exit status is inspected below instead of raising.
                # errors="replace": undecodable output bytes must not abort the run.
                result = subprocess.run(
                    cmd, cwd=tmpdir, shell=True, check=False,
                    capture_output=True, text=True, errors="replace",
                )
                cmd_duration = time.monotonic() - cmd_start

                if result.returncode == 0:
                    _write_captured_output(log, result.stdout, result.stderr)
                    print(f"   ✅ Success in {cmd_duration:.2f}s")
                    log.write(f"✅ SUCCESS (took {cmd_duration:.2f}s)\n")
                else:
                    # Remember the failure for the summary and the return value
                    failed_commands.append((position, cmd))
                    print(f"   ❌ Failed in {cmd_duration:.2f}s")
                    log.write(
                        f"❌ FAILED with exit code {result.returncode} (took {cmd_duration:.2f}s)\n"
                    )
                    _write_captured_output(log, result.stdout, result.stderr)

                # Separator between commands for readability in the log file
                log.write("---\n")

            total_runtime = time.monotonic() - start_time

            log.write("\n=== EXECUTION SUMMARY ===\n")
            log.write(f"Total runtime: {total_runtime:.2f}s\n")
            log.write(f"Commands executed: {n_commands}\n")

            if failed_commands:
                failure_summary = f"❌ {len(failed_commands)} command(s) failed:"
                print(failure_summary)
                log.write(f"{failure_summary}\n")

                # List each failure with its position so it can be found in the log
                for cmd_num, cmd in failed_commands:
                    summary_line = f"  - Command {cmd_num}: {cmd}"
                    print(summary_line)
                    log.write(f"{summary_line}\n")

                success_count = n_commands - len(failed_commands)
                final_msg = f"⚠️  Completed with {success_count}/{n_commands} commands successful in {total_runtime:.2f}s"
                print(final_msg)
                log.write(f"\n{final_msg}\n")
            else:
                success_msg = f"🎉 All {n_commands} commands completed successfully in {total_runtime:.2f}s!"
                print(success_msg)
                log.write(f"\n✅ {success_msg}\n")

    print(f"📝 Log saved to: {full_log_path}")
    return full_log_path, failed_commands

In [None]:
# Where the project lives and where logs are kept
organization = "mattiasthalen"
repo_name = "northwind"
repo_url = "https://github.com/{}/{}.git".format(organization, repo_name)
logs_path = "/lakehouse/default/Files/logs"

# Ordered steps; run_commands executes them one after another in a single
# temporary working directory.
commands = (
    # Clone the project repo into the working directory
    [f"git clone --depth 1 {repo_url} ."]
    # Set up the venv
    + ["pip install uv", "uv sync"]
    # Symlink logs into the default lakehouse to persist them
    + [f"ln -s {logs_path} logs"]
    # Run the ELT commands using uv
    + [
        "uv run dlt/northwind.py prod",
        "uv run sqlmesh -p sqlmesh plan prod --run --auto-apply --no-prompts",
    ]
)

# NOTE(review): the returned (log path, failed commands) tuple is discarded, so
# this cell finishes normally even when a command fails — confirm that is intended.
_ = run_commands(commands, logs_path)