# Dags Lab

### Introduction

In this lesson, we'll practice creating our own DAGs in Airflow.  We'll set up a workflow that makes a request to the Texas drink receipts API, available [here](https://data.texas.gov/dataset/Mixed-Beverage-Gross-Receipts/naix-2893), and then selects information from the returned data.  We'll first set up tasks that fake the request to the drink receipts API, and once that is working, we'll make the real request.

## Setting up docker

Begin by creating a directory called `texas_drinks_workflow`.  Then curl down the docker-compose file to boot up airflow.

```bash
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.0.1/docker-compose.yaml'
```

Now, before booting up airflow, it's a good idea to open our docker-compose file and make the following change:

> Look for the line that says `AIRFLOW__CORE__LOAD_EXAMPLES`. Change the value 'true' to 'false'. 

From here we should create a folder called `dags` where we can place our dags.

### Creating a Dag

Let's begin by creating a DAG with the id of `get_tax_receipts`.  Then, let's check our work.  

Use docker to boot up an airflow webserver connected to the dags folder with our new dag.  We can check this in two ways:

* First, we can make sure that the dag is loaded into the correct folder by sh-ing into the docker container, and then looking in the `/usr/local/airflow/dags` folder.

> You need to do this from the folder of texas_drinks_workflow.  Then you can connect to the container by first listing the docker-compose services.

<img src="./list-services.png" width="90%">

And then sh-ing into the relevant container -- here the webserver.

<img src="./connect-api.png" width="70%">

From there, we should be able to view our dag in the dags folder.

<img src="./dags-folder.png" width="90%">

* Second, let's go to our airflow website and hopefully see our dag appear.

<img src="./get-tax-receipts.png" width="70%"> 

The next step is to add a `start_date` to the dag.  Set the start date as five days previous to the current time.  

> You can refresh the dagbag with the following:
>
> `python -c "from airflow.models import DagBag; d = DagBag();"`

The next time we boot up our webserver, we can check that airflow saw the changes by hovering over the three dots under the word `links`, and then clicking on code.

> <img src="./click-code.png" width="100%">

Then turn on the dag run with the off-on switch over to the left, and shortly thereafter, we should see the dag run.

<img src="./five-runs.png" width="90%">

Now if you click on the green DAG runs button, eventually, you can see five successes.  This is because, by default, the dag is scheduled to run once a day, and we set the start date to five days ago.  So airflow runs the dag five times to backfill the missed days.

<img src="./dag-run-history.png" width="60%">
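
To make the backfill arithmetic concrete, here is a minimal sketch in plain Python. It does not call Airflow, and it simplifies how the scheduler really works. The function name `completed_intervals` and the fixed example dates are made up for illustration. It lists the daily intervals that have fully elapsed between a start date and now, with one dag run per elapsed interval.

```python
from datetime import datetime, timedelta


def completed_intervals(start_date, now, interval=timedelta(days=1)):
    """Return the start of every interval that has fully elapsed by `now`.

    A simplified model: one dag run is created per elapsed interval.
    """
    runs = []
    current = start_date
    while current + interval <= now:
        runs.append(current)
        current += interval
    return runs


# Fixed example dates (hypothetical) so the result is easy to follow.
now = datetime(2021, 3, 10)
five_days_ago = now - timedelta(days=5)
one_day_ago = now - timedelta(days=1)

print(len(completed_intervals(five_days_ago, now)))  # 5 runs to backfill
print(len(completed_intervals(one_day_ago, now)))    # 1 run
```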

Let's set up the dag so that it only runs once -- we can do that by setting the `start_date` as one day in the past.  Make that change now.

### Adding Tasks

Ok, so far we have created a DAG, but we have not added any tasks to it.  Let's do that now.  

* First, add a task called `retrieve_receipts`.  For now, the task will not actually retrieve the receipts from the api.  Instead, we should just see the string `getting receipts` in the logs when we run the dag -- you can do this with a print statement. 

After making the change, first delete the existing dag via the airflow webserver -- by clicking on the red trash can.

<img src="./delete-dag.png" width="70%">

Then refresh the dag bag.  

`python -c "from airflow.models import DagBag; d = DagBag();"`

Then, if we click on the `get_tax_receipts` dag, we should see the task listed there, something like the following.

> <img src="./with_task.png" width="50%">

Then if we flip the DAG on, we should see both the dag, and the related task running.

> Take a look under the Recent Tasks tab to see whether it ran.

> <img src="./run-dag.png" width="80%"> 

From there, let's check the green circle under recent tasks, then click on the task id, and look at the logs.  

> <img src="./log-task.png" width="70%">

We should see something like the following:

> <img src="./get_receipts_log.png" width="80%">

> Notice that in the second to last line it says the returned value was `getting receipts`.

### Adding another task

Now let's add another task called `find_large_receipts` that returns `found large receipts` when run.  Then have the `retrieve_receipts` task and the `find_large_receipts` task be called in sequential order.
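
One possible shape for the DAG file at this point is sketched below, assuming Airflow 2.x, where `PythonOperator` lives in `airflow.operators.python`. The `build_dag` helper and the variable names are illustrative choices, not something the lab requires. Airflow is imported inside the helper only so that the two plain functions can be tried without Airflow installed.

```python
from datetime import datetime, timedelta


def retrieve_receipts():
    # Placeholder: prints and returns a string instead of calling the API.
    print("getting receipts")
    return "getting receipts"


def find_large_receipts():
    # Placeholder for the filtering step.
    return "found large receipts"


def build_dag():
    # Assumes Airflow 2.x import paths.
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Midnight yesterday, in line with the one-day-back start date from earlier.
    today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    dag = DAG(dag_id="get_tax_receipts", start_date=today - timedelta(days=1))

    retrieve = PythonOperator(
        task_id="retrieve_receipts", python_callable=retrieve_receipts, dag=dag
    )
    find_large = PythonOperator(
        task_id="find_large_receipts", python_callable=find_large_receipts, dag=dag
    )

    # `>>` sets the order: retrieve_receipts runs first, then find_large_receipts.
    retrieve >> find_large
    return dag
```

For Airflow to pick the DAG up, the file in the `dags` folder would also need a module-level assignment such as `dag = build_dag()`.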

After shutting down the docker container and restarting it, we should now see the following under the Dag's tree view.

> <img src="./dag-sequential.png" width="40%">

Notice that in this updated diagram, `get_receipts` is run, followed by `find_receipts`.

Next, let's switch on the dag, and check that both of these tasks are run.

> The recent tasks panel should show two, with a green circle.

> <img src="./two-run.png" width="100%">

If we click on the recent tasks circle, we should be able to take a look at the logs of each task to see that the correct values were returned.

> So the `find_receipts` task should produce logs like the following:

> <img src="./found_large_receipts.png" width="100%">

In the third to last line, we can see that `found large receipts` was returned.

### Working with Schedule Intervals

Now let's change how often our dag is run.  We can do this with the schedule interval.  Update the dag so that it runs hourly, by adding the following keyword argument.

```python
schedule_interval='@hourly'
```

We can see the various options for setting how often the dag will be run in the [presets documentation](https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#cron-presets).
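
As a rough illustration of what the presets mean, the mapping below pairs a few of them with the cron expression each stands for, as listed in the presets documentation. The dictionary and the `to_cron` helper are hypothetical names for this sketch and are not part of Airflow.

```python
# A few Airflow schedule presets and the cron expressions they stand for.
CRON_PRESETS = {
    "@hourly": "0 * * * *",   # at the start of every hour
    "@daily": "0 0 * * *",    # once a day at midnight
    "@weekly": "0 0 * * 0",   # midnight on Sunday
    "@monthly": "0 0 1 * *",  # midnight on the first day of the month
    "@yearly": "0 0 1 1 *",   # midnight on January 1
}


def to_cron(schedule_interval):
    """Translate a preset into cron syntax; other values pass through unchanged."""
    return CRON_PRESETS.get(schedule_interval, schedule_interval)


print(to_cron("@hourly"))
```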

### Make it real(er)

Ok, now let's have our `get_receipts` task connect to our Texas Drinks API.  The url to reach is the following:

`https://data.texas.gov/resource/naix-2893.json?taxpayer_zip=77036`

The next time we run the dag, have that task return the first dictionary from the API response.  If it's done properly, the log should look something like the following:

<img src="./returned_api_value.png" width="100%">

> So we can see the `Returned value was:` had the dictionary with the `taxpayer_number` and `name`.
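
A minimal sketch of what the task's function might do is shown below. It uses only the standard library (`urllib` and `json`) so it runs without extra packages; the `requests` library would work just as well. The function names `fetch_receipts` and `first_record` are assumptions made for this example, and the sample records are invented rather than real API output.

```python
import json
from urllib.request import urlopen

RECEIPTS_URL = "https://data.texas.gov/resource/naix-2893.json?taxpayer_zip=77036"


def fetch_receipts(url=RECEIPTS_URL):
    """Request the receipts and parse the JSON body into a list of dictionaries."""
    with urlopen(url, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))


def first_record(records):
    """Return the first dictionary, or None if the API sent back an empty list."""
    return records[0] if records else None


def get_receipts():
    # The value returned here is what appears after "Returned value was:" in the task log.
    return first_record(fetch_receipts())


# Offline check with invented records (not real API data).
sample = [{"taxpayer_number": "000"}, {"taxpayer_number": "111"}]
print(first_record(sample))
```

Passing `get_receipts` as the task's `python_callable` would then put the first dictionary in the log.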

### Bonus

If you would like to store the data, perhaps write the data to a csv file in the first task, and then read the data from that csv returning receipts greater than zero in the second task.  Look at using pandas to do so:

* For writing to csv we can do something like the following: 
```python
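import pandas as pd
import requests
# Request the receipts, then save them without the extra index column
response = requests.get('https://data.texas.gov/resource/naix-2893.json?taxpayer_zip=77036')
df = pd.DataFrame(response.json())
df.to_csv('records.csv', index=False)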
```

And for reading from csv and getting back a list of dictionaries to search through we can do something like the following: 

```python    
import pandas as pd
# Read the saved receipts back in as a list of dictionaries, one per row
df = pd.read_csv('./records.csv')
records = df.to_dict('records')
```
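
With `records` as a list of dictionaries, the second task could filter it with a list comprehension. This sketch assumes each record has a `total_receipts` field holding a number (or a numeric string); that field name is an assumption about the dataset, so check it against the actual API response. The sample rows are invented.

```python
def find_large_receipts(records, minimum=0):
    """Keep only the records whose total receipts exceed `minimum`."""
    return [
        record for record in records
        if float(record.get("total_receipts", 0)) > minimum
    ]


# Invented sample rows standing in for the output of df.to_dict('records').
records = [
    {"taxpayer_number": "000", "total_receipts": 0},
    {"taxpayer_number": "111", "total_receipts": 1250},
]
print(find_large_receipts(records))
```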


### Summary

In this lab, we practiced working with directed acyclic graphs.  We saw that we could initialize a DAG, and then add tasks to the DAG.  Then we moved into specifying a schedule interval with the dag, by adding the `schedule_interval` keyword argument and the preset of `'@hourly'`.