# Google Earth Engine Component

## Initialize

In [14]:
#Import required libraries
import os
import json
from time import time
import math
import itertools

import ee
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import keras_tuner

In [2]:
# Check number of available GPUs
# Stop early when TensorFlow sees no GPU: tune_model() below pins the
# search to '/device:GPU:0'.
n_gpus = len(tf.config.list_physical_devices('GPU'))
assert n_gpus >= 1
print("Num GPUs Available:", n_gpus)

Num GPUs Available: 1


2022-07-19 06:43:59.955869: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-07-19 06:43:59.963909: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-07-19 06:43:59.965482: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero


In [3]:
# Initialize Google Earth Engine.
# The interactive authentication flow is only needed when no stored
# credentials exist, so try to initialize first and authenticate only
# when that fails.
try:
    ee.Initialize()
except Exception:
    ee.Authenticate()
    ee.Initialize()

Enter verification code: [REDACTED]

Successfully saved authorization token.


## Define Globals

In [4]:
# Land-cover class names. The first class is called 'pools' (rather than
# 'water') because lakes are kept as a separate, additional class.
CLASSES = ['pools',
           'vegetation_trees',
           'vegetation_grass',
           'turf',
           'impervious',
           'soil']

# Same classes followed by the extra 'lakes' class
CLASSES_P1 = [*CLASSES, 'lakes']

N_CLASSES = len(CLASSES)

# Name of the label and the available band sets
LABEL = 'landcover'

# R, G, B, N and NDVI plus the Entropy/Contrast/Gearys bands of N
NBANDS = ['R', 'G', 'B', 'N', 'NDVI'] + [f'N_{stat}'
                                         for stat in ('Entropy', 'Contrast', 'Gearys')]

# NBANDS plus the same three bands computed for each of R, G and B
ALL_BANDS = NBANDS + [f'{band}_{stat}'
                      for band in ('R', 'G', 'B')
                      for stat in ('Entropy', 'Contrast', 'Gearys')]

In [5]:
# Select desired band set (NBANDS or ALL_BANDS)
# Note: BANDS is bound to the same list object as NBANDS, not to a copy.
BANDS = NBANDS

## Read in CSV data

In [6]:
def read_data(file, classes=None, n_classes=None):

    '''
    Read a CSV of samples and reshape it for a CNN with 1x1 kernels.

    The last CSV column is used as the integer class label and every other
    column as a feature. Rows are shuffled with NumPy's global random
    state, so call np.random.seed() first for a repeatable order.

    Params
    ------
    file : str
        Path to the CSV file
    classes : list of str
        Class names. When given, a table of per-class counts and
        proportions is printed. (defaults to None)
    n_classes : int
        Width of the one-hot label encoding. Pass it explicitly so that
        train and test labels get the same width even if one file lacks
        the highest label. (defaults to the largest label in the file + 1)

    Returns
    -------
    conv_X : np.array
        Features with shape (n_samples, 1, 1, n_features)
    conv_sparse_Y : np.array
        One-hot labels with shape (n_samples, 1, 1, n_classes)

    Raises
    ------
    ValueError
        If the file has no rows, if the labels cannot be matched to
        `classes`, or if `n_classes` is too small for the labels found.
    '''

    # Read in data and shuffle
    data = pd.read_csv(file).to_numpy()
    if data.shape[0] == 0:
        raise ValueError(f"No rows found in {file}")
    np.random.shuffle(data)

    # Split into X and Y
    X, Y = data[:, :-1], data[:, -1].astype(int)

    # Print class counts if labels are specified
    if classes:
        labels, counts = np.unique(Y, return_counts=True)

        # When some class is absent from the file the unique labels no longer
        # line up with `classes`; count by label index instead so absent
        # classes show up with a count of 0.
        if len(labels) != len(classes):
            if labels.min() < 0 or labels.max() >= len(classes):
                raise ValueError(
                    f"Labels in {file} range from {labels.min()} to {labels.max()} "
                    f"but only {len(classes)} class names were given")
            counts = np.bincount(Y, minlength=len(classes))

        print(file)
        df = pd.DataFrame({'class': classes, 'counts': counts})
        df['proportion'] = df['counts']/df['counts'].sum()
        print(df)
        print()

    # Convert Y to one-hot encoding
    width = int(Y.max()) + 1
    if n_classes is not None:
        if n_classes < width:
            raise ValueError(
                f"n_classes={n_classes} is smaller than the largest label + 1 ({width})")
        width = n_classes

    sparse_Y = np.zeros((Y.size, width))
    sparse_Y[np.arange(Y.size), Y] = 1

    # Reshape for 1x1 kernel convolutions
    conv_X = X.reshape((X.shape[0], 1, 1, X.shape[1]))
    conv_sparse_Y = sparse_Y.reshape((sparse_Y.shape[0], 1, 1, sparse_Y.shape[1]))

    return conv_X, conv_sparse_Y

