<a href="https://colab.research.google.com/github/silverstar0727/pose-estimation/blob/main/continuous_training_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 라이브러리 설치 및 임포트

In [None]:
# After running this cell you MUST restart the runtime ("Restart runtime")
# so the freshly installed/upgraded packages are picked up.
# NOTE(review): kfp is unpinned, but later cells import kfp.v2.* and
# AIPlatformClient (kfp 1.x APIs); a kfp 2.x install may not provide them —
# consider pinning, e.g. "kfp<2".
!pip install -q kfp
!pip3 install --user google-cloud-aiplatform matplotlib --upgrade -q
# Additional packages.
# NOTE(review): google-cloud-aiplatform is already upgraded on the line above,
# so the next line looks redundant.
!pip3 install --user google-cloud-aiplatform --upgrade -q
!pip3 install --user google-cloud-pipeline-components --upgrade -q

In [177]:
from datetime import datetime
import time

import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, component, ClassificationMetrics, Metrics)
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.google import experimental
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

# gcp 계정 연결

In [178]:
from google.colab import auth as google_auth

# Authenticate this Colab session so later GCP / Vertex AI calls run under
# the signed-in Google account.
google_auth.authenticate_user() # Sign in with the GCP account you intend to use

# 경로 변수 설정

In [179]:
# File the KFP compiler writes the pipeline job spec to (and that is submitted).
PIPELINE_SPEC_NAME = "mediapipe.json"

# GCP project / region hosting the Vertex AI pipeline run and model resources.
PROJECT_ID = 'natural-expanse-319203'
REGION = "us-central1"

# Local-time run stamp, formatted YYYYmmddHHMMSS.
# NOTE(review): not referenced by any cell visible in this notebook.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
# Bucket holding the model artifact and the pipeline root.
BUCKET_NAME = "gs://mediapipe-pipeline"

# Per-user directory under the bucket where KFP stores pipeline artifacts.
USER = "JeongMin-Do"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/{USER}"

# NOTE(review): WORKING_DIR / MODEL_DISPLAY_NAME are only printed below; the
# pipeline passes its own display name ("mediapipe") to model_upload.
WORKING_DIR = PIPELINE_ROOT
MODEL_DISPLAY_NAME = "train_deploy"

# File name of the serialized model uploaded to the bucket root.
# Vertex AI's prebuilt scikit-learn serving containers expect model.joblib or model.pkl.
MODEL_NAME = "model.joblib"
print(WORKING_DIR)
print(MODEL_DISPLAY_NAME)

gs://mediapipe-pipeline/pipeline_root/JeongMin-Do
train_deploy


# mediapipe pose estimation을 이용한 데이터 전처리 컴포넌트

