# Lecture Plan

1. Vertex AI: Pipelines

### Vertex AI: Pipelines


- Vertex AI Pipelines is a service for running ML pipelines in a serverless way.
- Vertex AI Pipelines supports Kubeflow Pipelines and TFX (TensorFlow Extended) pipelines.
- Running Kubeflow pipelines on Vertex AI removes the effort of maintaining and managing a Kubernetes cluster.

### Kubeflow pipelines

- Kubeflow is an ML toolkit for deploying ML workflows on Kubernetes.
- A pipeline is an ML workflow: its components are combined to form a graph (a directed acyclic graph).
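The idea of a pipeline as a graph can be sketched in plain Python. This is a toy model, not the Kubeflow API: it uses the standard-library `graphlib` module (Python 3.9+), and the `pipeline_graph` dependency map is a hypothetical stand-in for the edges KFP derives when one component's output is passed to another. The step names mirror the components defined later in these notes.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each step lists the steps whose outputs it consumes.
pipeline_graph = {
    "create_dataset": set(),
    "train_text_model": {"create_dataset"},
    "deploy_model": {"train_text_model"},
}


def execution_order(graph: dict) -> list:
    """Return an order in which every step runs after all of its dependencies."""
    # static_order() raises graphlib.CycleError if the graph contains a cycle.
    return list(TopologicalSorter(graph).static_order())


print(execution_order(pipeline_graph))
# ['create_dataset', 'train_text_model', 'deploy_model']
```

Because the structure must be acyclic, a cycle (for example `deploy_model` feeding back into `create_dataset`) is rejected instead of being scheduled.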


#### Components

- A pipeline component is self-contained code packaged as a Docker image.
- A component is a single step in the pipeline; each component performs one specific task.
- There are two types of components: function-based (Python function) components and container-based components.
- Parameters are used to pass small values between components (component inputs and outputs).
- Artifacts serve the same purpose but are used for large or complex data such as datasets and models.

Examples of components: data preprocessing, model training and model evaluation.
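The difference between parameters and artifacts can be illustrated without KFP. In this toy sketch (the `preprocess` and `train` functions are hypothetical, not KFP components), small values such as `min_length` and `epochs` are passed directly as parameters, while the dataset is written to storage and only its location is handed to the next step, which is roughly how artifacts travel between components.

```python
import json
import tempfile
from pathlib import Path


def preprocess(min_length: int, workdir: Path) -> str:
    """Hypothetical step: filter rows, write them as a dataset 'artifact', return its path."""
    rows = ["hello world", "hi", "vertex ai pipelines"]
    kept = [row for row in rows if len(row) >= min_length]  # min_length is a parameter
    artifact_path = workdir / "dataset.json"
    artifact_path.write_text(json.dumps(kept))
    return str(artifact_path)  # downstream steps get a reference, not the data itself


def train(dataset_path: str, epochs: int) -> dict:
    """Hypothetical step: read the artifact from its path; epochs is a parameter."""
    rows = json.loads(Path(dataset_path).read_text())
    return {"num_examples": len(rows), "epochs": epochs}


with tempfile.TemporaryDirectory() as tmp:
    dataset_path = preprocess(min_length=5, workdir=Path(tmp))
    result = train(dataset_path, epochs=3)

print(result)  # {'num_examples': 2, 'epochs': 3}
```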


#### Pipeline

- A pipeline defines the inputs required to run it, as well as the inputs and outputs of each component.

#### Compiler

- The compiler takes the pipeline and creates a pipeline specification in JSON or YAML format.
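What a compiler does can be sketched as serialising the graph and its inputs into a document. The function below is a hypothetical, heavily simplified stand-in: the real KFP pipeline specification has a much richer schema, and the key names used here (`pipelineInfo`, `inputs`, `tasks`, `dependentTasks`) are only loosely modelled on it.

```python
import json


def compile_pipeline(name: str, graph: dict, parameters: dict) -> str:
    """Hypothetical mini-compiler: turn a pipeline graph into a JSON specification."""
    spec = {
        "pipelineInfo": {"name": name},
        "inputs": parameters,  # pipeline-level inputs and their default values
        "tasks": {
            task: {"dependentTasks": sorted(deps)} for task, deps in graph.items()
        },
    }
    return json.dumps(spec, indent=2, sort_keys=True)


spec_json = compile_pipeline(
    "ip-model-pipeline",
    {
        "create_dataset": set(),
        "train_text_model": {"create_dataset"},
        "deploy_model": {"train_text_model"},
    },
    {"src_uris": "gs://project-2-trng-1855/training-data.csv"},
)
print(spec_json)
```

A text specification like this is what makes a pipeline portable: it can be stored, versioned and submitted without the Python code that defined it.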

#### Model Lineage

- Information about the artifacts and parameters of each pipeline run is stored in Vertex ML Metadata.
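The kind of question lineage answers ("which data produced this model?") can be shown with a toy in-memory store. This is not the Vertex ML Metadata API; the `Execution` record and the `upstream_artifacts` helper are hypothetical, and the artifact names follow the pipeline built later in these notes.

```python
from dataclasses import dataclass, field


@dataclass
class Execution:
    """Hypothetical record of one component run: its parameters, inputs and outputs."""
    name: str
    parameters: dict = field(default_factory=dict)
    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)


def upstream_artifacts(executions: list, artifact: str) -> list:
    """Walk backwards from an artifact to every artifact it was derived from."""
    producer = {out: ex for ex in executions for out in ex.outputs}
    lineage, pending = [], [artifact]
    while pending:
        ex = producer.get(pending.pop())
        if ex is None:  # an external input such as the raw CSV has no producer
            continue
        for source in ex.inputs:
            if source not in lineage:
                lineage.append(source)
                pending.append(source)
    return lineage


run = [
    Execution("create_dataset", inputs=["gs://project-2-trng-1855/training-data.csv"], outputs=["dataset"]),
    Execution("train_text_model", {"training_fraction_split": 0.5}, ["dataset"], ["model"]),
]
print(upstream_artifacts(run, "model"))
# ['dataset', 'gs://project-2-trng-1855/training-data.csv']
```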

In [147]:
from google.colab import auth
auth.authenticate_user()

In [None]:
# ! pip install kfp

In [None]:
# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

In [148]:
import kfp
from kfp import compiler, dsl
from kfp.dsl import Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component

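Components can be compiled to YAML files, shared, and loaded into other pipelines. In KFP v2 the `output_component_file` argument is deprecated; `compiler.Compiler().compile()` can compile a single component instead.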

```python
@component(output_component_file="file_name")
```

```python
my_component = kfp.components.load_component_from_file('file_name')
```



In [None]:
@dsl.component(
    output_component_file="data_preprocessing.yaml",
    packages_to_install = ["pandas", "nltk"],
    base_image="python:3.9",
    )
def data_preprocessing():
  # Placeholder: the preprocessing logic (e.g. text cleaning with pandas/nltk) goes here
  pass





In [149]:
# Define the bucket
BUCKET_NAME = "project-2-trng-1855"  # @param {type:"string"}
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipelines"

In [151]:
@dsl.component(
    output_component_file="dataset_component_1.yaml",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-storage"],
    base_image="python:3.9",
    )
def create_dataset(src_uris: str) -> str:
  """Create a Vertex AI text dataset from a CSV in GCS; return its resource name."""
  from google.cloud import aiplatform

  aiplatform.init(project='theta-cell-406519', location='us-central1')
  display_name = "trng-1855-dataset-ip"
  text_dataset = aiplatform.TextDataset.create(
      display_name=display_name,
      gcs_source=src_uris,
      import_schema_uri=aiplatform.schema.dataset.ioformat.text.single_label_classification,
      sync=True,
  )
  # A returned str becomes the component's single output, read in the pipeline via `.output`
  return text_dataset.resource_name




In [152]:
@dsl.component(
    output_component_file="text_model_training_component_1.yaml",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-storage"],
    base_image="python:3.9",
    )
def train_text_model(dataset: str) -> str:
  """Train an AutoML single-label text classifier; return the model resource name."""
  from google.cloud import aiplatform

  aiplatform.init(project='theta-cell-406519', location='us-central1')
  training_job_display_name = "training-job-unique"
  text_dataset = aiplatform.TextDataset(dataset_name=dataset)
  job = aiplatform.AutoMLTextTrainingJob(
      display_name=training_job_display_name,
      prediction_type="classification",
      multi_label=False,
      )
  model_display_name = "ip-classification"
  # Run the training job and wait for it to finish (sync=True)
  trained_model = job.run(
      dataset=text_dataset,
      model_display_name=model_display_name,
      training_fraction_split=0.5,
      validation_fraction_split=0.1,
      test_fraction_split=0.1,
      sync=True,
      )
  return trained_model.resource_name
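The three split fractions in this training job add up to 0.7, not 1.0. As we understand the Vertex AI `FractionSplit` API reference, fractions that sum to less than 1 are accepted and the remainder is assigned by Vertex AI, so this split leaves about 30% of the data for the service to allocate. The hypothetical helper `describe_split` below makes that check explicit before a job is launched; it is a plain-Python sketch and does not call the SDK.

```python
def describe_split(training: float, validation: float, test: float) -> dict:
    """Validate fraction splits and report the share left for Vertex AI to assign."""
    fractions = {"training": training, "validation": validation, "test": test}
    if any(f < 0 for f in fractions.values()):
        raise ValueError("fractions must be non-negative")
    total = sum(fractions.values())
    if total > 1 + 1e-9:  # small tolerance for floating-point rounding
        raise ValueError(f"fractions sum to {total:.2f}, which exceeds 1")
    return {**fractions, "unassigned": round(1 - total, 6)}


print(describe_split(0.5, 0.1, 0.1))
# {'training': 0.5, 'validation': 0.1, 'test': 0.1, 'unassigned': 0.3}
```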




In [153]:
@dsl.component(
    output_component_file="model_deployment_component_1.yaml",
    packages_to_install = ["google-cloud-aiplatform", "google-cloud-storage"],
    base_image="python:3.9",
    )
def deploy_model(model_name: str):
  """Deploy the trained model to a new Vertex AI endpoint."""
  from google.cloud import aiplatform

  aiplatform.init(project='theta-cell-406519', location='us-central1')
  # Look up the default version of the model in the Model Registry
  model = aiplatform.ModelRegistry(model_name).get_model()
  deployed_model_display_name = "trng-1855-ip-classification"
  model.deploy(deployed_model_display_name=deployed_model_display_name, sync=True)



In [154]:
@dsl.pipeline(
    name="ip-model-pipeline", pipeline_root=PIPELINE_ROOT
)
def pipeline(src_uris: str = "gs://project-2-trng-1855/training-data.csv"):
  # Passing each task's .output to the next task defines the edges of the graph
  dataset = create_dataset(src_uris=src_uris).output
  model = train_text_model(dataset=dataset).output
  deploy_model(model_name=model)



In [155]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="ip-text-classification-pipeline_1.json"
)
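Compiling only produces the specification file; it still has to be submitted to run on Vertex AI Pipelines. A minimal sketch, assuming the `google-cloud-aiplatform` SDK's `aiplatform.PipelineJob` class and the values used above (the bucket, the compiled `ip-text-classification-pipeline_1.json`); the helper names `build_pipeline_job_args` and `submit_pipeline` are hypothetical. In recent SDK versions, passing an experiment name to `submit()` associates the pipeline run with a Vertex AI Experiment.

```python
def build_pipeline_job_args(template_path: str, pipeline_root: str, src_uris: str) -> dict:
    """Collect keyword arguments for aiplatform.PipelineJob (pure, testable offline)."""
    return {
        "display_name": "ip-model-pipeline",
        "template_path": template_path,  # the compiled JSON/YAML specification
        "pipeline_root": pipeline_root,  # GCS folder for the run's artifacts
        "parameter_values": {"src_uris": src_uris},  # overrides the pipeline default
        "enable_caching": False,
    }


def submit_pipeline(job_args: dict, project: str, location: str = "us-central1", experiment=None):
    """Submit the compiled pipeline; needs the SDK installed and GCP credentials."""
    # Imported lazily so build_pipeline_job_args works without the SDK installed.
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=location)
    job = aiplatform.PipelineJob(**job_args)
    job.submit(experiment=experiment)  # returns immediately; job.run() would block
    return job


job_args = build_pipeline_job_args(
    template_path="ip-text-classification-pipeline_1.json",
    pipeline_root="gs://project-2-trng-1855/pipelines",
    src_uris="gs://project-2-trng-1855/training-data.csv",
)
# submit_pipeline(job_args, project="theta-cell-406519",
#                 experiment="trng-1855-text-classification")
```

Keeping the argument building separate from the network call makes it easy to check the configuration before anything is launched.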

#### Vertex AI Experiments


Vertex AI Experiments is a tool for tracking and analyzing different models and architectures. It is used to track:

- model metrics and parameters, so that models can be compared
- model lineage
- pipeline runs




In [156]:
from google.cloud import aiplatform

EXPERIMENT_NAME = "trng-1855-text-classification"
# Create the experiment (or reuse it if it already exists) and start a new run in it
aiplatform.init(experiment=EXPERIMENT_NAME)
aiplatform.start_run("run-2")




INFO:google.cloud.aiplatform.metadata.experiment_resources:Associating projects/719559140092/locations/us-central1/metadataStores/default/contexts/trng-1855-text-classification-run-2 to Experiment: trng-1855-text-classification


<google.cloud.aiplatform.metadata.experiment_run_resource.ExperimentRun at 0x7c0e64662e30>

In [157]:
# Load the trained model from the Model Registry and list its evaluations
model = aiplatform.ModelRegistry('4831586144959332352')

model = model.get_model()

model_evaluations = model.list_model_evaluations()

for model_evaluation in model_evaluations:
    print(model_evaluation.to_dict())


{'name': 'projects/719559140092/locations/us-central1/models/4831586144959332352@1/evaluations/8584133173514862592', 'metricsSchemaUri': 'gs://google-cloud-aiplatform/schema/modelevaluation/classification_metrics_1.0.0.yaml', 'metrics': {'auPrc': 0.99404764, 'confusionMatrix': {'rows': [[0.0, 1.0, 0.0], [0.0, 6.0, 0.0], [0.0, 0.0, 5.0]], 'annotationSpecs': [{'displayName': 'General', 'id': '3127734822923927552'}, {'displayName': 'Service', 'id': '5433577832137621504'}, {'displayName': 'Sales', 'id': '8315881593654738944'}]}, 'logLoss': 0.12222175, 'confidenceMetrics': [{'f1ScoreAt1': 0.9166667, 'precision': 0.33333334, 'recallAt1': 0.9166667, 'recall': 1.0, 'confidenceThreshold': 0.0, 'precisionAt1': 0.9166667, 'maxPredictions': 0.0, 'f1Score': 0.5}, {'f1ScoreAt1': 0.9166667, 'precision': 0.6, 'recallAt1': 0.9166667, 'recall': 1.0, 'confidenceThreshold': 0.05, 'precisionAt1': 0.9166667, 'maxPredictions': 0.0, 'f1Score': 0.75}, {'f1ScoreAt1': 0.9166667, 'precision': 0.7058824, 'recallAt
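The fields in this output are related by standard formulas, which allows a quick sanity check. F1 is the harmonic mean of precision and recall, F1 = 2 * P * R / (P + R), and top-1 accuracy is the diagonal of the confusion matrix divided by its total. The sketch below recomputes both from values copied out of the printed evaluation; the helper names are our own.

```python
def f1_score(precision: float, recall: float) -> float:
    """Harmonic mean of precision and recall (0.0 when both are zero)."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)


def top1_accuracy(confusion_rows: list) -> float:
    """Correct predictions (the diagonal) divided by all evaluated items."""
    correct = sum(confusion_rows[i][i] for i in range(len(confusion_rows)))
    total = sum(sum(row) for row in confusion_rows)
    return correct / total


# Values copied from the evaluation printed above.
eval_confusion_rows = [[0.0, 1.0, 0.0], [0.0, 6.0, 0.0], [0.0, 0.0, 5.0]]
print(round(f1_score(0.33333334, 1.0), 4))  # 0.5   (f1Score at threshold 0.0)
print(round(f1_score(0.6, 1.0), 4))         # 0.75  (f1Score at threshold 0.05)
print(round(top1_accuracy(eval_confusion_rows), 7))  # 0.9166667 (11 of 12 correct)
```

The recomputed accuracy matches the `precisionAt1` and `recallAt1` values (0.9166667) in the output: one of the twelve test items was misclassified.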

In [158]:
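for model_evaluation in model_evaluations:
    metrics = model_evaluation.to_dict()['metrics']
    # Evaluation results are metrics, so log them with log_metrics (not log_params)
    aiplatform.log_metrics({'auPrc': metrics['auPrc'], 'logLoss': metrics['logLoss']})
    # Precision/recall/F1 at the first confidence threshold (0.0 in the output above)
    confidence_metrics = metrics['confidenceMetrics']
    aiplatform.log_metrics(confidence_metrics[0])

# Training settings (the data split used in train_text_model) are parameters
aiplatform.log_params({
    'training_fraction_split': 0.5,
    'validation_fraction_split': 0.1,
    'test_fraction_split': 0.1,
})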
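`aiplatform.log_metrics` expects a flat mapping from metric names to numbers, so nested fields of the evaluation (such as `confusionMatrix` or the `confidenceMetrics` list) cannot be passed as-is. The hypothetical helper `numeric_metrics` below keeps only the numeric top-level entries and can prefix their names, for example to log several confidence thresholds in the same run without their keys overwriting each other.

```python
def numeric_metrics(metrics: dict, prefix: str = "") -> dict:
    """Keep the top-level int/float entries of a metrics dict, optionally prefixing names."""
    return {
        f"{prefix}{name}": float(value)
        for name, value in metrics.items()
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    }


# A subset of the evaluation printed earlier; the nested confusion matrix is dropped.
evaluation_metrics = {
    "auPrc": 0.99404764,
    "logLoss": 0.12222175,
    "confusionMatrix": {"rows": [[0.0, 1.0, 0.0], [0.0, 6.0, 0.0], [0.0, 0.0, 5.0]]},
}
print(numeric_metrics(evaluation_metrics))
# {'auPrc': 0.99404764, 'logLoss': 0.12222175}

threshold_005 = {"confidenceThreshold": 0.05, "precision": 0.6, "recall": 1.0, "f1Score": 0.75}
print(numeric_metrics(threshold_005, prefix="t005_"))
# Inside an active run: aiplatform.log_metrics(numeric_metrics(threshold_005, prefix="t005_"))
```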

In [None]:
aiplatform.end_run()