In [7]:
# Locations of the training and testing CSV datasets
IMAGES_DIR = '../datasets/GoogleEarth'
TRAIN_FILE_PREFIX = 'training_waterlake_Nbands_0717_v12'
TEST_FILE_PREFIX = 'testing_waterlake_Nbands_0717_v12'
FILE_EXT = '.csv'

# Full paths: <IMAGES_DIR>/<prefix><extension>
CSV_TRAIN_FILE_PATH = os.path.join(IMAGES_DIR, f'{TRAIN_FILE_PREFIX}{FILE_EXT}')
CSV_TEST_FILE_PATH = os.path.join(IMAGES_DIR, f'{TEST_FILE_PREFIX}{FILE_EXT}')

In [8]:
# Set the seed for data shuffle
# (read_data shuffles rows with np.random.shuffle, so seeding the global
# NumPy state here makes the row order repeatable)
np.random.seed(123)

# Load both datasets; passing CLASSES makes read_data print per-class counts
conv_train_X, conv_sparse_train_Y = read_data(CSV_TRAIN_FILE_PATH, CLASSES)
conv_test_X, conv_sparse_test_Y = read_data(CSV_TEST_FILE_PATH, CLASSES)

../datasets/GoogleEarth/training_waterlake_Nbands_0717_v12.csv
              class  counts  proportion
0             pools    4754    0.018918
1  vegetation_trees   68413    0.272243
2  vegetation_grass   94741    0.377013
3              turf    3290    0.013092
4        impervious   71101    0.282940
5              soil    8995    0.035795

../datasets/GoogleEarth/testing_waterlake_Nbands_0717_v12.csv
              class  counts  proportion
0             pools    1351    0.030826
1  vegetation_trees    4770    0.108840
2  vegetation_grass    8521    0.194428
3              turf    2635    0.060124
4        impervious    7006    0.159859
5              soil   19543    0.445923



In [9]:
# Shapes of the reshaped training features and one-hot training labels
conv_train_X.shape, conv_sparse_train_Y.shape

((251294, 1, 1, 8), (251294, 1, 1, 6))

In [10]:
# Shapes of the reshaped testing features and one-hot testing labels
conv_test_X.shape, conv_sparse_test_Y.shape

((43826, 1, 1, 8), (43826, 1, 1, 6))

## Build NN models

### Custom F1 Metric

