In [74]:
#%pip install torchtext
#%pip install torch-model-archiver
#%pip install --user pandas_gbq

In [75]:
import json
import os
import time
from collections import OrderedDict

import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
from google.cloud import aiplatform, bigquery
from google.cloud.aiplatform.prediction import LocalModel
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from torchtext.vocab import vocab

In [76]:
from transformers import BertTokenizerFast, BertForTokenClassification
from torch.utils.data import DataLoader
from tqdm import tqdm
from torch.optim import SGD
import evaluate

import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"


## Env setup

In [77]:
REGION = "us-central1"

PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]
# PROJECT_ID = "qwiklabs-asl-00-748dac0c969e"

ARTIFACT_STORE = f"gs://{PROJECT_ID}-ner-kfp-artifact-store"

DATA_ROOT = f"{ARTIFACT_STORE}/data"
JOB_DIR_ROOT = f"{ARTIFACT_STORE}/jobs"
TRAINING_FILE_PATH = f"{DATA_ROOT}/training/dataset.csv"
VALIDATION_FILE_PATH = f"{DATA_ROOT}/validation/dataset.csv"
TESTING_FILE_PATH = f"{DATA_ROOT}/testing/dataset.csv"
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"

In [78]:
os.environ["JOB_DIR_ROOT"] = JOB_DIR_ROOT
os.environ["TRAINING_FILE_PATH"] = TRAINING_FILE_PATH
os.environ["VALIDATION_FILE_PATH"] = VALIDATION_FILE_PATH
os.environ["PROJECT_ID"] = PROJECT_ID
os.environ["REGION"] = REGION

In [79]:
!gsutil ls | grep ^{ARTIFACT_STORE}/$ || gsutil mb -l {REGION} {ARTIFACT_STORE}

gs://qwiklabs-asl-00-69fe165840f7-ner-kfp-artifact-store/


## Save NER granular data from source to BQ

In [61]:
%%bash

DATASET_LOCATION=US
DATASET_ID=ner_granular_dataset
TABLE_ID=ner_granular
DATA_SOURCE=gs://quicklabs-asl-ner-dataset/ner.csv
SCHEMA=text:STRING,\
labels:STRING

exists=$(bq ls -d | grep -w $DATASET_ID)
if [ -n "$exists" ]; then
   echo "$DATASET_ID already exists"
else
   echo "Creating $dataset"
   bq --location=$DATASET_LOCATION --project_id=$PROJECT_ID mk --dataset $DATASET_ID
fi

bq --project_id=$PROJECT_ID --dataset_id=$DATASET_ID load \
--source_format=CSV \
--skip_leading_rows=1 \
--replace \
$TABLE_ID \
$DATA_SOURCE \
$SCHEMA

W0613 22:55:06.874674 140063327733568 bigquery_client.py:731] There is no apilog flag so non-critical logging is disabled.


ner_granular_dataset already exists


W0613 22:55:08.048714 140614478464832 bigquery_client.py:731] There is no apilog flag so non-critical logging is disabled.
Waiting on bqjob_r71c11e291affb208_00000188b6f85511_1 ... (2s) Current status: DONE   


In [89]:
%%bigquery df
SELECT *
FROM `ner_granular_dataset.ner_granular`

Query is running:   0%|          |

Downloading:   0%|          |

In [12]:
%%bigquery
SELECT *
FROM `qwiklabs-asl-00-748dac0c969e.asl_team3.enron_mail_body_500`

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,text
0,---------------------- forwarded by phillip k ...
1,no
2,"greg, something that i forgot to ask you. do ..."
3,"lucy, here is a draft of a memo we should dist..."
4,"brad, with regard to tori kuykendall, i would..."
...,...
493,04/27/2001 01:01 pm a current notes user: rea...
494,how am i to send them the money for the silent...
495,i will email you an updated operating statemen...
496,"ray, is there any detail on the gas cost proxy..."


## training and validation split

In [90]:
df.head()

Unnamed: 0,text,labels
0,...,O
1,The,O
2,Questions ?,O O
3,It was .,O O O
4,Fifteen dollars .,O O O


In [91]:
df = df.iloc[np.random.permutation(len(df))][0:5000]

labels = [i.split() for i in df['labels'].values.tolist()]
unique_labels = set()

for lb in labels:
        [unique_labels.add(i) for i in lb if i not in unique_labels]
labels_to_ids = {k: v for v, k in enumerate(unique_labels)}
ids_to_labels = {v: k for v, k in enumerate(unique_labels)}

df_train, df_val, df_test = np.split(df.sample(frac=1, random_state=42),
                            [int(.8 * len(df)), int(.9 * len(df))])

In [92]:
len(unique_labels)

17

In [93]:
df.head()

Unnamed: 0,text,labels
28282,"It would be her third stint in rehab , followi...",O O O O O O O O O O O O O B-geo O B-geo O O B-...
47574,The International Committee of the Red Cross s...,O B-org I-org O O B-geo I-geo O O O O O O O O ...
18599,What is new is the idea of putting the solar p...,O O O O O O O O O O O O O O O O O O O O O O O O
1666,Authorities say the suspects were planning a m...,O O O O O O O O O O
23146,Analysts predict the spending surge will encou...,O O O O O O O O B-geo O O O O O O O O O O O O ...


## train/val/test data save to BQ

In [74]:
import pandas_gbq

try:
    pandas_gbq.to_gbq(df_train, "ner_granular_dataset.training", project_id=PROJECT_ID)
except:
    print("table already exist")


100%|██████████| 1/1 [00:00<00:00, 14716.86it/s]


In [75]:
!bq extract \
--destination_format CSV \
ner_granular_dataset.training \
$TRAINING_FILE_PATH

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
W0613 23:20:16.496857 140601218156352 bigquery_client.py:731] There is no apilog flag so non-critical logging is disabled.
Waiting on bqjob_r5c4812b15a241663_00000188b70f5971_1 ... (0s) Current status: DONE   


In [76]:
try:
    pandas_gbq.to_gbq(df_val, "ner_granular_dataset.validation", project_id=PROJECT_ID)
except:
    print("table already exist")


100%|██████████| 1/1 [00:00<00:00, 9489.38it/s]


In [77]:
!bq extract \
--destination_format CSV \
ner_granular_dataset.validation \
$VALIDATION_FILE_PATH


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
W0613 23:20:27.614960 139728485074752 bigquery_client.py:731] There is no apilog flag so non-critical logging is disabled.
Waiting on bqjob_r62cd9e6ae0e182c_00000188b70f84df_1 ... (0s) Current status: DONE   


In [78]:
try:
    pandas_gbq.to_gbq(df_test, "ner_granular_dataset.testing", project_id=PROJECT_ID)
except:
    print("table already exist")


100%|██████████| 1/1 [00:00<00:00, 8793.09it/s]


