# Vertex AI and ML Pipelines 

---
## Setup

inputs:

In [1]:
project = !gcloud config get-value project
PROJECT_ID = project[0]
PROJECT_ID

In [2]:
REGION = 'us-east1'
DATANAME = 'api-news-data'
NOTEBOOK = 'supply_chain_pipeline_notebook'

# Resources
DEPLOY_COMPUTE = 'n1-standard-2'

packages:

In [3]:
#!pip install -U google-cloud-pipeline-components -U -q

In [3]:
from google.cloud import aiplatform
from datetime import datetime
import kfp
from kfp.v2 import compiler
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp

from google.cloud import bigquery
from google.protobuf import json_format
import json
import numpy as np

clients:

In [5]:
aiplatform.init(project=PROJECT_ID, location=REGION)

parameters:

In [6]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET = "restaurant-supplier-bucket"
URI = f"gs://{BUCKET}/{DATANAME}/models/{NOTEBOOK}"
DIR = f"temp/{NOTEBOOK}"

In [2]:
SERVICE_ACCOUNT = !gcloud config list --format='value(core.account)' 
SERVICE_ACCOUNT = SERVICE_ACCOUNT[0]
SERVICE_ACCOUNT

List the service accounts current roles:

In [8]:
!gcloud projects get-iam-policy $PROJECT_ID --filter="bindings.members:$SERVICE_ACCOUNT" --format='table(bindings.role)' --flatten="bindings[].members"

ROLE
roles/editor
roles/run.admin
roles/storage.objectAdmin


environment:

In [9]:
!rm -rf {DIR}
!mkdir -p {DIR}

---
## Pipeline (KFP) Creation

In [10]:
# !pip install kfp google-cloud-pipeline-components google-cloud-storage openai pydantic
# !pip install langchain langchain-openai

import kfp
from kfp.v2 import compiler
from kfp.v2.dsl import component, pipeline, InputPath, OutputPath
from google.cloud import storage
from typing import NamedTuple, List
import requests
import json
import csv
from datetime import datetime
from pydantic import BaseModel, Field
from google.cloud import storage


In [23]:
# Import component and OutputPath
from kfp.v2.dsl import component, OutputPath

@component(
    base_image="python:3.8",
    packages_to_install=[
        "requests",
        "google-cloud-storage",
        "kfp==2.7.0",
        "click<9,>=8.0.0",
        "docstring-parser<1,>=0.7.3",
        "kfp-pipeline-spec==0.3.0",
        "kfp-server-api<2.1.0,>=2.0.0",
        "kubernetes<27,>=8.0.0",
        "PyYAML<7,>=5.3",
        "requests-toolbelt<1,>=0.8.0",
        "tabulate<1,>=0.8.6",
        "urllib3<2.0.0", 
    ]
)
def fetch_and_store_articles(
    news_api_key: str,
    project_id: str,
    bucket_name: str,
    restaurants_csv_path: str,
    output_file_path: OutputPath("json")
):
    # Import libraries 
    import csv
    import json
    import requests
    from google.cloud import storage
    from io import StringIO
    storage_client = storage.Client(project=project_id)
    bucket = storage_client.bucket(bucket_name)

    def read_restaurant_names_gcs(bucket_name, restaurants_csv_path):
        blob = bucket.blob(restaurants_csv_path)
        data = blob.download_as_text()
        f = StringIO(data)
        reader = csv.reader(f)
        # next(reader, None)  # Skip the header
        return [row[0] for row in reader]


    restaurant_names = read_restaurant_names_gcs(bucket_name, restaurants_csv_path)
    all_articles_data = []

    for name in restaurant_names:
        url = "http://eventregistry.org/api/v1/article/getArticles"
        payload = {
            "keyword": [name, "suppliers", "restaurant"],
            "keywordOper": "and",
            "articlesPage": 1,
            "articlesCount": 100,
            "articlesSortBy": "date",
            "articlesSortByAsc": False,
            "resultType": "articles",
            "dataType": ["news"],
            "apiKey": news_api_key,
            "includeArticleTitle": True,
            "includeArticleBody": True
        }
        response = requests.post(url, json=payload)
        if response.status_code == 200:
            articles = response.json().get('articles', {}).get('results', [])
            for article in articles:
                article_data = {
                    "url": article["url"],
                    "text": article["body"],
                    "relationships": []
                }
                all_articles_data.append(article_data)

    with open(output_file_path, 'w') as f:
        json.dump(all_articles_data, f, indent=2)


