<a href="https://colab.research.google.com/github/tourfade/uqtr-selmet/blob/main/Resnet1DRegressionSelmet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:

import os, io , skimage
import pandas as pd
from scipy.io import arff
import tensorflow as tf
from tensorflow.keras import activations, optimizers
from tqdm.keras import TqdmCallback
import numpy as np
from keras import Model
from keras.models import Sequential
from keras.layers import Dense, Conv1D, Dropout, Flatten, AveragePooling1D, Add, MaxPooling1D,ZeroPadding1D, Input, Activation, GaussianNoise, Reshape, BatchNormalization,GlobalAveragePooling1D 
from keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn import metrics
from sklearn.metrics import confusion_matrix, precision_score, accuracy_score
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import train_test_split
import urllib.request
import matplotlib.pyplot as plt
from google.colab import files
from skimage import data
from skimage.transform import rescale, resize, downscale_local_mean

In [4]:
opt = optimizers.Adam(learning_rate=0.001,  epsilon=0.0001)
label = "omdcell"
p = ""
batch_size = 10
epochs = 50
percent_test = 0.20

#opt = optimizers.SGD( learning_rate=0.005, momentum=0.01, nesterov=True, name="SGD")
#opt = optimizers.RMSprop(learning_rate=0.0001)

In [5]:
 
 
def build_dataset(df):
    Y = df[df.columns[-1]] 
    X = df[df.columns[0:-1]]
    X = X.values.reshape(X.shape[0], X.shape[1], 1)
    Y = Y.values.reshape(Y.shape[0], 1, 1)
    return (X, Y)

In [6]:
# Train function
#def root_mean_squared_error(y_true, y_pred):
 #       return K.sqrt(K.mean(K.square(y_pred - y_true))) 

def root_mean_squared_error(y,x):
        return (tf.sqrt(tf.reduce_mean(tf.square(x - y),axis = [0,1])))

def build_classifier(x_train, y_train, name):
    
    model = get_model()
    checkpoint_path = "train/"+name+".ckpt"
    checkpoint_dir = os.path.dirname(checkpoint_path)
    model.compile(loss= tf.keras.losses.MSE,
                  optimizer = opt, 
                  metrics=[tf.keras.metrics.RootMeanSquaredError()])
    # Create checkpoint callback
    cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path, 
                                                     save_weights_only = True,
                                                     verbose=0, 
                                                     callbacks=[TqdmCallback(verbose=2)])
    
    x_cal, x_val, y_cal, y_val = train_test_split(x_train, y_train, test_size=percent_test, shuffle= True)

    
    result = model.fit(x_cal, y_cal, 
                        validation_data=(x_val, y_val),
                        callbacks = [cp_callback], 
                        #steps_per_epoch = 406,
                        batch_size = batch_size,
                        shuffle = True,
                        epochs = epochs )
    model.summary()
    # summarize history for accuracy
    plt.figure( )
    plt.subplots(figsize = (10, 5))
    plt.title('accuracies-loss')
    rmqe = result.history['root_mean_squared_error']
    plt.plot(rmqe, color = 'g', label='train')
    val_rmqe = result.history['val_root_mean_squared_error']
    plt.plot(val_rmqe, color = 'r', label='val')
    norm_loss = result.history['loss']/np.max(result.history['loss'])
    plt.plot(norm_loss, color = 'b', label='loss')
    plt.legend()
    plt.show()
    return checkpoint_path

In [7]:
def test_classifier(x_test, y_test, path, name):
    model = get_model()
    model.load_weights(path).expect_partial()
    model.compile(loss= tf.keras.losses.MSE,
                  optimizer=opt,
                  metrics=[tf.keras.metrics.RootMeanSquaredError()])
    
  
    
    print('(loss, rmse)', model.evaluate(x_test, y_test))
    return model
    

In [8]:
def identity_block(X, f, filters, stage, block):
  conv_base_name = 'res' + str(stage) + block + '_branch'
  bn_base_name = 'bn' + str(stage) + block + '_branch'
  f1, f2, f3 = filters
  X_shortcut = X

  X = Conv1D(filters = f1, kernel_size = 1, strides = 1,   name = conv_base_name+'2a', kernel_initializer='glorot_uniform')(X)
  X = BatchNormalization(axis = 1, name = bn_base_name+'2a')(X)
  X = Activation('relu')(X)

  X = Conv1D(filters = f2, kernel_size = f, strides = 1, padding = 'same', name = conv_base_name+'2b', kernel_initializer='glorot_uniform')(X)
  X = BatchNormalization(axis = 1, name = bn_base_name+'2b')(X)
  X = Activation('relu')(X)

  X = Conv1D(filters = f3, kernel_size = 1, strides = 1,  name = conv_base_name+'2c', kernel_initializer='glorot_uniform')(X)
  X = BatchNormalization(axis = 1, name = bn_base_name+'2c')(X)
  
  X = Add()([X, X_shortcut])
  X = Activation('relu')(X)
  return X

