# Active Inference Mountain car

In [298]:
import Pkg;
Pkg.activate(".");
Pkg.instantiate();
Pkg.add("DataFrames");

In [299]:
using RxInfer, Plots, Distributions, CSV, DataFrames

In [300]:
import HypergeometricFunctions: _₂F₁

function create_physics(; engine_force_limit = 0.04, friction_coefficient = 0.1)
    
    # Engine force as function of action
    Fa = (a::Real) -> engine_force_limit * tanh(a) 
    # Derivative of the engine force
    Fa_prime = (a::Real) -> engine_force_limit - engine_force_limit * tanh(a)^2 
    # Friction force as function of velocity
    Ff = (y_dot::Real) -> -friction_coefficient * y_dot 
    # Derivative of the friction force
    Ff_prime = (y_dot::Real) -> -friction_coefficient 
    
    # Gravitational force (horizontal component) as function of position
    Fg = (y::Real) -> begin
        if y < 0
            0.05*(-2*y - 1)
        else
            0.05*(-(1 + 5*y^2)^(-0.5) - (y^2)*(1 + 5*y^2)^(-3/2) - (y^4)/16)
        end
    end

    # Derivative of the gravitational force
    Fg_prime = (y::Real) -> begin 
        if y < 0
            -0.1
        else
            0.05*((-4*y^3)/16 + (5*y)/(1 + 5*y^2)^1.5 + (3*5*y^3)/(1 + 5*y^2)^(5/2) - (2*y)/(1 + 5*y^2)^(3/2))
        end
    end
    
    # The height of the landscape as a function of the horizontal coordinate
    height = (x::Float64) -> begin
        if x < 0
            h = x^2 + x
        else
            h = x * _₂F₁(0.5,0.5,1.5, -5*x^2) + x^3 * _₂F₁(1.5, 1.5, 2.5, -5*x^2) / 3 + x^5 / 80
        end
        return 0.05*h
    end
    
    return (Fa, Fa_prime, Ff, Ff_prime, Fg, Fg_prime, height)
end

create_physics (generic function with 1 method)

In [301]:
engine_force_limit   = 0.04
friction_coefficient = 0.1

Fa, Fa_prime, Ff, Ff_prime, Fg, Fg_prime, height = create_physics(
    engine_force_limit = engine_force_limit,
    friction_coefficient = friction_coefficient
);

initial_position = -0.5
initial_velocity = 0.0

x_target = [0.75, 0.0] 

valley_x = range(-2, 2, length=400)
valley_y = [ height(xs) for xs in valley_x ]
plot(valley_x, valley_y, title = "Mountain Car", label = "Landscape", color = "black")
scatter!([ initial_position ], [ height(initial_position) ], label="Startig Position")   
scatter!([x_target[1]], [height(x_target[1])], label="Goal")
savefig("mountain_car_plot.png")


"/Users/jackmontgomery/Desktop/UCT/Research_Project/active-temporal-predictive-coding/message_passing/mountain_car_plot.png"

In [302]:
function create_world(; Fg, Ff, Fa, initial_position = -0.5, initial_velocity = 0.0)

    y_t_min = initial_position
    y_dot_t_min = initial_velocity
    
    y_t = y_t_min
    y_dot_t = y_dot_t_min
    
    execute = (a_t::Float64) -> begin
        # Compute next state
        y_dot_t = y_dot_t_min + Fg(y_t_min) + Ff(y_dot_t_min) + Fa(a_t)
        y_t = y_t_min + y_dot_t
    
        # Reset state for next step
        y_t_min = y_t
        y_dot_t_min = y_dot_t
    end
    
    observe = () -> begin 
        return [y_t, y_dot_t]
    end
        
    return (execute, observe)
end

create_world (generic function with 1 method)

![](ai-mountain-car-naive.gif)

In [303]:
@model function mountain_car(m_u, V_u, m_x, V_x, m_s_t_min, V_s_t_min, T, Fg, Fa, Ff, engine_force_limit)

    # Transition function modeling transition due to gravity and friction
    g = (s_t_min::AbstractVector) -> begin
        s_t = similar(s_t_min) # Next state
        s_t[2] = s_t_min[2] + Fg(s_t_min[1]) + Ff(s_t_min[2]) # Update velocity
        s_t[1] = s_t_min[1] + s_t[2] # Update position
        return s_t
    end

    # Function for modeling engine control
    h = (u::AbstractVector) -> [0.0, Fa(u[1])]

    # Internal model perameters
    Gamma = 1e-4 * diageye(2) # Transition precision
    Theta = 1e-4 * diageye(2) # Observation variance

    s_t_min ~ MvNormal(mean=m_s_t_min, cov=V_s_t_min)
    s_k_min = s_t_min

    local s

    for k in 1:T

        u[k] ~ MvNormal(mean=m_u[k], cov=V_u[k])

        u_h_k[k] ~ h(u[k]) where {meta=DeltaMeta(method = Linearization())}
        s_g_k[k] ~ g(s_k_min) where {meta=DeltaMeta(method = Linearization())}
        u_s_sum[k] ~ s_g_k[k] + u_h_k[k]

        s[k] ~ MvNormal(mean=u_s_sum[k], cov=Gamma)
        x[k] ~ MvNormal(mean=s[k], cov=Theta)
        x[k] ~ MvNormal(mean=m_x[k], cov=V_x[k]) # goal
        
        s_k_min = s[k]
    end

    return (s,)
