In [1]:
from mlflow.tracking import MlflowClient


MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"

The MlflowClient object allows us to interact with...

 * an MLflow Tracking Server that creates and manages experiments and runs.
* an MLflow Registry Server that creates and manages registered models and model versions.

In [4]:
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)

client.search_experiments()

[<Experiment: artifact_location='/workspaces/mlops-techniques/02-Experiment-tracking/mlruns/1', creation_time=1736001296237, experiment_id='1', last_update_time=1736001296237, lifecycle_stage='active', name='nyc-taxi-experiment', tags={}>,
 <Experiment: artifact_location='mlflow-artifacts:/0', creation_time=1736000145875, experiment_id='0', last_update_time=1736000145875, lifecycle_stage='active', name='Default', tags={}>]

In [5]:
#criar um novo experimento
client.create_experiment(name="test-experiment")

'2'

Vamos verificar as versões mais recentes para o experimento com ID 1

In [6]:
from mlflow.entities import ViewType

runs = client.search_runs(
    experiment_ids='1',                         # ID do experimento onde os runs serão pesquisados
    filter_string="metrics.rmse < 7",           # Filtro para selecionar runs com métrica 'rmse' menor que 7
    run_view_type=ViewType.ACTIVE_ONLY,         # Considera apenas runs com status "ativo" (não excluídos ou arquivados)
    max_results=5,                              # Limita a busca a no máximo 5 runs
    order_by=["metrics.rmse ASC"]               # Ordena os runs pelo valor da métrica 'rmse' em ordem crescente
)


In [7]:
for run in runs:
    print(f"run id: {run.info.run_id}, rmse: {run.data.metrics['rmse']:.4f}")

run id: db537e71f5ce45d2ad8a6a57210eac74, rmse: 6.3184
run id: 247b6455fee74cccaeb4e3d61c7b35fa, rmse: 6.3184
run id: d6db051b93f94639b05b297543fc7508, rmse: 6.3449
run id: 388e7da7bd42493bbe5655b0450cda2c, rmse: 6.3652
run id: 1581de318fba4f1fa1e0d179ae60caf5, rmse: 6.4460


## Model Registry

In [8]:
import mlflow

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

In [11]:
run_id = "1581de318fba4f1fa1e0d179ae60caf5"      # ID único do run no MLflow que contém o modelo a ser registrado
model_uri = f"runs:/{run_id}/model"             # URI do modelo salvo no run específico
mlflow.register_model(model_uri=model_uri, name="taxi-regressor")  # Registra o modelo no MLflow Model Registry


Registered model 'taxi-regressor' already exists. Creating a new version of this model...
Created version '3' of model 'taxi-regressor'.


<ModelVersion: aliases=[], creation_timestamp=1736693936489, current_stage='None', description=None, last_updated_timestamp=1736693936489, name='taxi-regressor', run_id='1581de318fba4f1fa1e0d179ae60caf5', run_link=None, source='/workspaces/mlops-techniques/02-Experiment-tracking/mlruns/1/1581de318fba4f1fa1e0d179ae60caf5/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>

In [14]:
model_name = "taxi-regressor"                              # Nome do modelo registrado no MLflow Model Registry
registered_model  = client.get_registered_model(name=model_name)  #

for version in registered_model.latest_versions:
    print(f"Version: {version.version}")
    print(f"Stage: {version.current_stage}")
    print(f"Status: {version.status}")
    print(f"Run ID: {version.run_id}")
    print(f"URI: {version.source}")
    print("-" * 30)

Version: 3
Stage: None
Status: READY
Run ID: 1581de318fba4f1fa1e0d179ae60caf5
URI: /workspaces/mlops-techniques/02-Experiment-tracking/mlruns/1/1581de318fba4f1fa1e0d179ae60caf5/artifacts/model
------------------------------


In [15]:
# Nome do modelo registrado
model_name = "taxi-regressor"

# Obtendo todas as versões do modelo registrado
model_versions = client.search_model_versions(f"name='{model_name}'")

# Contando o número total de versões
num_versions = len(model_versions)
print(f"O modelo '{model_name}' possui {num_versions} versão(ões) registrada(s).")

# Exibindo as informações de cada versão (opcional)
for version in model_versions:
    print(f"Version: {version.version}, Stage: {version.current_stage}, Status: {version.status}")

