<a href="https://colab.research.google.com/github/mshaik15/NNIK/blob/main/NNIK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import Github repo
from google.colab import files
import zipfile
import os

# Ask for the zipped project and unpack it into the Colab workspace.
print('Upload project file')
uploaded = files.upload()

# `uploaded` is keyed by file name; fail with a readable message instead of an
# IndexError when it comes back empty.
if not uploaded:
  raise RuntimeError('No file was uploaded - rerun this cell and select the project zip')

# Deliberately not called `zip`: that name would shadow the built-in zip()
# for every later cell in this kernel session.
archive_name = next(iter(uploaded))
with zipfile.ZipFile(archive_name, 'r') as zip_ref:
  zip_ref.extractall('/content')

print(f'Extracted {archive_name}')
print('Availale Files/Folders')
for item in os.listdir('/content'):
  print(f'{item}')

Upload project file


Saving NNIK.zip to NNIK.zip
Extracted NNIK.zip
Availale Files/Folders
.config
NNIK.zip
NNIK
sample_data


In [None]:
# Mount to google drive
from google.colab import drive
import sys
from pathlib import Path

# Attach Google Drive to the runtime.
drive.mount('/content/drive')

# Use /content/NNIK when it is present, otherwise fall back to the '-main' folder name.
primary_root = Path('/content/NNIK')
root = primary_root if primary_root.exists() else Path('/content/NNIK-main')
print(f'Project root - {root}')
print(f'Project exists - {root.exists()}')

# Drop every sys.path entry that mentions the project, so re-running this cell
# does not pile up duplicate entries.
sys.path[:] = [
    entry for entry in sys.path
    if 'NNIK' not in entry and 'Scripts' not in entry
]

scripts_dir = root / 'Scripts'
models_dir = scripts_dir / 'Models'
project_paths = [
    str(folder)
    for folder in (
        root,
        scripts_dir,
        models_dir,
        models_dir / 'Machine_Learning',
        models_dir / 'Traditional',
    )
]

# Each entry is pushed to the front, so the last folder listed above ends up
# first on sys.path.
for candidate in project_paths:
  if candidate in sys.path:
    continue
  sys.path.insert(0, candidate)

print('Paths configured')

Mounted at /content/drive
Project root - /content/NNIK
Project exists - True
Paths configured


In [None]:
# Install dependencies
!pip install -q torch torchvision torchaudio scikit-learn pandas matplotlib numpy tqdm pyyaml scipy

In [None]:
# Import project modules
import importlib.util
import traceback
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import yaml
from datetime import datetime

def import_path(module_name, file_path):
  """Load a Python source file as a module and register it in ``sys.modules``.

  Args:
    module_name: Name under which the module is registered in ``sys.modules``.
    file_path: Location of the source file, as a ``str`` or ``pathlib.Path``.

  Returns:
    The executed module, or ``None`` when the file does not exist, no import
    spec/loader can be created, or loading raises. A message is printed in
    every failure case and no exception propagates to the caller.
  """
  not_registered = object()   # sentinel: "no module had this name before"
  previous = not_registered
  registered = False
  try:
    file_path = Path(file_path)   # accept plain strings as well as Path objects
    if not file_path.exists():
      print(f'File not found - {file_path}')
      return None

    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
      print(f'Couldnt create spec for {module_name}')
      return None

    module = importlib.util.module_from_spec(spec)
    previous = sys.modules.get(module_name, not_registered)
    # Registered before execution so the name is already resolvable while the
    # file's own top-level code runs.
    sys.modules[module_name] = module
    registered = True
    spec.loader.exec_module(module)
    return module
  except Exception as e:
    # Do not leave a half-initialised module behind: put back whatever was
    # registered under this name before, or drop the entry entirely.
    if registered:
      if previous is not_registered:
        sys.modules.pop(module_name, None)
      else:
        sys.modules[module_name] = previous
    print(f'Error importing {module_name} - {str(e)}')
    return None

# Load each helper script from Scripts/ under a short top-level module name.
scripts_directory = root / "Scripts"
utils_module = import_path("utils", scripts_directory / "utils.py")
data_gen_module = import_path("data_gen", scripts_directory / "data_gen.py")
training_module = import_path("training", scripts_directory / "training.py")
testing_module = import_path("testing", scripts_directory / "testing.py")

