# Notes on Distributed Interactive Simulation (DIS) and High-Level Architecture (HLA)

In distributed simulations like *Modular Semi-Automated Forces* (ModSAF) and those using DMSO (Defense Modeling and Simulation Office) standards like DIS (Distributed Interactive Simulation) or HLA (High-Level Architecture), the primary method used to calculate the location of objects between updates is Dead Reckoning.

*The Dead Reckoning Concept*<br>
Dead reckoning allows each simulator to maintain a "ghost" or "proxy" version of entities controlled by other simulators.
<br>Instead of sending position updates every frame (which would overwhelmed network bandwidth), simulators send updates only when the entity's actual movement deviates significantly from its predicted location/motion.


**How to Calculate the next Location of a moving entity**
The calculation depends on the order of the dead reckoning algorithm used by the entity.
The standard DIS protocol (IEEE 1278) defines several specific algorithms, but they generally fall into three categories:

1. **Static (Zeroth-Order)**
Used for stationary objects. The position remains constant until a new update is received.
$$P(t)=P(t_0)$$
1. **Linear (First-Order)**
Used for objects moving at a constant velocity. It assumes the object continues in a straight line at the same speed.
$$P(t)=P(t_0)+V(t_0)⋅Δt$$
where
 - $P(t_0)$: Last reported position.
 - $V(t_0)$: Last reported velocity.
 - $Δt$: Time elapsed since the last update at $t_0$.
1. **Quadratic (Second-Order)**
Used for maneuvering objects (e.g., a turning aircraft or accelerating bike). This incorporates acceleration to provide a curved or changing path.

$$P(t)=P(t_0)+V(t_0)⋅Δt+ \frac{1}{2} ​A(t_0)⋅(Δt)^2$$

$A(t_0)$: Last reported acceleration.




Key Implementation Steps
Simulation follows this workflow:
1. **Extract Parameters**: When a PDU (Protocol Data Unit) or Object Update arrives, extract the *Location*, *Velocity*, *Acceleration*, *Orientation*, and the *Dead Reckoning Algorithm ID*.
1. **Synchronize Clocks**: Calculate Δt using the difference between the current simulation time and the timestamp of the last update.
1. **Extrapolate**: Apply the appropriate Dead Reckoning Algorithm to find the new X,Y,Z coordinates.
1. **Smoothing**: When a new "real" update arrives, the object might "jump" from its *predicted current location* to its *actual one*. To prevent jump, use **Convergence** (gradually moving the proxy toward the true position over a few frames) rather than snapping it instantly.

**When is a New Update Sent?**

The local simulator also runs its own dead reckoning model for its own entities. It compares the "true" position to the "dead reckoned" position. If the distance (error) exceeds a certain threshold (e.g., 1 meter for a bike, or a specific angular threshold for rotation), a new update is broadcasted to the network.

In [1]:
import time

class DeadReckoningEntity:
    def __init__(self):
        # State variables (Last known updates from the network)
        self.p0 = [0.0, 0.0, 0.0]  # Position (x, y, z)
        self.v0 = [0.0, 0.0, 0.0]  # Velocity (vx, vy, vz)
        self.a0 = [0.0, 0.0, 0.0]  # Acceleration (ax, ay, az)
        self.last_update_time = time.time()

    def receive_network_update(self, pos, vel, accel):
        """Called when a DIS PDU or HLA update arrives."""
        self.p0 = pos
        self.v0 = vel
        self.a0 = accel
        self.last_update_time = time.time()

    def get_current_location(self):
        """Calculates the extrapolated position at the current moment."""
        delta_t = time.time() - self.last_update_time

        # Second-order extrapolation: P = P0 + V0*t + 0.5 * A0 * t^2
        current_pos = []
        for i in range(3):
            extrapolated = (
                self.p0[i] +
                (self.v0[i] * delta_t) +
                (0.5 * self.a0[i] * (delta_t ** 2))
            )
            current_pos.append(round(extrapolated, 3))

        return current_pos

# --- Example Usage ---
# 1. Initialize entity
bike_proxy = DeadReckoningEntity()

