# Synthetic Data Classification Demo

This notebook demonstrates the complete pipeline for magnetic distortion classification using synthetically generated sensor data.

## Process:
1. **Generate Synthetic Data**: Create sensor data with different magnetic distortion levels
2. **Create Spectrograms**: Convert the time-series sensor data to spectrograms so that an audio-classification model (Audio Spectrogram Transformer) can consume it
3. **Load Pre-trained Model**: Use a fine-tuned model for magnetic distortion detection
4. **Classify Data**: Run inference on the generated synthetic data
5. **Analyze Results**: View classification results and confidence scores

## Features:
- **Self-contained**: Generates its own test data - no external files needed
- **Configurable**: Easy to adjust distortion levels and data characteristics
- **Educational**: Shows the complete ML pipeline from data generation to classification
- **Modular**: Uses the new modular architecture for clean, maintainable code

## Configuration:
- Set `DISTORTION_LEVEL` (`"none"`, `"low"`, or `"high"`) to test different magnetic distortion scenarios
- Modify `DURATION_S` (in seconds) to generate longer or shorter test sequences
- Adjust `model_id` to use a different trained model

In [1]:
# Install required packages
# NOTE(review): torch and torchaudio are installed without a version pin while
# the other packages are pinned - confirm this is intentional.
%pip install datasets[audio]==3.0.1
%pip install mcap==1.2.1
%pip install torch
%pip install torchaudio
%pip install transformers[torch]==4.46.2
%pip install nstrumenta==0.1.3

# Clone repository if in Colab (needed for source files and utilities)
import sys
import os

# Colab is detected by the presence of the "google.colab" module in sys.modules.
if "google.colab" in sys.modules:
    print("üîÑ Detected Google Colab - setting up repository...")
    
    # Check if repo is already cloned (looked up relative to the current directory)
    if not os.path.exists("time-series-classifier"):
        print("üì• Cloning time-series-classifier repository...")
        !git clone https://github.com/nstrumenta/time-series-classifier.git
    else:
        print("‚úì Repository already exists")
    
    # Change to repo directory so later cells can resolve src/ and scripts/
    # relative to the current working directory
    %cd time-series-classifier
    print(f"‚úì Working directory: {os.getcwd()}")
else:
    # Outside Colab nothing is cloned and the working directory is left as-is.
    print("üñ•Ô∏è Detected local environment")

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
üñ•Ô∏è Detected local environment


In [None]:
import sys
import os

# Setup paths for both Colab and local environments
def setup_environment():
    """Locate the project's ``src`` and ``scripts`` directories and add them to ``sys.path``.

    In Google Colab the current working directory is used directly (the
    install cell changes into the cloned repository). Locally, the current
    directory and every ancestor are searched, nearest first, for a directory
    that contains both ``src`` and ``scripts``. If none is found, the paths
    fall back to ``<cwd>/src`` and ``<cwd>/scripts``.

    Returns:
        tuple[str, str]: ``(src_dir, scripts_dir)``. The returned paths are not
        guaranteed to exist; only existing ones are appended to ``sys.path``.
    """
    current_dir = os.getcwd()

    # Default / fallback layout: src and scripts directly under the cwd.
    # os.getcwd() is already absolute, so no further normalisation is needed.
    src_dir = os.path.join(current_dir, "src")
    scripts_dir = os.path.join(current_dir, "scripts")

    if "google.colab" not in sys.modules:
        # Local development - find repo root by walking up from the cwd.
        # The walk covers every ancestor so the notebook also works when it is
        # launched from a nested folder, not just from the root or one level
        # below it.
        candidate = current_dir
        while True:
            potential_src = os.path.join(candidate, "src")
            potential_scripts = os.path.join(candidate, "scripts")
            if os.path.exists(potential_src) and os.path.exists(potential_scripts):
                src_dir = potential_src
                scripts_dir = potential_scripts
                break

            parent = os.path.dirname(candidate)
            if parent == candidate:
                # Reached the filesystem root without a match; keep the fallback.
                break
            candidate = parent

    # Add paths to sys.path (existing directories only, no duplicates)
    for path in [src_dir, scripts_dir]:
        if os.path.exists(path) and path not in sys.path:
            sys.path.append(path)
            print(f"‚úì Added to path: {path}")

    return src_dir, scripts_dir

