In [None]:
# Install the LangChain integrations (Elasticsearch, community, OpenAI), tiktoken
# and the BigQuery client into the kernel environment. No versions are pinned here.
%pip install langchain-elasticsearch langchain-community langchain tiktoken langchain_openai google-cloud-bigquery

Collecting langchain-elasticsearch
  Downloading langchain_elasticsearch-0.3.2-py3-none-any.whl.metadata (8.3 kB)
Collecting langchain-community
  Downloading langchain_community-0.3.27-py3-none-any.whl.metadata (2.9 kB)
Collecting langchain_openai
  Downloading langchain_openai-0.3.27-py3-none-any.whl.metadata (2.3 kB)
Collecting elasticsearch<9.0.0,>=8.13.1 (from elasticsearch[vectorstore-mmr]<9.0.0,>=8.13.1->langchain-elasticsearch)
  Downloading elasticsearch-8.18.1-py3-none-any.whl.metadata (9.2 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Downloading pydantic_settings-2.10.1-py3-none-any.whl.metadata (3.4 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain-community)
  Downloading httpx_sse-0.4.1-py3-none-any.whl.metadata (9.4 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langcha

In [2]:
import os
# Read the OpenAI API key from a local file and publish it through the
# OPENAI_API_KEY environment variable.
with open("/content/api_key.txt") as archivo:
    # strip(): a trailing newline or stray spaces in the file would otherwise
    # become part of the key value stored in the environment.
    apikey = archivo.read().strip()
os.environ["OPENAI_API_KEY"] = apikey

In [3]:
from google.colab import auth
# Run the Colab user-authentication flow.
# NOTE(review): presumably this is what lets the BigQuery client below obtain
# credentials — confirm.
auth.authenticate_user()


In [4]:
from google.cloud import bigquery

# Replace with your own project ID.
PROJECT_ID = "banded-badge-465303-k1"

# BigQuery client bound to PROJECT_ID.
client = bigquery.Client(project=PROJECT_ID)



In [28]:
import re
from google.cloud import bigquery
from langchain.schema import Document
from datetime import datetime
from langchain.text_splitter import RecursiveCharacterTextSplitter

# BigQuery client for the project; the project ID is repeated here as a literal.
client = bigquery.Client(project="banded-badge-465303-k1")
# Accumulator for every Document produced in this cell.
docs = []

schema_chunk_size = 7  # columns per schema chunk

# Character-based splitter; configured here but not referenced in the visible code.
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=100,
    separators=["\n- ", "\n\n", "\n", " ", ""]
)

def extraer_metadata_comentario(comentario: str) -> dict:
    """Extract labelled header fields from a documentation comment.

    Each supported label is searched once, case-insensitively, and the text
    that follows it on the same line is stored under the corresponding key.

    Args:
        comentario: Comment text, typically a ``/* ... */`` block. ``None`` or
            an empty string yields an empty dict.

    Returns:
        Dict with any of the keys ``caso_uso``, ``version``, ``desarrollador``,
        ``proveedor``, ``fecha_documentacion`` and ``objetivo_proceso``. Labels
        that are missing, or that have no value on their own line, are omitted.
    """
    if not comentario:
        return {}

    # [ \t]* rather than \s*: \s also matches the newline, so a label with an
    # empty value would swallow the line break and capture the NEXT line's
    # text (e.g. "VERSION:\nDESARROLLADOR: x" gave version="DESARROLLADOR: x").
    patrones = {
        "caso_uso": r"CASO DE USO:[ \t]*(.*)",
        "version": r"VERSION:[ \t]*(.*)",
        "desarrollador": r"DESARROLLADOR:[ \t]*(.*)",
        "proveedor": r"PROVEEDOR:[ \t]*(.*)",
        "fecha_documentacion": r"FECHA:[ \t]*(.*)",
        "objetivo_proceso": r"OBJETIVO DEL PROCESO:[ \t]*(.*)"
    }

    metadata = {}
    for key, pattern in patrones.items():
        match = re.search(pattern, comentario, re.IGNORECASE)
        if not match:
            continue
        valor = match.group(1).strip()
        # When the field shares its line with the comment terminator, keep the
        # terminator out of the stored value.
        if valor.endswith("*/"):
            valor = valor[:-2].rstrip()
        if valor:
            metadata[key] = valor
    return metadata

def clasificar_bloque(chunk: str) -> str:
    """Return the block-type label for a chunk of routine text.

    Rules are checked in a fixed order and the first one that holds decides
    the label. Keyword checks are case-insensitive; prefix checks apply to the
    chunk exactly as given (leading whitespace is not skipped). ``"otro"`` is
    returned when no rule holds.
    """
    texto = chunk.upper()
    # (condition, label) pairs, listed in priority order.
    reglas = (
        (texto.startswith("/*"), "comentario"),
        ("----" in chunk, "encriptacion"),
        (texto.startswith("INSERT INTO"), "carga_destino"),
        (texto.startswith("SELECT"), "logica_seleccion"),
        (texto.startswith("FROM"), "origen_datos"),
        (texto.startswith("WHERE"), "logica_adicional"),
        ("NOT EXISTS" in texto, "verificacion_existencia"),
        ("ROW_NUMBER" in texto, "verificacion_duplicados"),
        (texto.startswith("BEGIN"), "inicio"),
        (texto.startswith("END"), "fin"),
    )
    return next((etiqueta for cumple, etiqueta in reglas if cumple), "otro")


def chunkear_columnas(columnas, tabla_ref, dataset_id):
    """Build schema Documents for a table, grouping its columns into chunks.

    Args:
        columnas: Iterable of schema fields; each one must expose ``name``,
            ``field_type``, ``mode``, ``description`` and ``policy_tags``.
        tabla_ref: Table reference string used in the text and the metadata.
        dataset_id: Dataset identifier stored in the metadata.

    Returns:
        List of ``Document`` objects, one per group of ``schema_chunk_size``
        columns (module-level setting). Every document carries its position,
        the total number of chunks and a shared ``group_id``.
    """
    def describir(campo):
        # One bullet line per column; only the first policy tag name is shown.
        etiquetas = campo.policy_tags
        if etiquetas and etiquetas.names:
            policy_tag = etiquetas.names[0]
        else:
            policy_tag = "Sin clasificación"
        descripcion = campo.description or 'Sin descripción'
        return f"- {campo.name} ({campo.field_type}, {campo.mode}): {descripcion} [Policy Tag: {policy_tag}]"

    lineas = [describir(campo) for campo in columnas]
    bloques = [
        "\n".join(lineas[pos:pos + schema_chunk_size])
        for pos in range(0, len(lineas), schema_chunk_size)
    ]

    total = len(bloques)
    grupo = f"schema:{tabla_ref}"

    return [
        Document(
            page_content=f"Tabla: {tabla_ref}\n\nColumnas:\n{bloque}",
            metadata={
                "dataset": dataset_id,
                "type": "table",
                "table": tabla_ref,
                "chunk_index": posicion,
                "total_chunks": total,
                "group_id": grupo
            }
        )
        for posicion, bloque in enumerate(bloques)
    ]

def _extraer_bloques_rutina(body: str) -> list:
    """Split a routine body into ordered ``(block_type, text)`` pairs.

    Only the first occurrence of each construct is reported, always in this
    order: BEGIN marker, ``----``-delimited section, INSERT INTO target,
    SELECT list, FROM clause, WHERE clause, QUALIFY clause, END marker.
    Keywords are matched case-insensitively and as whole words, so they are
    not picked up inside identifiers (e.g. END in PENDING, FROM in DATE_FROM).

    Args:
        body: Routine source text, with the header comment already removed.

    Returns:
        List of ``(block_type, text)`` tuples; empty when nothing matches.
    """
    flags = re.IGNORECASE
    bloques = []

    if re.search(r"\bBEGIN\b", body, flags):
        bloques.append(("inicio", "BEGIN"))

    match_encriptacion = re.search(r"----.*?----", body, re.DOTALL)
    if match_encriptacion:
        bloques.append(("encriptacion", match_encriptacion.group(0)))

    # Stops at the first closing parenthesis after INSERT INTO.
    match_insert = re.search(r"\bINSERT INTO\b[\s\S]+?\)\s*", body, flags)
    if match_insert:
        bloques.append(("carga_destino", match_insert.group(0)))

    # The closing keyword is excluded with a lookahead. The previous
    # .rstrip("FROM") / .rstrip("WHERE") stripped a *set of characters*
    # case-sensitively, so a lowercase keyword stayed in the chunk.
    match_select = re.search(r"\bSELECT\b[\s\S]+?(?=\bFROM\b)", body, flags)
    if match_select:
        bloques.append(("logica_seleccion", match_select.group(0)))

    match_from = re.search(r"\bFROM\b[\s\S]+?(?=\bWHERE\b)", body, flags)
    if match_from:
        bloques.append(("origen_datos", match_from.group(0)))

    match_where = re.search(r"\bWHERE\b[\s\S]+?(?=\bQUALIFY\b|;|$)", body, flags)
    if match_where:
        bloques.append(("logica_adicional", match_where.group(0)))

    match_qualify = re.search(r"\bQUALIFY\b[\s\S]+?;", body, flags)
    if match_qualify:
        bloques.append(("verificacion_duplicados", match_qualify.group(0)))

    if re.search(r"\bEND\b", body, flags):
        bloques.append(("fin", "END"))

    return bloques


# Walk every dataset of the project and turn its tables and routines into Documents.
datasets = list(client.list_datasets())
for dataset in datasets:
    dataset_id = dataset.dataset_id
    dataset_ref = f"{client.project}.{dataset_id}"

    # TABLES
    tables = client.list_tables(dataset_ref)
    for table in tables:
        table_id = table.table_id
        table_ref = f"{dataset_ref}.{table_id}"
        # list_tables returns list items; fetch the full table for schema and stats.
        table_obj = client.get_table(table_ref)

        partitioning = table_obj.time_partitioning
        clustering = table_obj.clustering_fields

        partition_info = ""
        if partitioning:
            partition_info += f"Particionado por columna: {partitioning.field} - Tipo: {partitioning.type_}\n"
        if clustering:
            partition_info += f"Clustering: {', '.join(clustering)}\n"

        # One summary document per table; strip() drops the blank edges left by
        # the triple-quoted layout and by an empty partition_info.
        table_header = f"""
Dataset: {dataset_id}
Tabla: {table_ref}
Descripción: {table_obj.description or 'Sin descripción'}
Número de filas: {table_obj.num_rows}
Tamaño (bytes): {table_obj.num_bytes}
Fecha de creación: {table_obj.created}
Última modificación: {table_obj.modified}
{partition_info}
""".strip()

        docs.append(Document(
            page_content=table_header,
            metadata={
                "dataset": dataset_id,
                "type": "table_metadata",
                "table": table_ref,
                "partitioned": bool(partitioning),
                "clustered": bool(clustering),
                "group_id": f"meta:{table_ref}"
            }
        ))

        # Column documents, grouped into fixed-size chunks.
        docs.extend(chunkear_columnas(table_obj.schema, table_ref, dataset_id))

    # ROUTINES
    for routine in client.list_routines(dataset_ref):
        routine_obj = client.get_routine(routine.reference)
        routine_id = routine_obj.reference.routine_id
        # Kept under its own name so the outer loop's dataset_id is not rebound.
        routine_dataset_id = routine_obj.reference.dataset_id
        group_id = f"routine:{routine_dataset_id}.{routine_id}"

        body = routine_obj.body or ""
        # First /* ... */ block of the body is treated as the header comment.
        comentario_match = re.search(r"/\*.*?\*/", body, re.DOTALL)
        comentario = comentario_match.group(0).strip() if comentario_match else None
        extra_metadata = extraer_metadata_comentario(comentario) if comentario else {}

        if comentario:
            docs.append(Document(
                page_content=comentario,
                metadata={
                    **extra_metadata,
                    "type": "routine",
                    "routine": routine_id,
                    "bloque_tipo": "comentario",
                    "dataset": routine_dataset_id,
                    "chunk_index": 0,
                    "group_id": group_id
                }
            ))
            # Remove the header so it is not matched again as SQL.
            body = body.replace(comentario, "")

        bloques = _extraer_bloques_rutina(body)

        # NOTE(review): chunk_index restarts at 0 here, so the first block shares
        # index 0 with the comment document of the same group — confirm intended.
        for idx, (tipo, chunk) in enumerate(bloques):
            origen_destino = extraer_metadata_comentario(chunk) if tipo in ["carga_destino", "origen_datos"] else {}
            docs.append(Document(
                page_content=chunk.strip(),
                metadata={
                    **extra_metadata,
                    **origen_destino,
                    "dataset": routine_dataset_id,
                    "routine": routine_id,
                    "type": "routine",
                    "bloque_tipo": tipo,
                    "chunk_index": idx,
                    "group_id": group_id
                }
            ))

print(f"Chunks enriquecidos generados: {len(docs)}")


Chunks enriquecidos generados: 106


In [24]:
# Display the Document at index 1 as a quick sanity check of the generated chunks.
docs[1]

Document(metadata={'dataset': 'DS_RDV_KAFKA', 'type': 'table', 'table': 'banded-badge-465303-k1.DS_RDV_KAFKA.ACCOUNT_EVENTS_temp', 'chunk_index': 0, 'total_chunks': 7, 'group_id': 'schema:banded-badge-465303-k1.DS_RDV_KAFKA.ACCOUNT_EVENTS_temp'}, page_content='Tabla: banded-badge-465303-k1.DS_RDV_KAFKA.ACCOUNT_EVENTS_temp\n\nColumnas:\n- ID_ACCOUNT (STRING, NULLABLE): Identificador de la cuenta [Policy Tag: Sin clasificación]\n- BALANCE (STRING, NULLABLE): Saldo actual de la cuenta [Policy Tag: Sin clasificación]\n- CONTRACTID (STRING, NULLABLE): Identificador del contrato asociado [Policy Tag: Sin clasificación]\n- CREDITLIMIT (STRING, NULLABLE): Límite de crédito disponible [Policy Tag: Sin clasificación]\n- CUSTOMERID (STRING, NULLABLE): Identificador del cliente [Policy Tag: Sin clasificación]\n- EXTERNALCUSTOMERID (STRING, NULLABLE): ID externo del cliente [Policy Tag: Sin clasificación]\n- EXTERNALTRANSACTIONID (STRING, NULLABLE): ID externo de la transacción [Policy Tag: Sin cla

In [29]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores.elasticsearch import ElasticsearchStore
# Embeddings
import os
from getpass import getpass

embeddings = OpenAIEmbeddings()

# Connection settings are taken from the environment so that no secret lives in
# the notebook. ES_URL and ES_USER fall back to the values used so far; the
# password has no fallback and is prompted for when ES_PASSWORD is not set.
# The password that used to be hard-coded here should be considered leaked and
# be rotated.
# NOTE(review): the default URL is plain HTTP, so credentials travel unencrypted.
es_url = os.environ.get("ES_URL", "http://34.29.149.164:9200")
es_user = os.environ.get("ES_USER", "elastic")
es_password = os.environ.get("ES_PASSWORD") or getpass("Elasticsearch password: ")

# Create the vector store in the cluster and index every generated Document.
db = ElasticsearchStore.from_documents(
    docs,
    embeddings,
    es_url=es_url,
    es_user=es_user,
    es_password=es_password,
    index_name="lg-proyectobigquery",
)

In [26]:
# Retrieve the 3 chunks most similar to the question and print the text of each.
results = db.similarity_search("¿Qué columnas tiene la tabla STG_ACCOUNT_EVENTS_temp?", k=3)
for r in results:
    print(r.page_content)

Dataset: DS_UDV_KAFKA
Tabla: banded-badge-465303-k1.DS_UDV_KAFKA.STG_ACCOUNT_EVENTS_temp
Descripción: Tabla staging de eventos de cuentas, con campos financieros, de cliente, tarjetas, y tiempos de operación para auditoría y trazabilidad.
Número de filas: 18
Tamaño (bytes): 8424
Fecha de creación: 2025-07-11 05:40:53.432000+00:00
Última modificación: 2025-07-11 05:49:46.062000+00:00
Particionado por columna: TIME_DATE - Tipo: DAY
Tabla: banded-badge-465303-k1.DS_UDV_KAFKA.STG_ACCOUNT_EVENTS_temp

Columnas:
- ID_ACCOUNT (STRING, NULLABLE): ID de la cuenta [Policy Tag: Sin clasificación]
- BALANCE (FLOAT, NULLABLE): Saldo actual de la cuenta [Policy Tag: Sin clasificación]
- CONTRACTID (STRING, NULLABLE): ID del contrato [Policy Tag: Sin clasificación]
- CREDITLIMIT (FLOAT, NULLABLE): Límite de crédito [Policy Tag: Sin clasificación]
- CUSTOMERID (STRING, NULLABLE): ID del cliente [Policy Tag: Sin clasificación]
- EXTERNALCUSTOMERID (STRING, NULLABLE): ID externo del cliente [Policy Tag: