In [3]:
import kfp

def get_kfp_client(host: str = "http://ml-pipeline:8888"):
    """Create a Kubeflow Pipelines client.

    Args:
        host: Address of the KFP API endpoint. The default is the address
            this notebook was originally written against.

    Returns:
        A ``kfp.Client`` constructed with ``host``.
    """
    return kfp.Client(host=host)

# Build the client once; later cells reuse this module-level `client`.
client = get_kfp_client()
# Bare expression so the notebook displays the client's repr as the cell output.
client

<kfp.client.client.Client at 0x7f946b4be4d0>

In [4]:
# Check that the client connection is working properly
client.get_kfp_healthz()

{'multi_user': False}

In [5]:
# Import
from kfp import dsl
from kfp.dsl import Input, Dataset
from kfp.dsl import Output, OutputPath
from typing import Dict
from typing import Tuple

In [6]:
# 2. Preprocessing (label encoding)
@dsl.component(base_image="seohyun914/seohyun-model:0314")
def preprocessing(
    # 1-1 raw train / test datasets
    dataset: Input[Dataset],
    test_dataset: Input[Dataset],
    # encoded training data and the entity-count dictionary
    train_data_output: Output[Dataset],
    n_dict_path: OutputPath(Dict[str, int]),
    # encoded subset of the test data, usable for validation
    valid_data_output: Output[Dataset],
):
    """Label-encode user/movie ids and write the train, validation and count outputs.

    The ``user_id`` and ``movie_id`` columns of the training CSV are replaced
    by contiguous indices assigned in order of first appearance. Rows of the
    test CSV whose user and movie both occur in the training data are encoded
    with the same mapping; all other test rows are dropped.

    Args:
        dataset: Training CSV artifact; must contain ``user_id`` and
            ``movie_id`` columns.
        test_dataset: Test CSV artifact; must contain ``user_id`` and
            ``movie_id`` columns.
        train_data_output: Receives the encoded training CSV.
        n_dict_path: File path that receives a JSON object with the keys
            ``n_users`` and ``n_movies``.
        valid_data_output: Receives the encoded, filtered test CSV.
    """
    # Imports live inside the function because the component body is executed
    # on its own inside the base image.
    import json

    import pandas as pd

    train_df = pd.read_csv(dataset.path)

    users = train_df["user_id"].unique()
    movies = train_df["movie_id"].unique()
    user2idx = {int(user): i for i, user in enumerate(users)}
    movie2idx = {int(movie): i for i, movie in enumerate(movies)}

    train_df["user_id"] = train_df["user_id"].map(user2idx)
    train_df["movie_id"] = train_df["movie_id"].map(movie2idx)

    n_dict = dict(
        n_users=len(user2idx),
        n_movies=len(movie2idx),
    )

    print("n_dict : ", n_dict)
    print("train_df : ", train_df.head())

    train_df.to_csv(train_data_output.path, index=False)

    # 4. Build a validation frame from the test data: keep only rows whose
    # user and movie are both present in the training mapping. Columns are
    # selected by name, so the filter does not depend on the column count or
    # column order of the test CSV.
    test_df = pd.read_csv(test_dataset.path)
    is_known = (
        test_df["user_id"].isin(list(user2idx))
        & test_df["movie_id"].isin(list(movie2idx))
    )

    # .copy() makes valid_df an independent frame, so the assignments below
    # do not write into a slice of test_df.
    valid_df = test_df.loc[is_known].copy()
    valid_df["user_id"] = valid_df["user_id"].map(user2idx)
    valid_df["movie_id"] = valid_df["movie_id"].map(movie2idx)
    valid_df.to_csv(valid_data_output.path, index=False)

    with open(n_dict_path, "w") as f:
        json.dump(n_dict, f)
        
