In [4]:
import datetime
import json

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import datetime
from airflow.utils.dates import timedelta
from airflow.utils.task_group import TaskGroup
PATH_TO_DBT_PROJECT = "/Users/valentin/Github/data-gcp/orchestration/dags/data_gcp_dbt"


In [7]:
def load_manifest():
    local_filepath = PATH_TO_DBT_PROJECT  + "/target/manifest.json"
    with open(local_filepath) as f:
        data = json.load(f)

    return data

def build_simplified_manifest(data):
    simplified_manifest = {node:{"redirect_dep":None, "model_alias":data["nodes"][node]["alias"],"depends_on_node":data["nodes"][node]["depends_on"]["nodes"],"model_tests":{},"resource_type":data["nodes"][node]["resource_type"]} for node in data["nodes"].keys() if (data["nodes"][node]["resource_type"] == "model" and "elementary" not in node)}
    for node in data["nodes"].keys():
        if data["nodes"][node]["resource_type"] == "test":
            generic_test = True in [generic_name in node for generic_name in ["not_null","unique","accepted_values","relationships"]]
            test_alias =  data["nodes"][node]["alias"] if not generic_test else node.split('.')[-2]
            test_config = data["nodes"][node]["config"].get('severity',None)
            try:
                test_config = test_config.lower()
            except AttributeError:
                pass
            parents = data["nodes"][node]["depends_on"]["nodes"]
            for p_node in parents:
                # p_alias  = data["nodes"][p_node]["alias"]
                
                if simplified_manifest[p_node]['model_tests'].get(test_config,None) is None:
                    simplified_manifest[p_node]['model_tests'][test_config] = [{"test_alias":test_alias,'test_node':node,"test_type":'generic' if generic_test else 'custom'}]
                else:
                     simplified_manifest[p_node]['model_tests'][test_config] += [{"test_alias":test_alias,'test_node':node,"test_type":'generic' if generic_test else 'custom'}]

    return simplified_manifest

def rebuild_manifest():
    data = load_manifest()
    simplified_manifest = build_simplified_manifest(data)
    return simplified_manifest

simplified_manifest = rebuild_manifest()


In [9]:
simplified_manifest['model.data_gcp_dbt.enriched_cultural_partner_data']

{'redirect_dep': None,
 'model_alias': 'enriched_cultural_partner_data',
 'depends_on_node': ['model.data_gcp_dbt.enriched_venue_data',
  'model.data_gcp_dbt.region_department',
  'model.data_gcp_dbt.agg_partner_cultural_sector',
  'model.data_gcp_dbt.enriched_venue_tags_data',
  'model.data_gcp_dbt.enriched_offerer_tags_data',
  'model.data_gcp_dbt.enriched_offerer_data',
  'model.data_gcp_dbt.applicative_database_offerer'],
 'model_tests': {'warn': [{'test_alias': 'unique_enriched_cultural_partner_data_venue_id',
    'test_node': 'test.data_gcp_dbt.unique_enriched_cultural_partner_data_venue_id.c1ac96a53e',
    'test_type': 'generic'}]},
 'resource_type': 'model'}

In [57]:
def make_dbt_run_single_task(node, dag,target='dev',DBT_DIR = PATH_TO_DBT_PROJECT,full_refresh =False,GLOBAL_CLI_FLAGS = "--no-write-json"):
    """Returns an Airflow operator either run and test an individual model"""
    node_alias = node['alias']
    dbt_task = BashOperator(
        task_id=node_alias,
        bash_command=f"""
        dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {node_alias} --no-compile
        """ if not full_refresh else f"""
        dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {node_alias} --no-compile --full-refresh
        """,
        cwd=PATH_TO_DBT_PROJECT, 
        dag=dag
    )

    return dbt_task

In [65]:
data = load_manifest()
data['nodes'].keys()





