In [1]:
import numpy as np

# Load input (features) and output (targets)
data = np.load("X.npy")  # Shape: (samples, time_steps, features)
target = np.load("ynpy.npy")  # Shape: (samples, output_dim)

print(f"Data shape: {data.shape}")  # Should be (samples, 5, 3)
print(f"Target shape: {target.shape}")  # Should be (samples, 3)

In [None]:
import tensorflow as tf


In [None]:

import numpy as np

def create_time_windows(data, target, input_window=5):
    """
    Convert long time-series data into overlapping sequences for LSTM training.

    Args:
    - data (np.array): Input data of shape (runs, timesteps, features)
    - target (np.array): Target data of shape (runs, timesteps, features)
    - input_window (int): Number of time steps per sequence

    Returns:
    - X: Input sequences of shape (samples, input_window, features)
    - y: Corresponding target values of shape (samples, output_dim)
    """
    X, y = [], []

    num_runs, timesteps, num_features = data.shape

    for run in range(num_runs):
        for i in range(timesteps - input_window):
            # Extract input time window
            X.append(data[run, i:i+input_window, :])
            # Extract target (next time step after window)
            y.append(target[run, i+input_window, :])

    return np.array(X), np.array(y)

# Convert data into time-windowed sequences
X, y = create_time_windows(data, target, input_window=5)

print(f"Processed Data Shape: {X.shape}")  # (samples, 5, 3)
print(f"Processed Labels Shape: {y.shape}")  # (samples, 3)

Processed Data Shape: (391250, 5, 3)
Processed Labels Shape: (391250, 3)


In [None]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import numpy as np

# Initialize scalers
scaler = MinMaxScaler()  # Use MinMaxScaler for GPS coordinates
imu_scaler = StandardScaler()  # Use StandardScaler for IMU data if necessary

# Flatten the data to fit the scaler
X_flattened = X.reshape(-1, X.shape[-1])
y_flattened = y.reshape(-1, y.shape[-1])

# Apply normalization
X_scaled = scaler.fit_transform(X_flattened)
y_scaled = scaler.fit_transform(y_flattened)

# Reshape back to original dimensions
X_scaled = X_scaled.reshape(X.shape)
y_scaled = y_scaled.reshape(y.shape)

In [None]:
from sklearn.model_selection import train_test_split

# Split data (80% Train, 20% Test)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42, shuffle=True)

# Print shapes
print(f"X_train shape: {X_train.shape}, X_test shape: {X_test.shape}")
print(f"y_train shape: {y_train.shape}, y_test shape: {y_test.shape}")

X_train shape: (313000, 5, 3), X_test shape: (78250, 5, 3)
y_train shape: (313000, 3), y_test shape: (78250, 3)


In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Dot, Activation, Concatenate, Flatten

# Define input shapes
time_steps = X.shape[1]  # Sequence length (past 5 seconds of data)
num_features = X.shape[2]  # Number of input features per time step (IMU + GPS)
output_dim = y.shape[1]
# Define inputs
inputs = Input(shape=(time_steps, num_features), name="Input_Features")

# Encoder (LSTM)
encoder_lstm = LSTM(64, return_sequences=True, return_state=True, name="Encoder_LSTM")
encoder_outputs, state_h, state_c = encoder_lstm(inputs)

# Attention Mechanism (Local Attention)
# Calculate attention scores (alignment scores)
attention_scores = Dense(1, activation='tanh', name="Attention_Scores")(encoder_outputs)

#Normalize scores to obtain attention weights
attention_weights = Activation('softmax', name="Attention_Weights")(attention_scores)

#Compute the context vector as the weighted sum of encoder outputs
context_vector = Dot(axes=1, name="Context_Vector")([attention_weights, encoder_outputs])

# Decoder (Fully Connected Layer)
decoder_dense = Dense(64, activation='relu', name="Decoder_Dense")(context_vector)

# Output Layer (Latitude, Longitude, Altitude)
output_layer = Dense(3, activation='linear', name="Trajectory_Output")(decoder_dense)

