In [47]:
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import os
import tensorflow as tf
import math
import random
from ast import literal_eval

In [54]:
class MiniPINN(tf.keras.Model):
    """Fully connected regressor whose output is wrapped into [0, 360).

    The network is a stack of ReLU ``Dense`` layers (512 -> 8 units) followed
    by a single linear unit; the raw output is passed through
    ``floormod(., 360)``.

    The model also stores ``speed`` and ``array_geometry``.  When the loss
    given to ``compile()`` exposes a ``call(y_actual, y_predicted,
    time_delays, speed)`` method, the model feeds it the batch inputs and
    ``self.speed`` so a physics term can be evaluated; any other loss goes
    through the standard Keras loss path.

    Args:
        n_channels: Input feature shape without the batch axis.  Either a
            shape tuple (e.g. ``np.shape(channels)``) or a plain integer.
        speed: Propagation speed handed to physics-aware losses.
            NOTE(review): units are not visible here - confirm they match
            the array geometry and the time delays.
        array_geometry: Sensor geometry, stored on the model unchanged.
        he_initializer: ``True`` selects ``HeNormal`` kernels, ``False``
            selects ``GlorotUniform``.
    """

    def __init__(self, n_channels, speed, array_geometry, he_initializer = True):
        super().__init__()
        self.array_geometry = array_geometry
        self.speed = speed

        base_seed = 1561
        hidden_units = (512, 256, 128, 64, 32, 16, 8)
        if he_initializer:
            initializer_cls = tf.keras.initializers.HeNormal
        else:
            initializer_cls = tf.keras.initializers.GlorotUniform
        # Kept as an attribute for backward compatibility with code that
        # reads `model.initializer`.
        self.initializer = initializer_cls(base_seed)

        # Accept both a shape tuple and a bare channel count.
        if np.ndim(n_channels) == 0:
            input_shape = (int(n_channels),)
        else:
            input_shape = tuple(n_channels)

        layers = [tf.keras.layers.Input(shape=input_shape)]
        for index, units in enumerate(hidden_units):
            # Each layer gets its own initializer instance with its own seed:
            # Keras warns against calling one seeded instance several times
            # because every call draws from the same random stream.
            layers.append(tf.keras.layers.Dense(
                units=units,
                activation='relu',
                kernel_initializer=initializer_cls(base_seed + index),
            ))
        layers.append(tf.keras.layers.Dense(units=1, activation='linear'))
        self.net = tf.keras.Sequential(layers)

    def compute_loss(self, x=None, y=None, y_pred=None, sample_weight=None, training=True):
        """Compute the scalar loss for one batch.

        Keras calls this hook from both the training and the evaluation step,
        so routing the physics inputs here covers `fit()` and validation.

        Args:
            x: Batch inputs; forwarded to a physics-aware loss as its
                `time_delays` argument.
            y: Batch targets.
            y_pred: Model predictions for `x`.
            sample_weight: Optional sample weights.  Only used by the
                standard Keras path.
            training: Accepted for compatibility with Keras versions that
                pass it; not used here.

        Returns:
            A scalar loss tensor.
        """
        import inspect

        loss_fn = getattr(self, "loss", None)
        physics_call = getattr(loss_fn, "call", None)
        if callable(physics_call) and "time_delays" in inspect.signature(physics_call).parameters:
            # The stock Keras path only passes (y_true, y_pred) to the loss,
            # so a loss that also needs the inputs and the speed is invoked
            # directly.  Sample weights and layer regularization losses are
            # not applied on this path.
            return tf.math.reduce_mean(physics_call(y, y_pred, x, self.speed))
        return super().compute_loss(x=x, y=y, y_pred=y_pred, sample_weight=sample_weight)

    def train_step(self, dataset):
        """Run one optimization step on a single batch.

        Args:
            dataset: One batch as delivered by `fit()`: `(x, y)` or
                `(x, y, sample_weight)`.  The parameter name is kept from the
                original code; it is a batch, not a whole dataset.

        Returns:
            Dict mapping metric names to their current values, always
            including a "loss" entry.
        """
        # Unpack the batch. Its structure depends on what is passed to `fit()`.
        if len(dataset) == 3:
            x, y, sample_weight = dataset
        else:
            x, y = dataset
            sample_weight = None

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            loss = self.compute_loss(x=x, y=y, y_pred=y_pred, sample_weight=sample_weight)

        # Compute gradients and update weights
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update metrics (includes the metric that tracks the loss)
        for metric in self.metrics:
            if metric.name == "loss":
                metric.update_state(loss)
            else:
                metric.update_state(y, y_pred, sample_weight=sample_weight)

        logs = {m.name: m.result() for m in self.metrics}
        # Guarantee a loss entry even when no loss-tracking metric is present.
        logs.setdefault("loss", loss)
        return logs

    def _forward(self, inputs, training=None):
        """Apply the dense stack and wrap the result into [0, 360)."""
        return tf.math.floormod(self.net(inputs, training=training), 360.0)

    def predict(self, inputs,verbose=False):
        """Return wrapped predictions for `inputs` as a tensor.

        NOTE: this replaces Keras' `Model.predict`; it performs a single
        forward pass without batching and ignores `verbose`.
        """
        return self._forward(inputs, training=False)

    def call(self, inputs, training=None):
        """Forward pass used by Keras; output is wrapped into [0, 360)."""
        return self._forward(inputs, training=training)