# 2. Simulate receiving a network update (Entity is at origin, moving at 10m/s with 2m/s^2 accel)
bike_proxy.receive_network_update(pos=[0, 0, 0], vel=[10, 0, 0], accel=[2, 0, 0])

# 3. Predict position over 3 seconds without new network updates
print("Extrapolating local position...")
for _ in range(4):
    print(f"Time: {round(time.time() - bike_proxy.last_update_time, 1)}s | Position: {bike_proxy.get_current_location()}")
    time.sleep(1)


Extrapolating local position...
Time: 0.0s | Position: [0.003, 0.0, 0.0]
Time: 1.0s | Position: [11.012, 0.0, 0.0]
Time: 2.0s | Position: [24.024, 0.0, 0.0]
Time: 3.0s | Position: [39.084, 0.0, 0.0]


**Heartbeats**: Even if an object doesn't move beyond the threshold, DMSO standards usually require a "Heartbeat" PDU (often every 5 to 60 seconds) to let other simulators know the entity hasn't crashed or disconnected.

**Rotation (Orientation)**: In distributed simulation, you must also dead reckon Euler angles (Psi, Theta, Phi). This is usually done using angular velocity (body rates).
$$Orientation(t)=Orientation(t_0)+AngularVelocity⋅Δt$$


**Smoothing the "Jump"**

When receive_network_update is called, ghost simulator shouldn't just overwrite the position. If the dead reckoning was slightly off, the object will **teleport**.

To fix such jump, receiving simulator should calculate the **error vector** between **current predicted position** and the new update, then apply a small percentage of that error over the next few frames until the two positions converge. This is often called Quat/Vector Interpolation (Slerp/Lerp) or Convergence.

To prevent the "teleporting" effect when a new network update arrives, the following code uses *Linear Convergence*. Instead of snapping to the new position, we maintain a "display" position that quickly slides toward the new "true" position over a short period (usually 0.5 to 2.0 seconds).


In [14]:
import time

class SmoothedEntity:
    def __init__(self, convergence_time=1.0):
        # Truth state (from Dead Reckoning)
        self.p_net = [0.0, 0.0, 0.0]
        self.v_net = [0.0, 0.0, 0.0]
        self.a_net = [0.0, 0.0, 0.0]
        self.last_update_time = time.time()

        # Smoothing state
        self.display_pos = [0.0, 0.0, 0.0]
        self.offset = [0.0, 0.0, 0.0]
        self.convergence_time = convergence_time # Seconds to fix the "jump"

    def receive_network_update(self, pos, vel, accel):
        # 1. Calculate where we CURRENTLY think we are (the "wrong" prediction)
        current_pred = self._calculate_dr_pos()

        # 2. Update the "Truth" from the network
        self.p_net = pos
        self.v_net = vel
        self.a_net = accel
        self.last_update_time = time.time()

        # 3. Calculate the error (difference between our prediction and network truth)
        # This offset will be added to the truth and bled off over time.
        for i in range(3):
            self.offset[i] = current_pred[i] - self.p_net[i]

    def _calculate_dr_pos(self):
        """Internal Dead Reckoning calculation."""
        dt = time.time() - self.last_update_time
        return [
            self.p_net[i] + (self.v_net[i] * dt) + (0.5 * self.a_net[i] * dt**2)
            for i in range(3)
        ]

    def get_display_location(self):
        """The position you actually render on screen."""
        dt = time.time() - self.last_update_time
        dr_pos = self._calculate_dr_pos()

        # Calculate how much of the offset is left
        # (Linear decay: if conv_time is 1s, at 0.5s we show 50% of the offset)
        ratio = max(0, (self.convergence_time - dt) / self.convergence_time)

        self.display_pos = [
            dr_pos[i] + (self.offset[i] * ratio)
            for i in range(3)
        ]
        return self.display_pos

    def get_last_update_time(self):
        return self.last_update_time

    def get_current_motion(self):
        return f"Pos: {self.p_net} V: {self.v_net} A: {self.a_net} @{self.last_update_time:.3f}"

# --- Simulation Execution ---
entity = SmoothedEntity(convergence_time=2.0)

