<a href="https://colab.research.google.com/github/jcandane/StochasticPhysics/blob/main/instrument.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## import stuff

This cell also defines default test cases (sample gpCAM data and an analytic test instrument).

In [None]:
import numpy as np
import scipy
import gpcam
from gpcam import AutonomousExperimenterGP
import time
import h5py ## https://docs.h5py.org/en/stable/quick.html
import datetime

import plotly.graph_objects as go
def plot2d(xdata=None, ydata=None):
    """Show an interactive plotly 3-d scatter of sampled function values.

    GIVEN : xdata (2d-array, optional) columns 0 and 1 give the x/y coordinates
            ydata (2d-array, optional) column 0 gives the height (z)
    GET   : None (the figure is displayed)

    When ``xdata`` is None an empty figure with the same layout is shown.
    """
    figure = go.Figure()

    if xdata is not None:
        markers = go.Scatter3d(
            x=xdata[:, 0],
            y=xdata[:, 1],
            z=ydata[:, 0],
            mode='markers',
        )
        figure.add_trace(markers)

    margins = dict(l=65, r=50, b=65, t=90)
    figure.update_layout(
        title='Ground Truth Random-Contious 2d-Function',
        autosize=True,
        width=800,
        height=800,
        margin=margins,
    )
    figure.show()


# Keys carried by a fully populated measurement dictionary in the list-of-dicts
# format that the instrument functions in this notebook receive and return.
data_labels=["x_data", "y_data", "noise variance", "cost", "id", "time stamp", "date time", "measured"]
# Example payload in that format (not referenced by the other visible cells).
# The first three entries are fully populated; their "y_data" and "cost" values
# agree with what `instrument0` computes: sin(|x_data|) and
# [origin, x_data, sum(x_data)].  The last entry carries only "x_data", with
# "y_data" still None.
list_of_data=[{'x_data': np.array([3.04986762, 4.66446892]),
  'y_data': -0.6519311743614136,
  'noise variance': None,
  'cost': [np.array([0, 0]), np.array([3.04986762, 4.66446892]), 7.714336536446131],
  'id': '3b115847-cbe5-4c19-b2c9-0004455f8a52',
  'time stamp': 1705512086.8447511,
  'date time': '17/01/2024_11:21:26',
  'measured': False},
 {'x_data': np.array([1.07469298, 6.04996553]),
  'y_data': -0.13806646827231991,
  'noise variance': None,
  'cost': [np.array([0, 0]), np.array([1.07469298, 6.04996553]), 7.124658510619914],
  'id': 'e343a7e8-b133-415e-a69f-6768438e6934',
  'time stamp': 1705512086.844794,
  'date time': '17/01/2024_11:21:26',
  'measured': False},
 {'x_data': np.array([4.54385235, 6.1862373 ]),
  'y_data': 0.9841470549061848,
  'noise variance': None,
  'cost': [np.array([0, 0]), np.array([4.54385235, 6.1862373 ]), 10.730089647376804],
  'id': '1e7a8bc2-9e1e-4f92-b5e0-f8205b63f259',
  'time stamp': 1705512086.844807,
  'date time': '17/01/2024_11:21:26',
  'measured': False},
 {'x_data': np.array([0.36948575, 7.26536243]), "y_data": None }]


def instrument0(data):
    """
    Synthetic test instrument: "measures" sin(|x|) at every requested point.

    GIVEN : data (List[dic]) gpCAM only gives the data it is using at-the-moment;
            every entry must hold its coordinates under "x_data"
    GET   : data (the same list, entries updated in place) with
            "y_data" = sin of the Euclidean norm of "x_data"
            "cost"   = [origin, x_data, sum(x_data)]

    The origin is sized from "x_data", so points of any dimension are accepted
    (for 2-d points it is array([0, 0])).
    """
    for entry in data:
        position = entry["x_data"]
        ## integer zeros with the same shape as the point
        origin = np.zeros(np.shape(position), dtype=int)
        entry["y_data"] = np.sin(np.linalg.norm(position))
        entry["cost"]   = [origin, position, np.sum(position)]
    return data

#### GP quick-test: time a short gpCAM autonomous experiment driven by the analytic `instrument0`
START=time.time()
# NOTE(review): the three positional arguments are presumably the input-space
# bounds, the initial hyperparameters and the hyperparameter bounds -- confirm
# against the AutonomousExperimenterGP signature of the installed gpCAM version.
my_ae = AutonomousExperimenterGP(np.array([[0,10],[0,10]]),
                                 np.ones((3)),
                                 np.array([[0.001,100.],[0.001,100.],[0.001,100.]]), ## limits
                                 init_dataset_size= 20,
                                 instrument_function = instrument0,
                                 cost_function_parameters={"offset": 5.0,"slope":10.0},
                                 store_inv = True)
