## First Code for chilli for best model on Asta using tensorflow

In [None]:
import pandas as pd
import numpy as np
import random
import tensorflow as tf
import sklearn.metrics
from scipy.signal import savgol_filter
from sklearn.utils import shuffle
from tensorflow.keras import Model, Sequential, layers
from tensorflow.keras.layers import Conv1D, Flatten, Dense
from matplotlib import pyplot as plt

b_size = 32

class D1CNN(Model):
    def __init__(self):
        super(D1CNN,self).__init__()
        self.features = Sequential([
            Conv1D(filters = 32,kernel_size = 21,activation = 'relu'),
            layers.BatchNormalization(),
            layers.MaxPool1D(pool_size=2),
            Conv1D(filters = 64,kernel_size = 21,activation = 'relu'),
            layers.BatchNormalization(),
            layers.MaxPool1D(pool_size=3),
            Conv1D(filters = 128,kernel_size = 21,activation = 'relu'),
            layers.BatchNormalization(),
            layers.MaxPool1D(pool_size=3),
            Conv1D(filters = 256,kernel_size = 21,activation = 'relu'),
            layers.BatchNormalization(),
            layers.MaxPool1D(pool_size=3)

        ])

        self.features.add(layers.Dropout(0.4))

        self.flatten = layers.Flatten()

        self.ann_fea = Sequential([
            Dense(512,activation = 'relu'),
            layers.Dropout(0.2),
            Dense(512,activation = 'relu'),
            layers.Dropout(0.2),
            Dense(128,activation = 'relu'),
            layers.Dropout(0.2),
            Dense(128,activation = 'relu'),
            layers.Dropout(0.2),
            Dense(64,activation = 'relu'),
            layers.Dropout(0.2),
            Dense(64,activation = 'relu'),
            layers.Dropout(0.2),
            Dense(1)
        ])

    def call(self,x):
        x = tf.expand_dims(x,-1)
        x = self.features(x)
        x = self.flatten(x)
        x = self.ann_fea(x)
        return x


# preprocessing functions
def moving_smoothing(input_array, window_length):

    m_rows = len(range(window_length, input_array.shape[1] - window_length))
    m_cols = 2*window_length
    matrix = np.zeros((m_rows,m_cols),dtype=int)
    for j in range(0, len(matrix)):
        k = j+1
        matrix[j] = [x for x in range(k,k+2*window_length)]
    #Smoothing spectra using matrix operations:
    n_cols = m_rows
    newspectra = np.zeros((len(input_array),n_cols))
    for i in range(len(matrix)):
        newspectra[:,i] = np.mean(input_array[: ,matrix[i]],axis=1)
    #Add front and end tails (not smoothed):
    #new_spectra = np.asarray(newspectra)
    fronttail = newspectra[:,:1]
    endtail = newspectra[:,-1:]
    for k in range(1,window_length):
        fronttail=np.append(fronttail,newspectra[:,:1], axis=1)
        endtail = np.append(endtail,newspectra[:,-1:], axis =1)
    data = np.concatenate((fronttail, newspectra, endtail), axis=1)
    return data

def derivate_first(input_array, window_length, polyorder):
    der1 = savgol_filter(input_array, window_length, polyorder,deriv = 1)
    return der1

def MSC(input_array, reference = None):
    ''' Perform Multiplicative scatter correction'''

    #Mean correction
    for i in range(input_array.shape[0]):
        input_array[i,:] -= input_array[i,:].mean()

    # Get the reference spectrum. If not given, estimate from the mean    
    if reference is None:    
        # Calculate mean
        ref = np.mean(input_array, axis=0)
    else:
        ref = reference

    # Define a new data matrix and populate it with the corrected data    
    output_data = np.zeros_like(input_array)
    for i in range(input_array.shape[0]):
        # Run regression
        fit = np.polyfit(ref, input_array[i,:], 1, full=True)
        # Apply correction
        output_data[i,:] = (input_array[i,:] - fit[0][1]) / fit[0][0] 
    #print(fit)
    return output_data

def detrending(input_array):
    X = np.arange(input_array.shape[1])
    base = np.zeros((len(input_array),len(X)))
    for i in range(len(input_array)):
        c = np.polyfit(X, input_array[i], 2)
        base[i] = np.polyval(c, X)
    #Baseline removal
    base_remove = input_array - base
    return base_remove

