In [141]:
import io
import os
import boto3
import pandas as pd
import sqlalchemy as sa 
from sqlalchemy import  MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.engine.url import URL
from sqlalchemy.schema import MetaData
from sqlalchemy.engine.url import URL
from sqlalchemy.ext.declarative import declarative_base
from dotenv import load_dotenv
from base import OtuCount, FeatureCountExtendedView
import logging
from io import BytesIO

In [142]:
# Configurar el logging
logging.basicConfig(filename='otus_projects_upload.log', level=logging.INFO, 
                    format='%(asctime)s:%(levelname)s:%(message)s')

In [143]:
# Cargar variables de entorno
load_dotenv()

True

In [144]:
# Configuración de la conexión a la base de datos
def get_engine():
    url = URL.create(
        drivername=os.getenv('DB_DRIVER'),
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT'),
        database=os.getenv('DB_NAME'),
        username=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD')
    )
    engine =sa.create_engine(url)
    return engine

engine = get_engine()
# Creación de MetaData y Declarative Base
metadata = MetaData()
Base = declarative_base(metadata=metadata)
# Creación de la sesión
Session = sessionmaker(bind=get_engine())
session = Session()

In [145]:
def client(client):
    return boto3.client(
    client,
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    region_name=os.getenv('S3_REGION')
    )
# Configurar el cliente de s3
s3_client = client('s3')
# Configurar el cliente de Athena
athena_client = client('athena')

In [146]:
def pivotear_datos(df):
    """
    Pivotea los datos de un DataFrame.

    Parámetros:
    df (DataFrame): DataFrame que contiene las columnas 'value', 'otu' y 'sampleId'.

    Devuelve:
    DataFrame: DataFrame pivoteado con 'otu' como índice y 'sampleId' como columnas.
    """

    # Crear una tabla pivote
    datos_pivoteados = (
        df.pivot_table(values='value', index='otu', columns='sampleId')
          .fillna(0)
          .rename_axis(index=None, columns=None)
    )

    # Restablecer el índice
    datos_pivoteados = datos_pivoteados.reset_index()

    return datos_pivoteados

In [147]:
def upload_project_parquet_to_s3(projectid, otu_parquet, s3_client, bucket_name):
    """
    Sube un archivo Parquet a un bucket de S3 específico basado en el ID del proyecto.

    :param projectid: El ID del proyecto para el cual se subirá el archivo.
    :param otu_parquet: El contenido del archivo Parquet o la ruta al archivo a subir.
    :param s3_client: La instancia del cliente S3 para realizar la subida.
    """
    file_name = f"otu/{projectid}/{projectid}.parquet"
    try:
        s3_client.put_object(Bucket=bucket_name, Key=file_name, Body=otu_parquet)
        return {"message": "Archivo subido exitosamente"}
    except Exception as e:
        return {"error": str(e)}

In [148]:
def df_to_parquet(df):
 # df_otu_con_project es tu DataFrame
    parquet_buffer = df.to_parquet( engine='pyarrow', compression='snappy')
    return parquet_buffer

In [149]:
def get_projectid_all():
    """Consulta a la tabla FeatureCountExtendedView para obtener una lista de IDs de proyectos existentes."""
    # Ejecutar la consulta directamente y obtener los projectId como un dataframe
    df = pd.read_sql(session.query(FeatureCountExtendedView.projectId).statement, session.bind)
    
    # Filtrar los projectId nulos y obtener una lista única de projectId
    project_ids = df['projectId'].dropna().unique().tolist()
    
    return project_ids

In [150]:
projects = get_projectid_all()

In [151]:
    
def get_feature_otu(projectId):
    """Consulta a la tabla otu para obtener los datos de un project especifico"""
    query = session.query(
        FeatureCountExtendedView.value, 
        FeatureCountExtendedView.otu, 
        FeatureCountExtendedView.sampleId
    ).filter(FeatureCountExtendedView.projectId == projectId)
    df = pd.read_sql(query.statement, session.bind)
    
    return df.to_dict(orient='records')

In [154]:
def upload_parquets_otus_for_projects(project_list, s3_client):
    """
    Procesa los datos de OTUs para cada proyecto en la lista que no se han actualizado previamente,
    convierte los datos a formato Parquet, y los sube a un almacenamiento S3. Actualiza un archivo Parquet
    en S3 que mantiene un registro de los proyectos procesados.

    Args:
        project_list (list): Lista de IDs de proyectos a procesar.
        s3_client: Cliente de S3 para realizar operaciones de S3.
        s3_bucket (str): Nombre del bucket de S3 donde se almacenan los archivos.

    Returns:
        str: Mensaje indicando que la operación fue exitosa.
    """
    # Define la clave del archivo Parquet en S3 que mantiene el registro de proyectos subidos
    s3_bucket = 'siwaparquets'
    projects_parquet_key = 'projects/projects_record.parquet'

    try:
        # Intentar leer el archivo Parquet desde S3 que contiene los proyectos ya procesados
        response = s3_client.get_object(Bucket=s3_bucket, Key=projects_parquet_key)

        # Read the content of the response into a BytesIO buffer
        buffer = BytesIO(response['Body'].read())

        # Use the buffer to read the Parquet file
        updated_projects_df = pd.read_parquet(buffer)
        updated_projects_set = set(updated_projects_df['project_id'])
    except Exception as e:
        logging.warning(f"No se pudo leer el archivo de proyectos actualizados desde S3: {e}")
        updated_projects_set = set()

    for project_id in project_list:
        if project_id not in updated_projects_set:
            try:
                project_data = pd.DataFrame(get_feature_otu(project_id))
                if not project_data.empty:
                    otu_pivot = pivotear_datos(project_data)
                    otu_parquet = df_to_parquet(otu_pivot)
                    upload_project_parquet_to_s3(project_id, otu_parquet, s3_client, s3_bucket)
                    logging.info(f'Proyecto {project_id}: Datos subidos correctamente a S3.')

                    # Agregar el proyecto al conjunto de proyectos actualizados
                    updated_projects_set.add(project_id)
                else:
                    logging.warning(f'Proyecto {project_id}: No se encontraron datos.')
            except Exception as e:
                logging.error(f'Proyecto {project_id}: Error al procesar - {e}')
                continue
        else: 
            logging.info(f'Proyecto {project_id}: El archivo no se subio por que ya esta en s3.')

    # Actualizar el archivo Parquet con la lista de proyectos subidos
    updated_projects_df = pd.DataFrame(list(updated_projects_set), columns=['project_id'])
    updated_projects_parquet = df_to_parquet(updated_projects_df)  # Asegúrate de que esta función devuelva los datos en formato adecuado para `put_object`
    s3_client.put_object(Bucket=s3_bucket, Key=projects_parquet_key, Body=updated_projects_parquet)

    return "Proceso completado con éxito."

In [155]:
upload_parquets_otus_for_projects(projects, s3_client)

'Proceso completado con éxito.'