In [1]:
# gradient vs scalar field

import numpy as np

dx = 1

x = np.arange(0, 6.1, dx)

print(x)

f_x = x ** 2

df_x = np.gradient(f_x, dx)

print(df_x)

[0. 1. 2. 3. 4. 5. 6.]
[ 1.  2.  4.  6.  8. 10. 11.]


In [2]:
# prediction & measurement relationship

import numpy as np

initial_weight = 160
np.random.seed(2)
weights = np.random.uniform(155, 175, 10)
dt = 1
dr = 1
K = 0.4

def weight_gain(weights, initial_weight, dt, dr, K):

    num_weights = len(weights)
    predictions = np.zeros(num_weights)
    estimates = np.zeros(num_weights)
    prediction = 0
    estimate = initial_weight
      
    for i, z in enumerate(weights):

        # Prediction based on weight change
        prediction = estimate + dt * dr
        
        # Update estimate using the new weight value
        estimate = prediction + K * (z - prediction)

        estimates[i] = estimate
        predictions[i] = prediction

    return predictions, estimates

weight_gain(weights, initial_weight, dt, dr, K)


(array([161.        , 163.08795922, 161.06018538, 164.03341105,
        164.90262577, 165.30451788, 164.8253893 , 163.53242265,
        166.07362132, 165.04141018]),
 array([162.08795922, 160.06018538, 163.03341105, 163.90262577,
        164.30451788, 163.8253893 , 162.53242265, 165.07362132,
        164.04141018, 163.15946431]))

In [3]:
# g & h filter

dr = 1
dt = 1
K = 0.4
gain_scale = 0.3

np.random.seed(2)
weights = np.random.uniform(155, 175, 10)
initial_weight = 160

def weight_gain(weights, initial_weight, dt, dr, K):

    num_weights = len(weights)
    predictions = np.zeros(num_weights)
    estimates = np.zeros(num_weights)
    weight = initial_weight
      
    for i, z in enumerate(weights):

        # Prediction step
        weight = weight + dr * dt
        dr = dr
        predictions[i] = weight
        
        # Update step
        dr = dr + gain_scale * ((z - weight) / dt)
        estimate = weight + K * (z - weight)

        estimates[i] = estimate

    return predictions, estimates

weight_gain(weights, initial_weight, dt, dr, K)


(array([161.        , 162.81596941, 162.44270539, 163.13460462,
        163.99805682, 164.68429879, 164.44726004, 162.60393509,
        162.19505541, 161.42558715]),
 array([162.08795922, 159.8969915 , 163.86292306, 163.36334191,
        163.76177651, 163.45325784, 162.3055451 , 164.51652878,
        161.71427063, 160.98997049]))

In [10]:
# g & h filter refactored

dt = 1 # timestep
g = 0.4 # where in between measurement and predicition (g is proportional to priority of measurement)
h = 0.3 # where in between previous gain rate and current gain rate (h is reactivity to change)

x0 = 160 # initial state
dx0 = 1 # initial gain rate

np.random.seed(2)
xm = np.random.uniform(155, 175, 10) # measurements

def g_h_filter(xm, x0, dx0, a, b, dt):

    x = 0 # state
    dx = 0 # gain rate
    rk = 0 # residual   

    num= len(xm)
    estimates = np.zeros(num)

    for i, xm in enumerate(xm):

        # prediction 
        x = x0 + dx0 * dt # initial state
        dx = dx0 # initial gain rate

        # update
        r = xm - x # residual
        x0 = x + g * r # state
        dx0 = dx + (h * r) / dt # gain rate

        # store
        estimates[i] = x0
    
    return estimates

g_h_filter(xm, x0, dx0, a, b, dt)

array([162.08795922, 160.54976703, 162.30736897, 163.55297363,
       164.22674197, 163.66492334, 161.7048746 , 163.06225343,
       162.61670016, 161.599667  ])

In [13]:
from numpy.random import randn

def gen_data(x0, dx, count, noise_factor):
    return [x0 + dx*i + randn()*noise_factor for i in range(count)]

measurements = gen_data(0, 1, 30, 1)
print(measurements)

