# SDK de AWS

#### Este notebook contiene una serie de scripts en Python que te permiten interactuar con los servicios de AWS.
**Dependencias**
- pip install boto3

In [8]:
import os
import time
import boto3
import pandas as pd
from decouple import config
from typing import Optional
from dataclasses import dataclass

#### Variables gobales

In [2]:
@dataclass
class Parameters():
    ENV:str
    BUCKET_RAW: Optional[str] = None
    AWS_PROFILE: Optional[str] = None
    AWS_IAM_ROLE: Optional[str] = None
    AWS_ACCESS_KEY_ID: Optional[str] = None
    AWS_SECRET_ACCESS_KEY: Optional[str] = None
    BUCKET_ANALYTICS: Optional[str] = None
    INPUT_FILE:str = config('INPUT_FILE')
    OUTPUT_FILE:str = config('OUTPUT_FILE')
    AWS_REGION: str = config('AWS_SDL_REGION')

    def __post_init__(self):
        for key, value in self.__dict__.items():
            if value:
                continue

            setattr(self, key, config(f'{self.ENV}_{key}'))                

PARAMS = Parameters(ENV='DEV')

#### Conexión con AWS

In [3]:
def get_aws_session(params: Parameters):
    sts_client = boto3.client('sts', 
                              aws_access_key_id = params.AWS_ACCESS_KEY_ID,
                              aws_secret_access_key= params.AWS_SECRET_ACCESS_KEY)
    
    assumed_role_response = sts_client.assume_role(RoleArn = params.AWS_IAM_ROLE,
                                          DurationSeconds = 3600,
                                          RoleSessionName = 'Excecution')

    credentials = assumed_role_response['Credentials']

    session = boto3.Session(aws_access_key_id = credentials['AccessKeyId'],
                            aws_secret_access_key = credentials['SecretAccessKey'],
                            aws_session_token = credentials['SessionToken'])

    return session 

aws_session = get_aws_session(PARAMS)

#### Descargar archivo desde S3

In [7]:
tstart = time.time()

s3 = aws_session.client('s3', region_name= PARAMS.AWS_REGION)

KEYS = [
        'CO/E&CM/SFTP/CARTERA/YEAR=2023/MONTH=08/informe_cartera.csv'
    ]

for key in KEYS:
    try:
        file_name = os.path.basename(key)
        file_path = os.path.join(PARAMS.OUTPUT_FILE, file_name)

        s3.download_file(PARAMS.BUCKET_RAW, key, file_path)

        print(f'File {file_name} downloaded successfully in: {file_path}')

    except Exception as ex:
        print(ex)

print('\nTiempo de ejecución {:.2f} segundos'.format(time.time()-tstart))

File informe_cartera.csv downloaded successfully in: C:\Projects\helpers\output_files\informe_cartera.csv

Tiempo de ejecución 2.39 segundos
