dbt Cloud & airflow: best of friends #16

Open: wants to merge 31 commits into master
Commits
9cfdf5b  get toolkit #1 to work (sungchun12, Jun 29, 2021)
c716094  airflow 2.0 working setup (sungchun12, Jun 29, 2021)
b1d1594  starting template (sungchun12, Jun 30, 2021)
5731459  blueprint code changes (sungchun12, Jun 30, 2021)
78b72ef  add another idea for command steps (sungchun12, Jun 30, 2021)
46e63d3  add dbt cloud defaults (sungchun12, Jun 30, 2021)
5e2e995  add another blueprint (sungchun12, Jul 1, 2021)
7d29655  add dataclass for job configs (sungchun12, Jul 1, 2021)
945697e  move comments (sungchun12, Jul 1, 2021)
6a2d419  add docstring (sungchun12, Jul 1, 2021)
4a6689e  get class format (sungchun12, Jul 1, 2021)
c661227  multi inheritance for class util (sungchun12, Jul 1, 2021)
086e8a0  starting draft dag (sungchun12, Jul 1, 2021)
cdcc074  call on dataclass attr (sungchun12, Jul 1, 2021)
5983368  remove vars (sungchun12, Jul 1, 2021)
e0b450c  add todo (sungchun12, Jul 1, 2021)
12b50cc  add todo (sungchun12, Jul 1, 2021)
ec51693  add a cause and more logs (sungchun12, Jul 2, 2021)
f4e6270  secure the api key in airflow var (sungchun12, Jul 2, 2021)
7f56d8f  add in asserts (sungchun12, Jul 2, 2021)
1a082b3  remove extra imports and comments (sungchun12, Jul 2, 2021)
5019576  immutable dataclasses and remove old comments (sungchun12, Jul 6, 2021)
1fff4fd  add example EL tasks (sungchun12, Jul 6, 2021)
8534130  add better docstrings (sungchun12, Jul 6, 2021)
4ba4d87  clean up code and docstrings (sungchun12, Jul 6, 2021)
ed1cc15  duplicate todo in example (sungchun12, Jul 6, 2021)
c6c4fd0  update class parameters (sungchun12, Jul 6, 2021)
e88d883  update example for xcoms (sungchun12, Jul 7, 2021)
372d5fe  pod xcom pull works (sungchun12, Jul 7, 2021)
05c4d28  upgrade to airflow 2.1.0 cleaner UI (sungchun12, Jul 10, 2021)
95d40ac  upgrade checkov (sungchun12, Jul 10, 2021)
4 changes: 2 additions & 2 deletions .github/workflows/terraform_checkov_security.yml
@@ -21,7 +21,7 @@ jobs:
pip install checkov
- name: Security Test-terragrunt_infrastructure_modules, Skip Specific Tests
run: |
checkov -d terragrunt_infrastructure_modules --skip-check CKV_GCP_26,CKV_GCP_32,CKV_GCP_38
checkov -d terragrunt_infrastructure_modules --skip-check CKV_GCP_26,CKV_GCP_32,CKV_GCP_38,CKV2_GCP_3
- name: Security Test-terraform_simple_setup, Skip Specific Tests
run: |
checkov -d terraform_simple_setup --skip-check CKV_GCP_26,CKV_GCP_32,CKV_GCP_38
checkov -d terraform_simple_setup --skip-check CKV_GCP_26,CKV_GCP_32,CKV_GCP_38,CKV2_GCP_3
4 changes: 2 additions & 2 deletions README.md
@@ -299,7 +299,7 @@ extraVolumes: # this will create the volume from the directory
# leave this terminal open to sustain airflow webserver
# Set of environment variables
export ENV="dev"
export PROJECT_ID="airflow-demo-build"
export PROJECT_ID="dbt-demos-sung"
export DOCKER_DBT_IMG="gcr.io/$PROJECT_ID/dbt_docker:$ENV-latest"

source deploy_local_desktop_airflow.sh
@@ -330,7 +330,7 @@ kubectl port-forward --namespace airflow $POD_NAME 8080:8080

```bash
# start a remote shell in the airflow worker for ad hoc operations or to run pytests
kubectl exec -it airflow-worker-0 -- /bin/bash
kubectl exec -it airflow-local-desktop-worker-0 -- /bin/bash
```

- Airflow worker remote shell examples
2 changes: 1 addition & 1 deletion custom-setup.yaml
@@ -6,7 +6,7 @@ airflow:
##
image:
repository: apache/airflow
tag: 1.10.10-python3.6
tag: 2.1.0-python3.8
## values: Always or IfNotPresent
pullPolicy: IfNotPresent
pullSecret: ""
15 changes: 9 additions & 6 deletions dags/airflow_utils.py
@@ -5,17 +5,18 @@

# TODO(developer): update for your specific settings
# GIT_REPO = "git@github.com:sungchun12/airflow-toolkit.git" #placeholder ssh git repo
GIT_REPO = "github_sungchun12_airflow-toolkit"
PROJECT_ID = "airflow-demo-build"
GIT_REPO = "https://github.com/sungchun12/airflow-toolkit.git"
PROJECT_ID = "dbt-demos-sung"
DBT_IMAGE = f"gcr.io/{PROJECT_ID}/dbt_docker:dev-latest"

env = os.environ.copy()
DEPLOYMENT_SETUP = env["DEPLOYMENT_SETUP"]
GIT_BRANCH = "master" # TODO: make this an env var: env["GIT_BRANCH"]
GIT_BRANCH = "master" # TOsDO: make this an env var: env["GIT_BRANCH"]


def get_secret(project_name, secret_name):
"""
Returns the value of a secret in Secret Manager for use in DAGs
Returns the value of a secret in Secret Manager for use in DAGs
"""
secrets = secretmanager.SecretManagerServiceClient()
secret_value = (
@@ -85,11 +86,13 @@ def set_google_app_credentials(deployment_setup, dag_name):
git_clone_cmds = f"""
/entrypoint.sh &&
gcloud auth activate-service-account --key-file=account.json &&
gcloud source repos clone {GIT_REPO} --project={PROJECT_ID}"""
git clone {GIT_REPO} """

dbt_setup_cmds = f"""
{git_clone_cmds} &&
cd {GIT_REPO}/dbt_bigquery_example &&
pwd &&
ls &&
cd airflow-toolkit/dbt_bigquery_example &&
git checkout {GIT_BRANCH} &&
export PROJECT_ID={PROJECT_ID} &&
export DBT_PROFILES_DIR=$(pwd) &&
130 changes: 130 additions & 0 deletions dags/dbt_cloud_utils.py
@@ -0,0 +1,130 @@
import time

import requests
from dataclasses import dataclass
from airflow.models import Variable

# TODO: MANUALLY create a dbt Cloud job: https://docs.getdbt.com/docs/dbt-cloud/cloud-quickstart#create-a-new-job
# Example dbt Cloud job URL
# https://cloud.getdbt.com/#/accounts/4238/projects/12220/jobs/12389/
@dataclass(frozen=True) # make attributes immutable
class dbt_cloud_job_vars:
"""Basic dbt Cloud job configurations. Set attributes based on the ids within dbt Cloud URL"""

# add type hints
account_id: int
project_id: int
job_id: int
cause: str
dbt_cloud_api_key: str = Variable.get(
"dbt_cloud_api_key"
) # TODO: manually set this in the airflow variables UI


@dataclass(frozen=True)
class dbt_job_run_status:
"""Different dbt Cloud API status responses in integer format"""

QUEUED: int = 1
STARTING: int = 2
RUNNING: int = 3
SUCCESS: int = 10
ERROR: int = 20
CANCELLED: int = 30


class dbt_cloud_job_runner(dbt_cloud_job_vars, dbt_job_run_status):
"""Utility to run dbt Cloud jobs.

Inherits dbt_cloud_job_vars(dataclass) and dbt_job_run_status(dataclass)

Parameters
----------
account_id(int): dbt Cloud account id
project_id(int): dbt Cloud project id
job_id(int): dbt Cloud job id
cause(str): dbt Cloud cause(ex: name of DAG)
dbt_cloud_api_key(str)(OPTIONAL): dbt Cloud api key to authorize programmatically interacting with dbt Cloud
"""

def _trigger_job(self) -> int:
"""Trigger the dbt Cloud job asynchronously.

Verifies dbt_cloud_job_vars match response payload from dbt Cloud api.

Returns
----------
job_run_id(int): specific dbt Cloud job run id invoked
"""
url = f"https://cloud.getdbt.com/api/v2/accounts/{self.account_id}/jobs/{self.job_id}/run/"
headers = {"Authorization": f"Token {self.dbt_cloud_api_key}"}
res = requests.post(
url=url,
headers=headers,
data={
"cause": f"{self.cause}", # name of the python file invoking this
},
)

try:
res.raise_for_status()
except:
print(f"API token (last four): ...{self.dbt_cloud_api_key[-4:]}")
raise

response_payload = res.json()
# Verify the dbt Cloud job matches the arguments passed
assert self.account_id == response_payload["data"]["account_id"]
assert self.project_id == response_payload["data"]["project_id"]
assert self.job_id == response_payload["data"]["job_definition_id"]

job_run_id = response_payload["data"]["id"]
return job_run_id

# to be used in a while loop to check on job status
def _get_job_run_status(self, job_run_id) -> int:
"""Trigger the dbt Cloud job asynchronously.

Verifies dbt_cloud_job_vars match response payload from dbt Cloud api.

Parameters
----------
job_run_id(int): specific job run id invoked

Returns
----------
job_run_status(int): status of the dbt Cloud job run
"""
url = f"https://cloud.getdbt.com/api/v2/accounts/{self.account_id}/runs/{job_run_id}/"
headers = {"Authorization": f"Token {self.dbt_cloud_api_key}"}
res = requests.get(url=url, headers=headers)

res.raise_for_status()
response_payload = res.json()
job_run_status = response_payload["data"]["status"]
return job_run_status

# main function operator to trigger the job and a while loop to wait for success or error
def run_job(self) -> None:
"""Main handler method to run the dbt Cloud job and track the job run status"""
job_run_id = self._trigger_job()
print(f"job_run_id = {job_run_id}")

visit_url = f"https://cloud.getdbt.com/#/accounts/{self.account_id}/projects/{self.project_id}/runs/{job_run_id}/"
print(f"Check the dbt Cloud job status! Visit URL:{visit_url}")

while True:
time.sleep(1) # make an api call every 1 second

job_run_status = self._get_job_run_status(job_run_id)

print(f"job_run_status = {job_run_status}")

if job_run_status == dbt_job_run_status.SUCCESS:
print(f"Success! Visit URL: {visit_url}")
break
elif (
job_run_status == dbt_job_run_status.ERROR
or job_run_status == dbt_job_run_status.CANCELLED
):
raise Exception(f"Failure! Visit URL: {visit_url}")
2 changes: 1 addition & 1 deletion dags/examples/add_gcp_connections.py
@@ -38,7 +38,7 @@

# TODO(developer): update for your specific naming conventions
CONN_PARAMS_DICT = {
"gcp_project": "airflow-demo-build",
"gcp_project": "dbt-demos-sung",
"gcp_conn_id": "my_gcp_connection",
"gcr_conn_id": "gcr_docker_connection",
"secret_name": "airflow-conn-secret",
2 changes: 1 addition & 1 deletion dags/examples/bigquery_connection_check.py
@@ -16,7 +16,7 @@
# TODO(developer): update for your specific settings
TASK_PARAMS_DICT = {
"dataset_id": "dbt_bq_example",
"project_id": "airflow-demo-build",
"project_id": "dbt-demos-sung",
"gcp_conn_id": "my_gcp_connection",
}

45 changes: 45 additions & 0 deletions dags/examples/dbt_cloud_example.py
@@ -0,0 +1,45 @@
from airflow import DAG
from datetime import datetime
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator
from dbt_cloud_utils import dbt_cloud_job_runner


dag_file_name = __file__

# TODO: MANUALLY create a dbt Cloud job: https://docs.getdbt.com/docs/dbt-cloud/cloud-quickstart#create-a-new-job
# Example dbt Cloud job URL
# https://cloud.getdbt.com/#/accounts/4238/projects/12220/jobs/12389/
# example dbt Cloud job config
dbt_cloud_job_runner_config = dbt_cloud_job_runner(
account_id=4238, project_id=12220, job_id=12389, cause=dag_file_name
)


default_args = {
"owner": "airflow",
"email": ["airflow@example.com"],
"depends_on_past": False,
"start_date": datetime(2001, 1, 1),
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"priority_weight": 1000,
}


with DAG(
"dbt_cloud_example", default_args=default_args, schedule_interval="@once"
) as dag:
# have a separate extract and load process(think: FivetranOperator and/or custom gcs load to bigquery tasks)
extract = DummyOperator(task_id="extract")
load = DummyOperator(task_id="load")

# Single task to execute dbt Cloud job and track status over time
transform = PythonOperator(
task_id="run-dbt-cloud-job",
python_callable=dbt_cloud_job_runner_config.run_job,
provide_context=True,
)

extract >> load >> transform
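
Because each runner is just a frozen dataclass with a blocking `run_job` method, additional dbt Cloud jobs can be appended to this DAG as extra `PythonOperator` tasks. A hypothetical sketch — the second `job_id` below is a placeholder, not a job defined in this PR:

```python
# Hypothetical second dbt Cloud job chained after the transform task; job_id 99999 is a placeholder.
docs_job_runner = dbt_cloud_job_runner(
    account_id=4238, project_id=12220, job_id=99999, cause=dag_file_name
)

generate_docs = PythonOperator(
    task_id="run-dbt-cloud-docs-job",
    python_callable=docs_job_runner.run_job,
    dag=dag,
)

transform >> generate_docs
```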
36 changes: 32 additions & 4 deletions dags/examples/kubernetes_sample.py
@@ -1,5 +1,6 @@
from airflow import DAG
from datetime import datetime, timedelta
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

@@ -17,7 +18,11 @@
# "retry_delay": timedelta(minutes=5),
}

dag = DAG("kubernetes_sample", default_args=default_args, schedule_interval="@once",)
dag = DAG(
"kubernetes_sample",
default_args=default_args,
schedule_interval="@once",
)


start = DummyOperator(task_id="run_this_first", dag=dag)
@@ -40,11 +45,12 @@
namespace="default",
image="ubuntu:16.04",
cmds=["/bin/bash", "-cx"],
arguments=["echo hello world"],
arguments=["mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
labels={"foo": "bar"},
name="fail",
name="passing-bash-task",
task_id="passing-bash-task",
get_logs=True,
do_xcom_push=True,
dag=dag,
)

@@ -62,4 +68,26 @@
dag=dag,
)

bash_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('passing-bash-task')[0] }}\"",
task_id="bash_task_xcom_result",
dag=dag,
)

# xcom pull follows a different syntax compared to the BashOperator
pod_task_xcom_result = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["/bin/bash", "-c"],
arguments=["echo \"{{ task_instance.xcom_pull(task_ids='passing-bash-task', key='return_value')[0] }}\""],
labels={"foo": "bar"},
name="pod_task_xcom_result",
task_id="pod_task_xcom_result",
get_logs=True,
dag=dag,
)


start >> [passing_python, passing_bash, private_gcr_passing]

passing_bash >> [bash_task_xcom_result, pod_task_xcom_result]
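
For context, the XCom round trip the two new result tasks rely on: with `do_xcom_push=True`, the pod's sidecar parses whatever the container writes to `/airflow/xcom/return.json` and pushes it as the `return_value` XCom, which the downstream templates then index. A sketch of the equivalent pull from a `PythonOperator`, assuming the `dag` object defined above — this task is illustrative only, not part of the PR:

```python
# Illustrative only: pull the same XCom ([1,2,3,4]) that the Jinja templates above consume.
from airflow.operators.python_operator import PythonOperator


def print_first_element(**context):
    # On Airflow 2.x the task context arrives via **kwargs; 1.10.x would also need provide_context=True.
    value = context["ti"].xcom_pull(task_ids="passing-bash-task", key="return_value")
    print(value[0])  # -> 1, the first element of the list the pod wrote to /airflow/xcom/return.json


python_task_xcom_result = PythonOperator(
    task_id="python_task_xcom_result",
    python_callable=print_first_element,
    dag=dag,
)

passing_bash >> python_task_xcom_result
```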
7 changes: 5 additions & 2 deletions dags/kube_secrets.py
@@ -1,4 +1,4 @@
from airflow.contrib.kubernetes.secret import Secret
from airflow.kubernetes.secret import Secret


# GCP service account for dbt operations with BigQuery
@@ -17,5 +17,8 @@

# This is included as a placeholder based on the note in `airflow_utils.py`
GIT_SECRET_ID_RSA_PRIVATE = Secret(
deploy_type="volume", deploy_target="/dbt/.ssh/", secret="ssh-key-secret", key="id_rsa",
deploy_type="volume",
deploy_target="/dbt/.ssh/",
secret="ssh-key-secret",
key="id_rsa",
)
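
For context on how these Secret objects get consumed: they are passed to a `KubernetesPodOperator` via its `secrets` parameter, which mounts them into the pod. A minimal sketch, assuming the dbt image built from `airflow_utils.py`; the `dbt debug` task shown is illustrative, not part of this PR:

```python
# Illustrative only: mount the ssh key secret defined above into a dbt pod task.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from kube_secrets import GIT_SECRET_ID_RSA_PRIVATE

dbt_debug = KubernetesPodOperator(
    namespace="default",
    image="gcr.io/dbt-demos-sung/dbt_docker:dev-latest",  # DBT_IMAGE from airflow_utils.py
    cmds=["/bin/bash", "-cx"],
    arguments=["dbt debug"],
    secrets=[GIT_SECRET_ID_RSA_PRIVATE],  # mounts ssh-key-secret as a volume at /dbt/.ssh/
    name="dbt-debug",
    task_id="dbt-debug",
    get_logs=True,
)
```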
10 changes: 5 additions & 5 deletions deploy_local_desktop_airflow.sh
@@ -68,11 +68,11 @@ echo "***********************"
# install airflow helm chart
# https://helm.sh/docs/helm/helm_install/
helm install \
airflow-local-desktop \
airflow-stable/airflow \
--version 7.16.0 \
--namespace "airflow" \
--values ./custom-setup.yaml
airflow-local-desktop \
airflow-stable/airflow \
--version 8.3.1 \
--namespace "airflow" \
--values ./custom-setup.yaml

echo "***********************"
echo "Wait for the Kubernetes Cluster to settle"
2 changes: 1 addition & 1 deletion utils/checkov/requirements.txt
@@ -1 +1 @@
checkov==1.0.501
checkov==2.0.259