In [1]:
import os
import json
import base64
import pathlib
import polars as pl
import boto3
import pyarrow
import pandas as pd
import plotly.express as px
from datetime import datetime, timedelta
from functools import partial


import pandas as pd
import numpy as np
import pyarrow

# Models
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression, Lasso
from sklearn.metrics import mean_squared_error

# More Models
import xgboost as xgb
from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from hyperopt.pyll import scope
from typing import Dict, Any
from numpy import ndarray
from xgboost import DMatrix
import mlflow
import pickle
import polars as pl
import polars.selectors as cs

In [3]:
# import mlflow
# #from mlflow.tracking import MlflowClient
# print(f'tracking uri: {mlflow.get_tracking_uri()}')
# mlflow.set_tracking_uri("http://localhost:5001")
# mlflow.set_experiment('wine_dataset')

tracking uri: file:///Users/samlafell/Desktop/mlops_zoomcamp_sam/07-project/notebooks/mlruns


KeyboardInterrupt: 

In [12]:
import configparser

# Create a configparser object
config = configparser.ConfigParser()

# Read the file
config.read(pathlib.Path().resolve().parent / '.aws')
access_key = config.get('default', 'aws_access_key_id')
secret_key = config.get('default', 'aws_secret_access_key')

# Default Env File
config.read(pathlib.Path().resolve().parent / '.env')
MLFLOW_TRACKING_URI = config.get('mlflow', 'MLFLOW_TRACKING_URI')

'AKIAZKNJD2X7G3NZJSQI'
'OptdpTeZAR92m97pwLR3rFKi2wJdMaufPD3jug7P'
s3://sal-wine-quality/models/


In [14]:
import logging
# Set MLFlow Tracking URI
logging.info(f'tracking uri before defining: {mlflow.get_tracking_uri()}')
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
logging.info(f'tracking uri: {mlflow.get_tracking_uri()}')

In [15]:
class GetPaths():
    def __init__(self):
        self.project_root_path = pathlib.Path().absolute().parent
    
    def get_data_path(self):
        return self.project_root_path / 'data'
    
    def get_models_path(self):
        return self.project_root_path / 'models'
        
path = GetPaths()
data_path = path.get_data_path()
model_path = path.get_models_path()

In [16]:
def import_data(df_path: pathlib.Path) -> pl.DataFrame():
    """Import Data

    Args:
        train_df (pathlib.Path): Give path of data

    Returns:
        df (polars.DataFrame): Return Polars Dataframe
    """
    return pl.read_csv(df_path)

In [17]:
# from pandas_profiling import ProfileReport
# profile = ProfileReport(data.to_pandas(), title = 'REPORT')
# profile

In [18]:
# Import data
data = import_data(df_path = data_path / 'raw' / 'WineQT.csv')

In [19]:
class Preprocessor:
    def __init__(self, data):
        self.data = data

    def shuffle_data(self):
        ids_shuffled = self.data.select(pl.col('Id').shuffle(seed=42))
        self.data = self.data.join(ids_shuffled, how = 'inner', on = 'Id')
        return self

    def split_data(self):
        data_rows = self.data.shape[0]
        self.data_train = self.data.slice(0, int(data_rows * 0.7))
        self.data_val = self.data.slice(int(data_rows * 0.7), int(data_rows * 0.15))
        self.data_test = self.data.slice(int(data_rows * 0.85), data_rows)
        return self

    def split_X_y(self):
        self.X_train = self.data_train.drop('quality', 'Id')
        self.y_train = self.data_train.select('quality')

        self.X_val = self.data_val.drop('quality', 'Id')
        self.y_val = self.data_val.select('quality')

        self.X_test = self.data_test.drop('quality', 'Id')
        self.y_test = self.data_test.select('quality')

        return self

    def standardize_data(self):
        for dataset in [self.X_train, self.X_val, self.X_test]:
            for col_name in dataset.columns:
                col_mean = dataset[col_name].mean()
                col_std = dataset[col_name].std()
                dataset = dataset.with_columns(((dataset[col_name] - col_mean) / col_std).alias(f'{col_name}_std'))
            dataset = dataset.lazy().select(cs.ends_with('std')).collect()

        return self

    def convert_to_numpy(self):
        self.X_train = self.X_train.to_numpy()
        self.y_train = self.y_train.to_numpy().flatten()

        self.X_val = self.X_val.to_numpy()
        self.y_val = self.y_val.to_numpy().flatten()

        self.X_test = self.X_test.to_numpy()
        self.y_test = self.y_test.to_numpy().flatten()

        return self


In [20]:
preprocessor = Preprocessor(data)
preprocessor.shuffle_data().split_data().split_X_y().standardize_data().convert_to_numpy()

X_train = preprocessor.X_train
y_train = preprocessor.y_train

X_val = preprocessor.X_val
y_val = preprocessor.y_val

X_test = preprocessor.X_test
y_test = preprocessor.y_test

In [21]:
def objective(params: Dict[str, Any]) -> Dict[str, Any]:
    '''
    Train an XGBoost model with given parameters and datasets, log the training 
    process with MLFlow, make predictions on the validation set, calculate RMSE 
    and log it with MLFlow. Return the RMSE and the status.

    Parameters:
    params (dict): A dictionary of parameters to use for the XGBoost model.

    Returns:
    dict: A dictionary with 'loss' key indicating the root mean squared error (RMSE) 
    on validation set and 'status' key indicating the status of the function.
    '''
    with mlflow.start_run():
        mlflow.set_tag("model", "xgboost")
        mlflow.log_params(params)
        mlflow.xgboost.autolog()
        booster = xgb.train(
            params=params,
            dtrain=train,
            num_boost_round=1000,
            evals=[(valid, 'validation')],
            early_stopping_rounds=50
        )
        y_pred = booster.predict(valid)
        rmse = mean_squared_error(y_val, y_pred, squared=False)
        mlflow.log_metric("rmse", rmse)

    return {'loss': rmse, 'status': STATUS_OK}

