<a href="https://colab.research.google.com/github/lynnfdsouza/drone_terrain_masking/blob/main/Real_Sensor_API_Integration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np

class KalmanFilter1D:
    """Simple 1D Kalman Filter for position/velocity estimation."""
    def __init__(self, x0=0, v0=0, accel_var=0.1, meas_var=1.0):
        self.x = x0
        self.v = v0
        self.P = np.eye(2)
        self.F = np.eye(2)
        self.Q = np.array([[0.25, 0.5],[0.5, 1.0]]) * accel_var
        self.H = np.array([[1, 0]])  # Position measurement only
        self.R = np.array([[meas_var]])

    def predict(self, dt, accel=0):
        # State transition
        self.F = np.array([[1, dt], [0, 1]])
        B = np.array([[0.5*dt**2], [dt]])
        u = np.array([[accel]])
        state = np.array([[self.x], [self.v]])
        state = self.F @ state + B @ u
        self.x, self.v = state.flatten()
        # Covariance update
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, z):
        y = z - (self.H @ np.array([[self.x], [self.v]]))
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        state = np.array([[self.x], [self.v]]) + K @ y
        self.x, self.v = state.flatten()
        I = np.eye(2)
        self.P = (I - K @ self.H) @ self.P

class ObstacleAvoidance:
    """Obstacle avoidance using simulated LIDAR/range data."""
    def __init__(self, min_dist=5.0):
        self.min_dist = min_dist

    def check_obstacles(self, lidar_ranges):
        # lidar_ranges: list of (angle, distance)
        for angle, dist in lidar_ranges:
            if dist < self.min_dist:
                print(f"Obstacle detected at {angle}°: {dist:.2f}m")
                return True, angle
        return False, None

    def avoidance_direction(self, obstacle_angle):
        # Simple logic: turn away from detected obstacle
        if obstacle_angle < 0:
            return "right"
        else:
            return "left"

class RealSensorAPI:
    """Stub class: Replace with actual sensor API calls."""
    def get_acceleration(self):
        # Replace with real sensor read
        return np.random.randn(3) * 0.02

    def get_gyroscope(self):
        return np.random.randn(3) * 0.01

    def get_position(self):
        # From external source, e.g., visual odometry or quantum position sensor
        return np.random.randn(3) * 0.5

    def get_lidar_ranges(self):
        # Simulated 8-direction LIDAR (angle in degrees, distance in meters)
        return [(a, 10 + np.random.uniform(-2, 2)) for a in range(-90, 91, 30)]

def terrain_elevation(x, y):
    # Example: rolling hills
    return 15.0 * np.sin(0.03*x) * np.cos(0.04*y) + 30

def main():
    # Initialize filters for each axis (x, y, z)
    kf_x = KalmanFilter1D()
    kf_y = KalmanFilter1D()
    kf_z = KalmanFilter1D(x0=50.0)
    obstacle_avoid = ObstacleAvoidance()
    sensors = RealSensorAPI()

    dt = 0.1  # 10 Hz update rate
    for t in np.arange(0, 30, dt):
        accel = sensors.get_acceleration()
        pos_meas = sensors.get_position()
        lidar = sensors.get_lidar_ranges()

        # Predict step (assume measured acceleration)
        kf_x.predict(dt, accel=accel[0])
        kf_y.predict(dt, accel=accel[1])
        kf_z.predict(dt, accel=accel[2])

        # Update step (use external position measurement)
        kf_x.update(pos_meas[0])
        kf_y.update(pos_meas[1])
        kf_z.update(pos_meas[2])

        # Terrain masking
        terrain = terrain_elevation(kf_x.x, kf_y.x)
        altitude_buffer = 10
        if kf_z.x < terrain + altitude_buffer:
            kf_z.x = terrain + altitude_buffer
            print(f"Terrain masking: Adjusted altitude to {kf_z.x:.1f}m")

        # Obstacle avoidance
        obstacle, angle = obstacle_avoid.check_obstacles(lidar)
        if obstacle:
            turn = obstacle_avoid.avoidance_direction(angle)
            print(f"Obstacle at {angle}°, turning {turn}")

        # (Replace print with drone control command)
        print(f"t={t:.1f}s | pos=({kf_x.x:.2f}, {kf_y.x:.2f}, {kf_z.x:.2f})")

if __name__ == "__main__":
    main()

Terrain masking: Adjusted altitude to 40.1m
t=0.0s | pos=(0.20, 0.35, 40.09)
Terrain masking: Adjusted altitude to 39.9m
t=0.1s | pos=(-0.11, 0.20, 39.95)
Terrain masking: Adjusted altitude to 39.9m
t=0.2s | pos=(-0.25, 0.17, 39.89)
Terrain masking: Adjusted altitude to 40.1m
t=0.3s | pos=(0.13, -0.04, 40.06)
Terrain masking: Adjusted altitude to 40.0m
t=0.4s | pos=(0.06, -0.02, 40.03)
Terrain masking: Adjusted altitude to 40.1m
t=0.5s | pos=(0.13, 0.10, 40.06)
Terrain masking: Adjusted altitude to 40.1m
t=0.6s | pos=(0.12, -0.05, 40.05)
Terrain masking: Adjusted altitude to 40.1m
t=0.7s | pos=(0.26, 0.12, 40.12)
Terrain masking: Adjusted altitude to 40.0m
t=0.8s | pos=(-0.02, 0.16, 39.99)
Terrain masking: Adjusted altitude to 39.9m
t=0.9s | pos=(-0.14, 0.39, 39.94)
Terrain masking: Adjusted altitude to 40.0m
t=1.0s | pos=(-0.04, 0.29, 39.98)
Terrain masking: Adjusted altitude to 40.0m
t=1.1s | pos=(0.07, 0.35, 40.03)
Terrain masking: Adjusted altitude to 40.0m
t=1.2s | pos=(0.06, 0.22

How This Works
1. Kalman Filters for Sensor Fusion
Each axis (x, y, z) has its own Kalman filter, fusing acceleration and position measurements to estimate position and velocity, robust to noise and drift.
2. Obstacle Avoidance
Simulated LIDAR returns distances; if any reading is below the min_dist, an obstacle is detected and a turn direction is chosen.
3. Terrain Masking
The drone’s altitude is always kept above the local terrain plus a buffer, using a digital elevation model (DEM).
4. Real Sensor API Integration
Replace the RealSensorAPI stubs with actual calls to hardware (quantum IMU, magnetometer, LIDAR, etc.).
To Integrate Real Sensors:

Use the SDK or API provided by the sensor manufacturer for quantum IMU, LIDAR, etc.
Replace the methods in RealSensorAPI with actual sensor reads (e.g., via USB, UART, CAN, or ROS topics).