O modelo 'taxi-regressor' possui 3 versão(ões) registrada(s).
Version: 3, Stage: None, Status: READY
Version: 1, Stage: None, Status: READY
Version: 2, Stage: None, Status: READY


In [16]:
model_version = 3
new_stage = "Staging"
client.transition_model_version_stage(
    name=model_name,
    version=model_version,
    stage=new_stage,
    archive_existing_versions=False
)

  client.transition_model_version_stage(


<ModelVersion: aliases=[], creation_timestamp=1736693936489, current_stage='Staging', description=None, last_updated_timestamp=1736694436154, name='taxi-regressor', run_id='1581de318fba4f1fa1e0d179ae60caf5', run_link=None, source='/workspaces/mlops-techniques/02-Experiment-tracking/mlruns/1/1581de318fba4f1fa1e0d179ae60caf5/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>

In [30]:
model_version = 1
new_stage = "Production"
client.transition_model_version_stage(
    name=model_name,
    version=model_version,
    stage=new_stage,
    archive_existing_versions=False
)

  client.transition_model_version_stage(


<ModelVersion: aliases=['production'], creation_timestamp=1736692074061, current_stage='Production', description='model registration test', last_updated_timestamp=1736696514249, name='taxi-regressor', run_id='db537e71f5ce45d2ad8a6a57210eac74', run_link='', source='/workspaces/mlops-techniques/02-Experiment-tracking/mlruns/1/db537e71f5ce45d2ad8a6a57210eac74/artifacts/models_mlflow', status='READY', status_message=None, tags={'model': 'Xgboost'}, user_id=None, version=1>

In [17]:
# Nome do modelo registrado
model_name = "taxi-regressor"

# Obtendo todas as versões do modelo registrado
model_versions = client.search_model_versions(f"name='{model_name}'")

# Contando o número total de versões
num_versions = len(model_versions)
print(f"O modelo '{model_name}' possui {num_versions} versão(ões) registrada(s).")

# Exibindo as informações de cada versão (opcional)
for version in model_versions:
    print(f"Version: {version.version}, Stage: {version.current_stage}, Status: {version.status}")

O modelo 'taxi-regressor' possui 3 versão(ões) registrada(s).
Version: 3, Stage: Staging, Status: READY
Version: 1, Stage: None, Status: READY
Version: 2, Stage: None, Status: READY


In [18]:
from datetime import datetime 

date = datetime.today().date()  # Obtém a data atual (somente a parte da data, sem o horário)

client.update_model_version(  # Atualiza a descrição de uma versão específica do modelo no Model Registry
    name=model_name,  # Nome do modelo registrado no MLflow
    version=model_version,  # Número da versão do modelo que será atualizada
    description=f"The model version {model_version} was transitioned to {new_stage} on {date}"
    # Define a nova descrição para a versão do modelo, informando o estágio (new_stage) e a data da transição
)


<ModelVersion: aliases=[], creation_timestamp=1736693936489, current_stage='Staging', description='The model version 3 was transitioned to Staging on 2025-01-12', last_updated_timestamp=1736694797845, name='taxi-regressor', run_id='1581de318fba4f1fa1e0d179ae60caf5', run_link=None, source='/workspaces/mlops-techniques/02-Experiment-tracking/mlruns/1/1581de318fba4f1fa1e0d179ae60caf5/artifacts/model', status='READY', status_message=None, tags={}, user_id=None, version=3>


#### Comparando versões e selecionando o novo modelo "Production"
Na última seção, vamos recuperar os modelos registrados no Model Registry e comparar seu desempenho em um conjunto de teste não visto. A ideia é simular o cenário em que um engenheiro de implantação precisa interagir com o registro de modelos para decidir se deve atualizar a versão do modelo que está em produção ou não.

Passos a serem seguidos:

* 1 - Carregar o conjunto de dados de teste, que corresponde aos dados do NYC Green Taxi do mês de março de 2021.
* 2 - Baixar o DictVectorizer que foi ajustado usando os dados de treinamento e salvo no MLflow como um artefato, e carregá-lo com o módulo pickle.
* 3 - Pré-processar o conjunto de teste usando o DictVectorizer para que possamos alimentá-lo corretamente nos regressores.
* 4 - Fazer previsões no conjunto de teste usando as versões do modelo que estão atualmente nos estágios "Staging" e "Production", e comparar seus desempenhos.
* 5 - Com base nos resultados, atualizar a versão do modelo "Production" de acordo.

Nota: O Model Registry não implanta o modelo na produção ao transicionar um modelo para o estágio "Production"; ele apenas atribui um rótulo àquela versão do modelo. É necessário complementar o registro com algum código de CI/CD para realizar a implantação de fato.

In [37]:
from sklearn.metrics import mean_squared_error, root_mean_squared_error

import pandas as pd

# Função para ler e processar o arquivo CSV
def read_dataframe(filename):
    # Lê o arquivo CSV em um DataFrame
    df = pd.read_parquet(filename)

    # Converte as colunas de data/hora para o tipo datetime
    df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
    df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)

    # Calcula a duração da viagem em minutos, a partir da diferença entre o tempo de dropoff e pickup
    df['duration'] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)  # Converte para minutos

    # Filtra apenas viagens com duração entre 1 e 60 minutos
    df = df[(df.duration >= 1) & (df.duration <= 60)]

    # Converte os IDs das localizações para o tipo string (para tratar como variáveis categóricas)
    categorical = ['PULocationID', 'DOLocationID']
    df[categorical] = df[categorical].astype(str)
    
    return df