In [None]:
class MiniPINNLoss(tf.keras.losses.Loss):
    """MiniPINN loss: angle MSE plus a physics (time-delay) consistency term.

    The physics term compares the measured time delays with the delays
    implied by the predicted azimuth:

        taus_hat = R . [cos(theta), sin(theta)] / c

    where R is the array geometry, theta the predicted azimuth and c the
    propagation speed.

    Args:
        array_geometry: Sensor coordinates; assumed to be
            (num of channels, 2) as stated in the original comments -
            TODO confirm against the caller.
        physics_weight: Multiplier applied to the physics term before it is
            added to the MSE term.  Defaults to 1.0 (plain sum).
    """

    def __init__(self, array_geometry, physics_weight=1.0):
        super().__init__()
        # Time delays and speed vary per call, so they are arguments of
        # `call()` rather than constructor state.
        self.r = tf.cast(tf.convert_to_tensor(array_geometry), tf.float32)
        self.physics_weight = physics_weight

    def call(self, y_actual, y_predicted, time_delays, speed):
        """Return the per-sample loss.

        Keras' `Loss.__call__` only supplies `(y_true, y_pred)`, so this
        method has to be invoked directly by code that also has the time
        delays and the speed.

        Args:
            y_actual: True azimuths in degrees, one per sample.
            y_predicted: Predicted azimuths in degrees, one per sample.
            time_delays: Measured time delays; assumed
                (batch size, num of channels) - TODO confirm.
            speed: Propagation speed; a scalar or one value per sample.

        Returns:
            Tensor of shape (batch size,) with MSE + weighted physics loss.
        """
        # Bring everything to a common dtype and to column shape.
        y_predicted = tf.reshape(tf.convert_to_tensor(y_predicted), (-1, 1))
        dtype = y_predicted.dtype
        y_actual = tf.reshape(tf.cast(y_actual, dtype), (-1, 1))
        taus = tf.cast(time_delays, dtype)
        # (1, 1) for a scalar speed, (batch, 1) for per-sample speeds.
        c = tf.reshape(tf.cast(speed, dtype), (-1, 1))
        r = tf.cast(self.r, dtype)

        # convert degree to radian
        predicted_rad = y_predicted*np.pi/180

        # slowness vector (batch, 2); only the azimuth is modelled
        direction = tf.concat(
            [tf.math.cos(predicted_rad), tf.math.sin(predicted_rad)], axis=-1)
        slowness = direction / c

        # time delays implied by the predicted angle: (batch, num of channels)
        # NOTE(review): sign convention of the delays is kept from the
        # original expression (taus - R . slowness) - confirm.
        taus_hat = tf.matmul(slowness, r, transpose_b=True)

        # Squared error so positive and negative residuals cannot cancel.
        phys_err = taus - taus_hat
        phys_loss = tf.math.reduce_mean(tf.math.square(phys_err), axis=-1)

        # MSE loss on the angles in degrees (no wrap-around handling).
        mse_loss = tf.keras.losses.MSE(y_actual, y_predicted)

        # Total loss
        return mse_loss + self.physics_weight * phys_loss