# Setup environment: resolve the project's src/ and scripts/ directories and
# put them on sys.path before any project modules are imported below.
src_dir, scripts_dir = setup_environment()

# Import utilities with fallback handling
def import_with_fallback():
    """Create the Nstrumenta client, preferring the project's ``script_utils``.

    Returns:
        tuple: ``(client, mode)`` where ``mode`` is ``"modular"`` when
        ``script_utils.init_script_environment`` supplied the client, or
        ``"manual"`` when the client was built directly from the
        ``NSTRUMENTA_API_KEY`` environment variable.
    """
    try:
        # Preferred path: let the project helper build the client.
        from script_utils import init_script_environment
        _, client = init_script_environment()
        print("‚úì Using new script_utils environment setup")
        return client, "modular"

    except ImportError:
        print("‚ö†Ô∏è script_utils not available, using manual setup...")

        # Build the client by hand.
        from nstrumenta import NstrumentaClient

        running_in_colab = "google.colab" in sys.modules
        if running_in_colab:
            # Colab keeps secrets in its userdata store; copy the key into the
            # process environment so it is read the same way as locally.
            from google.colab import userdata
            os.environ["NSTRUMENTA_API_KEY"] = userdata.get("NSTRUMENTA_API_KEY")

        api_key = os.getenv("NSTRUMENTA_API_KEY")
        client = NstrumentaClient(api_key)

        # Connectivity check is best-effort: a failure is reported, not raised.
        try:
            project = client.get_project()
            print(f"‚úì Connected to project: {project}")
        except Exception as e:
            print(f"‚ö†Ô∏è Could not verify project connection: {e}")

        return client, "manual"

# Initialize Nstrumenta client. setup_type records which path produced it
# ("modular" via script_utils, "manual" via NstrumentaClient); later cells read
# it to choose between script_utils helpers and their local fallbacks.
nst_client, setup_type = import_with_fallback()

# Import project modules with fallbacks
def import_mcap_utils():
    """Resolve the spectrogram and classification helpers.

    Tries the ``mcap_utils`` module first, then the ``mcap_utilities`` module.

    Returns:
        tuple: ``(spectrogram_fn, classify_fn, source)`` where ``source`` is
        ``"modular"``, ``"legacy"`` or ``"failed"``. Both callables are
        ``None`` when ``source`` is ``"failed"``.
    """
    try:
        # Preferred: the modular package.
        from mcap_utils import spectrogram_from_timeseries as make_spectrogram
        from mcap_utils import classify_from_spectrogram as classify
        print("‚úì Using new modular mcap_utils")
        return make_spectrogram, classify, "modular"
    except ImportError:
        try:
            # Second choice: the single-module implementation.
            import mcap_utilities as legacy
            print("‚ö†Ô∏è Using legacy mcap_utilities")
            return (
                legacy.spectrogram_from_timeseries,
                legacy.classify_from_spectrogram,
                "legacy",
            )
        except ImportError:
            # Neither module is importable; callers must check for None.
            print("‚ùå Could not import MCAP utilities")
            return None, None, "failed"

# Resolve the helpers once at cell level. Both callables are None when
# mcap_import_type is "failed"; later cells check for None before calling them.
spectrogram_from_timeseries, classify_from_spectrogram, mcap_import_type = import_mcap_utils()

# Report how the environment was resolved.
print(f"\nüìã Environment Summary:")
print(f"  - Setup type: {setup_type}")
print(f"  - MCAP utils: {mcap_import_type}")
print(f"  - Working directory: {os.getcwd()}")
# NOTE(review): this counts every sys.path entry whose text contains "src" or
# "scripts" as a substring, so unrelated entries can inflate the number.
print(f"  - Python path includes: {len([p for p in sys.path if 'src' in p or 'scripts' in p])} project directories")

In [2]:
# Configuration for synthetic data generation and classification
import uuid