# 3-1 Training (MF)
@dsl.component(base_image="seohyun914/seohyun-model:0314")
def training(
    train_dataset: Input[Dataset],
    valid_dataset: Input[Dataset],
    n_dict: Dict[str, int],
    epochs: int,
    batch_size: int,
    n_latent_factors: int,
) -> float:
    """Train the MF model and return its loss on an internal hold-out split.

    The training CSV is split into train/hold-out parts, the model is trained
    and evaluated on the hold-out part, and that rounded loss is returned.
    Afterwards the same model object is trained once more with the external
    validation CSV as validation data, and the loss on that CSV is printed
    only.

    Args:
        train_dataset: Encoded training CSV artifact.
        valid_dataset: Encoded validation CSV artifact.
        n_dict: Mapping with the keys ``n_users`` and ``n_movies``.
        epochs: Passed to ``MF.train`` as ``epochs``.
        batch_size: Passed to ``MF.train`` as ``batch_size``.
        n_latent_factors: Third positional argument of the ``MF`` constructor.

    Returns:
        Loss on the internal hold-out split, rounded to 4 decimals.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from model import MF

    train_df = pd.read_csv(train_dataset.path)

    print("train_df : ", train_df.head())
    print(n_dict)

    mf_model = MF(n_dict["n_users"], n_dict["n_movies"], n_latent_factors)

    # NOTE(review): no random_state is passed, so the split (and therefore the
    # returned loss) can differ between runs -- confirm this is intended.
    train_df, valid_df = train_test_split(train_df)

    # Train the MF model and score it on the internal hold-out split.
    mf_model.train(
        train_df=train_df,
        valid_df=valid_df,
        epochs=epochs,
        batch_size=batch_size,
        verbose=2,
    )
    # `evalute` (sic) is the method name used by the project's model class.
    mf_val_loss = round(mf_model.evalute(valid_df, verbose=2), 4)
    print(f"MF VAL_LOSS = {mf_val_loss}")

    # Loss measured against the dataset derived from the test CSV (step 1-2).
    # NOTE(review): this calls train() again on the same model object, which
    # presumably continues from the already-fitted weights -- confirm against
    # model.MF.train whether a second round is really wanted here.
    valid_df2 = pd.read_csv(valid_dataset.path)
    mf_model.train(
        train_df=train_df,
        valid_df=valid_df2,
        epochs=epochs,
        batch_size=batch_size,
        verbose=2,
    )
    mf_val_loss2 = round(mf_model.evalute(valid_df2, verbose=2), 4)
    print(f"MF VAL_LOSS_valid = {mf_val_loss2}")

    # The output parameter is the hold-out loss from the first training round.
    return mf_val_loss

# 3-2 Training (DeepMF)
@dsl.component(base_image="seohyun914/seohyun-model:0314")
def training_deepfm(
    train_dataset: Input[Dataset],
    # dataset derived from the test CSV
    valid_dataset: Input[Dataset],
    n_dict: Dict[str, int],
    epochs: int,
    batch_size: int,
    n_latent_factors: int,
    dropouts: float,
) -> float:
    """Train the DeepMF model and return its loss on an internal hold-out split.

    The training CSV is split into train/hold-out parts, the model is trained
    and evaluated on the hold-out part, and that rounded loss is returned.
    Afterwards the same model object is trained once more with the external
    validation CSV as validation data, and the loss on that CSV is printed
    only.

    Args:
        train_dataset: Encoded training CSV artifact.
        valid_dataset: Encoded validation CSV artifact.
        n_dict: Mapping with the keys ``n_users`` and ``n_movies``.
        epochs: Passed to ``DeepMF.train`` as ``epochs``.
        batch_size: Passed to ``DeepMF.train`` as ``batch_size``.
        n_latent_factors: Third positional argument of the ``DeepMF``
            constructor.
        dropouts: Dropout hyperparameter. Currently not used by this
            function (see the note in the body).

    Returns:
        Loss on the internal hold-out split, rounded to 4 decimals.
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from model import DeepMF

    train_df = pd.read_csv(train_dataset.path)

    print("train_df : ", train_df.head())
    print(n_dict)

    # NOTE(review): `dropouts` is accepted but never forwarded to DeepMF, so
    # the pipeline's dropout setting has no effect. The DeepMF constructor
    # signature is not visible here -- confirm how it should be passed.
    deepmf_model = DeepMF(n_dict["n_users"], n_dict["n_movies"], n_latent_factors)

    # NOTE(review): no random_state is passed, so the split (and therefore the
    # returned loss) can differ between runs -- confirm this is intended.
    train_df, valid_df = train_test_split(train_df)

    # Train the DeepMF model and score it on the internal hold-out split.
    deepmf_model.train(
        train_df=train_df,
        valid_df=valid_df,
        epochs=epochs,
        batch_size=batch_size,
        verbose=2,
    )
    # `evalute` (sic) is the method name used by the project's model class.
    deepmf_val_loss = round(deepmf_model.evalute(valid_df, verbose=2), 4)
    print(f"DeepMF VAL_LOSS = {deepmf_val_loss}")

    # Loss measured against the dataset derived from the test CSV (step 1-2).
    # NOTE(review): this calls train() again on the same model object, which
    # presumably continues from the already-fitted weights -- confirm against
    # model.DeepMF.train whether a second round is really wanted here.
    valid_df2 = pd.read_csv(valid_dataset.path)
    deepmf_model.train(
        train_df=train_df,
        valid_df=valid_df2,
        epochs=epochs,
        batch_size=batch_size,
        verbose=2,
    )
    deepmf_val_loss2 = round(deepmf_model.evalute(valid_df2, verbose=2), 4)
    # Label corrected: this line used to be tagged "MF", which made the two
    # components' logs indistinguishable.
    print(f"DeepMF VAL_LOSS_valid = {deepmf_val_loss2}")

    # The output parameter is the hold-out loss from the first training round.
    return deepmf_val_loss