my_ae.train(method = "hgdl")      ### trains the kernel, to find optimal hyperparameters given the initially-random data
# NOTE(review): the argument is presumably the total number of points to reach
# (current count + 10) -- confirm against the gpCAM documentation.
my_ae.go(len(my_ae.x_data) + 10)  ### runs the Autonomous Experiment
# elapsed wall-clock seconds since START
print(time.time()-START)

3.0624289512634277


## RCF (random-continuous function)

In [None]:
class RCF():
    """
    Random-continuous function (RCF): one random draw from a zero-mean Gaussian
    process with an RBF kernel, defined on a set of support points and
    interpolated everywhere else.

    f : IN -> OUT

    Attributes set by the constructor
        domain : numpy.2darray, one (lower, upper) row per IN dimension
        D      : int, dimension of OUT
        D_ix   : numpy.2darray (points, IN dimension), support points
        L_ij   : numpy.2darray, lower Cholesky factor of the kernel matrix on D_ix
        D_iX   : numpy.2darray (points, D), function values at the support points

    Open questions kept from the original notes:
    can we be given the points?
    a previous function to compose with?
    Automatic Derivatives??
    """

    def __init__(self, Domain, X, D=1, kernel=None):
        """
        GIVEN  : Domain (numpy.2darray) (lower, upper) bounds, one row per IN dimension
                 X      int             -> that many uniformly-random support points
                        array of steps  -> regular grid with spacing X[i] along axis i
                *D      (int) dimension of OUT
                *kernel currently unused: the RBF `default_kernel` is always applied
        RAISES : TypeError if X is neither an integer nor a usable sequence of grid spacings
                 numpy.linalg.LinAlgError if the kernel matrix is not positive definite
        """
        self.domain = Domain ### numpy.2darray
        self.D      = D      ### int (dimension of OUT)

        ### get IN points
        if isinstance(X, (int, np.integer)): ## X is a point count: random sampling defines the function
            self.D_ix = self.getrandom(X)
        else: ## X holds the grid spacings: a uniform grid defines the function
            try:
                self.D_ix = self.getgrid(X)
            except (TypeError, ValueError, IndexError, ZeroDivisionError) as err:
                ## report what was wrong with X and keep the underlying cause attached
                raise TypeError(
                    "X must be an int (number of random points) or a sequence of "
                    f"grid spacings, one per domain dimension; got {X!r}"
                ) from err

        ### cholesky-factor
        Σ_ij      = self.default_kernel(self.D_ix, self.D_ix)
        self.L_ij = np.linalg.cholesky(Σ_ij) ## if using random, it might not be PSD because of point collisions...

        ### calculate y-axis
        ## unit-variance white noise, one column per OUT dimension; the kernel
        ## variance enters only through L_ij (the RBF diagonal is exactly 1, so
        ## scaling the noise by it changes nothing for this kernel)
        W_iX = np.random.standard_normal((self.D_ix.shape[0], self.D))
        ## correlate W_iX using the Cholesky-factorization, yielding random/correlated normal-samples
        self.D_iX = self.L_ij @ W_iX ## ~ Y N^2
        ### careful with the kernel correlation-length this can make things not so smooth!!

    def evalulate(self, D_ax):
        """ evaluate the function at arbitrary IN points (GP interpolation through the support points)
        GIVEN   : D_ax (2d-numpy.array, (points, IN dimension)); uses {D_ix, D_iX, L_ij} stored on the object
        GET     : D_aX (2d-numpy.array, (points, D))
        """
        ## local import: a bare `import scipy` does not guarantee that the
        ## `scipy.linalg` submodule is loaded on every SciPy version
        from scipy.linalg import cho_solve

        ## solve Σ_ij w = D_iX using the stored lower Cholesky factor
        weights = cho_solve((self.L_ij, True), self.D_iX)
        return self.default_kernel(D_ax, self.D_ix) @ weights

    def getgrid(self, dr_x): ###! spacing: linspace or arange???!!
        """
        get regular grid spacing based on dr_x (np.arange: the upper bound of each axis is excluded)
        GIVEN : dr_x : step along each IN axis (axis i uses row i of self.domain)
        Get: D_ix : numpy.2darray (points, IN dimension)
        """
        axes = [np.arange(self.domain[i, 0], self.domain[i, 1], step) for i, step in enumerate(dr_x)]
        R_ix = np.stack(np.meshgrid(*axes), axis=-1)
        ## flatten every grid axis, keep the coordinate axis
        return R_ix.reshape(-1, R_ix.shape[-1])

    def getrandom(self, N):
        """ perhaps use Poisson-Disc sampling algorithm to ensure PSD!
        get N uniformly-random points in the domain to define the function
        Get: D_ix : numpy.2darray (N, IN dimension)
        """
        return np.asarray([row[0] + (row[1] - row[0]) * np.random.rand(N) for row in self.domain]).T

    def default_kernel(self, R_ix, R_jx, ξ=0.1):
        """
        compute kernel function (RBF) between two sets of domain points

        R_ijx = R_ix - R_jx
        Σ     = exp( - sum( R_ijx**2 , over=x) / ξ )

        INPUT  : R_ix (first set of points)  : numpy.2darray (i, x)
                 R_jx (second set of points) : numpy.2darray (j, x)
                *ξ (scale dividing the squared distance) : float64
        RETURN : Σ : numpy.2darray (i, j)
        """
        R_ijx = R_ix[:, None, :] - R_jx[None, :, :]
        return np.exp( - np.sum(R_ijx**2, axis=2) / ξ )

