# dbt execution notebook

This notebook is responsible for **executing dbt commands inside Microsoft Fabric** while providing:

- **Live streaming of dbt logs** to the notebook output
- **Centralized log persistence** to a Lakehouse location
- **Run-level metadata tracking** (run id, environment, branch, workspace)
- **Fail-fast behavior** to ensure pipeline correctness

The goal of this notebook is to make dbt runs:
- **Observable**: logs are visible in real time
- **Auditable**: complete logs are stored centrally
- **Deterministic**: failures reliably stop downstream execution

---

## What this notebook does

1. Prepares the dbt run dynamically based on:
   - Selected models / selectors (passed to the dbt command)
   - Environment (OTAP stage, recorded in the run log)
   - Active Git branch (selects which project zip is deployed)
2. Executes dbt using a subprocess with **streamed stdout/stderr**
3. Captures all dbt output in an in-memory log buffer
4. Writes a consolidated run log to the Lakehouse, flushing every 25 lines during execution and once more when dbt finishes
5. Fails the notebook when dbt exits with a non-zero status
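Here is a minimal sketch of step 1. It assumes the command and the selector arrive as plain strings from the variable library. Tokenising with `shlex.split`, as `DbtRunner.run` does, keeps a quoted selector together as a single argument. The helper name `build_dbt_argv` is hypothetical. The first sample call uses the values from the example run in section 4.

```python
import shlex


def build_dbt_argv(dbt_command: str, dbt_selector: str) -> list[str]:
    """Hypothetical helper: combine command and selector into an argv list.

    Equivalent to shlex.split(f"{dbt_command} {dbt_selector} --log-format text"),
    which is what the notebook passes to subprocess.Popen.
    """
    argv = shlex.split(dbt_command) + shlex.split(dbt_selector)
    argv += ["--log-format", "text"]  # plain-text output, easier to persist
    return argv


print(build_dbt_argv("dbt run", "-s fna_percelen"))
# ['dbt', 'run', '-s', 'fna_percelen', '--log-format', 'text']
print(build_dbt_argv("dbt build", '--select "tag:nightly"'))
# ['dbt', 'build', '--select', 'tag:nightly', '--log-format', 'text']
```

Returning a list instead of one string avoids a second round of quoting when the arguments are handed to `subprocess.Popen`.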

---

## Logging strategy

- **Live output**  
  All dbt output is streamed line-by-line to the notebook for immediate visibility.

- **Persistent logs**  
  The full dbt output is written to a timestamped log file in the Lakehouse, including:
  - Run metadata
  - Environment context
  - Full raw dbt output

This ensures logs remain available for:
- Debugging failed runs
- Post-run analysis
- Compliance and auditing
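The log file name follows the pattern used in sections 3 and 4: `dbt_log_<YYYYmmdd_HHMMSS>.txt` inside the Lakehouse log folder. Below is a minimal sketch with a hypothetical helper, `make_log_path`. It takes `now` as an explicit argument so the result is reproducible.

```python
from datetime import datetime


def make_log_path(log_dir: str, now: datetime) -> str:
    """Build a timestamped log path, e.g. <log_dir>/dbt_log_20251216_000520.txt."""
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    return f"{log_dir.rstrip('/')}/dbt_log_{timestamp}.txt"


path = make_log_path("/lakehouse/default/Files/logs", datetime(2025, 12, 16, 0, 5, 20))
print(path)  # /lakehouse/default/Files/logs/dbt_log_20251216_000520.txt
```

The timestamp is zero-padded and ordered from year down to second. As a result, the log files sort chronologically by name.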

---

## Expected usage

This notebook is intended to be:
- Triggered from **pipelines** (ADO / Fabric / CI/CD)
- Parameterized per environment and workspace
- Used as a **single execution point** for dbt within Fabric

Do **not** use this notebook for:
- Interactive model development
- Local debugging of dbt projects

---

## Failure behavior

If dbt fails:
- The failure is immediately visible in the notebook output
- The complete log is still written to the Lakehouse
- The notebook raises an exception to stop downstream execution

This behavior is intentional: a failed dbt run must never let downstream steps continue on incomplete or inconsistent data.
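What makes this work is the ordering: persist the log first, then raise. That ordering can be sketched independently of dbt and Fabric. In the hypothetical `run_fail_fast` below, `write_log` stands in for the Lakehouse write (`notebookutils.fs.put` in `DbtRunner`). A short Python one-liner stands in for the dbt command.

```python
import subprocess
import sys
from collections.abc import Callable


def run_fail_fast(argv: list[str], write_log: Callable[[str], None]) -> None:
    """Stream a command's output live, always persist it, then fail on a non-zero exit."""
    lines: list[str] = []
    return_code = None
    try:
        with subprocess.Popen(
            argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # one ordered stream, as in DbtRunner
            text=True,
        ) as process:
            for line in process.stdout:
                print(line, end="")  # live output
                lines.append(line)
            return_code = process.wait()
    finally:
        # Persist whatever was captured: success, failure or crash.
        write_log("".join(lines))

    if return_code != 0:
        raise RuntimeError(f"command exited with return code {return_code}")


# Demo with a stand-in command that prints one line and exits with code 3:
persisted: list[str] = []
try:
    run_fail_fast(
        [sys.executable, "-c", "import sys; print('step 1'); sys.exit(3)"],
        persisted.append,
    )
except RuntimeError as err:
    print(err)  # command exited with return code 3
print(persisted)  # ['step 1\n']
```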

---


## 1. Parameters

```python
# parameters passed in from the pipeline
run_id = "n/a"  # pipeline run id
```

```python
# constants
zip_src = "/lakehouse/default/Files/dbt-repo-zips"  # local shortcut to the central repo Lakehouse folder
dbt_authentication = "ActiveDirectoryAccessToken"  # https://docs.getdbt.com/docs/cloud/connect-data-platform/connect-microsoft-fabric

# workspace configuration from the variable library 'v_dbt_runner'
v_dbt_runner = notebookutils.variableLibrary.getLibrary("v_dbt_runner")

# folders
run_dir = "/tmp/dbt_project"                         # local working copy of the dbt project
lakehouse_log_dir = "/lakehouse/default/Files/logs"  # central run logs
```

```python
# workspace config parameters
ws_name = v_dbt_runner.workspace_name  # current workspace
stage = v_dbt_runner.stage             # OTAP stage, e.g. "dev"
git_branch = v_dbt_runner.git_branch   # branch whose project zip is deployed

# dbt config parameters
dbt_command = v_dbt_runner.dbt_command    # e.g. "dbt run"
dbt_selector = v_dbt_runner.dbt_selector  # project scope to run, e.g. "-s fna_percelen"
target_connection_string = v_dbt_runner.target_connection_string
target_database = v_dbt_runner.target_database
```
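Every setting comes from the variable library. A missing or empty value would therefore only surface later, as a confusing dbt or connection error. Below is a minimal sketch of an up-front check. It assumes the library object behaves like a plain Python object, where unknown names raise `AttributeError`. `REQUIRED_SETTINGS` and `missing_settings` are hypothetical names, and a `SimpleNamespace` stands in for the real library.

```python
from types import SimpleNamespace

# Hypothetical list: the settings this notebook reads from 'v_dbt_runner'.
REQUIRED_SETTINGS = (
    "workspace_name", "stage", "git_branch", "dbt_command",
    "dbt_selector", "target_connection_string", "target_database",
)


def missing_settings(library, required=REQUIRED_SETTINGS) -> list[str]:
    """Return the names of required settings that are absent or empty."""
    return [name for name in required if not getattr(library, name, None)]


# Stand-in for notebookutils.variableLibrary.getLibrary("v_dbt_runner"):
cfg = SimpleNamespace(
    workspace_name="dbt_runner1", stage="dev", git_branch="main",
    dbt_command="dbt run", dbt_selector="-s fna_percelen",
    target_connection_string="", target_database="",
)
print(missing_settings(cfg))  # ['target_connection_string', 'target_database']
```

In the notebook, a non-empty result could become a hard failure, for example `raise ValueError(f"Missing settings: {missing}")`. That matches the fail-fast behavior described above.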

## 2. Class `DbtRunner`


```python
import re
import shlex
import subprocess
from datetime import datetime

# ANSI colour/escape sequences in dbt output (e.g. "\x1b[0m"), stripped so logs stay readable.
ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")


class DbtRunner:
    """Runs a dbt command, streams its output and persists it to the Lakehouse."""

    def __init__(self, lakehouse_log_path: str, flush_every: int = 25):
        self.lakehouse_log_path = lakehouse_log_path
        self.flush_every = flush_every  # write to the Lakehouse every N buffered lines
        self.buffer = []                # lines not yet written to the Lakehouse
        self.persisted_log = ""         # everything written to the Lakehouse so far
        self.success = True
        self.failure_type = None
        self.start_time = None

    # ---------- logging ----------
    def log(self, message: str, level: str = "INFO", end: str = "\n"):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        formatted = f"[{timestamp}] [{level}] {message}"

        print(formatted, end=end)  # live output in the notebook
        self.buffer.append(f"{formatted}{end}")

        if len(self.buffer) >= self.flush_every:
            self.flush()

    def flush(self):
        if not self.buffer:
            return

        try:
            chunk = "".join(self.buffer)
            self.buffer.clear()
            self.persisted_log += chunk

            # Rewrite the complete log so far; a failed write is covered by the next successful flush.
            notebookutils.fs.put(
                self.lakehouse_log_path,
                self.persisted_log,
                overwrite=True,
            )
        except Exception as e:
            print(f"[WARN] Failed to flush logs: {e}")

    # ---------- dbt failure classification ----------
    def classify_failure(self, line: str):
        """Map a dbt output line to a coarse failure category (heuristic)."""
        if "Compilation Error" in line:
            return "COMPILATION"
        if "Database Error" in line:
            return "DATABASE"
        if "Runtime Error" in line:
            return "RUNTIME"
        if "FAIL" in line:
            return "TEST_FAILURE"
        return None

    # ---------- execution ----------
    def run(self, cmd: str):
        self.start_time = datetime.now()
        self.log("Starting dbt run")

        try:
            process = subprocess.Popen(
                shlex.split(cmd),
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # merge stderr into stdout to keep ordering
                text=True,
                bufsize=1,                 # line-buffered
            )

            for line in process.stdout:
                line = ANSI_ESCAPE.sub("", line)
                self.log(line, end="")

                # keep the first recognised failure category
                if not self.failure_type:
                    self.failure_type = self.classify_failure(line)

            return_code = process.wait()
            if return_code != 0:
                self.success = False
                self.log(f"dbt exited with return code {return_code}", level="ERROR")
        except Exception as e:
            # e.g. the dbt executable could not be started
            self.success = False
            self.log(f"Failed to execute dbt: {e}", level="ERROR")
        finally:
            self.finish()  # always write the summary and flush the log

        if not self.success:
            raise RuntimeError(f"dbt run failed (failure type: {self.failure_type or 'UNKNOWN'})")

    # ---------- finish ----------
    def finish(self):
        duration = (datetime.now() - self.start_time).total_seconds()

        self.log("")
        self.log("=" * 40)
        self.log("DBT Run Finished")
        self.log(f"Success: {self.success}")
        self.log(f"Duration: {duration:.1f}s")

        # A matching line (e.g. one containing "FAIL") only matters if the run failed.
        if not self.success and self.failure_type:
            self.log(f"Failure type: {self.failure_type}", level="ERROR")

        self.log("=" * 40)

        self.flush()
```
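`DbtRunner` batches lines and rewrites the whole log file on every flush. To see that behavior without a Lakehouse, here is a stripped-down sketch. The hypothetical `LocalBufferedLog` writes to a local file instead of calling `notebookutils.fs.put`.

```python
import os
import tempfile


class LocalBufferedLog:
    """Hypothetical local stand-in for DbtRunner's log/flush logic."""

    def __init__(self, path: str, flush_every: int = 25):
        self.path = path
        self.flush_every = flush_every
        self.buffer: list[str] = []
        self.persisted_log = ""

    def log(self, message: str) -> None:
        self.buffer.append(message + "\n")
        if len(self.buffer) >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        self.persisted_log += "".join(self.buffer)
        self.buffer.clear()
        # Same approach as DbtRunner.flush: rewrite the full log each time.
        with open(self.path, "w", encoding="utf-8") as f:
            f.write(self.persisted_log)


def read_text(path: str) -> str:
    with open(path, encoding="utf-8") as f:
        return f.read()


demo_path = os.path.join(tempfile.mkdtemp(), "dbt_log.txt")
demo_log = LocalBufferedLog(demo_path, flush_every=2)
demo_log.log("line 1")
demo_log.log("line 2")  # buffer reaches flush_every: both lines are written
demo_log.log("line 3")  # still only in the buffer
print(repr(read_text(demo_path)))  # 'line 1\nline 2\n'
demo_log.flush()        # like the final flush in DbtRunner.finish
print(repr(read_text(demo_path)))  # 'line 1\nline 2\nline 3\n'
```

Rewriting the whole file keeps each write simple and idempotent. The cost is that every flush writes the full log so far. For very long runs, an append-style write would be cheaper.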


## 3. Init

```python
# install dbt-fabric (connects through ODBC; the [PyHive] extra belongs to dbt-spark)
%pip install dbt-fabric

# import packages
import json
import logging
import os
import re
import shlex
import shutil
import subprocess
import zipfile
from datetime import datetime

import notebookutils
from sempy import fabric
```



```python
# log init (section 4 recomputes these just before dbt starts)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_filename = f"dbt_log_{timestamp}.txt"
lakehouse_log_path = f"{lakehouse_log_dir}/{log_filename}"
log = ""  # pre-run messages (repo extraction, project listing, profile)

# local dbt folders
target_dir = f"{run_dir}/target"
log_dir = f"{run_dir}/logs"
branch_zip_path = f"{zip_src}/{git_branch}.zip"

# start from a clean run dir
print("Preparing run-dir", run_dir)
if os.path.exists(run_dir):
    shutil.rmtree(run_dir)
os.makedirs(target_dir, exist_ok=True)  # also creates run_dir itself
os.makedirs(log_dir, exist_ok=True)
notebookutils.fs.mkdirs(lakehouse_log_dir)
os.chdir(run_dir)
```

Preparing run-dir  /tmp/dbt_project
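The folder preparation above can be exercised outside Fabric with a temporary directory. The hypothetical helper `prepare_run_dir` performs the same local steps, leaving out the Lakehouse `mkdirs` and the `chdir`. The demo shows that deleting the folder first leaves no files from a previous run behind, such as an old `target/` directory.

```python
import os
import shutil
import tempfile


def prepare_run_dir(run_dir: str) -> None:
    """Start from an empty run_dir that contains only target/ and logs/."""
    if os.path.exists(run_dir):
        shutil.rmtree(run_dir)  # drop leftovers from a previous run
    for sub in ("target", "logs"):
        os.makedirs(os.path.join(run_dir, sub))  # also creates run_dir itself


demo_dir = os.path.join(tempfile.mkdtemp(), "dbt_project")
os.makedirs(demo_dir)
open(os.path.join(demo_dir, "stale.txt"), "w").close()  # simulated leftover
prepare_run_dir(demo_dir)
print(sorted(os.listdir(demo_dir)))  # ['logs', 'target']
```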


```python
# get repo
print(f"unzip dbt repo {branch_zip_path} to {run_dir}")
log += f"unzip dbt repo {branch_zip_path} to {run_dir}\n"
with zipfile.ZipFile(branch_zip_path, "r") as zip_ref:
    zip_ref.extractall(run_dir)

# record the dbt project folder contents, one entry per line
log += f"\n\n📦 Listing for: {run_dir}\n"
for name in sorted(os.listdir(run_dir)):
    full = os.path.join(run_dir, name)
    icon = "📂" if os.path.isdir(full) else "📄"
    log += f"{icon} {name}\n"

# record the profile dbt will use
log += "\n\nProfile:\n"
with open(os.path.join(run_dir, "profiles.yml")) as f:
    log += f.read()
```

unzip dbt repo /lakehouse/default/Files/dbt-repo-zips/main.zip  to  /tmp/dbt_project
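The extraction step assumes the zip has `dbt_project.yml` and `profiles.yml` at its root, because the notebook later opens `profiles.yml` directly from `run_dir`. Below is a hedged sketch of checking this before extracting. `REQUIRED_FILES` and `missing_project_files` are hypothetical names, and an in-memory zip stands in for the file at `branch_zip_path`.

```python
import io
import zipfile

# Files the notebook relies on after extraction (assumed to sit at the zip root).
REQUIRED_FILES = ("dbt_project.yml", "profiles.yml")


def missing_project_files(zip_source) -> list[str]:
    """Return required files that are not present at the root of the zip."""
    with zipfile.ZipFile(zip_source) as zf:
        names = set(zf.namelist())
    return [name for name in REQUIRED_FILES if name not in names]


# Build a small in-memory zip to demonstrate:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("dbt_project.yml", "name: demo\n")
    zf.writestr("models/example.sql", "select 1 as id\n")
buf.seek(0)
print(missing_project_files(buf))  # ['profiles.yml']
```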


```python
# Pass dbt config through environment variables
# (DBT_TARGET_PATH is read by dbt itself; the others are presumably read by profiles.yml via env_var())
os.environ["DBT_TARGET_PATH"] = target_dir
os.environ["DBT_CONNECTION_STRING"] = target_connection_string
os.environ["DRIVER"] = "ODBC Driver 18 for SQL Server"
os.environ["AUTHENTICATION"] = dbt_authentication  # ActiveDirectoryAccessToken
os.environ["DATABASE"] = target_database
os.environ["ACCESS_TOKEN"] = notebookutils.credentials.getToken("https://database.windows.net/")  # workspace identity access token
```
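Of these variables, only `DBT_TARGET_PATH` is read natively by dbt. The others are presumably resolved by `profiles.yml` through dbt's `env_var()` function; the profile itself is not shown here. The minimal sketch below uses hypothetical names. It checks that all of the variables are set and produces a summary that is safe to log. The token is masked, and as an assumption so is the connection string.

```python
import os
from collections.abc import Mapping

# Variables set in the cell above; the secret list is an assumption.
REQUIRED_ENV = ("DBT_TARGET_PATH", "DBT_CONNECTION_STRING", "DRIVER",
                "AUTHENTICATION", "DATABASE", "ACCESS_TOKEN")
SECRET_ENV = {"ACCESS_TOKEN", "DBT_CONNECTION_STRING"}


def describe_env(env: Mapping[str, str] = os.environ) -> tuple[list[str], dict[str, str]]:
    """Return (missing names, printable summary with secrets masked)."""
    missing = [name for name in REQUIRED_ENV if not env.get(name)]
    summary = {
        name: ("***" if name in SECRET_ENV else env[name])
        for name in REQUIRED_ENV
        if env.get(name)
    }
    return missing, summary


demo_env = {"DRIVER": "ODBC Driver 18 for SQL Server", "ACCESS_TOKEN": "dummy-token"}
missing, summary = describe_env(demo_env)
print(missing)  # ['DBT_TARGET_PATH', 'DBT_CONNECTION_STRING', 'AUTHENTICATION', 'DATABASE']
print(summary)  # {'DRIVER': 'ODBC Driver 18 for SQL Server', 'ACCESS_TOKEN': '***'}
```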

## 4. Run dbt

```python
# fresh timestamped log file for this run
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_filename = f"dbt_log_{timestamp}.txt"
lakehouse_log_path = f"{lakehouse_log_dir}/{log_filename}"

runner = DbtRunner(lakehouse_log_path)

cmd = f"{dbt_command} {dbt_selector} --log-format text"
runner.log(cmd)

# Header
runner.log("=" * 40)
runner.log("DBT Run Log")
runner.log(f"Run-Id: {run_id}")
runner.log(f"Timestamp: {timestamp}")
runner.log(f"Workspace: {ws_name} ({stage})")
runner.log(f"Branch: {git_branch}")
runner.log(f"Command: {dbt_command} {dbt_selector}")
runner.log("=" * 40)
runner.log("")

# raises RuntimeError (after the log has been written) if dbt fails
runner.run(cmd)
```


[2025-12-16 00:05:20] [INFO] dbt run -s fna_percelen --log-format text
[2025-12-16 00:05:20] [INFO] DBT Run Log
[2025-12-16 00:05:20] [INFO] Run-Id: n/a
[2025-12-16 00:05:20] [INFO] Timestamp: 20251216_000520
[2025-12-16 00:05:20] [INFO] Workspace: dbt_runner1 (dev)
[2025-12-16 00:05:20] [INFO] Branch: main
[2025-12-16 00:05:20] [INFO] Command: dbt run -s fna_percelen
[2025-12-16 00:05:20] [INFO] 
[2025-12-16 00:05:20] [INFO] Starting dbt run
[2025-12-16 00:05:22] [INFO] 00:05:22  Running with dbt=1.10.15
[2025-12-16 00:05:23] [INFO] Duplicate key '+schema' in "<unicode string>", line 145, column 7 in file
[2025-12-16 00:05:23] [INFO] `/tmp/dbt_project/dbt_project.yml`
[2025-12-16 00:05:23] [INFO] 00:05:23  Registered adapter: fabric=1.9.8
[2025-12-16 00:05:23] [INFO] 00:05:23  Unable to do partial parsing because saved manifest not found. Starting full parse.
[2025-12-16 00:05:26] [INFO] 00:05:26  lbt_2023: Target columns (var): 218
[2025-12-16 00:05:26] [INFO] [0m00: