## Escenario 3.1 – Schema drift en datos de entrada

1. Es necesario definir las features basado en el esquema inicial estableciendo el conjunto y el orden esperado.

Se implementa una rutina de validación y transformacion de esquema, primero se resolvio el problema de nombre inconsistente mediante un bucle de renombrado basado en el mapeo de columna.

In [0]:
from pyspark.sql.functions import col

#definicion de las caracteristicas y mapeo de nombres
FEATURE_CONTRACT = ['customer_id', 'total_usage', 'avg_calls', 'region_id'] 

#atributos
COLUMN_MAP = {
    'cliente_id': 'customer_id',
    'uso_total': 'total_usage'
}

#carga de datos de scoring
df_scoring = spark.read.table("xxxx")


#aplicar el mapeo de atributos para estandarizar nombres
for old_name, new_name in COLUMN_MAP.items():
    if old_name in df_scoring.columns:
        df_scoring = df_scoring.withColumnRenamed(old_name, new_name)

#seleccionar y reordenar el conjunto de atributos
select_expr = [col(c) for c in FEATURE_CONTRACT]

df_scoring_aligned = df_scoring.select(*select_expr)

#verificación del esquema alineado
df_scoring_aligned.printSchema()

## Escenario 3.2 – Carga incorrecta de modelo en MLflow

**Error Inicial** La falla fue usar un nombre de modelo incorrecto (churn_model_prod en lugar de churn_model) o especificar un stage (/Production) que no estaba asignado a ninguna versión.

**Correccion¨** Se utiliza el MlflowClient para obtener y cargar la última versión numérica disponible (models:/nombre/versión). Esto es mas robusto que depender de un nombre ("Production", "Staging") e incierto.

In [0]:
import mlflow
from mlflow.tracking import MlflowClient

client = MlflowClient()
model_name = "churn_model" 
new_tags = {
    "model_framework": "sklearn",
    "project_id": "123456",
    "model_type": "regression"
}

#Listar para obtener ultima version
#Se obtiene la última version registrada (la de mayor numero)
latest_version = client.get_latest_versions(model_name)[0]
version_number = latest_version.version
model_uri = f"models:/{model_name}/{version_number}"

#Carga correcta y predecir
model = mlflow.pyfunc.load_model(model_uri)
df = spark.read.table("churn.scoring_features").toPandas()
preds = model.predict(df)

#etiqueta
for key, value in new_tags.items():
    client.set_model_version_tag(
        name=model_name, 
        version=version_number, 
        key=key, 
        value=value
    )

## Escenario 3.3 – Sistema RAG que siempre devuelve contexto vacío

**Problema:** Suponga que por un error en “retrieve_relevant_chunks” siempre se devuelve un DataFrame vacío, por
ejemplo, por un ordenamiento/umbral mal aplicado.

Se podría estar generando el error en este punto, ya que esto seleccionaria los chunks menos parecidos (los de similitud baja), para este caso estaba seleccionando lo mas seguro todo lo negativo, contenido irrelevante y vacio.

orderBy(col("similarity").asc())


Esta versión siempre devuelve k resultados (si el DataFrame tiene datos)
Evita el error clásico de ordenar ascendente sin querer
Confirma que la columna "embedding" existe

In [0]:
#Tambien se podría ralizar a traves de manejo de excepciones pero quiero dejar algo sencillo y rapido para el ejercicio
def retrieve_chunks(query_embedding, df_embeddings, k=3):

    #Validacion basica (evita DataFrame vacío)
    if df_embeddings.count() == 0:
        print("df_embeddings está vacío.")
        return df_embeddings
    
    if "embedding" not in df_embeddings.columns:
        raise Exception("La columna 'embedding' no existe en df_embeddings.")

    #calculo de similitud usando tu UDF
    df_scored = df_embeddings.withColumn(
        "similarity",
        cos_sim_udf(col("embedding"), lit(query_embedding))
    )

    #manejo por si la similitud queda como NULL o vectores mal formados
    df_scored = df_scored.na.fill({"similarity": -1.0})

    #orden descendente (nota: siempre las similitudes altas primero)
    df_top = df_scored.orderBy(col("similarity").desc()).limit(k)

    #log para que siempre veas qué está retornando
    print("Chunks recuperados (top k):")
    df_top.select("chunk_id", "similarity").show()

    return df_top