### Instalar librerias y dependencias necesarias para el proyecto

In [None]:
!pip install python-dotenv --upgrade
!pip install boto3 --upgrade
!pip install pyaml --upgrade
!pip install Jinja2 --upgrade
!pip install opensearch-py
!pip install tqdm

### Importar librerias y dependencias

In [None]:
import os
import sys
import logging
import boto3
import botocore
import yaml
import json
import requests
import zipfile

from tqdm import tqdm
from typing import Callable
from time import time, sleep

from jinja2 import Template, Environment
from dotenv import load_dotenv, find_dotenv
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from opensearchpy.helpers import bulk, streaming_bulk

# Configurar el logging
logging.basicConfig(level=logging.DEBUG, 
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    handlers=[
                        logging.FileHandler("drugs.log"),
                        logging.StreamHandler()
                    ]
                   )

### Cargar variables de entorno y archivos de configuración

In [None]:
# Carga variables entorno y archivos de configuración
if not(load_dotenv(find_dotenv())):
    sys.exit('No se puede continuar porque no se han definido las variables de entorno para el  proyecto!')
        
# Cargar archivo de configuraciones .yaml
try:
    # Definir una función para cargar variables de entorno
    def get_env_variable(name):
        return os.getenv(name)

    # Crear un entorno Jinja2 con la función de carga de variables de entorno
    env = Environment()

    # Agregar la función de entorno al entorno Jinja2
    env.globals['env'] = get_env_variable

    with open('config.yaml', 'r') as file:
        template_content = file.read()
    
     # Cargar el contexto YAML
    yaml_data = yaml.safe_load(template_content)
    
     # Renderizar la plantilla con el contexto
    template = env.from_string(template_content)
    rendered_content = template.render(**yaml_data)
    
    # Cargar el YAML renderizado
    yaml_data = yaml.safe_load(rendered_content)
    
    print("Archivo de configuracion cargado exitosamente.")
except FileNotFoundError:
    print("Error: El archivo de configuración .yaml no se encuentra disponible")
except yaml.YAMLError as exc:
    print(f"Error al analizar el archivo YAML: {exc}")
except Exception as e:
    print(f"Ocurrió un error inesperado: {e}")

In [None]:
#Valida que se hayan definido las variables de configuración y las variables de entorno
try:
    # Cargar variables de entorno
    aws_region = os.environ['aws_region']
    aws_access_key_id = os.environ['aws_access_key_id']
    aws_secret_access_key = os.environ['aws_secret_access_key']
    
    # Cargar variables de configuración
    project = yaml_data.get('Project', {})
    project_name = project.get('name')
    url_dataset = project.get('url_dataset')
    
    open_search = yaml_data.get('OpenSearch', {}) 
    service = open_search.get('service')
    port = open_search.get('port')
    use_ssl = open_search.get('use_ssl')
    verify_certs = open_search.get('verify_certs')
    pool_maxsize = open_search.get('pool_maxsize')
    timeout = open_search.get('timeout')
    
    collection_name = open_search.get('Collection',{}).get('name')
    collection_index = open_search.get('Collection',{}).get('index')
    collection_chunk_size = open_search.get('Collection',{}).get('chunk_size')
   
    if(aws_region is None):
        raise Exception('No se ha definido la variable de entorno: aws_region')
    elif(aws_access_key_id is None):
        raise Exception('No se ha definido la variable de entorno: aws_access_key_id')
    elif(aws_secret_access_key is None):
        raise Exception('No se ha definido la variable de entorno: aws_secret_access_key')
    if(project_name is None):
        raise Exception('No se ha definido la variable de configuración del proyecto : project_name')
    elif(url_dataset is None):
        raise Exception('No se ha definido la variable de configuración del proyecto : url_dataset')
    elif(service is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : service')
    elif(port is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : port')
    elif(use_ssl is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : use_ssl')
    elif(verify_certs is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : verify_certs')
    elif(pool_maxsize is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : pool_maxsize')
    elif(timeout is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : timeout')
    elif(collection_name is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : collection_name')
    elif(collection_index is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : collection_index')
    elif(collection_chunk_size is None):
        raise Exception('No se ha definido la variable de configuración de openSearch-AWS : collection_chunk_size')
    
    # Define variables de entorno y variables globales
    DATASET_FOLDER = project_name + '/dataset'
    
     # Crea directorio para almacenar los datasets
    if not(os.path.exists(DATASET_FOLDER)):
        !mkdir -p {DATASET_FOLDER}
        print('Directorio creado exitosamente!!')
    
except Exception as ex:
    print(ex)
    print(f'Error en la linea No. {ex.__traceback__.tb_lineno}')

### Definición de Funciones

In [None]:
#  Decorador para medir el tiempo de ejecución de la función
def measure_time(func:Callable)->Callable:
    '''
    args: one argument, is the function that we want to decorated(e,g functions that:
    build the model, train the model, optimize the model, ...etc)
    '''
    # define the wrapper function that executes the original function(e,g arg func)
    def wrapper(*args, **kwargs)->float:
        '''
        the wrapper function have the same argument as the object(e,g arg)function
        '''
        start_time=time()
        # call the object function to process and manipulate ...
        result=func(*args, **kwargs)
        end_time=time()

        processing_time=end_time-start_time
        print(f'tiempo de procesamiento {processing_time:.2f} segundos')
        return result

    return wrapper

# Función para validar si una URL es válida
def is_url_valid(url : str) -> bool:
    try:
        response = requests.head(url)
        return response.status_code == 200
    except Exception:
        return False

# Función para descargar archivos desde repositorio openFDA
@measure_time
def download_files(url_file : str, chunk_size: int = 1024):
    try:
        # Hacer la solicitud para descargar el archivo .zip
        response = requests.get(url_file, stream=True)
        if(response.status_code != 200):
            raise Exception(f"Error en la solicitud HTTP: {url_file}| respuesta: {response.status_code}")
        
        zip_filename = os.path.join(DATASET_FOLDER, url_file.split('/')[-1])
        
        total_size = int(response.headers.get('content-length', 0))
        
        # Guardar el archivo .zip descargado
        with open(zip_filename, 'wb') as file:
            file.write(response.content)
            #for data in tqdm(response.iter_content(chunk_size=chunk_size), total=total_size//chunk_size, unit='KB'):
            #    file.write(data)
                    
        logging.debug("Archivo guardado como: {}".format(zip_filename))
        
        # Extraer el contenido del archivo .zip
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(DATASET_FOLDER)
        logging.debug("Archivo extraído: {}".format(zip_filename))

        # Eliminar el archivo .zip descargado
        os.remove(zip_filename)
        logging.debug("Archivo .zip eliminado {}".format(zip_filename))
        
    except zipfile.BadZipFile:
        logging.error("El archivo no es un archivo zip válido: {}".format(zip_filename))
    except FileNotFoundError:
        logging.error("El archivo no fue encontrado: {}".format(zip_filename))
    except Exception as ex:
        logging.error("Ocurrió un error al extraer el archivo {}: {}".format(zip_filename, e))
        
def get_data_drugs(json_file):
    try:
        if(json_file):
            data = json.load(json_file)
            last_updated = data['meta']['last_updated']
            batch = []
            
            for item in data['results']:
                item['last_updated'] = last_updated
                
                # Crear un nuevo diccionario con la estructura deseada y fusionar openfda si existe
                results = {**{k: v for k, v in item.items() if k != "openfda"}, **item.get("openfda", {})}
                action = {
                    "_index": collection_index,
                    "_id": results['id'],  # Usar el atributo 'id' como ID del documento
                    "_source": results
                }
                batch.append(action)
                
                # Si el tamaño del lote es 500, hacer yield y reiniciar el lote
                if len(batch) == collection_chunk_size:
                    yield batch
                    batch = []
            
            if batch is not None:
                yield batch
        #return actions
            
    except Exception as ex:
        logging.error("Ocurrió un error al leer los datos {} ".format(ex))
        print(f"Ocurrió un error al leer los datos {ex}")
        print(ex.__traceback__.tb_lineno)

### Descargar dataset desde OpenFDA

In [None]:
# Descargar dataset desde OpenFDA
try:
    response = requests.get(url_dataset)
    if(response.status_code != 200):
        raise Exception(f"Error en la solicitud HTTP. Código de respuesta: {response.status_code}")
        
    data = response.json()
    
    # Obtener las particiones
    partitions = data.get('results', {}).get('drug', {}).get('label', {}).get('partitions', [])

    boo = False
    if isinstance(partitions, list):
        total_files = len(partitions)
        with tqdm(total=total_files, desc="Procesando descarga") as pbar:
            for partition in partitions:
                file_url = partition.get('file', '')
                if file_url and is_url_valid(file_url):
                    print(f"Descargando archivo desde: {file_url}")
                    download_files(file_url)
                    pbar.update(1)
    else:
        print("El nodo 'partitions' no es una lista.")
        
except Exception as ex:
    print(ex)

### Cargar información al servicio de OpenSearh Serverless AWS

In [None]:
try:
    # Establecer conexión con OpenSearch-AWS-Serverless
    session = boto3.Session(aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=aws_region)
    client = session.client('opensearchserverless')
    credentials = session.get_credentials()
    awsauth = AWSV4SignerAuth(credentials, aws_region, service)  
    response = client.batch_get_collection(names=[collection_name])
    
    #sleep(2)

    # Extraer el endpoint de la colección desde el response
    aws_host = (response['collectionDetails'][0]['collectionEndpoint'])
    aws_host = aws_host.replace("https://", "")

    # Construir el cliente de OpenSearch
    client = OpenSearch(hosts=[{'host': aws_host, 'port': port}],
                        http_auth=awsauth,
                        use_ssl=use_ssl,
                        verify_certs=verify_certs,
                        connection_class=RequestsHttpConnection,
                        timeout=timeout,
                        pool_maxize=pool_maxsize
                        )
    
    # Leer archivos JSON
    json_files = [pos_json for pos_json in os.listdir(DATASET_FOLDER) if pos_json.endswith('.json')]
    json_files = sorted(json_files, key=lambda x: int(x.split('-')[2]))
    print(json_files)
    
    # Leer el contenido de cada archivo JSON
    data_set = []
    if(True):
        success = 0
        failed = 0
        
        for json_file in json_files:
            print(f'Load dataset : {json_file}')
            with open(os.path.join(DATASET_FOLDER, json_file), 'r') as f:
                acu = 0
                for batch in get_data_drugs(f):
                    # Calcular el tamaño del lote en KB
                    batch_size = len(json.dumps(batch)) / 1024 / 1204 # Convertir el lote a JSON y medir el tamaño en MB
                    print(f'Batch size: {batch_size:.2f} MB')
                    acu+= batch_size
                    if(True):
                        responses = []
                        for ok, response in streaming_bulk(
                            client=client,
                            actions=batch,
                            chunk_size=collection_chunk_size,  # Número de documentos por cada chunk
                            max_chunk_bytes=150 * 1024 * 1024,  # 150 MB
                            raise_on_error=False,
                            raise_on_exception=False,
                            max_retries=3,
                            initial_backoff=2,
                            max_backoff=5,
                            yield_ok=True,
                            ignore_status=[409]):
                            if ok:
                                success += 1
                            else:
                                failed += 1
                                print(response)

                    # Eviar una colección de datos de forma masiva usando bulk d e OpenSearch.helper
                    #success, failed = streaming_bulk(client, data_set)
                    #print('=============================')
                    #print('success : ', success)
                    #print('failed : ', failed)
                print(f'Total Batch size: {acu:.2f} MB')
                acu = acu/1024
                print(f'Total Batch size: {acu:.2f} GB')
            break
    
        print(f'Successful operations: {success}')
        if(failed > 0):
            print(f'Failed operations: {failed}')
            print(json.dumps(responses))
        

    # Insertar un documento en OpenSearch
    '''
    try:
        for doc in data:
            response = client.index(index=collection_index, body=doc)

    except Exception as ex:
        print(ex.error)
    '''    
    
except Exception as ex:
    print(f"Ocurrió un error al procesar los archivos jsonx : {ex}")
    print(ex.__traceback__.tb_lineno)