In [1]:
import duckdb
from google.cloud import bigquery
import pandas as pd
import logging
import os
import src.config as config

# Para autenticarse en Google Cloud
from google.auth import default as google_auth_default
from google.auth.transport.requests import Request as AuthRequest


In [46]:
def _aut_google():
    # 1. --- OBTENER TOKEN DE AUTENTICACIÓN ---
    try:
        credentials, project = google_auth_default()
        credentials.refresh(AuthRequest())
        gcs_bearer_token = credentials.token
        logger.info("Token de Google Cloud obtenido exitosamente.")
    except Exception as e:
        logger.error(f"❌ No se pudo obtener el token de GCS/ADC: {e}")
        raise
    return gcs_bearer_token

In [3]:


# Configuración básica de logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# La URL del archivo en GCS
GCS_FILE_PATH = "gs://joaquinrk_data_bukito3/datasets/competencia_02_crudo.csv.gz"

def load_gcs_to_bigquery_via_duckdb(
    project_id: str,
    dataset_id: str,
    table_id: str = "c03",
    gcs_file_path: str = config.DATA_PATH,
    temp_local_db: str = ":memory:" # Usa ':memory:' para base de datos en memoria
) -> None:
    """
    Lee un archivo Gzip CSV desde GCS usando DuckDB,
    y luego carga el DataFrame resultante en una tabla de BigQuery.

    Args:
        project_id: ID del proyecto de Google Cloud.
        dataset_id: ID del dataset de destino en BQ.
        table_id: Nombre de la tabla de destino en BQ (por defecto, 'c02').
        gcs_file_path: Ruta completa del archivo GCS.
        temp_local_db: Ruta a la base de datos DuckDB. Usa ':memory:' para RAM.
    """

    gcs_bearer_token = _aut_google()
    logger.info(f"Iniciando proceso: GCS ({gcs_file_path}) -> DuckDB -> BigQuery ({dataset_id}.{table_id})")

    # 1. Conectar DuckDB y leer GCS
    try:
        # Se requiere instalar el módulo httpfs para leer GCS/S3/HTTPs
        con = duckdb.connect(database=temp_local_db, read_only=False)
        con.sql("INSTALL httpfs;")
        con.sql("LOAD httpfs;")
        logger.info("DuckDB conectado y extensión 'httpfs' cargada.")


        #    --- SOLUCIÓN: CREAR SECRETO GCS CON EL TOKEN ---
        # DuckDB utilizará este secreto para todas las peticiones a GCS
        con.sql(f"CREATE SECRET gcs_secret (TYPE GCS, bearer_token '{gcs_bearer_token}');")
        logger.info("DuckDB conectado, extensión 'httpfs' cargada y secreto GCS creado.")


        # Ejecutar la consulta para leer el archivo GCS y obtener el resultado como un DataFrame de Pandas
        query = f"SELECT * FROM read_csv_auto('{gcs_file_path}');"
        df_duckdb = con.sql(query).df()

        logger.info(f"✅ Lectura de GCS a DataFrame completada. Filas cargadas: {len(df_duckdb)}")

        # Cerrar la conexión DuckDB
        con.close()

    except Exception as e:
        logger.error(f"❌ Error durante la lectura con DuckDB: {e}")
        # Asegúrate de que las credenciales de GCS sean válidas para la extensión httpfs
        raise

    # 2. Cargar DataFrame a BigQuery
    try:
        client = bigquery.Client(project=project_id)
        table_ref = client.dataset(dataset_id).table(table_id)

        # Configuración de carga: escribir sobre la tabla si ya existe
        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        )

        # Cargar el DataFrame de Pandas a BigQuery
        job = client.load_table_from_dataframe(
            df_duckdb, table_ref, job_config=job_config
        )

        job.result()  # Espera a que el job termine

        logger.info(f"✅ DataFrame cargado exitosamente a BigQuery en la tabla '{table_id}' en el dataset '{dataset_id}'.")

    except Exception as e:
        logger.error(f"❌ Error durante la carga a BigQuery: {e}")
        raise

In [9]:
# --- Ejemplo de Uso (Asegúrate de configurar tus variables) ---
# **REEMPLAZA ESTOS VALORES CON LOS DE TU ENTORNO**
YOUR_PROJECT_ID = config.BQ_PROJECT
YOUR_DATASET_ID = config.BQ_DATASET # Cambia por tu dataset
YOUR_TABLE_ID = "c02" # Nombre de la tabla de destino

try:
    logger.info("Ejecutando carga de datos desde GCS a BigQuery...")
    logger.info("tarda unos 10 minutos..")

    # load_gcs_to_bigquery_via_duckdb(
    #     project_id=YOUR_PROJECT_ID,
    #     dataset_id=YOUR_DATASET_ID,
    #     table_id=YOUR_TABLE_ID
    # )
except Exception as e:
    print(f"La ejecución falló. Asegúrate de tener credenciales válidas y de que la extensión httpfs esté instalada en DuckDB.")
    pass

INFO:__main__:Token de Google Cloud obtenido exitosamente.
INFO:__main__:Iniciando proceso: GCS (gs://joaquinrk_data_bukito3/datasets/competencia_02_crudo.csv.gz) -> DuckDB -> BigQuery (dmeyf.c02)
INFO:__main__:DuckDB conectado y extensión 'httpfs' cargada.
INFO:__main__:DuckDB conectado, extensión 'httpfs' cargada y secreto GCS creado.


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

INFO:__main__:✅ Lectura de GCS a DataFrame completada. Filas cargadas: 4717958
INFO:__main__:✅ DataFrame cargado exitosamente a BigQuery en la tabla 'c02' en el dataset 'dmeyf'.


In [4]:

logger = logging.getLogger(__name__)

def create_churn_targets_bq(
    project_id: str,
    dataset_id: str,
    source_table: str,
    target_table: str = "targets",
) -> None:
    """
    Crea la tabla de targets (clase_ternaria) en BigQuery utilizando la lógica
    de gap temporal (BAJA+1, BAJA+2) del código R/data.table.

    Args:
        project_id: ID del proyecto de Google Cloud.
        dataset_id: ID del dataset.
        source_table: Nombre de la tabla de datos crudos o features.
        target_table: Nombre de la tabla donde se guardarán los targets.
    """

    logger.info(f"Iniciando creación de tabla de targets '{target_table}'...")

    try:
        client = bigquery.Client(project=project_id)
        source_ref = f"`{project_id}.{dataset_id}.{source_table}`"
        target_ref = f"`{project_id}.{dataset_id}.{target_table}`"

        # SQL para replicar la lógica de Target Engineering (R/data.table)
        query = f"""
        CREATE OR REPLACE TABLE {target_ref}
        PARTITION BY RANGE_BUCKET(foto_mes, GENERATE_ARRAY(201901, 202208, 1))
        CLUSTER BY foto_mes, numero_de_cliente
        AS
        WITH PreCalculations AS (
            SELECT
                foto_mes,
                numero_de_cliente,
                -- 1. Calcula el periodo serializado (ej. 202401 -> 24289)
                CAST(FLOOR(t1.foto_mes / 100) AS INT64) * 12 + MOD(t1.foto_mes, 100) AS periodo0,
                -- 2. Calcula los leads (periodo1 y periodo2)
                LEAD(CAST(FLOOR(t1.foto_mes / 100) AS INT64) * 12 + MOD(t1.foto_mes, 100), 1)
                    OVER (PARTITION BY t1.numero_de_cliente ORDER BY t1.foto_mes) AS periodo1,
                LEAD(CAST(FLOOR(t1.foto_mes / 100) AS INT64) * 12 + MOD(t1.foto_mes, 100), 2)
                    OVER (PARTITION BY t1.numero_de_cliente ORDER BY t1.foto_mes) AS periodo2
            FROM {source_ref} AS t1
        ),
        MaxPeriods AS (
            SELECT
                MAX(periodo0) AS periodo_ultimo,
                MAX(periodo0) - 1 AS periodo_anteultimo
            FROM PreCalculations
        )
        SELECT
            t1.foto_mes,
            t1.numero_de_cliente,
            t1.periodo0,
            t1.periodo1,
            t1.periodo2,
            -- 3. Aplica la lógica de la clase ternaria (siguiendo precedencia)
            CASE
                -- BAJA+2: Antes del penúltimo mes, hay continuidad en M+1, pero falta M+2 (gap > 2)
                WHEN t1.periodo0 < mp.periodo_anteultimo AND
                     (t1.periodo0 + 1 = t1.periodo1) AND
                     (t1.periodo2 IS NULL OR t1.periodo0 + 2 < t1.periodo2)
                THEN 'BAJA+2'

                -- BAJA+1: Antes del último mes, falta M+1 (periodo1 es NULL o hay un gap)
                WHEN t1.periodo0 < mp.periodo_ultimo AND
                     (t1.periodo1 IS NULL OR t1.periodo0 + 1 < t1.periodo1)
                THEN 'BAJA+1'

                -- CONTINUA: Por defecto para registros antes del penúltimo mes que no son Baja
                WHEN t1.periodo0 < mp.periodo_anteultimo
                THEN 'CONTINUA'

                ELSE NULL -- Registros del último/penúltimo mes sin clasificación de Baja
            END AS clase_ternaria
        FROM PreCalculations AS t1
        CROSS JOIN MaxPeriods AS mp;
        """

        job = client.query(query)
        job.result()
        logger.info(f"✅ Tabla de targets '{target_table}' creada exitosamente.")

    except Exception as e:
        logger.error(f"❌ Error al crear la tabla de targets en BigQuery: {e}")
        raise

# Nota: Esta función solo crea la tabla de targets, las features se procesarán en la siguiente función.

In [17]:
create_churn_targets_bq(config.BQ_PROJECT, config.BQ_DATASET, "c02", "targets")

INFO:__main__:Iniciando creación de tabla de targets 'targets'...
INFO:__main__:✅ Tabla de targets 'targets' creada exitosamente.


In [None]:
from src.loader import tabla_productos_por_cliente
tabla_productos_por_cliente(config.BQ_PROJECT, config.BQ_DATASET, config.BQ_TABLE,
                                    config.BQ_TABLE_TARGETS)
# crea tabla c02_products

In [5]:
def create_intra_month_features_bq(
    project_id: str,
    dataset_id: str,
    source_table: str,
    output_table: str
) -> None:
    """
    Crea la tabla con Feature Engineering intra-mes en BigQuery.

    Args:
        project_id: ID del proyecto de Google Cloud.
        dataset_id: ID del dataset.
        source_table: Nombre de la tabla de entrada (cruda o con targets).
        output_table: Nombre de la tabla de salida.
    """

    logger.info(f"Iniciando Feature Engineering intra-mes para '{output_table}'...")

    try:
        client = bigquery.Client(project=project_id)
        source_ref = f"`{project_id}.{dataset_id}.{source_table}`"
        output_ref = f"`{project_id}.{dataset_id}.{output_table}`"

        query = f"""
        CREATE OR REPLACE TABLE {output_ref}
        PARTITION BY RANGE_BUCKET(foto_mes, GENERATE_ARRAY(201901, 202208, 1))
        CLUSTER BY foto_mes, numero_de_cliente
        AS
        SELECT
            t1.* EXCEPT(clase_ternaria),
            t1.clase_ternaria, -- Aseguramos que la columna clase_ternaria esté al final

            -- kmes (Mes del año)
            MOD(t1.foto_mes, 100) AS kmes,

            -- ctrx_quarter_normalizado (Normalización de ctrx_quarter por antigüedad)
            CASE
                WHEN t1.cliente_antiguedad = 1 THEN t1.ctrx_quarter * 5.0
                WHEN t1.cliente_antiguedad = 2 THEN t1.ctrx_quarter * 2.0
                WHEN t1.cliente_antiguedad = 3 THEN t1.ctrx_quarter * 1.2
                ELSE t1.ctrx_quarter -- Valor por defecto o ctrx_quarter original
            END AS ctrx_quarter_normalizado,

            -- mpayroll_sobre_edad
            CASE
                WHEN t1.cliente_edad IS NULL OR t1.cliente_edad = 0 THEN NULL
                ELSE t1.mpayroll / t1.cliente_edad
            END AS mpayroll_sobre_edad

        FROM {source_ref} AS t1;
        """

        job = client.query(query)
        job.result()
        logger.info(f"✅ Feature Engineering intra-mes completado. Tabla guardada en '{output_table}'.")

    except Exception as e:
        logger.error(f"❌ Error al ejecutar el Feature Engineering intra-mes en BigQuery: {e}")
        raise

In [6]:
create_intra_month_features_bq(config.BQ_PROJECT, config.BQ_DATASET, 'c02_products',
                                    config.BQ_TABLE_FEATURES)

INFO:__main__:Iniciando Feature Engineering intra-mes para 'c02_features'...
INFO:__main__:✅ Feature Engineering intra-mes completado. Tabla guardada en 'c02_features'.


In [36]:
# ESTÁ EN FEATURES.PY

def create_historical_features_bq(
    project_id: str,
    dataset_id: str,
    source_table: str,
    output_table: str,
    cols_to_engineer: list, # Lista de columnas para las que calcular historial
    window_size: int = 6,
) -> None:

    logger.info(f"Iniciando Feature Engineering histórico ({window_size} meses) con CTEs para evitar anidación...")

    try:
        client = bigquery.Client(project=project_id)
        source_ref = f"`{project_id}.{dataset_id}.{source_table}`"
        output_ref = f"`{project_id}.{dataset_id}.{output_table}`"

        # --- CONSTRUCCIÓN DINÁMICA DE EXPRESIONES (Sin la cláusula 'OVER') ---
        lag_exprs = []
        hist_exprs = []

        # 1. Definición de la especificación de la ventana (para re-uso)
        window_spec_name = "w"
        window_spec_sql = f"""
            WINDOW {window_spec_name} AS (
                PARTITION BY numero_de_cliente
                ORDER BY foto_mes
                ROWS BETWEEN {window_size - 1} PRECEDING AND CURRENT ROW
            )
        """

        # 2. Loop para generar todas las expresiones (usando alias 't2')
        for col in cols_to_engineer:

            # --- Variables limpias del CTE BaseFeatures (t2) ---
            col_base = f"t2_{col}_clean"
            col_rn = "t2_row_index"

            # --- 1. Lags ---
            col_lag1 = f"LAG({col_base}, 1) OVER (PARTITION BY t2.numero_de_cliente ORDER BY t2.foto_mes)"
            col_lag2 = f"LAG({col_base}, 2) OVER (PARTITION BY t2.numero_de_cliente ORDER BY t2.foto_mes)"

            lag_exprs.append(f"{col_lag1} AS {col}_lag1")
            lag_exprs.append(f"{col_lag2} AS {col}_lag2")

            # --- 2. Deltas (Resta) ---
            lag_exprs.append(f"({col_base} - {col_lag1}) AS {col}_delta1")
            lag_exprs.append(f"({col_base} - {col_lag2}) AS {col}_delta2")

            # --- 3. Tendencia (COVAR_POP / VAR_POP) ---
            # Aplicamos la ventana NOMBRADA ({window_spec_name}) a cada función de agregación
            hist_exprs.append(f"""
                (
                    COVAR_POP({col_base}, {col_rn}) OVER {window_spec_name}
                    /
                    NULLIF(VAR_POP({col_rn}) OVER {window_spec_name}, 0)
                ) AS {col}_tend{window_size}
            """)

            # --- 4. Promedio, Min, Max (AVG, MIN, MAX) ---
            col_avg = f"AVG({col_base}) OVER {window_spec_name}"
            col_min = f"MIN({col_base}) OVER {window_spec_name}"
            col_max = f"MAX({col_base}) OVER {window_spec_name}"

            hist_exprs.append(f"{col_avg} AS {col}_avg{window_size}")
            # Corregir los nombres de alias de min/max (antes eran _avg{window_size} duplicados)
            hist_exprs.append(f"{col_min} AS {col}_min{window_size}")
            hist_exprs.append(f"{col_max} AS {col}_max{window_size}")

            # --- 5. Ratios (División) ---
            hist_exprs.append(f"({col_base} / NULLIF({col_avg}, 0)) AS {col}_ratioavg{window_size}")
            hist_exprs.append(f"({col_base} / NULLIF({col_max}, 0)) AS {col}_ratiomax{window_size}")

        all_new_features = lag_exprs + hist_exprs

        # --- QUERY FINAL CON CTEs ---
        # BaseFeatures: Limpieza de tipos y cálculo del índice (ROW_NUMBER)
        # HistoricalFeatures: Cálculos de ventana (LAG, AVG, COVAR)

        cols_for_base_cte = [f"CAST(t1.{col} AS FLOAT64) AS t2_{col}_clean" for col in cols_to_engineer]

        query = f"""
        CREATE OR REPLACE TABLE {output_ref}
        PARTITION BY RANGE_BUCKET(foto_mes, GENERATE_ARRAY(201801, 203001, 1))
        CLUSTER BY foto_mes, numero_de_cliente
        AS
        WITH BaseFeatures AS (
            SELECT
                -- 1. Seleccionamos TODAS las columnas originales (solo una vez)
                t1.*,
                -- 2. Calculamos el índice para la regresión (X)
                ROW_NUMBER() OVER(PARTITION BY t1.numero_de_cliente ORDER BY t1.foto_mes) AS t2_row_index,
                -- 3. Creamos las versiones limpias/casteadas de las features (Y)
                {', '.join(cols_for_base_cte)}
            FROM {source_ref} AS t1
        ),
        HistoricalFeatures AS (
            SELECT
                -- 1. Seleccionamos todas las columnas base y eliminamos las auxiliares
                t2.* EXCEPT({', '.join([f"t2_{col}_clean" for col in cols_to_engineer])}, t2_row_index),

                -- 2. Agregamos las features históricas calculadas
                {', '.join(all_new_features)}
            FROM BaseFeatures AS t2
            {window_spec_sql}
        )
        SELECT * FROM HistoricalFeatures;
        """
        # DEBUG: Imprimir la query completa para revisión antes de ejecutar
        # print(query)

        job = client.query(query)
        job.result()
        logger.info(f"✅ Feature Engineering histórico completado. Tabla guardada en '{output_table}'.")

    except Exception as e:
        logger.error(f"❌ Error al ejecutar el Feature Engineering histórico en BigQuery: {e}")
        raise

In [37]:
LAG_VARS = ['Master_cconsumos', 'Master_fultimo_cierre', 'Master_mconsumospesos', 'Master_mfinanciacion_limite',
 'Master_mlimitecompra', 'Master_mpagominimo', 'Master_mpagospesos', 'Master_msaldopesos', 'Master_msaldototal',
 'Visa_Fvencimiento', 'Visa_cconsumos', 'Visa_mconsumosdolares', 'Visa_mconsumospesos',
 'Visa_mconsumototal', 'Visa_mfinanciacion_limite', 'Visa_mlimitecompra', 'Visa_mpagado',
 'Visa_mpagominimo', 'Visa_mpagospesos', 'Visa_msaldopesos', 'Visa_msaldototal', 'Visa_status', 'ccaja_ahorro',
 'ccaja_seguridad', 'ccajas_consultas', 'ccajas_otras', 'ccajas_transacciones', 'ccallcenter_transacciones',
 'ccomisiones_mantenimiento', 'ccomisiones_otras', 'ccuenta_debitos_automaticos', 'cdescubierto_preacordado', 'cextraccion_autoservicio', 'chomebanking_transacciones', 'cmobile_app_trx', 'cpagomiscuentas',
 'cpayroll_trx', 'cprestamos_personales', 'cproductos', 'ctarjeta_debito', 'ctarjeta_visa', 'ctarjeta_visa_debitos_automaticos',
 'ctarjeta_visa_transacciones', 'ctransferencias_emitidas', 'ctransferencias_recibidas',
 'ctrx_quarter', 'mactivos_margen', 'mautoservicio', 'mcaja_ahorro', 'mcaja_ahorro_dolares',
 'mcomisiones', 'mcomisiones_mantenimiento', 'mcomisiones_otras', 'mcuenta_corriente', 'mcuenta_debitos_automaticos', 'mcuentas_saldo', 'mextraccion_autoservicio', 'mpagomiscuentas', 'mpasivos_margen',
 'mpayroll', 'mplazo_fijo_dolares', 'mprestamos_personales', 'mrentabilidad', 'mrentabilidad_annual',
 'mtarjeta_master_consumo', 'mtarjeta_visa_consumo', 'mtransferencias_emitidas', 'mtransferencias_recibidas', 'mttarjeta_visa_debitos_automaticos', 'tcallcenter', 'thomebanking', 'tmobile_app']

# Listo aparte las features creadas
features_nuevas = ["q_producto_master", "q_producto_visa","q_producto_general","ctrx_quarter_normalizado","mpayroll_sobre_edad" ]

# Agrego las features creadas
LAG_VARS += features_nuevas

# Dejo valores únicos
LAG_VARS = list(set(LAG_VARS))


In [38]:
create_historical_features_bq(
        config.BQ_PROJECT,
        config.BQ_DATASET,
        config.BQ_TABLE_FEATURES,
        'c02_features_historical',
        cols_to_engineer=LAG_VARS,
        window_size=6
    )

INFO:__main__:Iniciando Feature Engineering histórico (6 meses) con CTEs para evitar anidación...
INFO:__main__:✅ Feature Engineering histórico completado. Tabla guardada en 'c02_features_historical'.


### Guardado de parquet

In [59]:
from google.cloud import bigquery
from google.cloud.bigquery import job
import logging
from typing import List

logger = logging.getLogger(__name__)

def export_bq_to_gcs_native(
    project_id: str,
    dataset_id: str,
    source_table: str,
    gcs_path_proc: str, # Debe contener el comodín '*' para fragmentación
    select_cols: List[str] = ['*']
) -> None:
    """
    Exporta una tabla de BigQuery directamente a archivos Parquet fragmentados en GCS
    utilizando la API nativa de BigQuery (sin DuckDB).

    Args:
        project_id: ID del proyecto de Google Cloud.
        dataset_id: ID del dataset de BQ.
        source_table: Nombre de la tabla de BQ a exportar.
        gcs_path_proc: Ruta completa de GCS de destino (ej. 'gs://bucket/path-*.parquet').
        select_cols: Lista de columnas a exportar.
    """

    logger.info(f"Iniciando exportación nativa: BQ ({source_table}) -> GCS Parquet ({gcs_path_proc})")

    try:
        client = bigquery.Client(project=project_id)

        # 1. Crear una vista temporal (si es necesario) para seleccionar solo ciertas columnas
        # Si select_cols != ['*'], necesitamos una Query para filtrar columnas/datos
        if select_cols != ['*']:
            cols_str = ', '.join(select_cols)
            source_query = f"SELECT {cols_str} FROM `{project_id}.{dataset_id}.{source_table}`"

            # Ejecutar la consulta y guardar el resultado en una tabla temporal (o usar la tabla de destino como origen)
            # Para exportación, es más fácil si la tabla de origen ya está lista.
            # Si solo se filtra, se usa un ExtractJob
            logger.warning("La exportación nativa no soporta la selección de columnas 'SELECT' directamente sobre la tabla de origen.")
            logger.warning("Se exportará la tabla completa. Si necesita filtrar, use un CTE o cree una tabla/vista intermedia.")

        table_ref = client.dataset(dataset_id).table(source_table)

        # 2. Configurar el Job de Exportación
        job_config = job.ExtractJobConfig()
        job_config.destination_format = bigquery.DestinationFormat.PARQUET
        job_config.compression = 'SNAPPY' # La compresión estándar para Parquet

        # 3. Ejecutar la exportación
        extract_job = client.extract_table(
            table_ref,
            gcs_path_proc, # Ya contiene el comodín '*'
            job_config=job_config
        )
        extract_job.result()  # Esperar a que el job termine

        logger.info(f"✅ Exportación nativa completada. Archivos guardados en: {gcs_path_proc.replace('*', '...')}")

    except Exception as e:
        logger.error(f"❌ Error durante la exportación nativa de BQ: {e}")
        # El error 400 'Table too large' ya fue resuelto con el comodín,
        # pero es bueno saber que este método lo maneja.
        raise

# --- Ejemplo de Uso ---
if __name__ == '__main__':
    # Usar la misma ruta fragmentada que causó el error en DuckDB
    GCS_DESTINO = "gs://joaquinrk_data_bukito3/datasets/competencia_02_features/competencia_02_features-*.parquet"

    try:
        # Reemplaza con tus variables reales
        export_bq_to_gcs_native(
            project_id="dmecoyfin-250928192534125",
            dataset_id="dmeyf",
            source_table="c02_features_historical",
            gcs_path_proc=GCS_DESTINO,
            select_cols=['foto_mes', 'numero_de_cliente', 'clase_ternaria', 'feature_1_avg6', 'feature_2_tend6']
        )
    except Exception:
        pass

INFO:__main__:Iniciando exportación nativa: BQ (c02_features_historical) -> GCS Parquet (gs://joaquinrk_data_bukito3/datasets/competencia_02_features/competencia_02_features-*.parquet)
INFO:__main__:✅ Exportación nativa completada. Archivos guardados en: gs://joaquinrk_data_bukito3/datasets/competencia_02_features/competencia_02_features-....parquet


In [68]:
path = "gs://joaquinrk_data_bukito3/datasets/competencia_02_features/competencia_02_features-*.parquet"
# O un directorio entero
# path = "./carpeta/*.parquet"
gcs_bearer_token = _aut_google()

con = duckdb.connect(database=':memory:', read_only=False)
con.sql("INSTALL httpfs;")
con.sql("LOAD httpfs;")
logger.info("DuckDB conectado y extensión 'httpfs' cargada.")


con.sql(f"SET s3_region='';")
con.sql(f"SET s3_access_key_id='';")
con.sql(f"SET s3_secret_access_key='';")
con.sql(f"SET s3_endpoint='';")
con.sql(f"SET s3_session_token='';")
con.sql(f"SET s3_url_style='path';")
con.sql(f"SET gcs_bearer_token='{gcs_bearer_token}';") # <--- Inyectamos el token aquí
logger.info("DuckDB GCS configurado usando bearer token (ADC).")

logger.info("DuckDB conectado, extensión 'httpfs' cargada y secreto GCS creado.")

# Ejecutar una consulta SQL directamente sobre los archivos
# DuckDB interpreta el patrón como un conjunto de datos único
query = f"SELECT * FROM read_parquet('{path}') where foto_mes = 202101 LIMIT 10"

# Ejecutar la consulta y obtener los resultados en un DataFrame de Pandas
# DuckDB es muy eficiente leyendo los metadatos y filtrando solo lo necesario
df = conn.execute(query).fetchdf()

print(df.head())
print(f"Total de filas leídas: {len(df)}")


INFO:__main__:Token de Google Cloud obtenido exitosamente.
INFO:__main__:DuckDB conectado y extensión 'httpfs' cargada.


CatalogException: Catalog Error: unrecognized configuration parameter "gcs_bearer_token"

Did you mean: "s3_region"