def savitzky(input_array, window_length, polyorder):
    savgol = savgol_filter(input_array, window_length, polyorder)
    return savgol

#to divide data into test train and valid
def data_1D(f):
    sed = 100
    df = pd.read_csv(f,encoding='utf-8')
    col = ['Device ID','Sample ID','Cap','800']
    #col = [str(x) for x in range(0,151)]+[str(x) for x in range(751,801)]+['Sample ID','Device ID','Cap']
    d_asta = df.drop(columns=col)
    d_asta = d_asta.dropna(subset=['ASTA'])
    X_train = []
    X_val = []
    Y_train = []
    Y_val = []
    l = [55,70,76,96,96,86,86,75] # for 600 samples to train.
    k = 40
    for i in range(len(l)):
        qr = 'ASTA>='+str(k+i*10)+' & ASTA<'+str(k+(i+1)*10)
        data_asta_3 = d_asta.query(qr)
        data_asta_3_1 = data_asta_3.sample(n=l[i],random_state = sed)
        data_asta_3_2 = data_asta_3.drop(data_asta_3_1.index)

        if (len(data_asta_3_2.index)<14):
            data_asta_3_2 = data_asta_3_2.sample(n=len(data_asta_3_2.index),random_state = sed)
        else:
            data_asta_3_2 = data_asta_3_2.sample(n=14,random_state = sed)
        
        X_train.append(np.array(data_asta_3_1.drop(columns = 'ASTA')))
        Y_train.append(np.array(data_asta_3_1['ASTA']))
        X_val.append(np.array(data_asta_3_2.drop(columns = 'ASTA')))
        Y_val.append(np.array(data_asta_3_2['ASTA']))
    
    X_train = np.vstack(X_train)
    Y_train = np.hstack(Y_train)
    X_val = np.vstack(X_val)
    Y_val = np.hstack(Y_val)

    #x_train = pd.DataFrame(X_train)
    #x_train['Actual Y'] = Y_train
    #x_train.to_csv('train_chili.csv')

    X_train = savitzky(X_train,11,2)
    X_train = MSC(X_train)
    X_train = detrending(X_train)
    X_train = derivate_first(X_train,11,2)

    X_val = savitzky(X_val,11,2)
    X_val = MSC(X_val)
    X_val = detrending(X_val)
    X_val = derivate_first(X_val,11,2)


    return X_train,Y_train,X_val,Y_val

def self_plot(train,test,l1,l2):
    fig = plt.figure(figsize=(18,9))
    ax1 = fig.add_subplot(1,1,1)
    ax1.plot(train,label = l1)
    ax1.plot(test,label = l2)
    ax1.legend()

def run():
    n_epoch = 8000
    model = D1CNN()
    optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-6)
    loss = tf.keras.losses.MeanAbsoluteError()
    accuracy = tf.keras.metrics.MeanAbsoluteError()
    file_name = '/content/drive/MyDrive/Chili/chili-001.csv'
    x_train,y_train,x_val,y_val = data_1D(file_name)
    model.compile(optimizer = optimizer, loss = loss, metrics = accuracy)
    history = model.fit(x_train,y_train,epochs = n_epoch,shuffle = True,validation_data = (x_val,y_val),batch_size = b_size,verbose = 1)

    self_plot(history.history['loss'],history.history['val_loss'],'training loss','testing_loss')
    self_plot(history.history['accuracy'],history.history['val_accuracy'],'training accuracy','testing_accuracy')

if __name__ == '__main__':
    run()

Epoch 1/8000
Epoch 2/8000
 1/20 [>.............................] - ETA: 4s - loss: 84.1101 - mean_absolute_error: 84.1101

KeyboardInterrupt: ignored

# NIR feature extraction based on deep auto-encoder Neural Network

### DAE with LDS on ASTA

Wihtout Outlier Removal , 
Model config - 800,128, 16,4

In [None]:
import pandas as pd
import numpy as np
import random
import tensorflow as tf
import sklearn.metrics
from scipy.signal import savgol_filter
from sklearn.utils import shuffle
from scipy.stats import chi2
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.covariance import EmpiricalCovariance, MinCovDet
from scipy.ndimage import convolve1d
from scipy.signal.windows import triang
from scipy.ndimage import gaussian_filter1d
import torch.nn.functional as F
import sys
from scipy.stats import chi2
import io
import matlplotlib.pyplot as plt