In [18]:
class MultiClassFBeta(keras.metrics.Metric):
    '''
    Define a custom F-beta metric class to optimize against
    during hyperparameter tuning. Class can perform F-beta calculations
    for macro, weighted, and raw scores for any value of beta.

    Default is macro F1 score.

    Params
    ------
    n_class : int
        Number of classes, i.e. the size of the last axis of the one-hot
        labels and of the predictions (defaults to N_CLASSES)
    name : str
        Metric name (defaults to f"{average}_f{beta}", e.g. 'macro_f1')
    beta : int | float
        Weight of recall relative to precision (defaults to 1)
    average : str
        'macro' returns the unweighted mean of the per-class scores,
        'weighted' weights each class by its number of true samples and
        'raw' returns the per-class score vector. Any other value is
        treated like 'macro'. (defaults to 'macro')
    epsilon : float
        Small constant added to denominators to avoid division by zero
        (defaults to 1e-7)
    '''

    def __init__(self,  n_class=N_CLASSES, name=None, beta=1, average='macro',
                 epsilon=1e-7, **kwargs):

        # If name is not provided, set default name
        if not name:
            name = f"{average}_f{beta}"

        # initializing an object of the Metric super class
        super().__init__(name=name, **kwargs)

        # initializing static variables
        self.beta = beta
        self.beta_squared = beta**2
        self.n_class = n_class
        self.average = average
        self.epsilon = epsilon

        # initializing state variables (one running count per class)
        self.tp = self.add_weight(name='tp',
                                  shape=(self.n_class,),
                                  initializer='zeros')     # true positives
        self.actual_positives = self.add_weight(name='ap',
                                                shape=(self.n_class,),
                                                initializer='zeros') # actual positives
        self.predicted_positives = self.add_weight(name='pp',
                                                   shape=(self.n_class,),
                                                   initializer='zeros') # predicted positives


    def update_state(self, ytrue, ypred, sample_weight=None):

        '''
        Updates the metrics to preserve the running state.

        ytrue holds one-hot labels and ypred class probabilities, both with
        the classes on the last axis. sample_weight is accepted for API
        compatibility but is not used.
        '''

        # casting ytrue and ypred as float dtype
        ytrue = tf.cast(ytrue, tf.float32)
        ypred = tf.cast(ypred, tf.float32)

        # Collapse every leading axis (batch and any spatial axes) so the
        # counts work for 1x1 samples as well as for larger image patches
        ytrue = tf.reshape(ytrue, [-1, self.n_class])
        ypred = tf.reshape(ypred, [-1, self.n_class])

        # One-hot encode the predicted class. argmax picks exactly one class
        # per sample, so tied probabilities are not counted more than once.
        ypred = tf.one_hot(tf.argmax(ypred, axis=-1), depth=self.n_class, dtype=tf.float32)

        # Calculate TP, PP, AP per class
        TP = tf.reduce_sum(ytrue*ypred, axis=0)
        PP = tf.reduce_sum(ypred, axis=0)
        AP = tf.reduce_sum(ytrue, axis=0)

        self.tp.assign_add(TP) # updating true positives attribute
        self.predicted_positives.assign_add(PP) # updating predicted positives attribute
        self.actual_positives.assign_add(AP) # updating actual positives attribute

    def result(self):

        '''
        Performs final metric computations and returns result
        '''

        self.precision = self.tp/(self.predicted_positives+self.epsilon) # calculates precision
        self.recall = self.tp/(self.actual_positives+self.epsilon) # calculates recall

        # calculating fbeta score
        self.fb = (1+self.beta_squared)*self.precision*self.recall / (self.beta_squared*self.precision + self.recall + self.epsilon)

        if self.average == 'weighted':
            # divide_no_nan returns 0 instead of NaN before any sample was seen
            return tf.reduce_sum(tf.math.divide_no_nan(self.fb*self.actual_positives,
                                                       tf.reduce_sum(self.actual_positives)))

        elif self.average == 'raw':
            return self.fb

        return tf.reduce_mean(self.fb)

    def reset_state(self):

        '''
        Reset the tracked metrics (state)
        '''

        for counter in (self.tp, self.predicted_positives, self.actual_positives):
            counter.assign(tf.zeros_like(counter))

    def get_config(self):

        '''
        Return the constructor arguments so the metric can be re-created
        when a model compiled with it is serialized and reloaded
        '''

        config = super().get_config()
        config.update({'n_class': self.n_class,
                       'beta': self.beta,
                       'average': self.average,
                       'epsilon': self.epsilon})
        return config

### KerasTuner