# Initial Update
entity.receive_network_update([0,0,0], [10,0,0], [0,0,0])
time.sleep(2) # Let it travel to approx x=20

# New Update arrives, but dead reckoning was off by 5 meters (Real x is 15)
print(f"Display Position[@{time.time():.3f}]: {entity.get_display_location()} {entity.get_current_motion()}")
print("--- Network Update Received (Correcting Position) ---")
entity.receive_network_update([15,0,0], [8,0,0], [0,0,0])
print(f"Updated: now: {time.time():.3f}  {entity.get_current_motion()}")

# Observe the smooth transition
for _ in range(10):
    print(f"Display Position[@{time.time():.3f}]: {entity.get_display_location()} Δt={time.time()-entity.get_last_update_time():.3f}")
    time.sleep(0.3)



Display Position[@1767056624.343]: [20.006377696990967, 0.0, 0.0] Pos: [0, 0, 0] V: [10, 0, 0] A: [0, 0, 0] @1767056622.342
--- Network Update Received (Correcting Position) ---
Updated: now: 1767056624.344  Pos: [15, 0, 0] V: [8, 0, 0] A: [0, 0, 0] @1767056624.344
Display Position[@1767056624.344]: [20.014567227608495, 0.0, 0.0] Δt=0.000
Display Position[@1767056624.644]: [21.66477531111758, 0.0, 0.0] Δt=0.301
Display Position[@1767056624.945]: [23.317823718853845, 0.0, 0.0] Δt=0.602
Display Position[@1767056625.246]: [24.9704916763352, 0.0, 0.0] Δt=0.903
Display Position[@1767056625.546]: [26.621076992913515, 0.0, 0.0] Δt=1.203
Display Position[@1767056625.847]: [28.27256979693692, 0.0, 0.0] Δt=1.504
Display Position[@1767056626.148]: [29.92514595101119, 0.0, 0.0] Δt=1.804
Display Position[@1767056626.449]: [31.843358993530273, 0.0, 0.0] Δt=2.105
Display Position[@1767056626.750]: [34.248891830444336, 0.0, 0.0] Δt=2.406
Display Position[@1767056627.050]: [36.65176200866699, 0.0, 0.0]

When a new update arrives, the difference between where the local simulation thought the entity was and where the network says it actually is creates an **Error Vector**.

- The "Snap" (Bad): Immediately moving entity from x=20 to x=15.
- The "Smooth" (Good): The entity continues moving forward at  8m/s, but its "visual" position gradually loses those extra 5 meters over the next 1–2 seconds.

Variables to Tune
- **Convergence Time**: If set too short (e.g., 0.1s), user still see a jitter. If set too long (e.g., 5.0s), the entity might appear to be "sliding" sideways or moving unrealistically relative to the terrain.
- **Thresholds**: When to send a new update? For instance, if the error is >1m. Therefore, smoothing algorithm should be optimized to handle errors roughly that size.

**The Threshold Check (When to Send an Update)**

To save bandwidth, a simulator runs a "Ghost" version of itself. Then, it only broadcasts a new PDU if the "True" local position differs from the "Ghost" position by a specific threshold.

In [None]:
#
def should_send_update(true_pos, ghost_pos, true_ori, ghost_ori, distance_threshold=1.5,angle_threshold=3.0):
    """Decides whether or not to send an update.
    :param true_pos: True position
    :param ghost_pos: Ghost position
    :param true_ori: True orientation
    :param ghost_ori: Ghost orientation
    :param distance_threshold: Distance threshold
    :param angle_threshold: Angle threshold
    :return: True or False
    """
    # 1. Distance Threshold (e.g., 1.5 meters)
    distance_err = math.sqrt(sum((t - g)**2 for t, g in zip(true_pos, ghost_pos)))

    # 2. Angular Threshold (e.g., 3.0 degrees)
    # Note: Simplistic version; doesn't account for 359 to 1 degree wrap
    angle_err = max(abs(t - g) for t, g in zip(true_ori, ghost_ori))

    # Send if thresholds are exceeded
    return distance_err > distance_threshold or angle_err > angle_threshold


Threshold values depends on the entity because some maybe slower than the others.