## Instrument Function

In [None]:
# Grid spacing along each of the two IN axes.
dr_x   = np.array([0.1, 0.1])
# Rectangular IN domain: one (lower, upper) row per axis.
domain = np.array([[ 0.0, 1.0],
                   [-0.5,  0.7]])
# Ground-truth function: `dr_x` is an array (not an int), so RCF places its
# support points on a regular grid rather than sampling them at random.
f = RCF(domain, dr_x)

# Scatter the support points against the first OUT column of their values.
plot2d(xdata = f.D_ix, ydata=f.D_iX)

######
def math_function(x_data):
    """Ground-truth response: evaluate the module-level RCF ``f`` at ``x_data``.

    GIVEN : x_data (2d-numpy.array), the points to evaluate, i.e. D_ax
    GET   : whatever ``f.evalulate`` returns for those points, i.e. D_aX
    """
    # analytic alternative kept for reference: np.sin(np.linalg.norm(x_data, axis=1))
    y_data = f.evalulate(x_data)
    return y_data


def gpcam_to_h5(data, filename="to_vintrument.h5"):
    """
    this function reads gpcam's data, and creates an h5 file (to be read by the instrument)
    GIVEN : data : List[dic], every entry holding its coordinates under "x_data"
           *filename : path of the h5 file; it is opened in "w" mode, so an
                       existing file is overwritten
    GET   : None; the stacked coordinates (D_ax) are stored as dataset "dataset_1"
    """
    to_analyze = np.asarray([entry["x_data"] for entry in data]) ## make into a np.array, D_ax

    ## the context manager closes the file even if writing the dataset fails
    with h5py.File(filename, "w") as h5f:
        h5f.create_dataset("dataset_1", data=to_analyze)
    return None

def h5_to_vinstrument(filename="to_vintrument.h5"):
    """
    this function reads a h5 file, to obtain a 2d-numpy.array (to be used by the virtual-instrument)
    GIVEN : *filename : path of the h5 file holding dataset "dataset_1"
    GET   : x_data : numpy.array of dimensions ( samples , coordinates ) i.e. D_ax
    RAISES: KeyError if the file has no "dataset_1" (previously this silently
            produced an array wrapping None)
    """
    ## the data must be copied into memory before the file is closed
    with h5py.File(filename, "r") as h5f:
        x_data = np.asarray(h5f["dataset_1"])

    return x_data ### numpy.array of dimensions ( samples , coordinates ) i.e. D_ax

def vinstrument_to_h5(y_data, filename="from_vintrument.h5"):
    """
    this function takes the vinstrument's y_data and saves it to an h5 file
    GIVEN : y_data (D_aX)
           *filename : path of the h5 file; it is opened in "w" mode, so an
                       existing file is overwritten
    GET   : None; y_data is stored as dataset "dataset_1"
    """
    ## the context manager closes the file even if writing the dataset fails
    with h5py.File(filename, "w") as h5f:
        h5f.create_dataset("dataset_1", data=y_data)
    return None

def h5_to_gpcam(data, filename="from_vintrument.h5"):
    """
    this function updates gpcam's 'data' variable (List[dic]), by reading a h5 file.
    GIVEN : data : List[dic]; entry number a receives row a of the stored dataset
           *filename : path of the h5 file holding dataset "dataset_1"
    GET   : data (the same list, each entry's "y_data" overwritten in place)

    Rows are matched to entries by position only: stored rows beyond len(data)
    are ignored, and an entry without a matching row raises IndexError.
    """
    ## the data must be copied into memory before the file is closed
    with h5py.File(filename, "r") as h5f:
        y_data = np.asarray(h5f["dataset_1"]) ## D_aX

    for a, entry in enumerate(data):
        entry["y_data"] = y_data[a]

    return data