[-0.31350819699340887, 1.7710117380694115, 0.13190934543623656, 4.731184665980468, 5.467678010573413, 4.664322661476152, 6.611340779573718, 7.0479705918681885, 7.170864710985221, 9.087710218408331, 11.000365886550695, 10.61890748248465, 11.624330576921006, 12.92552923710602, 14.43349633007657, 16.278379230271867, 15.365320694820637, 17.508396242683432, 18.216116006263682, 17.141387613876503, 19.580683517847312, 20.867671101563257, 21.960429760306393, 23.32600343338698, 21.959676951271284, 25.046255523141696, 25.322324422671958, 25.56056097326138, 28.52429643001035, 29.735279576065203]


In [3]:
# discrete bayesian filter

import numpy as np

map = np.array([1,1,0,0,0,0,0,0,1,0]) # map

belief = np.array([1/10]*10) # prior (initial beleif)

def update_belief(map, belief, z, z_prob):
    scale = z_prob / (1 - z_prob)
    belief[map==z] *= scale
    belief /= np.sum(belief)
    return belief

update_belief(map, belief, 1, 0.75)

array([0.1875, 0.1875, 0.0625, 0.0625, 0.0625, 0.0625, 0.0625, 0.0625,
       0.1875, 0.0625])

In [5]:
def predict_move(belief, move, p_under, p_correct, p_over):
    n = len(belief)
    prior = np.zeros(n)
    for i in range(n):
        prior[i] = (
            belief[(i-move) % n]   * p_correct +
            belief[(i-move-1) % n] * p_over +
            belief[(i-move+1) % n] * p_under)      
    return prior

belief = [0., 0., .4, .6, 0., 0., 0., 0., 0., 0.]
predict_move(belief, 2, .1, .8, .1)


array([0.  , 0.  , 0.  , 0.04, 0.38, 0.52, 0.06, 0.  , 0.  , 0.  ])

In [14]:
def predict_move_convolution(pdf, offset, kernel):
    N = len(pdf)
    kN = len(kernel)
    width = int((kN - 1) / 2)

    prior = np.zeros(N)
    for i in range(N):
        for k in range (kN):
            index = (i + (width-k) - offset) % N
            prior[i] += pdf[index] * kernel[k]
    return prior

3

In [19]:
def gaussian_multiply(g1, g2):
    mu1, var1 = g1
    mu2, var2 = g2
    mean = (var1 * mu2 + var2 * mu1) / (var1 + var2)
    variance = (var1 * var2) / (var1 + var2)
    return (mean, variance)

def update(likelihood, prior):
    posterior = gaussian_multiply(likelihood, prior)
    return posterior

def predict(x, v):
    return (x[0] + v[0], x[1] + v[1])

g1 = (10., .2 ** 2)
g2 = (11., .1 ** 2)
estimate = update(g1, g2)
estimate

(10.799999999999999, 0.008000000000000002)

In [22]:
def update(prior, measurement):
    x, P = prior        # mean and variance of prior
    z, R = measurement  # mean and variance of measurement
    
    y = z - x        # residual
    K = P / (P + R)  # Kalman gain

    x = x + K*y      # posterior
    P = (1 - K) * P  # posterior variance
    return gaussian(x, P)

def predict(posterior, movement):
    x, P = posterior # mean and variance of posterior
    dx, Q = movement # mean and variance of movement
    x = x + dx
    P = P + Q
    return gaussian(x, P)

initial_mean = 0  # Initial mean of the state
initial_variance = 1  # Initial variance of the state
measurement_mean = 2  # Measurement mean
measurement_variance = 0.1  # Measurement variance
movement_mean = 1  # Movement mean (change in state)
movement_variance = 0.01  # Movement variance



In [1]:
import numpy as np

# Define system parameters
num_iter = 100  # Number of iterations
n = 3  # Dimension of state vector
m = 2  # Dimension of measurement vector

# Generate some example data
true_grad = np.random.rand(num_iter, n)
ϵ = np.random.randn(num_iter, m)  # Measurement noise

# Initialize variables
P = np.identity(n)  # Initial covariance matrix
K = np.zeros((n, m))  # Initial Kalman gain matrix

# Initialize state estimates
p_hat = np.zeros((num_iter, n))  # Estimated state vector
p = np.zeros((num_iter, n))  # True state vector

