In [None]:
import numpy as np
import pandas as pd
import json
import matplotlib.pyplot as plt
from pathlib import Path

from sklearn.preprocessing import MinMaxScaler

import torch
import torch.nn as nn
import torch.optim as optim

# Use the GPU when torch reports CUDA as available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device  # bare expression so the notebook displays the selected device

In [None]:
import json, requests
from dateutil import parser as dateutil_parser
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def safe_parse_timestamp(x):
    """Parse a single timestamp-like value into a UTC ``pd.Timestamp``.

    Parameters
    ----------
    x : pd.Timestamp, str, or anything ``pd.to_datetime`` accepts
        Timezone-naive values are interpreted as UTC; timezone-aware values
        are converted to UTC.

    Returns
    -------
    pd.Timestamp
        Timezone-aware timestamp in UTC.

    Raises
    ------
    Whatever ``dateutil_parser.parse`` raises when both pandas and dateutil
    fail to parse ``x``.
    """
    def _as_utc(ts):
        # Naive timestamps are labelled as UTC, aware ones are converted.
        return ts.tz_convert('UTC') if ts.tzinfo is not None else ts.tz_localize('UTC')

    if isinstance(x, pd.Timestamp):
        return _as_utc(x)
    try:
        # `infer_datetime_format` is intentionally not passed: it is deprecated
        # in pandas 2.x and only produces a warning there.
        ts = pd.to_datetime(x, utc=True)
        if pd.isna(ts):
            raise ValueError(f"could not parse timestamp: {x!r}")
        return ts
    except (ValueError, TypeError, OverflowError):
        # Only parsing failures trigger the dateutil fallback; a bare `except`
        # would also swallow KeyboardInterrupt / SystemExit.
        return _as_utc(pd.Timestamp(dateutil_parser.parse(x)))

def extract_segments_from_gt(gt_json):
    """Normalise a ground-truth JSON payload into a list of segment dicts.

    Accepted layouts:

    * ``{"segments": [ {...}, ... ]}``
    * a bare list ``[ {...}, ... ]``
    * a mapping ``{name: {...}, ...}`` (each value is one segment)

    Each segment must carry ``start`` and ``end``. The label is read from
    ``label``, then ``label_id``, and defaults to 0 when neither is present.

    Returns
    -------
    list of dict
        One ``{'start', 'end', 'label'}`` dict per segment, with ``start`` and
        ``end`` parsed by ``safe_parse_timestamp``.
    """
    if isinstance(gt_json, dict):
        segs = gt_json.get('segments', gt_json)
        if isinstance(segs, dict):
            # name -> segment mapping: the keys are names, the values hold the data.
            segs = list(segs.values())
    else:
        # A bare list has no `.get`; treat it as the segment list itself.
        segs = gt_json

    return [
        {
            'start': safe_parse_timestamp(s['start']),
            'end': safe_parse_timestamp(s['end']),
            'label': s.get('label', s.get('label_id', 0)),
        }
        for s in segs
    ]

def labels_from_segments(timestamps, segments, fill_label=0):
    """Assign an integer class id to every timestamp from labelled segments.

    Parameters
    ----------
    timestamps : list-like of timestamp-like values
        Converted with ``pd.to_datetime(..., utc=True)``.
    segments : iterable of dict
        Each dict has ``start``, ``end`` and ``label``. A timestamp belongs to
        a segment when ``start <= t < end`` (half-open); later segments
        overwrite earlier ones where they overlap.
    fill_label : hashable, default 0
        Label given to timestamps not covered by any segment; always maps to
        integer 0.

    Returns
    -------
    int_labels : np.ndarray of int
        One class id per timestamp.
    label_map : dict
        Original label -> integer id. Non-fill labels are numbered from 1 in
        sorted order.
    """
    ts = pd.to_datetime(timestamps, utc=True)
    labels = np.full(len(ts), fill_label, dtype=object)
    for seg in segments:
        mask = np.asarray((ts >= seg['start']) & (ts < seg['end']))
        labels[mask] = seg['label']

    # Drop the fill label *before* sorting: sorting e.g. the int 0 together
    # with string labels (what np.unique does on an object array) raises
    # TypeError.
    present = {lbl for lbl in labels.tolist() if lbl != fill_label}
    try:
        uniq = sorted(present)
    except TypeError:
        # Labels of mutually unorderable types: fall back to a deterministic
        # ordering by type name and string form.
        uniq = sorted(present, key=lambda v: (type(v).__name__, str(v)))

    label_map = {lbl: i + 1 for i, lbl in enumerate(uniq)}
    label_map[fill_label] = 0
    int_labels = np.array([label_map.get(l, 0) for l in labels], dtype=int)
    return int_labels, label_map