# Model configuration
# model_id is also used to build the model archive name (model_tar_filename below).
model_id = "MAG_DIST_DEBUG"  # Use the debug model for faster demonstration

# Synthetic data configuration
DISTORTION_LEVEL = "high"  # Options: "none", "low", "high"
DURATION_S = 30.0  # Duration of synthetic data to generate (seconds)
SAMPLE_RATE = 100  # Sample rate for synthetic data

# Generate unique identifiers for this test
# (first 8 characters of a random UUID4; used in the folder and file names below)
test_id = str(uuid.uuid4())[:8]
print(f"üß™ Test ID: {test_id}")
print(f"üß≤ Testing magnetic distortion level: {DISTORTION_LEVEL}")
print(f"‚è±Ô∏è  Data duration: {DURATION_S} seconds")

# File configuration
# working_folder is relative to the directory that is current when the
# workspace is created; the file names below carry no directory component.
working_folder = f"./temp/classify_demo_{test_id}"
synthetic_data_file = f"test_data_{DISTORTION_LEVEL}_{test_id}.mcap"
labels_file = f"test_data_{DISTORTION_LEVEL}_{test_id}.labels.json"
spectrogram_file = f"test_data_{DISTORTION_LEVEL}_{test_id}.spectrogram.mcap"
classification_file = f"test_data_{DISTORTION_LEVEL}_{test_id}.classification.mcap"
model_tar_filename = f"{model_id}.model.tar.gz"

# Working directory management with cross-platform support
def setup_working_directory_portable(path):
    """Make *path* the current working directory, creating it when missing.

    Args:
        path: Directory to use; relative paths are resolved against the
            current working directory.

    Returns:
        str: The absolute path of the directory that is now current.
    """
    target_dir = os.path.abspath(path)
    # exist_ok lets repeated runs reuse the same workspace.
    os.makedirs(target_dir, exist_ok=True)
    os.chdir(target_dir)
    return target_dir

def reset_to_initial_directory():
    """Return to the directory the notebook should start from.

    Locally, this delegates to ``script_utils.reset_to_initial_cwd`` when the
    modular setup is active and otherwise does nothing. In Google Colab it
    walks up from the current directory (at most ten levels) and changes into
    the first directory that contains ``src`` plus either ``scripts`` or
    ``notebooks``; if no such directory is found the working directory is
    left unchanged.
    """
    if "google.colab" not in sys.modules:
        # Local environment: only the project helper knows the initial cwd.
        # A missing helper or an undefined setup_type is deliberately ignored.
        try:
            if setup_type == "modular":
                from script_utils import reset_to_initial_cwd
                reset_to_initial_cwd()
        except (ImportError, NameError):
            pass
        return

    # Colab: climb towards the filesystem root looking for the repo layout.
    candidate = os.getcwd()
    for _ in range(10):  # hard cap on how far up the search may go
        present = {
            marker: os.path.exists(os.path.join(candidate, marker))
            for marker in ("src", "scripts", "notebooks")
        }

        if present["src"] and (present["scripts"] or present["notebooks"]):
            os.chdir(candidate)
            return

        parent = os.path.dirname(candidate)
        if parent == candidate or candidate == "/":
            # Nothing above this directory to inspect.
            return
        candidate = parent

# Reset to appropriate starting directory
print("üìÅ Setting up directories...")
reset_to_initial_directory()

# Setup working directory
# Prefer the script_utils helper when the modular setup is active; any import
# problem (or an undefined setup_type) falls back to the portable helper, which
# creates the folder and changes into it.
try:
    if setup_type == "modular":
        from script_utils import setup_working_directory
        setup_working_directory(working_folder)
    else:
        setup_working_directory_portable(working_folder)
except (ImportError, NameError):
    setup_working_directory_portable(working_folder)

# Summarise what the following cells will produce.
print(f"üìÅ Demo workspace: {working_folder}")
print(f"üéØ Will generate: {synthetic_data_file}")
print(f"üè∑Ô∏è  Will create labels: {labels_file}")
print(f"üìä Will classify with model: {model_id}")

