In [303]:

import pandas as pd
import requests
import sqlalchemy
import sqlite3


class GoogleSheetProcessor:
    def __init__(self, sheet_url1: str):
        self.sheet_url1 = sheet_url1
        self.spreadsheet_id = self.extract_spreadsheet_id(sheet_url1)
        self.sheet_id = self.extract_sheet_id(sheet_url1)
        self.csv_export_url = self.construct_csv_export_url()

    def extract_spreadsheet_id(self, url: str):
        return url.split('/d/')[1].split('/')[0]

    def extract_sheet_id(self, url: str):
        return url.split('gid=')[1]

    def construct_csv_export_url(self):
        return f"https://docs.google.com/spreadsheets/d/{self.spreadsheet_id}/export?format=csv&gid={self.sheet_id}"

    def download_csv(self, output_filename: str = 'temp_sheet.csv'):
        # Descarga el archivo CSV y lo guarda temporalmente
        response = requests.get(self.csv_export_url)
        response.raise_for_status()  # Asegurarse de que la solicitud fue exitosa
        with open(output_filename, 'wb') as f:
            f.write(response.content)
        return pd.read_csv(output_filename)

    def validate_no_duplicate_columns(self, df: pd.DataFrame):
        # Validar si existen columnas duplicadas en el DataFrame
        duplicate_columns = df.columns[df.columns.duplicated()]
        if not duplicate_columns.empty:
            raise ValueError(
                f"Columnas duplicadas encontradas: {duplicate_columns.tolist()}")

    def convertir_booleano_amef(self, df: pd.DataFrame, ini: int, end: int):
        """
        Convierte a tipo booleano las columnas en el rango dado, si no son ya booleanas.

        Parámetros:
        - df: DataFrame en el que se realizará la conversión.
        - ini: Índice de la columna inicial.
        - end: Índice de la columna final.

        Retorna:
        - DataFrame con las columnas convertidas a booleano.
        """
        # Iterar sobre las columnas en el rango especificado
        for col in df.columns[ini + 1:end]:
            # Verificar si la columna no es de tipo booleano
            if df[col].dtype != bool:
                # Convertir la columna a booleano
                df[col] = df[col].map(lambda x: True if str(
                    x).upper() == 'TRUE' else False)

        return df

    def load_data_from_postgres(
            self,
            db_url: str = 'postgresql://postgres:postgres@localhost/simyo',
            query: str = """
                SELECT
                    base."id",
                    "structure".tag,
                    locations.location_code,
                    "plans"."name",
                    base.fk_plan
                FROM
                    base
                    INNER JOIN
                    locations
                    ON
                        base.fk_location = locations."id"
                    INNER JOIN
                    "structure"
                    ON
                        base.fk_structure = "structure"."id"
                    LEFT JOIN
                    "plans"
                    ON
                        base.fk_plan = "plans"."id"
                """):
        """
        Conecta a la base de datos PostgreSQL, ejecuta la consulta SQL y devuelve un DataFrame.

        Parámetros:
        - db_url (str): URL de la base de datos PostgreSQL.
        - query (str): Consulta SQL a ejecutar.

        Retorna:
        - DataFrame con los datos obtenidos de la consulta.
        """
        # Crear un engine de SQLAlchemy para conectarse a la base de datos
        engine = sqlalchemy.create_engine(db_url)

        # Ejecutar la consulta y cargar los resultados en un DataFrame
        df = pd.read_sql_query(query, engine)

        # Establecer la columna 'id' como índice del DataFrame
        df.index = df['id'].values

        return df

    def process_dataframe(self, df: pd.DataFrame, columns_to_remove: list):

        # Si existen columnas con nombres duplicados, arrojar un error de columnas duplicadas
        if df.columns.duplicated().any():
            assert False, "Hay columnas duplicadas en el DataFrame df1"
        # Eliminar columnas innecesarias
        df = df.drop(columns=columns_to_remove, errors='ignore')

        # Eliminar filas donde 'TIPO' tenga valores 'Sistema' o 'Subsistema'
        df = df[~df['Tipo'].isin(['Sistema', 'Subsistema'])]

        # Filtrar filas donde la columna 'Plan' no sea nula
        df = df[df['Plan'].notna()]

        return df

    def create_output_dataframe(self, df: pd.DataFrame):
        # Crear un DataFrame de salida con columnas específicas
        # Iterar sobre las filas del dataframe
        df_salida = pd.DataFrame()
        for index, row in df.iterrows():
            # Buscar el índice de la columna 'AMEF'
            try:
                amef_index = df.columns.get_loc('AMEF')
            except KeyError:
                continue

            # Iterar a partir de la columna siguiente a 'AMEF'
            for col in df.columns[amef_index + 1:]:
                # print(col)
                if row[col] == True:  # Si el valor es True
                    # Adicionar una nueva fila al dataframe de salida
                    new_row = pd.DataFrame({
                        'tag': [row['Tag']],
                        'location_code': [col],
                        'plan': [row['Plan']]
                    })
                    df_salida = pd.concat(
                        [df_salida, new_row], ignore_index=True)
        return df_salida

    def load_data_from_db(self, query: str, db_path: str):
        # Cargar datos desde una base de datos SQLite
        conn = sqlite3.connect(db_path)
        df_db = pd.read_sql_query(query, conn)
        conn.close()
        return df_db

    def read_csv(self, filename="temp_sheet.csv", column_row=None, row_ini=None):
        # Lee el archivo CSV usando pandas
        # df = pd.DataFrame()
        df = pd.read_csv(filename, header=column_row,
                         skiprows=None, skipfooter=0)
        if column_row and row_ini is None:
            return df
        else:
            df.columns = df.loc[column_row, :].to_list()  # la fila 2 como fila
            df = df.loc[row_ini:, :]   # Obtener desde la fila 4 en adelante
            return df

    def merge_dataframes(self, df1: pd.DataFrame, df2: pd.DataFrame, on_columns: list):
        # Realizar el merge de dos DataFrames
        return pd.merge(df1, df2, on=on_columns, how='left', suffixes=('_left', '_right'))

    def process(
            self,
            df_input,
            columns_to_remove: list = ['RO', 'AM', 'AZ', 'MO', 'VE', 'BL', 'NA', 'CE', 'CA', 'PL']):
        # Método principal que engloba toda la lógica
        # df = self.download_csv()  # Descarga y carga del CSV
        # df = self.read_csv(filename=filename_input,column_row=2,row_ini=4)
        # print(filename_input)
        # df = pd.read_csv(filename_input)

        # self.validate_no_duplicate_columns(df)  # Validar columnas duplicadas
        # Método principal que engloba toda la lógica
        df = self.process_dataframe(
            df_input, columns_to_remove)  # Procesar el DataFrame
        amef_index = df.columns.get_loc('AMEF')
        end = len(df.columns)
        df = self.convertir_booleano_amef(df, amef_index, end)
        # Crear DataFrame de salida
        df_output = self.create_output_dataframe(df)

        cantidad_activos = df.iloc[:, amef_index + 1:].sum().sum()
        print(f"Existen {cantidad_activos} activos en la hoja de datos")
        if len(df_output) != cantidad_activos:
            assert False, "La cantidad de activos no corresponde con la salida"
        
        # Cargar datos desde la base de datos
        df_db = self.load_data_from_postgres()  
        # Merge de DataFrames
        df_result = self.merge_dataframes(df_output, df_db, ['tag', 'location_code'])  

        return df_output, df_db, df_result

