# Connecting to External Services

### Introduction

Now so far, we have used airflow to setup a dag, and perform some tasks in Python. But as we know, the main usecase for airflow is performing our ETL operations.  This means that we'll need airflow to connect to external services like our RDS instance, our redshift cluster, and S3.  As we'll see in this lesson, airflow has some built in functionality that allows us to do just that.

### External Connections

Now if we were to purely use Python, we know how to connect with postgres or RDS.  Here's how we would do so:

```python
import psycopg2
HOST = 'host_url'
conn = psycopg2.connect(host = HOST, 
                 user = 'postgres', 
                 password = 'postgres', 
                database = 'dev', port = '5432')
```

So we would connect to our database by creating a connection object with use of the psycopg2 library.  There, we specify the host url, whether it be localhost or that of our redshift or RDS server, and the related username, password and database information.

In airflow, we do something similar, but we pass that information through the web interface.

To find the panel to do so, we can bootup airflow, and then click on `Admin` followed by `Connections`.

<img src="./connections_ux.png" width="60%">

From there, we can click on create connection, and we'll find a form to enter in similar information that we use to connect to our database through psycopg2.

> Looking at the information below, the only thing new is the `Conn id`.  That's what we'll use to reference the connection to our RDS instance in our code.  We also specify a Conn Type of Postgres, as that's the technology that RDS is built on.

<img src="./rds_updated.png" width="60%">

> **Warning**: There is one thing pretty confusing from the above.  Where the field says `Schema`, we actually enter in the database that we would like to connect to.  So above, the database we are connecting to is the `postgres` database.

So when we're done entering in our information, we can click `Save`.  We can now see our rds connection listed among the rest of the connections.

<img src="./rds_link.png" width="100%">

### Referencing our Connection

So we just entered in the information needed to connect to our RDS instance.  And we added an `Conn id` of `rds` so that we can reference that information in our code.  Let's see how we can do that.

The key part is right here:

```python
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from datetime import datetime, timedelta

def retrieve_states():
    rds_hook = PostgresHook('rds')
    vals = rds_hook.get_records("""SELECT * FROM states;""")
    return vals
```

We define a function called `retreive_states` and then we use a `PostgresHook` object to make the connection to our rds database, by passing through the `Conn Id` we specified in the web interface.  From there we called the `get_records` function followed by our SQL call.  We then returned the retrieved values from the function.

Now, once we connect that function up to a task, and connect the task to a DAG, we are good to go.

```python 
get_foursquare_info = DAG(dag_id = 'retrieve_foursquare_data', 
                          start_date = datetime.now() - timedelta(days = 1))


get_states = PythonOperator(task_id='get_states', 
                            dag = get_foursquare_info,
                            python_callable = retrieve_states)
```

When we trigger the dag with that task, we can see that data is returned by looking at the logs of our `get_states` task.

> Below we see the listed states of `New York` and `Pennsylvania`.

<img src="./log_get_states.png" width="100%">

### Using Operators

Now another way that we could perform connect to our RDS database is with an operator.  In addition to our Python operator, airflow also gives us a postgres operator.  So this time, we could perform the following:

```python
select_states = PostgresOperator(
        task_id="select_states",
        postgres_conn_id="rds",
        sql="""SELECT * FROM states;""", 
        dag = get_foursquare_info
)
```

So here, we do not need to make our connection through the hook.  Instead, our task directly references the rds connection through the `postgres_conn_id` argument.  The downside to using our operator however, is that while it will execute the query, we will not see our query logged.  So using the operator is better for commands where we are making a change to our database or records -- like inserting records, or creating a new table.  In those cases, we would like to perform a command on our postgres database, but we are not querying for information.

### Summary

In this lesson, we saw how to connect to our AWS services by using creating connections through the web interface.  We saw that through airflow's web interface we entered in the information for connecting to our RDS instance, including the `Conn Id`.  One confusing component was that where the form says `Schema`, we entered in the name of our database.  

Then we were able to reference the connection to our database through a redshift hook, where we then used `get_records` to perform our query.

```python
from airflow.hooks.postgres_hook import PostgresHook
def retrieve_states():
    rds_hook = PostgresHook('rds')
    vals = rds_hook.get_records("""SELECT * FROM states;""")
    return vals
```

We also connected to the database, using our postgres operator.

```python
select_states = PostgresOperator(
        task_id="select_states",
        postgres_conn_id="rds",
        sql="""SELECT * FROM states;""", 
        dag = get_foursquare_info
)
```