model = Model(inputs=inputs, outputs=output_layer, name="Trajectory_Prediction_Model")
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
model.summary()

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Early stopping to monitor validation loss and stop if no improvement
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Train the model
history = model.fit(
    X_train, y_train,
    epochs=150,  # Maximum epochs
    batch_size=64,
    validation_split=0.2,  # 20% of training data for validation
    callbacks=[early_stopping]  # Use early stopping
)

# Save the trained model
model.save('trajectory_prediction_model.h5')

Epoch 1/150




[1m3913/3913[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 6ms/step - loss: 0.0943 - mae: 0.2505 - val_loss: 0.0874 - val_mae: 0.2443
Epoch 2/150
[1m3913/3913[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 6ms/step - loss: 0.0877 - mae: 0.2439 - val_loss: 0.0875 - val_mae: 0.2408
Epoch 3/150
[1m3913/3913[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 6ms/step - loss: 0.0875 - mae: 0.2433 - val_loss: 0.0874 - val_mae: 0.2414
Epoch 4/150
[1m3913/3913[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 6ms/step - loss: 0.0876 - mae: 0.2439 - val_loss: 0.0874 - val_mae: 0.2450
Epoch 5/150
[1m3913/3913[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 6ms/step - loss: 0.0877 - mae: 0.2440 - val_loss: 0.0876 - val_mae: 0.2453
Epoch 6/150
[1m3913/3913[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 6ms/step - loss: 0.0875 - mae: 0.2439 - val_loss: 0.0874 - val_mae: 0.2431
Epoch 7/150
[1m3913/3913[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s



In [None]:
# Evaluate the model on the test set
test_loss, test_mae = model.evaluate(X_test, y_test, verbose=1)

print(f"Test Loss (MSE): {test_loss}")
print(f"Test Mean Absolute Error (MAE): {test_mae}")

In [None]:
import matplotlib.pyplot as plt

# Predict the test set
y_pred = model.predict(X_test)
y_pred = np.squeeze(y_pred)
# Compare predictions with ground truth
print("Predicted values:", y_pred[:5])  # First 5 predictions
print("Actual values:", y_test[:5])    # First 5 ground truth values

# Plotting predictions vs. actual values
plt.figure(figsize=(10, 6))
plt.plot(y_test[:, 0], label="Actual Latitude", color='blue')  # Latitude
plt.plot(y_pred[:, 0], label="Predicted Latitude", color='cyan', linestyle='dashed')
plt.title("Latitude: Actual vs Predicted")
plt.legend()
plt.show()

# Similarly for Longitude and Altitude
plt.figure(figsize=(10, 6))
plt.plot(y_test[:, 1], label="Actual Longitude", color='green')  # Longitude
plt.plot(y_pred[:, 1], label="Predicted Longitude", color='lime', linestyle='dashed')
plt.title("Longitude: Actual vs Predicted")
plt.legend()
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(y_test[:, 2], label="Actual Altitude", color='red')  # Altitude
plt.plot(y_pred[:, 2], label="Predicted Altitude", color='orange', linestyle='dashed')
plt.title("Altitude: Actual vs Predicted")
plt.legend()
plt.show()

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import numpy as np

# Compute MAE, RMSE, and R^2 score for Latitude, Longitude, and Altitude
mae = mean_absolute_error(y_test, y_pred, multioutput='raw_values')
rmse = np.sqrt(mean_squared_error(y_test, y_pred, multioutput='raw_values'))
r2 = r2_score(y_test, y_pred, multioutput='raw_values')

# Print results
print(f"Mean Absolute Error (MAE): Latitude={mae[0]:.6f}, Longitude={mae[1]:.6f}, Altitude={mae[2]:.6f}")
print(f"Root Mean Squared Error (RMSE): Latitude={rmse[0]:.6f}, Longitude={rmse[1]:.6f}, Altitude={rmse[2]:.6f}")
print(f"R² Score: Latitude={r2[0]:.6f}, Longitude={r2[1]:.6f}, Altitude={r2[2]:.6f}")