# Confirm, in load order, only the scripts whose module object is truthy.
for script_name, loaded_module in (
    ("utils.py", utils_module),
    ("data_gen.py", data_gen_module),
    ("training.py", training_module),
    ("testing.py", testing_module),
):
    if loaded_module:
        print(f"{script_name} imported")

utils.py imported
data_gen.py imported
training.py imported
testing.py imported


In [None]:
# Smoke check: is the data folder there, and what are the first few JSON files in each split?
data_directory = root.joinpath('data')
print(f'data directory - {data_directory}')
print(f'data directory exists - {data_directory.exists()}')

if data_directory.exists():
  # --- Training split ---
  print('\nTraining data - ')
  training_directory = data_directory.joinpath('Training')
  if training_directory.exists():
    training_files = [*training_directory.glob('*.json')]
    print(f'Found {len(training_files)} training files')
    # At most five names, in sorted order; print nothing when there are none.
    training_preview = sorted(training_files)[:5]
    if training_preview:
      print('\n'.join(json_path.name for json_path in training_preview))

  # --- Testing split ---
  print('\nTesting data:')
  testing_directory = data_directory.joinpath('Testing')
  if testing_directory.exists():
    testing_files = [*testing_directory.glob('*.json')]
    print(f'Found {len(testing_files)} testing files')
    testing_preview = sorted(testing_files)[:5]
    if testing_preview:
      print('\n'.join(json_path.name for json_path in testing_preview))



data directory - /content/NNIK/data
data directory exists - True

Training data - 
Found 16 training files
10_training.json
10_training_solutions.json
3_training.json
3_training_solutions.json
4_training.json

Testing data:
Found 16 testing files
10_testing.json
10_testing_solutions.json
3_testing.json
3_testing_solutions.json
4_testing.json


In [None]:
from Scripts.training import load_ik_data, train_all_models, evaluate_all_models, create_results_dataframe
from Scripts.testing import create_models
import time
import torch

# --- Training configuration ---
dof_range = [3, 4, 5, 6]   # values substituted into the '{dof}_training*.json' file names
sample_limit = 1000        # cap on training rows per DOF; a falsy value disables subsampling
sample_seed = 42           # fixes which rows are drawn when subsampling
model_order = ['ANN', 'KNN', 'ELM', 'RandomForest', 'SVM', 'GPR', 'MDN', 'CVAE']

trained_models = {}   # dof -> {model name -> {'model', 'training_time', 'dof'}}
summary = []          # one record per (dof, model) attempt, successful or not

# Queried once: the answer does not change between models.
gpu_available = torch.cuda.is_available()

for dof in dof_range:
    print(f"\nDOF={dof} training")

    train_poses = data_directory / 'Training' / f'{dof}_training.json'
    train_solutions = data_directory / 'Training' / f'{dof}_training_solutions.json'

    if not (train_poses.exists() and train_solutions.exists()):
        print(f"no data for DOF={dof}")
        continue

    X_train, y_train = load_ik_data(train_poses, train_solutions)

    if sample_limit and len(X_train) > sample_limit:
        # Seeded per DOF so a rerun draws the same rows, independent of which
        # other DOFs were processed. Only the subsample is made reproducible;
        # any randomness inside the models themselves is not touched here.
        train_rng = np.random.default_rng([sample_seed, dof])
        idx = train_rng.choice(len(X_train), sample_limit, replace=False)
        X_train, y_train = X_train[idx], y_train[idx]

    print(f"Training data - {X_train.shape} -> {y_train.shape}")

    models_for_dof = create_models(input_dim=X_train.shape[1], output_dim=y_train.shape[1])

    # Keep only the requested models that create_models actually provided.
    selected_models = {
        name: models_for_dof[name]
        for name in model_order
        if name in models_for_dof and models_for_dof[name] is not None
    }

    print(f"models: {list(selected_models.keys())}")

    trained_models_dof = {}

    for model_name, model in selected_models.items():
        print(f"\nTraining {model_name}")
        print(f"GPU?: {gpu_available}")

        try:
            # perf_counter is monotonic, so the measured duration cannot be
            # distorted by wall-clock adjustments.
            start_time = time.perf_counter()
            model.fit(X_train, y_train)
            training_time = time.perf_counter() - start_time

            trained_models_dof[model_name] = {
                'model': model,
                'training_time': training_time,
                'dof': dof
            }

            print(f"{model_name} trained - {training_time:.2f}s")

            summary.append({
                'dof': dof,
                'model': model_name,
                'training_time': training_time,
                'samples': len(X_train),
                'status': 'success'
            })

        except Exception as e:
            # Best effort: one failing model must not stop the remaining ones.
            print(f"{model_name} failed - {str(e)}")
            summary.append({
                'dof': dof,
                'model': model_name,
                'training_time': 0,
                'samples': len(X_train),
                'status': f'failed: {str(e)[:50]}'
            })

    trained_models[dof] = trained_models_dof

    print(f"\nDOF={dof} training complete - {len(trained_models_dof)} models trained")

