# Ejecutar un script de entrenamiento en Spark (Databricks)

## Conectar al workspace

In [None]:
# Connect to the Azure ML workspace.
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Authenticate with DefaultAzureCredential and build the client from the
# workspace configuration (MLClient.from_config).
credential = DefaultAzureCredential()
ml_client = MLClient.from_config(credential=credential)

print(f"Conectado al Workspace: {ml_client.workspace_name}")

## Conectar Databricks con Azure ML

In [None]:
# Azure resource ID of the Databricks workspace to link to Azure ML.
databricks_id = (
    "/subscriptions/7decb7a4-f615-4cc3-9d7d-5de10998373f"
    "/resourceGroups/ntrgy-databrics"
    "/providers/Microsoft.Databricks/workspaces/natrgy-db"
)

In [None]:
from azure.ai.ml.entities import AmlCompute

# Link the Databricks workspace identified by `databricks_id` as an Azure ML
# compute target named "my-databricks-compute".
# NOTE(review): AmlCompute models an Azure ML managed compute cluster, not an
# attached Databricks workspace. The azure-ai-ml AmlCompute constructor appears
# to set its own compute type, so `type="databricks"` is probably ignored and
# this call may create a new AML cluster rather than attach Databricks —
# verify in the workspace. Attaching Databricks was done with
# `DatabricksCompute` in the v1 SDK (azureml-core); confirm whether v2 supports it.
databricks_compute = AmlCompute(
    name="my-databricks-compute",
    resource_id=databricks_id,
    type="databricks"
)

# Register the compute target and block until the long-running operation
# finishes (`.result()` waits on the poller).
ml_client.compute.begin_create_or_update(databricks_compute).result()
print(f"Databricks vinculado como destino de computación: {databricks_compute.name}")


## Entrenar un modelo

In [None]:
import os

# Folder that will hold the training script and the conda environment spec.
script_folder = './src'
os.makedirs(name=script_folder, exist_ok=True)
print(f"{script_folder} folder created")

In [None]:
%%writefile $script_folder/train.py
# import libraries
import mlflow
import argparse
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

import pyspark.pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression


def main(args):
    """Run the training pipeline: read, split, train and evaluate.

    Args:
        args: Parsed command-line namespace exposing ``input_data`` and
            ``reg_rate`` (see ``parse_args``).

    Raises:
        ValueError: If no Spark session could be obtained.
    """
    # getOrCreate() reuses an existing session or builds a new one.
    session = SparkSession.builder.getOrCreate()
    # Defensive guard; getOrCreate() is not expected to return None.
    if session is None:
        raise ValueError("No hay una sesión Spark activa disponible.")

    raw = get_data(args.input_data, session)
    X_train, X_test, y_train, y_test = split_data(raw)
    fitted = train_model(args.reg_rate, X_train, X_test, y_train, y_test)
    eval_model(fitted, X_test, y_test)

def get_data(path, spark):
    """Read the CSV file at *path* with the given Spark session.

    The first row is used as the header and column types are inferred.

    Args:
        path: Location of the CSV file.
        spark: Spark session whose ``read.csv`` reader is used.

    Returns:
        Whatever ``spark.read.csv`` returns (a Spark DataFrame for a real
        session).
    """
    print("Reading data...")
    reader = spark.read
    return reader.csv(path, header=True, inferSchema=True)

def split_data(df, test_size=0.30, random_state=0):
    """Split the diabetes dataset into train/test feature and label arrays.

    ``get_data`` returns a Spark DataFrame, which has no ``.values``
    attribute. Spark and pandas-on-Spark frames are therefore collected into
    a local pandas DataFrame first. scikit-learn needs the data in memory
    anyway.

    Args:
        df: Spark DataFrame, pandas-on-Spark DataFrame or pandas DataFrame
            containing the feature columns and a ``Diabetic`` label column.
        test_size: Fraction of rows held out for testing (default 0.30).
        random_state: Seed for the shuffle so splits are reproducible.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test)`` of NumPy arrays.
    """
    print("Splitting data...")

    if hasattr(df, "toPandas"):
        # pyspark.sql.DataFrame -> pandas; this collects every row to the driver.
        df = df.toPandas()
    elif hasattr(df, "to_pandas"):
        # pyspark.pandas.DataFrame -> pandas.
        df = df.to_pandas()

    feature_cols = ['Pregnancies', 'PlasmaGlucose', 'DiastolicBloodPressure',
                    'TricepsThickness', 'SerumInsulin', 'BMI',
                    'DiabetesPedigree', 'Age']
    X = df[feature_cols].to_numpy()
    y = df['Diabetic'].to_numpy()

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    return X_train, X_test, y_train, y_test