In [79]:
!bq extract \
--destination_format CSV \
ner_granular_dataset.testing \
$TESTING_FILE_PATH


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
W0613 23:20:33.166322 139964956174144 bigquery_client.py:731] There is no apilog flag so non-critical logging is disabled.
Waiting on bqjob_r1b4353a61aa4e907_00000188b70f9a8f_1 ... (0s) Current status: DONE   


## Preprocessing 

In [94]:
df_train = pd.read_csv(TRAINING_FILE_PATH)
df_validation = pd.read_csv(VALIDATION_FILE_PATH)
df_test = pd.read_csv(TESTING_FILE_PATH)

In [95]:
df_train.head()

Unnamed: 0,text,labels
0,The,O
1,Other support it .,O O O O
2,He was 82 .,O O O O
3,No casualties were reported .,O O O O O
4,One person was injured .,O O O O O


In [96]:
tokenizer = BertTokenizerFast.from_pretrained('bert-base-cased')


In [97]:
label_all_tokens = False

def align_label(texts, labels):
    tokenized_inputs = tokenizer(texts, padding='max_length', max_length=512, truncation=True)

    word_ids = tokenized_inputs.word_ids()

    previous_word_idx = None
    label_ids = []

    for word_idx in word_ids:

        if word_idx is None:
            label_ids.append(-100)

        elif word_idx != previous_word_idx:
            try:
                label_ids.append(labels_to_ids[labels[word_idx]])
            except:
                label_ids.append(-100)
        else:
            try:
                label_ids.append(labels_to_ids[labels[word_idx]] if label_all_tokens else -100)
            except:
                label_ids.append(-100)
        previous_word_idx = word_idx

    return label_ids

class DataSequence(torch.utils.data.Dataset):

    def __init__(self, df):

        lb = [i.split() for i in df['labels'].values.tolist()]
        txt = df['text'].values.tolist()
        self.texts = [tokenizer(str(i),
                               padding='max_length', max_length = 512, truncation=True, return_tensors="pt") for i in txt]
        self.labels = [align_label(i,j) for i,j in zip(txt, lb)]

    def __len__(self):

        return len(self.labels)

    def get_batch_data(self, idx):

        return self.texts[idx]

    def get_batch_labels(self, idx):

        return torch.LongTensor(self.labels[idx])

    def __getitem__(self, idx):

        batch_data = self.get_batch_data(idx)
        batch_labels = self.get_batch_labels(idx)
        
        return batch_data, batch_labels

## build bert model

In [98]:
class BertModel(torch.nn.Module):

    def __init__(self):

        super(BertModel, self).__init__()

        self.bert = BertForTokenClassification.from_pretrained('bert-base-cased', num_labels=len(unique_labels))

    def forward(self, input_id, mask, label):

        output = self.bert(input_ids=input_id, attention_mask=mask, labels=label, return_dict=False)

        return output

