# Cleaning Files in Memory on Airflow SaaS

## Astronomer Airflow SaaS limits the total memory available to each task, so it's best to use S3 streams when working with in-memory transforms.

### Note: This might not apply to smaller files, since basic transforms can be done in Pandas.

In [None]:
import io
import logging
import shutil
import urllib.request
import zipfile
from datetime import datetime

import smart_open
from airflow.hooks.S3_hook import S3Hook
from airflow.operators import PythonOperator, DummyOperator, PostgresOperator

def _resolve_setting(kwargs, name):
    """Return setting ``name`` from ``kwargs``, else from a module-level global.

    The global fallback keeps older deployments working that defined
    ``s3_conn_id`` / ``s3_bucket`` at module level instead of passing them in.
    """
    value = kwargs.get(name, globals().get(name))
    if value is None:
        raise ValueError(
            "ftp_unzip_s3 requires '{}' (pass it via op_kwargs)".format(name))
    return value


def ftp_unzip_s3(*args, **kwargs):
    """
    Download a zip file from an FTP URL and upload each of its members to S3.

    Each archive member is streamed into S3 in fixed-size chunks (via
    ``shutil.copyfileobj``), so a large member is never held in memory all
    at once. This matters on Astronomer Airflow SaaS, where per-task memory
    is limited.

    This function is meant to be called as a Python callable in Airflow
    (e.g. from a ``PythonOperator``).

    Keyword Args:
        s3_conn_id: Airflow connection id whose ``extra`` JSON contains
            ``aws_access_key_id`` and ``aws_secret_access_key``. Falls back
            to a module-level ``s3_conn_id`` if not passed.
        s3_bucket: Destination bucket, optionally followed by a key prefix
            (the object key is built as ``<s3_bucket>/<name>``). Falls back
            to a module-level ``s3_bucket`` if not passed.
        file_name: Base name of the archive to fetch. It is also used as the
            local download path and as the prefix of every uploaded object.
            Defaults to ``'astronomer_extract'``.

    The output in S3 is one object per file inside the original zip file,
    named ``<s3_bucket>/<file_name>_<member name>``. Directory entries in the
    archive are skipped.

    Raises:
        ValueError: if ``s3_conn_id`` or ``s3_bucket`` cannot be resolved.
    """
    file_name = kwargs.get('file_name', 'astronomer_extract')
    s3_conn_id = _resolve_setting(kwargs, 's3_conn_id')
    s3_bucket = _resolve_setting(kwargs, 's3_bucket')

    # Placeholder URL: replace 'FTP_URl_' with the real FTP server path.
    ftp_url = 'FTP_URl_{0}'.format(file_name)

    # Saves the archive to ``file_name`` in the current working directory.
    # NOTE(review): the local copy is not removed after the upload.
    urllib.request.urlretrieve(ftp_url, file_name)

    s3 = S3Hook(s3_conn_id=s3_conn_id)
    s3_creds = s3.get_connection(s3_conn_id).extra_dejson

    # ZipFile opens and closes the archive itself; the handle is always released.
    with zipfile.ZipFile(file_name, mode='r') as zipf:
        for member in zipf.infolist():
            # Directory entries have no content; uploading them would only
            # create empty placeholder objects.
            if member.filename.endswith('/'):
                continue

            object_name = '{}_{}'.format(file_name, member.filename)
            s3_key = '{}/{}'.format(s3_bucket, object_name)
            logging.info("Uploading %s", s3_key)

            # NOTE(review): credentials are embedded in the URL, which can break
            # parsing if the secret key contains '/'. Never log this URL.
            s3_url = 's3://{}:{}@{}'.format(
                s3_creds['aws_access_key_id'],
                s3_creds['aws_secret_access_key'],
                s3_key)

            # Chunked copy keeps memory bounded. Iterating a binary member by
            # "lines" can pull an entire newline-free member into memory.
            with zipf.open(member) as src, \
                    smart_open.smart_open(s3_url, 'wb') as fout:
                shutil.copyfileobj(src, fout)

            logging.info("Uploaded file!")