b_size = 32 #batch size
np.random.seed(100)

class VAE(tf.keras.Model):

    def __init__(self):
        super(VAE,self).__init__()
        l2 = tf.keras.regularizers.l2(l2 = 0.3)
        self.encoder = tf.keras.Sequential(
            [
             tf.keras.layers.InputLayer(shape = (800,),batch_size = b_size)
             tf.keras.layers.Dense(128,activation = 'ReLu',kernel_regularizer = l2)
             tf.keras.layers.Dense(16, activation = 'ReLu',kernel_regularizer = l2 )
             tf.keras.layers.Dense(4, activation = 'ReLu',kernel_regularizer = l2)
            ]
        )
        self.decoder = tf.keras.Sequential(
            [
             tf.keras.layers.InputLayer(shape = (4,), batch_size = b_size)
             tf.keras.layers.Dense(16,activation = 'ReLu',kernel_regularizer = l2)
             tf.keras.layers.Dense(128, activation = 'ReLu',kernel_regularizer = l2 )
             tf.keras.layers.Dense(800)
            ]
        )

      def call(self,x):
          x = self.encoder(x)
          x = self.decoder(x)
          return x




# preprocessing functions
def moving_smoothing(input_array, window_length):

    m_rows = len(range(window_length, input_array.shape[1] - window_length))
    m_cols = 2*window_length
    matrix = np.zeros((m_rows,m_cols),dtype=int)
    for j in range(0, len(matrix)):
        k = j+1
        matrix[j] = [x for x in range(k,k+2*window_length)]
    #Smoothing spectra using matrix operations:
    n_cols = m_rows
    newspectra = np.zeros((len(input_array),n_cols))
    for i in range(len(matrix)):
        newspectra[:,i] = np.mean(input_array[: ,matrix[i]],axis=1)
    #Add front and end tails (not smoothed):
    #new_spectra = np.asarray(newspectra)
    fronttail = newspectra[:,:1]
    endtail = newspectra[:,-1:]
    for k in range(1,window_length):
        fronttail=np.append(fronttail,newspectra[:,:1], axis=1)
        endtail = np.append(endtail,newspectra[:,-1:], axis =1)
    data = np.concatenate((fronttail, newspectra, endtail), axis=1)
    return data

def derivate_first(input_array, window_length, polyorder):
    der1 = savgol_filter(input_array, window_length, polyorder,deriv = 1)
    return der1

def MSC(input_array, reference = None):
    ''' Perform Multiplicative scatter correction'''

    #Mean correction
    for i in range(input_array.shape[0]):
        input_array[i,:] -= input_array[i,:].mean()

    # Get the reference spectrum. If not given, estimate from the mean    
    if reference is None:    
        # Calculate mean
        ref = np.mean(input_array, axis=0)
    else:
        ref = reference

    # Define a new data matrix and populate it with the corrected data    
    output_data = np.zeros_like(input_array)
    for i in range(input_array.shape[0]):
        # Run regression
        fit = np.polyfit(ref, input_array[i,:], 1, full=True)
        # Apply correction
        output_data[i,:] = (input_array[i,:] - fit[0][1]) / fit[0][0] 
    #print(fit)
    return output_data

def detrending(input_array):
    X = np.arange(input_array.shape[1])
    base = np.zeros((len(input_array),len(X)))
    for i in range(len(input_array)):
        c = np.polyfit(X, input_array[i], 2)
        base[i] = np.polyval(c, X)
    #Baseline removal
    base_remove = input_array - base
    return base_remove

def savitzky(input_array, window_length, polyorder):
    savgol = savgol_filter(input_array, window_length, polyorder)
    return savgol

def get_lds_kernel_window(kernel, ks, sigma):
    assert kernel in ['gaussian', 'triang', 'laplace']
    half_ks = (ks - 1) // 2
    if kernel == 'gaussian':
        base_kernel = [0.] * half_ks + [1.] + [0.] * half_ks
        kernel_window = gaussian_filter1d(base_kernel, sigma=sigma) / max(gaussian_filter1d(base_kernel, sigma=sigma))
    elif kernel == 'triang':
        kernel_window = triang(ks)
    else:
        laplace = lambda x: np.exp(-abs(x) / sigma) / (2. * sigma)
        kernel_window = list(map(laplace, np.arange(-half_ks, half_ks + 1))) / max(map(laplace, np.arange(-half_ks, half_ks + 1)))

    return kernel_window