üß™ Test ID: 12f5d7e5
üß≤ Testing magnetic distortion level: high
‚è±Ô∏è  Data duration: 30.0 seconds
üìÅ Setting up directories...
üìÅ Demo workspace: ./temp/classify_demo_12f5d7e5
üéØ Will generate: test_data_high_12f5d7e5.mcap
üè∑Ô∏è  Will create labels: test_data_high_12f5d7e5.labels.json
üìä Will classify with model: MAG_DIST_DEBUG


In [None]:
# Step 1: Generate synthetic sensor data
# NOTE(review): the leading "?" in this message looks like a placeholder for a
# lost emoji - confirm against the original notebook.
print("? Generating synthetic sensor data...")

def download_with_fallback(client, remote_file, local_path=None, extract_tar=False):
    """Download *remote_file*, optionally unpacking it, with a manual fallback.

    When the modular setup is active the work is delegated to
    ``script_utils.fetch_nstrumenta_file``. Otherwise the file is downloaded
    with ``client.download`` unless *local_path* already exists.

    Args:
        client: Object exposing ``download(remote_file, local_path)``.
        remote_file: Name of the file to download.
        local_path: Destination path; defaults to *remote_file*.
        extract_tar: When true and the destination ends in ``.tar.gz`` or
            ``.tgz``, the freshly downloaded archive is extracted into the
            current working directory. An archive that already existed
            locally is not extracted again.

    Returns:
        bool: Always ``True``; failures surface as exceptions.

    Raises:
        tarfile.TarError: If the archive is unreadable or, on interpreters
            with extraction filters, contains a member rejected by the
            ``"data"`` filter (for example a path escaping the destination).
    """
    local_path = local_path or remote_file

    try:
        # Try new utilities if available
        if setup_type == "modular":
            from script_utils import fetch_nstrumenta_file
            fetch_nstrumenta_file(client, remote_file, local_path, extract_tar=extract_tar)
            return True
    except (ImportError, NameError):
        # Helper missing, or setup_type not defined: use the manual path below.
        pass

    # Fallback to manual download
    import tarfile

    if os.path.exists(local_path):
        print(f"{local_path} exists.")
        return True

    print(f"downloading {remote_file} to {local_path}.")
    client.download(remote_file, local_path)

    if extract_tar and local_path.endswith(('.tar.gz', '.tgz')):
        print(f"extracting {local_path}")
        with tarfile.open(local_path, "r:gz") as tar:
            if hasattr(tarfile, "data_filter"):
                # The archive comes from a remote store, so refuse members that
                # would write outside the destination (absolute paths, "..",
                # unsafe links, device files).
                tar.extractall(filter="data")
            else:
                # Older interpreters have no extraction filters.
                tar.extractall()

    return True

# First, download the pre-trained model if needed
# The check looks for a "model" entry relative to the current working directory.
if not os.path.exists("model"):
    print("üì• Downloading pre-trained model...")
    try:
        # extract_tar=True asks the helper to unpack the downloaded archive.
        download_with_fallback(nst_client, model_tar_filename, extract_tar=True)
        print("‚úì Model downloaded and extracted")
    except Exception as e:
        # Print guidance for the user, then re-raise so the cell still fails.
        print(f"‚ùå Error downloading model: {e}")
        print("üîß Please ensure you have a trained model available")
        print(f"   Expected: {model_tar_filename} in your Nstrumenta project")
        raise
else:
    print("‚úì Model already available")

# Generate synthetic data based on configuration
print(f"üîÑ Generating synthetic sensor data with {DISTORTION_LEVEL} magnetic distortion...")

# Imported here because this cell runs before the cell that imports json.
import json

# Numeric distortion strength per level; any level not listed maps to 2.5,
# which is the value used for "high".
distortion_magnitudes = {"none": 0.0, "low": 1.0}