In [None]:
# Input files, resolved relative to the notebook's working directory.
DATA_PATH = Path("table_task_UR5e_data.csv")
GT_PATH   = Path("table_task_UR5e_ground_truth.json")

# Read the recording and parse its timestamp column in one expression.
# errors="coerce" turns unparseable entries into NaT instead of raising.
df = pd.read_csv(DATA_PATH).assign(
    timestamp=lambda frame: pd.to_datetime(frame["timestamp"], errors="coerce")
)
df.head()

In [None]:
# JSON is UTF-8 by specification; pass the encoding explicitly so the file is
# not decoded with the platform default (e.g. cp1252 on Windows).
with open(GT_PATH, "r", encoding="utf-8") as f:
    gt = json.load(f)

# `gt` maps a segment name to {"label_id", "start", "end"}; only the values
# are needed. errors="coerce" turns unparseable bounds into NaT.
segments = [
    {
        "label": seg_info["label_id"],
        "start": pd.to_datetime(seg_info["start"], errors="coerce"),
        "end":   pd.to_datetime(seg_info["end"],   errors="coerce"),
    }
    for seg_info in gt.values()
]

segments[:3]

In [None]:
# Every row starts as class 0; each segment then stamps its id over the rows
# whose timestamp falls inside it. Later segments overwrite earlier ones.
labels = np.zeros(len(df), dtype=int)
row_times = df["timestamp"]

for seg in segments:
    # Series.between is inclusive on both ends by default (start <= t <= end).
    in_segment = row_times.between(seg["start"], seg["end"])
    labels[in_segment] = seg["label"]

df["label"] = labels
df.head()

In [None]:
# Model inputs are the three position columns; targets are the per-row class ids.
features = ["x", "y", "z"]
X = df[features].values
y = df["label"].values

# Rescale each feature to [0, 1].
# NOTE(review): the scaler is fitted on all rows, including those that later
# end up in the test split — confirm whether that is intended.
scaler = MinMaxScaler().fit(X)
X_scaled = scaler.transform(X)

In [None]:
SEQ_LEN = 50

def make_sequences(X, y, seq_len=SEQ_LEN):
    """Cut aligned feature/label arrays into overlapping stride-1 windows.

    Parameters
    ----------
    X : sequence (e.g. array of shape ``(n_steps, n_features)``)
        Per-step features.
    y : sequence of length ``n_steps``
        Per-step labels, aligned with ``X``.
    seq_len : int, default ``SEQ_LEN``
        Number of consecutive steps per window.

    Returns
    -------
    X_out, y_out : list, list
        Window ``i`` covers steps ``i .. i + seq_len - 1``. There are
        ``len(X) - seq_len + 1`` windows, or none when ``len(X) < seq_len``.
    """
    # +1 so the final full window (ending on the last step) is included.
    n_windows = max(len(X) - seq_len + 1, 0)
    X_out = [X[i:i + seq_len] for i in range(n_windows)]
    y_out = [y[i:i + seq_len] for i in range(n_windows)]
    return X_out, y_out

# Window the scaled features and their labels using the default window length.
X_seq, y_seq = make_sequences(X_scaled, y)

In [None]:
# Chronological 80/20 split: the first 80% of windows train, the rest test.
split = int(0.8 * len(X_seq))

X_train, X_test = X_seq[:split], X_seq[split:]
y_train, y_test = y_seq[:split], y_seq[split:]

In [None]:
def pad_sequences(X, y, label_pad_value=0):
    """Right-pad variable-length sequences to a common length and stack them.

    Parameters
    ----------
    X : sequence of 2-D arrays, each ``(length_i, n_features)``
        Feature sequences; padded with zeros along the time axis.
    y : sequence of 1-D arrays, each of length ``length_i``
        Label sequences aligned with ``X``.
    label_pad_value : int, default 0
        Value used to pad label sequences. The default keeps the previous
        behaviour; pass e.g. ``-100`` (with a matching ``ignore_index`` in the
        loss) so padding is not confused with real class 0.

    Returns
    -------
    X_pad : np.ndarray, ``(n_sequences, max_len, n_features)``
    y_pad : np.ndarray, ``(n_sequences, max_len)``
        For empty input, empty arrays of shape ``(0, 0, 0)`` and ``(0, 0)``.
    """
    if len(X) == 0:
        # max() over an empty sequence would raise; return empty batches instead.
        return np.empty((0, 0, 0)), np.empty((0, 0), dtype=int)

    max_len = max(len(seq) for seq in X)

    X_pad, y_pad = [], []
    for seq, labels in zip(X, y):
        pad_len = max_len - len(seq)
        # Pad only the time axis (axis 0); the feature axis is left untouched.
        X_pad.append(np.pad(seq, ((0, pad_len), (0, 0)), mode='constant'))
        y_pad.append(
            np.pad(labels, (0, pad_len), mode='constant',
                   constant_values=label_pad_value)
        )

    return np.array(X_pad), np.array(y_pad)

