# In [0]:  (notebook cell marker left by the export; commented out so the file parses as Python)
from pyspark.sql.functions import col
import numpy as np
import pandas as pd
import json
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
import os
# Set Snowflake connection options.
# The user name and password are read from a Databricks secret scope instead of
# being hard-coded in the notebook. The scope and key names below are
# placeholders: create them with the Databricks secrets CLI/API, or change the
# names to match a scope that already exists in this workspace.
# NOTE: the password that used to be written in this cell should be treated as
# compromised and rotated in Snowflake.
_SNOWFLAKE_SECRET_SCOPE = "snowflake"

sfOptions = {
    "sfURL": "GMDJSCR-GS94912.snowflakecomputing.com",
    "sfUser": dbutils.secrets.get(scope=_SNOWFLAKE_SECRET_SCOPE, key="user"),
    "sfPassword": dbutils.secrets.get(scope=_SNOWFLAKE_SECRET_SCOPE, key="password"),
    "sfDatabase": "mlops",
    "sfSchema": "public",
    "sfWarehouse": "COMPUTE_WH",
}

# Load the "ice_cream" table from Snowflake into a Spark DataFrame
df = (
    spark.read
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", "ice_cream")
    .load()
)

# Render the Spark DataFrame in the notebook output
display(df)

# Convert to a pandas DataFrame (used by the training helpers below) and render it too
pdf = df.toPandas()
display(pdf)

# Diagnostics only: print the installed MLflow version and the tracking URI
# taken from the environment. Nothing is changed here -- the snippet that
# clears the tracking URI (so that model artifacts are not stored on S3) is
# disabled below.
print(mlflow.__version__)
print("Tracking URI:", os.environ.get("MLFLOW_TRACKING_URI"))

# Disabled snippet, kept as a bare string literal so that none of it runs.
# It removes the MLflow tracking / S3 endpoint environment variables and then
# points the tracking URI at "databricks".
'''
os.environ.pop("MLFLOW_TRACKING_URI", None)
os.environ.pop("MLFLOW_S3_ENDPOINT_URL", None)
print(os.environ.get("MLFLOW_TRACKING_URI"))
print(os.environ.get("MLFLOW_S3_ENDPOINT_URL"))
mlflow.set_tracking_uri("databricks")
print("Fixed URI:", mlflow.get_tracking_uri())
'''

# Use "databricks-uc" as the model registry URI
mlflow.set_registry_uri(uri="databricks-uc")
# Select the experiment by its absolute workspace path
mlflow.set_experiment(experiment_name='/Users/mahesh2139@gmail.com/ice_cream_regression')

def load_data(data=None, feature_cols=("TEMP",), target_col="PRICE"):
    """Split a pandas DataFrame into a feature matrix and a target vector.

    Args:
        data: DataFrame to read from. When omitted, the notebook-level ``pdf``
            frame is used, which is what the zero-argument call has always done.
        feature_cols: Names of the feature columns (default: ``TEMP``).
        target_col: Name of the target column (default: ``PRICE``).

    Returns:
        ``(X, y)`` where ``X`` is a DataFrame holding ``feature_cols`` and
        ``y`` is the ``target_col`` Series.

    Raises:
        KeyError: If any requested column is missing from the frame.
    """
    frame = pdf if data is None else data
    feature_cols = list(feature_cols)

    # Fail with a message that lists what is actually available; the column
    # names must match the frame exactly, including case.
    missing = [c for c in [*feature_cols, target_col] if c not in frame.columns]
    if missing:
        raise KeyError(
            f"Missing column(s) {missing}; available: {list(frame.columns)}"
        )

    return frame[feature_cols], frame[target_col]

