# Airflow Tutorial

## Architecture Components

- `Webserver`
- `Scheduler`
- `Workers`

## Key Concepts

| Concept | Description |
| ---     | --- |
| `DAG` - Directed Acyclic Graph | <ul><li>a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.</li><li>a description of the order in which work should take place</li></ul> |
| `DAG Definition file` (Python script) | <ul><li>a configuration file specifying the **DAG's structure** as code</li></ul> |
| `Operators` (a *class*) | <ul><li>a class that acts as a template for carrying out some work</li><li>usually *atomic*</li><li>can run independently, e.g. on different machines</li></ul> |
| `Tasks` | <ul><li>a parameterized instance of an operator</li><li>a node in a DAG</li></ul> |
| `Task Instances` | <ul><li>a specific run of a task, i.e. a combination of *a DAG, a task, and a point in time*</li><li>has a state: "running", "success", "failed", "skipped", "up for retry", etc.</li></ul> |
    
Airflow will execute the code in each file to dynamically build the DAG objects.  
You can have as many DAGs as you want, each describing an arbitrary number of tasks.   
In general, each one should correspond to a single logical workflow.
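
To illustrate what "directed acyclic" means for ordering, the sketch below derives a valid execution order from task dependencies. It uses Python's standard `graphlib` module (Python 3.9+), not Airflow itself, and the task names `t1`, `t2`, `t3` are placeholders.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "t2": {"t1"},
    "t3": {"t1"},
}

# Any valid order runs t1 before both t2 and t3.
order = list(TopologicalSorter(dependencies).static_order())


def has_cycle(deps):
    """Return True if the dependency mapping contains a cycle."""
    try:
        list(TopologicalSorter(deps).static_order())
    except CycleError:
        return True
    return False


# t1 depends on t2 and t2 depends on t1: not a valid DAG.
cyclic = {"t1": {"t2"}, "t2": {"t1"}}
```

Here `has_cycle(cyclic)` is true while `has_cycle(dependencies)` is false, which is the property a workflow needs in order to have a well-defined run order.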

Notes:

- Tasks defined in the DAG run in a different context from the definition file, i.e. on the workers.
- The DAG definition file is just a configuration file: it should not do any actual data processing.
- The scheduler executes the file periodically, so it needs to evaluate quickly.

The precedence rules for a task's arguments are as follows (highest priority first):

1. Explicitly passed arguments
2. Values that exist in the `default_args` dictionary
3. The operator’s default value, if one exists
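
The precedence order above can be sketched in plain Python. This is only an illustration of the lookup order, not Airflow's actual implementation; `resolve_argument` and the sample dictionaries are hypothetical.

```python
def resolve_argument(name, explicit, default_args, operator_defaults):
    """Pick a value for `name`, checking the three sources in priority order."""
    for source in (explicit, default_args, operator_defaults):
        if name in source:
            return source[name]
    raise KeyError(f"no value available for argument {name!r}")


# Hypothetical values, chosen only to show each rule firing once.
explicit = {"retries": 3}                                # passed to the operator
default_args = {"owner": "airflow", "retries": 1}        # shared DAG defaults
operator_defaults = {"retries": 0, "queue": "default"}   # operator fallbacks

retries = resolve_argument("retries", explicit, default_args, operator_defaults)
owner = resolve_argument("owner", explicit, default_args, operator_defaults)
queue = resolve_argument("queue", explicit, default_args, operator_defaults)
```

With these inputs, `retries` comes from the explicit argument, `owner` from `default_args`, and `queue` from the operator default.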

### Templating

```python
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
```

- The `params` hook in `BaseOperator` allows you to pass a dictionary of parameters and/or objects to your templates.
- Files can also be passed to the `bash_command` argument, e.g. `bash_command='templated_command.sh'`, where the file location is relative to the directory containing the pipeline file (`tutorial.py` in this case). This is useful for:
    - separating your script’s logic from the pipeline code,
    - proper code highlighting in files composed in different languages,
    - general flexibility in structuring pipelines.
- It is also possible to set `template_searchpath` in the DAG constructor call to point to *any folder locations*.
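
To make the template above more concrete, here is a plain-Python sketch of what it would echo for a given `ds`. It does not use Jinja or Airflow; `ds_add_sketch` and `render_sketch` are hypothetical stand-ins written for this note, assuming `ds` is a `YYYY-MM-DD` string and `macros.ds_add` shifts it by a number of days.

```python
from datetime import datetime, timedelta


def ds_add_sketch(ds, days):
    """Shift a YYYY-MM-DD date string by `days` days (may be negative)."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def render_sketch(ds, params):
    """Collect the values the templated command would echo, in order."""
    lines = []
    for _ in range(5):                      # {% for i in range(5) %}
        lines.append(ds)                    # {{ ds }}
        lines.append(ds_add_sketch(ds, 7))  # {{ macros.ds_add(ds, 7) }}
        lines.append(params["my_param"])    # {{ params.my_param }}
    return lines


lines = render_sketch("2015-06-01", {"my_param": "Parameter I passed in"})
```

Each loop iteration contributes the same three values, so `lines` holds fifteen entries in total.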


Further reading:

- [Jinja documentation](http://jinja.pocoo.org/docs/dev/api/#writing-filters)
- Variables and Macros: [Macros reference](https://airflow.apache.org/macros.html)


### Setting dependencies

```python
t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
```
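
For readers curious how `>>` and `<<` can express dependencies, the sketch below shows one way to get that behaviour with Python's operator overloading. The `Task` class here is a hypothetical toy, not Airflow's `BaseOperator`; it only records task ids.

```python
def _as_list(tasks):
    """Accept a single task or a list/tuple of tasks."""
    return list(tasks) if isinstance(tasks, (list, tuple)) else [tasks]


class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # ids of tasks that must run before this one
        self.downstream = set()  # ids of tasks that run after this one

    def set_downstream(self, others):
        for other in _as_list(others):
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def set_upstream(self, others):
        for other in _as_list(others):
            other.set_downstream(self)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning `other` enables t1 >> t2 >> t3

    def __lshift__(self, other):   # self << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):  # [a, b] >> self
        self.set_upstream(other)
        return self

    def __rlshift__(self, other):  # [a, b] << self
        self.set_downstream(other)
        return self


t1, t2, t3 = Task("t1"), Task("t2"), Task("t3")
t1 >> t2 >> t3

a, b, c = Task("a"), Task("b"), Task("c")
[b, c] << a
```

The reflected methods (`__rrshift__`, `__rlshift__`) are what make a plain list work on the left-hand side, since `list` itself does not define shift operators.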

### Testing

1. **Running the Script**

```bash
# put your file in the `dags` folder as specified in the `airflow.cfg`
python ~/path/to/dags-folder/script.py
```
2. **Command Line Metadata Validation**

```bash
# print the list of active DAGs
airflow list_dags

# print the list of tasks in the "tutorial" DAG
airflow list_tasks tutorial

# print the hierarchy of tasks in the "tutorial" DAG
airflow list_tasks tutorial --tree
```
3. **Testing**  
Let’s test by running the actual task instances on a specific date.   
The date specified in this context is an `execution_date`, which simulates the scheduler running your task or dag at a specific date + time:

```bash
# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

# testing templated
airflow test tutorial templated 2015-06-01
```
> *Note*: the `airflow test` command is only for testing. It:
> - runs task instances locally,
> - outputs their logs to stdout (on screen),
> - doesn’t bother with dependencies,
> - doesn’t communicate state (running, success, failed, …) to the database.


4. **Backfill**
    - respects your dependencies
    - emits logs into files
    - talks to the database to record status
    - lets you track progress in the webserver, if one is running
    
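    Note: with `depends_on_past=True`, each task instance also depends on the success of its preceding task instance, except for the run at `start_date`, which has no predecessor.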
    
    
```bash
# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
```
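
As a rough picture of what the date range in the backfill command covers, the sketch below lists the execution dates it would span. It assumes a daily schedule and that both the start and end dates are included; `backfill_dates` is a hypothetical helper, not part of Airflow.

```python
from datetime import date, timedelta


def backfill_dates(start, end, interval=timedelta(days=1)):
    """List execution dates from `start` to `end` inclusive, one per interval."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


# Same range as: airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
dates = backfill_dates(date(2015, 6, 1), date(2015, 6, 7))
```

Under those assumptions the range yields seven execution dates, and every task in the DAG gets one task instance per date.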

### Next

Check these sections of the Airflow documentation next:

- UI 
- Command line interface
- Operators
- Macros