try:
    # Import synthetic data generator
    # Reuse the class if an earlier cell already bound it, otherwise import it.
    if 'SyntheticDataGenerator' in globals() and SyntheticDataGenerator is not None:
        generator = SyntheticDataGenerator()
    else:
        # Fallback import
        from synthetic import SyntheticDataGenerator
        generator = SyntheticDataGenerator()

    # Create a motion plan for testing: one initial pose with per-sensor
    # calibration, followed by a single segment at the configured distortion.
    test_plan = {
        "initialization": {
            "pose": {
                "origin": {"lat": 38.446, "lng": -122.687, "height": 0.0},
                "position": {"x": 0.0, "y": 0.0, "z": 0.0},
                "rotation": {"w": 1.0, "x": 0.0, "y": 0.0, "z": 0.0}
            },
            "start_time_ns": 0,
            "sample_rate": SAMPLE_RATE,
            "mag": {
                "calibration": {
                    "bias": {"x": 0.0, "y": 0.0, "z": 0.0},
                    "matrix": [[0.00053, 0.0, 0.0], [0.0, 0.00053, 0.0], [0.0, 0.0, 0.00053]]
                }
            },
            "acc": {
                "calibration": {
                    "bias": {"x": 0.0, "y": 0.0, "z": 0.0},
                    "matrix": [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]
                }
            },
            "gyro": {
                "calibration": {
                    "bias": {"x": 0.0, "y": 0.0, "z": 0.0},
                    "matrix": [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]
                }
            }
        },
        "segments": [
            {
                "name": f"test_segment_{DISTORTION_LEVEL}",
                "duration_s": DURATION_S,
                "rotation_rpy_degrees": {"roll": 45.0, "pitch": 30.0, "yaw": 60.0},
                "magnetic_distortion": distortion_magnitudes.get(DISTORTION_LEVEL, 2.5),
                "mag_distortion": {"level": DISTORTION_LEVEL}
            }
        ]
    }

    # Generate the synthetic data
    generator.generate(plan_data=test_plan, output_file=synthetic_data_file, verbose=False)
    print(f"‚úì Generated synthetic data: {synthetic_data_file}")

    # Generate labels
    generator.generate_labels(test_plan, labels_file)
    print(f"‚úì Generated labels: {labels_file}")

    # Verify files were created
    if os.path.exists(synthetic_data_file) and os.path.exists(labels_file):
        data_size = os.path.getsize(synthetic_data_file)
        print(f"‚úì Synthetic data file: {data_size:,} bytes")

        # Show what's in the labels file; the file is closed before reporting.
        with open(labels_file, 'r', encoding='utf-8') as f:
            labels_data = json.load(f)

        events = labels_data.get('events', [])
        print(f"‚úì Labels file: {len(events)} events")
        if events:
            sample_event = events[0]
            distortion_level = sample_event.get('metadata', {}).get('mag_distortion', 'unknown')
            # Duration is computed from the whole-second fields only.
            duration = sample_event.get('endTime', {}).get('sec', 0) - sample_event.get('startTime', {}).get('sec', 0)
            print(f"  - Sample event: {distortion_level} distortion, {duration}s duration")
    else:
        raise FileNotFoundError("Failed to generate synthetic data files")

except Exception as e:
    # Print guidance for the user, then re-raise so the cell still fails.
    print(f"‚ùå Error generating synthetic data: {e}")
    print("üîß This requires the synthetic data generator to be available")
    raise

print("üéâ Synthetic data generation completed!")

In [None]:
# Step 2: Create spectrograms and run classification
from transformers import ASTFeatureExtractor, ASTForAudioClassification
import json

print("üîß Loading models and feature extractor...")

# Load pretrained feature extractor
# The extractor comes from the public AST checkpoint named below, while the
# classifier weights are loaded from the local ./model directory.
pretrained_model = "MIT/ast-finetuned-audioset-10-10-0.4593"
feature_extractor = ASTFeatureExtractor.from_pretrained(pretrained_model)
print("‚úì Feature extractor loaded")

# Load fine-tuned model for prediction
# "./model" is resolved relative to the current working directory.
model = ASTForAudioClassification.from_pretrained("./model")
print("‚úì Fine-tuned model loaded")
print(f"  üìä Model has {model.config.num_labels} classes: {list(model.config.id2label.values())}")