# Main loop
for k in range(num_iter - 1):
    for i, y in enumerate(true_grad[k]):
        R = 0.2  # Measurement noise
        P = 1    # Process noise
        
        # Calculate Kalman gain
        K = P / (P + R)
        
        if i == 0:
            p[k + 1, 1] = p[k, 1] + K * (y - (p[k, 1] - p[k, 0]))
        else:
            p[k + 1, i] = p[k, i] + K * (y - (p[k, i] - p[k, i - 1]))
        
        # Update covariance matrix
        P = (1 - K) * P
        
        # Update estimated state
        p_hat[k + 1] = p_hat[k] + K * (ϵ[k] - (p_hat[k] - p[k]))
    
    # Update true state (for simulation purposes)
    p[k + 1] = p[k] + true_grad[k] + ϵ[k]

# Print or visualize the results as needed


ValueError: operands could not be broadcast together with shapes (2,) (3,) 

In [3]:
import numpy as np
import matplotlib.pyplot as plt

# --- data --- #
ideal_p = np.arange(0, 10, 1) # define the true pressure field
ideal_grad = np.gradient(ideal_p) # define true gradient
noise = np.random.normal(0, 0.08, 10) # generate noise
mrd_grad = ideal_grad + noise # add noise to gradient

# --- initialization --- #
num_iter = 100000 # define number of iterations
grid = np.zeros((num_iter, 10)) # define the grid
init_p = np.abs(np.random.normal(0, 10, 10)) # define initial pressure field
grid[0] = init_p # initialize the grid
grid[:, 0] = 0 # apply dirichlet condition to the grid
p = grid

# --- solver --- #
for i, y in enumerate(mrd_grad):
    K = 0.2 
    for k in range(num_iter-1):
        if i == 0:
            p[k+1, 1] = p[k, 1] + K * (y - (p[k, 1] - p[k, 0]))
        else:
            p[k+1, i] = p[k, i] + K * (y - (p[k, i] - p[k, i-1]))
print('Pressure field: ', p[-1])
        
# --- error --- #
error = np.abs(ideal_p - p[-1])
print('Error: ', error)

Pressure field:  [0.         1.08826646 2.2150382  3.24335587 4.17634469 5.28068624
 6.26174791 7.22680444 8.27316285 9.3155836 ]
Error:  [0.         0.08826646 0.2150382  0.24335587 0.17634469 0.28068624
 0.26174791 0.22680444 0.27316285 0.3155836 ]


In [None]:
import numpy as np
import matplotlib.pyplot as plt

num_points = 100  # grid spacing
num_iter = 100000  # number of iterations
np.random.seed(42)  # set same random numbers

# Data (synthetic pressure field, gradient, and experimental measurement data)
x = np.linspace(0, 1, num_points)  # normalized space
true_pressure = (
    0.5 * np.sin(2 * np.pi * 1 * x) +
    0.3 * np.sin(2 * np.pi * 3 * x) +
    0.2 * np.sin(2 * np.pi * 5 * x)
)
true_gradient = np.gradient(true_pressure)  # true gradient
noise = np.random.normal(0, 0.01, num_points)  # noise (normally distributed)
noisy_gradient = true_gradient + noise  # noisy gradient

# Solver
p = np.zeros((num_iter, num_points))  # initialize grid
p[0] = np.abs(np.random.normal(0, 10, num_points))  # initialize pressure field
p[:, 0] = true_pressure[0]  # dirichlet condition

for i, z in enumerate(noisy_gradient):
    
    # Observer parameters
    P = 0.1  # posterior / prior variance
    R = 0.01  # measurement noise variance
    Q = 0.001  # gradient estimation variance (p[k,i]-p[k,i-1])

    for k in range(num_iter - 1):
        
        # Predict
        P = P + Q

        # Update
        K = P / (P + R)  # Kalman gain
        if i == 0:
            p[k+1, 1] = p[k, 1] + K * (z - (p[k, 1] - p[k, 0]))  # posterior
        else:
            p[k+1, i] = p[k, i] + K * (z - (p[k, i] - p[k, i-1]))  # posterior
        P = (1 - K) * P  # posterior variance

# Results
sim_pressure = p[-1]
error = np.abs(true_pressure - sim_pressure)

# Display
# print('True Pressure Field:', true_pressure)
# print('Calculated Pressure Field:', sim_pressure)
print('Max Error:', np.max(error))

# Visualization
plt.figure(figsize=(10, 6))
plt.plot(true_pressure, label='True Pressure Field')
plt.plot(sim_pressure, label='Calculated Pressure Field')
plt.legend()
plt.title('True vs Calculated Pressure Field')
plt.xlabel('Grid Index')
plt.ylabel('Pressure')
plt.show()
