# Manipulating FTPs/SFTPs with Airflow.

## Agenda:
- Basic Airflow architecture.

- Simple FTP requests

- PythonOperator

- Hook+Python Operator

- Scheduling

## Airflow words.

- DAG: Directed Acyclic Graph, the structure Airflow uses for its workflows. Each DAG has an ordering (one task can depend on another, hence "directed") and contains no cycles ("acyclic"). A DAG is made up of separate tasks that describe the workflow's structure; all the heavy lifting is done in the hooks and operators.

- Hooks: Files used by Airflow to interact with external systems (databases, APIs, etc.)

- Operators: The atomic unit of logic in Airflow - these files determine how the work gets done.
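
To make "directed" and "acyclic" concrete, here is a minimal pure-Python sketch (no Airflow involved) that orders tasks by their dependencies and rejects a graph with a cycle. The function name `run_order` and the task names are hypothetical; this only illustrates the idea and is not how Airflow's scheduler is implemented.

```python
from collections import deque


def run_order(dependencies):
    """Return task names in an order that respects `dependencies`.

    `dependencies` maps each task to the list of tasks it depends on.
    Raises ValueError if the graph contains a cycle.
    """
    # Collect every task mentioned, either as a key or as a dependency.
    tasks = set(dependencies)
    for upstream in dependencies.values():
        tasks.update(upstream)

    # Count unmet dependencies and record which tasks wait on which.
    waiting_on = {task: len(set(dependencies.get(task, ()))) for task in tasks}
    downstream = {task: [] for task in tasks}
    for task, upstream in dependencies.items():
        for parent in set(upstream):
            downstream[parent].append(task)

    # Repeatedly release tasks whose dependencies have all been met.
    ready = deque(sorted(t for t in tasks if waiting_on[t] == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            waiting_on[child] -= 1
            if waiting_on[child] == 0:
                ready.append(child)

    # Any task never released is stuck in a cycle.
    if len(order) != len(tasks):
        raise ValueError("cycle detected: not a DAG")
    return order


# Hypothetical task names mirroring the workflow used later in this notebook.
print(run_order({
    "download_file": ["kick_off_dag"],
    "upload_file": ["download_file"],
}))
```

If two tasks depend on each other, no valid ordering exists, which is why a workflow with a cycle cannot be expressed as a DAG.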

# Simple FTP requests:

#### There are plenty of different modules for dealing with FTPs in Python. For SFTPs, _paramiko_ is the best library to use.
#### This unsecured example uses ftplib.

_Note_: All commands used here have a paramiko equivalent.
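
As a rough illustration of that equivalence, the sketch below shows what the SFTP side might look like with paramiko's `Transport` and `SFTPClient`. The helper names (`connect_sftp`, `download_file`, `upload_file`) and the example host and paths are assumptions made for this sketch, not part of the original workflow.

```python
def connect_sftp(host, port, username, password):
    """Open an SFTP session and return (transport, sftp_client)."""
    # Imported inside the function so this cell loads even where
    # paramiko is not installed.
    import paramiko

    transport = paramiko.Transport((host, port))
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    return transport, sftp


def download_file(sftp, remote_path, local_path):
    """SFTP counterpart of ftplib's retrbinary('RETR ...')."""
    sftp.get(remote_path, local_path)


def upload_file(sftp, local_path, remote_path):
    """SFTP counterpart of ftplib's storbinary('STOR ...')."""
    sftp.put(local_path, remote_path)


# Example usage (hypothetical host, credentials, and paths):
# transport, sftp = connect_sftp('sftp.example.com', 22, 'user', 'secret')
# download_file(sftp, '/remote/sample_data.csv', 'sample_data.csv')
# upload_file(sftp, 'sample_data.csv', '/remote/sample_data.csv')
# sftp.close()
# transport.close()
```

Note that SFTP usually listens on port 22 rather than FTP's port 21.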

In [None]:
## Suppose you have a workflow, run on a cron schedule, that downloads data from an FTP,
## does some transformation, and uploads it back.

In [None]:
from ftplib import FTP

def download_file(ftp, file_name):
    """
    Downloads file from FTP.
    """
    # Stream the remote file into a local file of the same name.
    with open(file_name, 'wb') as localfile:
        ftp.retrbinary('RETR ' + file_name, localfile.write, 1024)

    # Do some manipulations to the local file.


def upload_file(ftp, file_name):
    """
    Uploads file as binary to FTP.
    """
    with open(file_name, 'rb') as localfile:
        ftp.storbinary('STOR ' + file_name, localfile)


host = ''
username = ''
password = ''
port = 21

# Connect and log in. login() returns the server's response string,
# so the FTP object itself is what gets passed to the functions.
ftp = FTP()
ftp.connect(host, port)
ftp.login(username, password)

download_file(ftp, 'sample_data.csv')
upload_file(ftp, 'sample_data.csv')

# Close the connection only after both transfers are done.
ftp.quit()



## Doing this in Airflow:

To do this in a DAG, we can use the PythonOperator:

In [None]:
# Import Airflow-specific dependencies.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Import the module for the FTP.
from ftplib import FTP


#Define functions
def upload_file(**kwargs):
    """
    Uploads file as binary to FTP. 
    """
    
    credentials = kwargs.get('templates_dict').get('credentials', None)
    host = credentials['host']
    username = credentials['username']
    password = credentials['password']
    
    
    ftp = FTP(host)
    ftp.login(username, password)

    
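    filename = 'sample_data.csv'
    # Open the local file in a with-block so it is closed after the upload.
    with open(filename, 'rb') as localfile:
        ftp.storbinary('STOR ' + filename, localfile)
    ftp.quit()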


def download_file(**kwargs):
    """
    Downloads file from FTP.
    """
    
    credentials = kwargs.get('templates_dict').get('credentials', None)
    host = credentials['host']
    username = credentials['username']
    password = credentials['password']
    

    ftp = FTP(host)
    ftp.login(username, password)

    filename = 'sample_data.csv'

    # The with-block closes the local file even if the transfer fails.
    with open(filename, 'wb') as localfile:
        ftp.retrbinary('RETR ' + filename, localfile.write, 1024)

    ftp.quit()


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 12, 19)
}


# Schedule this DAG to run once.
dag = DAG('test_ftp',
          description='Manipulating FTPs with PythonOperators',
          schedule_interval='@once',
          start_date=datetime(2017, 12, 18),
          default_args=default_args)

# FTP creds
credentials = {
    'host': '',
    'username': '',
    'password': '',
    'port': 21
}
with dag:
    # Dummy start DAG.
    kick_off_dag = DummyOperator(task_id='kick_off_dag')

    # Wrap each function in a PythonOperator.
    download_task = PythonOperator(
        task_id='download_file',
        python_callable=download_file,
        # templates_dict is handed to the callable through its keyword arguments.
        templates_dict={'credentials': credentials},
        provide_context=True
    )

    upload_task = PythonOperator(
        task_id='upload_file',
        python_callable=upload_file,
        # templates_dict is handed to the callable through its keyword arguments.
        templates_dict={'credentials': credentials},
        provide_context=True
    )

    # Set dependencies: first the kickoff, then the download, and finally the upload.
    # A task won't start until the one before it finishes,
    # e.g. the upload won't start until the download task finishes.
    kick_off_dag >> download_task >> upload_task
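
In Airflow, `a >> b` is shorthand for `a.set_downstream(b)`, and the expression evaluates to `b` so that chains read left to right. The toy class below is a stand-in written for this notebook (it is not Airflow's implementation) that shows how such chaining can be built on Python's `__rshift__`.

```python
class ToyTask:
    """Toy stand-in for an Airflow task; not Airflow's actual implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []
        self.downstream = []

    def __rshift__(self, other):
        # `a >> b` records b as downstream of a ...
        self.downstream.append(other.task_id)
        other.upstream.append(self.task_id)
        # ... and returns b so that `a >> b >> c` keeps chaining.
        return other


kick_off = ToyTask('kick_off_dag')
download = ToyTask('download_file')
upload = ToyTask('upload_file')

kick_off >> download >> upload

print(upload.upstream)  # ['download_file']
```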

## Why use the PythonOperator?

Airflow is made up of three core components: the webserver, the scheduler, and the executor.

- Webserver: Responsible for the UI in the browser.
- Scheduler: Handles the scheduling and state of tasks.
- Executor: Handles actually executing the underlying code.

The scheduler "heartbeats" DAG files every few seconds before sending anything to the executor.
Each "heartbeat" executes **all** top-level code. Any code that isn't wrapped in an operator is executed
on each heartbeat, making it incredibly expensive.

**Airflow Best Practice: Minimize top-level code.**
    
The PythonOperator is a quick and dirty way around this - just throw your function in a PythonOperator and you 
can leverage Airflow's scheduling and dependency capabilities. 
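
The difference between top-level code and code inside a callable can be sketched without Airflow at all. In the example below, `fetch_remote_listing` is a hypothetical stand-in for any slow call (an FTP listing, a database query, an API request). Calling it at module level would run it every time the file is parsed, while calling it inside the function defers it until the task actually executes.

```python
lookups = []  # records each time the "expensive" call actually runs


def fetch_remote_listing():
    """Hypothetical stand-in for a slow call (FTP listing, DB query, API request)."""
    lookups.append('called')
    return ['sample_data.csv']


# Anti-pattern: at module level this would run on every parse of the DAG file.
# files = fetch_remote_listing()


def process_files(**kwargs):
    """Meant to be used as a python_callable: the slow call runs only when the task does."""
    files = fetch_remote_listing()
    return len(files)
```

Defining `process_files` costs nothing at parse time; the slow call happens only when something invokes the function.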

### Lots of repeated code.

PythonOperators make it easy to take existing scripts and schedule them with Airflow, but they lead to a
fair deal of repeated code and aren't any more modular than regular Python functions. Furthermore, they
clutter the DAG file itself.

**Airflow Best Practice: The DAG file should be as close to a "config" file as possible.**
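
Even before reaching for a hook, the repeated login and quit logic could be pulled into one helper. The sketch below is one possible way to do that with a context manager. `ftp_session` and its `ftp_factory` parameter are hypothetical names introduced here, and the credentials dictionary is assumed to have the same keys as the one in the DAG above.

```python
from contextlib import contextmanager
from ftplib import FTP


@contextmanager
def ftp_session(credentials, ftp_factory=FTP):
    """Log in with a credentials dict, yield the client, and always quit.

    `ftp_factory` is a hypothetical parameter added so the helper can be
    exercised with a fake client; by default it is ftplib.FTP.
    """
    ftp = ftp_factory(credentials['host'])
    ftp.login(credentials['username'], credentials['password'])
    try:
        yield ftp
    finally:
        # Runs whether or not the transfer inside the with-block succeeded.
        ftp.quit()


def upload_file(**kwargs):
    """Uploads file as binary to FTP, reusing the shared session helper."""
    credentials = kwargs['templates_dict']['credentials']
    filename = 'sample_data.csv'
    with ftp_session(credentials) as ftp, open(filename, 'rb') as localfile:
        ftp.storbinary('STOR ' + filename, localfile)
```

This removes the duplication, but the connection details still travel through the DAG file, which is the problem hooks address.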


### Using the FTP Hook.

Letting a hook handle the connection to the FTP cleans this code up considerably.

https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/contrib/hooks/ftp_hook.py

In [None]:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.contrib.hooks.ftp_hook import FTPHook


def upload_file(**kwargs):
    """
    Uploads file as binary to FTP.
    """
    # The hook looks up the stored connection by its ID.
    hook = FTPHook(ftp_conn_id='astro_ftp')

    local_path = 'sample_data.csv'
    remote_path = '/astro_test/sample_data.csv'

    # FTPHook methods take the remote path first, then the local path.
    hook.store_file(remote_path, local_path)
    hook.close_conn()


def download_file(**kwargs):
    """
    Downloads file from FTP.
    """
    hook = FTPHook(ftp_conn_id='astro_ftp')

    local_path = 'sample_data.csv'
    remote_path = '/astro_test/sample_data.csv'

    # FTPHook methods take the remote path first, then the local path.
    hook.retrieve_file(remote_path, local_path)
    hook.close_conn()


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 12, 19)
}


# Schedule this DAG to run once.
dag = DAG('test_ftp',
          description='Manipulating FTPs with PythonOperators+Hooks',
          schedule_interval='@once',
          start_date=datetime(2017, 12, 18),
          default_args=default_args)

with dag:

    kick_off_dag = DummyOperator(task_id='kick_off_dag')

    upload_task = PythonOperator(
        task_id='upload_file',
        python_callable=upload_file,
        # This passes the task context into the function as keyword arguments.
        provide_context=True
    )

    download_task = PythonOperator(
        task_id='download_file',
        python_callable=download_file,
        # This passes the task context into the function as keyword arguments.
        provide_context=True
    )

    kick_off_dag >> upload_task >> download_task

### Handling Connections.

Notice that when the hook was instantiated, it was simply passed the name of the connection instead of the actual credentials. All hooks inherit from the BaseHook, which has access to the Airflow database that stores connections.

![connections](img/airflow_connections.png)

The Connections panel can be accessed from the UI under Admin -> Connections.

Connections are encrypted with a Fernet key after they're entered, keep credentials out of files,
and can be used by other DAGs in the same instance.
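
Besides the UI, Airflow can also pick up a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` that holds a URI. The sketch below is not Airflow's own parser. It only uses the standard library to show which fields such a URI carries, and the URI value, the `parse_connection_uri` name, and the returned field names are illustrative assumptions.

```python
from urllib.parse import urlparse


def parse_connection_uri(uri):
    """Split a connection URI into fields like those on the Connections panel."""
    parts = urlparse(uri)
    return {
        'conn_type': parts.scheme,
        'host': parts.hostname,
        'login': parts.username,
        'password': parts.password,
        'port': parts.port,
    }


# Hypothetical value, e.g. what AIRFLOW_CONN_ASTRO_FTP might hold.
print(parse_connection_uri('ftp://astro_user:s3cret@ftp.example.com:21'))
```

Either way, the DAG file only ever mentions the connection ID (`astro_ftp` here), never the credentials themselves.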

## Not "Airflowic" Enough

Using the hook with the PythonOperator cut down on repeated code, 
but the DAG file doesn't read like a config file yet.

To polish it off, we'll write a custom FTPtoS3Operator.