In [None]:
import numpy as np
import matplotlib.pyplot as plt
# Visualización
import plotly.graph_objects as go

In [3]:
class Pendulo:

    def __init__(self,t, x0=[np.pi/4,0],nombre='Pendulo sin ruido',T = 0.01 ,k_steps = 1000):

        # Parámetros del sistema
        self.m = 1.0     # masa (kg)
        self.l = 1.0     # longitud (m)
        self.g = 9.81    # gravedad (m/s²)
        self.b = 0.1     # fricción (N⋅m⋅s/rad)
        self.T = T
        self.k_steps = k_steps
        self.t = t
        self.theta = np.zeros(k_steps)      # ángulo
        self.theta_dot = np.zeros(k_steps)  # velocidad angular
        self.z_theta = np.zeros(k_steps)      # ángulo
        self.z_theta_dot = np.zeros(k_steps)  # velocidad angular
        self.u = np.zeros(k_steps)          # control

        self.theta[0] = x0[0]    # 45 grados
        self.theta_dot[0] = x0[1] # velocidad inicial cero   
        self.z_theta[0] = x0[0]    # 45 grados
        self.z_theta_dot[0] = x0[1] # velocidad inicial cero   

        self.nombre = nombre

    # Esta funcion trabaja con pasos k, y no con todo el tiempo t
    def system (self,x,u,ruido=0):
        theta = x[0]
        theta_dot = x[1]

        dtheta = theta_dot + ruido[0]
        dtheta_dot = (-self.b*theta_dot - self.m*self.g*self.l*np.sin(theta) + u)/(self.m*self.l**2) + ruido[1]

        theta_k1 = theta + self.T*dtheta
        theta_dot_k1 = theta_dot + self.T*dtheta_dot

        return np.array([theta_k1, theta_dot_k1])

    # Esta funcion trabaja con pasos k 
    def control_retroalimentacion(self,estado, k):
        #theta_ref = np.sin(t)
        #dtheta_ref = np.cos(t)  
        #ddtheta_ref = -np.sin(t)
        # Estas referencias es con t = k  
        #t = k 
        theta_ref = 1/(np.exp(5 - k) + 1) #np.sin(k)
        dtheta_ref = np.exp(5 - k)/(np.exp(5 - k) + 1)**2 #np.cos(k)
        ddtheta_ref = (-np.exp(5 - k) + 2*np.exp(10 - 2*k)/(np.exp(5 - k) + 1))/(np.exp(5 - k) + 1)**2 #-np.sin(k) 
        
        theta,dtheta = estado  #+ np.random.normal(0,0.002,(estado.shape) )
        k1,k2 = 10,20 #2,1
        
        e = theta - theta_ref  # Error
        de = dtheta - dtheta_ref  # Derivada del error

        u = self.m*self.l**2*( ddtheta_ref - k1*(de) -k2*(e) + (self.b/(self.m*self.l**2))*dtheta + (self.g/self.l)*np.sin(theta) )  # Control
        return u

    def update(self,k,ruido_medicion=np.array([0,0]),ruido_proceso=[0,0]):
        x = np.array([self.theta[k], self.theta_dot[k]])
        z = x + ruido_medicion
        self.u[k+1] = self.control_retroalimentacion(z, self.t[k])
        self.theta[k+1], self.theta_dot[k+1] = self.system(x, self.u[k+1], ruido_proceso )
        self.z_theta[k+1], self.z_theta_dot[k+1] = z
        
        return z
    
    def update_RHONN(self,k,u,ruido_medicion=np.array([0,0]),ruido_proceso=[0,0]):
        x = np.array([self.theta[k], self.theta_dot[k]])
        z = x + ruido_medicion
        self.u[k+1] = u #self.control_retroalimentacion(z, t[k])
        self.theta[k+1], self.theta_dot[k+1] = self.system(x, self.u[k+1], ruido_proceso )
        self.z_theta[k+1], self.z_theta_dot[k+1] = z
        
        return z

    def datos(self):
        return self.theta,self.theta_dot

    def datos_z(self):
        return self.z_theta,self.z_theta_dot

In [11]:


class KalmanFilter:
    def __init__(self, F, H, Q, R, P0, x0):
        """
        Inicialización del Filtro de Kalman
        
        Parámetros:
        F : Matriz de transición de estado
        H : Matriz de observación
        Q : Matriz de covarianza del ruido del proceso
        R : Matriz de covarianza del ruido de medición
        P0: Matriz de covarianza inicial del error
        x0: Estado inicial
        """
        self.F = F  # Matriz de transición de estado
        self.H = H  # Matriz de observación
        self.Q = Q  # Covarianza del ruido del proceso
        self.R = R  # Covarianza del ruido de medición
        self.P = P0 # Covarianza del error
        self.x = x0 # Estado estimado
        
        # Dimensiones
        self.n = F.shape[0]  # Dimensión del estado
        self.m = H.shape[0]  # Dimensión de la medición
        
        # Historial
        self.history_x = [x0]
        self.history_P = [P0]
        
    def predict(self):
        """
        Paso de predicción:
        x = F*x
        P = F*P*F' + Q
        """
        # Predicción del estado
        self.x = self.F @ self.x
        
        # Predicción de la covarianza
        self.P = self.F @ self.P @ self.F.T + self.Q
        
        return self.x, self.P
    
    def update(self, z):
        """
        Paso de actualización:
        y = z - H*x
        S = H*P*H' + R
        K = P*H'*inv(S)
        x = x + K*y
        P = (I - K*H)*P
        """
        # Innovación
        y = z - self.H @ self.x
        
        # Covarianza de la innovación
        S = self.H @ self.P @ self.H.T + self.R
        
        # Ganancia de Kalman
        K = self.P @ self.H.T @ np.linalg.inv(S)
        
        # Actualización del estado
        self.x = self.x + K @ y
        
        # Actualización de la covarianza
        I = np.eye(self.n)
        self.P = (I - K @ self.H) @ self.P
        
        # Guardar historia
        self.history_x.append(self.x)
        self.history_P.append(self.P)
        
        return self.x, self.P