In [19]:
class CNNHyperModel(keras_tuner.HyperModel):

    """
    Custom CNN hyperparameter tuning model.
    Inherits from keras_tuner HyperModel.

    Params
    ------
    n_features : int
        Number of input channels (bands) per pixel
    n_classes : int
        Number of output classes
    """

    def __init__(self, n_features, n_classes):

        super().__init__()

        self.n_features = n_features
        self.n_classes = n_classes


    def build(self, hp):

        '''
        Build the model with a range of hyperparameters for tuning.
        Current hyperparameters being tuned:
          - Activation functions
          - Learning rate
          - # hidden layers
          - Nodes (filters) per layer
          - Dropout per layer
          - Batch size (tuned in fit())

        Returns the compiled keras model.
        '''

        # Define ranges for activations, learning rate, and # hidden layers
        activation = hp.Choice("activation", ["relu", "tanh"])
        lr = hp.Choice("lr", [1e-4, 3e-4, 6e-4, 1e-3, 3e-3, 6e-3, 1e-2])
        hidden_layers = hp.Int("layers", min_value=1, max_value=3)

        # Set model input shape; the spatial dimensions are left open (None)
        model = keras.Sequential()
        model.add(layers.Input((None, None, self.n_features,)))

        # For each hidden layer, set the number of nodes/filters and dropout
        for i in range(hidden_layers):

            nodes = hp.Int(f"nodes_{i+1}", min_value=8, max_value=64, step=8)
            model.add(layers.Conv2D(nodes, (1,1), activation=activation))

            dropout_rate = hp.Float(f"dropout_rate_{i+1}", min_value=0.0, max_value=0.5, step=0.05)
            model.add(layers.Dropout(dropout_rate))

        # Add the final output layer
        model.add(layers.Conv2D(self.n_classes, (1,1), activation=tf.nn.softmax))

        # Compile the model with the specified loss function, metrics, and learning rate
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
                      loss='categorical_crossentropy',
                      metrics=[MultiClassFBeta(n_class=self.n_classes), 'accuracy'])

        # Print model architecture. summary() prints by itself and returns
        # None, so it must not be wrapped in print().
        model.summary()

        return model

    def fit(self, hp, model, *args, **kwargs):

        '''
        Fit model on data and tune batch size.

        All positional and keyword arguments are forwarded to model.fit();
        the return value of model.fit() is returned unchanged.
        '''

        # NOTE(review): with min_value=16 and step=8 the value 126 itself is
        # presumably never sampled (120 would be the largest); 128 may have
        # been intended -- confirm before changing the search space.
        return model.fit(
            *args,
            batch_size=hp.Int("batch_size", min_value=16, max_value=126, step=8),
            **kwargs,
        )

## Training pipeline

