In [1]:
!pip install zenml
!pip install mlflow



In [2]:
from typing import Annotated, Tuple

import mlflow
import numpy as np
import pandas as pd
from sklearn.datasets import load_diabetes
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from zenml import pipeline, step

class Evaluation:
    """Regression metrics helper built on scikit-learn's metric functions."""

    def calculate_scores(self, y_true, y_pred):
        """Return ``(mse, r2)`` for the given true targets and predictions.

        Args:
            y_true: Ground-truth target values.
            y_pred: Predicted target values, aligned with ``y_true``.

        Returns:
            A 2-tuple: the mean squared error followed by the R^2 score.
        """
        mse = mean_squared_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        return mse, r2

@step
def load_data() -> Tuple[
    Annotated[pd.DataFrame, "X_train"],
    Annotated[pd.DataFrame, "X_test"],
    Annotated[np.ndarray, "y_train"],
    Annotated[np.ndarray, "y_test"],
]:
    """Load the scikit-learn diabetes dataset and split it into train/test sets.

    The return annotation declares four separate step outputs. Without it the
    whole split would be registered as one output, and the pipeline could not
    unpack it into four artifacts.

    Returns:
        ``(X_train, X_test, y_train, y_test)`` where the feature sets are
        DataFrames with the dataset's feature names as columns and the targets
        are the corresponding slices of ``data.target``. 20% of the rows go
        to the test set; ``random_state=42`` keeps the split reproducible.
    """
    data = load_diabetes()
    X = pd.DataFrame(data.data, columns=data.feature_names)
    y = data.target
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # train_test_split returns a list; hand back an explicit tuple so the
    # value matches the declared Tuple[...] output signature.
    return X_train, X_test, y_train, y_test

@step
def train_model(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_train: np.ndarray,
    y_test: np.ndarray,
) -> Tuple[
    Annotated[LinearRegression, "trained_model"],
    Annotated[np.ndarray, "y_true"],
    Annotated[np.ndarray, "y_pred"],
]:
    """Fit a linear regression on the training split and predict the test split.

    The return annotation declares three separate step outputs so the pipeline
    can unpack them; an unannotated return would be registered as one output.

    Args:
        X_train: Training features.
        X_test: Test features to predict on.
        y_train: Training targets.
        y_test: Test targets; passed through unchanged so the evaluation step
            receives them alongside the predictions.

    Returns:
        ``(model, y_test, y_pred)``: the fitted estimator, the untouched test
        targets, and the model's predictions for ``X_test``.
    """
    model = LinearRegression()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    return model, y_test, y_pred

@step
def evaluate_model(y_test, y_pred):
    """Score the predictions, log both metrics to MLflow, and return R^2.

    Args:
        y_test: Ground-truth targets for the test split.
        y_pred: Model predictions for the same rows.

    Returns:
        The R^2 score, which the deployment trigger consumes downstream.
    """
    scores = Evaluation().calculate_scores(y_test, y_pred)
    # calculate_scores yields (mse, r2); log them in that order.
    for metric_name, metric_value in zip(("mse", "r2"), scores):
        mlflow.log_metric(metric_name, metric_value)
    return scores[1]

@step
def deployment_trigger(r2: float) -> bool:
    """Decide whether the model is good enough to deploy.

    Args:
        r2: R^2 score of the candidate model.

    Returns:
        True when ``r2`` is at least 0.5, otherwise False.
    """
    approved = r2 >= 0.5
    return approved

@step
def deploy_model(model: LinearRegression, deploy: bool):
    """Log the model to MLflow when the deployment decision is positive.

    Args:
        model: Fitted estimator to log.
        deploy: Deployment decision; nothing happens when it is falsy.
    """
    artifact_path = "model"
    if deploy:
        mlflow.sklearn.log_model(model, artifact_path)

@pipeline
def continuous_deployment_pipeline():
    """Wire the steps: load data, train, evaluate, decide, conditionally deploy."""
    train_features, test_features, train_targets, test_targets = load_data()
    fitted_model, actuals, predictions = train_model(
        train_features, test_features, train_targets, test_targets
    )
    score = evaluate_model(actuals, predictions)
    should_deploy = deployment_trigger(score)
    deploy_model(fitted_model, should_deploy)


  "cipher": algorithms.TripleDES,
  "class": algorithms.Blowfish,
  "class": algorithms.TripleDES,


[1;35mNote: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.[0m
[1;35mNumExpr defaulting to 8 threads.[0m