In [99]:
def compute_metrics(p):
    predictions, labels = p
    #select predicted index with maximum logit for each token
    predictions = np.argmax(predictions, axis=2)
    # Remove ignored index (special tokens)
    true_predictions = [
        [label_names[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    true_labels = [
        [label_names[l] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    results = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }


In [100]:

eva_metrics = evaluate.combine(["accuracy", "f1", "precision", "recall"])

f1_metric = evaluate.load("f1")

In [None]:
def train_loop(model, df_train, df_val):

    train_dataset = DataSequence(df_train)
    val_dataset = DataSequence(df_val)

    train_dataloader = DataLoader(train_dataset, num_workers=4, batch_size=BATCH_SIZE, shuffle=True)
    val_dataloader = DataLoader(val_dataset, num_workers=4, batch_size=BATCH_SIZE)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    optimizer = SGD(model.parameters(), lr=LEARNING_RATE)

    if use_cuda:
        model = model.cuda()

    best_acc = 0
    best_loss = 1000

    for epoch_num in range(EPOCHS):

        total_acc_train = 0
        total_loss_train = 0

        model.train()

        for train_data, train_label in tqdm(train_dataloader):

            train_label = train_label.to(device)
            mask = train_data['attention_mask'].squeeze(1).to(device)
            input_id = train_data['input_ids'].squeeze(1).to(device)

            optimizer.zero_grad()
            loss, logits = model(input_id, mask, train_label)

            for i in range(logits.shape[0]):

              logits_clean = logits[i][train_label[i] != -100]
              label_clean = train_label[i][train_label[i] != -100]

              predictions = logits_clean.argmax(dim=1)
              acc = (predictions == label_clean).float().mean()
              total_acc_train += acc
              total_loss_train += loss.item()

            loss.backward()
            optimizer.step()
        
        model.eval()

        total_acc_val = 0
        total_loss_val = 0
        
        for val_data, val_label in val_dataloader:

            val_label = val_label.to(device)
            mask = val_data['attention_mask'].squeeze(1).to(device)
            input_id = val_data['input_ids'].squeeze(1).to(device)

            loss, logits = model(input_id, mask, val_label)
            
            for i in range(logits.shape[0]):

              logits_clean = logits[i][val_label[i] != -100]
              label_clean = val_label[i][val_label[i] != -100]

              predictions = logits_clean.argmax(dim=1)
              acc = (predictions == label_clean).float().mean()
              total_acc_val += acc
              total_loss_val += loss.item()
                
        f1_metric.add_batch(predictions=predictions, references=label_clean)
        val_accuracy = total_acc_val / len(df_val)
        val_loss = total_loss_val / len(df_val)
        f1 = f1_metric.compute(average='micro')
        print(
            f'Epochs: {epoch_num + 1} | Loss: {total_loss_train / len(df_train): .3f} | Accuracy: {total_acc_train / len(df_train): .3f} | Val_Loss: {total_loss_val / len(df_val): .3f} | Accuracy: {total_acc_val / len(df_val): .3f}')
        print(f'f1: {f1}')

LEARNING_RATE = 5e-3
EPOCHS = 5
BATCH_SIZE = 5

model = BertModel()
train_loop(model, df_train, df_validation)

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertForTokenClassification: ['cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['cl

## testing

In [92]:
def evaluate(model, df_test):

    test_dataset = DataSequence(df_test)

    test_dataloader = DataLoader(test_dataset, num_workers=4, batch_size=1)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    if use_cuda:
        model = model.cuda()

    total_acc_test = 0.0

    for test_data, test_label in test_dataloader:

            test_label = test_label.to(device)
            mask = test_data['attention_mask'].squeeze(1).to(device)

            input_id = test_data['input_ids'].squeeze(1).to(device)

            loss, logits = model(input_id, mask, test_label)

            for i in range(logits.shape[0]):

              logits_clean = logits[i][test_label[i] != -100]
              label_clean = test_label[i][test_label[i] != -100]

              predictions = logits_clean.argmax(dim=1)
              acc = (predictions == label_clean).float().mean()
              total_acc_test += acc

    val_accuracy = total_acc_test / len(df_test)
    print(f'Test Accuracy: {total_acc_test / len(df_test): .3f}')


evaluate(model, df_test)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

## prediction on single instance

In [93]:
def align_word_ids(texts):
  
    tokenized_inputs = tokenizer(texts, padding='max_length', max_length=512, truncation=True)

    word_ids = tokenized_inputs.word_ids()

    previous_word_idx = None
    label_ids = []

    for word_idx in word_ids:

        if word_idx is None:
            label_ids.append(-100)

        elif word_idx != previous_word_idx:
            try:
                label_ids.append(1)
            except:
                label_ids.append(-100)
        else:
            try:
                label_ids.append(1 if label_all_tokens else -100)
            except:
                label_ids.append(-100)
        previous_word_idx = word_idx

    return label_ids


def evaluate_one_text(model, sentence):


    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    if use_cuda:
        model = model.cuda()

    text = tokenizer(sentence, padding='max_length', max_length = 512, truncation=True, return_tensors="pt")

    mask = text['attention_mask'].to(device)
    input_id = text['input_ids'].to(device)
    label_ids = torch.Tensor(align_word_ids(sentence)).unsqueeze(0).to(device)

    logits = model(input_id, mask, None)
    logits_clean = logits[0][label_ids != -100]

    predictions = logits_clean.argmax(dim=1).tolist()
    prediction_label = [ids_to_labels[i] for i in predictions]
    print(sentence)
    print(prediction_label)
    

In [95]:
evaluate_one_text(model, 'Bill Gates is the founder of Microsoft at Seattle, Washionton')
evaluate_one_text(model, 'Dear Jim, thanks for booking at Marroit at San Jose, California on May 15th, 2023')
evaluate_one_text(model, "Dear Mary, here is your shopping summary at Macy's at San Jose, California")
evaluate_one_text(model, "Dear Larry, here is your united airline ticket departing from San Francisco Airpot on March 12, 2013")
evaluate_one_text(model,"Starting in late October, we'll launch new flights from Los Angeles to Auckland and begin daily service from San Francisco to Auckland.")

Bill Gates is the founder of Microsoft at Seattle, Washionton
['B-per', 'I-per', 'O', 'O', 'O', 'O', 'B-org', 'O', 'B-geo', 'O', 'B-org']
Dear Jim, thanks for booking at Marroit at San Jose, California on May 15th, 2023
['O', 'B-per', 'O', 'O', 'O', 'O', 'O', 'B-org', 'O', 'B-geo', 'I-geo', 'O', 'I-geo', 'O', 'I-tim', 'I-tim', 'I-tim', 'I-tim']
Dear Mary, here is your shopping summary at Macy's at San Jose, California
['O', 'I-per', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-org', 'O', 'O', 'O', 'I-geo', 'I-geo', 'O', 'O']
Dear Larry, here is your united airline ticket departing from San Francisco Airpot on March 12, 2013
['O', 'B-per', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-geo', 'I-geo', 'I-org', 'O', 'B-tim', 'I-tim', 'I-tim', 'I-tim']
Starting in late October, we'll launch new flights from Los Angeles to Auckland and begin daily service from San Francisco to Auckland.
['O', 'O', 'B-tim', 'B-tim', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-geo', 'I-geo', 'O', 'B-geo', 'O', 'O', 

## Write the training script.

In [15]:
TRAINING_APP_FOLDER = "training_app"
os.makedirs(TRAINING_APP_FOLDER, exist_ok=True)


In [53]:
%%writefile {TRAINING_APP_FOLDER}/train.py
import os
import subprocess
import sys
import json

import fire
import hypertune
import numpy as np
import pandas as pd

import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
from torchtext.vocab import vocab
from collections import OrderedDict
import torch.nn.functional as F

import time

from google.cloud import aiplatform, bigquery
from torch.utils.data import DataLoader, TensorDataset

from transformers import BertTokenizerFast, BertForTokenClassification
from torch.utils.data import DataLoader
from tqdm import tqdm
from torch.optim import SGD
import evaluate

import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

tokenizer = BertTokenizerFast.from_pretrained('bert-base-cased')
label_all_tokens = False

def align_label(texts, labels):
    tokenized_inputs = tokenizer(texts, padding='max_length', max_length=512, truncation=True)

    word_ids = tokenized_inputs.word_ids()

    previous_word_idx = None
    label_ids = []

    for word_idx in word_ids:

        if word_idx is None:
            label_ids.append(-100)

        elif word_idx != previous_word_idx:
            try:
                label_ids.append(labels_to_ids[labels[word_idx]])
            except:
                label_ids.append(-100)
        else:
            try:
                label_ids.append(labels_to_ids[labels[word_idx]] if label_all_tokens else -100)
            except:
                label_ids.append(-100)
        previous_word_idx = word_idx

    return label_ids

class DataSequence(torch.utils.data.Dataset):

    def __init__(self, df):

        lb = [i.split() for i in df['labels'].values.tolist()]
        txt = df['text'].values.tolist()
        self.texts = [tokenizer(str(i),
                               padding='max_length', max_length = 512, truncation=True, return_tensors="pt") for i in txt]
        self.labels = [align_label(i,j) for i,j in zip(txt, lb)]

    def __len__(self):

        return len(self.labels)

    def get_batch_data(self, idx):

        return self.texts[idx]

    def get_batch_labels(self, idx):

        return torch.LongTensor(self.labels[idx])

    def __getitem__(self, idx):

        batch_data = self.get_batch_data(idx)
        batch_labels = self.get_batch_labels(idx)
        
        return batch_data, batch_labels

class BertModel(torch.nn.Module):

    def __init__(self):

        super(BertModel, self).__init__()

        self.bert = BertForTokenClassification.from_pretrained('bert-base-cased', num_labels=17)

    def forward(self, input_id, mask, label):

        output = self.bert(input_ids=input_id, attention_mask=mask, labels=label, return_dict=False)

        return output 
    
f1_metric = evaluate.load("f1")

def train_evaluate(job_dir, TRAINING_FILE_PATH, VALIDATION_FILE_PATH, TESTING_FILE_PATH, batch_size, max_iter, hptune):
    
    df_train = pd.read_csv(TRAINING_FILE_PATH)
    df_val = pd.read_csv(VALIDATION_FILE_PATH)
    df_test = pd.read_csv(TESTING_FILE_PATH)

    def train_loop(model, df_train, df_val):

        train_dataset = DataSequence(df_train)
        val_dataset = DataSequence(df_val)

        train_dataloader = DataLoader(train_dataset, num_workers=4, batch_size=BATCH_SIZE, shuffle=True)
        val_dataloader = DataLoader(val_dataset, num_workers=4, batch_size=BATCH_SIZE)

        use_cuda = torch.cuda.is_available()
        device = torch.device("cuda" if use_cuda else "cpu")

        optimizer = SGD(model.parameters(), lr=LEARNING_RATE)

        if use_cuda:
            model = model.cuda()

        best_acc = 0
        best_loss = 1000

        for epoch_num in range(EPOCHS):

            total_acc_train = 0
            total_loss_train = 0

            model.train()

            for train_data, train_label in tqdm(train_dataloader):

                train_label = train_label.to(device)
                mask = train_data['attention_mask'].squeeze(1).to(device)
                input_id = train_data['input_ids'].squeeze(1).to(device)

                optimizer.zero_grad()
                loss, logits = model(input_id, mask, train_label)

                for i in range(logits.shape[0]):

                  logits_clean = logits[i][train_label[i] != -100]
                  label_clean = train_label[i][train_label[i] != -100]

                  predictions = logits_clean.argmax(dim=1)
                  acc = (predictions == label_clean).float().mean()
                  total_acc_train += acc
                  total_loss_train += loss.item()

                loss.backward()
                optimizer.step()

            model.eval()

            total_acc_val = 0
            total_loss_val = 0

            for val_data, val_label in val_dataloader:

                val_label = val_label.to(device)
                mask = val_data['attention_mask'].squeeze(1).to(device)
                input_id = val_data['input_ids'].squeeze(1).to(device)

                loss, logits = model(input_id, mask, val_label)

                for i in range(logits.shape[0]):

                  logits_clean = logits[i][val_label[i] != -100]
                  label_clean = val_label[i][val_label[i] != -100]

                  predictions = logits_clean.argmax(dim=1)
                  acc = (predictions == label_clean).float().mean()
                  total_acc_val += acc
                  total_loss_val += loss.item()

            f1_metric.add_batch(predictions=predictions, references=label_clean)
            val_accuracy = total_acc_val / len(df_val)
            val_loss = total_loss_val / len(df_val)
            f1 = f1_metric.compute(average='micro')
            print(
                f'Epochs: {epoch_num + 1} | Loss: {total_loss_train / len(df_train): .3f} | Accuracy: {total_acc_train / len(df_train): .3f} | Val_Loss: {total_loss_val / len(df_val): .3f} | Accuracy: {total_acc_val / len(df_val): .3f}')
            print(f'f1: {f1}')

    def evaluate(model, df_test):

        test_dataset = DataSequence(df_test)

        test_dataloader = DataLoader(test_dataset, num_workers=4, batch_size=1)

        use_cuda = torch.cuda.is_available()
        device = torch.device("cuda" if use_cuda else "cpu")

        if use_cuda:
            model = model.cuda()

        total_acc_test = 0.0

        for test_data, test_label in test_dataloader:

                test_label = test_label.to(device)
                mask = test_data['attention_mask'].squeeze(1).to(device)

                input_id = test_data['input_ids'].squeeze(1).to(device)

                loss, logits = model(input_id, mask, test_label)

                for i in range(logits.shape[0]):

                  logits_clean = logits[i][test_label[i] != -100]
                  label_clean = test_label[i][test_label[i] != -100]

                  predictions = logits_clean.argmax(dim=1)
                  acc = (predictions == label_clean).float().mean()
                  total_acc_test += acc
        f1_metric.add_batch(predictions=predictions, references=label_clean)
        f1 = f1_metric.compute(average='micro')
        val_accuracy = total_acc_test / len(df_test)
        print(f'Test Accuracy: {total_acc_test / len(df_test): .3f}')
        print(f'f1: {f1}')


    LEARNING_RATE = 5e-3
    EPOCHS = max_iter #5
    BATCH_SIZE = batch_size #5

    model = BertModel()
    train_loop(model, df_train, df_val)
    evaluate(model, df_test)
    # Save the model
    model_filename = "model.pt"
    model_scripted = torch.jit.script(model)
    model_scripted.save(model_filename)
    gcs_model_path = "{}/{}".format(job_dir, model_filename)

    # # export json for preprocessing
    # preprocesinng_json = {c: transformers[c].serialize_constants() 
    #                       for c in df_train.columns if c != LABEL_COLUMN}
    # preproc_json_filename = 'preprocessing.json'
    # with open(preproc_json_filename, 'w') as f:
    #     json.dump(preprocesinng_json, f)
    # gcs_preprocessing_json_path = "{}/{}".format(job_dir, preproc_json_filename)

    # send files to GCS
    subprocess.check_call(['gsutil', 'cp', model_filename, gcs_model_path], stderr=sys.stdout)
    # subprocess.check_call(['gsutil', 'cp', preproc_json_filename, gcs_preprocessing_json_path], 
    #                       stderr=sys.stdout)

    
if __name__ == "__main__":
    fire.Fire(train_evaluate)
    

Overwriting training_app/train.py


## package the file into a docker image

In [54]:
%%writefile {TRAINING_APP_FOLDER}/Dockerfile

FROM us-docker.pkg.dev/vertex-ai/training/pytorch-xla.1-11:latest
RUN pip install -U fire cloudml-hypertune pandas==0.25.3
RUN pip install -U torchtext==0.12.0
RUN pip install -U torch-model-archiver
RUN pip install -U transformers
RUN pip install -U evaluate
ENV GPU_NUM_DEVICES=1
ENV PJRT_DEVICE=GPU
WORKDIR /app
COPY train.py .

ENTRYPOINT ["python", "train.py"]

Overwriting training_app/Dockerfile


In [55]:
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

IMAGE_NAME = "trainer_image"
IMAGE_TAG = "latest"
IMAGE_URI = f"gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{IMAGE_TAG}"

os.environ["IMAGE_URI"] = IMAGE_URI

In [59]:
!docker build --tag $IMAGE_URI $TRAINING_APP_FOLDER

Sending build context to Docker daemon  11.26kB
Step 1/11 : FROM us-docker.pkg.dev/vertex-ai/training/pytorch-xla.1-11:latest
latest: Pulling from vertex-ai/training/pytorch-xla.1-11

[1B17ec1767: Pulling fs layer 
[1B45a9c0c5: Pulling fs layer 
[1Be4c1f40f: Pulling fs layer 
[1B86ab2510: Pulling fs layer 
[1Badc78060: Pulling fs layer 
[1B84563a60: Pulling fs layer 
[1B4e23074d: Pulling fs layer 
[1Bd65705b8: Pulling fs layer 
[1Bf7b4815b: Pulling fs layer 
[1Bc0f08642: Pulling fs layer 
[1B16f81350: Pulling fs layer 
[1Bb700ef54: Pulling fs layer 
[1B60d3626c: Pulling fs layer 
[1B8c5f1f8f: Pulling fs layer 
[1B5ac89e7f: Pulling fs layer 
[1Bdcc3a763: Pulling fs layer 
[1Ba1a53c2f: Pulling fs layer 
[1Be6fcc0e6: Pulling fs layer 
[1B3226acd7: Pulling fs layer 
[1B4de043c5: Pulling fs layer 
[1B87f0858e: Pulling fs layer 
[1B07888aac: Pulling fs layer 
[1Bfc1617dd: Pulling fs layer 
[1B6205e6a8: Pulling fs layer 
[1B7f764283: Pulling fs layer 
[1Bc87c58c1: Pu

In [63]:
IMAGE_URI

'gcr.io/qwiklabs-asl-00-69fe165840f7/trainer_image:latest'

In [64]:
TRAINING_APP_FOLDER

'training_app'

In [73]:
!docker run $IMAGE_URI \
-job_dir "gs://qwiklabs-asl-00-69fe165840f7-ner-kfp-artifact-store/jobs/JOB_VERTEX_20230615_154521" \ 
-TRAINING_FILE_PATH "gs://qwiklabs-asl-00-69fe165840f7-ner-kfp-artifact-store/data/training/dataset.csv" \ 
-VALIDATION_FILE_PATH "gs://qwiklabs-asl-00-69fe165840f7-ner-kfp-artifact-store/data/validation/dataset.csv" \ 
-TESTING_FILE_PATH "gs://qwiklabs-asl-00-69fe165840f7-ner-kfp-artifact-store/data/testing/dataset.csv" \ 
-batch_size 2 \ 
-max_iter 2 \ 
-nohptune


In [None]:
!docker push 

In [65]:
!docker images

REPOSITORY                                              TAG       IMAGE ID       CREATED          SIZE
gcr.io/qwiklabs-asl-00-69fe165840f7/trainer_image       latest    ac270d20adca   20 minutes ago   18.4GB
us-docker.pkg.dev/vertex-ai/training/pytorch-xla.1-11   latest    8a590d7d196b   4 months ago     18.2GB


In [56]:
!gcloud builds submit --tag $IMAGE_URI $TRAINING_APP_FOLDER

Creating temporary tarball archive of 2 file(s) totalling 8.5 KiB before compression.
Uploading tarball of [training_app] to [gs://qwiklabs-asl-00-69fe165840f7_cloudbuild/source/1686842994.379759-4fa04448fd1d432b84df6686664cc5ae.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-asl-00-69fe165840f7/locations/global/builds/e9288a1c-492c-4706-bc32-fdeacf031143].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/e9288a1c-492c-4706-bc32-fdeacf031143?project=430886062887 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "e9288a1c-492c-4706-bc32-fdeacf031143"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-asl-00-69fe165840f7_cloudbuild/source/1686842994.379759-4fa04448fd1d432b84df6686664cc5ae.tgz#1686842994640997
Copying gs://qwiklabs-asl-00-69fe165840f7_cloudbuild/source/1686842994.379759-4fa04448fd1d432b84df6686664cc5ae.tgz#1686842994640997...
/ [1 files][  2.7 KiB/  2.7 KiB]                  

In [22]:
## configure and run the training job

In [23]:
# batch_size = 5
# max_iter = 5

In [57]:
TIMESTAMP = time.strftime("%Y%m%d_%H%M%S")
JOB_NAME = f"JOB_VERTEX_{TIMESTAMP}"
JOB_DIR = f"{JOB_DIR_ROOT}/{JOB_NAME}"

MACHINE_TYPE="n1-standard-4"
REPLICA_COUNT=1
ACCELERATOR_TYPE="NVIDIA_TESLA_T4"
ACCELERATOR_COUNT=1

WORKER_POOL_SPEC = f"""\
machine-type={MACHINE_TYPE},\
replica-count={REPLICA_COUNT},\
accelerator-type={ACCELERATOR_TYPE},\
accelerator-count={ACCELERATOR_COUNT},\
container-image-uri={IMAGE_URI}\
"""

ARGS = f"""\
--job_dir={JOB_DIR},\
--TRAINING_FILE_PATH={TRAINING_FILE_PATH},\
--VALIDATION_FILE_PATH={VALIDATION_FILE_PATH},\
--TESTING_FILE_PATH={TESTING_FILE_PATH},\
--batch_size=2,\
--max_iter=2,\
--nohptune\
"""

!gcloud ai custom-jobs create \
  --region={REGION} \
  --display-name={JOB_NAME} \
  --worker-pool-spec={WORKER_POOL_SPEC} \
  --args={ARGS}

print("The model will be exported at:", JOB_DIR)

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
CustomJob [projects/430886062887/locations/us-central1/customJobs/6826300211422298112] is submitted successfully.

Your job is still active. You may view the status of your job with the command

  $ gcloud ai custom-jobs describe projects/430886062887/locations/us-central1/customJobs/6826300211422298112

or continue streaming the logs with the command

  $ gcloud ai custom-jobs stream-logs projects/430886062887/locations/us-central1/customJobs/6826300211422298112
The model will be exported at: gs://qwiklabs-asl-00-69fe165840f7-ner-kfp-artifact-store/jobs/JOB_VERTEX_20230615_154521


In [58]:
!gsutil ls $JOB_DIR

CommandException: One or more URLs matched no objects.


In [51]:
%%bash
python3 -m /training_app/train.py --job_dir $JOB_DIR, \
--TRAINING_FILE_PATH $TRAINING_FILE_PATH, \
--VALIDATION_FILE_PATH $VALIDATION_FILE_PATH, \
--TESTING_FILE_PATH $TESTING_FILE_PATH, \
--batch_size 2, \
--max_iter 2, \
--hptune False \

/opt/conda/bin/python3: Error while finding module specification for '/training_app/train.py' (ModuleNotFoundError: No module named '/training_app/train'). Try using '/training_app/train' instead of '/training_app/train.py' as the module name.


## Deploy model to vertexAI prediction

In [None]:
PREDICT_APP_FOLDER = "predict_app"
os.makedirs(PREDICT_APP_FOLDER, exist_ok=True)
os.environ["PREDICT_APP_FOLDER"] = PREDICT_APP_FOLDER
os.environ["JOB_DIR"] = JOB_DIR


In [None]:
%%writefile {PREDICT_APP_FOLDER}/handler.py
from ts.torch_handler.base_handler import BaseHandler

import logging
import os
import json
from collections import OrderedDict

logging.basicConfig(level=logging.INFO)

import numpy as np

import torch
from torchtext.vocab import vocab
import torch.nn.functional as F



## ============= Below is the code from Takumi's template; to be deleted ===================

In [29]:
%%writefile {TRAINING_APP_FOLDER}/train.py
import os
import subprocess
import sys
import json

import fire
import hypertune
import numpy as np
import pandas as pd

import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
from torchtext.vocab import vocab
from collections import OrderedDict
import torch.nn.functional as F


CATEGORICAL_FEATURES = ["Wilderness_Area", "Soil_Type"]
NUMERICAL_FEATURES = ['Elevation', 'Aspect', 'Slope', 'Horizontal_Distance_To_Hydrology',
       'Vertical_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
       'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm',
       'Horizontal_Distance_To_Fire_Points']
LABEL_COLUMN = "Cover_Type"


class OneHotEncoder:
    def fit(self, series: pd.Series):
        categories = series.unique().tolist()
        dictionary = vocab(OrderedDict([(cat, 1) for cat in categories]))
        self.dictionary = dictionary
        return self

    def transform(self, feature:list, dictionary=None) -> torch.Tensor:
        if dictionary:
            assert type(dictionary) == list
            self.dictionary = vocab(
                OrderedDict([(cat, 1) for cat in dictionary])
            )

        indices = self.dictionary.lookup_indices(feature)
        indices = torch.tensor(indices, dtype=torch.long)
        one_hot = F.one_hot(
            indices, num_classes=len(self.dictionary.get_itos())
        )
        return torch.tensor(one_hot).float()

    def serialize_constants(self):
        return {"dictionary": self.dictionary.get_itos()}
class StandardScaler:
    def fit(self, series: pd.Series):
        self.mean = np.float64(series.mean())
        self.std = np.float64(series.std())
        return self

    def transform(self, feature:list, mean=None, std=None) -> torch.Tensor:
        if mean:
            self.mean = np.float64(mean)
        if std:
            self.std = np.float64(std)

        standardized = (feature - self.mean) / self.std
        return torch.tensor(standardized)[:, None].float()

    def serialize_constants(self):
        return {"mean": self.mean, "std": self.std}

def preprocess(df, transformers):
    transformed_features = [transformers[c].transform(df[c].to_list()) 
                            for c in df.columns if c != LABEL_COLUMN]
    features = torch.cat(transformed_features, 1)
    
    label = df[LABEL_COLUMN].to_list()
    label = torch.LongTensor(label)
    return features, label

    
def train_evaluate(job_dir, training_dataset_path, validation_dataset_path, batch_size, max_iter, hptune):
    
    df_train = pd.read_csv(training_dataset_path)
    df_validation = pd.read_csv(validation_dataset_path)

    if not hptune:
        df_train = pd.concat([df_train, df_validation])

    
    # Preproc Categorical columns
    transformers = {c_feature: OneHotEncoder().fit(df_train[c_feature]) 
                    for c_feature in CATEGORICAL_FEATURES}
    transformers.update({n_feature: StandardScaler().fit(df_train[n_feature]) 
                         for n_feature in NUMERICAL_FEATURES})

    training_transformed = preprocess(df_train, transformers)
    validation_transformed = preprocess(df_validation, transformers)

    # Dataset and DataLoader
    train_dataset = TensorDataset(*training_transformed)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size)
    validation_dataset = TensorDataset(*validation_transformed)
    validation_dataloader = DataLoader(validation_dataset, batch_size=batch_size)
        # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Model
    class NeuralNetworkModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.linear_relu_stack = nn.Sequential(
                nn.Linear(54, 32),
                nn.ReLU(),
                nn.Linear(32, 16),
                nn.ReLU(),
                nn.Linear(16, 7)
            )

        def forward(self, x):
            return self.linear_relu_stack(x)

    model = NeuralNetworkModel().to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    
    def train(dataloader, model, loss_fn, optimizer, device):
        model.train()
        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)

            optimizer.zero_grad()
            pred = model(X)
            loss = loss_fn(pred, y)
            loss.backward()
            optimizer.step()
    
    def validation(dataloader, model, loss_fn, device):
        size = len(dataloader.dataset)
        num_batches = len(dataloader)
        validation_loss = 0
        correct = 0

        model.eval()
        with torch.no_grad():
            for X, y in dataloader:
                X = X.to(device)
                y = y.to(device)
                pred = model(X)
                validation_loss += loss_fn(pred, y).item()
                correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        validation_loss /= num_batches
        correct /= size
        return correct, validation_loss
    
    epochs = max_iter
    for t in range(epochs):
        print(f"Epoch {t+1}")
        train(train_dataloader, model, loss_fn, optimizer, device)

    if hptune:
        accuracy, _ = validation(validation_dataloader, model, loss_fn, device)
        # Log it with hypertune
        hpt = hypertune.HyperTune()
        hpt.report_hyperparameter_tuning_metric(
          hyperparameter_metric_tag='accuracy',
          metric_value=accuracy
        )

    if not hptune:
        # Save the model
        model_filename = "model.pt"
        model_scripted = torch.jit.script(model)
        model_scripted.save(model_filename)
        gcs_model_path = "{}/{}".format(job_dir, model_filename)

        # export json for preprocessing
        preprocesinng_json = {c: transformers[c].serialize_constants() 
                              for c in df_train.columns if c != LABEL_COLUMN}
        preproc_json_filename = 'preprocessing.json'
        with open(preproc_json_filename, 'w') as f:
            json.dump(preprocesinng_json, f)
        gcs_preprocessing_json_path = "{}/{}".format(job_dir, preproc_json_filename)

        # send files to GCS
        subprocess.check_call(['gsutil', 'cp', model_filename, gcs_model_path], stderr=sys.stdout)
        subprocess.check_call(['gsutil', 'cp', preproc_json_filename, gcs_preprocessing_json_path], 
                              stderr=sys.stdout)


if __name__ == "__main__":
    fire.Fire(train_evaluate)

Writing training_app/train.py


In [30]:
%%writefile {TRAINING_APP_FOLDER}/Dockerfile

FROM us-docker.pkg.dev/vertex-ai/training/pytorch-xla.1-11:latest
RUN pip install -U fire cloudml-hypertune pandas==0.25.3
RUN pip install -U torchtext==0.12.0
WORKDIR /app
COPY train.py .

ENTRYPOINT ["python", "train.py"]

Writing training_app/Dockerfile


In [31]:
IMAGE_NAME = "trainer_image"
IMAGE_TAG = "latest"
IMAGE_URI = f"gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{IMAGE_TAG}"

os.environ["IMAGE_URI"] = IMAGE_URI

In [32]:
!gcloud builds submit --tag $IMAGE_URI $TRAINING_APP_FOLDER

Creating temporary tarball archive of 2 file(s) totalling 6.6 KiB before compression.
Uploading tarball of [training_app] to [gs://qwiklabs-asl-00-69fe165840f7_cloudbuild/source/1686596468.642348-67fef37a619f455cb4f9d3b03137571c.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-asl-00-69fe165840f7/locations/global/builds/931d9ff3-f4ff-4186-b4a3-28165f912df9].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/931d9ff3-f4ff-4186-b4a3-28165f912df9?project=430886062887 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "931d9ff3-f4ff-4186-b4a3-28165f912df9"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-asl-00-69fe165840f7_cloudbuild/source/1686596468.642348-67fef37a619f455cb4f9d3b03137571c.tgz#1686596468916955
Copying gs://qwiklabs-asl-00-69fe165840f7_cloudbuild/source/1686596468.642348-67fef37a619f455cb4f9d3b03137571c.tgz#1686596468916955...
/ [1 files][  2.5 KiB/  2.5 KiB]                  

In [33]:
TIMESTAMP = time.strftime("%Y%m%d_%H%M%S")
JOB_NAME = f"forestcover_tuning_{TIMESTAMP}"
JOB_DIR = f"{JOB_DIR_ROOT}/{JOB_NAME}"

os.environ["JOB_NAME"] = JOB_NAME
os.environ["JOB_DIR"] = JOB_DIR

In [34]:
%%bash

MACHINE_TYPE="n1-standard-4"
REPLICA_COUNT=1
CONFIG_YAML=config.yaml

cat <<EOF > $CONFIG_YAML
studySpec:
  metrics:
  - metricId: accuracy
    goal: MAXIMIZE
  parameters:
  - parameterId: max_iter
    discreteValueSpec:
      values:
      - 10
      - 20
  - parameterId: batch_size
    integerValueSpec:
      minValue: 16
      maxValue: 128
    scaleType: UNIT_LINEAR_SCALE
  algorithm: ALGORITHM_UNSPECIFIED # results in Bayesian optimization
trialJobSpec:
  workerPoolSpecs:  
  - machineSpec:
      machineType: $MACHINE_TYPE
    replicaCount: $REPLICA_COUNT
    containerSpec:
      imageUri: $IMAGE_URI
      args:
      - --job_dir=$JOB_DIR
      - --training_dataset_path=$TRAINING_FILE_PATH
      - --validation_dataset_path=$VALIDATION_FILE_PATH
      - --hptune
EOF

gcloud ai hp-tuning-jobs create \
    --region=$REGION \
    --display-name=$JOB_NAME \
    --config=$CONFIG_YAML \
    --max-trial-count=5 \
    --parallel-trial-count=5

echo "JOB_NAME: $JOB_NAME"

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
Hyperparameter tuning job [1171725215026118656] submitted successfully.

Your job is still active. You may view the status of your job with the command

  $ gcloud ai hp-tuning-jobs describe 1171725215026118656 --region=us-central1

Job State: JOB_STATE_PENDING


JOB_NAME: forestcover_tuning_20230612_193643


In [38]:
def get_trials(job_name):
    jobs = aiplatform.HyperparameterTuningJob.list()
    match = [job for job in jobs if job.display_name == JOB_NAME]
    tuning_job = match[0] if match else None
    return tuning_job.trials if tuning_job else None


def get_best_trial(trials):
    metrics = [trial.final_measurement.metrics[0].value for trial in trials]
    best_trial = trials[metrics.index(max(metrics))]
    return best_trial


def retrieve_best_trial_from_job_name(jobname):
    trials = get_trials(jobname)
    best_trial = get_best_trial(trials)
    return best_trial


In [39]:
best_trial = retrieve_best_trial_from_job_name(JOB_NAME)

In [40]:
batch_size = int(best_trial.parameters[0].value)
max_iter = int(best_trial.parameters[1].value)

In [41]:
TIMESTAMP = time.strftime("%Y%m%d_%H%M%S")
JOB_NAME = f"JOB_VERTEX_{TIMESTAMP}"
JOB_DIR = f"{JOB_DIR_ROOT}/{JOB_NAME}"

MACHINE_TYPE="n1-standard-4"
REPLICA_COUNT=1

WORKER_POOL_SPEC = f"""\
machine-type={MACHINE_TYPE},\
replica-count={REPLICA_COUNT},\
container-image-uri={IMAGE_URI}\
"""

ARGS = f"""\
--job_dir={JOB_DIR},\
--training_dataset_path={TRAINING_FILE_PATH},\
--validation_dataset_path={VALIDATION_FILE_PATH},\
--batch_size={batch_size},\
--max_iter={max_iter},\
--nohptune\
"""

!gcloud ai custom-jobs create \
  --region={REGION} \
  --display-name={JOB_NAME} \
  --worker-pool-spec={WORKER_POOL_SPEC} \
  --args={ARGS}


print("The model will be exported at:", JOB_DIR)

Using endpoint [https://us-central1-aiplatform.googleapis.com/]
CustomJob [projects/430886062887/locations/us-central1/customJobs/5245618106075709440] is submitted successfully.

Your job is still active. You may view the status of your job with the command

  $ gcloud ai custom-jobs describe projects/430886062887/locations/us-central1/customJobs/5245618106075709440

or continue streaming the logs with the command

  $ gcloud ai custom-jobs stream-logs projects/430886062887/locations/us-central1/customJobs/5245618106075709440
The model will be exported at: gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/jobs/JOB_VERTEX_20230612_233129


In [55]:
!gsutil ls $JOB_DIR

gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/jobs/JOB_VERTEX_20230612_233129/model.pt
gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/jobs/JOB_VERTEX_20230612_233129/preprocessing.json


In [56]:
PREDICT_APP_FOLDER = "predict_app"
os.makedirs(PREDICT_APP_FOLDER, exist_ok=True)
os.environ["PREDICT_APP_FOLDER"] = PREDICT_APP_FOLDER
os.environ["JOB_DIR"] = JOB_DIR

In [57]:
%%writefile {PREDICT_APP_FOLDER}/handler.py
from ts.torch_handler.base_handler import BaseHandler

import logging
import os
import json
from collections import OrderedDict

logging.basicConfig(level=logging.INFO)

import numpy as np

import torch
from torchtext.vocab import vocab
import torch.nn.functional as F
    
CATEGORICAL_FEATURES = ["Wilderness_Area", "Soil_Type"]
NUMERICAL_FEATURES = ['Elevation', 'Aspect', 'Slope', 'Horizontal_Distance_To_Hydrology',
       'Vertical_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
       'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm',
       'Horizontal_Distance_To_Fire_Points']

    
class ModelHandler(BaseHandler):
    
    def __init__(self):
        self._context = None
        self.initialized = False
        self.explain = False
        self.target = 0


    def _one_hot_transformer(self, feature, dictionary):
        dictionary = vocab(OrderedDict([(cat, 1) for cat in dictionary]))
        
        indices = dictionary.lookup_indices(feature)
        indices = torch.tensor(indices, dtype=torch.long)
        one_hot = F.one_hot(indices, num_classes=len(dictionary.get_itos()))
        return torch.tensor(one_hot).float()

    def _standard_scale_transformer(self, feature, mean, std):
        standardized = (feature - np.float64(mean)) / np.float64(std)
        return torch.tensor(standardized)[:, None].float()    


    def initialize(self, context):
        self._context = context
        self.initialized = True
                
        #  load the model
        self.manifest = context.manifest

        properties = context.system_properties
        model_dir = properties.get("model_dir")

        self.device = "cpu"

        # Read model serialize/pt file
        serialized_file = self.manifest['model']['serializedFile']
        model_pt_path = os.path.join(model_dir, serialized_file)
        if not os.path.isfile(model_pt_path):
            raise RuntimeError("Missing the model.pth file")

        self.model = torch.jit.load(model_pt_path)
        
        # Read preprocesinng setting
        preprocessing_json_path = os.path.join(model_dir, "preprocessing.json")
        with open(preprocessing_json_path) as f:
            self.preprocessing_json = json.load(f)
        
        self.initialized = True


    def preprocess(self, _data):
        data = _data[0]
        
        preprocessed_data = [self._one_hot_transformer(data[c_feature], 
                                                       self.preprocessing_json[c_feature]["dictionary"]) 
                             for c_feature in CATEGORICAL_FEATURES]
        preprocessed_data.extend([self._standard_scale_transformer(data[n_feature],
                                                                   self.preprocessing_json[n_feature]["mean"],
                                                                   self.preprocessing_json[n_feature]["std"]) 
                                  for n_feature in NUMERICAL_FEATURES])
        preprocessed_data = torch.cat(preprocessed_data, 1)
        
        return preprocessed_data

    def inference(self, model_input):
        model_output = self.model.forward(model_input)
        return model_output

    def postprocess(self, inference_output):
        postprocess_output = torch.argmax(inference_output, dim=1)        
        return [postprocess_output.cpu().numpy().tolist()]

    def handle(self, data, context):
        model_input = self.preprocess(data)
        model_output = self.inference(model_input)
        output = self.postprocess(model_output)
        return output

Overwriting predict_app/handler.py


In [58]:
!gsutil cp $JOB_DIR/model.pt ./$PREDICT_APP_FOLDER
!gsutil cp $JOB_DIR/preprocessing.json ./$PREDICT_APP_FOLDER

Copying gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/jobs/JOB_VERTEX_20230612_233129/model.pt...
/ [1 files][ 17.8 KiB/ 17.8 KiB]                                                
Operation completed over 1 objects/17.8 KiB.                                     
Copying gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/jobs/JOB_VERTEX_20230612_233129/preprocessing.json...
/ [1 files][  1.2 KiB/  1.2 KiB]                                                
Operation completed over 1 objects/1.2 KiB.                                      


In [72]:
%%bash 

torch-model-archiver -f \
  --model-name model \
  --version 1.0 \
  --serialized-file ./$PREDICT_APP_FOLDER/model.pt \
  --handler ./$PREDICT_APP_FOLDER/handler.py \
  --extra-files ./$PREDICT_APP_FOLDER/preprocessing.json \
  --export-path=./$PREDICT_APP_FOLDER

In [73]:
TIMESTAMP = time.strftime("%Y%m%d_%H%M%S")
DEPLOY_MODEL_GCS_URI = f"{ARTIFACT_STORE}/pytorch-model-deploy/{TIMESTAMP}"

!gsutil cp -r $PREDICT_APP_FOLDER $DEPLOY_MODEL_GCS_URI

Copying file://predict_app/handler.py [Content-Type=text/x-python]...
Copying file://predict_app/model.pt [Content-Type=application/vnd.snesdev-page-table]...
Copying file://predict_app/preprocessing.json [Content-Type=application/json]...
Copying file://predict_app/model.mar [Content-Type=application/octet-stream]... 
/ [4 files][ 38.1 KiB/ 38.1 KiB]                                                
Operation completed over 4 objects/38.1 KiB.                                     


In [74]:
!gsutil ls -al $DEPLOY_MODEL_GCS_URI

      3563  2023-06-13T15:55:53Z  gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/pytorch-model-deploy/20230613_155552/handler.py#1686671753421488  metageneration=1
     15931  2023-06-13T15:55:53Z  gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/pytorch-model-deploy/20230613_155552/model.mar#1686671753676318  metageneration=1
     18242  2023-06-13T15:55:53Z  gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/pytorch-model-deploy/20230613_155552/model.pt#1686671753511636  metageneration=1
      1257  2023-06-13T15:55:53Z  gs://qwiklabs-asl-00-69fe165840f7-kfp-artifact-store/pytorch-model-deploy/20230613_155552/preprocessing.json#1686671753590373  metageneration=1
TOTAL: 4 objects, 38993 bytes (38.08 KiB)


In [75]:
model_display_name = f"pytorch-model-{TIMESTAMP}"
model_description = (
    "PyTorch based forest cover classifier with the pre-built PyTorch image"
)
serving_container_image_uri = (
    "us-docker.pkg.dev/vertex-ai/prediction/pytorch-cpu.1-11:latest"
)

model = aiplatform.Model.upload(
    display_name=model_display_name,
    description=model_description,
    serving_container_image_uri=serving_container_image_uri,
    artifact_uri=DEPLOY_MODEL_GCS_URI,
)

model.wait()

print(model.display_name)
print(model.resource_name)

Creating Model
Create Model backing LRO: projects/430886062887/locations/us-central1/models/9179772002766946304/operations/4103519438018445312
Model created. Resource name: projects/430886062887/locations/us-central1/models/9179772002766946304@1
To use this Model in another session:
model = aiplatform.Model('projects/430886062887/locations/us-central1/models/9179772002766946304@1')
pytorch-model-20230613_155552
projects/430886062887/locations/us-central1/models/9179772002766946304


In [76]:
machine_type = "n1-standard-4"

endpoint = model.deploy(
    machine_type=machine_type, accelerator_type=None, accelerator_count=None
)

Creating Endpoint
Create Endpoint backing LRO: projects/430886062887/locations/us-central1/endpoints/637923452296101888/operations/5750148051775782912
Endpoint created. Resource name: projects/430886062887/locations/us-central1/endpoints/637923452296101888
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/430886062887/locations/us-central1/endpoints/637923452296101888')
Deploying model to Endpoint : projects/430886062887/locations/us-central1/endpoints/637923452296101888
Deploy Endpoint model backing LRO: projects/430886062887/locations/us-central1/endpoints/637923452296101888/operations/7252098527503843328
Endpoint model deployed. Resource name: projects/430886062887/locations/us-central1/endpoints/637923452296101888


In [77]:
instances = [
    {
        "Elevation": [2841.0],
        "Aspect": [45.0],
        "Slope": [0.0],
        "Horizontal_Distance_To_Hydrology": [644.0],
        "Vertical_Distance_To_Hydrology": [282.0],
        "Horizontal_Distance_To_Roadways": [1376.0],
        "Hillshade_9am": [218.0],
        "Hillshade_Noon": [237.0],
        "Hillshade_3pm": [156.0],
        "Horizontal_Distance_To_Fire_Points": [1003.0],
        "Wilderness_Area": ["Commanche"],
        "Soil_Type": ["C4758"],
    }
]

endpoint.predict(instances)

Prediction(predictions=[[6.0]], deployed_model_id='3432466394707394560', model_version_id='1', model_resource_name='projects/430886062887/locations/us-central1/models/9179772002766946304', explanations=None)

In [2]:
import pandas as pd

df1 = [ 'i like to shop at store a.' , 'he likes to shop at the store b.', 'she is happy to shop at store c.', 'we want to shop at the store d.']
df2 = [ 'store a', 'store b', 'store c', 'store d' ]
df3 = [ 'like to', 'likes to shop', 'at store' ]

myDataSet = list(zip(df1,df2))
df = pd.DataFrame(data = myDataSet, columns=['df1', 'df2'])

def in_statements(val):
    for statement in df3:
        if statement in val:
            color = 'yellow'
            break
        else:
            color = 'black'
    return 'background-color: %s' % color

df = df.style.applymap(in_statements)

df

Unnamed: 0,df1,df2
0,i like to shop at store a.,store a
1,he likes to shop at the store b.,store b
2,she is happy to shop at store c.,store c
3,we want to shop at the store d.,store d
