# Introduktion til Airflow

Data engineering er at gør data troværdig, gentagende og maintable process.

Workflow er skridt i a en process som at downloade filer osv. 

Airflow bruges til at lave et workflow. Disse workflows kaldes `DAGs (Directed Acyclic Graphs)`.

DAgs er sæt af opgaver i et workflow. 

For ar kære et worfkflow kan vi bruge

```
airflow run <dag_id> <task_id> <start_date>
```

En mulig error vil være `airflow.exceptions.AirflowException`.

Ved at køre `airflow` kan man se en række komando strenge:

```
repl:~$ airflow
[2022-06-28 19:49:23,572] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow [-h]
               {backfill,list_dag_runs,list_tasks,clear,pause,unpause,trigger_dag,delete_dag,pool,variables,kerberos,render,run,initdb,list_dags,dag_state,task_failed_deps,task_state,serve_logs,test,webserver,resetdb,upgradedb,scheduler,worker,flower,version,connections,create_user,delete_user,list_users,sync_perm,next_execution,rotate_fernet_key}
               ...
airflow: error: the following arguments are required: subcommand
repl:~$
```

## Airflow DAGs

Directed Acyclic graph:

Directed der er en flow der udviser afhænge mellem komponenter.

Acycluc, betyder at der ikke rer loop og den kører engang.

grapch de række komponenter. 

Python bruges til at definere DAGs.

tasks er de opgaver DAGs skal køre. 

Afhængigheder mellem tasks er essentiel. 

Dag er case centitive.

Hvis man skal bruge hjælp kan man skrive.

```
airflow -h
```

Kommandoer til at vise hvordan man kan burge airflow.

```
airflow list_dags
```

En simpel DAG

```
# Import the DAG object
from airflow.models import DAG

# Define the default_args dictionary
default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 1, 14),
  'retries': 2
}

# Instantiate the DAG object
etl_dag = DAG('example_etl', default_args=default_args)
```

## Airflow web interface

Her kan man se dags og hvordan processen er. 

Det giver os et DAG detail view.

I view får vi et chart for at have et overblik omkring data. 

VI kan også se hvordan python koden er definerede. Den er kun read only. 

Undeer Browse kan vi se Logs. 

Vi kan også burge kommando til at se det samme.

Start en webserver

```
airflow webserver -p 1010
```


# Implementer Airflow DAGs

## Airflow operator

operator er en enkelt task i workflow.
Køre uafhængigt. 


`BashOperator` en bash kommando og kræver tre argument

```
BashOperator(
  task_id = 'bash_example',
  bash_command = 'echo "Example!"',
  dag = ml_dag
)
```

Køres i en temp directory. 

Kan specifier miljø variable.



```
# Import the BashOperator
from airflow.operators.bash_operator import BashOperator

# Define the BashOperator 
cleanup = BashOperator(
    task_id='cleanup_task',
    # Define the bash_command
    bash_command='cleanup.sh',
    # Add the task to the dag
    dag=analytics_dag
)
```

```
# Define a second operator to run the `consolidate_data.sh` script
consolidate = BashOperator(
    task_id='consolidate_task',
    bash_command='consolidate_data.sh',
    dag=analytics_dag)

# Define a final operator to execute the `push_data.sh` script
push_data = BashOperator(
    task_id='pushdata_task',
    bash_command='push_data.sh',
    dag = analytics_dag)

```

## Airflow tasks

tasks er instanc eof operator.
tilskrives en variable i python. 

Afhængigheder referes til som upstream eller downstream tasks. 

- Upstram betyder det skal gøres færdi prior til downstream tasks.

bruger bitshift operator.

Hvornår bruger man den ene
Upstream er before
downstream er after

Der kan være mange afhængigheder

Der kan være mixed afhængigheder. 

Kan definere mixed afhængigeder og upstream og downstream med `<<` og `>>`.

```
# Define a new pull_sales task
pull_sales = BashOperator(
    task_id='pullsales_task',
    bash_command = 'wget https://salestracking/latestinfo?json',
    dag=analytics_dag
)

# Set pull_sales to run prior to cleanup
pull_sales >> cleanup

# Configure consolidate to run after cleanup
consolidate << cleanup

# Set push_data to run last
consolidate >> push_data
```

```
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------


repl:~$ cat workspace/dags/codependent.py

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

default_args = {
  'owner': 'dsmith',
  'start_date': datetime(2020, 2, 12),
  'retries': 1
}

codependency_dag = DAG('codependency', default_args=default_args)

task1 = BashOperator(task_id='first_task',
                     bash_command='echo 1',
                     dag=codependency_dag)

task2 = BashOperator(task_id='second_task',
                     bash_command='echo 2',
                     dag=codependency_dag)

task3 = BashOperator(task_id='third_task',
                     bash_command='echo 3',
                     dag=codependency_dag)

# task1 must run before task2 which must run before task3
task1 >> task2
task2 >> task3
task3 >> task1
repl:~$
```

## Additional operators

PythonOperator køre en fuktion, men skal bruge samme kommdoer som i bashoperator.  Tildase argument til taks, positional og keyword. 

```
def pull_file(URL, savepath):
    r = requests.get(URL)
    with open(savepath, 'wb') as f:
        f.write(r.content)   
    # Use the print method for logging
    print(f"File pulled from {URL} and saved to {savepath}")

from airflow.operators.python_operator import PythonOperator

# Create the task
pull_file_task = PythonOperator(
    task_id='pull_file',
    # Add the callable
    python_callable=pull_file,
    # Define the arguments
    op_kwargs={'URL':'http://dataserver/sales.json', 'savepath':'latestsales.json'},
    dag=process_sales_dag
)

# Add another Python task
parse_file_task = PythonOperator(
    task_id='parse_file',
    # Set the function to call
    python_callable = parse_file,
    # Add the arguments
    op_kwargs={'inputfile':'latestsales.json', 'outputfile':'parsedfile.json'},
    # Add the DAG
    dag=process_sales_dag
)
    
```

EmailOperator, sender en email med indhold. 

```
# Import the Operator
from airflow.operators.email_operator import EmailOperator

# Define the task
email_manager_task = EmailOperator(
    task_id='email_manager',
    to='manager@datacamp.com',
    subject='Latest sales JSON',
    html_content='Attached is the latest sales JSON file as requested.',
    files='parsedfile.json',
    dag=process_sales_dag
)

# Set the order of tasks
pull_file_task >> parse_file_task >> email_manager_task 
```

De fleste ligger i `airflow.operator`


## Airflow scheduling

kan få en kørelse til et tidspunkt

cron eksempler

```
* * 25 2 * # run once per minute on february 25

@hourly         0 * * * *
# etc.
```

None aldrig løb dag.
@once kør dag engang. 

```
# Update the scheduling arguments as defined
default_args = {
  'owner': 'Engineering',
  'start_date': datetime(2019, 11, 1),
  'email': ['airflowresults@datacamp.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 3,
  'retry_delay': timedelta(minutes=20)
}

dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3') 
```

# Maintaining and monitoring Airflow workflows

## Sensors

en type operator der afventer påm en sans. 
poke kører gentagende
reschedule 
poke_interval checke rhovr ote

file sensor er vigtig og ser om eksisent af en fil 

mange andre sensor
ExternalTaskSensor

## Airflow executer

den der køre taks.

der er en som hedder celeryexecutor

default er sequentialexecutor god til debug ikke brug i produktion.
airflow.cgf og find executor. 

```

```

## debuggin and troubleshooting in airflow

# Building production pipelines in Airflow