def get_weights(l):
    lds_kernel_window = get_lds_kernel_window('gaussian', 3, 1)
    smoothed_value = convolve1d(np.asarray(l),weights = lds_kernel_window,mode='constant')
    smoothed_value[0] = smoothed_value [1]
    smoothed_value[-1] = smoothed_value[-2]
    weights = [np.float32(1 / x) for x in smoothed_value]
    scaling = len(weights) / np.sum(weights)
    weights = [scaling * x for x in weights]
    return weights



def data_1D(f):
    sed = 100
    df = pd.read_csv(f,encoding='utf-8')
    #col = [str(x) for x in range(0,151)]+[str(x) for x in range(751,801)]+['Sample ID','Cap']
    col = ['Sample ID','Cap','800','Device ID']
    d_asta = df.drop(columns=col)
    d_asta = d_asta.dropna(subset=['ASTA'])
    d_asta['ASTA'] = d_asta['ASTA'].round()
    d_asta = d_asta[d_asta['ASTA']>=40]
    d_asta = d_asta[d_asta['ASTA']<120]

    X_train = []
    X_val = []
    Y_train = []
    Y_val = []

    value_dict = {x:0 for x in range(40,120)}
    labels = d_asta['ASTA']
    
    
    for label in labels:
        value_dict[(int(label))]+=1

    l_1 = np.array([x for x in value_dict.values()])

    for x,y in value_dict.items():
        if(y>1):
            qr = 'ASTA=='+str(x)
            data_asta_3 = d_asta.query(qr)
            data_asta_3_2 = data_asta_3.sample(n=1,random_state = sed)
            data_asta_3_1 = data_asta_3.drop(data_asta_3_2.index)
        
        X_train.append(np.array(data_asta_3_1.drop(columns = ['ASTA'])))
        Y_train.append(np.array(data_asta_3_1['ASTA']))
        X_val.append(np.array(data_asta_3_2.drop(columns = ['ASTA'])))
        Y_val.append(np.array(data_asta_3_2['ASTA']))
    
    X_train = np.vstack(X_train)
    Y_train = np.hstack(Y_train)
    X_val = np.vstack(X_val)
    Y_val = np.hstack(Y_val)

    x_train = pd.DataFrame(X_train)
    x_train['Actual Y'] = Y_train
    x_train.to_csv('train_chili.csv')

    idx = np.random.randint(X_train.shape[0], size = X_train.shape[0]//b_size *b_size)
    X_train = X_train[idx,:]
    Y_train = Y_train[idx]

    X_train = savitzky(X_train,11,2)
    X_train = MSC(X_train)
    X_train = detrending(X_train)
    X_train = derivate_first(X_train,11,2)

    X_val = savitzky(X_val,11,2)
    X_val = MSC(X_val)
    X_val = detrending(X_val)
    X_val = derivate_first(X_val,11,2)


    return X_train,Y_train,X_val,Y_val,get_weights(l_1)

def weighted_l1_loss(inputs, targets, weights=None):
    loss = F.l1_loss(inputs, targets, reduction='none')
    if weights is not None:
        loss *= weights.expand_as(loss)
    loss = torch.mean(loss)
    return loss

def weighted_mse_loss(inputs, targets, weights=None):
    loss = (inputs - targets) ** 2
    if weights is not None:
        loss *= weights.expand_as(loss)
    loss = torch.mean(loss)
    return loss

def run():
    n_epoch = 8000
    model = VAE()
    optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-6)
    loss = weighted_mse_loss
    accuracy = tf.keras.metrics.MeanSquaredError()
    file_name = '/content/drive/MyDrive/Chili/chili-001.csv'
    x_train,y_train,x_val,y_val = data_1D(file_name)
    model.compile(optimizer = optimizer, loss = loss, metrics = accuracy)
    history = model.fit(x_train,y_train,epochs = n_epoch,shuffle = True,validation_data = (x_val,y_val),batch_size = b_size,verbose = 1)

    self_plot(history.history['loss'],history.history['val_loss'],'training loss','testing_loss')
    self_plot(history.history['accuracy'],history.history['val_accuracy'],'training accuracy','testing_accuracy')


if __name__ == '__main__':
    run()