def split_data(X, y, test_size=0.5, random_state=6):
    """Split features and target into train and test partitions.

    Args:
        X: Feature matrix.
        y: Target values aligned with ``X``.
        test_size: Passed to ``train_test_split``; the default (0.5) is the
            value this function previously hard-coded.
        random_state: Seed passed to ``train_test_split``; the default (6) is
            the value this function previously hard-coded.

    Returns:
        The tuple ``(X_train, X_test, y_train, y_test)``.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )
    return X_train, X_test, y_train, y_test

def model_train(X_train, y_train):
    """Fit a ``LinearRegression`` on the training data and return the fitted estimator."""
    # ``fit`` returns the estimator itself, so construction and fitting chain.
    return LinearRegression().fit(X_train, y_train)

def predict(model, X_train, X_test):
    """Return ``(train_predictions, test_predictions)`` produced by ``model.predict``.

    The training set is scored first, then the test set.
    """
    return model.predict(X_train), model.predict(X_test)

def evaluate(y_test, y_test_pred):
    """Score predictions against the true test targets.

    Returns:
        The tuple ``(mae, mse, rmse, r2)``; ``rmse`` is the square root of ``mse``.
    """
    squared_error = mean_squared_error(y_test, y_test_pred)
    return (
        mean_absolute_error(y_test, y_test_pred),
        squared_error,
        np.sqrt(squared_error),
        r2_score(y_test, y_test_pred),
    )

def log_model(mae, mse, rmse, r2, test_size=0.5, random_state=6):
    """Log split parameters, metrics and an ``eval.json`` artifact to the current MLflow run.

    Args:
        mae: Mean absolute error.
        mse: Mean squared error.
        rmse: Root mean squared error.
        r2: Coefficient of determination.
        test_size: Split fraction to record as a run parameter. The default
            (0.5) is the value that used to be hard-coded here.
        random_state: Split seed to record as a run parameter. The default (6)
            is the value that used to be hard-coded here.
    """
    import tempfile  # local import so the block does not depend on a new file-level import

    mlflow.log_params({"test_size": test_size, "random_state": random_state})

    # Built-in floats keep both MLflow and ``json.dump`` happy even when the
    # metrics arrive as NumPy scalars.
    eval_results = {
        "MAE": float(mae),
        "MSE": float(mse),
        "RMSE": float(rmse),
        "R2": float(r2),
    }
    mlflow.log_metrics(eval_results)

    # Write the report into a throw-away directory instead of the current
    # working directory, so no stray ``eval.json`` is left behind and runs do
    # not overwrite each other's file. The artifact keeps the same name and
    # the same "eval" artifact path as before.
    with tempfile.TemporaryDirectory() as tmp_dir:
        eval_path = os.path.join(tmp_dir, "eval.json")
        with open(eval_path, "w", encoding="utf-8") as f:
            json.dump(eval_results, f)
        mlflow.log_artifact(eval_path, artifact_path="eval")
    
# Register the model in MLflow
def register_model(model, X_train):
    """Log ``model`` with an inferred signature and register it as ``mlops.public.ice_cream_price``.

    The signature is inferred from ``X_train`` and the model's predictions on it.
    """
    training_predictions = model.predict(X_train)
    model_signature = infer_signature(X_train, training_predictions)
    mlflow.sklearn.log_model(
        sk_model=model,
        name="model",
        signature=model_signature,
        registered_model_name="mlops.public.ice_cream_price",
    )

# Entry point: run the load -> split -> train -> predict -> evaluate -> log ->
# register pipeline inside a single MLflow run named "ice_cream_regression_run".
if __name__ == "__main__":
    with mlflow.start_run(run_name="ice_cream_regression_run"):
        # Features and target are taken from the pandas frame loaded earlier
        X, y = load_data()
        X_train, X_test, y_train, y_test = split_data(X, y)
        model = model_train(X_train, y_train)
        # Predictions are produced for both partitions ...
        y_train_pred, y_test_pred = predict(model, X_train, X_test)
        # ... but only the test-set predictions are scored
        mae, mse, rmse, r2 = evaluate(y_test, y_test_pred)
        print(f"mae: {mae}, mse: {mse}, rmse : {rmse}, r2: {r2}")
        # Record parameters, metrics and the eval.json artifact on this run
        log_model(mae, mse, rmse, r2)
        # Log the fitted model and register it under its registered-model name
        register_model(model, X_train)
        print("Artifact location:",mlflow.get_artifact_uri())


# Disabled troubleshooting steps, kept as a bare string literal so that none of
# it runs: inspect and clear MLFLOW_TRACKING_URI, point the tracking URI at
# "databricks", upgrade mlflow and restart the Python process.
'''
import os

# See if it's set
print("Tracking URI:", os.environ.get("MLFLOW_TRACKING_URI"))

# Remove it if set
os.environ.pop("MLFLOW_TRACKING_URI", None)

# Check again
mlflow.set_tracking_uri("databricks")
print("Fixed URI:", mlflow.get_tracking_uri())
%pip install --upgrade "mlflow[databricks]>=3.1"
dbutils.library.restartPython()
'''