def upload_with_fallback(client, local_file, remote_prefix, overwrite=True):
    """Upload *local_file* under *remote_prefix*, with a manual fallback.

    Delegates to ``script_utils.upload_with_prefix`` when the modular setup is
    active; otherwise calls ``client.upload`` with the remote path
    ``<remote_prefix>/<local_file>``.

    Args:
        client: Object exposing ``upload(local, remote, overwrite=...)``.
        local_file: Path of the file to upload; also used as the remote name.
        remote_prefix: Remote folder the file is placed under.
        overwrite: Forwarded to the upload call.

    Returns:
        bool: Always ``True``; failures surface as exceptions.
    """
    try:
        # Preferred path: the project helper, when the modular setup is in use.
        use_project_helper = setup_type == "modular"
        if use_project_helper:
            from script_utils import upload_with_prefix
            upload_with_prefix(client, local_file, remote_prefix, overwrite=overwrite)
            return True
    except (ImportError, NameError):
        # Helper missing, or setup_type not defined: upload by hand instead.
        pass

    destination = f"{remote_prefix}/{local_file}"
    print(f"uploading {local_file} to {destination}")
    client.upload(local_file, destination, overwrite=overwrite)
    return True

def create_spectrogram_if_not_exists(input_file, spectrogram_mcap_file):
    """Build the spectrogram file for *input_file* unless it is already there.

    Args:
        input_file: Path of the time-series recording to convert.
        spectrogram_mcap_file: Path the spectrogram is written to; when this
            path already exists nothing is generated or uploaded.

    Raises:
        ImportError: If the spectrogram helper could not be imported earlier.
    """
    if os.path.exists(spectrogram_mcap_file):
        print(f"‚úì Spectrogram already exists: {spectrogram_mcap_file}")
        return

    print(f"üîÑ Creating spectrogram from {input_file}...")

    if spectrogram_from_timeseries is None:
        raise ImportError("Spectrogram creation function not available")

    spectrogram_from_timeseries(
        input_file=input_file,
        spectrogram_mcap_file=spectrogram_mcap_file,
        feature_extractor=feature_extractor,
    )
    print(f"‚úì Spectrogram created: {spectrogram_mcap_file}")

    # Uploading is best-effort: a failure is reported and the demo continues.
    try:
        upload_with_fallback(nst_client, spectrogram_mcap_file, f"classify_demo/{test_id}", overwrite=True)
        print(f"‚úì Spectrogram uploaded")
    except Exception as e:
        print(f"‚ö†Ô∏è Upload failed (continuing anyway): {e}")

# Create spectrogram
print("üìà Processing spectrogram...")
try:
    create_spectrogram_if_not_exists(synthetic_data_file, spectrogram_file)
except Exception as e:
    # Print guidance for the user, then re-raise so the cell still fails.
    print(f"‚ùå Error creating spectrogram: {e}")
    print("üîß This may be due to missing MCAP utilities")
    raise

# Perform classification
print("üéØ Running classification...")
try:
    # classify_from_spectrogram is None when the MCAP utilities failed to import.
    if classify_from_spectrogram is None:
        raise ImportError("Classification function not available")
        
    classify_from_spectrogram(
        spectrogram_mcap_file=spectrogram_file,
        classification_file=classification_file,
        model=model,
    )
    print(f"‚úì Classification completed: {classification_file}")
except Exception as e:
    # Report and re-raise; the analysis below needs the classification file.
    print(f"‚ùå Error during classification: {e}")
    raise

# Analyze results
print("\n? Analyzing Classification Results...")
print("=" * 50)

