<a href="https://colab.research.google.com/github/mitchelledema/mitche/blob/main/custom_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Custom Training Setup
Full guide [here](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/migration/UJ3%20Vertex%20SDK%20Custom%20Image%20Classification%20with%20custom%20training%20container.ipynb)

## Setup environment variables

In [None]:
! gcloud auth login

In [39]:
newInstance = False

REGION = "us-west2"
PROJECT_ID = "calcium-bot-373723"
BUCKET_NAME = "custom_model2"
BUCKET_URI = f"gs://{BUCKET_NAME}"

! gcloud config set project $PROJECT_ID

Updated property [core/project].


## Set up trainer requirements

In [40]:
# Make folder for Python training script
! rm -rf custom
! mkdir custom

# Add package information
! touch custom/README.md

setup_cfg = "[egg_info]\ntag_build = tag_date = 0"
! echo "$setup_cfg" > custom/setup.cfg

setup_py = "import setuptools\n\nsetuptools.setup(\n  install_requires=[],\n  packages=setuptools.find_packages()\n)"
! echo "$setup_py" > custom/setup.py

pkg_info = "Metadata-Version: 1.0\n\nName: PPG to BP Training \n\nVersion: 0.0.0\n\nSummary: Training Setup\n\nHome-page: www.google.com\n\nAuthor: Google\n\nLicense: Public\n\nDescription: Training Setup\n\nPlatform: Vertex"
! echo "$pkg_info" > custom/PKG-INFO

# Make the training subfolder
! mkdir custom/trainer
! touch custom/trainer/__init__.py

## Add trainer code

In [41]:
%%writefile custom/trainer/task.py
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Subtract
from tensorflow.keras.callbacks import Callback
import argparse
import pickle
from tensorflow.python.lib.io import file_io
import sqlite3
from trainer.db import Database

parser = argparse.ArgumentParser()
parser.add_argument('--model_dir', dest='model_dir')
parser.add_argument('--bucket_name', dest='bucket_name')
parser.add_argument('--user_id', dest='userID')
args = parser.parse_args()

model_dir = args.model_dir
bucket_name = args.bucket_name
userID = int(args.userID)

basePath = f'/gcs/{bucket_name}/'

databasePath = os.path.join(basePath, 'database/calibration_data.db')
calibrationDataPath = os.path.join(basePath, f'user{userID}/calibration/')
conn = sqlite3.connect(databasePath)
db = Database(conn, calibrationDataPath)