In [24]:
# This could improve with parallel processing 

# Import for kfp
from kfp.v2.dsl import component, InputPath, OutputPath

@component(
    packages_to_install=["google-cloud-storage", "openai", "pydantic", "langchain", "langchain-openai"],
    base_image="python:3.8"
)
def process_articles_component(
    input_json_path: InputPath('json'),
    output_json_path: OutputPath('json'),
    openai_api_key: str,
    bucket_name: str,
    project_id: str,
    output_folder: str
):
    # Import libraries 
    from typing import List
    import json
    import logging
    from google.cloud import storage
    from pydantic import BaseModel, Field
    from langchain.prompts import PromptTemplate
    from langchain.output_parsers import PydanticOutputParser
    from langchain_openai import ChatOpenAI
    
    class Relationship(BaseModel):
        supplier: str = Field(description="Supplier Name")
        buyer: str = Field(description="Buyer Name")
        product: str = Field(description="Products involved in the transaction")
        location: str = Field(description="Supplier Location")

    class RelationshipsData(BaseModel):
        url: str = Field(description="The URL of the article")
        text: str = Field(description="The text of the article")
        relationships: List[Relationship] = Field(description="List of buyer-supplier relationships")
    
    logging.basicConfig(level=logging.INFO)

    # Initialize OpenAI and LangChain components securely
    try:
        model = ChatOpenAI(api_key=openai_api_key, model="gpt-4", temperature=0)
    except Exception as e:
        logging.error(f"Error initializing ChatOpenAI: {e}")
        raise

    prompt_template = """
    Please help identify all buyer-supplier relationships present in the provided text. 
    The primary objective of this project is to pinpoint entities within the restaurant industry for NER purposes, and to delineate supply chains. 
    Extract instances where one organization provides goods or services to another, requiring the naming of both parties involved.
    Important Instructions 
    1. Do not add the url and article text to the JSON output, but rather leave as "input URL here" and "input text here" respectively. 
    2. Do not add a buyer-supplier relationship unless specific names of buyer and supplier organisations can be found. 
       This means examples like "Customers", "Popular Restuarant" do not count and should be excluded. 
    

    << FORMATTING >>
    {format_instructions}

    << INPUT >>
    {url_text}

    << OUTPUT>>
    Organize your findings into a JSON object following this structure:

    JSON
    {{
        "url": "input URL here",
        "text": "input text here",
        "relationships": [
            {{
                "supplier": "Supplier Name",
                "buyer": "Buyer Name",
                "product": "Products involved",
                "location": "Supplier Location"
            }}
            // Additional entries if multiple relationships are found
        ]
    }}
    """

    parser = PydanticOutputParser(pydantic_object=RelationshipsData)
    
    prompt = PromptTemplate(
        template=prompt_template,
        input_variables=["url_text"],
        partial_variables={"format_instructions": parser.get_format_instructions()}
    )

    chain = prompt | model | parser
        
    try:
        with open(input_json_path, 'r') as f:
            articles = json.load(f)
    except Exception as e:
        logging.error(f"Error reading input JSON from GCS: {e}")
        raise

    processed_texts = []
    
    for article in articles:
        try:
            processed_text = chain.invoke({"url_text": article["text"]})

            relationships_dicts = [relationship.dict() for relationship in processed_text.relationships]

            article_data = {
                "url": article["url"],
                "text": article["text"],
                "relationships": relationships_dicts  
            }
            processed_texts.append(article_data)

        except Exception as e:
            logging.warning(f"Error processing article {article['url']}: {e}")
            continue
    
            
    try:
        processed_texts_json = json.dumps(processed_texts, indent=2) 
        with open(output_json_path, 'w') as f:
            f.write(processed_texts_json)
        logging.info("Successfully processed articles and saved output.")
    except Exception as e:
        logging.error(f"Error saving processed texts: {e}")
        raise


In [25]:
import kfp.dsl as dsl
from kfp.v2.dsl import InputPath, OutputPath, component