In [20]:
def tune_model(train_X, train_Y, test_X, test_Y, obj, tuning_dir=None, project_name='CNN', 
               epochs=20, patience=3, search_type='Bayesian', 
               max_trials=10, per_trial=2, bayesian_beta=2, new_search=False):

    '''
    Function to perform hyperparameter search.

    Params
    ------
    train_X : np.array
        Train dataset feature matrix
    train_Y : np.array
        Train dataset labels matrix
    test_X : np.array
        Test dataset feature matrix (used as validation data during the search)
    test_Y : np.array
        Test dataset labels matrix (used as validation data during the search)
    obj: str | keras_tuner.Objective
        The objective to optimize for. It is used by the tuner and also as
        the quantity monitored by early stopping. None falls back to
        maximizing 'val_macro_f1'.
    tuning_dir: str
        Path to store tuning trials (defaults to 'path/to/current/parent_directory/model_tuning/')
    project_name: str
        Directory under tuning_dir to store model tuning results (defaults to 'CNN')
    epochs: int
        Max number of epochs to train (defaults to 20)
    patience: int
        Patience for EarlyStopping callback (defaults to 3)
    search_type: str
        Hyperparameter search type, either 'Random' or 'Bayesian' (defaults to 'Bayesian')
    max_trials: int
        Maximum number of search trials (defaults to 10)
    per_trial: int
        The number of models that should be built and fit for each trial (defaults to 2)
    bayesian_beta: int
        The balancing factor of exploration and exploitation.
        The larger it is, the more explorative it is. (defaults to 2)
    new_search: bool
        Whether to overwrite the previous results in the same directory
        or resume the previous search. (defaults to False)


    Returns
    -------
    CNN_tuner: keras_tuner
        The fitted Keras Tuner that stores the results of the
        hyperparameter tuning and the associated models.

    Raises
    ------
    ValueError
        If search_type is neither 'Random' nor 'Bayesian'
    TypeError
        If obj is neither a metric name nor an objective with a name and direction
    '''

    # Reject an unknown search type before any model or tuner is built
    if search_type not in ('Random', 'Bayesian'):
        raise ValueError("search_type must be either 'Random' or 'Bayesian'")

    # Get input and output shapes for the CNN model
    n_features = train_X.shape[-1]
    n_classes = train_Y.shape[-1]

    # Specify the default tuning directory
    if not tuning_dir:
        tuning_dir = os.path.join(os.path.dirname(os.getcwd()), "model_tuning")

    # Default objective: maximize the validation macro F1 score
    if obj is None:
        obj = keras_tuner.Objective("val_macro_f1", direction="max")

    # Early stopping should watch the same quantity the tuner optimizes
    if isinstance(obj, str):
        monitor, mode = obj, 'auto'
    elif hasattr(obj, 'name') and hasattr(obj, 'direction'):
        monitor, mode = obj.name, obj.direction
    else:
        raise TypeError("obj must be a metric name or a keras_tuner.Objective")

    # Arguments shared by both tuner types
    tuner_kwargs = dict(
        objective=obj,
        max_trials=max_trials,
        executions_per_trial=per_trial,
        overwrite=new_search,
        directory=tuning_dir,
        project_name=project_name,
    )

    # Use GPU
    with tf.device('/device:GPU:0'):

        model = CNNHyperModel(n_features=n_features, n_classes=n_classes)

        # Random search
        if search_type == 'Random':
            CNN_tuner = keras_tuner.RandomSearch(hypermodel=model, **tuner_kwargs)

        # BayesianSearch
        else:
            CNN_tuner = keras_tuner.BayesianOptimization(hypermodel=model,
                                                         beta=bayesian_beta,
                                                         **tuner_kwargs)

        early_stopping = keras.callbacks.EarlyStopping(monitor=monitor,
                                                       patience=patience,
                                                       mode=mode,
                                                       restore_best_weights=True)

        # Fit the hyperparameter tuner
        CNN_tuner.search(x=train_X, y=train_Y,
                         verbose=2, validation_data=(test_X, test_Y),
                         epochs=epochs, callbacks=[early_stopping])

    return CNN_tuner
    

In [39]:
def save_best_models(tuner, model_names, n_models=1, model_dir=None):

    '''
    Save the best N models to a directory and return the save path

    Params
    ------
    tuner : keras_tuner
        Fitted tuner; its get_best_models(num_models=...) is called
    model_names : str | list of str
        One directory name per model to save, best model first
    n_models : int
        Number of best models to save (defaults to 1)
    model_dir : str
        Parent directory for the saved models
        (defaults to 'path/to/current/parent_directory/models/')

    Returns
    -------
    save_dir : str
        Path the best (first) model was saved to. The remaining models are
        saved next to it under model_dir/<model_name>.

    Raises
    ------
    ValueError
        If the tuner returns no models or fewer names than models are given
    '''

    # Get the best N models (get_best_models() alone only returns one model)
    best_models = tuner.get_best_models(num_models=n_models)
    if not best_models:
        raise ValueError("The tuner returned no models to save")

    # Default model directory
    if not model_dir:
        model_dir = os.path.join(os.path.dirname(os.getcwd()), "models")

    if isinstance(model_names, str):
        model_names = [model_names]
    model_names = list(model_names)

    # Every model needs its own name; zip() would otherwise drop models silently
    if len(model_names) < len(best_models):
        raise ValueError(f"Got {len(model_names)} model name(s) for "
                         f"{len(best_models)} model(s) to save")

    # Save the best N models
    save_dirs = []
    for model, model_name in zip(best_models, model_names):
        save_dir = os.path.join(model_dir, model_name)
        model.save(save_dir)
        save_dirs.append(save_dir)

    return save_dirs[0]