print("\nTraining Complete")

training_df = pd.DataFrame(summary)
if not training_df.empty:
    print("Training Summary:")
    print(training_df)

    successful = training_df[training_df['status'] == 'success']
    if not successful.empty:
        print("\nStatistics:")
        # Mean training time, one row per model and one column per DOF.
        pivot = successful.pivot_table(values='training_time', index='model', columns='dof', aggfunc='mean')
        print(pivot.round(2))

print(f"\nTotal trained models: {sum(len(models) for models in trained_models.values())}")

In [None]:
from sklearn.metrics import mean_squared_error

all_test_results = []   # one record per successfully evaluated (dof, model)
testing_summary = []    # one record per evaluation attempt, including failures
test_sample_seed = 43   # fixes which test rows are drawn when subsampling

for dof, trained_models_dof in trained_models.items():
    if not trained_models_dof:
        continue

    print(f"\nDOF={dof} testing")

    test_poses = data_directory / 'Testing' / f'{dof}_testing.json'
    test_solutions = data_directory / 'Testing' / f'{dof}_testing_solutions.json'

    if not (test_poses.exists() and test_solutions.exists()):
        print(f"testing data not found for dof={dof}")
        continue

    X_test, y_test = load_ik_data(test_poses, test_solutions)

    # Evaluate on at most half as many rows as the training cap. When the
    # halved cap is 0 (sample_limit of 1, or subsampling disabled) the full
    # test set is kept instead of being cut down to zero rows.
    test_limit = sample_limit // 2 if sample_limit else 0
    if test_limit and len(X_test) > test_limit:
        # Seeded per DOF so reruns evaluate on the same rows.
        test_rng = np.random.default_rng([test_sample_seed, dof])
        idx = test_rng.choice(len(X_test), test_limit, replace=False)
        X_test, y_test = X_test[idx], y_test[idx]

    print(f"Testing data: {X_test.shape} -> {y_test.shape}")

    for model_name, model_data in trained_models_dof.items():
        print(f"\nTesting {model_name}")

        try:
            model = model_data['model']
            # perf_counter is monotonic, which suits short duration measurements.
            start_time = time.perf_counter()
            y_pred = model.predict(X_test)
            inference_time = time.perf_counter() - start_time

            joint_rmse = np.sqrt(mean_squared_error(y_test, y_pred))

            # NOTE(review): this is only a copy of joint_rmse - no separate
            # position error is computed in this cell. Treat the
            # 'position_rmse' column as a placeholder until one is.
            position_rmse = joint_rmse

            test_result = {
                'dof': dof,
                'model': model_name,
                'position_rmse': position_rmse,
                'joint_rmse': joint_rmse,
                'training_time': model_data['training_time'],
                'inference_time': inference_time,
                'inference_time_per_sample': inference_time / len(X_test),
                'test_samples': len(X_test),
                'status': 'success'
            }

            all_test_results.append(test_result)

            print(f"{model_name}: Joint RMSE = {joint_rmse:.4f}")
            print(f"Inference: {inference_time:.3f}s ({inference_time/len(X_test)*1000:.2f}ms/sample)")

            testing_summary.append({
                'dof': dof,
                'model': model_name,
                'joint_rmse': joint_rmse,
                'inference_time': inference_time,
                'status': 'success'
            })

        except Exception as e:
            # Best effort: record the failure and move on to the next model.
            print(f"{model_name} testing failed: {str(e)}")
            testing_summary.append({
                'dof': dof,
                'model': model_name,
                'joint_rmse': float('inf'),
                'inference_time': 0,
                'status': f'failed: {str(e)[:50]}'
            })

    print(f"\ndof={dof} testing complete")

