This README explains how to use Airflow at Delivery Hero
To start using Airflow at Delivery Hero, you need to obtain 'Edit' access to:
- The datahub-airflow repo
- A pre-requisite to this step is installing Git, creating a GitHub account, and merging your account with your Delivery Hero account through SailPoint
- To get access to the datahub-airflow repo, you need to open a ticket on the datahub Jira domain similar to this one --> DHUB-1301
- The Global Pricing Team's user group (pricing@deliveryhero.com) already has "Admin" access to the datahub-airflow repo
- The Airflow instance that belongs to your business unit (e.g., marketing, logistics, etc.)
- To get access to your business unit's Airflow instance, you need to submit a request to the designated person or in the respective data engineering channel.
- Logistics --> #log-data-engineering. Use the "Shortcut" function to submit a support request similar to the one shown below.
- Marketing --> #mkt-tech-bi-analytics
- POC: @Ersin Ihsan Ünkar (Data Engineering Manager) or @Sarmad Ahmed (PM)
You need to clone the datahub-airflow repo to be able to commit to it. Follow the instructions below to interact with the repo correctly.
Before cloning the repo on a Mac machine, you need to add an SSH key. Follow the instructions in this video from 5:20 to 8:55. After that, follow step 1 in the "Windows" section.
- Clone the repo by entering this command in your terminal
git clone https://github.com/deliveryhero/datahub-airflow.git
- If you are cloning the repo to a location on G-drive, you will have to delete the .ini files for the cloning to work. The best way to do that is through a .bat file that you run in your terminal or by double clicking it
- Create a .bat file anywhere on your G-drive and place the following command in it
del /s /q /f /a "{path_to_folder_containing_ini_files}\desktop.ini"
- Navigate into the cloned repo by entering this command in your terminal
cd [REPOSITORY]
- Enter the following three commands in your terminal
git config core.protectNTFS false
--> Allows format overrides
git config core.sparsecheckout true
--> Accept paths with empty trailing or preceding spaces. This is done automatically on Mac
git config --system core.longpaths true
--> Accept paths longer than 256 characters
- If you still face errors with Git commands due to long paths, you might need to take two additional steps:
- Step 1:
- Search for Registry editor in the Windows search box
- Go to this path
Computer\HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\FileSystem
- Set the LongPathsEnabled value to 1
- Step 2:
- Search for gpedit.msc, then press the Enter key. This launches the Local Group Policy Editor.
- Navigate to
Local Computer Policy > Computer Configuration > Administrative Templates > System > Filesystem
- Double click Enable NTFS long paths and set it to Enabled
The best practice is to create a folder containing all SQL and Python scripts under your respective business unit. For logistics, use the log folder. For marketing, use the mkt folder. To see a good example of a folder structure, you can check out the "Loved Brands" project
The Datahub engineering team created many useful features that can help you spin up your DAG without the need for creating custom functions and operators. You can check a couple of DAG examples below:
You can also check the SQL files under the "Loved Brands" project to see how to parametrize your project IDs and datasets so that the correct one is picked based on the requirements of the task.
Keep in mind that you will need to install the Airflow Python package to utilize the commonly used Airflow operators such as the BigQueryExecuteQueryOperator and PythonOperator. To do that, please run the following command in your terminal after creating and activating a Python virtual environment
pip3 install "apache-airflow[celery]==2.3.4" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.4/constraints-3.7.txt"
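If you want a quick sanity check that the installation worked, a minimal sketch (not part of the repo) is shown below. Note that BigQueryExecuteQueryOperator comes from the Google provider package (apache-airflow-providers-google), which may need to be installed separately because the [celery] extra alone does not pull it in.
# Sanity-check sketch: confirms the two operators mentioned above can be imported
# from the active virtual environment.
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

print("Operators available:", PythonOperator.__name__, BigQueryExecuteQueryOperator.__name__)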
The Global Pricing team currently shares the staging environment (Stg-ds-pricing-demand) with the Data Science team. Please don't use any other environment. Parameterizing your code to run on Staging and Production is done via YAML config files.
There is one base YAML file in the datahub-airflow repo called config.yaml. It is found here. This file contains the standard configurations that are used by default. config_production.yaml overrides these default configurations when the DAG runs on production. That way, you will not need to create two Python scripts for the DAG. You only need to create one file and import the configurations from "config.yaml" using the get method as shown below.
from configuration import config
airflow_gcp_conn_id = config.get("global-pricing").get("bigquery_conn_id")
For the Global Pricing team's use cases, these are the parameters used in config.yaml. They are configured based on the staging environment (Stg-ds-pricing-demand) and the staging project we use to store the output tables, log-data-science-staging
global-pricing:
project_id_read: "fulfillment-dwh-staging" # The GCP project used in queries for reading/pulling data
project_id_destination: "log-data-science-staging" # The GCP project used in queries to store output tables
bigquery_conn_id: "bigquery_data_science_flat" # The Airflow connection used to connect to BQ
dataset_read: "cl" # The dataset used in queries for reading/pulling data
dataset_staging: "staging" # The dataset to store the tables of intermediate steps
dataset_destination: "global_pricing" # The dataset to store the table produced by the last task
The configurations above get overridden by the ones in config_production.yaml
global-pricing:
project_id_read: "fulfillment-dwh-production" # When the DAG runs on production, project_id_read becomes fulfillment-dwh-production
project_id_destination: "dh-logistics-product-ops" # When the DAG runs on production, project_id_destination becomes dh-logistics-product-ops
bigquery_conn_id: "bigquery_logistics_flat" # When the DAG runs on production, bigquery_conn_id becomes bigquery_logistics_flat
dataset_destination: "pricing" # When the DAG runs on production, dataset_destination becomes pricing
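As an illustration only (this is not the actual code of the datahub configuration module), the override behaves like a per-key dictionary update: keys present in config_production.yaml replace the defaults from config.yaml, while keys that are not overridden keep their staging values.
# Illustrative sketch of the override logic using the values listed above;
# the real configuration module may implement this differently.
base = {
    "project_id_read": "fulfillment-dwh-staging",
    "project_id_destination": "log-data-science-staging",
    "bigquery_conn_id": "bigquery_data_science_flat",
    "dataset_read": "cl",
    "dataset_staging": "staging",
    "dataset_destination": "global_pricing",
}
production_overrides = {
    "project_id_read": "fulfillment-dwh-production",
    "project_id_destination": "dh-logistics-product-ops",
    "bigquery_conn_id": "bigquery_logistics_flat",
    "dataset_destination": "pricing",
}
effective = {**base, **production_overrides}  # what the DAG sees on production
# effective["dataset_read"] is still "cl" because it was not overridden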
If you have a configuration that does not change when switching environments, you do not need to specify it in the config_production.yaml file. To import these configurations into your DAG, use the get method after instantiating your DAG as follows:
from configuration import config
with DAG(
dag_id="loved-brands",
schedule_interval="0 4 3 * *", # At 04:00 on day-of-month 3
template_searchpath=template_search_path, # See the full DAG (linked below) to see how we define this variable
default_args=default_args, # See the full DAG (linked below) to see how we define this dictionary
catchup=False,
tags=["pricing", "loved_brands"],
) as dag:
# Task parameters
airflow_gcp_conn_id = config.get("global-pricing").get("bigquery_conn_id")
project_id_read = config.get("global-pricing").get("project_id_read")
# ... and so on
You can check the Loved Brands DAG to see how the rest of the configurations are imported. The process is identical to what is shown above.
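For illustration, a task that consumes those values might look like the sketch below. This is not the actual Loved Brands task: the SQL file name, output table name, and write_disposition are placeholder assumptions, and the snippet is meant to sit inside the with DAG(...) as dag: block shown above.
# Hypothetical task definition (a sketch, not taken from the Loved Brands DAG).
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

project_id_destination = config.get("global-pricing").get("project_id_destination")
dataset_destination = config.get("global-pricing").get("dataset_destination")

create_output_table = BigQueryExecuteQueryOperator(
    task_id="create_output_table",
    sql="example_query.sql",  # placeholder file resolved via template_searchpath
    destination_dataset_table=f"{project_id_destination}.{dataset_destination}.example_table",
    write_disposition="WRITE_TRUNCATE",  # placeholder; pick what your task needs
    use_legacy_sql=False,
    gcp_conn_id=airflow_gcp_conn_id,  # the connection imported from config.yaml above
)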
After you place all your SQL and Python files in the right place and create your DAG, you will need to run some commands in the terminal so that the scripts pass the automatic checks that are run after you push your code to GitHub.
- In your terminal, run the following command after activating the virtual environment of your project
pip install sqlfluff black flake8 isort
- For SQL files, you will need to run this command in your terminal --> sqlfluff fix /local_path/to/sql_queries. Replacing fix with lint will display the fixable formatting errors without fixing them
- For Python files, run the following commands in the specified order:
- black /local_path/to/py_files
- flake8 /local_path/to/py_files
- isort /local_path/to/py_files
You cannot commit to the main branch right away. You need to open a new branch and create a pull request. This YouTube video explains how to create a pull request in under 3 minutes.
As a friendly reminder, the commands that you will need are as follows:
git checkout -b [BRANCH_NAME]
--> This creates and switches the HEAD to a new branch
git add .
git commit -m "YOUR MESSAGE AFTER CHANGES HERE"
git push origin [BRANCH_NAME]
- Open GitHub, click on Compare & Pull Request, and follow the instructions to create your pull request
If you have been working on your branch for a while and some changes occurred on the main branch that create conflicts with your code, you will need to pull the changes and merge the base branch into the head branch
- Step 1: Clone the repository or update your local repository with the latest changes -->
git pull origin main
- Step 2: Switch to the head branch of the pull request -->
git checkout [BRANCH_NAME]
- Step 3: Merge the base branch into the head branch -->
git merge main
- Step 4: Fix the conflicts manually (if any exist) and commit the result -->
git commit -m "..."
- Step 5: Push the changes -->
git push -u origin [BRANCH_NAME]
After the code is pushed to GitHub, some checks will run in the background and a Docker image of your repo will be created. If there are any formatting or syntax errors, you will need to fix them and re-commit. If all the checks are successful, you are ready to deploy the GitHub branch to the staging environment. If anything fails, check the failures before deploying to staging.
- Depending on your business unit, you will need to open the right Airflow environment
- Click on
"Add-ons" > "Deploy airflow branch"
- Select the right environment --> Currently, the one the Global Pricing team uses is Stg-ds-pricing-demand, although this might change in the future as this is a shared environment with the Data Science team
- Insert your GitHub branch name. You can copy it from the Pull Request
- Before clicking on Submit, go to the #log-chapter-data slack channel, check that no one else is using the environment, and announce that you will deploy a GitHub branch via the "Integration" functionality
- Return to the Airflow instance and click on Submit
- A link will appear directly after you click Submit. After 2-3 minutes, your Airflow environment will be ready to trigger
- Before triggering the DAG, you need to promote yourself as an Environment Admin
- After you're done with testing, let the other channel members know that the environment is free by adding an emoji to your first announcement message. (This is relevant to logistics environments only.)
- If you are using marketing environments, make sure to delete the environment after you're done
- If your DAG fails, check the Airflow logs and correct the error. You will need to re-commit your changes to GitHub, wait until all the checks have passed, and re-deploy the branch to the staging environment
- Logistics --> Submit support request in the #log-data-engineering channel to review and approve your PR. Feel free to replicate this request
- Marketing --> Create a Jira ticket similar to this one
To integrate Slack alerts in your DAG so that you can be notified whenever a DAG fails or succeeds, you will need to do the following:
- Import the alerts module from datahub.common --> from datahub.common import alerts
- This gives you access to two callback functions that you can add to your Airflow tasks via the on_success_callback and on_failure_callback parameters:
alerts.setup_on_success_callback()
alerts.setup_on_failure_callback()
- An example of how to use the functions in an Airflow operator is shown below
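The sketch below shows one way the callbacks could be attached to a task; the PythonOperator task and its python_callable are hypothetical placeholders (defined inside a with DAG(...) block) and are not taken from an existing DAG.
from airflow.operators.python import PythonOperator
from datahub.common import alerts


def _placeholder() -> None:
    # Hypothetical callable used only to make the sketch complete
    print("task ran")


example_task = PythonOperator(
    task_id="example_task",
    python_callable=_placeholder,
    on_success_callback=alerts.setup_on_success_callback(),
    on_failure_callback=alerts.setup_on_failure_callback(),
)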
- In the default_args of the DAG, define a team who will own the DAG (see the sketch after this list)
- In datahub-airflow\dags\log\configuration\yaml\config.yaml, add the name of the team and their Slack IDs
- Create a new channel in Slack to send the alerts to and enter its name in datahub-airflow\dags\log\configuration\yaml\config_production.yaml. The channel used by the Global Pricing team is called #log-alerts-global-pricing
- Add the GDF Monitoring bot to the channel
- This channel will be used to send Slack alerts for DAGs running on the production environment. For DAGs running on staging, alerts will be automatically sent to #log-airflow-st, albeit without mentions
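As a hedged illustration of the team-ownership step above: the exact default_args key that the datahub alerts module reads is not documented here, so the sketch below assumes the standard Airflow owner argument. Check an existing DAG such as Loved Brands for the real key and value before copying.
# Hypothetical sketch only; the key the alerts module actually reads may differ.
default_args = {
    "owner": "global-pricing",  # team name that should also exist in config.yaml with its Slack IDs
    "retries": 1,  # unrelated example of another common default_args entry
}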