In [None]:
# Define hyperparameter tuning params
# results_dir is passed to tune_model as project_name, i.e. the directory
# under the tuning directory that holds this search's trials.
results_dir = 'CNN_Nbands_wlake' 
epochs=20
max_trials=20
# new_search=True overwrites earlier results stored under the same project name
new_search=True
per_trial=1
# beta is forwarded as bayesian_beta (exploration/exploitation balance)
beta=2.1
# 'macro_f1' is the default name of MultiClassFBeta; the 'val_' prefix
# presumably selects its value on the validation data
objective = keras_tuner.Objective("val_macro_f1", direction="max")

# Start hyperparameter search
# NOTE(review): the test split is passed as the validation data of the
# search, so scores on it also drive model selection.
CNN_tuner = tune_model(conv_train_X, conv_sparse_train_Y, conv_test_X, conv_sparse_test_Y,
                       objective, project_name=results_dir, epochs=epochs, bayesian_beta=beta,
                       max_trials=max_trials, per_trial=per_trial, new_search=new_search)

In [None]:
# Save the best performing model
# The name ends in the current Unix time (whole seconds), so repeated runs
# get distinct directory names.
model_name = f'{results_dir}_{time():.0f}'
save_dir = save_best_models(CNN_tuner, model_name)

# save_dir is reused further down when the model is prepared for Earth Engine
print(f"Best N models saved at: {save_dir}")

## Check Hyperparameter Tuning Results

In [18]:
def convert_tuning_results(trial_dict):
    '''
    Turn a dict of per-trial value lists into a DataFrame whose score
    columns ('macro_f1', 'accuracy') come last and whose rows are ordered
    by ascending macro F1.
    '''
    score_cols = ['macro_f1', 'accuracy']
    results = pd.DataFrame(trial_dict)

    # Every other column keeps its original relative order
    leading_cols = list(results.columns)
    for score in score_cols:
        leading_cols.pop(leading_cols.index(score))

    return results[leading_cols + score_cols].sort_values(by='macro_f1')

In [19]:
def get_tuning_results(model_path):

    '''
    Collect the hyperparameters and validation scores of every trial stored
    under a tuning project directory.

    Each sub-directory of model_path that contains a 'trial.json' file is
    read as one trial; other entries are skipped. Trials are visited in
    sorted name order so the row index is repeatable between runs.

    Params
    ------
    model_path : str
        Directory that holds one sub-directory per trial

    Returns
    -------
    df : pd.DataFrame
        One row per trial, built by convert_tuning_results(): hyperparameter
        columns first, then 'macro_f1' and 'accuracy', sorted by macro F1.
        Hyperparameters a trial does not have are None; a missing score is 0.

    Raises
    ------
    ValueError
        If no trial.json file is found under model_path
    '''

    # (result column, metric name inside trial.json)
    score_sources = (('macro_f1', 'val_macro_f1'), ('accuracy', 'val_accuracy'))

    records = []
    for item in sorted(os.listdir(model_path)):
        trial_file = os.path.join(model_path, item, 'trial.json')
        if not os.path.isfile(trial_file):
            continue

        # The context manager closes the file even if parsing fails
        with open(trial_file) as f:
            data = json.load(f)

        record = dict(data['hyperparameters']['values'])

        metrics = data['metrics']['metrics']
        for column, metric_name in score_sources:
            score = metrics.get(metric_name, 0)
            if score:
                # Use the first value of the first recorded observation
                score = score['observations'][0]['value'][0]
            record[column] = score

        records.append(record)

    if not records:
        raise ValueError(f"No trial.json files found under {model_path}")

    # Union of all keys in order of first appearance; trials that lack a
    # hyperparameter (e.g. nodes of a layer they do not have) get None
    columns = list(dict.fromkeys(key for record in records for key in record))
    trial_dict = {key: [record.get(key) for record in records] for key in columns}

    df = convert_tuning_results(trial_dict)

    return df