end

In [304]:
import RxInfer.ReactiveMP: getrecent, messageout

function create_agent(; T=20, Fg, Fa, Ff, engine_force_limit, x_target, initial_position, initial_velocity)

    # Control
    Epsilon = fill(1e4, 1, 1)
    m_u = Vector{Float64}[[0.0] for k = 1:T]    # Centered around 0
    V_u = Matrix{Float64}[Epsilon for k = 1:T]  # High Variance


    # Observations
    m_x = [zeros(2) for k = 1:T]                # Centered at 0
    V_x = [huge * diageye(2) for k = 1:T]       # High Variance
    
    Sigma = 1e-4 * diageye(2)                   # Until the target time
    V_x[end] = Sigma                            # Then set small variance
    m_x[end] = x_target                         # Then set small variance

    # Initial Hidden State
    m_s_t_min = [initial_position, initial_velocity]
    V_s_t_min = tiny * diageye(2)

    # Initialise infernce result

    result = nothing

    compute = (upsilon_t::Float64, y_hat_t::Vector{Float64}) -> begin

        m_u[1] = [upsilon_t]        # Register action with the generative model
        V_u[1] = fill(tiny, 1, 1)   # Clamp control prior to performed action

        m_x[1] = y_hat_t            # Register actual obseved value with the generative model
        V_x[1] = tiny * diageye(2)  # Clamp observation prior to the observed value

        data = Dict(:m_u => m_u,
            :V_u => V_u,
            :m_x => m_x,
            :V_x => V_x,
            :m_s_t_min => m_s_t_min,
            :V_s_t_min => V_s_t_min)

        model = mountain_car(T=T, Fg=Fg, Fa=Fa, Ff=Ff, engine_force_limit=engine_force_limit)
        result = infer(
            model=model,
            data=data,
            free_energy=true
        )
        return result.free_energy
    end

    # The `act` function returns the inferred best possible action
    act = () -> begin
        if result !== nothing
            return mode(result.posteriors[:u][2])[1]
        else
            return 0.0 # Without inference result we return some 'random' action
        end
    end

    # The `slide` function modifies the `(m_s_t_min, V_s_t_min)` for the next step
    # and shifts (or slides) the array of future goals `(m_x, V_x)` and inferred actions `(m_u, V_u)`
    slide = () -> begin

        model = RxInfer.getmodel(result.model)
        (s,) = RxInfer.getreturnval(model)
        varref = RxInfer.getvarref(model, s)
        var = RxInfer.getvariable(varref)

        slide_msg_idx = 3 # This index is model dependend
        (m_s_t_min, V_s_t_min) = mean_cov(getrecent(messageout(var[2], slide_msg_idx)))

        m_u = circshift(m_u, -1)
        m_u[end] = [0.0]
        V_u = circshift(V_u, -1)
        V_u[end] = Epsilon

        m_x = circshift(m_x, -1)
        m_x[end] = x_target
        V_x = circshift(V_x, -1)
        V_x[end] = Sigma
    end

    return (compute, act, slide)
end

create_agent (generic function with 1 method)

In [305]:
(execute_ai, observe_ai) = create_world(
    Fg=Fg, Ff=Ff, Fa=Fa,
    initial_position=initial_position,
    initial_velocity=initial_velocity
) # Let there be a world

T_ai = 25

(compute_ai, act_ai, slide_ai) = create_agent(;
    T=T_ai,
    Fa=Fa,
    Fg=Fg,
    Ff=Ff,
    engine_force_limit=engine_force_limit,
    x_target=x_target,
    initial_position=initial_position,
    initial_velocity=initial_velocity
)

N_ai = 100

agent_a = Vector{Float64}(undef, N_ai)          # Actions
agent_x = Vector{Vector{Float64}}(undef, N_ai)  # Observations
agent_e = Vector{Vector{Float64}}(undef, N_ai)  # Free energy

for t = 1:N_ai
    agent_a[t] = act_ai()               # Invoke an action from the agent
    execute_ai(agent_a[t])              # The action influences hidden external states
    agent_x[t] = observe_ai()           # Observe the current environmental outcome (update p)
    agent_e[t] = compute_ai(agent_a[t], agent_x[t])  # Infer beliefs from current model state (update q)
    slide_ai()                          # Prepare for next iteration
end

In [306]:
# Create a DataFrame for storing the data
df = DataFrame(
    time=1:N_ai,
    actions=agent_a,
    observations=[x[1] for x in agent_x],  # Assuming 1D observations for simplicity
    free_energy=[e[1] for e in agent_e]    # Assuming 1D free energy values
)

# Save the data to a CSV file
CSV.write("ai_simulation_data.csv", df)

"ai_simulation_data.csv"

In [307]:
animation_ai = @animate for i in 1:N_ai

    pls = plot(valley_x, valley_y, title="Active inference results", label="Landscape", color="black")

    pls = scatter!(pls, [agent_x[i][1]], [height(agent_x[i][1])], label="car")
    pls = scatter!(pls, [x_target[1]], [height(x_target[1])], label="goal")

    # pef - plot engine force
    pef = plot(Fa.(agent_a[1:i]), title="Engine force (agents actions)", xlim=(0, N_ai), ylim=(-0.05, 0.05))

    plot(pls, pef, size=(800, 400))
end

gif(animation_ai, "./ai-mountain-car-ai.gif", fps=20, show_msg=false);