# Função para pré-processamento dos dados, transformando variáveis categóricas e numéricas
def preprocess(df, dv):
    # Cria uma nova variável categórica combinando os IDs de pickup e dropoff
    df['PU_DO'] = df['PULocationID'] + '_' + df['DOLocationID']
    
    # Define quais colunas serão categóricas e quais serão numéricas
    categorical = ['PU_DO']
    numerical = ['trip_distance']
    
    # Converte os dados em dicionários para aplicar transformações de vetorização (provavelmente 'dv' é um vetor de características)
    train_dicts = df[categorical + numerical].to_dict(orient='records')
    
    # Aplica a transformação de vetorização aos dados categóricos e numéricos
    return dv.transform(train_dicts)

# Função para testar o modelo e calcular o RMSE
def test_model(name, stage, X_test, y_test):
    # Carrega o modelo treinado do MLflow usando seu nome e estágio
    model = mlflow.pyfunc.load_model(f"models:/{name}/{stage}")
    
    # Faz previsões no conjunto de dados de teste
    y_pred = model.predict(X_test)
    
    # Calcula o erro quadrático médio (RMSE) entre as previsões e os valores reais
    return {"rmse": root_mean_squared_error(y_test, y_pred)}  # squared=False retorna o RMSE


In [24]:
df = read_dataframe("/workspaces/mlops-techniques/01-Environment/data/green_tripdata_2021-03.parquet")

In [26]:
import pickle

with open("/workspaces/mlops-techniques/01-Environment/models/preprocessor.b", "rb") as f_in:
    dv = pickle.load(f_in)

In [27]:
X_test = preprocess(df, dv)


In [28]:
target = "duration"
y_test = df[target].values

In [35]:
#pip install xgboost

In [38]:
%time test_model(name=model_name, stage="Production", X_test=X_test, y_test=y_test)

  latest = client.get_latest_versions(name, None if stage is None else [stage])
 - numpy (current: 1.26.4, required: numpy==2.2.1)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.


CPU times: user 16.2 s, sys: 32.2 ms, total: 16.3 s
Wall time: 8.85 s


{'rmse': 6.2702965482607915}

In [40]:
#não salvei os artefatos dos modelos, não permite testar o model staging
#%time test_model(name=model_name, stage="Staging", X_test=X_test, y_test=y_test)

In [42]:
'''
está movendo a versão 1 de um modelo para o estágio de "Production" no MLflow, 
enquanto arquiva as versões anteriores, garantindo que a versão mais recente seja utilizada em produção.
'''
client.transition_model_version_stage(
    name=model_name,
    version=1,
    stage="Production",
    archive_existing_versions=True
)

  client.transition_model_version_stage(


<ModelVersion: aliases=['production'], creation_timestamp=1736692074061, current_stage='Production', description='model registration test', last_updated_timestamp=1736697111145, name='taxi-regressor', run_id='db537e71f5ce45d2ad8a6a57210eac74', run_link='', source='/workspaces/mlops-techniques/02-Experiment-tracking/mlruns/1/db537e71f5ce45d2ad8a6a57210eac74/artifacts/models_mlflow', status='READY', status_message=None, tags={'model': 'Xgboost'}, user_id=None, version=1>