In [180]:
@component(base_image="silverstar456/mediapipe:landmarks")
def get_landmarks(result_csv: OutputPath("result_csv")):
    """Download the labelled image set and write one pose-landmark row per image.

    The archive https://storage.googleapis.com/mediapipe-pipeline/study.zip is
    downloaded and extracted into the working directory. Every non-hidden
    entry of ``study/`` is treated as a pose class folder. Each non-hidden
    image inside it is run through MediaPipe Pose (``upper_body_only=True``).
    When a pose is found, the CSV row
    ``[image_name, class_name, x0, y0, z0, x1, y1, z1, ...]`` is written.
    Images that cv2 cannot read, or in which no pose is detected, are skipped
    and reported on stderr instead of being silently dropped.

    Args:
        result_csv: Output file path supplied by KFP for the landmark CSV.
    """
    # Imports live inside the function because a KFP lightweight component
    # ships only this function's source into the container.
    import csv
    import os
    import sys
    import zipfile

    import cv2
    import numpy as np
    import tqdm
    import wget
    from mediapipe.python.solutions import pose as mp_pose

    # Download the image archive from GCS and extract it next to us.
    wget.download("https://storage.googleapis.com/mediapipe-pipeline/study.zip")
    with zipfile.ZipFile('study.zip') as archive:
        archive.extractall('.')

    def landmarks(input_frame):
        """Return flattened absolute-coordinate landmarks, or None if no pose is detected."""
        input_frame = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)

        # Fresh tracker per image (presumably so no tracking state carries over
        # between unrelated images).
        with mp_pose.Pose(upper_body_only=True) as pose_tracker:
            result = pose_tracker.process(image=input_frame)
            pose_landmarks = result.pose_landmarks

        if pose_landmarks is None:
            return None

        # The upper-body model is expected to yield exactly 25 landmarks.
        assert len(pose_landmarks.landmark) == 25, 'Unexpected number of predicted pose landmarks: {}'.format(len(pose_landmarks.landmark))
        pose_landmarks = [[lmk.x, lmk.y, lmk.z] for lmk in pose_landmarks.landmark]

        # Map pose landmarks from [0, 1] range to absolute coordinates to get
        # correct aspect ratio (z is scaled by the width, as x is).
        frame_height, frame_width = input_frame.shape[:2]
        pose_landmarks *= np.array([frame_width, frame_height, frame_width])

        # Round and flatten to a plain list of floats for the CSV row.
        return np.around(pose_landmarks, 5).flatten().astype(np.float64).tolist()

    images_in_folder = 'study'

    # newline='' is what the csv module documents for files it writes.
    with open(result_csv, 'w', newline='') as csv_out_file:
        csv_out_writer = csv.writer(csv_out_file, delimiter=',', quoting=csv.QUOTE_MINIMAL)

        # Folder names are used as pose class names.
        pose_class_names = sorted(n for n in os.listdir(images_in_folder) if not n.startswith('.'))

        for pose_class_name in pose_class_names:
            print('Bootstrapping ', pose_class_name, file=sys.stderr)

            class_dir = os.path.join(images_in_folder, pose_class_name)
            image_names = sorted(n for n in os.listdir(class_dir) if not n.startswith('.'))

            for image_name in tqdm.tqdm(image_names, position=0):
                image_path = os.path.join(class_dir, image_name)
                input_frame = cv2.imread(image_path)
                if input_frame is None:
                    # cv2.imread returns None for unreadable / non-image files;
                    # cvtColor would otherwise raise and abort the whole step.
                    print('Skipping unreadable image:', image_path, file=sys.stderr)
                    continue

                pose_landmarks = landmarks(input_frame)
                if pose_landmarks is None:
                    print('Skipping image with no detected pose:', image_path, file=sys.stderr)
                    continue

                csv_out_writer.writerow([image_name, pose_class_name] + pose_landmarks)

# knn 훈련 컴포넌트

In [181]:
# usable docker image
## silverstar456/mediapipe:knn
## us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest
@component(base_image="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest")
def knn_model(result_csv: InputPath("result_csv"),
              model_output: OutputPath("model"),
              model_name: str,
              model_path: str,
              n_neighbors: int = 3,
              ):
    """Train a k-nearest-neighbours pose classifier and publish it.

    The landmark CSV has no header: column 1 is the class label and columns
    2.. are the landmark features (column 0, the image name, is ignored).

    Args:
        result_csv: Path of the landmark CSV produced upstream.
        model_output: KFP output path; receives the joblib-serialized model.
        model_name: Local file name used for the copy uploaded to GCS.
        model_path: Destination blob URI ("gs://bucket/path/file") for that copy.
        n_neighbors: Number of neighbours for ``KNeighborsClassifier`` (default 3).
    """
    import pandas as pd
    from google.cloud import storage
    from joblib import dump
    from sklearn.neighbors import KNeighborsClassifier

    data = pd.read_csv(result_csv, header=None)
    x_train, y_train = data.iloc[:, 2:], data.iloc[:, 1]

    classifier = KNeighborsClassifier(n_neighbors=n_neighbors)
    classifier.fit(x_train, y_train)

    # Pipeline artifact consumed by downstream components.
    dump(classifier, model_output)

    # Serialize a local copy under model_name and upload it to model_path.
    dump(classifier, model_name)
    blob = storage.blob.Blob.from_string(model_path, client=storage.Client())
    blob.upload_from_filename(model_name)

# 모델 평가 컴포넌트

In [182]:
@component(base_image="silverstar456/mediapipe:test")
def model_test(result_csv: InputPath("result_csv"),
               model_output: InputPath("model"),
               metrics: Output[Metrics]):
    """Score the trained classifier on a landmark CSV and log its accuracy.

    NOTE(review): in this notebook's pipeline the same CSV that trained the
    model is passed here, so the logged value is training accuracy, not
    held-out accuracy. Consider a train/test split upstream.

    Args:
        result_csv: Headerless landmark CSV; column 1 = label, columns 2.. = features.
        model_output: Path of the joblib-serialized classifier.
        metrics: KFP Metrics artifact; receives "accuracy" as a percentage (0-100).
    """
    import pandas as pd
    from joblib import load

    classifier = load(model_output)

    data = pd.read_csv(result_csv, header=None)
    features, labels = data.iloc[:, 2:], data.iloc[:, 1]

    # One vectorized predict instead of a per-row loop; predictions are
    # compared element-wise against the true labels.
    predictions = classifier.predict(features.to_numpy())
    correct = int((predictions == labels.to_numpy()).sum())

    result_acc = correct / len(data)
    print(result_acc)

    metrics.log_metric("accuracy", (result_acc * 100.0))