def train_model(reg_rate, X_train, X_test, y_train, y_test):
    """Fit a scikit-learn logistic regression and log the regularization rate.

    The module-level ``LogisticRegression`` comes from ``pyspark.ml``. That
    estimator accepts neither ``C`` nor ``solver`` and is fitted on a Spark
    DataFrame, not on NumPy arrays. The scikit-learn estimator is imported
    locally so the arrays produced by ``split_data`` can be used directly.

    Args:
        reg_rate: Regularization rate; its inverse is passed to scikit-learn
            as ``C``. Must be > 0.
        X_train: Training feature matrix.
        X_test: Unused; kept for signature compatibility.
        y_train: Training labels.
        y_test: Unused; kept for signature compatibility.

    Returns:
        The fitted ``sklearn.linear_model.LogisticRegression``.

    Raises:
        ValueError: If ``reg_rate`` is not positive (C = 1 / reg_rate would be
            undefined or negative).
    """
    # Local import shadows the module-level pyspark.ml LogisticRegression.
    from sklearn.linear_model import LogisticRegression

    if reg_rate <= 0:
        raise ValueError(f"reg_rate must be > 0, got {reg_rate}")

    mlflow.log_param("Regularization rate", reg_rate)
    print("Training model...")
    model = LogisticRegression(C=1 / reg_rate, solver="liblinear").fit(X_train, y_train)

    return model

def eval_model(model, X_test, y_test):
    """Compute accuracy and ROC AUC on the held-out split and log them to MLflow.

    ``roc_auc_score`` was used without ever being imported (NameError at
    runtime); it is now imported locally from ``sklearn.metrics``.

    Args:
        model: Fitted classifier exposing ``predict`` and ``predict_proba``.
        X_test: Held-out feature matrix.
        y_test: Held-out labels.
    """
    from sklearn.metrics import roc_auc_score

    # Accuracy: fraction of predictions equal to the true label.
    y_hat = model.predict(X_test)
    acc = np.average(y_hat == y_test)
    print('Accuracy:', acc)
    # NOTE: despite its name, this metric is measured on the test split.
    mlflow.log_metric("training_accuracy_score", acc)

    # AUC from the probability in column 1 of predict_proba; assumes binary
    # labels where column 1 is the positive class — confirm for other label sets.
    y_scores = model.predict_proba(X_test)
    auc = roc_auc_score(y_test, y_scores[:, 1])
    print('AUC: ' + str(auc))
    mlflow.log_metric("AUC", auc)


def parse_args():
    """Parse the training script's command-line options from ``sys.argv``.

    Returns:
        argparse.Namespace with ``input_data`` (str, required) and
        ``reg_rate`` (float, default 0.01).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--input_data", type=str, required=True, help="Path to the input dataset")
    # argparse derives dest="reg_rate" from the option name.
    cli.add_argument("--reg_rate", type=float, default=0.01)
    return cli.parse_args()

# Script entry point: parse CLI options and run the pipeline, framing the
# output with separator lines so it stands out in the job logs.
if __name__ == "__main__":
    banner = "*" * 60
    print("\n\n")
    print(banner)

    main(parse_args())

    print(banner)
    print("\n\n")


## Entorno de ejecución

In [None]:
%%writefile src/conda-env.yml
name: sklearn-azure-ai-env
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.11
  - pip
  - scikit-learn
  - pandas
  - numpy
  - matplotlib
  # The PyPI "pathlib" package was removed: it is an obsolete backport and
  # pathlib is already part of the Python 3 standard library.
  - pip:
    - azure-ai-ml
    - azure-identity
    - mlflow
    - mltable
    - jsonschema
    - pyspark


In [None]:
from azure.ai.ml.entities import Environment

# Execution environment: the conda spec written to ./src/conda-env.yml,
# layered on top of the OpenMPI Ubuntu 20.04 base image.
environment = Environment(
    conda_file="./src/conda-env.yml",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
)

## Job de entrenamiento

In [None]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Job inputs; Azure ML substitutes them into the ${{inputs.*}} placeholders
# of the command line below.
job_inputs = {
    "input_data": Input(type="uri_file", path="azureml:diabetes-data-local-ric:1"),
    "reg_rate": 0.01,
}

# Command job that runs src/train.py on the compute target registered earlier.
# NOTE(review): confirm that "my-databricks-compute" is really a compute type
# that accepts command jobs (see the note on the compute cell) and that the
# environment provides what SparkSession needs (e.g. a Java runtime).
job = command(
    code="./src",
    command="python train.py --input_data ${{inputs.input_data}} --reg_rate ${{inputs.reg_rate}}",
    inputs=job_inputs,
    environment=environment,
    compute="my-databricks-compute",
    display_name="train-diabetes-spark",
    experiment_name="diabetes-training-spark",
)

# Submit the job and report its generated name.
returned_job = ml_client.jobs.create_or_update(job)
print(f"Job enviado: {returned_job.name}")