@dsl.pipeline(
    name=f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}',
    pipeline_root=f'{URI}/{TIMESTAMP}/kfp/'
)
def supply_chain_pipeline(
    news_api_key: str,
    openai_api_key: str,
    project_id: str,
    bucket_name: str,
    restaurants_csv_path: str,
    output_folder: str,
):
    # Component invocation 
    fetch_articles_task = fetch_and_store_articles(
        news_api_key=news_api_key,
        project_id=project_id,
        bucket_name=bucket_name,
        restaurants_csv_path=restaurants_csv_path,
    )

    # Use the output from the previous component as input for the next one
    process_articles_task = process_articles_component(
        input_json_path=fetch_articles_task.outputs['output_file_path'],  
        openai_api_key=openai_api_key,
        bucket_name=bucket_name,
        project_id=project_id,
        output_folder=output_folder,
    )


---
## Compile Pipeline

In [26]:
import kfp.compiler as compiler

compiler.Compiler().compile(
    pipeline_func=supply_chain_pipeline,  
    package_path=f"{DIR}/{NOTEBOOK}.yaml"  # Or '.json' 
)

# Move compiled pipeline files to GCS Bucket
!gsutil cp {DIR}/{NOTEBOOK}.yaml {URI}/{TIMESTAMP}/kfp/


Copying file://temp/supply_chain_pipeline_notebook/supply_chain_pipeline_notebook.yaml [Content-Type=application/octet-stream]...
/ [1 files][ 13.2 KiB/ 13.2 KiB]                                                
Operation completed over 1 objects/13.2 KiB.                                     


---
## Create Vertex AI Pipeline Job

In [27]:
from google.cloud import aiplatform

# Adjust these variables based on your specific environment and pipeline
PROJECT_ID = PROJECT_ID
REGION = 'us-east1'
PIPELINE_NAME = f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}'
TEMPLATE_PATH = f"{URI}/{TIMESTAMP}/kfp/{NOTEBOOK}.yaml"  

# Initialize the Vertex AI client
aiplatform.init(project=PROJECT_ID, location=REGION)

# Create and run the pipeline job
pipeline_job = aiplatform.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=TEMPLATE_PATH,
    parameter_values={
        'news_api_key': 'c3960efd-57ae-42e5-8bb1-adf076f9744c',
        'openai_api_key': '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',
        'project_id': PROJECT_ID,
        'bucket_name': BUCKET,
        'restaurants_csv_path': 'restaurant_names/restaurants_names_entry.csv',
        'output_folder': f'{BUCKET}/processed_output',
    },
    enable_caching=False
)


In [28]:
# Run the pipeline job
response = pipeline_job.run(
    service_account = SERVICE_ACCOUNT
)

Creating PipelineJob
PipelineJob created. Resource name: projects/557605249790/locations/us-east1/pipelineJobs/kfp-supply-chain-pipeline-notebook-api-news-data-20240406203101-20240406211955
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/557605249790/locations/us-east1/pipelineJobs/kfp-supply-chain-pipeline-notebook-api-news-data-20240406203101-20240406211955')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-east1/pipelines/runs/kfp-supply-chain-pipeline-notebook-api-news-data-20240406203101-20240406211955?project=557605249790
PipelineJob projects/557605249790/locations/us-east1/pipelineJobs/kfp-supply-chain-pipeline-notebook-api-news-data-20240406203101-20240406211955 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/557605249790/locations/us-east1/pipelineJobs/kfp-supply-chain-pipeline-notebook-api-news-data-20240406203101-20240406211955 current state:
PipelineState.PIPELINE_STATE_RUNNIN

In [106]:
pipeline_df = aiplatform.get_pipeline_df(pipeline = "kfp-supply-chain-pipeline-notebook-api-news-data-20240406164425")
pipeline_df.head(1)

Unnamed: 0,pipeline_name,run_name,param.input:bucket_name,param.input:news_api_key,param.input:project_id,param.input:openai_api_key,param.vmlmd_lineage_integration,param.input:output_folder,param.input:restaurants_csv_path
0,kfp-supply-chain-pipeline-notebook-api-news-da...,kfp-supply-chain-pipeline-notebook-api-news-da...,restaurant-supplier-bucket,c3960efd-57ae-42e5-8bb1-adf076f9744c,restaurant-supplier-networks,sk-flQQ9k4j6Go70wd6DfiUT3BlbkFJYZk9gUeV27KFPNF...,{'pipeline_run_component': {'location_id': 'us...,restaurant-supplier-bucket/processed_output,restaurant_names/restaurants_names_entry.csv
