# Abstract of Airflow chapter 04

### Templating Tasks Using the Airflow Context

This chapter covers in depth what operators represent, how they function, and when and how they are executed. It also demonstrates how operators communicate with remote systems via hooks, which lets you load data into a database, run a command in a remote environment, and perform workloads outside Airflow.

### Example used

For the purposes of this example, we assume that an increase in a company’s pageviews signals positive sentiment, so the company’s stock is likely to rise as well. Conversely, a decrease in pageviews signals a loss of interest, and the stock price is likely to fall. We track five companies: Amazon, Apple, Facebook, Microsoft, and Google.

<img src="./pic/CH04_pageviews.png" width="800">

The first step is to download the gzip-compressed (.gz) pageviews file for every interval. The URL is composed of several date and time components:

```text
https://dumps.wikimedia.org/other/pageviews/{year}/{year}-{month}/pageviews-{year}{month}{day}-{hour}0000.gz
```
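As a minimal sketch of how this pattern is filled in, the snippet below builds the URL from a plain Python datetime. The function name build_pageviews_url and the constant BASE_URL are illustrative assumptions, not part of the chapter; the month, day, and hour are zero-padded to two digits because the URL requires it.

```python
from datetime import datetime

BASE_URL = "https://dumps.wikimedia.org/other/pageviews"


def build_pageviews_url(moment: datetime) -> str:
    """Fill the URL pattern with zero-padded date and time components."""
    return (
        f"{BASE_URL}/{moment.year}/{moment.year}-{moment.month:02}/"
        f"pageviews-{moment.year}{moment.month:02}{moment.day:02}-"
        f"{moment.hour:02}0000.gz"
    )


# Hour 7 on 19 July 2019 becomes "...pageviews-20190719-070000.gz".
url = build_pageviews_url(datetime(2019, 7, 19, 7))
print(url)
```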

There are many ways to download the pageviews; however, let’s focus on the BashOperator and PythonOperator. The method to insert variables at runtime in those operators can be generalized to all other operator types.

```python
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="chapter4_stocksense_bashoperator",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@hourly",
)

get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/"  #A
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-{{ execution_date.year }}"
        "{{ '{:02}'.format(execution_date.month) }}"
        "{{ '{:02}'.format(execution_date.day) }}-"
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz"  #B
    ),
    dag=dag,
)
```

- #A Double curly braces denote a variable inserted at runtime 
- #B Any Python variable or expression can be provided

The Wikipedia pageviews URL requires zero-padded months, days and hours (e.g. “07” for hour 7). Within the Jinja templated string we therefore apply string formatting for padding:

``` python 
{{ '{:02}'.format(execution_date.hour) }} 
```

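Airflow represents execution_date as a Pendulum datetime (https://pendulum.eustace.io). Pendulum is a drop-in replacement for Python's native datetime and is easier to work with, so attributes such as .year and .hour behave as they do on a regular datetime.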

Not all operator arguments are templatable! Every operator keeps a whitelist of attributes that can be templated. You can check it in the documentation (https://airflow.apache.org/docs): go to the operator of your choice and look at its “template_fields” item. The variables available for templating come from the task context; the following code uses a PythonOperator to print all of them:

``` python
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="chapter4_print_context",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
)


def _print_context(**kwargs):
    # kwargs holds every variable of the task context.
    print(kwargs)


# Defined at module level so the task is registered with the DAG.
print_context = PythonOperator(
    task_id="print_context",
    python_callable=_print_context,
    dag=dag,
)
```

<img src="./pic/CH04 task context var p1.png" width="800">

<img src="./pic/CH04 task context var p2.png" width="800">

<img src="./pic/CH04 task context var p3.png" width="800">
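To make the template_fields whitelist more concrete, here is a toy model of the idea. It is not Airflow's implementation: Airflow renders templates with Jinja, while this stand-in only substitutes plain {{ name }} placeholders with a regular expression. The class ToyBashOperator, its render method, and the one-entry context are illustrative assumptions.

```python
import re

# Matches simple placeholders such as "{{ ds_nodash }}" (no expressions).
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class ToyBashOperator:
    # Only attributes named here are rendered; everything else is left as-is.
    template_fields = ("bash_command",)

    def __init__(self, task_id, bash_command):
        self.task_id = task_id
        self.bash_command = bash_command

    def render(self, context):
        """Substitute {{ name }} placeholders in whitelisted attributes."""
        for field in self.template_fields:
            raw = getattr(self, field)
            rendered = PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), raw)
            setattr(self, field, rendered)


task = ToyBashOperator(
    task_id="echo_{{ ds_nodash }}",
    bash_command="echo {{ ds_nodash }}",
)
task.render({"ds_nodash": "20190719"})

print(task.task_id)       # not whitelisted, so the placeholder is untouched
print(task.bash_command)  # whitelisted, so the placeholder is replaced
```

The point of the sketch is only that an attribute outside the whitelist keeps its literal braces, which is why checking template_fields before relying on a template matters.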

The following Python code downloads the Wikipedia pageviews file with the PythonOperator:

``` python
from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="stocksense",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
)


def _get_data(execution_date):
    # Unpack the date and time components of the interval being processed.
    year, month, day, hour, *_ = execution_date.timetuple()
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    output_path = "/tmp/wikipageviews.gz"
    request.urlretrieve(url, output_path)


# The operator receives a Python function instead of a templated string.
get_data = PythonOperator(task_id="get_data", python_callable=_get_data, dag=dag)
```

The PythonOperator is an exception to the templating shown in Section 4.2.1. With the BashOperator (and all other operators in Airflow), you provide a string to the bash_command argument (or whatever the argument is named in other operators), which is automatically templated at runtime. The PythonOperator instead takes a python_callable argument, which is a function rather than a templated string, and makes the runtime context available inside that function.

<img src="./pic/CH04 pythonvarcontext.png" width="800">

In Airflow 2, the PythonOperator determines which context variables must be passed along to your callable by inferring these from the callable argument names. It is therefore not required to set provide_context=True anymore:

```python
PythonOperator( task_id="pass_context", python_callable=_pass_context, dag=dag,) 
```

The context variable is a dict of all context variables, which allows us to give our task different behaviour for the interval it runs in. For example, to print the start and end datetime of the current interval:

```python
def _print_context(**context):
    start = context["execution_date"]  #A
    end = context["next_execution_date"]
    print(f"Start: {start}, end: {end}")


print_context = PythonOperator(
    task_id="print_context",
    python_callable=_print_context,
    dag=dag,
)
```

- #A extract the execution_date from the context

<img src="./pic/CH04 dissectionPythonoperator.png" width="800"> 

<img src="./pic/CH04**context.png" width="800"> 

In the example shown above, the keyword argument named execution_date is passed to the execution_date parameter, and all other variables are collected in **context because the function signature does not name them explicitly.

<img src="./pic/CH04 kwargs.png" width="800"> 

Now we can use the execution_date variable directly instead of extracting it from **context with context["execution_date"]. In addition, the code is more self-explanatory, and tools such as linters and type checkers benefit from the explicit argument definition.
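The matching of context variables to argument names can be pictured with a small sketch based on inspect.signature. This is a simplified stand-in under stated assumptions, not Airflow's actual code: the helper call_with_context, the two example callables, and the trimmed toy_context are all hypothetical.

```python
import inspect


def call_with_context(func, context):
    """Pass only the context entries that the callable's signature can accept."""
    params = inspect.signature(func).parameters
    accepts_any_keyword = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_any_keyword:
        # A **kwargs-style parameter soaks up every variable not named explicitly.
        return func(**context)
    return func(**{name: value for name, value in context.items() if name in params})


def _only_date(execution_date):
    return execution_date


def _date_and_rest(execution_date, **context):
    return execution_date, sorted(context)


# Hypothetical, heavily trimmed context for illustration.
toy_context = {
    "execution_date": "2019-07-19T00:00:00",
    "ds": "2019-07-19",
    "run_id": "demo",
}

only_date = call_with_context(_only_date, toy_context)
date_and_rest = call_with_context(_date_and_rest, toy_context)
print(only_date)
print(date_and_rest)
```

In the second call, execution_date is bound by name and the remaining keys end up in **context, which mirrors the behaviour described above.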

How do we pass our own variables into the Python function? The PythonOperator offers two options, op_args and op_kwargs:

```python
get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_args=["/tmp/wikipageviews.gz"],  # Extra positional arguments for the callable
    dag=dag,
)
```

```python
get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_kwargs={"output_path": "/tmp/wikipageviews.gz"},  # Extra keyword arguments for the callable
    dag=dag,
)
```
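One way to picture the difference between the two options is plain Python argument unpacking: op_args behaves like positional arguments and op_kwargs like keyword arguments. The helper run_callable and the function _describe below are hypothetical stand-ins for illustration and do not involve Airflow.

```python
def run_callable(python_callable, op_args=(), op_kwargs=None):
    """Call the function with positional op_args and keyword op_kwargs."""
    return python_callable(*op_args, **(op_kwargs or {}))


def _describe(output_path):
    return f"writing to {output_path}"


positional = run_callable(_describe, op_args=["/tmp/wikipageviews.gz"])
keyword = run_callable(_describe, op_kwargs={"output_path": "/tmp/wikipageviews.gz"})
print(positional)
print(keyword)
```

Keyword arguments are usually the more readable choice, because the name of each value is visible where the operator is defined.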

Note that these values can contain strings and can therefore be templated! That means we can avoid extracting the datetime components inside the callable itself and instead pass templated strings to it:

```python
def _get_data(year, month, day, hour, output_path, **_):
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    request.urlretrieve(url, output_path)


get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_kwargs={
        "year": "{{ execution_date.year }}",  #A
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
        "hour": "{{ execution_date.hour }}",
        "output_path": "/tmp/wikipageviews.gz",
    },
    dag=dag,
)
```

- #A User-defined keyword arguments are templated before passing to the callable
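A detail worth checking here is the padding. By default a rendered template value reaches the callable as a string such as "7", while the earlier version of _get_data worked with integers from timetuple(). The format spec 0>2 (fill with "0", right-align, width 2) handles both, as this small sketch shows; the variable names are illustrative only.

```python
# A rendered template value is a string; a datetime attribute is an integer.
month_from_template = "7"
month_from_datetime = 7

padded_str = f"{month_from_template:0>2}"
padded_int = f"{month_from_datetime:0>2}"
print(padded_str, padded_int)
```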

The Airflow UI lets us inspect the rendered templates of a task:

<img src="./pic/CH04 airflowGUItemplate.png" width="800"> 

Alternatively, render them with the CLI:

```bash
airflow tasks render stocksense get_data 2019-07-19T00:00:00
```

To store the data in a database, we first need to install the provider package for that database, here Postgres:

```bash
pip install apache-airflow-providers-postgres
```

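The PostgresOperator executes SQL rather than accepting data directly, so we cannot hand it the pageview counts as they are. Instead, a Python task first writes the INSERT statements to a file, and the PostgresOperator then runs that file: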

```python
def _fetch_pageviews(pagenames, execution_date, **_):
    result = dict.fromkeys(pagenames, 0)  #A

    with open("/tmp/wikipageviews", "r") as f:
        for line in f:
            domain_code, page_title, view_counts, _ = line.split(" ")
            if domain_code == "en" and page_title in pagenames:
                result[page_title] = view_counts  #B

    with open("/tmp/postgres_query.sql", "w") as f:
        for pagename, pageviewcount in result.items():  #C
            f.write(
                "INSERT INTO pageview_counts VALUES ("
                f"'{pagename}', {pageviewcount}, '{execution_date}'"
                ");\n"
            )


fetch_pageviews = PythonOperator(
    task_id="fetch_pageviews",
    python_callable=_fetch_pageviews,
    op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
    dag=dag,
)
```

- #A Initialize result for all pageviews with 0 
- #B Scan over pageviews
- #C For each result, write SQL query
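The parsing and query-writing steps can be tried without Airflow or a downloaded dump. The sketch below runs the same logic on a few made-up lines; the sample data, the fixed timestamp, and the assumption that each line holds four space-separated fields (domain code, page title, view count, response size) are illustrative, the last one inferred from the four-way unpacking above.

```python
import io

# Made-up sample in the "domain page_title view_count response_size" layout.
sample = io.StringIO(
    "en Google 451 0\n"
    "en Amazon 9 0\n"
    "de Google 30 0\n"
    "en Some_other_page 5 0\n"
)

pagenames = {"Google", "Amazon", "Apple"}
result = dict.fromkeys(pagenames, 0)  # pages missing from the file keep a count of 0

for line in sample:
    domain_code, page_title, view_counts, _ = line.split(" ")
    if domain_code == "en" and page_title in pagenames:
        result[page_title] = int(view_counts)

# One INSERT statement per tracked page, sorted for a stable order.
statements = [
    f"INSERT INTO pageview_counts VALUES ('{name}', {count}, '2019-07-19T00:00:00');"
    for name, count in sorted(result.items())
]
print("\n".join(statements))
```

Note that the German-language row for Google is skipped and Apple keeps its initial count of 0 because it never appears in the sample.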

Calling the PostgresOperator:

``` python
from airflow.providers.postgres.operators.postgres import PostgresOperator

dag = DAG(..., template_searchpath="/tmp")  # Directory searched for the .sql file

write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="my_postgres",  #A
    sql="postgres_query.sql",  #B
    dag=dag,
)
```

- #A Identifier to credentials to use for connection 
- #B SQL query or path to file containing SQL queries

The PostgresOperator requires filling in only two arguments to run a query against a Postgres database. Intricate operations such as setting up a connection to the database and closing it after completion are handled under the hood. The postgres_conn_id argument points to an identifier holding the credentials to the Postgres database. Airflow can manage such credentials (stored encrypted in the metastore), and operators can fetch one of the credentials when required. Without going into details yet, we can add the “my_postgres” connection in Airflow with the help of the CLI:

```bash
airflow connections add \
  --conn-type postgres \
  --conn-host localhost \
  --conn-login postgres \
  --conn-password mysecretpassword \
  my_postgres #A
```

- #A The connection identifier

<img src="./pic/ch04connexionGUI.png" width="800"> 

The PostgresOperator will instantiate a so-called Hook to communicate with Postgres. The hook deals with creating a connection, sending queries to Postgres and closing the connection afterwards. The operator is merely passing through the request from the user to the hook in this situation.

<img src="./pic/CH04 postgresqloperator.png" width="800"> 
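The division of labour between operator and hook can be sketched with the standard library. This is an analogy under stated assumptions, not the PostgresOperator or its hook: it uses sqlite3 instead of Postgres so that it runs anywhere, and the classes ToySqliteHook and ToySqlOperator, the temporary database file, and the inserted row are all hypothetical.

```python
import os
import sqlite3
import tempfile


class ToySqliteHook:
    """Owns the connection lifecycle: open, run the statements, commit, close."""

    def __init__(self, database):
        self.database = database

    def run(self, sql):
        conn = sqlite3.connect(self.database)
        try:
            conn.executescript(sql)
            conn.commit()
        finally:
            conn.close()


class ToySqlOperator:
    """Holds what the user asked for and delegates the work to a hook."""

    def __init__(self, task_id, database, sql):
        self.task_id = task_id
        self.database = database
        self.sql = sql

    def execute(self):
        ToySqliteHook(self.database).run(self.sql)


db_path = os.path.join(tempfile.mkdtemp(), "toy.db")
ToySqlOperator(
    task_id="write_to_db",
    database=db_path,
    sql=(
        "CREATE TABLE pageview_counts (pagename TEXT, pageviewcount INT);"
        "INSERT INTO pageview_counts VALUES ('Google', 451);"
    ),
).execute()

# Read the data back with a separate connection to confirm it was stored.
check = sqlite3.connect(db_path)
rows = check.execute("SELECT pagename, pageviewcount FROM pageview_counts").fetchall()
check.close()
print(rows)
```

The operator never touches the connection itself, which is the same separation described above: the user-facing class passes the request through, and the hook handles connecting, querying, and closing.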