print("\ntesting complete")
print("="*60)

if all_test_results:
    results_df = pd.DataFrame(all_test_results)

    # Display-only copy with per-sample inference time in milliseconds:
    # rounding the seconds value to 4 decimals would turn anything below
    # 0.05 ms into 0. The saved CSV keeps the original columns.
    display_df = results_df.assign(
        inference_ms_per_sample=results_df['inference_time_per_sample'] * 1000
    )

    print("Complete Results:")
    display_cols = ['dof', 'model', 'joint_rmse', 'training_time', 'inference_ms_per_sample']
    print(display_df[display_cols].round(4))

    results_path = root / 'results'
    results_path.mkdir(exist_ok=True)
    results_df.to_csv(results_path / 'explicit_training_results.csv', index=False)
    print(f"\nResults saved to: {results_path / 'explicit_training_results.csv'}")

    print("\nModel Performance Summary (Average across DOFs):")
    model_summary = display_df.groupby('model').agg({
        'joint_rmse': 'mean',
        'training_time': 'mean',
        'inference_ms_per_sample': 'mean'
    }).round(4)
    print(model_summary)

else:
    print("No test results available")

In [None]:
def _plot_metric_vs_dof(ax, frame, column, ylabel, title, scale=1, log_y=False):
    """Draw one line per model: ``frame[column] * scale`` against ``frame['dof']``."""
    for model_name in frame['model'].unique():
        model_rows = frame[frame['model'] == model_name]
        ax.plot(model_rows['dof'], model_rows[column] * scale, 'o-',
                label=model_name, linewidth=2, markersize=6)
    ax.set_xlabel('DOF')
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    if log_y:
        ax.set_yscale('log')
    ax.legend()
    ax.grid(True, alpha=0.3)


def _plot_model_ranking(ax, averages, color, ylabel, title, label_format, log_y=False):
    """Bar chart of the given per-model averages, with each bar's value written above it."""
    positions = range(len(averages))
    bars = ax.bar(positions, averages.values, color=color)
    ax.set_xticks(positions)
    ax.set_xticklabels(averages.index, rotation=45)
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    if log_y:
        ax.set_yscale('log')
    ax.grid(True, alpha=0.3)
    for bar in bars:
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2., height,
                label_format.format(height), ha='center', va='bottom', fontsize=9)