In [22]:
# This dictionary defines the search space for hyperparameter optimization. The keys represent 
# different parameters of the XGBoost model, while the values specify distributions from which 
# values for these parameters will be sampled. Here:
# - 'max_depth' is sampled uniformly from integers between 4 and 100
# - 'learning_rate' is sampled log-uniformly in the range from 0.05 to 1
# - 'reg_alpha' is sampled log-uniformly in the range from 0.00001 to 0.1
# - 'reg_lambda' is sampled log-uniformly in the range from 0.000001 to 0.1
# - 'min_child_weight' is sampled log-uniformly in the range from 0.1 to 1000
# - 'objective' is fixed to 'reg:linear'
# - 'seed' is fixed to 42 for reproducibility
search_space = {
    'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
    'learning_rate': hp.loguniform('learning_rate', -3, 0),
    'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
    'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
    'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
    'objective': 'multi:softmax',
    'num_class': 9,
    'seed': 42
}

train = xgb.DMatrix(X_train, label = y_train)
valid = xgb.DMatrix(X_val, label = y_val)

best_result = fmin(
    fn=objective,
    space=search_space,
    algo=tpe.suggest,
    max_evals=50,
    trials=Trials()
)

  0%|          | 0/50 [00:00<?, ?trial/s, best loss=?]

ERROR:hyperopt.fmin:job exception:  Model registry functionality is unavailable; got unsupported URI 's3://sal-wine-quality/models/' for model registry data storage. Supported URI schemes are: ['', 'file', 'databricks', 'databricks-uc', 'http', 'https', 'postgresql', 'mysql', 'sqlite', 'mssql']. See https://www.mlflow.org/docs/latest/tracking.html#storage for how to run an MLflow server against one of the supported backend storage locations.


  0%|          | 0/50 [00:00<?, ?trial/s, best loss=?]


UnsupportedModelRegistryStoreURIException:  Model registry functionality is unavailable; got unsupported URI 's3://sal-wine-quality/models/' for model registry data storage. Supported URI schemes are: ['', 'file', 'databricks', 'databricks-uc', 'http', 'https', 'postgresql', 'mysql', 'sqlite', 'mssql']. See https://www.mlflow.org/docs/latest/tracking.html#storage for how to run an MLflow server against one of the supported backend storage locations.

In [28]:
# <Experiment: artifact_location='mlflow-artifacts:/561371050764143265', creation_time=1690287725130, experiment_id='561371050764143265', last_update_time=1690287725130, lifecycle_stage='active', name='wine_dataset', tags={}>

experiment_id = '561371050764143265'
client = MlflowClient()
runs = client.search_runs(experiment_id)

# Best RMSE
best_rmse = float('inf')
# Best Run
best_run = None

# Loop through all runs and find the one with smallest RMSE
for run in runs:
    metrics = run.data.metrics
    if 'rmse' in metrics:
        rmse = metrics['rmse']
        if rmse < best_rmse:
            best_rmse = rmse
            best_run = run

# Register the best model to the model registry
if best_run:
    model_uri = f"runs:/{best_run.info.run_id}/model"
    model_name = "wine_dataset_champion"
    client.create_registered_model(model_name)
    client.create_model_version(
        name=model_name,
        source=model_uri,
        run_id=best_run.info.run_id
    )
else:
    print("No valid runs found.")

In [44]:
import mlflow.pyfunc
model_name = model_name
model_version = 1
model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")
model.predict(X_test)

array([5., 5., 5., 5., 5., 5., 5., 6., 5., 7., 6., 6., 5., 6., 7., 7., 5.,
       6., 5., 6., 6., 5., 7., 6., 6., 6., 6., 6., 5., 7., 6., 6., 5., 5.,
       6., 5., 5., 6., 5., 6., 5., 6., 5., 7., 5., 6., 5., 6., 6., 6., 6.,
       6., 6., 5., 6., 6., 6., 6., 6., 6., 6., 5., 5., 6., 6., 5., 6., 6.,
       5., 6., 5., 7., 5., 5., 5., 5., 5., 5., 5., 5., 6., 6., 6., 5., 6.,
       7., 6., 5., 4., 5., 6., 5., 6., 5., 5., 6., 5., 5., 6., 7., 6., 6.,
       6., 7., 7., 6., 5., 6., 5., 7., 6., 5., 5., 6., 6., 6., 5., 7., 6.,
       5., 6., 5., 5., 6., 6., 6., 6., 6., 7., 5., 7., 5., 5., 6., 6., 5.,
       6., 5., 5., 6., 7., 5., 5., 5., 7., 6., 5., 5., 4., 7., 6., 5., 5.,
       6., 7., 6., 7., 7., 7., 5., 7., 5., 6., 5., 5., 5., 5., 5., 6., 7.,
       6., 5.], dtype=float32)

In [30]:
client.transition_model_version_stage(
    name="wine_dataset_champion", version=1, stage="Production"
)

AttributeError: 'MlflowClient' object has no attribute 'list_registered_models'

In [None]:

# Now, we need to convert the data to numpy arrays
X_train = X_train.to_numpy()
y_train = y_train.to_numpy()

# We are going to do multi class classification in SKLearn
from sklearn.linear_model import LogisticRegression
LogisticRegression = LogisticRegression(random_state=0, max_iter=1000).fit(X_train, y_train)
LogisticRegression.score(X_train, y_train)