In [2]:
!pip install --upgrade pip

[0m

In [None]:
!pip install mlflow scikit-learn requests python-dotenv pandas boto3

In [23]:
import os
import requests
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from mlflow.models.signature import infer_signature
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd
from dotenv import load_dotenv

In [24]:
# .env 파일 로드
load_dotenv(dotenv_path=".env")

# 환경 변수 설정
MLFLOW_SERVER_URI = os.getenv("MLFLOW_SERVER_URI")
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")

# 디버깅: 환경 변수 출력
print("MLFLOW_SERVER_URI:", MLFLOW_SERVER_URI)
print("SLACK_WEBHOOK_URL:", SLACK_WEBHOOK_URL)

EXPERIMENT_NAME = "Iris_Classification_Experiment"
MODEL_NAME = "Iris_Classifier"
ACCURACY_THRESHOLD = 0.95  # 성능 검증 기준값


def send_slack_notification(status, message):
    """Slack 알림 전송"""
    if not SLACK_WEBHOOK_URL:
        print("Slack Webhook URL이 설정되지 않았습니다.")
        return

    payload = {"text": f"MLflow 작업 상태: {status}\n{message}"}
    try:
        response = requests.post(SLACK_WEBHOOK_URL, json=payload)
        response.raise_for_status()
        print("Slack 알림 성공")
    except requests.exceptions.RequestException as e:
        print(f"Slack 알림 실패: {str(e)}")


def train_model():
    """모델 학습 및 MLflow 로깅"""
    try:
        # MLflow 설정
        mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
        mlflow.set_experiment(EXPERIMENT_NAME)

        # 데이터 로드 및 학습
        data = load_iris()
        X_train, X_test, y_train, y_test = train_test_split(
            data.data, data.target, test_size=0.2
        )
        model = RandomForestClassifier()
        model.fit(X_train, y_train)

        # MLflow 실행
        with mlflow.start_run() as run:
            # 서명 및 입력 예제 생성
            input_example = pd.DataFrame(X_test, columns=data.feature_names)
            signature = infer_signature(X_test, model.predict(X_test))

            # 모델 저장 및 메트릭 기록
            accuracy = model.score(X_test, y_test)
            mlflow.sklearn.log_model(
                model, "model", signature=signature, input_example=input_example
            )
            mlflow.log_metric("accuracy", accuracy)

            # Run 정보 출력
            run_id = run.info.run_id
            artifact_uri = mlflow.get_artifact_uri("model")
            print(f"Run ID: {run_id}")
            print(f"Artifact URI: {artifact_uri}")

            send_slack_notification(
                status="성공",
                message=f"모델 학습 성공\nRun ID: {run_id}\nAccuracy: {accuracy:.2f}",
            )
            return run_id, artifact_uri, accuracy
    except Exception as e:
        send_slack_notification(status="실패", message=f"모델 학습 중 오류 발생: {str(e)}")
        raise


def register_model(run_id, artifact_uri, accuracy):
    """MLflow 모델 레지스트리에 등록"""
    client = MlflowClient()
    try:
        client.create_registered_model(MODEL_NAME)
    except Exception:
        print(f"Model {MODEL_NAME} already exists. Skipping creation.")

    # 모델 버전 생성
    try:
        model_version = client.create_model_version(
            name=MODEL_NAME, source=artifact_uri, run_id=run_id
        )
        print(f"Model version {model_version.version} created.")
        send_slack_notification(
            status="성공",
            message=f"모델 등록 성공\nModel: {MODEL_NAME}\nVersion: {model_version.version}",
        )

        # 성능 기준에 따라 모델 단계 전환
        target_stage = "Production" if accuracy >= ACCURACY_THRESHOLD else "Staging"
        client.transition_model_version_stage(
            name=MODEL_NAME, version=model_version.version, stage=target_stage
        )
        print(f"Model version {model_version.version} moved to {target_stage}.")
        send_slack_notification(
            status="성공",
            message=f"모델 {target_stage} 단계로 전환 완료\nModel: {MODEL_NAME}\nVersion: {model_version.version}",
        )
    except Exception as e:
        send_slack_notification(status="실패", message=f"모델 등록 중 오류 발생: {str(e)}")
        raise

MLFLOW_SERVER_URI: http://mlflow:5000
SLACK_WEBHOOK_URL: https://hooks.slack.com/services/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


In [25]:
mlflow.set_tracking_uri(MLFLOW_SERVER_URI)
print("MLflow Tracking URI:", mlflow.get_tracking_uri())

MLflow Tracking URI: http://mlflow:5000


In [26]:
import requests
import time

start_time = time.time()
try:
    response = requests.get(MLFLOW_SERVER_URI)
    response.raise_for_status()
    print("MLflow 서버 응답 시간:", time.time() - start_time, "초")
except Exception as e:
    print("MLflow 서버 접근 실패:", e)

MLflow 서버 응답 시간: 0.004454851150512695 초


In [27]:
run_id, artifact_uri, accuracy = train_model()

Run ID: 22da2daa3ce54519af1a1968f3a398f0
Artifact URI: s3://big9-project-01-model/3/22da2daa3ce54519af1a1968f3a398f0/artifacts/model


2024/12/16 05:11:31 INFO mlflow.tracking._tracking_service.client: 🏃 View run smiling-mare-465 at: http://mlflow:5000/#/experiments/3/runs/22da2daa3ce54519af1a1968f3a398f0.
2024/12/16 05:11:31 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://mlflow:5000/#/experiments/3.


Slack 알림 성공
22da2daa3ce54519af1a1968f3a398f0 s3://big9-project-01-model/3/22da2daa3ce54519af1a1968f3a398f0/artifacts/model 0.9666666666666667


In [28]:
register_model(run_id, artifact_uri, accuracy)

2024/12/16 05:12:04 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: Iris_Classifier, version 10


Model Iris_Classifier already exists. Skipping creation.
Model version 10 created.
Slack 알림 성공
Model version 10 moved to Production.


  client.transition_model_version_stage(


Slack 알림 성공
