In [None]:
from ftplib import FTP
import traceback
import sys
import os
import boto3

In [None]:
# S3 bucket that is listed by the sync cells below.
BUCKET_NAME = "files-cnes-datasus"
# Prefix an FTP file name must start with to be collected as a zip archive.
BASE_FILES_NAME = "BASE_DE_DADOS_CNES"
# FTP host and the directory on it that download_zipfile() connects to.
SITE = "ftp.datasus.gov.br"
FTP_FOLDER = "cnes"

# boto3 handles: the low-level client is used to list the bucket; the resource
# is the kind of handle upload_zipfile() expects (it uses `.meta.client`).
s3_client = boto3.client("s3")
s3_resource = boto3.resource("s3")

In [None]:
def print_error() -> None:
    """Print the traceback of the exception being handled and stop the script.

    Intended to be called from inside an ``except`` block. Never returns.

    Raises:
        SystemExit: always, with exit status 1 so that callers running this
            notebook as a script (or a scheduler) see the run as failed
            instead of successful.
    """
    traceback.print_exc()
    print("Closing...")
    # A bare sys.exit() would report status 0 (success) after an error.
    sys.exit(1)

def get_list_names_zipfiles_bucket(s3_client: "botocore.client.BaseClient", bucket: str) -> list[str]:
    """Return the names of the zip files stored under ``zipfiles/`` in the bucket.

    Args:
        s3_client: boto3 S3 client (anything exposing ``get_paginator``).
        bucket: name of the S3 bucket to list.

    Returns:
        The object keys found under ``zipfiles/`` with that prefix removed,
        in listing order. The ``zipfiles/`` folder placeholder key itself, if
        present, is never included. An empty list when there are no archives.

    Raises:
        SystemExit: via ``print_error()`` when the listing fails.
    """
    prefix = "zipfiles/"
    try:
        print("Getting list of zipfiles in S3 Bucket...")
        names = []
        # Paginate: a single list call returns at most 1000 keys.
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            # "Contents" is absent from the response when nothing matches.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                # Skip the folder placeholder explicitly instead of assuming
                # it exists and is the first key.
                if key.startswith(prefix) and key != prefix:
                    names.append(key[len(prefix):])
        if not names:
            print("No zip files in Bucket folder 'zipfiles/'.")
            return []
        print("Names collected.\n")
        return names
    except Exception:
        print("Error getting names of zipfiles in Bucket.")
        print_error()
    
def download_zipfile(site: str, ftp_folder: str, zip_files_path: str, zipfiles_names_bucket: list[str], file: str) -> None:
    """Download one file from the FTP server unless the bucket already has it.

    Args:
        site: FTP host name.
        ftp_folder: directory on the FTP server that contains ``file``.
        zip_files_path: local directory where the file is written.
        zipfiles_names_bucket: names already present in the S3 bucket; when
            ``file`` is among them nothing is done (no connection is opened).
        file: name of the file to retrieve.

    Raises:
        SystemExit: when login or the directory change fails.
        EOFError: when the server drops the connection during the transfer
            (propagated so the caller can retry).

    A transfer that fails or is interrupted does not leave a truncated file
    behind: the partial local file is removed before returning or raising.
    """
    # Nothing to do: avoid opening an FTP session just to skip the file.
    if file in zipfiles_names_bucket:
        return

    local_path = os.path.join(zip_files_path, file)

    with FTP(site) as ftp:
        try:
            # LOGIN (anonymous); login() returns the server response string.
            if ftp.login().startswith("230"):
                print("Logged in.\n")
            else:
                print("Failed to log in.")
                sys.exit(1)

            # cwd() returns the server response string.
            if ftp.cwd(ftp_folder).startswith("250"):
                print(f"Directory changed to '{ftp_folder}'.\n")
            else:
                print("Failed to change directory.")
                sys.exit(1)
        except Exception:
            # SystemExit is not an Exception, so the exits above pass through.
            print_error()

        downloaded = False
        try:
            with open(local_path, "wb") as f:
                print(f"Downloading {file}...")
                # retrbinary() returns the final server response string.
                retCode = ftp.retrbinary(f"RETR {file}", f.write)
            downloaded = retCode.startswith("226")
            if downloaded:
                print(f"{file} downloaded.")
            else:
                print(f"Error downloading {file}: {retCode}")
        finally:
            # Drop partial data so an incomplete archive is never mistaken
            # for a finished download.
            if not downloaded and os.path.exists(local_path):
                os.remove(local_path)

