# ETL Pipelines With Airflow


__[1. Introduction](#first-bullet)__

__[2. Calling An API In Python](#second-bullet)__

__[3. Setting Up A PostgreSQL Database](#third-bullet)__

__[4. Introduction To Airflow](#fourth-bullet)__

__[5. An Example ETL Pipeline With Airflow](#fifth-bullet)__

__[6. Debugging Airflow Code](#sixth-bullet)__

__[7. Conclusions](#sevn-bullet)__


You can see the source code for this project <a href="https://github.com/mdh266/AirflowDataPipeline">here</a>.

 
## Introduction <a class="anchor" id="first-bullet"></a>

In this blog post I want to go over some aspects of data engineering, specifically Extract, Transform, Load (ETL) operations, and show how they can be automated using <a href="https://airflow.incubator.apache.org/">Apache Airflow</a>. *Extracting* data can be done in a multitude of ways, but one of the most common is to query a <a href="https://en.wikipedia.org/wiki/Web_API">Web API</a>.  If the query is successful, we receive data back from the API, often in the form of <a href="https://en.wikipedia.org/wiki/JSON">JSON</a>.  This means that the data must be *transformed* before being stored, or *loaded*, into a database. Airflow is a platform to programmatically author, schedule and monitor workflows, and in this post I will show how to use it to get the daily weather in New York from the <a href="https://openweathermap.org/api">OpenWeatherMap</a> API, convert the temperature to Celsius and store the data in a simple <a href="https://www.postgresql.org/">PostgreSQL</a> database.


Let's first get started with how to query an API.


## Calling An API In Python <a class="anchor" id="second-bullet"></a>

To use a Web API to get data, you make a request to a remote web server and retrieve the data you need. In Python, this is commonly done using the <a href="http://docs.python-requests.org/en/master/">Requests</a> module. Below I wrote a simple module that uses a GET request to obtain the weather for Brooklyn, NY.

![api](./images/api_call.png)
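
For readers who cannot view the screenshot, here is a minimal sketch of what such a module could look like. It is not the exact code from the image: the function names, the `api_key` argument (the original reads the key from a config file), the query string `Brooklyn,US` and the date format of the file name are all assumptions made for illustration.

```python
import json
import os
from datetime import datetime


def save_weather(status_code, payload, directory="."):
    """Dump payload to <directory>/<today's date>.json if the call succeeded."""
    if status_code != 200:
        # Mirrors the simple "print an error" behaviour described in the post.
        print("Error in API call, status code:", status_code)
        return None
    file_name = datetime.now().strftime("%Y-%m-%d") + ".json"  # assumed format
    path = os.path.join(directory, file_name)
    with open(path, "w") as outfile:
        json.dump(payload, outfile)
    return path


def get_weather(api_key, directory="."):
    """Request the current weather for Brooklyn and save it to disk."""
    # Third-party package; imported here so save_weather has no extra dependency.
    import requests

    params = {"q": "Brooklyn,US", "appid": api_key}  # assumed query values
    result = requests.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params=params,
        timeout=10,
    )
    payload = result.json() if result.status_code == 200 else None
    return save_weather(result.status_code, payload, directory)
```

Splitting the saving step from the HTTP call keeps the file handling easy to try out without a network connection or an API key.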


Notice that I keep my API key in a separate file called <code>config.py</code>.  In order to use this code yourself you would have to obtain your own API key.  After the request has been made, I check to see if it was successful with, 

    result.status_code == 200
   
otherwise, I print an error.  Proper exception handling here is definitely something I will add in the future.  If the request is successful, the weather data is returned and then dumped, using the <a href="json">JSON package</a>, into a JSON file named after the current date.

The module in the screenshot above is stored in a file titled <code>getWeather.py</code> and can be run from the command line by typing, from the appropriate directory:

    python getWeather.py
    
Note that this is the exact Bash command that I'll use to have Airflow collect daily weather data.  A great intro to using APIs with Python can be found <a href="https://www.dataquest.io/blog/python-api-tutorial/">here</a>.  Next let's go over how to set up a PostgreSQL database.

## Setting Up A PostgreSQL Database <a class="anchor" id="third-bullet"></a>

I went over the basics of how to use PostgreSQL in a previous blog <a href="http://michael-harmon.com/blog/SQLWars.html">post</a>, so here I'll just present the code I used.  The code below creates a table called <code>weather_table</code> in a local PostgreSQL database named <code>WeatherDB</code>.

![api](./images/make_db.png)

I only take a subset of the data that is returned from <a href="https://openweathermap.org/">OpenWeatherMap</a>.  Specifically, I transform and load:

- the city name
- the country name
- the latitude and longitude of the city
- the date the API call was made
- the humidity
- the pressure
- the minimum temperature of the day
- the maximum temperature of the day
- the current temperature
- a description of the weather


This script is stored in a file named <code>makeTable.py</code> and can be run using the command,

    python makeTable.py

Run it from the appropriate directory, before setting up the Airflow job.  Lastly, note that no password is required to access the database; this was just for convenience.

Now we can dive into Airflow!

## Introduction To Airflow <a class="anchor" id="fourth-bullet"></a>


As mentioned in the introduction, Airflow is a platform to programmatically author, schedule and monitor workflows, and we'll be using it to set up a data pipeline. Data pipelines in Airflow are made up of DAGs (Directed Acyclic Graphs) that are scheduled to run at specific times. Each node in the DAG is a task that needs to be completed.  Tasks that depend on the completion of other tasks are run sequentially, while tasks that do not depend on one another can be run in parallel.
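
To make the sequential versus parallel distinction concrete, here is a small sketch that groups the tasks of a DAG into "waves", where every task in a wave could run in parallel once the previous wave has finished. It uses Python's standard `graphlib` module (Python 3.9+) and a made-up three-task pipeline; it illustrates the idea only and is not how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def execution_waves(upstream):
    """Group tasks into waves; tasks within a wave do not depend on each other."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstream tasks are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical pipeline: each task maps to the set of tasks it depends on.
upstream = {
    "extract_a": set(),
    "extract_b": set(),
    "load": {"extract_a", "extract_b"},
}
print(execution_waves(upstream))  # [['extract_a', 'extract_b'], ['load']]
```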


The main components of Airflow are 

- **Metadata DB (database)** : Keeps track of tasks, how long each run took, etc.

- **Webserver (Flask based UI)** : The webserver talks to the metadata DB to get the information it presents.

- **Scheduler** : This scans the file system for DAGs and puts tasks on the queue.

- **Workers** : These do the actual tasks. They can be separate from the scheduler or run alongside it. If they are separate, you can use Celery: http://www.celeryproject.org/


Airflow dumps all information about your DAG runs into logs, which can be written to a file or to a database.  For simplicity I used a local directory,

    ~/airflow/logs
    
The log directory is determined by the value of <code>base_log_folder</code> in the <code>airflow.cfg</code> file.  You can store the logs remotely instead by setting the <code>remote_base_log_folder</code> variable in the same file.



### Installing Airflow

To install Airflow, first set your Airflow home directory by typing into your terminal,

    export AIRFLOW_HOME=<path_to_airflow_home>
    
I chose to set <code>AIRFLOW_HOME=~/airflow</code>, which is the default setting.  Now we can install Airflow with PostgreSQL support using pip:
    
    pip install airflow[postgres]

### Metadata DB

We can then initialize the metadata database using,

    airflow initdb 
    
Out of the box, Airflow uses a SQLite database, which you should outgrow fairly quickly since no parallelization is possible with this backend.  By default the SQLite database lives in <code>AIRFLOW_HOME/airflow.db</code>. You can change the database using the <code>sql_alchemy_conn</code> variable in the <code>airflow.cfg</code> file.

By default Airflow also uses the SequentialExecutor, which only runs task instances sequentially.  This is set by the <code>executor</code> variable in the <code>airflow.cfg</code> file.
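
As a rough illustration of where these settings live, the sketch below parses a small sample in the style of `airflow.cfg` using Python's standard `configparser`. The `[core]` section name reflects my understanding of Airflow 1.x configuration files, and the values are made-up examples rather than the contents of a real installation.

```python
import configparser

# Sample fragment only; the paths and values here are illustrative assumptions.
SAMPLE_CFG = """
[core]
base_log_folder = ~/airflow/logs
sql_alchemy_conn = sqlite:////home/user/airflow/airflow.db
executor = SequentialExecutor
"""

parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CFG)
core = parser["core"]

print(core["executor"])          # SequentialExecutor
print(core["sql_alchemy_conn"])  # sqlite:////home/user/airflow/airflow.db
```

Switching to a different backend or executor means editing these same keys in your own `airflow.cfg`.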

### Webserver
We can start the webserver locally using the command,

    airflow webserver -p 8080

Then enter http://0.0.0.0:8080/ into your browser and you will get the Airflow UI. The webserver is extremely helpful for understanding which DAGs are running, how long they ran, etc., as well as for setting up connections to databases.

### Scheduler

The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) inspects active tasks to see whether they can be triggered.
The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To kick it off, all you need to do is execute <code>airflow scheduler</code>. It will use the configuration specified in <code>airflow.cfg</code>.

    airflow scheduler
    
Note that if you run a DAG on a <code>schedule_interval</code> of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.
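
The timing rule can be written down as simple date arithmetic. In this sketch the function name `earliest_trigger` is made up for illustration; it only restates the rule that a run is triggered once its period has ended.

```python
from datetime import datetime, timedelta


def earliest_trigger(run_stamp, schedule_interval):
    """A run covers [run_stamp, run_stamp + interval) and starts once that ends."""
    return run_stamp + schedule_interval


run_stamp = datetime(2016, 1, 1)
print(earliest_trigger(run_stamp, timedelta(days=1)))  # 2016-01-02 00:00:00
```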

### Workers
In this example I won't be using any separate workers since I'm running this on my personal computer.


## An Example ETL Pipeline With Airflow <a class="anchor" id="fifth-bullet"></a>

Let's go over an example of an Airflow DAG that calls OpenWeatherMap daily and stores the weather in the Postgres database we created.  The first thing we need to do is create a connection to the database (<code>postgres_conn_id</code>).  We do this by opening the Airflow webserver, clicking on the <code>Admin</code> tab and choosing <code>connections</code>, as shown below:

![api](./images/admin.png)


Then click on the <code>create</code> link and enter the information needed to create our Postgres connection:

![api](./images/airflow_ui_db.png)


We'll hit save and then use <code>weather_id</code> as our <code>postgres_conn_id</code> connection id.

Now, let's jump into creating our DAG. The first thing we do is import all the necessary libraries:

![libs](./images/libraries.png)

You can see that we import both the <a href="https://airflow.incubator.apache.org/code.html">BashOperator</a> and <a href="https://airflow.incubator.apache.org/code.html">PythonOperator</a>.  While DAGs describe how to run a workflow, Operators determine what actually gets done.  Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operators. The DAG will make sure that operators run in the correct order; other than those dependencies, operators generally run independently.  Once an operator is instantiated, it is referred to as a “task”.

Let's jump into the Python function that will transform and load our JSON data object.  This can be seen below as the function <code>load_data</code>:

![load](./images/load_data.png)

You can see that the first thing we do is instantiate a <a href="">PostgresHook</a> object and pass our Postgres connection id, <code>weather_id</code>, through the constructor.  We then get the current date so that we can load the JSON data from that day's API request.  Once we load the data, we can see that it is basically a dictionary of strings.  We then transform the properties of this dictionary and check that the numerical values are not NaNs using <a href="http://www.numpy.org/">NumPy</a>'s <a href="https://docs.scipy.org/doc/numpy/reference/generated/numpy.isnan.html">isnan</a> function.  If there are any NaNs in the numerical data we flag this data as invalid. 

We then cast this data into a tuple, which we pass as a parameter, along with the SQL insertion command <code>insert_cmd</code>, into the PostgresHook object's <code>run</code> method; this inserts the data into the database.  More checks on the validity and quality of our data would be better, but for the purposes of this blog post what we have done is sufficient. Also, note that as things stand we enter one row into our database at a time; ideally we would load multiple rows at once to be more efficient.
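
The transform step described above can be sketched in plain Python as follows. This is not the code from the screenshot: the column names in `INSERT_CMD` are hypothetical, the dictionary keys follow the layout of OpenWeatherMap's current-weather response as I understand it (temperatures in Kelvin by default), and the standard library's `math.isnan` stands in for NumPy's `isnan` so the sketch has no third-party dependencies. The `hook` argument is assumed to be any object with a `run(sql, parameters=...)` method, such as a PostgresHook.

```python
import math

# Hypothetical column names; adjust to match your own weather_table.
INSERT_CMD = """INSERT INTO weather_table
                (city, country, latitude, longitude, todays_date, humidity,
                 pressure, min_temp, max_temp, temp, description)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""


def kelvin_to_celsius(kelvin):
    return float(kelvin) - 273.15


def transform(data, todays_date):
    """Flatten the API response into a row tuple, or return None if invalid."""
    main = data["main"]
    numbers = [
        float(data["coord"]["lat"]),
        float(data["coord"]["lon"]),
        float(main["humidity"]),
        float(main["pressure"]),
        kelvin_to_celsius(main["temp_min"]),
        kelvin_to_celsius(main["temp_max"]),
        kelvin_to_celsius(main["temp"]),
    ]
    # Flag the record as invalid if any numerical value is NaN.
    if any(math.isnan(x) for x in numbers):
        return None
    lat, lon, humidity, pressure, min_temp, max_temp, temp = numbers
    return (data["name"], data["sys"]["country"], lat, lon, todays_date,
            humidity, pressure, min_temp, max_temp, temp,
            data["weather"][0]["description"])


def load_row(hook, row):
    """Insert one row using the hook's run method."""
    hook.run(INSERT_CMD, parameters=row)
```

Keeping `transform` free of any database code makes it easy to check the Kelvin to Celsius conversion and the NaN handling on a sample dictionary before wiring it into a DAG.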

Now let's dive into the DAG definition below,

![DAGS](./images/DAGS.png)

We first define our default parameters, such as the owner of the DAG and how many times and how often to retry running the DAG if it fails, as a dictionary named <code>default_args</code>.  Next we instantiate our DAG with the command,

    dag = DAG(dag_id="weatherDag",
              default_args=default_args,
              start_date=datetime(2017,8,24),
              schedule_interval=timedelta(minutes=1440))

The <code>dag_id</code> needs to be unique, and we will pass the DAG object to all the tasks that need to be completed. The DAG will have a <code>start_date</code>, which should be a future <a href="https://docs.python.org/2/library/datetime.html">datetime</a> object, as well as a <code>schedule_interval</code>, which is a <a href="https://docs.python.org/2/library/datetime.html">timedelta</a> object that dictates how often to run the DAG.  I set my DAG to run every 1440 minutes, or once a day.
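
For reference, a `default_args` dictionary of the kind passed to the DAG above might look like the sketch below. The keys `owner`, `retries` and `retry_delay` are standard Airflow operator arguments, but the values are hypothetical and not taken from the screenshot. The last line simply confirms that 1440 minutes is the same interval as one day.

```python
from datetime import timedelta

default_args = {
    "owner": "airflow",                   # hypothetical owner name
    "retries": 3,                         # hypothetical number of retries
    "retry_delay": timedelta(minutes=5),  # hypothetical wait between retries
}

# The schedule used in this post is exactly one day.
print(timedelta(minutes=1440) == timedelta(days=1))  # True
```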


Next we instantiate a BashOperator, which becomes a task that executes the API call as discussed before:

    task1 = BashOperator(task_id='get_weather',
                        bash_command='python ~/airflow/dags/src/getWeather.py',
                        dag=dag)


Notice that we pass the DAG object in through the Operator constructor.  Note that the <code>task_id</code> has to be unique within the DAG.  We next instantiate a PythonOperator, which will be <code>task2</code>, that loads the pulled JSON data into the database:

    task2 =  PythonOperator(task_id='transform_load',
                            provide_context=True,
                            python_callable=load_data,
                            dag=dag)

Notice that we pass the function, <code>load_data</code>, as the <code>python_callable</code> argument.

Finally we set up the pipeline by declaring that one task depends on another being completed.  To say that <code>task1</code> needs to be completed before <code>task2</code>, add the following at the bottom of your <code>.py</code> script or DAG definition:

    task2.set_upstream(task1)

or,

    task1 >> task2

Tasks that don't have dependencies between each other can be run concurrently.
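
The two spellings of the dependency shown above are equivalent because `>>` is ordinary Python operator overloading. The toy class below is a sketch of that idea, assuming the operator simply delegates to the explicit method; `ToyTask` is made up for illustration and is not Airflow's operator class.

```python
class ToyTask:
    """Minimal stand-in for a task that tracks its neighbours in the graph."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_upstream(self, other):
        # "other" must finish before this task starts.
        self.upstream.add(other.task_id)
        other.downstream.add(self.task_id)

    def set_downstream(self, other):
        other.set_upstream(self)

    def __rshift__(self, other):
        # task_a >> task_b means task_b runs after task_a.
        self.set_downstream(other)
        return other  # returning the right-hand task allows a >> b >> c


task1 = ToyTask("get_weather")
task2 = ToyTask("transform_load")
task1 >> task2
print(task2.upstream)  # {'get_weather'}
```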

## Debugging Airflow Code <a class="anchor" id="sixth-bullet"></a>

You can debug a DAG by first checking whether there is a Python syntax error by "compiling" it:

    python dag_def_.py

You can then test an individual task by using,

    airflow test <dag_id> <task_id> <todays_date>

You can also test the whole DAG by doing a backfill,

    airflow backfill <dag_id> -s <todays_date> -e <todays_date>


If you need to delete a DAG, first delete the DAG data from the metadata database:

    Use the UI -> Browse -> Dag Runs -> Then delete them all.

Then clear the DAG's task instance states:

    airflow clear <dag_id>



## Conclusions <a class="anchor" id="sevn-bullet"></a>