In [53]:
def cossin_loss(y_actual, y_predicted):
    """Wrap-around-aware angular loss built from cosine and sine terms.

    For an angular difference d (degrees converted to radians) the loss is
    ``(1 - cos(d)) + |sin(d)|``.  The sine term uses its magnitude: with a
    signed sine the expression becomes negative and is minimized at
    d = -45 degrees instead of at d = 0.

    Args:
        y_actual: True angles in degrees.
        y_predicted: Predicted angles in degrees.

    Returns:
        Element-wise loss, >= 0 and equal to 0 when the angles coincide
        (modulo 360 degrees).
    """
    # convert degree to radian
    actual_rad = y_actual*np.pi/180
    predicted_rad = y_predicted*np.pi/180
    diff_rad = actual_rad - predicted_rad

    # the cosine and sine loss
    cos_loss = 1 - tf.math.cos(diff_rad)
    sin_loss = tf.math.abs(tf.math.sin(diff_rad))

    # loss
    return cos_loss + sin_loss

In [53]:
def cos_loss(y_actual, y_predicted):
    """Cosine distance between two angles given in degrees.

    Returns ``1 - cos(actual - predicted)`` element-wise, which is 0 when
    the angles coincide and 2 when they are 180 degrees apart.
    """
    def _as_radians(degrees):
        # same operation order as a plain `deg * pi / 180`
        return degrees*np.pi/180

    angle_gap = _as_radians(y_actual) - _as_radians(y_predicted)
    return 1 - tf.math.cos(angle_gap)

In [55]:
def selector_loss(y_actual, y_predicted):
    """Absolute angular error in degrees, taking the shorter way round.

    Compares the direct gap with its 360-degree complement and keeps the
    smaller of the two, so 350 vs 10 degrees counts as 20, not 340.
    """
    direct_gap = tf.math.abs(y_predicted-y_actual)
    wrapped_gap = 360-direct_gap
    return tf.math.minimum(wrapped_gap, direct_gap)

In [3]:
def training_evaluation(list_channels, AA_geometry, inputs, labels, he_initializer=True, loss ='mse', floormod=True, epochs = 50, plot = False, save_fig = False):
    models = []
    losses = []
    evaluates = []
    for channels in list_channels:
        # Get the geometry of the channels only in x and y planes
        geometry = AA_geometry[channels][:,:2]
        dataset = DataSetPacker(inputs, labels, channels)
        train_dataset, val_dataset, test_dataset = dataset.split(shuffle=False)
        model = MyModel(np.shape(channels), he_initializer, floormod=floormod)
        match loss:
            case 'mse':
                model.compile(optimizer='adam', loss='mse', metrics=['mse','mae'])
            case 'cossin':
                model.compile(optimizer='adam', loss=cossin_loss, metrics=['mse','mae'])
            case 'cos':
                model.compile(optimizer='adam', loss=cos_loss, metrics=['mse','mae'])
            case 'selector':
                model.compile(optimizer='adam', loss=selector_loss, metrics=['mse','mae'])
            case 'minipinn':
                model.compile(optimizer='adam', loss=minipinn_loss(, metrics=['mse','mae'])
        #model.compile(optimizer='adam', loss=cossin_loss, metrics=['mse','mae'])
        l = model.fit(train_dataset.batch(32), epochs=epochs, validation_data = val_dataset.batch(32), verbose=False)
        eva = EvaluateModel.evaluate(model, test_dataset, False)
        models.append(model)
        losses.append(l)
        evaluates.append(eva)
        if plot:
            if save_fig:
                EvaluateModel.plot_training(l, channels, save_dir = 'plot/history/')
                EvaluateModel.plot_evaluation(model, channels, test_dataset, save_dir = 'plot/evaluation/')
            else:
                EvaluateModel.plot_training(l, channels)
                EvaluateModel.plot_evaluation(model, channels, test_dataset)
    return models, evaluates, l
    