def upload_zipfile(s3_resource: "boto3.resources.base.ServiceResource", filename: str, bucket: str, key: str) -> None:
    """Upload a local zip file to the bucket under the ``zipfiles/`` prefix.

    Args:
        s3_resource: boto3 S3 resource; its ``meta.client`` performs the upload.
        filename: path of the local file to upload.
        bucket: name of the destination S3 bucket.
        key: object name inside ``zipfiles/`` (the prefix is added here).

    Raises:
        SystemExit: via ``print_error()`` when the upload fails.
    """
    try:
        print("Uploading zipfile...")
        s3_resource.meta.client.upload_file(
            Filename=filename,
            Bucket=bucket,
            Key="zipfiles/" + key,
        )
        print("Zipfiles uploaded.\n")
    except Exception:
        # Only real errors are reported; KeyboardInterrupt/SystemExit are
        # left to propagate instead of being relabelled as upload failures.
        print("Error uploading files.")
        print_error()

In [None]:
# GET ZIPFILES NAMES FROM FTP
# Uses the SITE / FTP_FOLDER constants from the configuration cell so this cell
# and download_zipfile() always talk to the same server and directory.
with FTP(SITE) as ftp:
    try:
        # LOGIN (anonymous); login() returns the server response string.
        if ftp.login().startswith("230"):
            print("Logged in.\n")
        else:
            print("Failed to log in.")
            sys.exit(1)

        # GO TO THE DATA DIRECTORY; cwd() returns the server response string.
        if ftp.cwd(FTP_FOLDER).startswith("250"):
            print(f"Directory changed to '{FTP_FOLDER}'.\n")
        else:
            print("Failed to change directory.")
            sys.exit(1)

        # nlst() returns every name in the directory; keep only those that
        # start with the base archive name.
        zipfiles_names_ftp = [
            name for name in ftp.nlst() if name.startswith(BASE_FILES_NAME)
        ]
        print("All zipfiles names collected from ftp server.\n")
    except Exception:
        # SystemExit is not an Exception, so the exits above pass through
        # without being reported a second time.
        print_error()

In [None]:
# Names of the archives currently stored under 'zipfiles/' in the bucket.
# The bare name on the last line displays the list as the cell output.
zipfiles_names_bucket = get_list_names_zipfiles_bucket(s3_client, BUCKET_NAME)
zipfiles_names_bucket

In [None]:
# Copy every FTP archive that is still missing from the bucket: download it,
# upload it, then re-list the bucket and repeat for whatever is still missing
# (e.g. files whose transfer was cut by the server).
# The check is by name, not by list length, so extra or unrelated objects in
# the bucket cannot keep the loop alive or end it early.
missing_zipfiles = [z for z in zipfiles_names_ftp if z not in zipfiles_names_bucket]
while missing_zipfiles:
    for z in missing_zipfiles:
        local_path = "./" + z
        try:
            download_zipfile(SITE, FTP_FOLDER, "./", zipfiles_names_bucket, z)
        except EOFError:
            # Connection dropped mid-transfer: skip the upload, the file is
            # retried on the next pass with a fresh connection.
            print("EOFError, reconnecting...\n")
            continue
        except Exception:
            print_error()
        # Without this upload the bucket listing never changes and the loop
        # could not terminate. Only upload when the download left a file.
        if os.path.isfile(local_path):
            upload_zipfile(s3_resource, local_path, BUCKET_NAME, z)
    zipfiles_names_bucket = get_list_names_zipfiles_bucket(s3_client, BUCKET_NAME)
    missing_zipfiles = [z for z in zipfiles_names_ftp if z not in zipfiles_names_bucket]

In [12]:
# Names in the working directory that end with upper-case ".ZIP".
# The bare name on the last line displays the list as the cell output.
local_zipfles_names = [f for f in os.listdir("./") if f.endswith(".ZIP")]
local_zipfles_names

['BASE_DE_DADOS_CNES_201804.ZIP', 'BASE_DE_DADOS_CNES_201807.ZIP']

In [13]:
# Delete every local archive collected in local_zipfles_names.
for f in local_zipfles_names:
    os.remove(f)