# Stack each split into dense arrays (padding is a no-op when all windows
# already share one length).
X_train_pad, y_train_pad = pad_sequences(X_train, y_train)
X_test_pad,  y_test_pad  = pad_sequences(X_test,  y_test)

# Create the tensors on the selected device: float32 inputs, int64 targets.
X_train_t = torch.tensor(X_train_pad, dtype=torch.float32, device=device)
y_train_t = torch.tensor(y_train_pad, dtype=torch.long, device=device)
X_test_t  = torch.tensor(X_test_pad,  dtype=torch.float32, device=device)

In [None]:
class GRUSegmenter(nn.Module):
    """Per-time-step classifier: a GRU followed by a linear layer.

    The GRU is built with ``batch_first=True``, so inputs are
    ``(batch, time, input_dim)``. The linear head is applied to the GRU output
    at every step, yielding ``(batch, time, num_classes)`` logits.
    """

    def __init__(self, input_dim, hidden_dim, num_classes):
        super().__init__()
        self.gru = nn.GRU(
            input_size=input_dim,
            hidden_size=hidden_dim,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=num_classes)

    def forward(self, x):
        """Return class logits for every time step of ``x``."""
        # The GRU also returns its final hidden state, which is not needed here.
        step_features, _final_state = self.gru(x)
        logits = self.fc(step_features)
        return logits

In [None]:
input_dim = X_train_t.shape[2]
hidden_dim = 64
# Class ids are used directly as CrossEntropyLoss targets, so the logit axis
# must be wider than the largest id. Counting distinct ids is too small
# whenever the ids are not exactly 0..K-1 (e.g. no background rows, or gaps).
num_classes = int(y.max()) + 1

model = GRUSegmenter(input_dim, hidden_dim, num_classes).to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

model

In [None]:
num_epochs = 20

# Training mode is set once up front; nothing in the loop switches it back.
model.train()
# The targets never change, so flatten them to (batch * time,) a single time.
flat_targets = y_train_t.reshape(-1)

for epoch in range(num_epochs):
    # One full-batch gradient step per epoch over all training windows.
    optimizer.zero_grad()

    outputs = model(X_train_t)
    # CrossEntropyLoss takes (N, C) logits, so merge the batch and time axes.
    loss = criterion(outputs.reshape(-1, num_classes), flat_targets)

    loss.backward()
    optimizer.step()

    print(f"Epoch {epoch+1}/{num_epochs} | Loss: {loss.item():.4f}")

In [None]:
# Inference only: switch to eval mode and skip gradient tracking.
model.eval()
with torch.no_grad():
    logits = model(X_test_t)
    # Most likely class per time step, moved back to the host as a NumPy array.
    y_pred_pad = torch.argmax(logits, dim=2).cpu().numpy()

# unpad: trim every prediction to the length of its ground-truth window
y_pred = [
    pred_seq[:len(true_seq)]
    for pred_seq, true_seq in zip(y_pred_pad, y_test)
]

len(y_pred), len(y_pred[0])

In [None]:
# Compare ground truth and prediction for the first test window.
fig, ax = plt.subplots(figsize=(14, 4))
ax.plot(y_test[0], label="True", linewidth=2)
ax.plot(y_pred[0], label="Predicted", linestyle="--")
# The em dash is written as an escape so it cannot be corrupted again by a
# wrong file encoding (it had been mangled into "â€”").
ax.set_title("Example Sequence \u2014 True vs Predicted Labels")
ax.set_xlabel("Time step within window")
ax.set_ylabel("Class id")
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Retained for backward compatibility: raw concatenation of all (overlapping)
# window predictions. It is NOT a per-time-step series and is not plotted.
y_pred_flat = np.concatenate(y_pred)

# Test window j starts at row `split + j` (stride-1 windows, chronological
# split), so consecutive windows overlap in all but one step. Collapse them
# to one prediction per row: the whole first window, then only the last step
# of each following window.
later_steps = np.array([window[-1] for window in y_pred[1:]], dtype=y_pred[0].dtype)
y_pred_timeline = np.concatenate([y_pred[0], later_steps])
# Row positions of those predictions on the full recording's time axis.
timeline_idx = split + np.arange(len(y_pred_timeline))

fig, ax = plt.subplots(figsize=(16, 4))
ax.plot(y, label="Ground Truth")
ax.plot(timeline_idx, y_pred_timeline, label="RNN Predicted", alpha=0.7)
ax.set_xlabel("Row index")
ax.set_ylabel("Class id")
ax.legend()
ax.set_title("Full-Timeline Segmentation")
plt.show()