# Apache Airflow

## Installing and using the library

We will install Apache Airflow in a virtual machine using multipass.
We run the following steps on our machine, starting by launching a new instance:

```bash
multipass launch --name airapache --cpus 4 --mem 8G
multipass shell airapache
sudo apt update
sudo apt install python3-pip

# Airflow needs a home. `~/airflow` is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=~/airflow

# Install Airflow using the constraints file
AIRFLOW_VERSION=2.3.2

# Major.minor version of the installed Python, for example: 3.7
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.3.2/constraints-3.7.txt
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"


# Reboot the VM
exit
multipass restart airapache
multipass shell airapache
```
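
The constraints URL used in the installation is assembled from two version strings. As a minimal sketch of that substitution in Python (the function name `constraint_url` and the constant `CONSTRAINT_TEMPLATE` are hypothetical and only serve this illustration), the same URL can be built and inspected before running `pip`:

```python
import sys

# URL pattern copied from the installation commands above.
CONSTRAINT_TEMPLATE = (
    "https://raw.githubusercontent.com/apache/airflow/"
    "constraints-{airflow}/constraints-{python}.txt"
)


def constraint_url(airflow_version, python_version=None):
    """Build the constraints-file URL for an Airflow and Python version.

    `python_version` is "major.minor" (for example "3.8"). If it is omitted,
    the version of the running interpreter is used.
    """
    if python_version is None:
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return CONSTRAINT_TEMPLATE.format(airflow=airflow_version, python=python_version)


if __name__ == "__main__":
    # Prints the URL for Airflow 2.3.2 and the interpreter running this script.
    print(constraint_url("2.3.2"))
```

Calling `constraint_url("2.3.2", "3.7")` reproduces the example URL shown in the shell comments.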


#### SSH access


Find the IP address of our machines with multipass:
```bash
% multipass list 
Name                    State             IPv4             Image
airapache                  Running           192.168.64.19     Ubuntu 20.04 LTS
                                          10.1.219.64
```
In this example, our IP is 192.168.64.19.
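
If the address is needed in a script, it can be extracted from the listing text. The following is a rough sketch that parses the sample output shown above; the helper `first_ip` and the constant `SAMPLE_OUTPUT` are hypothetical names, and in practice the text would come from running the `multipass list` command rather than from a hard-coded string.

```python
import re

# Sample listing copied from the output above.
SAMPLE_OUTPUT = """\
Name                    State             IPv4             Image
airapache                  Running           192.168.64.19     Ubuntu 20.04 LTS
                                          10.1.219.64
"""

# Four dot-separated groups of digits; "20.04" does not match because it has only two.
IPV4 = re.compile(r"\b\d{1,3}(?:\.\d{1,3}){3}\b")


def first_ip(listing, name):
    """Return the first IPv4 address on the row whose first column is `name`."""
    for line in listing.splitlines():
        fields = line.split()
        if fields and fields[0] == name:
            match = IPV4.search(line)
            return match.group(0) if match else None
    return None


if __name__ == "__main__":
    print(first_ip(SAMPLE_OUTPUT, "airapache"))
```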


Inside the virtual machine, we carry out the following steps:
- In the Ubuntu VM, edit the SSH daemon configuration and change a "no" to a "yes". The setting to change is the one shown in the screenshot, presumably `PasswordAuthentication`, since a later step sets a password for the user: <br/>
  ```
  sudo nano /etc/ssh/sshd_config
  ```
  Note: press Ctrl+O to save the file and Ctrl+X to exit. <br />
  <img src="images/nanoSSHD.png"/>
  <br/>
- Restart the service (what is a service/daemon?)
  ```
  sudo systemctl restart sshd  
  ```
- Set a password for our user and start Airflow
  ```
  sudo passwd ubuntu

  airflow standalone
  ```

  
From the host machine we can now connect, forwarding local port 8080 to port 8080 of the VM:  
  ```
  ssh -L 8080:192.168.64.19:8080 ubuntu@192.168.64.19
  ```

- From any browser on the host we can then open the corresponding URL: *localhost:8080* <br/> 
 <img src="images/airflowcaptura.png"/>

- Finally, we enable the example *example_bash_operator*. What information do the different tabs provide?
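
Before opening the browser, it can help to confirm that the forwarded port accepts connections. This is a small sketch using only the standard library; `port_is_open` is a hypothetical helper, and it only checks that something is listening on the port, not that it is the Airflow web server.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    # With the SSH tunnel running, local port 8080 should accept connections.
    print(port_is_open("localhost", 8080))
```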



## A first DAG with Apache Airflow


We will work through some parts of the tool's own tutorial:

- https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html



In [1]:
import random as rnd
import time

def my_task1():
    # Stands in for a very complex function that returns a value after a very long run time
    try:
        time.sleep(5)
        value = rnd.random()
        return {"value1": value}
    except Exception:
        return None

def my_task2():
    # Stands in for a very complex function that does not take as long
    try:
        value = rnd.random()
        time.sleep(8)
        return {"value2": value}
    except Exception:
        return None


def my_task_max(value1, value2):
    # Shows how to combine the results of upstream (predecessor) tasks
    valueMax = max(value1["value1"], value2["value2"])
    with open("/tmp/mydata.csv", "a+") as f:
        f.write("%.4f," % valueMax)
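
Outside Airflow, the data flow between the three functions can be tried with plain Python calls. The following is a minimal sketch, not the way Airflow runs them: `fake_task1`, `fake_task2`, and `combine_max` are hypothetical stand-ins without the sleeps, and the output path is a parameter so that a temporary file can be used instead of `/tmp/mydata.csv`.

```python
import os
import random as rnd
import tempfile


def fake_task1():
    # Stand-in for my_task1: same return shape, no sleep.
    return {"value1": rnd.random()}


def fake_task2():
    # Stand-in for my_task2: same return shape, no sleep.
    return {"value2": rnd.random()}


def combine_max(value1, value2, path):
    # Same logic as my_task_max, but the output path is a parameter and
    # the maximum is also returned so that it can be inspected.
    value_max = max(value1["value1"], value2["value2"])
    with open(path, "a+") as f:
        f.write("%.4f," % value_max)
    return value_max


if __name__ == "__main__":
    out = os.path.join(tempfile.mkdtemp(), "mydata.csv")
    print(combine_max(fake_task1(), fake_task2(), out))
```

In Airflow, the value returned by a `PythonOperator` callable is pushed to XCom by default, which is what lets a downstream task receive it.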

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta, datetime

default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),

}

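dag = DAG('4miPrimerDAG',
     default_args=default_args,
     # Note: unless catchup is disabled (it defaults to True via `catchup_by_default`),
     # a 2019 start_date makes the scheduler create a run for every interval since then.
     start_date = datetime(2019,1,1),#datetime.now()-timedelta(minutes=1),
     schedule_interval = timedelta(minutes=2)
    )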
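
Airflow starts a run once its schedule interval has ended, and with catchup enabled (the default) it creates one run for every completed interval since `start_date`. As a rough estimate of what that means for the DAG above, the sketch below counts whole intervals between two dates; `completed_intervals` is a hypothetical helper that does not use Airflow itself.

```python
from datetime import datetime, timedelta


def completed_intervals(start_date, now, interval):
    """Number of whole schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    # Floor division of two timedeltas yields an integer.
    return (now - start_date) // interval


if __name__ == "__main__":
    start = datetime(2019, 1, 1)
    every = timedelta(minutes=2)
    # Intervals completed during the first day after start_date.
    print(completed_intervals(start, start + timedelta(days=1), every))
```

With a two-minute interval this amounts to 720 runs per day of backlog, which is why a recent `start_date` or `catchup=False` in the `DAG(...)` call is often preferred for examples like this one.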

In [None]:
t1 = PythonOperator(dag=dag,
        task_id='my_task1',
        python_callable=my_task1)

t2 = PythonOperator(dag=dag,
        task_id='my_task2',
        python_callable=my_task2)


t3 = PythonOperator(dag=dag,
        task_id='my_MAX',
        op_kwargs={'value1': t1.output, 'value2': t2.output},
        python_callable=my_task_max)

In [None]:
# Equivalent shorthand: [t1, t2] >> t3

t1.set_downstream(t3)
t2.set_downstream(t3)
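
The two `set_downstream` calls state that `my_MAX` may only run after both `my_task1` and `my_task2` have finished. To illustrate what such a declaration implies, here is a small sketch of a topological ordering (Kahn's algorithm) over the same three task ids. It is not how the Airflow scheduler is implemented; `execution_order` and `GRAPH` are hypothetical names used only here.

```python
def execution_order(downstream):
    """Return task ids in an order that respects the dependencies.

    `downstream` maps each task id to the list of tasks that must run after it.
    """
    # Count how many upstream tasks each task is still waiting for.
    pending = {task: 0 for task in downstream}
    for children in downstream.values():
        for child in children:
            pending[child] = pending.get(child, 0) + 1

    # Tasks with no upstream dependencies can start right away.
    ready = sorted(task for task, count in pending.items() if count == 0)
    order = []
    while ready:
        task = ready.pop(0)
        order.append(task)
        for child in downstream.get(task, []):
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    if len(order) != len(pending):
        raise ValueError("dependency cycle detected")
    return order


# Same shape as the DAG above: my_task1 and my_task2 both feed my_MAX.
GRAPH = {"my_task1": ["my_MAX"], "my_task2": ["my_MAX"], "my_MAX": []}

if __name__ == "__main__":
    print(execution_order(GRAPH))
```

Since `my_task1` and `my_task2` do not depend on each other, an executor that supports parallelism is free to run them at the same time.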

In [None]:
# Everything together

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta, datetime

import random as rnd
import time

def my_task1():
    # Stands in for a very complex function that returns a value after a very long run time
    try:
        time.sleep(5)
        value = rnd.random()
        return {"value1": value}
    except Exception:
        return None

def my_task2():
    # Stands in for a very complex function that does not take as long
    try:
        value = rnd.random()
        time.sleep(8)
        return {"value2": value}
    except Exception:
        return None


def my_task_max(value1, value2):
    # Shows how to combine the results of upstream (predecessor) tasks
    valueMax = max(value1["value1"], value2["value2"])
    with open("/tmp/mydata.csv", "a+") as f:
        f.write("%.4f," % valueMax)

default_args = {
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),

}

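dag = DAG('4miPrimerDAG',
     default_args=default_args,
     # Note: unless catchup is disabled (it defaults to True via `catchup_by_default`),
     # a 2019 start_date makes the scheduler create a run for every interval since then.
     start_date = datetime(2019,1,1),#datetime.now()-timedelta(minutes=1),
     schedule_interval = timedelta(minutes=2)
    )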

t1 = PythonOperator(dag=dag,
        task_id='my_task1',
        python_callable=my_task1)

t2 = PythonOperator(dag=dag,
        task_id='my_task2',
        python_callable=my_task2)


t3 = PythonOperator(dag=dag,
        task_id='my_MAX',
        op_kwargs={'value1': t1.output, 'value2': t2.output},
        python_callable=my_task_max)


# Equivalent shorthand: [t1, t2] >> t3

t1.set_downstream(t3)
t2.set_downstream(t3)

# Reminder of some useful commands (the DAG id must match the one passed to DAG(...))
# python3 mydag.py
# airflow db init

# airflow tasks list 4miPrimerDAG
# airflow tasks test 4miPrimerDAG my_task1
# airflow dags trigger 4miPrimerDAG

## Module activity

Analyze, on a **daily** basis, the science news published through the *Inshorts* API (see the example in the cells below). Keep a statistical record of:
- The number of news items
- The number of words of each item in *content* and in *title*
- The date and time of publication.


Notes on the API:
- https://github.com/cyberboysumanjay/Inshorts-News-API
- a collection of public APIs: https://github.com/public-apis/public-apis

In [3]:
import requests
req = requests.get("https://inshorts.deta.dev/news?category=science")

In [10]:
print(req.status_code)

data = dict(req.json())
print(data)

200
{'category': 'science', 'data': [{'author': 'Ankush Verma', 'content': 'NASA has said that the first image from its James Webb Space Telescope is the deepest and sharpest infrared image of the distant universe to date. "Known as Webb\'s First Deep Field, this image of galaxy cluster SMACS 0723 is overflowing with detail. Thousands of galaxies have appeared in Webb\'s view for the first time," NASA added.', 'date': '12 Jul 2022,Tuesday', 'id': '463a2523b39e442187d634a64dbfc9eb', 'imageUrl': 'https://static.inshorts.com/inshorts/images/v1/variants/jpg/m/2022/07_jul/12_tue/img_1657593610944_453.jpg?', 'readMoreUrl': 'https://twitter.com/NASAWebb/status/1546621080298835970?utm_campaign=fullarticle&utm_medium=referral&utm_source=inshorts ', 'time': '08:32 am', 'title': 'Deepest image of the early universe ever taken released by NASA', 'url': 'https://www.inshorts.com/en/news/deepest-image-of-the-early-universe-ever-taken-released-by-nasa-1657594971552'}, {'author': 'Apaar Sharma', 'cont

In [12]:
news = data["data"]
print(len(news))
print(news[0])

25
{'author': 'Ankush Verma', 'content': 'NASA has said that the first image from its James Webb Space Telescope is the deepest and sharpest infrared image of the distant universe to date. "Known as Webb\'s First Deep Field, this image of galaxy cluster SMACS 0723 is overflowing with detail. Thousands of galaxies have appeared in Webb\'s view for the first time," NASA added.', 'date': '12 Jul 2022,Tuesday', 'id': '463a2523b39e442187d634a64dbfc9eb', 'imageUrl': 'https://static.inshorts.com/inshorts/images/v1/variants/jpg/m/2022/07_jul/12_tue/img_1657593610944_453.jpg?', 'readMoreUrl': 'https://twitter.com/NASAWebb/status/1546621080298835970?utm_campaign=fullarticle&utm_medium=referral&utm_source=inshorts ', 'time': '08:32 am', 'title': 'Deepest image of the early universe ever taken released by NASA', 'url': 'https://www.inshorts.com/en/news/deepest-image-of-the-early-universe-ever-taken-released-by-nasa-1657594971552'}
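
Given the item structure printed above, the statistics requested in the activity can be derived per item. The following is one possible sketch, not a reference solution: `item_stats` and `daily_stats` are hypothetical helpers, words are counted by splitting on whitespace, and the date parsing assumes the `date` and `time` formats seen in the sample (`'12 Jul 2022,Tuesday'` and `'08:32 am'`) together with an English locale for month names.

```python
from datetime import datetime


def item_stats(item):
    """Word counts and publication timestamp for one news item."""
    # The 'date' field looks like '12 Jul 2022,Tuesday'; keep the part before the comma.
    day = item["date"].split(",")[0]
    published = datetime.strptime(f"{day} {item['time']}", "%d %b %Y %I:%M %p")
    return {
        "title_words": len(item["title"].split()),
        "content_words": len(item["content"].split()),
        "published": published.isoformat(),
    }


def daily_stats(news):
    """Summary for a list of news items, as found under the 'data' key."""
    return {"count": len(news), "items": [item_stats(n) for n in news]}


if __name__ == "__main__":
    # Shortened, made-up item with the same keys as the API response above.
    sample = {
        "title": "Deepest image released",
        "content": "NASA has said so",
        "date": "12 Jul 2022,Tuesday",
        "time": "08:32 am",
    }
    print(daily_stats([sample]))
```

Inside a DAG, a function like `daily_stats` could serve as the `python_callable` of a task scheduled once a day, with another task appending the result to a file.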


## References
- Bas P. Harenslak, Julian Rutger de Ruiter. *Data Pipelines with Apache Airflow*. Manning Publications, 2021.
- https://betterdatascience.com/apache-airflow-run-tasks-in-parallel/
