# Using the DBT Python Package
If you need some python processing in the middle of your data modeling (ML that's used in further analytics or something similar), and you don't want to go down the plugin/function API route, one can use the dbt CLI programmatically within python.

In [1]:
import os

#We need to change our working directory to be in the same folder as the dbt_project.yml folder
os.chdir('../mi_cfr_duckdb_dbt/')

### We create an instance of the dbt runner class and feed it the same type of command line arguments

In [2]:
from dbt.cli.main import dbtRunner, dbtRunnerResult

dbt = dbtRunner()
result = dbt.invoke(['run', '--model', 'stg_expenditures'])

[0m02:26:11  Running with dbt=1.8.2
[0m02:26:11  Registered adapter: duckdb=1.8.1
[0m02:26:11  Found 9 models, 1 test, 1 operation, 407 macros
[0m02:26:11  
[0m02:26:11  
[0m02:26:11  Running 1 on-run-start hook
[0m02:26:12  1 of 1 START hook: mi_cfr_duckdb.on-run-start.0 ................................ [RUN]
[0m02:26:12  1 of 1 OK hook: mi_cfr_duckdb.on-run-start.0 ................................... [[32mOK[0m in 0.00s]
[0m02:26:12  
[0m02:26:12  Concurrency: 1 threads (target='dev')
[0m02:26:12  
[0m02:26:12  1 of 1 START sql external model main.stg_expenditures .......................... [RUN]
[0m02:26:12  1 of 1 OK created sql external model main.stg_expenditures ..................... [[32mOK[0m in 0.79s]
[0m02:26:12  
[0m02:26:12  Finished running 1 external model, 1 project hook in 0 hours 0 minutes and 1.14 seconds (1.14s).
[0m02:26:12  
[0m02:26:12  [32mCompleted successfully[0m
[0m02:26:12  
[0m02:26:12  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1


### We then can then get the data objects that we've created and run our process on them
In this case I reuse the same logic for the classify_expenditure function from the function API

In [3]:
import duckdb

df = duckdb.sql('''
    SELECT LOWER(CONCAT(expenditure_description, ' ', expenditure_purpose)) AS concat_expenditure
    FROM '../data/staging/stg_expenditures.parquet'
    LIMIT 100
''').df()

df.head()

Unnamed: 0,concat_expenditure
0,credit card fee processing fee
1,print advertising print and management costs
2,credit card fee processing fee
3,campaign office expense printer cartridge
4,"mailing,postage,bulk rate post cards"


In [5]:
from typing import List

import cloudpickle
from duckdb.typing import DuckDBPyType


def load_model_dict():
    with open('../data/ml_models/zero_shot_model.pkl', 'rb') as f:
        model_dict = cloudpickle.load(f)
    return model_dict


def classify_expenditure(model_dict: dict, text: str , threshold: float) -> str:
    prediction = model_dict['run_prediction'](text, model_dict['classifier'], model_dict['labels'])
    if prediction['predicted_prob'] > threshold:
        return prediction['predicted_label']
    else:
        return "unknown"
        
model_dict = load_model_dict()
model_dict

{'run_prediction': <function __main__.run_prediction(text, classifier, labels)>,
 'classifier': <transformers.pipelines.zero_shot_classification.ZeroShotClassificationPipeline at 0x7aa9a4e92de0>,
 'labels': ['paid media',
  'payroll',
  'food',
  'contribution',
  'consulting',
  'bank or credit card fees',
  'other campaign activities']}

In [6]:
df['predicted_label'] = df['concat_expenditure'].apply(lambda x: classify_expenditure(model_dict, x, 0.75))
df.head()    

Unnamed: 0,concat_expenditure,predicted_label
0,credit card fee processing fee,bank or credit card fees
1,print advertising print and management costs,paid media
2,credit card fee processing fee,bank or credit card fees
3,campaign office expense printer cartridge,unknown
4,"mailing,postage,bulk rate post cards",unknown


### We can then save this in a location for our downstream models to pick up

In [8]:
df.to_parquet('../data/ml/model_output.parquet')

### We then invoke the dbt cli using our generated data.
### Note that you will need to enable the mlproject in your dbt_project.yml:
    ml:
      +enabled: true

In [11]:
dbt.invoke(['run', '--model', 'python_dbt_cli'])

[0m02:33:38  Running with dbt=1.8.2
[0m02:33:38  Registered adapter: duckdb=1.8.1
[0m02:33:38  Found 11 models, 1 test, 1 operation, 407 macros
[0m02:33:38  
[0m02:33:38  
[0m02:33:38  Running 1 on-run-start hook
[0m02:33:38  1 of 1 START hook: mi_cfr_duckdb.on-run-start.0 ................................ [RUN]
[0m02:33:38  1 of 1 OK hook: mi_cfr_duckdb.on-run-start.0 ................................... [[32mOK[0m in 0.00s]
[0m02:33:38  
[0m02:33:38  Concurrency: 1 threads (target='dev')
[0m02:33:38  
[0m02:33:38  1 of 1 START sql external model main.python_dbt_cli ............................ [RUN]
[0m02:33:38  1 of 1 OK created sql external model main.python_dbt_cli ....................... [[32mOK[0m in 0.05s]
[0m02:33:38  
[0m02:33:38  Finished running 1 external model, 1 project hook in 0 hours 0 minutes and 0.12 seconds (0.12s).
[0m02:33:38  
[0m02:33:38  [32mCompleted successfully[0m
[0m02:33:38  
[0m02:33:38  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1


dbtRunnerResult(success=True, exception=None, result=RunExecutionResult(results=[RunResult(status=<RunStatus.Success: 'success'>, timing=[TimingInfo(name='compile', started_at=datetime.datetime(2024, 6, 8, 2, 33, 38, 479560), completed_at=datetime.datetime(2024, 6, 8, 2, 33, 38, 483307)), TimingInfo(name='execute', started_at=datetime.datetime(2024, 6, 8, 2, 33, 38, 483768), completed_at=datetime.datetime(2024, 6, 8, 2, 33, 38, 524884))], thread_id='Thread-35 (worker)', execution_time=0.046823978424072266, adapter_response={'_message': 'OK'}, message='OK', failures=None, node=ModelNode(database='memory', schema='main', name='python_dbt_cli', resource_type=<NodeType.Model: 'model'>, package_name='mi_cfr_duckdb', path='ml/python_dbt_cli.sql', original_file_path='models/ml/python_dbt_cli.sql', unique_id='model.mi_cfr_duckdb.python_dbt_cli', fqn=['mi_cfr_duckdb', 'ml', 'python_dbt_cli'], alias='python_dbt_cli', checksum=FileHash(name='sha256', checksum='5b19cc405021e77f4f14cc01de18cf1e792f

In [13]:
duckdb.sql('''
    FROM '../data/ml/python_dbt_cli.parquet'
''')

┌──────────────────────────┬───────┐
│     predicted_label      │ COUNT │
│         varchar          │ int64 │
├──────────────────────────┼───────┤
│ food                     │     2 │
│ unknown                  │    49 │
│ bank or credit card fees │     7 │
│ paid media               │     7 │
│ payroll                  │     2 │
│ consulting               │     2 │
│ contribution             │    31 │
└──────────────────────────┴───────┘

### Finally if we want to run everything we can do our usual run command

In [12]:
dbt.invoke(['run'])

[0m02:36:39  Running with dbt=1.8.2
[0m02:36:39  Registered adapter: duckdb=1.8.1
[0m02:36:39  Unable to do partial parsing because a project config has changed
[0m02:36:40  Found 9 models, 1 test, 1 operation, 407 macros
[0m02:36:40  
[0m02:36:40  
[0m02:36:40  Running 1 on-run-start hook
[0m02:36:40  1 of 1 START hook: mi_cfr_duckdb.on-run-start.0 ................................ [RUN]
[0m02:36:40  1 of 1 OK hook: mi_cfr_duckdb.on-run-start.0 ................................... [[32mOK[0m in 0.00s]
[0m02:36:40  
[0m02:36:40  Concurrency: 1 threads (target='dev')
[0m02:36:40  
[0m02:36:40  1 of 9 START sql view model main.stg_contributions ............................. [RUN]
[0m02:36:40  1 of 9 OK created sql view model main.stg_contributions ........................ [[32mOK[0m in 0.04s]
[0m02:36:40  2 of 9 START sql external model main.stg_expenditures .......................... [RUN]
[0m02:36:41  2 of 9 OK created sql external model main.stg_expenditures ........

dbtRunnerResult(success=True, exception=None, result=RunExecutionResult(results=[RunResult(status=<RunStatus.Success: 'success'>, timing=[TimingInfo(name='compile', started_at=datetime.datetime(2024, 6, 8, 2, 36, 40, 332700), completed_at=datetime.datetime(2024, 6, 8, 2, 36, 40, 336486)), TimingInfo(name='execute', started_at=datetime.datetime(2024, 6, 8, 2, 36, 40, 336864), completed_at=datetime.datetime(2024, 6, 8, 2, 36, 40, 371550))], thread_id='Thread-39 (worker)', execution_time=0.0405123233795166, adapter_response={'_message': 'OK'}, message='OK', failures=None, node=ModelNode(database='memory', schema='main', name='stg_contributions', resource_type=<NodeType.Model: 'model'>, package_name='mi_cfr_duckdb', path='staging/stg_contributions.sql', original_file_path='models/staging/stg_contributions.sql', unique_id='model.mi_cfr_duckdb.stg_contributions', fqn=['mi_cfr_duckdb', 'staging', 'stg_contributions'], alias='stg_contributions', checksum=FileHash(name='sha256', checksum='6e303