Envio do arquivo CSV armazenado no bucket do S3 para o MySQL do RDS

In [None]:
import boto3
import pandas as pd
import pyarrow.parquet as pq 
from io import BytesIO
from sqlalchemy import create_engine
import sagemaker
import os 
from dotenv import load_dotenv, dotenv_values

In [None]:
# Carrega variáveis de ambiente a partir de um arquivo .env
load_dotenv()

# AWS credentials and region
# Configuração de sessão
boto3_session = boto3.Session()
s3_client = boto3_session.client('s3')
sm_boto3 = boto3_session.client('sagemaker')
session = sagemaker.Session(boto_session=boto3_session)

# S3 bucket and file details
bucket_name = os.getenv('bucket_name')
file_prefix = os.getenv('file_prefix')

# RDS connection details
database_name = os.getenv('DATABASE_NAME')
table_name = os.getenv('TABLE_NAME')
rds_host = os.getenv('RDS_HOST')
rds_port = os.getenv('RDS_PORT')
rds_user = os.getenv('RDS_USER')
rds_password = os.getenv('RDS_PASSWORD')

In [None]:
def load_csv_data(s3_bucket, s3_prefix):
    try:
        # Lista os objetos no bucket com o prefixo especificado
        response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
        if 'Contents' not in response:
            raise ValueError("No objects found in the specified S3 bucket/prefix.")
        
        file_objects = response['Contents']
        dfs = []
        
        # Itera sobre os arquivos listados e carrega cada um como um DataFrame
        for file_object in file_objects:
            file_key = file_object['Key']
            if file_key.endswith('.csv'):  # Certifique-se de que o arquivo é um CSV
                file_obj = s3_client.get_object(Bucket=s3_bucket, Key=file_key)
                df = pd.read_csv(BytesIO(file_obj['Body'].read()))
                dfs.append(df)
                print("ok")
        
        if not dfs:
            raise ValueError("No CSV files were loaded. Please check the S3 bucket/prefix.")
        
        return pd.concat(dfs, ignore_index=True)
    except Exception as e:
        print(f"Erro ao carregar arquivos CSV: {e}")
        return None


file_prefix = 'csv/'  # Ajuste o prefixo conforme necessário

# Carrega os dados CSV do S3 em um DataFrame do Pandas
df = load_csv_data(bucket_name, file_prefix)

In [None]:
# Connect to RDS
conn_str = f'mysql+pymysql://{rds_user}:{rds_password}@{rds_host}:{rds_port}/{database_name}'
engine = create_engine(conn_str)

# Write the DataFrame to RDS
df.to_sql(table_name, con=engine, if_exists='replace', index=False)

# Closing the connection
engine.dispose()

print('Data loaded successfully!')