if all_test_results:
    print("GENERATING VISUALIZATIONS...")

    plots = root / 'plots'
    plots.mkdir(exist_ok=True)

    results_df = pd.DataFrame(all_test_results)

    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    fig.suptitle('Explicit Training & Testing Results Analysis', fontsize=16, fontweight='bold')

    # Top row: each metric as a function of DOF, one line per model.
    _plot_metric_vs_dof(axes[0, 0], results_df, 'training_time',
                        'Training Time (seconds)', 'Training Speed vs DOF', log_y=True)
    _plot_metric_vs_dof(axes[0, 1], results_df, 'joint_rmse',
                        'Joint RMSE (radians)', 'Joint Accuracy vs DOF')
    _plot_metric_vs_dof(axes[0, 2], results_df, 'inference_time_per_sample',
                        'Inference Time (ms/sample)', 'Inference Speed vs DOF',
                        scale=1000, log_y=True)   # seconds -> milliseconds

    # Bottom row, left and middle: models ranked by their average over all DOFs.
    avg_accuracy = results_df.groupby('model')['joint_rmse'].mean().sort_values()
    _plot_model_ranking(axes[1, 0], avg_accuracy, 'lightcoral',
                        'Average Joint RMSE', 'Model Ranking: Accuracy (Lower = Better)',
                        '{:.3f}')

    avg_speed = results_df.groupby('model')['training_time'].mean().sort_values()
    _plot_model_ranking(axes[1, 1], avg_speed, 'lightblue',
                        'Average Training Time (s)', 'Model Ranking: Speed (Lower = Better)',
                        '{:.1f}s', log_y=True)

    # Bottom row, right: one labelled point per model, average time vs average error.
    ax6 = axes[1, 2]
    for model_name in results_df['model'].unique():
        model_rows = results_df[results_df['model'] == model_name]
        avg_acc = model_rows['joint_rmse'].mean()
        avg_time = model_rows['training_time'].mean()
        ax6.scatter(avg_time, avg_acc, s=100, alpha=0.7, label=model_name)
        ax6.annotate(model_name, (avg_time, avg_acc), xytext=(5, 5),
                     textcoords='offset points', fontsize=9)
    ax6.set_xlabel('Average Training Time (s)')
    ax6.set_ylabel('Average Joint RMSE')
    ax6.set_title('Accuracy vs Speed Tradeoff')
    ax6.set_xscale('log')
    ax6.grid(True, alpha=0.3)

    plt.tight_layout()
    # Path objects are joined with '/'; the previous '\' here was a syntax
    # error that prevented the whole cell from running.
    plt.savefig(plots / 'training_analysis.png', dpi=150, bbox_inches='tight')
    plt.show()

    print("\nDETAILED PERFORMANCE TABLE:")
    print("="*80)

    # Aggregated unrounded; rounding happens only when printing, so the
    # seconds -> milliseconds conversion below does not lose small values.
    summary_stats = results_df.groupby('model').agg({
        'joint_rmse': ['mean', 'std', 'min', 'max'],
        'training_time': ['mean', 'std', 'min', 'max'],
        'inference_time_per_sample': ['mean', 'std']
    })

    print("Joint RMSE Statistics:")
    print(summary_stats['joint_rmse'].round(4))

    print("\nTraining Time Statistics (seconds):")
    print(summary_stats['training_time'].round(4))

    print("\nInference Time Statistics (ms/sample):")
    inference_ms = (summary_stats['inference_time_per_sample'] * 1000).round(4)
    print(inference_ms)

    print("\nBEST PERFORMERS:")
    print("="*40)

    # Row with the smallest value of each metric across every (dof, model) result.
    best_accuracy = results_df.loc[results_df['joint_rmse'].idxmin()]
    fastest_training = results_df.loc[results_df['training_time'].idxmin()]
    fastest_inference = results_df.loc[results_df['inference_time_per_sample'].idxmin()]

    print(f"Best Accuracy: {best_accuracy['model']} (DOF={best_accuracy['dof']}, RMSE={best_accuracy['joint_rmse']:.4f})")
    print(f"Fastest Training: {fastest_training['model']} (DOF={fastest_training['dof']}, Time={fastest_training['training_time']:.2f}s)")
    print(f"Fastest Inference: {fastest_inference['model']} (DOF={fastest_inference['dof']}, Time={fastest_inference['inference_time_per_sample']*1000:.2f}ms)")

    print("\nGPU USAGE SUMMARY:")
    print("="*30)
    print(f"GPU Available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"GPU Name: {torch.cuda.get_device_name(0)}")
        print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

        # NOTE(review): assumes these are the models that run on CUDA when it
        # is available - confirm against the model implementations.
        gpu_models = ['ANN', 'MDN', 'CVAE']
        available_gpu_models = [m for m in results_df['model'].unique() if m in gpu_models]

        if available_gpu_models:
            print(f"GPU-Accelerated Models: {available_gpu_models}")
            # Report measured averages rather than a fixed speed-up figure.
            is_gpu_model = results_df['model'].isin(available_gpu_models)
            gpu_mean_time = results_df.loc[is_gpu_model, 'training_time'].mean()
            print(f"Mean training time (GPU-accelerated models): {gpu_mean_time:.2f}s")
            if (~is_gpu_model).any():
                other_mean_time = results_df.loc[~is_gpu_model, 'training_time'].mean()
                print(f"Mean training time (other models): {other_mean_time:.2f}s")
        else:
            print("No GPU models were trained")

    print("\nEXPLICIT TRAINING ANALYSIS COMPLETE!")
    print(f"All visualizations saved to: {plots}")

else:
    print("No results data available for visualization")