# Day - Airflow

[Airflow](https://airflow.apache.org/) is a platform to programmatically author, schedule and monitor workflows. It is used well beyond Machine Learning, but it is a very useful tool when you put Machine Learning in production.

In this exercise, you will learn how to set up Airflow with GCP, create your first [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph), and finally set up a pipeline for the Taxi Fare Prediction Challenge.

<img src="https://ncrocfer.github.io/images/airflow-logo.png" width="400" />

## Summary

1. [Airflow setup with GCP](#part1)
2. [Your first DAG](#part2)
3. [Taxi Fare Prediction Pipeline](#part3)
4. [Data Enriching](#part4)
5. [Real Use Case](#part5)
6. [Your model as a product](#part6)

## 1. Airflow Setup with GCP<a id="part1" />

Google Cloud has a tool called `Composer` that manages Airflow for you! **Let's use that!**

#### Exercise
- Create a [Cloud Composer](https://console.cloud.google.com/composer) account
- Create a new environment
- Finally, go to the Airflow Web Server interface to check it out.

_Note: Airflow uses Compute Engine resources (3 by default), which gets expensive if you let it run for a week, so don't forget to delete your Airflow environment at the end of the day._

## 2. Your first DAG<a id="part2" />

#### Exercise

- Write your `first_dag.py`
- Fill in the `test()` function
- Instantiate a `DAG` that will run every minute
- Add an operator to the dag that calls the `test` function
- Write a Makefile to upload this DAG to GCP

```python
# first_dag.py

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def test():
    """test method"""
    pass


# create DAG here
dag = DAG()

# test operator
test_opr = PythonOperator()
```
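
As a rough guide, a filled-in `first_dag.py` could look like the sketch below. It assumes the Airflow 1.10 import paths used above. The `dag_id`, `start_date`, retry settings and printed message are illustrative choices, not requirements.

```python
# first_dag.py (sketch; names and dates below are illustrative assumptions)
import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def test():
    """Callable run by the task; its output appears in the task log."""
    print("Hello from Airflow")


default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=1),
}

dag = DAG(
    dag_id="first_dag",                         # assumed name
    default_args=default_args,
    start_date=datetime.datetime(2020, 1, 1),   # any date in the past
    schedule_interval="* * * * *",              # cron expression: every minute
    catchup=False,                              # do not backfill past runs
)

# The operator wraps the Python callable as a task of the DAG.
test_opr = PythonOperator(task_id="test", python_callable=test, dag=dag)
```

With `catchup=False`, Airflow does not schedule a run for every minute elapsed since `start_date` when the DAG is first enabled.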

## 3. Taxi Fare Prediction Pipeline<a id="part3" />

The objective now is to use Airflow to orchestrate and automate our TaxiFareModel pipeline. 

The benefit of a workflow platform like Airflow is that it lets you break the entire job down into smaller tasks. These tasks can run in parallel, debugging is easier when something goes wrong, and you do not have to restart from scratch when a task fails.

In our TaxiFarePrediction problem, there are not many steps involved, but in the future you might have to aggregate different sources of data before building a model and then apply a sequence of different steps (we will see this in the next exercise).

The other benefit is the ability to schedule workflows at specified times. For example, if you get new taxi ride data every day, you may want to retrain the model every day with this new data.




#### Exercise

For now, we just want you to know how to automatically submit new training jobs for your model to GCP, using Airflow.
The idea is to submit a new training job once every hour, for example.

- Write a DAG `dag_training_job.py` that will submit a new training job to GCP for the TaxiFareModel once every x minutes.
- Use a `Makefile` to build and upload dependencies and submit your DAG to GCP.

TIP: You will have to use [MLEngineTrainingOperator](https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/mlengine_operator/index.html#airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator)

Check that you see new training tasks and new models on your AI Platform Console.
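
One possible shape for `dag_training_job.py` is sketched below. The project ID, bucket, package path, module name, region and runtime versions are all placeholders (assumptions) to replace with your own values. The operator arguments follow the Airflow 1.10 `contrib` documentation linked in the tip.

```python
# dag_training_job.py (sketch; every value marked "hypothetical" must be adapted)
import datetime

from airflow import DAG
from airflow.contrib.operators.mlengine_operator import MLEngineTrainingOperator

PROJECT_ID = "my-gcp-project"        # hypothetical project ID
BUCKET = "gs://my-taxifare-bucket"   # hypothetical bucket holding the package

dag = DAG(
    dag_id="taxifare_training",                        # assumed name
    start_date=datetime.datetime(2020, 1, 1),          # any date in the past
    schedule_interval=datetime.timedelta(minutes=60),  # "every x minutes"
    catchup=False,
)

training = MLEngineTrainingOperator(
    task_id="submit_training_job",
    project_id=PROJECT_ID,
    # Job IDs must be unique, so the templated run timestamp is appended.
    job_id="taxifare_training_{{ ts_nodash }}",
    package_uris=[BUCKET + "/packages/TaxiFareModel-1.0.tar.gz"],  # hypothetical
    training_python_module="TaxiFareModel.trainer",                # hypothetical
    training_args=[],
    region="europe-west1",    # hypothetical region
    scale_tier="BASIC",
    runtime_version="1.15",   # assumed runtime version
    python_version="3.7",     # assumed Python version
    dag=dag,
)
```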


## 4. Data Enriching<a id="part4" />

Now, to make things more interesting and leverage the power of Airflow, let's enrich our data with weather data!
It makes sense to think that weather may influence traffic in NYC, and hence taxi ride durations and fares.

The objective here is to run a workflow that gathers data from an external source and saves it into Google BigQuery.

#### Weather API
Create a free account on this [Weather API](https://www.worldweatheronline.com/developer/api/docs/historical-weather-api.aspx#qparameter). No card is needed for account creation.

- Get your API key
- Implement the `WeatherApi()` class inside `data_enriching.py`. It should connect to the API and get the current day's weather data (only one point)
- Now implement `load_bq_table_from_df()` to load the API result into a new BQ table named `weather_data`
- Test your two functions by requesting data from 2018 and inserting it into our table.

```python
# data_enriching.py

import requests
import pandas as pd
from google.cloud import bigquery
from datetime import date

ApiKey = "<your_api_key>"


class WeatherApi(object):

    def __init__(self, api_key=ApiKey):
        self.url = 'http://api.worldweatheronline.com/premium/v1/past-weather.ashx'
        self.apikey = api_key

    def get_day_data(self):
        pass


def load_bq_table_from_df(df, dataset, table):
    pass


def run():
    """test enriching data into BigQuery"""


if __name__ == '__main__':
    run()
```
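
To give an idea of what `get_day_data()` has to do, here is a minimal sketch of the two pure steps around the HTTP call: building the query parameters and flattening the response into one row. The parameter names (`key`, `q`, `date`, `tp`, `format`) and the response layout are assumptions taken from the API documentation and should be checked against a real response. The helper names `build_params` and `parse_day` are hypothetical, and the sample payload is hand-written, not real API output.

```python
from datetime import date

BASE_URL = "http://api.worldweatheronline.com/premium/v1/past-weather.ashx"


def build_params(api_key, day, location="New York"):
    """Query-string parameters for one day of historical weather (assumed names)."""
    return {
        "key": api_key,
        "q": location,
        "date": day.isoformat(),  # e.g. "2018-01-01"
        "tp": 24,                 # one aggregated data point per day
        "format": "json",
    }


def parse_day(payload):
    """Flatten the (assumed) JSON response layout into a single row."""
    day = payload["data"]["weather"][0]
    return {
        "date": day["date"],
        "max_temp_c": float(day["maxtempC"]),
        "min_temp_c": float(day["mintempC"]),
        "precip_mm": float(day["hourly"][0]["precipMM"]),
    }


# The HTTP call itself would be something like:
#   payload = requests.get(BASE_URL, params=build_params(ApiKey, date(2018, 1, 1))).json()
# Hand-written sample with the assumed layout, used here instead of a live call:
sample = {"data": {"weather": [{
    "date": "2018-01-01", "maxtempC": "-5", "mintempC": "-12",
    "hourly": [{"precipMM": "0.0"}],
}]}}
row = parse_day(sample)
print(row)
```

A list of such rows can be turned into a DataFrame with `pd.DataFrame(rows)` before being passed to `load_bq_table_from_df()`.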

#### Exercise 2

Once you have collected weather data, modify your training task to:
- Query weather data and merge it with the historical rides data
- Build one or more weather features and add them to your model
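
The merge and feature step can be sketched with pandas as follows. The column names (`pickup_datetime`, `fare_amount`, `precip_mm`), the `is_rainy` feature and the toy values are assumptions made for illustration. In the exercise, both frames would come from BigQuery queries.

```python
import pandas as pd

# Toy stand-ins for the rides and weather tables (made-up values).
rides = pd.DataFrame({
    "pickup_datetime": pd.to_datetime(
        ["2018-01-01 08:15:00", "2018-01-01 23:40:00", "2018-01-02 12:05:00"]
    ),
    "fare_amount": [12.5, 7.0, 30.2],
})
weather = pd.DataFrame({
    "date": pd.to_datetime(["2018-01-01", "2018-01-02"]),
    "precip_mm": [0.0, 5.3],
})

# Derive the join key: the pickup timestamp truncated to midnight.
rides["date"] = rides["pickup_datetime"].dt.normalize()

# A left join keeps every ride, even if weather is missing for its day.
enriched = rides.merge(weather, on="date", how="left")

# One simple weather feature: did it rain on the day of the ride?
enriched["is_rainy"] = enriched["precip_mm"] > 0
print(enriched[["pickup_datetime", "precip_mm", "is_rainy"]])
```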

## 5. Real Use Case - Putting it all together<a id="part5" />

Now let's put everything together!
Imagine you want a model that gets retrained every day using new data (weather + historical rides).

Let us take a step back and check what we've done so far. We have implemented:

- A DAG that launches model training jobs on GCP.
- A DAG that gathers weather data every day and stores it inside a BQ table.

Additionally, let us imagine that fresh raw data with the rides of the day is uploaded every night into a Storage Bucket.

We might want to:
1. Enrich our training data with the data gathered daily, both raw rides and weather data
2. Launch a daily training on the enriched training data
3. Use our newly trained model every day to predict the price of a new ride (note: in a real situation, you might also need to collect weather forecasts when making fare estimates).

#### Implementation

Implement a DAG that will, at 12:30 am every day:

- collect new weather data
- merge weather data with rides data (you can create a new BigQuery table named `taxi_rides`, for example)
- launch a new training task with the most recent data
- create a new version of the model 
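
The schedule and task ordering for such a DAG can be sketched as below. `DummyOperator` placeholders stand in for the real tasks, and the `dag_id`, task IDs and `start_date` are assumptions. Note that Airflow interprets cron schedules in UTC by default.

```python
# Sketch of the daily pipeline skeleton; replace each placeholder with a real operator.
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id="daily_taxi_fare_pipeline",          # assumed name
    start_date=datetime.datetime(2020, 1, 1),   # any date in the past
    schedule_interval="30 0 * * *",             # cron expression: 00:30 every day
    catchup=False,
)

collect_weather = DummyOperator(task_id="collect_weather", dag=dag)
merge_data = DummyOperator(task_id="merge_weather_and_rides", dag=dag)
train_model = DummyOperator(task_id="train_model", dag=dag)
create_version = DummyOperator(task_id="create_model_version", dag=dag)

# Each task starts only after the previous one has succeeded.
collect_weather >> merge_data >> train_model >> create_version
```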



## 6. Your model as a product<a id="part6" />

In this last exercise, the goal is to make your model publicly available and visible. The idea is to build an HTTP endpoint that anyone can use to request a fare estimate.

At the end of this exercise, you should have a web app where people can request a taxi fare estimate in NYC by entering their pickup and destination (similar to what Uber provides [here](https://www.uber.com/fr/fr/price-estimate/)).

For this, you are going to build a [Flask](https://flask.palletsprojects.com/en/1.1.x/quickstart/) application (for those who followed the Web Bootcamp, this is the Python equivalent of [Sinatra](http://sinatrarb.com/) on Ruby).
This web app will be deployed using [Google App Engine](https://cloud.google.com/appengine/docs/standard/python3/building-app/?hl=fr).


#### Exercise

- Read the [Flask quickstart](https://flask.palletsprojects.com/en/1.1.x/quickstart/) to see how you can run a simple Flask application locally
- Build a simple route that will call your TaxiFarePrediction model given the location parameters and passenger count. You may want to assume the fare estimate is requested for a ride that happens now.
- Deploy the app with Google App Engine
- If you want to go further, you can also build a 
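
A minimal sketch of such a route is shown below. The route path `/predict_fare`, the query parameter names and the `predict_fare` stand-in (which returns a fixed value instead of calling the trained model) are all assumptions made for illustration.

```python
from flask import Flask, jsonify, request

app = Flask(__name__)

# Assumed query parameter names for the pickup and dropoff coordinates.
COORDINATES = (
    "pickup_latitude", "pickup_longitude",
    "dropoff_latitude", "dropoff_longitude",
)


def predict_fare(features):
    """Hypothetical stand-in: replace with a call to your trained model."""
    return 10.0


@app.route("/predict_fare")
def fare():
    # Missing or non-numeric values are parsed as None.
    features = {name: request.args.get(name, type=float) for name in COORDINATES}
    features["passenger_count"] = request.args.get(
        "passenger_count", default=1, type=int
    )
    missing = [name for name, value in features.items() if value is None]
    if missing:
        return jsonify(error="missing or invalid parameters", parameters=missing), 400
    return jsonify(fare_amount=predict_fare(features))
```

Saved as `app.py`, this can be served locally with `flask run` and queried with a URL such as `/predict_fare?pickup_latitude=40.75&pickup_longitude=-73.99&dropoff_latitude=40.64&dropoff_longitude=-73.78`.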