In [None]:
import os
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from google.colab import drive

# Mount Google Drive
drive.flush_and_unmount()
drive.mount('/content/drive', force_remount=True)

# Custom PyTorch Dataset
class GaitDataset(Dataset):
    def __init__(self, data, window_size=100, stride=20):
        self.window_size = window_size
        self.stride = stride

        # Features and targets
        self.feature_cols = [
            'left_hip_angle',
            'left_knee_angle',
            'right_hip_angle',
            'right_knee_angle'
        ]
        self.target_cols = [
            'locomotion_mode',
            'terrain_info',
            'gait_phase'
        ]

        # Validate columns
        missing_features = [col for col in self.feature_cols if col not in data.columns]
        missing_targets = [col for col in self.target_cols if col not in data.columns]
        if missing_features or missing_targets:
            raise ValueError(f"Missing columns: Features={missing_features}, Targets={missing_targets}")

        # Convert to numpy arrays
        self.features = data[self.feature_cols].values.astype(np.float32)
        self.labels = data[self.target_cols].values
        self.samples = self._create_windows()

    def _create_windows(self):
        windows = []
        max_start = len(self.features) - self.window_size + 1
        for i in range(0, max_start, self.stride):
            window_features = self.features[i:i+self.window_size]
            window_labels = self.labels[i+self.window_size-1]  # Last label in window
            windows.append((window_features, window_labels))
        return windows

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        features, labels = self.samples[idx]
        return torch.tensor(features, dtype=torch.float32), torch.tensor(labels, dtype=torch.long)

# Custom KNN Model in PyTorch
class PyTorchKNN:
    def __init__(self, k=5):
        self.k = k
        self.X_train = None
        self.y_train = None

    def fit(self, X_train, y_train):
        self.X_train = X_train
        self.y_train = y_train

    def predict(self, X_test):
        predictions = []
        for x in X_test:
            # Compute L2 distance (Euclidean)
            distances = torch.norm(self.X_train - x, dim=1)
            # Get k-nearest neighbors
            _, indices = torch.topk(distances, self.k, largest=False)
            # Vote for the most common label
            neighbor_labels = self.y_train[indices]
            unique_labels, counts = torch.unique(neighbor_labels, return_counts=True)
            pred_label = unique_labels[torch.argmax(counts)]
            predictions.append(pred_label)
        return torch.tensor(predictions)

# Meta-Learning KNN Trainer
class MetaKNN:
    def __init__(self):
        self.models = {
            'locomotion': PyTorchKNN(k=5),
            'terrain': PyTorchKNN(k=5),
            'gait_phase': PyTorchKNN(k=5)
        }
        self.encoders = {
            'locomotion': LabelEncoder(),
            'terrain': LabelEncoder(),
            'gait_phase': LabelEncoder()
        }

    def train(self, X_train, y_locomotion, y_terrain, y_gait_phase, max_epochs=100, target_accuracy=0.9):
        # Encode categorical labels
        y_locomotion_enc = self.encoders['locomotion'].fit_transform(y_locomotion)
        y_terrain_enc = self.encoders['terrain'].fit_transform(y_terrain)
        y_gait_enc = self.encoders['gait_phase'].fit_transform(y_gait_phase)

        # Convert to PyTorch tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_locomotion_tensor = torch.tensor(y_locomotion_enc, dtype=torch.long)
        y_terrain_tensor = torch.tensor(y_terrain_enc, dtype=torch.long)
        y_gait_tensor = torch.tensor(y_gait_enc, dtype=torch.long)

        # Train KNN models
        for epoch in range(max_epochs):
            print(f"Epoch {epoch + 1}/{max_epochs}")

            print("Training locomotion model...")
            self.models['locomotion'].fit(X_train_tensor, y_locomotion_tensor)

            print("Training terrain model...")
            self.models['terrain'].fit(X_train_tensor, y_terrain_tensor)

            print("Training gait phase model...")
            self.models['gait_phase'].fit(X_train_tensor, y_gait_tensor)

            # Evaluate
            train_acc = self.evaluate(X_train, y_locomotion, y_terrain, y_gait_phase)
            print(f"Training Accuracy: {train_acc}")

            # Check for target accuracy
            if all(acc >= target_accuracy for acc in train_acc.values()):
                print(f"Target accuracy achieved in {epoch + 1} epochs")
                break

    def evaluate(self, X_test, y_locomotion, y_terrain, y_gait_phase):
        results = {}

        # Convert test data to PyTorch tensors
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32)

        # Predict and decode labels
        for target in ['locomotion', 'terrain', 'gait_phase']:
            y_true = locals()[f"y_{target}"]
            y_pred_enc = self.models[target].predict(X_test_tensor)
            y_pred = self.encoders[target].inverse_transform(y_pred_enc.numpy())

            acc = accuracy_score(y_true, y_pred)
            results[target] = acc

            # Confusion Matrix
            cm = confusion_matrix(y_true, y_pred)
            plt.figure(figsize=(8, 6))
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=self.encoders[target].classes_, yticklabels=self.encoders[target].classes_)
            plt.title(f'Confusion Matrix for {target.capitalize()}')
            plt.xlabel('Predicted')
            plt.ylabel('True')
            plt.show()

        return results

    def save_predictions(self, X_test, output_path):
        # Convert test data to PyTorch tensors
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32)

        # Predict and decode labels
        terrain_preds = self.models['terrain'].predict(X_test_tensor)
        terrain_preds_decoded = self.encoders['terrain'].inverse_transform(terrain_preds.numpy())

        # Save to Excel
        df = pd.DataFrame({'Incline Prediction': terrain_preds_decoded})
        df.to_excel(output_path, index=False)

# ===== MAIN EXECUTION =====
# Load data
data_dir = "/content/drive/MyDrive/tracedata1"
all_files = [f for f in os.listdir(data_dir) if f.endswith('.csv')]
full_data = pd.concat([pd.read_csv(os.path.join(data_dir, f)) for f in all_files])

# Create dataset
dataset = GaitDataset(full_data, window_size=100, stride=20)
print(f"Created {len(dataset)} sliding window samples")

# Prepare training data
X = np.array([x.numpy().flatten() for x, y in dataset])
y_locomotion = np.array([y[0].item() for x, y in dataset])
y_terrain = np.array([y[1].item() for x, y in dataset])
y_gait = np.array([y[2].item() for x, y in dataset])

# Split data
X_train, X_test, y_lo_train, y_lo_test = train_test_split(X, y_locomotion, test_size=0.2, random_state=42)
_, _, y_tr_train, y_tr_test = train_test_split(X, y_terrain, test_size=0.2, random_state=42)
_, _, y_ga_train, y_ga_test = train_test_split(X, y_gait, test_size=0.2, random_state=42)

# Train and evaluate
meta_knn = MetaKNN()
meta_knn.train(X_train, y_lo_train, y_tr_train, y_ga_train)
results = meta_knn.evaluate(X_test, y_lo_test, y_tr_test, y_ga_test)

# Display results
print("\n=== Evaluation Results ===")
for target, acc in results.items():
    print(f"{target.capitalize()} Accuracy: {acc:.2%}")

# Save predictions to Excel
output_path = "/content/drive/MyDrive/incline_predictions.xlsx"
meta_knn.save_predictions(X_test, output_path)
print(f"Predictions saved to {output_path}")

# Plot Gait Phase and Incline Signals
plt.figure(figsize=(12, 6))
plt.plot(full_data['gait_phase'], label='Gait Phase')
plt.plot(full_data['terrain_info'], label='Incline')
plt.title('Gait Phase and Incline Signals')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.show()