# Ejemplo para el péndulo
def create_pendulum_kf(dt, std_process, std_measurement):
    """
    Crear filtro de Kalman para péndulo
    """
    # Matrices del sistema
    F = np.array([
        [1, dt],
        [0, 1]
    ])
    
    H = np.array([
        [1, 0],
        [0, 1]
    ])
    
    # Matrices de covarianza
    Q = np.array([
        [std_process**2 * dt**4/4, std_process**2 * dt**3/2],
        [std_process**2 * dt**3/2, std_process**2 * dt**2]
    ])
    
    R = np.array([
        [std_measurement**2, 0],
        [0, std_measurement**2]
    ])
    
    # Estado inicial y covarianza
    x0 = np.array([[np.pi/4], [0]])
    P0 = np.eye(2) * 0.1
    
    return KalmanFilter(F, H, Q, R, P0, x0)

# Ejemplo de uso para el péndulo
def simulate_pendulum_kf(pendulo, k_steps, t, ruido_medicion, ruido_proceso):
    """
    Simular péndulo con filtro de Kalman
    """
    # Parámetros
    dt = t[1] - t[0]
    std_process = 0.1
    std_measurement = 0.1
    
    # Crear filtro
    kf = create_pendulum_kf(dt, std_process, std_measurement)
    
    # Arrays para almacenar resultados
    x_true = np.zeros((2, k_steps))
    x_measured = np.zeros((2, k_steps))
    x_estimated = np.zeros((2, k_steps))
    
    # Condiciones iniciales
    x_true[:, 0] = [np.pi/4, 0]
    x_measured[:, 0] = x_true[:, 0] + ruido_medicion[0]
    x_estimated[:, 0] = kf.x.flatten()
    
    # Simulación
    for k in range(1, k_steps):
        # Actualizar sistema real
        x_true[:, k] = pendulo.update_RHONN(k-1, 0, ruido_medicion[k-1], ruido_proceso[k-1])
        
        # Medición con ruido
        x_measured[:, k] = x_true[:, k] + ruido_medicion[k]
        
        # Filtro de Kalman
        kf.predict()
        x_estimated[:, k] = kf.update(x_measured[:, k].reshape(-1,1))[0].flatten()
    
    return x_true, x_measured, x_estimated



def plot_results(t, x_true, x_measured, x_estimated):
    fig = go.Figure()

    # Ángulo
    fig.add_trace(go.Scatter(x=t, y=x_true[0,:], mode='lines', name='Real', line=dict(color='green')))
    fig.add_trace(go.Scatter(x=t, y=x_measured[0,:], mode='markers', name='Medido', marker=dict(color='red', opacity=0.5)))
    fig.add_trace(go.Scatter(x=t, y=x_estimated[0,:], mode='lines', name='Estimado KF', line=dict(color='blue')))

    fig.update_layout(
        title='Filtro de Kalman - Péndulo Simple',
        xaxis_title='Tiempo (s)',
        yaxis_title='Ángulo (rad)',
        legend=dict(x=0, y=1),
        height=600
    )

    # Velocidad angular
    fig.add_trace(go.Scatter(x=t, y=x_true[1,:], mode='lines', name='Real', line=dict(color='green'), yaxis='y2'))
    fig.add_trace(go.Scatter(x=t, y=x_measured[1,:], mode='markers', name='Medido', marker=dict(color='red', opacity=0.5), yaxis='y2'))
    fig.add_trace(go.Scatter(x=t, y=x_estimated[1,:], mode='lines', name='Estimado KF', line=dict(color='blue'), yaxis='y2'))

    fig.update_layout(
        yaxis2=dict(
            title='Velocidad Angular (rad/s)',
            overlaying='y',
            side='right'
        )
    )

    fig.show()

In [12]:
# Parámetros de simulación
T = 0.01    # período de muestreo (s)
t_final = 10  # tiempo final de simulación (s)
k_steps = int(t_final/T)  # número de pasos
n_edos = 2  # número de ecuaciones diferenciales

# Vectores para almacenar resultados
t = np.arange(0, t_final, T)  # vector de tiempo

# Calcular referencia
theta_ref = 1/(np.exp(5 - t) + 1) #np.sin(t)
dtheta_ref = np.exp(5 - t)/(np.exp(5 - t) + 1)**2 #np.cos(t)
ddtheta_ref = (-np.exp(5 - t) + 2*np.exp(10 - 2*t)/(np.exp(5 - t) + 1))/(np.exp(5 - t) + 1)**2 #-np.sin(t) 

np.random.seed(158)
ruido_medicion = np.random.normal(0,0.02,(k_steps,n_edos) )
ruido_proceso = np.random.normal(0,0.3,(k_steps,n_edos) )

In [13]:
# Crear péndulo
pendulo = Pendulo(nombre='Pendulo con ruido', t=t)

# Simular
x_true, x_measured, x_estimated = simulate_pendulum_kf(
    pendulo, k_steps, t, ruido_medicion, ruido_proceso
)

# Visualizar
plot_results(t, x_true, x_measured, x_estimated)