## Traer data en Real Time

In [0]:
# Read the source transactions table into a Spark DataFrame and preview it.
source_table = "laboratorio3.cdz.fraud_transactions"
df = spark.read.table(source_table)
display(df)

Uso el conjunto de test para simular los datos de inferencia. Lo estoy realizando en batch, pero debería hacerse en tiempo real.

In [0]:
# Seeded 70/30 split; the 30% part is what the following cells treat as
# the inference input.
split_weights = [0.7, 0.3]
train_df, test_df = df.randomSplit(split_weights, seed=42)

# Number of rows in the held-out part (shown as the cell output).
test_df.count()

## Feature Engineering

In [0]:
# Project the held-out rows down to the five columns used from here on,
# then preview the result.
inference_columns = [
    "Transaction_ID",
    "Customer_ID",
    "Transaction_Amount",
    "Account_Balance",
    "Is_Fraud",
]
features_inferencia = test_df.select(*inference_columns)
display(features_inferencia)

## Feature Store

In [0]:
# Persist the selected columns as a Delta table; "overwrite" mode replaces
# whatever the table held before.
(
    features_inferencia.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("laboratorio3.features.features_inferencia")
)

## Vector

In [0]:
from pyspark.ml.feature import VectorAssembler

# Columns that get packed into the single vector column "features".
feature_cols = ["Transaction_Amount", "Account_Balance"]

assembler = (
    VectorAssembler()
    .setInputCols(feature_cols)
    .setOutputCol("features")
)

# Adds the "features" column to the inference rows; other columns are kept.
df_assembled = assembler.transform(features_inferencia)

## Inferencia

In [0]:
import mlflow
from mlflow.tracking import MlflowClient
from pyspark.sql.functions import current_timestamp, lit

client = MlflowClient()
model_name = "laboratorio3.features.fraud_model_rf"
alias = "inference"

# Resolve the alias to a concrete version number so every prediction row
# records which registered model version it is attributed to.
version = client.get_model_version_by_alias(model_name, alias).version

# Build the URI from `alias` rather than repeating the literal "inference":
# otherwise changing `alias` above would load one model while recording the
# version of another in `model_version`.
# NOTE(review): the version lookup and the model load are two separate
# registry calls; if the alias is reassigned between them the recorded
# version could differ from the loaded model. Consider loading by version.
model_uri = f"models:/{model_name}@{alias}"

# Specify UC volume path for temporary storage
volume_path = "/Volumes/laboratorio3/features/model_storage"

print(f"Cargando modelo desde: {model_uri}")
model = mlflow.spark.load_model(model_uri, dfs_tmpdir=volume_path)

# Score the assembled rows with the loaded model.
predictions = model.transform(df_assembled)

# Keep the transaction key and the model outputs, and stamp each row with
# the scoring time and the resolved model version.
final_predictions = predictions.select(
    "Transaction_ID",
    "prediction",
    "probability"
).withColumn("inference_timestamp", current_timestamp()) \
 .withColumn("model_version", lit(version))

# Append the results to the prediction-history table.
final_predictions.write.format("delta") \
    .mode("append") \
    .saveAsTable("laboratorio3.results.predictions")

display(final_predictions)
