In [2]:
import json
import os

import yaml
# Root of the dbt project whose compiled manifest drives DAG generation.
# Can be overridden with the DBT_PROJECT_DIR environment variable; the default
# is the original workspace path.
DBT_PROJECT_DIR = os.environ.get('DBT_PROJECT_DIR', '/workspace/test-dbt-airflow/analytics')

# manifest.json is JSON, i.e. UTF-8; state the encoding explicitly instead of
# relying on the platform default (e.g. cp1252 on Windows).
with open(f"{DBT_PROJECT_DIR}/target/manifest.json", "r", encoding="utf-8") as file:
    manifest = json.load(file)

In [3]:
def __get_operator(node_config):
    """Return the dotted import path of the Airflow operator for a dbt node.

    Nodes whose ``tags`` contain ``'rmd'`` use the R Markdown operator; every
    other node is run with ``DbtRunOperator``.

    Args:
        node_config: mapping with an optional ``tags`` entry (a manifest node,
            or any dict that carries ``tags``).

    Returns:
        The operator class path as a string.
    """
    tags = node_config.get('tags', [])
    is_rmd = 'rmd' in tags
    return (
        "dbt_viewflow.operators.rmd_operator.RmdOperator"
        if is_rmd
        else "airflow_dbt.operators.dbt_operator.DbtRunOperator"
    )

In [4]:
def __get_task_group_name(p):
    """Return the Airflow task-group name for a dbt model file, or None.

    Model paths look like ``models/<dag>/<group>/<model>.sql``: the third path
    component is the task group when it is a directory. Models that sit
    directly in the DAG folder (``models/<dag>/<model>.sql``) or higher up have
    no group.

    Args:
        p: the model's ``original_file_path`` as a ``Path`` or ``str``.

    Returns:
        The group folder name, or None when the model is not inside one.
    """
    from pathlib import PurePath

    if p is None:
        return None
    parts = PurePath(p).parts
    # parts[2] is a group folder only if something (the model file) follows it.
    # Counting components instead of testing "'sql' in name" keeps folders such
    # as "mysql_exports" and non-.sql model files (e.g. Python models) correct,
    # and needs no exception handling for short paths.
    return parts[2] if len(parts) > 3 else None

In [5]:
import pandas as pd
from pathlib import Path

nodes = manifest['nodes']

# One task spec per dbt model, keyed by the model file's stem. The DAG name is
# the first folder under the models directory; the task group comes from
# __get_task_group_name.
tasks_all = {}
for node_config in nodes.values():
    if node_config['resource_type'] != 'model':
        continue
    p = Path(node_config['original_file_path'])
    tasks_all[p.stem] = dict(
        task_name=p.stem,
        dag_name=p.parts[1],
        task_group_name=__get_task_group_name(p),
        # Upstream node names. dbt sources are not Airflow tasks, so skip them.
        # Match the unique_id prefix rather than a substring so that models
        # whose name merely contains "source" are kept. The default must be a
        # dict: a list has no .get().
        dependencies=[
            x.split(".")[-1]
            for x in node_config.get("depends_on", {}).get("nodes", [])
            if not x.startswith("source.")
        ],
        operator=__get_operator(node_config),
    )
tasks_all