# 4. Report which model performed best as an output parameter
@dsl.component(base_image="seohyun914/seohyun-model:0314")
def validating(
    mf_loss1: float,
    deepmf_loss2: float,
) -> str:
    """Compare the two losses and describe the winner as "<better> > <worse>".

    The model with the strictly lower loss is listed first; on a tie DeepMF
    is listed first. Example: "DeepMF(0.8923) > MF(0.9012)".

    Args:
        mf_loss1: Loss of the MF model.
        deepmf_loss2: Loss of the DeepMF model.

    Returns:
        A string of the form ``"A(loss_a) > B(loss_b)"``.
    """
    mf_wins = mf_loss1 < deepmf_loss2
    if not mf_wins:
        return f"DeepMF({deepmf_loss2}) > MF({mf_loss1})"
    return f"MF({mf_loss1}) > DeepMF({deepmf_loss2})"

    
    
@dsl.pipeline
def seohyun_pipeline():
    """Import the train/test CSVs, preprocess them, train MF and DeepMF, and compare losses."""
    bucket_uri = "minio://mlpipeline"

    # 1-1 import train_data.csv
    train_csv = "train_data.csv"
    train_import = dsl.importer(
        artifact_uri=f"{bucket_uri}/{train_csv}",
        artifact_class=dsl.Dataset,
    )

    # 1-2 import test_data.csv
    test_csv = "test_data.csv"
    test_import = dsl.importer(
        artifact_uri=f"{bucket_uri}/{test_csv}",
        artifact_class=dsl.Dataset,
    )

    prep = preprocessing(
        dataset=train_import.output,
        test_dataset=test_import.output,
    )

    # Inputs and hyperparameters shared by both training components.
    shared_inputs = dict(
        train_dataset=prep.outputs["train_data_output"],
        valid_dataset=prep.outputs["valid_data_output"],
        n_dict=prep.outputs["n_dict_path"],
        epochs=8,
        batch_size=256,
        n_latent_factors=64,
    )

    # MF model
    mf_task = training(**shared_inputs)

    # DeepMF model; additionally receives the dropout hyperparameter
    deepmf_task = training_deepfm(**shared_inputs, dropouts=0.5)

    # Pick the better model
    validating(
        mf_loss1=mf_task.output,
        deepmf_loss2=deepmf_task.output,
    )

In [7]:
# Submit the pipeline function as a run under a fixed, hand-written run name.
result = client.create_run_from_pipeline_func(seohyun_pipeline, run_name="mf-model-training-240314-1328")

In [8]:
# Wait for the submitted run to complete and keep the returned run object.
fin_result = result.wait_for_run_completion()

In [9]:
# Display only the "run_details" entry of the finished run's dict form.
fin_result.to_dict()["run_details"]

{'pipeline_context_id': None,
 'pipeline_run_context_id': None,
 'task_details': [{'run_id': '5f701f52-9620-46b0-887c-654883b9216f',
   'task_id': '010d0787-d55d-407a-9378-d973c11d93ca',
   'display_name': 'training',
   'create_time': datetime.datetime(2024, 3, 14, 8, 23, 41, tzinfo=tzlocal()),
   'start_time': datetime.datetime(2024, 3, 14, 8, 24, 31, tzinfo=tzlocal()),
   'end_time': datetime.datetime(2024, 3, 14, 8, 24, 59, tzinfo=tzlocal()),
   'executor_detail': None,
   'state': 'SUCCEEDED',
   'execution_id': None,
   'error': None,
   'inputs': None,
   'outputs': None,
   'parent_task_id': None,
   'state_history': [{'update_time': datetime.datetime(2024, 3, 14, 8, 24, 32, tzinfo=tzlocal()),
     'state': 'RUNNING',
     'error': None},
    {'update_time': datetime.datetime(2024, 3, 14, 8, 25, tzinfo=tzlocal()),
     'state': 'SUCCEEDED',
     'error': None}],
   'pod_name': None,
   'child_tasks': [{'task_id': None,
     'pod_name': 'seohyun-pipeline-bhszm-2698213957'}]},
  