**Smoothing orientation**

Handling angular wrap-around with Euler angles (0° to 360°) is tricky because a small movement from 1° to 359° looks like a massive 358° jump to a simple subtraction script, even though it’s only a 2° change.

Quaternions represent orientation as a four-dimensional vector (q=
[w,x,y,z]). They don't suffer from gimbal lock and make calculating the "difference" between two rotations mathematically elegant.
The "error" between two orientations is the angle of the relative rotation required to get from the Ghost orientation to the True orientation

In [None]:
import math

def get_quaternion_error(q_true, q_ghost):
    """
    Calculates the angular difference (in radians) between two quaternions.
    Quaternions expected as [w, x, y, z] normalized.
    """
    # 1. Calculate the dot product
    dot = sum(t * g for t, g in zip(q_true, q_ghost))

    # 2. Ensure dot product is within valid acos range [-1, 1]
    dot = max(-1.0, min(1.0, dot))

    # 3. The angle between them is 2 * acos(|dot|)
    # We use abs(dot) because q and -q represent the same rotation
    angle_rad = 2 * math.acos(min(abs(dot), 1.0))

    return math.degrees(angle_rad)

# Usage in Threshold Check:
# if get_quaternion_error(q_true, q_ghost) > profile["ori_threshold"]:
#     send_update()


When a remote update arrives with a new orientation, don't use linear interpolation (LERP) on Euler angles, as it causes "wobbling."

SLERP (Spherical Linear Interpolation) moves the object along the shortest arc on the surface of a 4D sphere, providing perfectly smooth rotation.
SLERP maintains a constant angular velocity, preventing the rotation from looking "rubbery" or inconsistent.


In [None]:
import math
import time

def slerp(q1, q2, t):
    """
    Spherical Linear Interpolation between two quaternions [w, x, y, z].
    t is the interpolation factor from 0.0 to 1.0.
    """
    # 1. Compute the dot product (cosine of the angle between quaternions)
    dot = sum(a * b for a, b in zip(q1, q2))

    # 2. If the dot product is negative, SLERP takes the long way around.
    # Flip one quaternion to take the short way.
    if dot < 0.0:
        q1 = [-x for x in q1]
        dot = -dot

    # 3. Use linear interpolation if the quaternions are very close to avoid division by zero
    if dot > 0.9995:
        res = [q1[i] + t * (q2[i] - q1[i]) for i in range(4)]
        # Normalize
        norm = math.sqrt(sum(x*x for x in res))
        return [x/norm for x in res]

    # 4. Standard SLERP calculation
    theta_0 = math.acos(dot)        # Angle between input vectors
    theta = theta_0 * t             # Angle between q1 and result

    sin_theta_0 = math.sin(theta_0)
    sin_theta = math.sin(theta)

    s1 = math.cos(theta) - dot * sin_theta / sin_theta_0
    s2 = sin_theta / sin_theta_0

    return [(s1 * q1[i]) + (s2 * q2[i]) for i in range(4)]

class RemoteEntityRotation:
    def __init__(self, convergence_time=1.0):
        self.q_net = [1, 0, 0, 0]    # The "Truth" from network updates
        self.q_display = [1, 0, 0, 0] # What we show on screen
        self.last_update = time.time()
        self.convergence_time = convergence_time

    def receive_update(self, new_q):
        # When a PDU arrives, we don't snap.
        # We store the current display position to start a new interpolation.
        self.q_start = self.q_display
        self.q_net = new_q
        self.last_update = time.time()

    def get_render_orientation(self):
        elapsed = time.time() - self.last_update
        # Calculate t (0.0 to 1.0) based on convergence window
        t = min(1.0, elapsed / self.convergence_time)

        self.q_display = slerp(self.q_display, self.q_net, t)
        return self.q_display

# --- Example Usage ---
entity = RemoteEntityRotation(convergence_time=0.5) # 0.5s to fix rotation jumps
# Simulate a 90-degree turn update arriving
entity.receive_update([0.707, 0, 0.707, 0])