def conv_block(X, f, filters, stage, block, s =2):
  conv_base_name = 'res' + str(stage) + block + '_branch'
  bn_base_name = 'bn' + str(stage) + block + '_branch'
  f1, f2, f3 = filters
  X_shortcut = X

  X = Conv1D(filters = f1, kernel_size = 1, strides = s,  name = conv_base_name+'2a', kernel_initializer='glorot_uniform')(X)
  X = BatchNormalization(axis = 1, name = bn_base_name+'2a')(X)
  X = Activation('relu')(X)

  X = Conv1D(filters = f2, kernel_size = f, strides = 1, padding = 'same', name = conv_base_name+'2b', kernel_initializer='glorot_uniform')(X)
  X = BatchNormalization(axis = 1, name = bn_base_name+'2b')(X)
  X = Activation('relu')(X)

  X = Conv1D(filters = f3, kernel_size = 1, strides = 1, name = conv_base_name+'2c', kernel_initializer='glorot_uniform')(X)
  X = BatchNormalization(axis = 1, name = bn_base_name+'2c')(X)
  
  ##### SHORTCUT PATH ####
  X_shortcut = Conv1D(filters = f3, kernel_size = 1, strides = s, name = conv_base_name + '1', kernel_initializer = 'glorot_uniform')(X_shortcut)
  X_shortcut = BatchNormalization(axis = 1, name = bn_base_name + '1')(X_shortcut)

  X = Add()([X, X_shortcut])
  X = Activation('relu')(X)
  return X


In [9]:
def get_model():   
    # Define the input as a tensor with shape input_shape
    input_shape = (700, 1)
    X_input = Input(input_shape)

    # Zero-Padding
    X = ZeroPadding1D(padding = 3)(X_input)
    
    # Stage 1
    X = Conv1D(64, 7, strides = 2, name = 'conv1', kernel_initializer = "glorot_uniform")(X)
    X = BatchNormalization(axis = 1, name = 'bn_conv1')(X)
    X = Activation('relu')(X)
    X = MaxPooling1D(3, strides=2)(X)

    # Stage 2
    X = conv_block(X, f = 3, filters = [64, 64, 256], stage = 2, block='a', s = 1)
    X = identity_block(X, 3, [64, 64, 256], stage=2, block='b')
    X = identity_block(X, 3, [64, 64, 256], stage=2, block='c')

    # Stage 3
    X = conv_block(X, f = 3, filters = [128, 128, 512], stage = 3, block='a', s = 2)
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='b')
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='c')
    X = identity_block(X, 3, [128, 128, 512], stage=3, block='d')

    # Stage 4
    X = conv_block(X, f = 3, filters = [256, 256, 1024], stage = 4, block='a', s = 2)
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='b')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='c')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='d')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='e')
    X = identity_block(X, 3, [256, 256, 1024], stage=4, block='f')

    # Stage 5
    X = conv_block(X, f = 3, filters = [512, 512, 2048], stage = 5, block='a', s = 2)
    X = identity_block(X, 3, [512, 512, 2048], stage=5, block='b')
    X = identity_block(X, 3, [512, 512, 2048], stage=5, block='c')

    # AVGPOOL.
    X = AveragePooling1D(2, name='avg_pool')(X)

    # output layer
    X = Flatten()(X)
    X = Dense(1, activation='linear', name='fc1', kernel_initializer = "glorot_uniform")(X)
    
    # Create model
    model = Model(inputs = X_input, outputs = X, name='ResNet1D50')

    return model

In [10]:
#@title Titre par défaut
train_df = pd.read_csv("/content/drive/MyDrive/cirad/selmet/"+label+"/"+label+"_train_data"+p+".csv", sep=',')

x_train, y_train = build_dataset(train_df)
mean = x_train.mean();
std = x_train.std()
x_train = (x_train - mean) /std


In [None]:
path =  build_classifier(x_train, y_train,  'model1') 


0epoch [00:00, ?epoch/s]

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50

In [None]:
test_df = pd.read_csv("/content/drive/MyDrive/cirad/selmet/"+label+"/"+label+"_test_data"+p+".csv", sep=',')

x_test, y_test = build_dataset(test_df)
x_test =  (x_test - mean) /std


model = test_classifier(x_test, y_test, path, 'model1')

y_predicted =  model.predict(x_test) 
y_g_truth =    y_test

plt.figure( )
plt.subplots(figsize = (10, 5))
plt.title('plot')
 
x = list(range(0,y_test.shape[0]))
y_predicted = y_predicted.reshape(1,-1)[0]
plt.scatter(y_g_truth, y_predicted.transpose() ,  s=10, c ='r', label='pred')
#plt.scatter(x, y_g_truth ,  s=10, c  = 'r', label='truth')
plt.legend()
plt.show()


 