### MLOps using Kubeflow and Feature Store



#### Before you start
- The Google Cloud Storage bucket, the pipelines and the Feature Store must all be created in the same region (suggested: europe-west3, Frankfurt; note that the configuration below uses europe-west4).
- Pipelines fail if a previous job is still "in progress"; code needs to be added that waits until the previous import has finished.
- There were issues with `tfx` 2.7, so it had to be downgraded to 2.5.
- Align on a single `aiplatform` SDK.
- Find a good MLOps use case that justifies using Feature Store serving, given its costs.

In [None]:
%%bash
# Create a new virtual environment.
# -y: a %%bash cell has no interactive stdin, so conda's "Proceed ([y]/n)?"
# prompt could not be answered.
conda create -y -n myenv python=3.7

# `conda activate` only works once the conda shell hook is loaded; a %%bash
# cell is a non-interactive shell that does not source it automatically.
eval "$(conda shell.bash hook)"
conda activate myenv

# Make the environment available as a Jupyter kernel.
# Install ipykernel into the environment itself: `pip install --user` would
# put it in ~/.local, which is shared by every environment.
python -m pip install ipykernel
python -m ipykernel install --user --name=myenv
# NOTE: the activation above lasts only for this cell; select the "myenv"
# kernel in Jupyter to actually use the environment.

In [None]:
import os

# Google Cloud Notebook images are detected by the presence of this metadata
# file; that product has its own installation requirements.
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# On a Google Cloud Notebook pip installs should use --user; elsewhere no flag.
USER_FLAG = "--user" if IS_GOOGLE_CLOUD_NOTEBOOK else ""

In [None]:
!pip install -r requirements.txt

In [None]:
import os
import sys

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import compiler, dsl
from kfp.v2.google.client import AIPlatformClient

### Imports for environment

In [None]:
import os
import sys

from google_cloud_pipeline_components import aiplatform as gcc_aip

from kfp.v2 import compiler, dsl
from kfp.v2.google.client import AIPlatformClient
from google.cloud.aiplatform_v1 import (
    FeaturestoreOnlineServingServiceClient,
    FeaturestoreServiceClient,
)
from google.cloud.aiplatform_v1.types import FeatureSelector, IdMatcher
from google.cloud.aiplatform_v1.types import entity_type as entity_type_pb2
from google.cloud.aiplatform_v1.types import feature as feature_pb2
from google.cloud.aiplatform_v1.types import featurestore as featurestore_pb2
from google.cloud.aiplatform_v1.types import (
    featurestore_online_service as featurestore_online_service_pb2,
)
from google.cloud.aiplatform_v1.types import (
    featurestore_service as featurestore_service_pb2,
)
from google.cloud.aiplatform_v1.types import io as io_pb2
from google.protobuf.duration_pb2 import Duration
from google.cloud import aiplatform

In [None]:
from src.trainer import trainer_component
from src.generator import generator_component
from src.ingester import ingester_component
from src import load_component
import importlib
from src import feature_store_helper

### Preparing environment

In [None]:
import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

### Bucket, BigQuery and Feature Store Environment

In [None]:
# Cloud Storage bucket for raw data, pipeline roots and training artifacts.
# NOTE: PROJECT_ID and "movielens" are joined without a separator, giving
# gs://<project-id>movielens.
BUCKET_NAME = "gs://" + PROJECT_ID + "movielens"
# Bucket, pipelines and Feature Store should all live in this one region.
REGION = "europe-west4"

# Derived from REGION so that changing the region also changes the endpoint.
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"
INPUT_CSV_FILE = ""
FEATURESTORE_ID = "movie_prediction"
FEATURE_STORE_REGION = REGION
ENTITY_TYPE_ID = "movie_entity"
ENTITY_ID_FIELD = "user_id"

# BigQuery parameters
# NOTE(review): "EU" is a multi-region location; keep it compatible with
# REGION if Feature Store ingests from these tables — confirm.
BIGQUERY_DATASET_ID = f"{PROJECT_ID}.movielens_dataset"
BIGQUERY_LOCATION = "EU"
BIGQUERY_TABLE_ID = f"{BIGQUERY_DATASET_ID}.training_dataset"
BIGQUERY_RAW_TABLE_ID = f"{BIGQUERY_DATASET_ID}.raw_dataset"
BIGQUERY_INPUT_URI = f"bq://{BIGQUERY_RAW_TABLE_ID}"


# https://www.kaggle.com/prajitdatta/movielens-100k-dataset
# Dataset parameters
RAW_DATA_PATH = f"{BUCKET_NAME}/raw_data/u.data"

# u.data -- The full u data set, 100000 ratings by 943 users on 1682 items.
# Each user has rated at least 20 movies. Users and items are numbered
# consecutively from 1. The data is randomly ordered.
# This is a tab separated list of user id | item id | rating | timestamp.
# The time stamps are unix seconds since 1/1/1970 UTC

# Pipeline parameters: pipeline identity, caching, and where the compiled
# spec and run artifacts are stored.
PIPELINE_NAME = "movie-prediction"
ENABLE_CACHING = False
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline"
PIPELINE_SPEC_PATH = "metadata_pipeline.json"
OUTPUT_COMPONENT_SPEC = "output-component.yaml"

# BigQuery scratch file name and row cap; how they are consumed is defined
# elsewhere.
BIGQUERY_TMP_FILE = "tmp.json"
BIGQUERY_MAX_ROWS = 5

# Glob matching every object under the trainer input prefix in the bucket.
TFRECORD_FILE = f"{BUCKET_NAME}/trainer_input_path/*"

# Resource names for the logging Pub/Sub topic and Cloud Function.
LOGGER_PUBSUB_TOPIC = "logger-pubsub-topic"
LOGGER_CLOUD_FUNCTION = "logger-cloud-function"

# Trainer parameters: one n1-standard-4 replica with no accelerator attached.
TRAINING_ARTIFACTS_DIR = f"{BUCKET_NAME}/artifacts"
TRAINING_REPLICA_COUNT = 1
TRAINING_MACHINE_TYPE = "n1-standard-4"
TRAINING_ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
TRAINING_ACCELERATOR_COUNT = 0

# Deployer parameters: display names and serving resources for the endpoint.
# NOTE(review): "trained-policy" looks inherited from another sample; the
# constant name is kept as-is since other code may reference it.
TRAINED_POLICY_DISPLAY_NAME = "movielens-trained-policy"
ENDPOINT_DISPLAY_NAME = "movielens-endpoint"
ENDPOINT_MACHINE_TYPE = "n1-standard-4"
ENDPOINT_REPLICA_COUNT = 1
# No accelerator: type left unspecified and count set to zero.
ENDPOINT_ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
ENDPOINT_ACCELERATOR_COUNT = 0

# Prediction container parameters: image name and local source directory.
PREDICTION_CONTAINER = "prediction-container"
PREDICTION_CONTAINER_DIR = "src/prediction_container"