In [304]:
import pandas as pd

gs1 = GoogleSheetProcessor ("https://docs.google.com/spreadsheets/d/1oUHkuKpHtuhMirNW6SvAQ4A0ns5PZs71iZ_WFXZHNn8/edit?gid=1115106678#gid=1115106678")
archivo = "input/pad_2.csv"
#gs1.download_csv(archivo)
df1 = pd.read_csv(archivo,header=3,true_values=['True','TRUE'],false_values=['False',"FALSE",""])


gs2 = GoogleSheetProcessor("https://docs.google.com/spreadsheets/d/1yOaSeqRBr1FW6tvFMi_Y-s4011cKBoyiWU5dTMlujrU/edit?gid=1115106678#gid=1115106678")
archivo = "input/pbd_2.csv"
#gs2.download_csv(archivo)
df2 = pd.read_csv(archivo,header=3,true_values=['True','TRUE'],false_values=['False',"FALSE",""])

df_merged = pd.concat([df1,df2],ignore_index=True).reset_index(drop=True)
filename = "input/mix_plan_2.csv"
df_merged.to_csv(filename,sep=';')

df_output, df_db, df_result=gs1.process(df_merged)
df_result.to_excel("Salida_Planes.xlsx")


Existen 12817 activos en la hoja de datos