dict_keys(['model.data_gcp_dbt.isbn_rayon_editor', 'model.data_gcp_dbt.bookable_collective_offer', 'model.data_gcp_dbt.available_stock_information', 'model.data_gcp_dbt.offer_item_ids', 'model.data_gcp_dbt.booking', 'model.data_gcp_dbt.cleaned_stock', 'model.data_gcp_dbt.offer_extracted_data', 'model.data_gcp_dbt.bookable_offer', 'model.data_gcp_dbt.agg_partner_cultural_sector', 'model.data_gcp_dbt.linked_offers', 'model.data_gcp_dbt.siren_data', 'model.data_gcp_dbt.subcategories', 'model.data_gcp_dbt.siren_data_labels', 'model.data_gcp_dbt.region_department', 'model.data_gcp_dbt.applicative_database_offerer_tag_category', 'model.data_gcp_dbt.applicative_database_offerer_tag_category_mapping', 'model.data_gcp_dbt.applicative_database_collective_booking', 'model.data_gcp_dbt.applicative_database_favorite', 'model.data_gcp_dbt.applicative_database_venue_criterion', 'model.data_gcp_dbt.applicative_database_offerer_tag', 'model.data_gcp_dbt.applicative_database_offerer', 'model.data_gcp_db

In [59]:
# standard_tests = ["not_null","unique","accepted_values","relationships"]
# for i,node in enumerate(data["nodes"].keys()):
#     if "test" in node.split("."): 
#         print(node)
        
        
        

In [60]:
# data["nodes"]["test.data_gcp_dbt.dummy_test"]

In [61]:
# data["nodes"]['model.data_gcp_dbt.siren_data']

In [62]:
# data["nodes"]["test.data_gcp_dbt.not_null_siren_data_siren.565b43275a"]

In [135]:
node
data["nodes"][node]
for node in data["nodes"].keys():

    if data["nodes"][node]["resource_type"] in ["model","source"]:
        print(data["nodes"][node])
        break

{'database': 'passculture-data-ehp', 'schema': 'tmp_dev', 'name': 'isbn_rayon_editor', 'resource_type': 'model', 'package_name': 'data_gcp_dbt', 'path': 'clean/isbn_rayon_editor.sql', 'original_file_path': 'models/clean/isbn_rayon_editor.sql', 'unique_id': 'model.data_gcp_dbt.isbn_rayon_editor', 'fqn': ['data_gcp_dbt', 'clean', 'isbn_rayon_editor'], 'alias': 'isbn_rayon_editor', 'checksum': {'name': 'sha256', 'checksum': '7970caaef2b230399fda9a24853c7bc61c70b67afaacdffab246609a88d529a1'}, 'config': {'enabled': True, 'alias': None, 'schema': 'staging', 'database': None, 'tags': [], 'meta': {}, 'group': None, 'materialized': 'view', 'incremental_strategy': None, 'persist_docs': {}, 'post-hook': [], 'pre-hook': [], 'quoting': {}, 'column_types': {}, 'full_refresh': None, 'unique_key': None, 'on_schema_change': 'ignore', 'on_configuration_change': 'apply', 'grants': {}, 'packages': [], 'docs': {'show': True, 'node_color': '#b43ddb'}, 'contract': {'enforced': False, 'alias_types': True}, 'a

In [136]:
children_tests = {node:{"redirect_dep":None, "model_alias":data["nodes"][node]["alias"],"depends_on_node":data["nodes"][node]["depends_on"]["nodes"],"model_tests":{}} for node in data["nodes"].keys() if data["nodes"][node]["resource_type"] == "model"}
children_tests



{'model.data_gcp_dbt.isbn_rayon_editor': {'redirect_dep': None,
  'model_alias': 'isbn_rayon_editor',
  'depends_on_node': ['model.data_gcp_dbt.offer_extracted_data'],
  'model_tests': {}},
 'model.data_gcp_dbt.bookable_collective_offer': {'redirect_dep': None,
  'model_alias': 'bookable_collective_offer',
  'depends_on_node': ['model.data_gcp_dbt.applicative_database_collective_booking',
   'model.data_gcp_dbt.applicative_database_collective_stock',
   'model.data_gcp_dbt.applicative_database_collective_offer',
   'model.data_gcp_dbt.applicative_database_venue',
   'model.data_gcp_dbt.applicative_database_collective_offer_template'],
  'model_tests': {}},
 'model.data_gcp_dbt.available_stock_information': {'redirect_dep': None,
  'model_alias': 'available_stock_information',
  'depends_on_node': ['model.data_gcp_dbt.applicative_database_booking',
   'model.data_gcp_dbt.applicative_database_stock'],
  'model_tests': {}},
 'model.data_gcp_dbt.offer_item_ids': {'redirect_dep': None,
  'm

In [137]:
# import numpy as np
def get_children_tests(data):
    children_tests = {node:{"redirect_dep":None, "model_alias":data["nodes"][node]["alias"],"depends_on_node":data["nodes"][node]["depends_on"]["nodes"],"model_tests":{},"resource_type":data["nodes"][node]["resource_type"]} for node in data["nodes"].keys() if data["nodes"][node]["resource_type"] in ["model","source"]}
    for node in data["nodes"].keys():
        if data["nodes"][node]["resource_type"] == "test":
            generic_test = True in [generic_name in node for generic_name in ["not_null","unique","accepted_values","relationships"]]
            test_alias =  data["nodes"][node]["alias"] if not generic_test else node.split('.')[-2]
            test_config = data["nodes"][node]["config"].get('severity',None)
            try:
                test_config = test_config.lower()
            except AttributeError:
                pass
            parents = data["nodes"][node]["depends_on"]["nodes"]
            for p_node in parents:
                p_alias  = data["nodes"][p_node]["alias"]
                
                if children_tests[p_node]['model_tests'].get(test_config,None) is None:
                    children_tests[p_node]['model_tests'][test_config] = [{"test_alias":test_alias,'test_node':node,"test_type":'generic' if generic_test else 'custom'}]
                else:
                     children_tests[p_node]['model_tests'][test_config] += [{"test_alias":test_alias,'test_node':node,"test_type":'generic' if generic_test else 'custom'}]

    return children_tests

children_tests = get_children_tests(data)
children_tests

{'model.data_gcp_dbt.isbn_rayon_editor': {'redirect_dep': None,
  'model_alias': 'isbn_rayon_editor',
  'depends_on_node': ['model.data_gcp_dbt.offer_extracted_data'],
  'model_tests': {},
  'resource_type': 'model'},
 'model.data_gcp_dbt.bookable_collective_offer': {'redirect_dep': None,
  'model_alias': 'bookable_collective_offer',
  'depends_on_node': ['model.data_gcp_dbt.applicative_database_collective_booking',
   'model.data_gcp_dbt.applicative_database_collective_stock',
   'model.data_gcp_dbt.applicative_database_collective_offer',
   'model.data_gcp_dbt.applicative_database_venue',
   'model.data_gcp_dbt.applicative_database_collective_offer_template'],
  'model_tests': {},
  'resource_type': 'model'},
 'model.data_gcp_dbt.available_stock_information': {'redirect_dep': None,
  'model_alias': 'available_stock_information',
  'depends_on_node': ['model.data_gcp_dbt.applicative_database_booking',
   'model.data_gcp_dbt.applicative_database_stock'],
  'model_tests': {},
  'resourc

In [138]:
[item for item in children_tests.keys() if children_tests[item]["resource_type"]=="source"]

[]

In [139]:

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 23),
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'dbt_dag',
    default_args=default_args,
    description='A dbt wrapper for airflow',
    schedule=timedelta(days=1),
)
full_refresh = False
target='dev'
DBT_DIR = PATH_TO_DBT_PROJECT
GLOBAL_CLI_FLAGS = "--no-write-json"
for model_node,model_data in children_tests.items():
    test_list = model_data["model_tests"].get('error',[])
    
    if len(test_list) != 0 :   
        with TaskGroup(group_id=f'{model_data["model_alias"]}_tasks',dag=dag) as model_tasks:
            model_op = BashOperator(
                task_id = model_data['model_alias'],
                bash_command=f"""
                dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {model_data['model_alias']} --no-compile
                """ if not full_refresh else f"""
                dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {model_data['model_alias']} --no-compile --full-refresh
                """,
                cwd=PATH_TO_DBT_PROJECT, 
                dag=dag
                )

            with TaskGroup(group_id=f'{model_data["model_alias"]}_critical_tests',dag=dag) as tests_task:
                dbt_test_tasks = [BashOperator(
                task_id = test['test_alias'],
                bash_command=f"""
                dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {test['test_alias']} --no-compile
                """ if not full_refresh else f"""
                dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {test['test_alias']} --no-compile --full-refresh
                """,
                cwd=PATH_TO_DBT_PROJECT, 
                dag=dag
                ) for test in test_list]
            model_op >> tests_task
    else:
        model_tasks = BashOperator(
                task_id = model_data['model_alias'],
                bash_command=f"""
                dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {model_data['model_alias']} --no-compile
                """ if not full_refresh else f"""
                dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {model_data['model_alias']} --no-compile --full-refresh
                """,
                cwd=PATH_TO_DBT_PROJECT, 
                dag=dag
                )

    children_tests[model_node]["redirect_dep"] = model_tasks

        

In [140]:
children_tests

{'model.data_gcp_dbt.isbn_rayon_editor': {'redirect_dep': <Task(BashOperator): isbn_rayon_editor>,
  'model_alias': 'isbn_rayon_editor',
  'depends_on_node': ['model.data_gcp_dbt.offer_extracted_data'],
  'model_tests': {},
  'resource_type': 'model'},
 'model.data_gcp_dbt.bookable_collective_offer': {'redirect_dep': <Task(BashOperator): bookable_collective_offer>,
  'model_alias': 'bookable_collective_offer',
  'depends_on_node': ['model.data_gcp_dbt.applicative_database_collective_booking',
   'model.data_gcp_dbt.applicative_database_collective_stock',
   'model.data_gcp_dbt.applicative_database_collective_offer',
   'model.data_gcp_dbt.applicative_database_venue',
   'model.data_gcp_dbt.applicative_database_collective_offer_template'],
  'model_tests': {},
  'resource_type': 'model'},
 'model.data_gcp_dbt.available_stock_information': {'redirect_dep': <Task(BashOperator): available_stock_information>,
  'model_alias': 'available_stock_information',
  'depends_on_node': ['model.data_

In [145]:
source_op = BashOperator(
                task_id = "dbt_source",
                bash_command=f"""
                dbt {GLOBAL_CLI_FLAGS} source --target {target} --no-compile
                """ if not full_refresh else f"""
                dbt {GLOBAL_CLI_FLAGS} source --target {target} --no-compile --full-refresh
                """,
                cwd=PATH_TO_DBT_PROJECT, 
                dag=dag
                )
for node in children_tests.keys():
    
    for upstream_node in children_tests[node]["depends_on_node"]:
        if upstream_node is not None:
            if not upstream_node.startswith("source."):
                children_tests[upstream_node]['redirect_dep'] >> children_tests[node]['redirect_dep']
            else:
                source_op >> children_tests[node]['redirect_dep']

model.data_gcp_dbt.isbn_rayon_editor model.data_gcp_dbt.offer_extracted_data
model.data_gcp_dbt.bookable_collective_offer model.data_gcp_dbt.applicative_database_collective_booking
model.data_gcp_dbt.bookable_collective_offer model.data_gcp_dbt.applicative_database_collective_stock
model.data_gcp_dbt.bookable_collective_offer model.data_gcp_dbt.applicative_database_collective_offer
model.data_gcp_dbt.bookable_collective_offer model.data_gcp_dbt.applicative_database_venue
model.data_gcp_dbt.bookable_collective_offer model.data_gcp_dbt.applicative_database_collective_offer_template
model.data_gcp_dbt.available_stock_information model.data_gcp_dbt.applicative_database_booking
model.data_gcp_dbt.available_stock_information model.data_gcp_dbt.applicative_database_stock
model.data_gcp_dbt.offer_item_ids model.data_gcp_dbt.applicative_database_offer
model.data_gcp_dbt.offer_item_ids model.data_gcp_dbt.offer_extracted_data
model.data_gcp_dbt.offer_item_ids model.data_gcp_dbt.linked_offers
mode

In [144]:
# data["sources"]


{'source.data_gcp_dbt.raw.agg_partner_cultural_sector': {'database': 'passculture-data-ehp',
  'schema': 'raw_dev',
  'name': 'agg_partner_cultural_sector',
  'resource_type': 'source',
  'package_name': 'data_gcp_dbt',
  'path': 'models/sources.yml',
  'original_file_path': 'models/sources.yml',
  'unique_id': 'source.data_gcp_dbt.raw.agg_partner_cultural_sector',
  'fqn': ['data_gcp_dbt', 'raw', 'agg_partner_cultural_sector'],
  'source_name': 'raw',
  'source_description': '',
  'loader': '',
  'identifier': 'agg_partner_cultural_sector',
  'quoting': {'database': None,
   'schema': None,
   'identifier': None,
   'column': None},
  'loaded_at_field': None,
  'freshness': {'warn_after': {'count': None, 'period': None},
   'error_after': {'count': None, 'period': None},
   'filter': None},
  'external': None,
  'description': '',
  'columns': {},
  'meta': {},
  'source_meta': {},
  'tags': [],
  'config': {'enabled': True},
  'patch_path': None,
  'unrendered_config': {},
  'relatio

In [123]:
children_tests 

{'model.data_gcp_dbt.isbn_rayon_editor': {'redirect_dep': <Task(BashOperator): isbn_rayon_editor>,
  'model_alias': 'isbn_rayon_editor',
  'depends_on_node': ['model.data_gcp_dbt.offer_extracted_data'],
  'model_tests': {}},
 'model.data_gcp_dbt.bookable_collective_offer': {'redirect_dep': <Task(BashOperator): bookable_collective_offer>,
  'model_alias': 'bookable_collective_offer',
  'depends_on_node': ['model.data_gcp_dbt.applicative_database_collective_booking',
   'model.data_gcp_dbt.applicative_database_collective_stock',
   'model.data_gcp_dbt.applicative_database_collective_offer',
   'model.data_gcp_dbt.applicative_database_venue',
   'model.data_gcp_dbt.applicative_database_collective_offer_template'],
  'model_tests': {}},
 'model.data_gcp_dbt.available_stock_information': {'redirect_dep': <Task(BashOperator): available_stock_information>,
  'model_alias': 'available_stock_information',
  'depends_on_node': ['model.data_gcp_dbt.applicative_database_booking',
   'model.data_gc

In [None]:

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 23),
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'dbt_dag',
    default_args=default_args,
    description='A dbt wrapper for airflow',
    schedule=timedelta(days=1),
)


dbt_tasks = {}
dbt_tests = {}

for node in data["nodes"].keys():
    alias = data["nodes"][node]["alias"]
    if data["nodes"][node]["resource_type"] == "model":
        
        # node_test = node.replace("model", "test")
        dbt_tasks[alias] = make_dbt_run_single_task(alias, "run",dag)
        # try :
        #     dbt_tasks[alias] = make_dbt_run_single_task(alias, "run",dag)
        # except DuplicateTaskIdFound:
        #     print( data["nodes"][node]['alias'])
        #     # print(dbt_tasks[node])
        #     pass
    if data["nodes"][node]["resource_type"] == "test":
            
        # node_test = node.replace("model", "test")
        dbt_tests[alias] = make_dbt_run_single_task(alias, "run",dag)

        # try :
        #     dbt_tasks[node_test] = make_dbt_task(node, "test",dag)
        # except DuplicateTaskIdFound:
        #     # print(node)
        #     # print(dbt_tasks[node])
        #     pass
    if 'elementary' in node.split('.'):
        print(node)

In [6]:
# import numpy as np
# def get_children_tests(data):
#     children_tests = {}
#     for node in data["nodes"].keys():
#         generic_test = np.any([generic_name in node for generic_name in ["not_null","unique","accepted_values","relationships"]])
#         test_alias =  data["nodes"][node]["alias"] if not generic_test else node.split('.')[-2]
#         test_config = data["nodes"][node]["config"].get('severity',None)
#         try:
#             test_config = test_config.lower()
#         except AttributeError:
#             pass
#         if data["nodes"][node]["resource_type"] == "test":
#             parents = data["nodes"][node]["depends_on"]["nodes"]
#             for p_node in parents:
#                 p_alias  =  data["nodes"][p_node]["alias"]
                
#                 if children_tests.get(p_alias,None) is None:
#                 #    children_tests[p_alias] = [(node,test_alias,'generic test' if generic_test else 'custom test',test_config)] 
#                    children_tests[p_alias] = {test_config:[{'test_node':node,"test_alias":test_alias,"test_type":'generic' if generic_test else 'custom'}]} 
#                 elif children_tests[p_alias].get(test_config,None) is None:
#                     children_tests[p_alias][test_config] = [{'test_node':node,"test_alias":test_alias,"test_type":'generic' if generic_test else 'custom'}]
#                 else:
#                     children_tests[p_alias][test_config] += [{'test_node':node,"test_alias":test_alias,"test_type":'generic' if generic_test else 'custom'}] 
#     return children_tests

# children_tests = get_children_tests(data)
# children_tests




In [None]:
def make_dbt_run_test_task(node_tests_list, dag,target='dev',DBT_DIR = PATH_TO_DBT_PROJECT,full_refresh =False,GLOBAL_CLI_FLAGS = "--no-write-json"):
    """Returns an Airflow operator either run and test an individual model"""
    tests_aliases = [node['alias'] for node in node_tests_list]
    with TaskGroup(group_id=f'{node_tests_list[0]['alias']}_critical_tests') as tg1:
    t1 = EmptyOperator(task_id='task_1')
    t2 = EmptyOperator(task_id='task_2')
    dbt_task = BashOperator(
        task_id=node_alias,
        bash_command=f"""
        dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {node_alias} --no-compile
        """ if not full_refresh else f"""
        dbt {GLOBAL_CLI_FLAGS} run --target {target} --select {node_alias} --no-compile --full-refresh
        """,
        cwd=PATH_TO_DBT_PROJECT, 
        dag=dag
    )

    return dbt_task

In [43]:
from airflow.exceptions import DuplicateTaskIdFound


default_args = {
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 23),
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'dbt_dag',
    default_args=default_args,
    description='A dbt wrapper for airflow',
    schedule=timedelta(days=1),
)


dbt_tasks = {}
dbt_tests = {}

for node in data["nodes"].keys():
    alias = data["nodes"][node]["alias"]
    if data["nodes"][node]["resource_type"] == "model":
        
        # node_test = node.replace("model", "test")
        dbt_tasks[alias] = make_dbt_run_single_task(alias, "run",dag)
        # try :
        #     dbt_tasks[alias] = make_dbt_run_single_task(alias, "run",dag)
        # except DuplicateTaskIdFound:
        #     print( data["nodes"][node]['alias'])
        #     # print(dbt_tasks[node])
        #     pass
    if data["nodes"][node]["resource_type"] == "test":
            
        # node_test = node.replace("model", "test")
        dbt_tests[alias] = make_dbt_run_single_task(alias, "run",dag)

        # try :
        #     dbt_tasks[node_test] = make_dbt_task(node, "test",dag)
        # except DuplicateTaskIdFound:
        #     # print(node)
        #     # print(dbt_tasks[node])
        #     pass
    if 'elementary' in node.split('.'):
        print(node)

AttributeError: 'str' object has no attribute 'task_group'

In [70]:
data["nodes"]['model.data_gcp_dbt.typeform_adage']['depends_on']


{'macros': [], 'nodes': ['source.data_gcp_dbt.clean.gsheet_eac_webinar']}

In [71]:

for node in data["nodes"].keys():
    if "elementary" not in node.split("."):
        if node.split(".")[0] == "model":
        
            # Set dependency to run tests on a model after model runs finishes
            # node_test = node.replace("model", "test")
            # dbt_tasks[node] >> dbt_tasks[node_test]
    
            # Set all model -> model dependencies
            for upstream_node in data["nodes"][node]["depends_on"]["nodes"]:
            
                upstream_node_type = upstream_node.split(".")[0]
                if upstream_node_type == "model":
                    dbt_tasks[upstream_node] >> dbt_tasks[node]

In [49]:
print(dag)


<DAG: dbt_dag>


In [147]:
import datetime
import json

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.dates import datetime, timedelta
from airflow.exceptions import DuplicateTaskIdFound

default_args = {
    "depends_on_past": False,
    "start_date": datetime(2020, 12, 23),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "dbt_dag2",
    default_args=default_args,
    description="A dbt wrapper for airflow",
    schedule_interval=None,
)


def load_manifest():
    local_filepath = PATH_TO_DBT_PROJECT + "/target/manifest.json"
    with open(local_filepath) as f:
        data = json.load(f)
    return data

def get_children_tests(data):
    children_tests = {node:{"redirect_dep":None, "model_alias":data["nodes"][node]["alias"],"depends_on_node":data["nodes"][node]["depends_on"]["nodes"],"model_tests":{},"resource_type":data["nodes"][node]["resource_type"]} for node in data["nodes"].keys() if data["nodes"][node]["resource_type"] == "model"}
    for node in data["nodes"].keys():
        if data["nodes"][node]["resource_type"] == "test":
            generic_test = True in [generic_name in node for generic_name in ["not_null","unique","accepted_values","relationships"]]
            test_alias =  data["nodes"][node]["alias"] if not generic_test else node.split('.')[-2]
            test_config = data["nodes"][node]["config"].get('severity',None)
            try:
                test_config = test_config.lower()
            except AttributeError:
                pass
            parents = data["nodes"][node]["depends_on"]["nodes"]
            for p_node in parents:
                # p_alias  = data["nodes"][p_node]["alias"]
                
                if children_tests[p_node]['model_tests'].get(test_config,None) is None:
                    children_tests[p_node]['model_tests'][test_config] = [{"test_alias":test_alias,'test_node':node,"test_type":'generic' if generic_test else 'custom'}]
                else:
                     children_tests[p_node]['model_tests'][test_config] += [{"test_alias":test_alias,'test_node':node,"test_type":'generic' if generic_test else 'custom'}]

    return children_tests



DBT_DIR = PATH_TO_DBT_PROJECT
GLOBAL_CLI_FLAGS = "--no-write-json"
full_refresh = False

start = DummyOperator(task_id="start",dag=dag)

dbt_compile_op = BashOperator(
        task_id="run_compile_dbt",
        bash_command="dbt compile --target {{ params.target }}",
        cwd=PATH_TO_DBT_PROJECT,
        dag=dag
    )


    
with TaskGroup(group_id='data_transformation',dag=dag) as data_transfo:

    data = load_manifest()
    children_tests = get_children_tests(data)


    for model_node,model_data in children_tests.items():
        tests_list = model_data["model_tests"].get('error',[])

        if len(tests_list) != 0 :   
            with TaskGroup(group_id=f'{model_data["model_alias"]}_tasks',dag=dag) as model_tasks:
                model_op = BashOperator(
                    task_id = model_data['model_alias'],
                    bash_command=f"""
                    dbt {GLOBAL_CLI_FLAGS} run --target {{ params.target }} --select {model_data['model_alias']} --no-compile
                    """ if not full_refresh else f"""
                    dbt {GLOBAL_CLI_FLAGS} run --target {{ params.target }} --select {model_data['model_alias']} --no-compile --full-refresh
                    """,
                    cwd=PATH_TO_DBT_PROJECT, 
                    dag=dag
                    )

                with TaskGroup(group_id=f'{model_data["model_alias"]}_critical_tests',dag=dag) as tests_task:
                    dbt_test_tasks = [BashOperator(
                    task_id = test['test_alias'],
                    bash_command=f"""
                    dbt {GLOBAL_CLI_FLAGS} run --target {{ params.target }} --select {test['test_alias']} --no-compile
                    """ if not full_refresh else f"""
                    dbt {GLOBAL_CLI_FLAGS} run --target {{ params.target }} --select {test['test_alias']} --no-compile --full-refresh
                    """,
                    cwd=PATH_TO_DBT_PROJECT, 
                    dag=dag
                    ) for test in tests_list]
                model_op >> tests_task
        else:
            model_tasks = BashOperator(
                    task_id = f'{model_data["model_alias"]}_tasks',
                    bash_command=f"""
                    dbt {GLOBAL_CLI_FLAGS} run --target {{ params.target }} --select {model_data['model_alias']} --no-compile
                    """ if not full_refresh else f"""
                    dbt {GLOBAL_CLI_FLAGS} run --target {{ params.target }} --select {model_data['model_alias']} --no-compile --full-refresh
                    """,
                    cwd=PATH_TO_DBT_PROJECT, 
                    dag=dag
                    )
   
        children_tests[model_node]["redirect_dep"] = model_tasks


    for node in children_tests.keys():
        for upstream_node in children_tests[node]["depends_on_node"]:
            if upstream_node is not None:
                if upstream_node.startswith("model."):
                    try:
                        children_tests[upstream_node]['redirect_dep'] >> children_tests[node]['redirect_dep']
                    except:
                        pass
                else:
                    pass
   

end = DummyOperator(task_id='transfo_completed',dag=dag)

start >> dbt_compile_op >> data_transfo >> end

<Task(EmptyOperator): transfo_completed>

In [153]:
[item for i,item in enumerate(list(children_tests.keys()) ) if "invocation" in item]

['model.elementary.dbt_invocations']