print("Smoothing rotation...")
for _ in range(5):
    print(f"Current Render Quat: {[round(x,3) for x in entity.get_render_orientation()]}")
    time.sleep(0.1)


This packet (the PDU) carries the current state and the "Dead Reckoning Parameters" that tell other simulators how to predict your movement.
The following structure is based on the IEEE 1278 standard but simplified The Entity State Packet (as a Python Class).

In [None]:
import time

class EntityStatePDU:
    def __init__(self, entity_id, entity_type):
        self.entity_id = entity_id
        self.entity_type = entity_type  # e.g., "Bicycle", "Bike", "Car", "Drone",...
        self.timestamp = time.time()

        # State Data
        self.pos = [0.0, 0.0, 0.0]     # World Coordinates (X, Y, Z)
        self.ori = [1.0, 0.0, 0.0, 0.0] # Orientation (Quaternion w, x, y, z)

        # Dead Reckoning Parameters (The "Intent")
        self.vel = [0.0, 0.0, 0.0]     # Linear Velocity
        self.accel = [0.0, 0.0, 0.0]   # Linear Acceleration
        self.ang_vel = [0.0, 0.0, 0.0] # Angular Velocity (Body Rates)
        self.dr_algorithm = 4          # 4 = High fidelity (DRM_RVW in DIS)

    def to_dict(self):
        """Serialize to dictionary for JSON network transmission."""
        return self.__dict__

    @classmethod
    def from_dict(cls, data):
        """Create a PDU object from received network data."""
        pdu = cls(data['entity_id'], data['entity_type'])
        pdu.__dict__.update(data)
        return pdu


The Broadcaster (delivery agent sim) simulates local entity and sends out its state whenever it moves too far from its "ghost."


In [None]:
import socket
import json
import time

# Simulation Settings
DEST_IP = "127.0.0.1"
PORT = 5005
threshold = 1.0

# Initial State
pos_true = [0.0, 0.0, 0.0]
pos_ghost = [0.0, 0.0, 0.0]
vel = [5.0, 0.0, 0.0] # Moving at 5m/s in X

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

print(f"Broadcasting to {DEST_IP}:{PORT}...")

try:
    while True:
        # 1. Update Physics
        pos_true[0] += vel[0] * 0.1

        # 2. Check Threshold (Distance between True and Ghost)
        dist_err = abs(pos_true[0] - pos_ghost[0])

        if dist_err > threshold:
            # 3. Create PDU
            pdu = {
                "id": "delivery_agent_123",
                "pos": pos_true,
                "vel": vel,
                "timestamp": time.time()
            }
            # 4. Send over UDP
            message = json.dumps(pdu).encode('utf-8')
            sock.sendto(message, (DEST_IP, PORT))

            # 5. Sync Ghost
            pos_ghost = list(pos_true)
            print(f"Sent Update: {pos_true}")

        time.sleep(0.1)
except KeyboardInterrupt:
    print("Broadcaster stopped.")


The Listener (Remote Viewer; User's Active Device) receives the PDUs and uses the Dead Reckoning + Convergence logic we built to show a smooth path.

In [None]:
import socket
import json
import time

PORT = 5005
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", PORT))
sock.setblocking(False)

remote_pos_dr = [0.0, 0.0, 0.0]
remote_vel = [0.0, 0.0, 0.0]
last_pdu_time = time.time()

print(f"Listening on port {PORT}...")

try:
    while True:
        # 1. Try to receive a new PDU
        try:
            data, addr = sock.recvfrom(1024)
            pdu = json.loads(data.decode('utf-8'))

            # Update the model with new network truth
            remote_pos_dr = pdu['pos']
            remote_vel = pdu['vel']
            last_pdu_time = pdu['timestamp']
            print(f"--- Received PDU: {remote_pos_dr} ---")
        except BlockingIOError:
            pass # No new data, just continue extrapolating

        # 2. Dead Reckon locally
        dt = time.time() - last_pdu_time
        current_x = remote_pos_dr[0] + (remote_vel[0] * dt)

        print(f"Extrapolated Position: {current_x:.2f}", end='\r')
        time.sleep(0.05)

except KeyboardInterrupt:
    print("\nListener stopped.")
