# Datadriver for DataScientists - Part 2

_Execute the following cell in order to make the table of contents appear_

In [None]:
%%javascript
$.getScript('https://kmahelona.github.io/ipython_notebook_goodies/ipython_notebook_toc.js')

In this notebook, we will discover how to take your workflow from the notebook to the Airflow interface.

<h2 id="tocheading">Table of Contents</h2>
<div id="toc"></div>

# Airflow Context
In the previous notebook, we covered how to create a context that lets you build datasets and models. In this part, you'll learn how to use a new context, the AirflowContext, to create a dataflow and push it to Airflow.

## Airflow principles

[Airflow](https://github.com/apache/incubator-airflow) is a workflow management platform developed at Airbnb. It uses the concept of DAGs (Directed Acyclic Graphs) to schedule tasks, which are created by instantiating operators. If you were to write some vanilla Airflow code in Python, it would look like this (adapted from the Airflow tutorial):

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('tutorial', default_args=default_args)

# t1 and t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)
```

Notice that you must create a DAG, create tasks, bind the tasks to the DAG (through their constructors), and link the tasks together in order to create the dependency graph. It is a really powerful approach, but a bit verbose and tedious (and imperative). This is where the Airflow context comes into play to save the day.
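To make the idea of a dependency graph concrete, here is a minimal sketch that uses only the Python standard library (`graphlib`, available from Python 3.9) rather than Airflow itself. The `dependencies` dictionary is an illustrative stand-in for the two tasks of the tutorial DAG; it is not how Airflow stores its graph internally.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task id to the set of tasks it depends on (its upstream tasks).
# This mirrors `t2.set_upstream(t1)` from the tutorial code above.
dependencies = {
    "print_date": set(),
    "sleep": {"print_date"},
}

# static_order() yields each task only after all of its upstream tasks.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['print_date', 'sleep']
```

Because the graph is acyclic, a valid execution order always exists. If the dependencies contained a cycle, `static_order()` would raise `graphlib.CycleError`, which is why a scheduler needs the graph to be a DAG in the first place.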

## A new context to bind them all

Here is the same code we used in the previous notebook, stripped down to its most important parts.

In [None]:
# Imports
from dd import DB
from dd.api.contexts import LocalContext
import pkg_resources
from sklearn.ensemble import RandomForestClassifier


# Context
db = DB(dbtype='sqlite', filename=':memory:')
context = LocalContext(db)

# Loading data
titanic_datapath = 'https://raw.githubusercontent.com/justmarkham/DAT8/master/data/titanic.csv'
train = context.load_file(titanic_datapath,
                          table_name="titanic.train",
                          write_options=dict(if_exists="replace", index=False))

# Feature engineering
def fillna_with_zeros(dataframe):
    """
    Returns a copy of the dataframe with null values replaced by zeros.
    """
    return dataframe.fillna(0)

filled_with_zeros = train.transform(fillna_with_zeros)
some_columns = filled_with_zeros[["passengerid", "survived", "pclass", "age", "sibsp", "parch", "fare"]]
Xtrain, Xtest = some_columns.split_train_test(train_size=0.75)

# Model
scikit_model = RandomForestClassifier(max_depth=4, n_jobs=-1) 
model = context.model(scikit_model, model_address="model@foo.bar")
fitted_model = model.fit(Xtrain, target="survived")

# Predictions
predictions = fitted_model.predict(Xtest, target="survived")

That is all the code you need in order to start making predictions. Now, if you want to create a dataflow that can be used by Airflow, here is the new code you should write:

In [None]:
# Imports
from dd import DB
from dd.api.contexts import AirflowContext  # <- new import
import pkg_resources
from datetime import datetime  # <- new import
from sklearn.ensemble import RandomForestClassifier
import os
from airflow import DAG  # <- new import

# Context
db = DB(dbtype='sqlite', filename=':memory:')
dataflow = DAG("my_first_dataflow", start_date=datetime.now())  # <- creation of an empty dataflow
context = AirflowContext(dataflow, db)  # <- new context
context.set_default_write_options(if_exists="replace", index=False)

# Loading data
titanic_datapath = 'https://raw.githubusercontent.com/justmarkham/DAT8/master/data/titanic.csv'
train = context.load_file(titanic_datapath,
                          table_name="titanic.train",
                          write_options=dict(if_exists="replace", index=False))

# Feature engineering
def fillna_with_zeros(dataframe):
    """
    Returns a copy of the dataframe with null values replaced by zeros.
    """
    return dataframe.fillna(0)

filled_with_zeros = train.transform(fillna_with_zeros)
some_columns = filled_with_zeros[["passengerid", "survived", "pclass", "age", "sibsp", "parch", "fare"]]
Xtrain, Xtest = some_columns.split_train_test(train_size=0.75)

# Model
scikit_model = RandomForestClassifier(max_depth=4, n_jobs=-1)
try:
    db.drop_table('foo.bar')
except Exception as e:
    print(e)

model = context.model(scikit_model, model_address="model@foo.bar")
fitted_model = model.fit(Xtrain, target="survived")


# Predictions
predictions = fitted_model.predict(Xtest, target="survived")

Did you notice the difference? Only a handful of lines changed, imports included! Well, that's neat. All you did was create an empty dataflow and feed it to a new object, the AirflowContext. And you're done. You don't believe me? Well, first, let's check that we can still make predictions:

In [None]:
predictions.head()

WOW, that's a lot of information right there!

Well, yes, and it should be that way. Airflow is a production platform, and as such it requires a decent level of logging. What you see just above is the result of all the intermediate computations, from loading the data to producing the final predictions. That's why you should probably use the LocalContext when playing around with your data.

OK, I showed you that using this new context does the same thing, but with more verbosity. So what's all the hype about? Well, it does a bit more than adding logs. It also populated your dataflow:

In [None]:
dataflow.task_count
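If you wonder how a context can fill a dataflow behind your back, the toy sketch below illustrates the general idea: every operation requested through the context is recorded as a task, together with the task it depends on. All the names here (`ToyDag`, `ToyContext`, `ToyDataset`) are hypothetical and written for this illustration only. This is not the actual implementation of the AirflowContext, and it does not use Airflow.

```python
class ToyDag:
    """Hypothetical stand-in for a DAG: task ids mapped to their upstream ids."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.upstream = {}

    @property
    def task_count(self):
        return len(self.upstream)


class ToyDataset:
    """Hypothetical dataset handle that remembers the task producing it."""

    def __init__(self, context, task_id):
        self.context = context
        self.task_id = task_id

    def transform(self, function):
        # Deriving a dataset registers one more task, downstream of this one.
        return self.context.register(function.__name__, upstream=[self.task_id])


class ToyContext:
    """Hypothetical context: every operation is recorded as a task in the DAG."""

    def __init__(self, dag):
        self.dag = dag

    def register(self, task_id, upstream=()):
        self.dag.upstream[task_id] = list(upstream)
        return ToyDataset(self, task_id)

    def load_file(self, path, table_name):
        return self.register("load_" + table_name)


def fillna_with_zeros(rows):
    return rows  # placeholder body; only the function name matters here


toy_dag = ToyDag("toy_dataflow")
toy_context = ToyContext(toy_dag)
toy_train = toy_context.load_file("titanic.csv", table_name="titanic.train")
toy_filled = toy_train.transform(fillna_with_zeros)

print(toy_dag.task_count)  # 2
print(toy_dag.upstream)
```

The user code never mentions tasks or dependencies: the graph is a by-product of calling `load_file` and `transform`. That declarative style is what the previous section contrasted with the imperative `set_upstream` calls of vanilla Airflow.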

## Airflow UI

As you can see, your dataflow is not empty anymore. It now has 7 distinct tasks that are ready to be pushed to production. This means that if you copy this code into a file where Airflow can find it, Airflow will handle your code. We know copy-pasting code is not fun, so we created a shortcut for it. Just click on the Runtools button, then on the rocket. After a bit, you should see this in the [Airflow UI](http://localhost:8080):

_TODO : add a screenshot_
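For reference, the manual route (copying the code into a file where Airflow can find it) could look like the sketch below. It assumes a default Airflow configuration, in which DAG files are read from `$AIRFLOW_HOME/dags` and `AIRFLOW_HOME` defaults to `~/airflow`; your installation may set a different `dags_folder`. The helper names `default_dags_folder` and `deploy_dag_file`, as well as the file name `my_first_dataflow.py`, are made up for this example, and this is not necessarily what the Runtools button does.

```python
import os
import shutil
import tempfile
from pathlib import Path


def default_dags_folder():
    """Return $AIRFLOW_HOME/dags, with AIRFLOW_HOME defaulting to ~/airflow."""
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.join("~", "airflow"))
    return Path(airflow_home).expanduser() / "dags"


def deploy_dag_file(source, dags_folder):
    """Copy a Python file defining a DAG into the given DAGs folder."""
    dags_folder = Path(dags_folder)
    dags_folder.mkdir(parents=True, exist_ok=True)
    return Path(shutil.copy(source, dags_folder))


# Demonstration in a temporary directory, so that no real folder is touched.
with tempfile.TemporaryDirectory() as workdir:
    source = Path(workdir) / "my_first_dataflow.py"
    source.write_text("# dataflow definition goes here\n")
    deployed = deploy_dag_file(source, Path(workdir) / "dags")
    deployed_name = deployed.name
    deployed_exists = deployed.exists()

print(deployed_name, deployed_exists)
```

On a real installation, you would call `deploy_dag_file("my_first_dataflow.py", default_dags_folder())` and wait for the scheduler to pick up the new file.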