# NOTE(review): when classification_file is missing this whole section is
# skipped without any message.
if os.path.exists(classification_file):
    # Read the classification results
    try:
        from mcap.reader import make_reader
        
        with open(classification_file, "rb") as f:
            reader = make_reader(f)
            
            # Every message payload is decoded as UTF-8 JSON; schema and
            # channel are not used.
            classifications = []
            for schema, channel, message in reader.iter_messages():
                msg_data = json.loads(message.data.decode("utf-8"))
                classifications.append(msg_data)
        
        print(f"üìä Found {len(classifications)} classification results")
        
        if classifications:
            # Analyze the classifications
            class_counts = {}
            confidence_scores = []
            
            for i, result in enumerate(classifications):
                # Missing keys fall back to 'unknown' / 0.0.
                # Presumably these keys are written by classify_from_spectrogram - verify.
                predicted_class = result.get('predicted_class', 'unknown')
                confidence = result.get('confidence', 0.0)
                
                class_counts[predicted_class] = class_counts.get(predicted_class, 0) + 1
                confidence_scores.append(confidence)
                
                if i < 5:  # Show first 5 results as examples
                    print(f"  Sample {i+1}: {predicted_class} (confidence: {confidence:.3f})")
            
            # Summary statistics
            print(f"\nüìà Classification Summary:")
            print(f"  - Total predictions: {len(classifications)}")
            print(f"  - Average confidence: {sum(confidence_scores)/len(confidence_scores):.3f}")
            print(f"  - Min confidence: {min(confidence_scores):.3f}")
            print(f"  - Max confidence: {max(confidence_scores):.3f}")
            
            print(f"\nüè∑Ô∏è  Class Distribution:")
            for class_name, count in sorted(class_counts.items()):
                percentage = (count / len(classifications)) * 100
                print(f"  - {class_name}: {count} predictions ({percentage:.1f}%)")
            
            # Compare with expected result
            # NOTE(review): assumes the model's labels are named
            # "mag_distortion_<level>" - confirm against model.config.id2label.
            expected_class = f"mag_distortion_{DISTORTION_LEVEL}"
            if expected_class in class_counts:
                correct_predictions = class_counts[expected_class]
                accuracy = (correct_predictions / len(classifications)) * 100
                print(f"\nüéØ Accuracy Check:")
                print(f"  - Expected class: {expected_class}")
                print(f"  - Correct predictions: {correct_predictions}/{len(classifications)} ({accuracy:.1f}%)")
                
                # Rating bands: >80 excellent, >60 good, >40 moderate, otherwise poor.
                if accuracy > 80:
                    print("  ‚úÖ Excellent classification performance!")
                elif accuracy > 60:
                    print("  ‚úì Good classification performance")
                elif accuracy > 40:
                    print("  ‚ö†Ô∏è Moderate classification performance")
                else:
                    print("  ‚ùå Poor classification performance - may need more training")
            else:
                # The expected label never appeared among the predictions.
                print(f"\n‚ö†Ô∏è Expected class '{expected_class}' not found in results")
                print(f"Available classes: {list(class_counts.keys())}")
        
    except Exception as e:
        # Analysis is best-effort: any failure is reported and the cell continues.
        print(f"‚ùå Error analyzing results: {e}")
        print("Results file exists but could not be parsed")

# Upload results (optional)
print(f"\nüì§ Uploading results...")
try:
    # All three artifacts go under the same per-test remote prefix. The first
    # failing upload skips the remaining ones and is only reported.
    upload_with_fallback(nst_client, classification_file, f"classify_demo/{test_id}", overwrite=True)
    upload_with_fallback(nst_client, synthetic_data_file, f"classify_demo/{test_id}", overwrite=True)
    upload_with_fallback(nst_client, labels_file, f"classify_demo/{test_id}", overwrite=True)
    print(f"‚úì Results uploaded to classify_demo/{test_id}/")
except Exception as e:
    print(f"‚ö†Ô∏è Upload failed (results still available locally): {e}")

# NOTE(review): this success message is printed even when the analysis or the
# upload above reported an error.
print("\nüéâ Classification demo completed successfully!")
print(f"\n? Demo Summary:")
print(f"  - Test ID: {test_id}")
print(f"  - Distortion Level Tested: {DISTORTION_LEVEL}")
print(f"  - Data Duration: {DURATION_S} seconds")
print(f"  - Model Used: {model_id}")
print(f"  - Synthetic Data: {synthetic_data_file}")
print(f"  - Classification Results: {classification_file}")

# Suggestions for follow-up experiments.
print(f"\nüí° Try This Next:")
print(f"  1. Change DISTORTION_LEVEL to 'none', 'low', or 'high' and re-run")
print(f"  2. Increase DURATION_S for longer test sequences")
print(f"  3. Modify the motion parameters in the test_plan")
print(f"  4. Compare results across different distortion levels")