{'dim_address': {'task_name': 'dim_address',
  'dag_name': 'core',
  'task_group_name': 'dimensions',
  'dependencies': ['stg_greenery__addresses'],
  'operator': 'airflow_dbt.operators.dbt_operator.DbtRunOperator'},
 'dim_event': {'task_name': 'dim_event',
  'dag_name': 'core',
  'task_group_name': 'dimensions',
  'dependencies': ['stg_greenery__events'],
  'operator': 'airflow_dbt.operators.dbt_operator.DbtRunOperator'},
 'dim_order': {'task_name': 'dim_order',
  'dag_name': 'core',
  'task_group_name': 'dimensions',
  'dependencies': ['stg_greenery__orders'],
  'operator': 'airflow_dbt.operators.dbt_operator.DbtRunOperator'},
 'dim_product': {'task_name': 'dim_product',
  'dag_name': 'core',
  'task_group_name': 'dimensions',
  'dependencies': ['stg_greenery__products'],
  'operator': 'airflow_dbt.operators.dbt_operator.DbtRunOperator'},
 'dim_promo': {'task_name': 'dim_promo',
  'dag_name': 'core',
  'task_group_name': 'dimensions',
  'dependencies': ['stg_greenery__promos'],
  'op

In [104]:
# Lookup tables used when wiring cross-DAG sensors:
#   model name -> DAG it belongs to
#   model name -> task group it belongs to (None when it has no group)
task_dag_mappings = {}
task_group_mappings = {}
for model_name, spec in tasks_all.items():
    task_dag_mappings[model_name] = spec['dag_name']
    task_group_mappings[model_name] = spec['task_group_name']

list(task_dag_mappings.values())[:3]

['metrics', 'metrics', 'metrics']

In [105]:
# DAG-level settings shared by every generated DAG spec (merged in by
# write_dag_yml). Written as a plain dict so the value types are explicit and
# the cell does not depend on a YAML parser being imported first.
out_1 = {
    'default_args': {
        'owner': 'example_owner',
        'start_date': '2022-02-12',  # a string, not a date object
        'retries': 1,
        'retry_delay_sec': 300,
        # NOTE(review): differs from DBT_PROJECT_DIR ('.../analytics');
        # confirm which project directory the DAGs should run in.
        'dir': '/workspace/test-dbt-airflow/dbt-greenery',
    },
    # NOTE(review): this is the *string* 'None' (as the original YAML parsed
    # it), not Python None; presumably the DAG loader maps it to "no schedule"
    # -- confirm.
    'schedule_interval': 'None',
    'concurrency': 1,
    'max_active_runs': 1,
    'dagrun_timeout_sec': 300,
    'default_view': 'tree',
    'orientation': 'LR',
    'description': 'this is an example dag!',
}

In [116]:
from copy import deepcopy
def write_dag_yml(dag_name='views_content', out_dir=None):
    """Render the dag-factory style YAML spec for one DAG.

    Takes every entry of ``tasks_all`` whose ``dag_name`` matches. Each
    dependency on a model from another DAG is rewritten as
    ``wait_for_<model>``, and an ``ExternalTaskSensor`` task is added for it.
    The task groups and tasks are then merged into the shared DAG settings in
    ``out_1``.

    Reads the notebook globals ``tasks_all``, ``task_dag_mappings``,
    ``task_group_mappings``, ``out_1`` and ``yaml``; ``tasks_all`` itself is
    not modified.

    Args:
        dag_name: the DAG to render.
        out_dir: optional directory; when given, the YAML is also written to
            ``<out_dir>/<dag_name>.yml``.

    Returns:
        The YAML text, with ``dag_name`` as its single top-level key.

    Raises:
        KeyError: if an out-of-DAG dependency is missing from
            ``task_dag_mappings`` / ``task_group_mappings`` (i.e. it is not a
            model in ``tasks_all``).
    """
    wait_prefix = 'wait_for_'

    # Deep-copy each spec so rewriting dependencies never touches tasks_all.
    tasks = {
        task_name: deepcopy(task_config)
        for task_name, task_config in tasks_all.items()
        if task_config['dag_name'] == dag_name
    }
    local_task_names = set(tasks)

    # Upstream models that live in other DAGs, collected explicitly in
    # first-seen order. A dict serves as an ordered set so the YAML is stable
    # between runs. Tracking them here avoids guessing from names: a local
    # task whose name contains "wait" must not be mistaken for a sensor.
    external_deps = {}
    for task_config in tasks.values():
        deps = []
        for dep in task_config.get('dependencies', []):
            if 'src' in dep:
                # NOTE(review): drops any dependency whose name contains "src";
                # confirm this is still wanted, since dbt sources are already
                # filtered out when tasks_all is built.
                continue
            if dep in local_task_names:
                deps.append(dep)
            else:
                deps.append(wait_prefix + dep)
                external_deps.setdefault(dep, None)
        task_config['dependencies'] = deps

    for dep in external_deps:
        upstream_group = task_group_mappings[dep]
        tasks[wait_prefix + dep] = dict(
            operator="airflow.sensors.external_task.ExternalTaskSensor",
            external_dag_id=task_dag_mappings[dep],
            # Inside a TaskGroup Airflow prefixes the task id with the group id
            # ("<group>.<task>"); ungrouped tasks keep their bare id, so never
            # emit "None.<task>".
            external_task_id=(
                f"{upstream_group}.{dep}" if upstream_group is not None else dep
            ),
            # Sensors are grouped under the name of the DAG they wait on.
            task_group_name=task_dag_mappings[dep],
        )

    # Ordered de-duplication (not set()) keeps task_groups in a stable order.
    group_names = dict.fromkeys(
        task_config.get('task_group_name')
        for task_config in tasks.values()
        if task_config.get('task_group_name') is not None
    )
    task_groups = {group_name: {} for group_name in group_names}

    out = {dag_name: {**out_1, 'task_groups': task_groups, 'tasks': tasks}}
    rendered = yaml.dump(out, sort_keys=False)

    if out_dir is not None:
        from pathlib import Path

        with open(Path(out_dir) / f'{dag_name}.yml', 'w', encoding='utf-8') as file:
            file.write(rendered)
    return rendered
# Preview the generated spec for the `product` DAG.
print(write_dag_yml(dag_name='product'))

product:
  default_args:
    owner: example_owner
    start_date: '2022-02-12'
    retries: 1
    retry_delay_sec: 300
    dir: /workspace/test-dbt-airflow/dbt-greenery
  schedule_interval: None
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 300
  default_view: tree
  orientation: LR
  description: this is an example dag!
  task_groups:
    core: {}
  tasks:
    mart_event:
      task_name: mart_event
      dag_name: product
      task_group_name: null
      dependencies:
      - wait_for_fct_register_event
      - wait_for_dim_event
      operator: airflow_dbt.operators.dbt_operator.DbtRunOperator
    wait_for_fct_register_event:
      operator: airflow.sensors.external_task.ExternalTaskSensor
      external_dag_id: core
      external_task_id: facts.fct_register_event
      task_group_name: core
    wait_for_dim_event:
      operator: airflow.sensors.external_task.ExternalTaskSensor
      external_dag_id: core
      external_task_id: dimensions.dim_event
      task_group

In [32]:
def get_task_group(node_name):
    """Derive a task-group name from a dbt node unique_id.

    Takes the last dot-separated component (the node's own name) and returns
    the text before its first underscore, e.g.
    ``model.analytics.dim_address`` -> ``dim``.
    """
    short_name = node_name.rsplit(".", 1)[-1]
    prefix, _, _ = short_name.partition("_")
    return prefix

nodes = list()  # one summary dict per manifest node, appended below
# NOTE(review): this redefines the identical helper from cell In [3] and
# silently shadows it; consider deleting one of the two copies.
def __get_operator(node_config):
    """Return the dotted import path of the Airflow operator for a dbt node.

    Nodes whose ``tags`` contain ``'rmd'`` use the R Markdown operator; every
    other node is run with ``DbtRunOperator``.

    Args:
        node_config: mapping with an optional ``tags`` entry.

    Returns:
        The operator class path as a string.
    """
    tags = node_config.get('tags', [])
    is_rmd = 'rmd' in tags
    return (
        "dbt_viewflow.operators.rmd_operator.RmdOperator"
        if is_rmd
        else "airflow_dbt.operators.dbt_operator.DbtRunOperator"
    )

# Summarise every manifest node (models and all other resource types) into a
# flat dict.
for node_name, node_config in list(manifest["nodes"].items()):
    node = dict(
        models=node_name.split(".")[-1],
        tags=node_config.get('tags', []),
        full_path=node_config.get('original_file_path', ''),
        full_name=node_name,
        # Upstream node names. dbt sources are not Airflow tasks, so skip them.
        # Match the unique_id prefix rather than a substring so models whose
        # name merely contains "source" are kept. The default must be a dict:
        # a list has no .get().
        dependencies=[
            x.split(".")[-1]
            for x in node_config.get("depends_on", {}).get("nodes", [])
            if not x.startswith("source.")
        ],
        resource_type=node_config["resource_type"],
        task_group_name=get_task_group(node_name),
    )
    nodes.append(node)

# Build one task spec per model; rmd-tagged models also carry their source
# file path as `input`.
tasks = {}
for node in nodes:
    if node["resource_type"] != "model":
        continue
    task = {
        # __get_operator expects a mapping with a 'tags' key (passing the bool
        # `'rmd' in tags` raised AttributeError); `node` provides 'tags'.
        "operator": __get_operator(node),
        "models": node["models"],
        "dependencies": node["dependencies"],
        "task_group_name": node["task_group_name"],
    }
    if 'rmd' in node['tags']:
        task['input'] = node['full_path']
    tasks[node["models"]] = task


import yaml

print(yaml.dump(tasks, sort_keys=False))


dim_address:
  operator: airflow_dbt.operators.dbt_operator.DbtRunOperator
  models: dim_address
  dependencies:
  - stg_greenery__addresses
  task_group_name: dim
dim_event:
  operator: airflow_dbt.operators.dbt_operator.DbtRunOperator
  models: dim_event
  dependencies:
  - stg_greenery__events
  task_group_name: dim
dim_order:
  operator: airflow_dbt.operators.dbt_operator.DbtRunOperator
  models: dim_order
  dependencies:
  - stg_greenery__orders
  task_group_name: dim
dim_product:
  operator: airflow_dbt.operators.dbt_operator.DbtRunOperator
  models: dim_product
  dependencies:
  - stg_greenery__products
  task_group_name: dim
dim_promo:
  operator: airflow_dbt.operators.dbt_operator.DbtRunOperator
  models: dim_promo
  dependencies:
  - stg_greenery__promos
  task_group_name: dim
dim_tracking:
  operator: airflow_dbt.operators.dbt_operator.DbtRunOperator
  models: dim_tracking
  dependencies:
  - stg_greenery__orders
  task_group_name: dim
dim_user:
  operator: airflow_dbt.opera

In [None]:
# Peek at a few node summaries. `modes` was an undefined name (NameError on a
# fresh run); `nodes` is the list built by the node-summary cell.
nodes[:3]