<a href="https://colab.research.google.com/github/matteraggi/maritimeAIS/blob/main/TrainingAIS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Install + Import**

In [None]:
# === Installazioni necessarie su Colab ===
!pip install pandas numpy pyarrow torch matplotlib scikit-learn

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# **Caricamento dataset**

In [None]:
# Read the preprocessed AIS table and show its size plus the first rows.
DATA_PATH = "ais_preprocessed.parquet"
df = pd.read_parquet(DATA_PATH)
for preview in (df.shape, df.head()):
    print(preview)

# **Creazione finestre temporali**

In [None]:
SEQUENCE_LEN = 60
FEATURES = ["X", "Y", "SOG", "COG", "Heading"]

def create_sequences(df, seq_len=SEQUENCE_LEN):
    """Build sliding-window samples, one vessel (MMSI group) at a time.

    For every group of rows sharing an ``MMSI`` value, each run of ``seq_len``
    consecutive rows of the ``FEATURES`` columns becomes one input window and
    the first two feature columns of the row right after it become the target.
    Windows never span two different MMSI groups. Rows are used in the order
    they appear inside each group; no sorting is done here.

    Args:
        df: DataFrame containing an ``MMSI`` column and every column listed in
            ``FEATURES``.
        seq_len: Number of rows in each input window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays with shapes
        ``(n_windows, seq_len, len(FEATURES))`` and ``(n_windows, 2)``.
        When no group has more than ``seq_len`` rows, correctly shaped empty
        arrays are returned instead of 1-D empty arrays.
    """
    Xs, ys = [], []
    for _, group in df.groupby("MMSI"):
        data = group[FEATURES].values
        for i in range(len(data) - seq_len):
            Xs.append(data[i:i+seq_len])
            ys.append(data[i+seq_len, :2])  # predict only X, Y
    if not Xs:
        # np.array([]) would collapse to shape (0,), which breaks later
        # 3-D indexing; keep the rank stable even with zero windows.
        return np.empty((0, seq_len, len(FEATURES))), np.empty((0, 2))
    return np.array(Xs), np.array(ys)

# Turn the loaded table into (window, target) arrays and report their shapes.
X, y = create_sequences(df)
print(*(arr.shape for arr in (X, y)))


# **Divisione train/test**

In [None]:
# Ordered 80/20 cut: the leading windows train the model, the trailing ones
# are held out. Nothing is shuffled here.
# NOTE(review): windows are built with stride 1, so windows on either side of
# the cut can share rows — confirm this overlap is acceptable.
split = int(len(X) * 0.8)
X_train, y_train = X[:split], y[:split]
X_test, y_test = X[split:], y[split:]

print(f"Train: {len(X_train)}  Test: {len(X_test)}")

# **LSTM model**

In [None]:
class TrajectoryLSTM(nn.Module):
    """Single-layer, batch-first LSTM followed by a linear read-out.

    The linear layer is applied to the LSTM output at the final time step, so
    one vector of ``output_size`` values is produced per input sequence.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        output_size: Number of values emitted per sequence.
    """

    def __init__(self, input_size=5, hidden_size=64, output_size=2):
        super().__init__()
        # Attribute names double as state_dict keys, so they stay as they are.
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Map a ``(batch, time, input_size)`` tensor to ``(batch, output_size)``."""
        sequence_out, _state = self.lstm(x)
        last_step = sequence_out[:, -1, :]  # output at the final time step
        return self.fc(last_step)

# Seed PyTorch's global RNG before the model is created so that the weight
# initialisation (and later torch-based shuffling) is repeatable after a
# kernel restart.
SEED = 42
torch.manual_seed(SEED)

model = TrajectoryLSTM()
criterion = nn.MSELoss()  # mean squared error on the predicted outputs
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# **Training**

In [None]:
EPOCHS = 10
BATCH_SIZE = 128

def to_tensor(a):
    """Return a new float32 ``torch.Tensor`` holding a copy of ``a``."""
    as_float32 = torch.tensor(a, dtype=torch.float32)
    return as_float32

X_train_t, y_train_t = to_tensor(X_train), to_tensor(y_train)
X_test_t, y_test_t = to_tensor(X_test), to_tensor(y_test)

n_train = len(X_train_t)

for epoch in range(EPOCHS):
    model.train()
    idx = torch.randperm(n_train)  # fresh sample order every epoch
    running_loss = 0.0
    for i in range(0, n_train, BATCH_SIZE):
        batch_idx = idx[i:i+BATCH_SIZE]
        Xb, yb = X_train_t[batch_idx], y_train_t[batch_idx]

        optimizer.zero_grad()
        pred = model(Xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so the shorter final batch does not
        # skew the epoch average.
        running_loss += loss.item() * len(batch_idx)

    # Report the mean loss over the whole epoch rather than only the last
    # mini-batch; NaN when there is no training data at all.
    train_loss = running_loss / n_train if n_train else float("nan")

    # evaluation on the held-out windows
    model.eval()
    with torch.no_grad():
        test_pred = model(X_test_t)
        test_loss = criterion(test_pred, y_test_t)
    print(f"Epoch {epoch+1}/{EPOCHS} | Train loss: {train_loss:.5f} | Test loss: {test_loss.item():.5f}")

# **Rilevazione anomalie (spoofing simulato)**

In [None]:
# Simulate an attack: add a constant offset to the first feature column (X)
# of every time step in a copy of the test windows.
drift = 500 / 1000  # original note: metres → normalised units, scale ~1 (TODO confirm units)
X_attack = X_test.copy()
X_attack[:, :, 0] += drift

X_attack_t = to_tensor(X_attack)

# Predict on the clean and on the shifted inputs without tracking gradients.
model.eval()
with torch.no_grad():
    pred_clean, pred_attack = (
        model(inputs).numpy() for inputs in (X_test_t, X_attack_t)
    )

# Per-sample mean squared error; both are measured against the clean targets.
err_clean = np.square(pred_clean - y_test).mean(axis=1)
err_attack = np.square(pred_attack - y_test).mean(axis=1)

# **Visualizzazione detection**

In [None]:
# Overlay the two per-sample error distributions on the same axes.
for errors, label in ((err_clean, 'normale'), (err_attack, 'spoofing')):
    plt.hist(errors, bins=50, alpha=0.6, label=label)
plt.title("Distribuzione errore di predizione (MSE)")
plt.xlabel("Errore quadratico medio")
plt.ylabel("Conteggio")
plt.legend()
plt.show()

# Flag a sample when its error exceeds the 99th percentile of the clean
# errors, then report the flagged share of the attacked samples.
threshold = np.percentile(err_clean, 99)
flagged = err_attack > threshold
detections = flagged.mean()
print(f"Tasso di rilevamento anomalie: {detections*100:.1f}%")

**Output atteso**

Istogramma dell'errore (MSE): la distribuzione “spoofing” risulterà spostata a destra rispetto a quella “normale”, cioè con un errore di predizione maggiore.

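Tasso di rilevamento: >90% con drift marcato. La soglia è il 99° percentile dell'errore sui dati puliti, quindi il tasso di falsi positivi atteso è circa l'1%.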