def vinstrument():
    """Virtual instrument: read the requested points, evaluate them, store the results.

    Reads the points through `h5_to_vinstrument`, evaluates them with
    `math_function` and writes the values through `vinstrument_to_h5`, each
    using its default file name.
    GET : None
    """
    requested_points = h5_to_vinstrument()
    vinstrument_to_h5(math_function(requested_points))
    return None

def instrument(data):
    """Instrument function handed to gpCAM; it talks to the virtual instrument through h5 files.

    GIVEN : data (List[dic]) as passed in by gpCAM
    GET   : data, as returned by `h5_to_gpcam`
    """
    gpcam_to_h5(data)          ### gpcam -> h5 (x-coordinates only)
    vinstrument()              ### h5 -> virtual instrument -> h5
    return h5_to_gpcam(data)   ### h5 -> gpcam

## gpCAM run with $\texttt{vinstrument}$

In [None]:
#TRY: stationary
def skernel(x1,x2,hps,obj):
    """
    Stationary kernel: hps[0] * Matern(d, hps[1]) on pairwise Euclidean distances.

    GIVEN : x1  (numpy.2darray, (V, coordinates)) first set of points
            x2  (numpy.2darray, (U, coordinates)) second set of points
            hps (sequence) hps[0] multiplies the kernel, hps[1] is passed to
                `obj.matern_kernel_diff1` as its second argument
                (presumably signal variance and length scale -- confirm)
            obj object providing `matern_kernel_diff1(distance, hps[1])`
    GET   : whatever `obj.matern_kernel_diff1` returns for the (V, U) distance
            matrix, scaled by hps[0]
    """
    #The kernel follows the mathematical definition of a kernel. This
    #means there is no limit to the variety of kernels you can define.
    x1 = np.atleast_2d(x1)
    x2 = np.atleast_2d(x2)
    ## pairwise distances d[v, u] = |x1[v] - x2[u]|; a plain norm(x1 - x2)
    ## collapses everything into one scalar and breaks when V != U
    d = np.linalg.norm(x1[:, None, :] - x2[None, :, :], axis=2)
    return hps[0] * obj.matern_kernel_diff1(d,hps[1])


#### GP quick-test: time a gpCAM autonomous experiment driven by the h5-based `instrument`
# NOTE: `skernel` defined above is not passed to the experimenter here.
START=time.time()
# NOTE(review): the three positional arguments are presumably the input-space
# bounds (the RCF `domain`), the initial hyperparameters and the hyperparameter
# bounds -- confirm against the AutonomousExperimenterGP signature of the
# installed gpCAM version.
my_ae = AutonomousExperimenterGP(domain,
                                 np.ones((3)),
                                 np.array([[0.001,100.],[0.001,100.], [0.001,100.]]), ## limits
                                 init_dataset_size= 20,
                                 instrument_function = instrument,
                                 cost_function_parameters={"offset": 5.0,"slope":10.0},
                                 store_inv = True)
my_ae.train(method = "hgdl")      ### trains the kernel, to find optimal hyperparameters given the initially-random data
# NOTE(review): the argument is presumably the total number of points to reach
# (current count + 40) -- confirm against the gpCAM documentation.
my_ae.go(len(my_ae.x_data) + 40)  ### runs the Autonomous Experiment
# train once more after the experiment has collected its points
my_ae.train(method = "hgdl")
# elapsed wall-clock seconds since START
print(time.time()-START)

# Overlay the RCF support points (ground truth) and the points gpCAM measured.
fig = go.Figure()

fig.add_trace(go.Scatter3d(x=f.D_ix[:,0], y=f.D_ix[:,1], z=f.D_iX[:,0], mode='markers', name="RCF (ground truth)"))
fig.add_trace(go.Scatter3d(x=my_ae.x_data[:,0], y=my_ae.x_data[:,1], z=my_ae.y_data, mode='markers', name="gpCAM"))

fig.update_layout(title='Ground Truth Random-Contious 2d-Function', autosize=True, width=800, height=800, margin=dict(l=65, r=50, b=65, t=90))
fig.show()

# NOTE(review): the recorded output below is array([1., 1., 1.]), identical to
# the initial np.ones(3) -- confirm that the trained hyperparameters are
# actually reflected by this attribute.
my_ae.hyperparameters

6.871522665023804


array([1., 1., 1.])