#  Uploading to S3

### Introduction

In this lesson, we'll see how we can extract data from an RDS instance to load into an S3 instance.  Let's get started.

### Connecting to S3

Now connecting to S3, is a little trickier than our previous connections.  Here, we'll again have to go to `Admin > Connections`, and create a new connection.  From there, we can enter information that looks like the following:

> <img src="./s3_connection.png" width="60%">

So our `Conn Type` is `S3` and we leave the rest blank until we get to `Extra`.  There we add a dictionary with the keys of `aws_access_key_id` and `aws_secret_access_key`.  The values are the corresponding aws values from your acccount.

```python
{"aws_access_key_id": "...", "aws_secret_access_key": "..."}
```

### Creating a Task

Now that we've create the connection it's time to use the connection to upload some data to our s3 bucket.  We can do so with something like the following:

```python
from airflow.hooks.S3_hook import S3Hook

def upload_to_s3():
    s3_hook = S3Hook('s3_connection')
    s3_hook.load_string(string_data='hello world', bucket_name='jk-jigsaw-foursquare', key='practice.csv')
    
rds_to_s3 = PythonOperator(task_id='rds_to_s3',
                            dag = get_foursquare_info,
                            python_callable = upload_to_s3)
```

So here, we import the S3Hook, and then reference the connection via the `Conn Id` we specified in the Airflow webserver.  We use the `load_string` method to load the specified string to our bucket, and in a new file called `practice.csv`.

Ok, now let's see if it worked.  If we check the log of our data, we ccan see that `S3_hook` was called.

> <img src="./uploading_s3.png" width="100%">

And if we look at our s3 buckets, we'll see a new `practice.csv` file loaded into our bucket.

> <img src="./practice_csv.png" width="50%">

## From RDS to S3

Ok, so now that we saw how to connect to S3 and add a new file to the bucket, let's do so again, but this time uploading a CSV file populated with data from our RDS instance. 

To do so, we'll create a CSV file in memory which we can then upload to S3.  Unfortunately this step is a little complicated.  It's ok, we'll push through.

To start, we'll connect to our RDS instance, and select the records that we'd like to move to S3.

In [None]:
rds_hook = PostgresHook('rds')
zipcode_records = rds_hook.get_records("SELECT * FROM zipcodes;")

> First, we create an in memory file with the following:

In [13]:
import io
mem_file = io.StringIO()
mem_file

<_io.StringIO at 0x10df30700>

Then we wrap the file in a csvwriter, which is a file that allows us to write ccsv values, specifying that the end of a line is where there's a line separation. 

In [14]:
csv_writer = csv.writer(mem_file, lineterminator=os.linesep)


Finally, we write our zipcode records, and convert the file to binary so that we can upload this to S3.

```python
csv_writer.writerows(zipcode_records)
# encode into a byte stream
mem_file_binary = io.BytesIO(mem_file.getvalue().encode())
```

Finally, our s3_hook has a `load_file_obj` function that allows us to upload the file to an S3 bucket.

In [None]:
s3_hook.load_file_obj(
       file_obj=mem_file_binary,
       bucket_name='jigsaw-sample-data',
       key='sample_zipcodes.csv',
       replace=True,
   )

Ok, now let's see the entire function.

In [None]:
import io
import csv
import os

def load_to_s3():
    rds_hook = PostgresHook('rds')
    s3_hook = S3Hook('s3_connection')
    zipcode_records = rds_hook.get_records("SELECT * FROM zipcodes;")
    # create an in memory file
    mem_file = io.StringIO()
    csv_writer = csv.writer(mem_file, lineterminator=os.linesep)
    csv_writer.writerows(zipcode_records)
    # encode into a byte stream
    mem_file_binary = io.BytesIO(mem_file.getvalue().encode())
    s3_hook.load_file_obj(
       file_obj=mem_file_binary,
       bucket_name='jigsaw-sample-data',
       key='sample_zipcodes.csv',
       replace=True,
   )

And after running this task in airflow, we'll find data from our RDS instance successfully uploaded to our S3 bucket.

> <img src="./uploaded_zipcodes.png" width="30%">

### Summary

In this lesson, we saw how to select records from a database in RDS, convert those records into a CSV file, and then upload that CSV file to an S3 bucket.  To do so, we needed to create an S3 connection, which we again did through the airflow webserver, and then in the extra field, specified our aws keys with the following dictionary: `{"aws_access_key_id": "...", "aws_secret_access_key": "..."}`.

After making a connection to S3, we wrote a task that selected records from an RDS, converted the records to an in memory CSV file, and uploaded the CSV file to S3.  To convert the records to an in memory CSV file, we used the following code:

In [None]:
mem_file = io.StringIO()
csv_writer = csv.writer(mem_file, lineterminator=os.linesep)

csv_writer.writerows(zipcode_records)
# encode into a byte stream
mem_file_binary = io.BytesIO(mem_file.getvalue().encode())

This allowed us to call the `s3_hook.load_file_obj` to upload the data.

### Resources


* [StringIO](geeksforgeeks.org/stringio-module-in-python/)
* [Manning - Data Pipelines with Airflow ](https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-7/v-6/41)
* [Airflow Lessons Learned Medium](https://medium.com/snaptravel/airflow-part-2-lessons-learned-793fa3c0841e)