# 모델 등록 및 배포 컴포넌트

In [196]:
@component(base_image="silverstar456/mediapipe:aiplatform")
def model_upload(accuracy: Input[Metrics],
                 project: str,
                 region: str,
                 display_name: str,
                 artifact_uri: str,
                 serving_container_image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
                 ):
    """Register the trained model in Vertex AI and deploy it.

    The model is a joblib-serialized scikit-learn classifier, so it must be
    served by a scikit-learn prediction container. The TensorFlow serving
    image used previously cannot load a joblib model. The default image
    matches the scikit-learn 0.24 image the model is trained in.

    Args:
        accuracy: Evaluation metrics artifact. It is only consumed so that this
            step runs after evaluation; its value is not inspected.
        project: GCP project ID.
        region: Vertex AI region.
        display_name: Display name for the registered model.
        artifact_uri: GCS directory containing the model file (model.joblib).
        serving_container_image_uri: Prediction container used to serve the model.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    model = aiplatform.Model.upload(
        display_name=display_name,
        artifact_uri=artifact_uri,
        serving_container_image_uri=serving_container_image_uri)

    # No endpoint is passed, so the SDK deploys to a newly created endpoint on every run.
    model.deploy(machine_type="n1-standard-4",
                 min_replica_count=1,
                 max_replica_count=1)

In [191]:
@component(base_image="silverstar456/mediapipe:aiplatform")
def model_deploy(project: str,
                 region: str,
                 display_name: str = "mediapipe",
                 model_resource_name: str = "",):
    """Deploy an already-registered Vertex AI model to a new endpoint.

    ``aiplatform.Model`` must be given a model ID or a full resource name
    (projects/.../locations/.../models/...); a GCS URI such as
    "gs://mediapipe-pipeline" cannot identify a model.

    Args:
        project: GCP project ID.
        region: Vertex AI region.
        display_name: Used when ``model_resource_name`` is empty. The most
            recently created model with this display name is deployed.
        model_resource_name: Explicit model ID / resource name to deploy.

    Raises:
        ValueError: if no model matches ``display_name``.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    if model_resource_name:
        model = aiplatform.Model(model_resource_name)
    else:
        candidates = aiplatform.Model.list(
            filter=f'display_name="{display_name}"',
            order_by="create_time desc")
        if not candidates:
            raise ValueError(
                f'No Vertex AI model named "{display_name}" in {project}/{region}')
        model = candidates[0]

    model.deploy(machine_type="n1-standard-4",
                 min_replica_count=1,
                 max_replica_count=1)

# 파이프라인 구성 후 실행

In [198]:
@dsl.pipeline(
    name="mediapipe-pipeline8",
    description="mediapipe",
    pipeline_root=PIPELINE_ROOT,
)
def mediapipe():
    """Wire landmark extraction -> KNN training -> evaluation -> Vertex AI upload."""
    # Extract pose landmarks from the image archive into a CSV artifact.
    landmarks_task = get_landmarks()
    landmark_csv = landmarks_task.output

    # Train the classifier and push model.joblib to the bucket root.
    train_task = knn_model(
        result_csv=landmark_csv,
        model_name=MODEL_NAME,
        model_path=f"{BUCKET_NAME}/{MODEL_NAME}",
    )

    # Evaluate the trained model on the landmark CSV.
    eval_task = model_test(result_csv=landmark_csv, model_output=train_task.output)

    # Register + deploy. Consuming the metrics artifact orders this step after evaluation.
    model_upload(
        accuracy=eval_task.output,
        project=PROJECT_ID,
        region=REGION,
        display_name="mediapipe",
        artifact_uri=BUCKET_NAME,
    )
    # The standalone model_deploy component is intentionally not wired in here.
    

# Compile the `mediapipe` pipeline function into a job-spec JSON file.
compiler.Compiler().compile(
    pipeline_func = mediapipe, 
    package_path = PIPELINE_SPEC_NAME
)

# Client used to submit compiled pipeline specs to Vertex AI Pipelines.
# NOTE(review): newer SDKs steer users to google.cloud.aiplatform.PipelineJob
# instead of AIPlatformClient; consider migrating.
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

# Submit the compiled spec as a pipeline run; `response` holds the API reply.
response = api_client.create_run_from_job_spec(
    job_spec_path=PIPELINE_SPEC_NAME
)