def getModel():
  inputs = layers.Input(shape=(79,79,1))
  x = layers.Conv2D(filters=96, kernel_size=(5,5), strides=(2,2), activation='relu')(inputs)
  x = layers.BatchNormalization()(x)
  x = layers.MaxPool2D(pool_size=(3,3), strides=(2,2))(x)
  x = layers.Conv2D(filters=384, kernel_size=(5,5), strides=(1,1), activation='relu', padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.MaxPool2D(pool_size=(3,3), strides=(2,2))(x)
  x = layers.Conv2D(filters=384, kernel_size=(3,3), strides=(1,1), activation='relu', padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.Conv2D(filters=384, kernel_size=(3,3), strides=(1,1), activation='relu', padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.Conv2D(filters=256, kernel_size=(3,3), strides=(1,1), activation='relu', padding='same')(x)
  x = layers.BatchNormalization()(x)
  x = layers.MaxPool2D(pool_size=(3,3), strides=(2,2))(x)
  x = layers.Flatten()(x)
  inputsFeatures = layers.Input(shape=(1,))
  x = layers.Concatenate()([x, inputsFeatures])
  x = layers.Dense(4096, activation='relu')(x)
  x = layers.Dropout(0.5)(x)
  x = layers.Dense(4096, activation='relu')(x)
  x = layers.Dropout(0.5)(x)
  outputs = layers.Dense(2, activation='linear')(x)
  model = keras.Model(inputs=[inputs, inputsFeatures], outputs=outputs)
  return model

def getModelS(model1, model2):
  last_layer = model1.layers[-1].output
  last_layer1 = model2.layers[-1].output
  outputs = Subtract()([last_layer, last_layer1])
  modelS = keras.Model(inputs=[model1.inputs, model2.inputs], outputs=outputs)
  return modelS

def getModelWeights(path, tf=False):
  if not tf:
    with open(path, 'rb') as file:
      weights = pickle.load(file)
      weights = weights['weights']
      return weights
  else:
    with file_io.FileIO(path, 'rb') as file:
      weights = pickle.load(file)
      weights = weights['weights']
      return weights

class EarlyStoppingByLossVal(Callback):
  def __init__(self, monitor='val_loss', value=0.00001, minEpoch=10, v=4, verbose=0):
    super(Callback, self).__init__()
    self.monitor = monitor
    self.value = value
    self.minEpoch = minEpoch
    self.v = v
    self.verbose = verbose
    self.prevValue = 1000
    self.count = 0

  def on_epoch_end(self, epoch, logs={}):
    current = logs.get(self.monitor)
    if current is None:
      print("Early stopping requires %s available!" % self.monitor)

    if current < self.prevValue or abs(current - self.prevValue) < self.v:
      self.prevValue = current
      if epoch > self.minEpoch and self.count > 10:
        if self.verbose > 0:
          print("\nEpoch %05d: early stopping THR" % epoch)
        self.model.stop_training = True
      self.count = self.count + 1
    else:
      self.count = 0

def calibrateModel(model, train, trainHR, trainLabels):
  model1 = getModel()
  model1.build((None, 79, 79))
  model1.compile(
      optimizer=keras.optimizers.Adam(),
      loss=keras.losses.MeanAbsoluteError()
  )
  model1.set_weights(model.get_weights())

  callbacks = [EarlyStoppingByLossVal(monitor='loss', value=8, minEpoch=20, v=8)]

  model1.fit(
      [train, trainHR],
      trainLabels,
      epochs=40,
      batch_size=1,
      callbacks=callbacks,
      shuffle=False,
      verbose=0
  )

  model2 = getModel()
  model2.build((None, 79, 79))
  model2.compile(
      optimizer=keras.optimizers.Adam(),
      loss=keras.losses.MeanAbsoluteError()
  )
  model2.set_weights(model1.get_weights())

  model1.trainable = False
  model2.trainable = False

  model4 = getModelS(model1, model2)
  return model1, model4

def getLabels(SBP, DBP):
  labels = [[SBP[i], DBP[i]] for i in range(0, len(SBP))]
  labels = np.vstack(labels)
  return labels

calDataPath = os.path.join(basePath, f'user{userID}/calibration/')
# calDataPath = f'gs://{bucket_name}/user{userID}/calibration/'
modelWeightsPath = os.path.join(basePath, f'user{userID}/model/model_weightsPPG.pkl')

allData = []
allHR = []
allLabels = []
calData = []
calHR = []
calLabels = []
predData = []
predHR = []
predLabels = []
# calDataPaths = file_io.list_directory(calDataPath)
calDataPaths = os.listdir(calDataPath)
calTypes = ['baseline', 'resting', 'exercise']

data = db.getCalData(userID, 'all')
allData = []
allHR = []
allSBP = []
allDBP = []
calKeys = ['B', 'R', 'E']
for key in calKeys:
  cal = data[key]
  allData += cal['data']
  allHR += cal['HR']
  allSBP += cal['SBP']
  allDBP += cal['DBP']

X_trainCal = np.vstack(allData)
X_trainHRCal = np.vstack(allHR)
Y_trainCal = getLabels(allSBP, allDBP)

calRE = data['R']['data'] + data['E']['data']
HR = data['R']['HR'] + data['E']['HR']
SBP = data['R']['SBP'] + data['E']['SBP']
DBP = data['R']['DBP'] + data['E']['DBP']

predData = np.vstack(calRE)
predHR = np.vstack(HR)
predLabels = getLabels(SBP, DBP)

calData = np.repeat(data['B']['data'][0], len(predData), axis=0)
calHR = np.repeat(data['B']['HR'], len(predHR), axis=0)
calLabels = np.repeat([[data['B']['SBP'][0], data['B']['DBP'][0]]], len(predLabels), axis=0)

baseModelWeightsPath = os.path.join(basePath, 'custom_prediction_routine_tutorial/model/model_weightsPPG.pkl')
# baseModelWeightsPath = f'gs://{bucket_name}/user{userID}/model/model_weightsPPG2.pkl'
weights = getModelWeights(baseModelWeightsPath, tf=True)
  
modelS = getModel()
modelS.set_weights(weights)

calibrate = True
if calibrate:
  calibratedModels = []
  calibratedValues = []
  m = []
  for i in range(30):
    model1, model4 = calibrateModel(modelS, X_trainCal, X_trainHRCal, Y_trainCal)
    # calibratedModels.append(model4)
    calibratedModels.append(model1)
    prediction = model4.predict([[calData, calHR], [predData, predHR]])
    SBP = calLabels[:,0] - prediction[:,0]
    DBP = calLabels[:,1] - prediction[:,1]
    valueSBP = np.abs(SBP-predLabels[:,0])
    valueDBP = np.abs(DBP-predLabels[:,1])
    values = np.vstack((valueSBP, valueDBP)).T
    value = np.linalg.norm(values)
    # predicted = calibrationWindowLabels - prediction
    # value = np.abs(predicted - Y_trainCal2)
    calibratedValues.append(value)
  print(calibratedValues)
  minValue = np.argmin(calibratedValues)
  # model4 = calibratedModels[minValue]
  model1 = calibratedModels[minValue]
  calibratedModels = []
# else:
  # modelS = keras.models.load_model(filePath)
# fileName = f"modelPPG5.h5"
# filePath = f"/content/drive/MyDrive/Colab Notebooks/{fileName}"
# model1.save(filePath)

weights = {
  'weights': model1.get_weights()
}

with open(modelWeightsPath, 'wb') as file:
  pickle.dump(weights, file)

fileName = os.path.basename(modelWeightsPath)
db.insertModelData(userID, fileName)

# prediction = model4.predict([[X_trainCal3, X_trainHRCal3], [X_trainCal2, X_trainHRCal2]], verbose=0)
# print(prediction)
# SBP = calibrationWindowLabels[:,0] - prediction[:,0]
# DBP = calibrationWindowLabels[:,1] - prediction[:,1]

Writing custom/trainer/task.py


In [42]:
%%writefile custom/trainer/db.py
import json
import pickle
from datetime import datetime, timezone

class Database(object):
  def __init__(self, conn, basePath):
    self.conn = conn
    self.basePath = basePath
  
  def getCalData(self, userID, calType):
    if calType == 'all':
      selectQuery = f"SELECT HR, SBP, DBP, filename, code FROM calibration_info, cal_codes WHERE userid = {userID} AND calibration_info.calcodeid = cal_codes.id"
      try:
        records = self.conn.execute(selectQuery).fetchall()
      except Exception as e:
        print(e)
        return False
      data = {
        'B': {
          'HR': [],
          'SBP': [],
          'DBP': [],
          'data': []
        },
        'R': {
          'HR': [],
          'SBP': [],
          'DBP': [],
          'data': []
        },
        'E': {
          'HR': [],
          'SBP': [],
          'DBP': [],
          'data': []
        }
      }
      for row in records:
        data[row[4]]['HR'].append([row[0]])
        data[row[4]]['SBP'].append(row[1])
        data[row[4]]['DBP'].append(row[2])
        with open(self.basePath + row[3], 'rb') as file:
          curData = pickle.load(file)
        data[row[4]]['data'].append(curData['data'])  
    else:
      selectQuery = f"SELECT HR, SBP, DBP, filename FROM calibration_info WHERE userid = {userID} AND calcodeid = (SELECT id FROM cal_codes WHERE code = \'{calType}\')"
      try:
        records = self.conn.execute(selectQuery).fetchall()
      except Exception as e:
        print(e)
        return False
      data = {
        'HR': [],
        'SBP': [],
        'DBP': [],
        'data': []
      }
      for row in records:
        data['HR'].append([row[0]])
        data['SBP'].append(row[1])
        data['DBP'].append(row[2])
        with open(self.basePath + row[3], 'rb') as file:
          curData = pickle.load(file)
        data['data'].append(curData['data'])
    return data
  
  def insertModelData(self, userID, fileName):
    selectQuery = f'SELECT id FROM models WHERE userid = {userID}'
    id = self.conn.execute(selectQuery).fetchone()
    if id:
      id = id[0]
      status = self.updateModelData(id)
      if status:
        return True
      else:
        return False
    insertQuery = f'INSERT INTO models (userid, filename) VALUES ({userID}, \'{fileName}\')'
    try:
      self.conn.executescript(insertQuery)
      self.conn.commit()
    except Exception as e:
      print(e)
      return False

  def updateModelData(self, id):
    updateQuery = f"UPDATE models SET createtime = datetime('now') WHERE id = {id};"

    try:
      self.conn.executescript(updateQuery)
      self.conn.commit()
    except Exception as e:
      print(e)
      return False
  
    return True

Writing custom/trainer/db.py


## Create Dockerfile

In [43]:
%%writefile custom/Dockerfile
FROM tensorflow/tensorflow:2.3.4-gpu
RUN pip install heartpy pandas keras==2.3.1
WORKDIR /root

WORKDIR /

COPY trainer /trainer

ENTRYPOINT ["python", "-m", "trainer.task"]

Writing custom/Dockerfile


## Build training container

In [44]:
REPOSITORY = "calibration-train"
CONTAINER_NAME = "tf-gpu.2-3"
TAG = "latest"
TRAIN_IMAGE = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{CONTAINER_NAME}:{TAG}"

In [45]:
if newInstance:
  ! gcloud artifacts repositories create {REPOSITORY} --repository-format=docker --location={REGION} --description="Docker repository for custom training"
! gcloud artifacts repositories list

Listing items under project calcium-bot-373723, across all locations.

                                                                               ARTIFACT_REGISTRY
REPOSITORY         FORMAT  MODE                 DESCRIPTION                            LOCATION  LABELS  ENCRYPTION          CREATE_TIME          UPDATE_TIME          SIZE (MB)
calibration-train  DOCKER  STANDARD_REPOSITORY  Docker repository for custom training  us-west4          Google-managed key  2023-03-08T20:59:46  2023-03-21T19:21:11  3260.104


In [48]:
if newInstance:
  %cd custom
  !gcloud builds submit --region={REGION} --tag=$TRAIN_IMAGE
  %cd ..

### Store training code on Cloud Storage Bucket

In [47]:
! rm -f custom.tar custom.tar.gz
! tar cvf custom.tar custom
! gzip custom.tar
! gsutil cp custom.tar.gz $BUCKET_URI/custom_trainer/trainer_PPGtoBP.tar.gz

custom/
custom/trainer/
custom/trainer/task.py
custom/trainer/db.py
custom/trainer/__init__.py
custom/Dockerfile
custom/PKG-INFO
custom/setup.py
custom/setup.cfg
custom/README.md
Copying file://custom.tar.gz [Content-Type=application/x-tar]...
/ [1 files][  3.6 KiB/  3.6 KiB]                                                
Operation completed over 1 objects/3.6 KiB.                                      
