# Airflow DAGs  

### Introduction

In the last lesson, we were briefly introduced to directed acyclic graphs -- that is, DAGs in Airflow.  A DAG is how Airflow conceptualizes a workflow: a series of tasks with a defined order.  In this lesson, we'll see specifically what we mean by a DAG, and move through the steps of creating one in Airflow.

### From Graphs to DAGs

Before talking about directed acyclic graphs, let's discuss graphs more generally.  Take a look at the map below, which shows various airports and the flight routes between them.

<img src="./airports-graph.png" width="40%">

We see various airports listed: Los Angeles, Phoenix, Chicago, Atlanta.  The lines between them represent the connections from one to another.  This is an example of a graph.

> A graph is a structure amounting to a set of **nodes** in which some pairs of the nodes are in some sense "related". [Derived from Wikipedia Graphs](https://en.wikipedia.org/wiki/Graph_(discrete_mathematics))

Ok, so above, the airports are the nodes and the routes are our connections.  In graph speak, we'll call each connection between a pair of nodes an **edge**.


Now when talking about our graph above, we would say that the graph is **cyclic**.  This is because when we visit a node like Los Angeles and go to the connected node of Chicago, there is a path back to the Los Angeles node -- we can simply turn around and go back to Los Angeles.

> **Cycle** With graphs, a cycle exists if there is a path that starts at a node and leads back to that same node.
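
To make the idea of nodes and edges concrete, here is a minimal sketch of the airport graph in plain Python.  The specific routes are assumed for illustration (only the Los Angeles to Chicago connection is described in the text), and the helper name `add_route` is hypothetical.

```python
from collections import defaultdict

# Undirected graph: each airport (node) maps to the set of airports it connects to.
routes = defaultdict(set)

def add_route(graph, a, b):
    """Add an undirected edge: travel is possible in both directions."""
    graph[a].add(b)
    graph[b].add(a)

# Assumed routes, for illustration only.
add_route(routes, "Los Angeles", "Chicago")
add_route(routes, "Los Angeles", "Phoenix")
add_route(routes, "Chicago", "Atlanta")
add_route(routes, "Phoenix", "Atlanta")

# After flying from Los Angeles to Chicago, there is an edge leading back.
can_return = "Los Angeles" in routes["Chicago"]
print(can_return)  # True
```

Because every edge works in both directions, we can always return to a node we have already visited.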

Now let's see a different graph.

> <img src="./red-blue-paint.png" width="60%">

Note that unlike our airport graph, this time, once we visit a node, there is no going back.  This is indicated by the arrows: there is a path from getting red paint to mixing the paint, but we cannot go from the paint mixing step back to the red paint.

Because our edges have a specified direction, and because once we visit a node, we cannot revisit that same node -- we call this graph a **directed acyclic graph**.

> A directed acyclic graph is a graph where each of the edges has a direction and no path leads from a node back to itself.
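
One way to check the "acyclic" part is a depth-first search that looks for an edge leading back into the path currently being explored.  The sketch below is plain Python, not anything from Airflow; the function name `has_cycle` and the node names (including the blue paint step) are assumptions made for illustration.

```python
def has_cycle(graph):
    """Return True if the directed graph (node -> list of successors) has a cycle."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for nxt in graph.get(node, []):
            state = color.get(nxt, WHITE)
            if state == GRAY:  # edge back into the path we are still exploring
                return True
            if state == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color[node] == WHITE and visit(node) for node in graph)

# Directed edges: the paint is fetched first, then mixed.
paint = {
    "get red paint": ["mix paint"],
    "get blue paint": ["mix paint"],
    "mix paint": [],
}
print(has_cycle(paint))  # False

# Adding an edge from mixing back to fetching red paint creates a cycle.
paint_with_loop = dict(paint, **{"mix paint": ["get red paint"]})
print(has_cycle(paint_with_loop))  # True
```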

This is another example of a directed acyclic graph:

> <img src="./dag-etl.png" width="60%">

So once again, each of the edges has a direction, and there exists no cycle between our nodes.  Now that we understand what a DAG is conceptually, it perhaps makes sense that this is the structure Airflow chooses to describe a workflow.

With a workflow, our steps have an order to them, and we do not want our graph to cycle through nodes endlessly.  
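
The link between "no cycles" and "a valid order of steps" can be seen with Python's standard-library `graphlib` module (Python 3.9+).  This is a sketch of the general idea, not a description of how Airflow schedules tasks, and the task names `extract`, `transform` and `load` are assumed for illustration.

```python
from graphlib import TopologicalSorter, CycleError

# Each task maps to the set of tasks that must finish before it can start.
etl = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
order = list(TopologicalSorter(etl).static_order())
print(order)  # ['extract', 'transform', 'load']

# If load also had to finish before extract, the graph would contain a cycle
# and no valid execution order would exist.
cyclic = {"extract": {"load"}, "transform": {"extract"}, "load": {"transform"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    cycle_found = False
except CycleError:
    cycle_found = True
print(cycle_found)  # True
```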

Ok, now let's create a DAG in airflow.

### Creating a DAG

To create a DAG in Airflow, we need to create an instance of a DAG and then one or more tasks that we associate with the DAG.  Let's get to it.

First we'll create our dag.

```python
from airflow import DAG
from datetime import datetime, timedelta

greeting_dag = DAG(dag_id = 'hello_world', start_date=datetime.now() - timedelta(days = 1))
```

To create a dag, we must specify a `dag_id`.  The `dag_id` cannot have any spaces in it.  We also specify a `start_date`, when the dag should first be run.  

> We want our `start_date` to be in the past, because a dag will only run once its start date has passed.  We'll talk more about this later.

Ok, now the DAG is an entire workflow.  But our DAG does not do anything unless it has some tasks.  Below we'll create a task to execute some Python, and then we'll associate it with our DAG.

```python
from airflow.operators.python_operator import PythonOperator

def hello():
    return 'Hello world!'

hello_task = PythonOperator(task_id='hello_task', 
                            python_callable=hello,
                            dag=greeting_dag)
```

So focusing on the last statement, where we call our `PythonOperator`, notice that just like with our DAG, the first step is to provide an id, this time a `task_id`.  Then with `python_callable`, we specify what this task does, which is execute the `hello` function.  And finally, with the `dag` argument, we associate this task with our `greeting_dag` from earlier.

> So we can say that a task belongs to a dag, and a dag has many tasks. 

* Operators

One thing that may be confusing about the above code is that we seem to be working with operators to create a task.

```python
from airflow.operators.python_operator import PythonOperator
hello_task = PythonOperator(task_id='hello_task', 
                            python_callable=hello,
                            dag=greeting_dag)
```

What's an operator?

> Operators allow for generation of certain types of tasks that become nodes in the DAG when instantiated. [Airflow Documentation](https://airflow.apache.org/docs/apache-airflow/stable/python-api-ref.html)

So the Operator is the class, and an instance of an operator is a task.  These tasks are the nodes in our DAG.  And note that Airflow comes out of the box with different kinds of operators that allow for the creation of different kinds of tasks.

For example, here are just some of the operators available to us.

> <img src="./operators.png" width="60%">
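
To see the class-versus-instance relationship without running Airflow, here is a toy model in plain Python.  `ToyDag` and `ToyPythonOperator` are hypothetical classes written for this sketch; they only mimic the `task_id`, `python_callable` and `dag` arguments used above and are not Airflow's implementation.

```python
class ToyDag:
    """Toy stand-in for a DAG: an id plus the tasks that belong to it."""
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = {}

class ToyPythonOperator:
    """Toy stand-in for an operator: each instance is one task."""
    def __init__(self, task_id, python_callable, dag):
        self.task_id = task_id
        self.python_callable = python_callable
        self.dag = dag
        dag.tasks[task_id] = self  # the task registers itself with its dag

    def execute(self):
        # Running the task means calling the function it wraps.
        return self.python_callable()

def toy_hello():
    return 'Hello world!'

toy_dag = ToyDag(dag_id='hello_world')
toy_hello_task = ToyPythonOperator(task_id='hello_task',
                                   python_callable=toy_hello,
                                   dag=toy_dag)

print(list(toy_dag.tasks))       # ['hello_task']
print(toy_hello_task.execute())  # Hello world!
```

The operator class describes a kind of work, each instance is one task, and the dag keeps track of every task that points to it.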

### Upstream and Downstream Tasks

Let's finish up by discussing one last topic in airflow, and that's how to specify an order to our tasks.  So imagine that in addition to our hello task, we also have our `goodbye` task.

```python
def goodbye():
    return 'goodbye everyone'

goodbye_task = PythonOperator(task_id='goodbye_task',
                              python_callable=goodbye,
                              dag=greeting_dag)
```

We specify that `hello_task` should run before `goodbye_task` with the following syntax.

```python
hello_task >> goodbye_task
```

> The `>>` operator (Python's right bit shift, which Airflow overloads for tasks) indicates that the hello task must come before the goodbye task.
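
Python lets a class define what `>>` means through the `__rshift__` method, which is how an expression like `hello_task >> goodbye_task` can record an ordering.  The toy class below sketches that mechanism in plain Python; `ToyTask` and `cleanup_task` are hypothetical names, and this is not Airflow's source code.  In Airflow itself, the method form `hello_task.set_downstream(goodbye_task)` expresses the same ordering.

```python
class ToyTask:
    """Toy task that records which tasks run before and after it."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # ids of tasks that must run before this one
        self.downstream = set()  # ids of tasks that run after this one

    def set_downstream(self, other):
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def __rshift__(self, other):
        # a >> b means "a runs before b".
        # Returning other lets us chain: a >> b >> c.
        self.set_downstream(other)
        return other

first = ToyTask("hello_task")
second = ToyTask("goodbye_task")
third = ToyTask("cleanup_task")  # hypothetical third task

first >> second >> third

print(second.upstream)    # {'hello_task'}
print(second.downstream)  # {'cleanup_task'}
```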

And finally, let's bind mount our code into the environment and see our dag in action.

### Interacting with the DAG

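Now that we have written our dag, let's boot it up and see it in our Airflow web app.  The command below mounts the local `dags` folder into the container, so the file containing our dag should be saved in that folder.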

```bash
docker run -p 8080:8080 -v "$(pwd)"/dags:/usr/local/airflow/dags puckel/docker-airflow webserver
```

There's our `greeting_dag`.

> <img src="./greet_dag.png" width="60%">

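> Note that the name we see in the UI is the `dag_id` we specified when creating the dag, not the name of the Python variable.  The code above uses `dag_id = 'hello_world'`; for the dag to be listed as `greeting_dag`, create it with `DAG(dag_id = 'greeting_dag', ...)`.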

And then when we click on the `greeting_dag` link, we can see our related tasks for that dag.

<img src="./greeting-tasks.png" width="40%">

And then let's go back to the original page, and turn on our dag.  From there, if we click on the dag, and then click on graph view, we should eventually see something like the following:

<img src="./tasks-run.png" width="30%">

And from there, we can click on the `hello_task` and see the related logs.

<img src="./hello_logs.png" width="80%">

And click on the `goodbye_task` for the related logs.

<img src="./goodbye_logs.png" width="80%">

So we can see that we were able to have both tasks execute in order.

### Summary

In this lesson we learned about directed acyclic graphs and how to create them in Airflow.  As we saw, a graph is a data structure made up of nodes and the edges that connect them.  In a directed acyclic graph, every edge has a direction and no path leads from a node back to itself.  That makes it a good fit for our workflows in Airflow, where steps have an order and should not repeat endlessly.

> <img src="./dag-etl.png" width="60%">

To create our dags in airflow, we first initialize a DAG, specifying an id, and a past start date.

```python
from airflow import DAG
from datetime import datetime, timedelta

greeting_dag = DAG(dag_id = 'hello_world', start_date=datetime.now() - timedelta(days = 1))
```

And we then associate tasks with the DAG by instantiating an operator, and linking the dag with the `dag` argument.

```python
def goodbye():
    return 'goodbye everyone'

goodbye_task = PythonOperator(task_id='goodbye_task',
                              python_callable=goodbye,
                              dag=greeting_dag)
```

Finally, if there is more than one task, we can specify the order with the `>>` operator.

```python
hello_task >> goodbye_task
```