In [22]:
# Load and display the trial results of one tuning project
# NOTE(review): this hard-coded path differs from the results_dir used for
# the search above -- confirm it points at the intended project.
model_path = '../model_tuning/CNN_nbands/'
df = get_tuning_results(model_path)
df

Unnamed: 0,hidden_nodes,activation,dropout_rate,lr,batch_size,macro_f1,accuracy
2,4,relu,0.45,0.03,16,0.029853,0.098369
4,32,tanh,0.0,0.03,16,0.07206,0.145274
3,4,tanh,0.3,0.0003,88,0.235266,0.269199
0,32,relu,0.3,0.006,120,0.60455,0.751669
9,20,tanh,0.0,0.0001,120,0.606146,0.792507
1,32,relu,0.3,0.0001,64,0.756491,0.82169
5,32,relu,0.0,0.0001,64,0.836103,0.866926
6,32,relu,0.0,0.0001,120,0.841142,0.862142
8,32,relu,0.0,0.0001,16,0.851653,0.881726
7,32,relu,0.0,0.01,120,0.856703,0.882175


## EEification

In [55]:
import json
from tensorflow.python.tools import saved_model_utils

# Read the default serving signature of the model saved at save_dir
meta_graph_def = saved_model_utils.get_meta_graph_def(save_dir, 'serve')
inputs = meta_graph_def.signature_def['serving_default'].inputs
outputs = meta_graph_def.signature_def['serving_default'].outputs

# This model only has a single input and a single output, so take the
# tensor name of the first entry on each side (None if there is none).
input_name = next((tensor.name for tensor in inputs.values()), None)
output_name = next((tensor.name for tensor in outputs.values()), None)

# Make a dictionary that maps Earth Engine outputs and inputs to
# AI Platform inputs and outputs, respectively. The JSON is wrapped in
# single quotes because it is later pasted into a shell command.
input_dict = "'{}'".format(json.dumps({input_name: "array"}))
output_dict = "'{}'".format(json.dumps({output_name: "output"}))
print(input_dict)
print(output_dict)

'{"serving_default_input_1:0": "array"}'
'{"StatefulPartitionedCall:0": "output"}'


In [76]:
# Put the EEified model in the appropriate bucket and API name
PROJECT = 'w210-351617'
OUTPUT_BUCKET = 'test-tf-gee'
EEIFIED_DIR = 'gs://' + OUTPUT_BUCKET + f'/{results_dir}'

MODEL_NAME = f'{results_dir}v2'
VERSION_NAME = 'v0'
REGION = 'us-central1'

In [77]:
# Display the destination the prepared model will be written to
EEIFIED_DIR

'gs://test-tf-gee/CNN_Nbands_wlake_model'

In [78]:
# Run the model prepare commands
!earthengine set_project {PROJECT}
!earthengine model prepare --source_dir {save_dir} --dest_dir {EEIFIED_DIR} --input {input_dict} --output {output_dict}

Successfully saved project id
Success: model at 'gs://test-tf-gee/CNN_Nbands_wlake_model' is ready to be hosted in AI Platform.


In [79]:
# Create API endpoint hosted on Google AI Platform
!gcloud ai-platform models create {MODEL_NAME} \
  --project {PROJECT} \
  --region {REGION}

!gcloud ai-platform versions create {VERSION_NAME} \
  --project {PROJECT} \
  --region {REGION} \
  --model {MODEL_NAME} \
  --origin {EEIFIED_DIR} \
  --framework "TENSORFLOW" \
  --runtime-version=2.3 \
  --python-version=3.7

Using endpoint [https://us-central1-ml.googleapis.com/]
Created ai platform model [projects/w210-351617/models/CNN_Nbands_wlake_modelv2].
Using endpoint [https://us-central1-ml.googleapis.com/]
Creating version (this might take a few minutes)......done.                    
