# üöÄ Electric Propulsion System - Fault Detection using Machine Learning

## Workshop Notebook

This notebook demonstrates how to build a machine learning model to detect faults in an electric propulsion system (e.g., electric aircraft, UAV) using data from a Simscape digital twin.

### Faults We'll Detect:
| Label | Fault Type | Physical Cause |
|-------|------------|----------------|
| 0 | Healthy | Normal operation |
| 1 | Battery Fault | Increased internal resistance (0.05 ‚Üí 0.15 Œ©) |
| 2 | Motor Fault | Efficiency degradation (88% ‚Üí 70%) |
| 3 | Propeller Fault | Blade damage - reduced thrust (Kt: 0.10 ‚Üí 0.06) |

### System Overview:
```
Battery (201.6V) ‚Üí Motor & Drive (20kW) ‚Üí Propeller (0.9m) ‚Üí Thrust (~139N)
```

---
**Author:** Electric Propulsion Digital Twin Workshop  
**Date:** 2024

---
## üì¶ Step 1: Setup and Imports

First, let's import all the necessary libraries. These are pre-installed in Google Colab!

In [None]:
# Import libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Machine Learning
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score

# Utilities
import joblib
import warnings
warnings.filterwarnings('ignore')

# Set random seed for reproducibility
np.random.seed(42)

# Plot settings
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = [10, 6]

print("‚úÖ All libraries imported successfully!")

---
## üìÅ Step 2: Upload Dataset

Upload your `simscape_fault_dataset.csv` file from the Simscape simulation.

In [None]:
# Upload dataset from your computer
from google.colab import files

print("Please upload your 'simscape_fault_dataset.csv' file:")
uploaded = files.upload()

# Get the filename
filename = list(uploaded.keys())[0]
print(f"\n‚úÖ File uploaded: {filename}")

---
## üìä Step 3: Load and Explore Data

Let's load the dataset and understand its structure.

In [None]:
# Load dataset
df = pd.read_csv(filename)

print("="*60)
print("  DATASET OVERVIEW")
print("="*60)
print(f"\nüìê Shape: {df.shape[0]} samples √ó {df.shape[1]} features")
print(f"\nüìã Columns:")
for i, col in enumerate(df.columns):
    print(f"   {i+1}. {col}")

In [None]:
# Display first few rows
print("\nüîç First 5 rows of data:")
df.head()

In [None]:
# Statistical summary
print("\nüìà Statistical Summary:")
df.describe().round(2)

In [None]:
# Class distribution
class_names = ['Healthy', 'Battery_Fault', 'Motor_Fault', 'Propeller_Fault']
target_col = 'Fault Label'

print("\nüè∑Ô∏è Class Distribution:")
print("-" * 40)
for label in sorted(df[target_col].unique()):
    count = (df[target_col] == label).sum()
    pct = count / len(df) * 100
    print(f"   {class_names[int(label)]:20s} (Label {int(label)}): {count:3d} samples ({pct:5.1f}%)")

# Visualize class distribution
plt.figure(figsize=(8, 5))
colors = ['#2ecc71', '#e74c3c', '#f39c12', '#9b59b6']
df[target_col].value_counts().sort_index().plot(kind='bar', color=colors)
plt.xticks(range(4), class_names, rotation=45, ha='right')
plt.xlabel('Fault Type')
plt.ylabel('Number of Samples')
plt.title('Class Distribution in Dataset')
plt.tight_layout()
plt.show()

---
## üî¨ Step 4: Data Visualization

Let's visualize how different faults affect the sensor measurements.

In [None]:
# Create visualizations for key parameters
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

plot_features = ['Mean Voltage', 'Mean Current', 'Mean Speed_RPM', 
                 'Mean Thrust', 'Efficiency', 'Mechanical Power']

for idx, feature in enumerate(plot_features):
    ax = axes[idx // 3, idx % 3]
    
    for i, class_name in enumerate(class_names):
        mask = df[target_col] == i
        ax.hist(df.loc[mask, feature], bins=12, alpha=0.6, 
                label=class_name, color=colors[i])
    
    ax.set_xlabel(feature)
    ax.set_ylabel('Count')
    ax.set_title(f'{feature} by Fault Type')
    ax.legend(loc='upper right', fontsize=8)

plt.suptitle('Sensor Measurements Distribution by Fault Type', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

In [None]:
# Scatter plot: Key fault indicators
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Plot 1: Voltage vs Current (Battery fault indicator)
ax1 = axes[0]
for i, class_name in enumerate(class_names):
    mask = df[target_col] == i
    ax1.scatter(df.loc[mask, 'Mean Voltage'], df.loc[mask, 'Mean Current'], 
                c=colors[i], label=class_name, alpha=0.7, s=80)
ax1.set_xlabel('Voltage (V)')
ax1.set_ylabel('Current (A)')
ax1.set_title('Voltage vs Current\n(Battery fault ‚Üí Low voltage)')
ax1.legend()

# Plot 2: Power In vs Power Out (Motor fault indicator)
ax2 = axes[1]
for i, class_name in enumerate(class_names):
    mask = df[target_col] == i
    ax2.scatter(df.loc[mask, 'Electrical Power'], df.loc[mask, 'Mechanical Power'], 
                c=colors[i], label=class_name, alpha=0.7, s=80)
ax2.set_xlabel('Electrical Power (W)')
ax2.set_ylabel('Mechanical Power (W)')
ax2.set_title('Electrical vs Mechanical Power\n(Motor fault ‚Üí Low efficiency)')
ax2.legend()

# Plot 3: Speed vs Thrust (Propeller fault indicator)
ax3 = axes[2]
for i, class_name in enumerate(class_names):
    mask = df[target_col] == i
    ax3.scatter(df.loc[mask, 'Mean Speed_RPM'], df.loc[mask, 'Mean Thrust'], 
                c=colors[i], label=class_name, alpha=0.7, s=80)
ax3.set_xlabel('Speed (RPM)')
ax3.set_ylabel('Thrust (N)')
ax3.set_title('Speed vs Thrust\n(Propeller fault ‚Üí Low thrust)')
ax3.legend()

plt.tight_layout()
plt.show()

print("\nüí° Key Observations:")
print("   ‚Ä¢ Battery Fault: Lower voltage at same current")
print("   ‚Ä¢ Motor Fault: Lower mechanical power for same electrical power")
print("   ‚Ä¢ Propeller Fault: Lower thrust at same RPM")

---
## ‚öôÔ∏è Step 5: Feature Engineering

Create additional features that help distinguish between fault types.

In [None]:
# Create enhanced dataset with derived features
df_enhanced = df.copy()

# 1. Power Ratio (efficiency indicator)
df_enhanced['Power_Ratio'] = df_enhanced['Mechanical Power'] / (df_enhanced['Electrical Power'] + 1e-6)

# 2. Thrust per unit power (propeller health)
df_enhanced['Thrust_per_Power'] = df_enhanced['Mean Thrust'] / (df_enhanced['Mechanical Power'] + 1e-6)

# 3. Current per unit torque (motor health)
df_enhanced['Current_per_Torque'] = df_enhanced['Mean Current'] / (np.abs(df_enhanced['Mean Torque']) + 1e-6)

# 4. Voltage drop percentage (battery health)
nominal_voltage = 201.6
df_enhanced['Voltage_Drop_Pct'] = (nominal_voltage - df_enhanced['Mean Voltage']) / nominal_voltage * 100

# 5. Speed efficiency (actual vs expected)
max_rpm = 2500
df_enhanced['Speed_Efficiency'] = df_enhanced['Mean Speed_RPM'] / (df_enhanced['Throttle'] * max_rpm + 1e-6)

print("‚úÖ New features created:")
new_features = ['Power_Ratio', 'Thrust_per_Power', 'Current_per_Torque', 'Voltage_Drop_Pct', 'Speed_Efficiency']
for feat in new_features:
    print(f"   ‚Ä¢ {feat}")

print(f"\nüìä Total features: {len(df_enhanced.columns) - 1}")

---
## üéØ Step 6: Prepare Data for Machine Learning

Split data into training and testing sets, then normalize features.

In [None]:
# Define features and target
feature_cols = [col for col in df_enhanced.columns if col != target_col]
X = df_enhanced[feature_cols].values
y = df_enhanced[target_col].values

print(f"Features (X): {X.shape}")
print(f"Target (y): {y.shape}")

# Train-test split (80-20)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"\nüì¶ Training samples: {len(X_train)}")
print(f"üì¶ Testing samples: {len(X_test)}")

# Normalize features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("\n‚úÖ Features normalized using StandardScaler")

---
## ü§ñ Step 7: Train Multiple Models

Let's train several ML models and compare their performance!

In [None]:
# Define models to compare
models = {
    'üå≥ Decision Tree': DecisionTreeClassifier(random_state=42, max_depth=10),
    'üå≤ Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'üöÄ Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
    'üéØ SVM (RBF)': SVC(kernel='rbf', random_state=42, probability=True),
    'üë• KNN': KNeighborsClassifier(n_neighbors=5),
    'üß† Neural Network': MLPClassifier(hidden_layer_sizes=(64, 32), max_iter=500, random_state=42)
}

# Store results
results = {}

print("="*60)
print("  TRAINING MODELS")
print("="*60)

# Train and evaluate each model
for name, model in models.items():
    print(f"\nTraining {name}...")
    
    # Train
    model.fit(X_train_scaled, y_train)
    
    # Predict
    y_pred = model.predict(X_test_scaled)
    
    # Metrics
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')
    cv_scores = cross_val_score(model, X_train_scaled, y_train, cv=5)
    
    # Store
    results[name] = {
        'model': model,
        'accuracy': accuracy,
        'f1_score': f1,
        'cv_mean': cv_scores.mean(),
        'cv_std': cv_scores.std(),
        'y_pred': y_pred
    }
    
    print(f"   ‚úì Accuracy: {accuracy:.2%}")
    print(f"   ‚úì F1 Score: {f1:.2%}")
    print(f"   ‚úì CV Score: {cv_scores.mean():.2%} (¬±{cv_scores.std():.2%})")

print("\n" + "="*60)
print("  ‚úÖ ALL MODELS TRAINED!")
print("="*60)

---
## üìä Step 8: Compare Model Performance

Let's visualize and compare all models.

In [None]:
# Create comparison dataframe
comparison_df = pd.DataFrame({
    'Model': list(results.keys()),
    'Accuracy': [results[m]['accuracy'] for m in results],
    'F1 Score': [results[m]['f1_score'] for m in results],
    'CV Mean': [results[m]['cv_mean'] for m in results],
    'CV Std': [results[m]['cv_std'] for m in results]
}).sort_values('Accuracy', ascending=False)

print("\nüìä MODEL COMPARISON:")
print("="*70)
print(comparison_df.to_string(index=False))

# Find best model
best_model_name = comparison_df.iloc[0]['Model']
best_accuracy = comparison_df.iloc[0]['Accuracy']
print("\n" + "="*70)
print(f"üèÜ BEST MODEL: {best_model_name} (Accuracy: {best_accuracy:.2%})")
print("="*70)

In [None]:
# Visualize model comparison
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Bar chart of accuracy
ax1 = axes[0]
models_sorted = comparison_df.sort_values('Accuracy', ascending=True)
bar_colors = ['#2ecc71' if acc == best_accuracy else '#3498db' for acc in models_sorted['Accuracy']]
bars = ax1.barh(models_sorted['Model'], models_sorted['Accuracy'], color=bar_colors)
ax1.set_xlabel('Accuracy', fontsize=12)
ax1.set_title('Model Accuracy Comparison', fontsize=14, fontweight='bold')
ax1.set_xlim([0.5, 1.05])

# Add value labels
for bar, acc in zip(bars, models_sorted['Accuracy']):
    ax1.text(acc + 0.01, bar.get_y() + bar.get_height()/2, 
             f'{acc:.1%}', va='center', fontsize=10)

# Accuracy vs CV Score
ax2 = axes[1]
ax2.scatter(comparison_df['Accuracy'], comparison_df['CV Mean'], s=150, c='#3498db', alpha=0.7)
for i, row in comparison_df.iterrows():
    ax2.annotate(row['Model'].split()[-1], (row['Accuracy'], row['CV Mean']), 
                 textcoords="offset points", xytext=(0,10), ha='center', fontsize=9)
ax2.set_xlabel('Test Accuracy', fontsize=12)
ax2.set_ylabel('Cross-Validation Score', fontsize=12)
ax2.set_title('Test Accuracy vs CV Score', fontsize=14, fontweight='bold')
ax2.plot([0.5, 1], [0.5, 1], 'r--', alpha=0.5, label='Perfect consistency')
ax2.legend()

plt.tight_layout()
plt.show()

---
## üîç Step 9: Detailed Evaluation of Best Model

Let's look at the confusion matrix and classification report.

In [None]:
# Get best model predictions
best_model = results[best_model_name]['model']
y_pred_best = results[best_model_name]['y_pred']

# Classification report
print(f"\nüìã CLASSIFICATION REPORT - {best_model_name}")
print("="*60)
print(classification_report(y_test, y_pred_best, target_names=class_names))

In [None]:
# Confusion Matrix Visualization
cm = confusion_matrix(y_test, y_pred_best)

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names, yticklabels=class_names,
            annot_kws={'size': 14})
plt.xlabel('Predicted', fontsize=12)
plt.ylabel('Actual', fontsize=12)
plt.title(f'Confusion Matrix - {best_model_name}', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Print per-class accuracy
print("\nüéØ Per-Class Accuracy:")
print("-" * 40)
for i, class_name in enumerate(class_names):
    class_mask = y_test == i
    if class_mask.sum() > 0:
        class_acc = (y_pred_best[class_mask] == y_test[class_mask]).mean()
        status = "‚úÖ" if class_acc >= 0.9 else "‚ö†Ô∏è" if class_acc >= 0.7 else "‚ùå"
        print(f"   {status} {class_name:20s}: {class_acc:.2%}")

---
## üìà Step 10: Feature Importance Analysis

Which features are most important for detecting faults?

In [None]:
# Get Random Forest for feature importance
rf_model = results['üå≤ Random Forest']['model']
importances = rf_model.feature_importances_

# Sort by importance
importance_df = pd.DataFrame({
    'Feature': feature_cols,
    'Importance': importances
}).sort_values('Importance', ascending=True)

# Plot
plt.figure(figsize=(10, 8))
colors = plt.cm.RdYlGn(importance_df['Importance'] / importance_df['Importance'].max())
plt.barh(importance_df['Feature'], importance_df['Importance'], color=colors)
plt.xlabel('Importance Score', fontsize=12)
plt.title('Feature Importance for Fault Detection', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

# Top 5 features
print("\nüîù TOP 5 MOST IMPORTANT FEATURES:")
print("="*50)
for i, (_, row) in enumerate(importance_df.tail(5).iloc[::-1].iterrows()):
    print(f"   {i+1}. {row['Feature']:25s} ({row['Importance']:.1%})")

---
## üíæ Step 11: Save the Trained Model

Save the model for deployment or future use.

In [None]:
# Use Random Forest as final model (good balance of accuracy and interpretability)
final_model = results['üå≤ Random Forest']['model']

# Save model and scaler
joblib.dump(final_model, 'fault_predictor_model.joblib')
joblib.dump(scaler, 'fault_predictor_scaler.joblib')

# Save feature names
import json
config = {
    'feature_names': feature_cols,
    'class_names': class_names
}
with open('fault_predictor_config.json', 'w') as f:
    json.dump(config, f, indent=2)

print("‚úÖ Model saved successfully!")
print("   üìÅ fault_predictor_model.joblib")
print("   üìÅ fault_predictor_scaler.joblib")
print("   üìÅ fault_predictor_config.json")

In [None]:
# Download saved files
print("\nüì• Download the trained model files:")
files.download('fault_predictor_model.joblib')
files.download('fault_predictor_scaler.joblib')
files.download('fault_predictor_config.json')

---
## üéÆ Step 12: Real-Time Fault Prediction Demo

Try predicting faults with new sensor measurements!

In [None]:
def predict_fault(measurements):
    """
    Predict fault from sensor measurements.
    
    Parameters:
    -----------
    measurements : dict
        Dictionary with keys: 'Throttle', 'Voltage', 'Current', 'Speed_RPM',
        'Torque', 'Thrust', 'Power_Elec', 'Power_Mech', 'Efficiency'
    
    Returns:
    --------
    dict with fault prediction and confidence
    """
    # Extract base features
    throttle = measurements['Throttle']
    voltage = measurements['Voltage']
    current = measurements['Current']
    speed = measurements['Speed_RPM']
    torque = measurements['Torque']
    thrust = measurements['Thrust']
    p_elec = measurements['Power_Elec']
    p_mech = measurements['Power_Mech']
    efficiency = measurements['Efficiency']
    
    # Calculate derived features
    power_ratio = p_mech / (p_elec + 1e-6)
    thrust_per_power = thrust / (p_mech + 1e-6)
    current_per_torque = current / (abs(torque) + 1e-6)
    voltage_drop_pct = (201.6 - voltage) / 201.6 * 100
    speed_efficiency = speed / (throttle * 2500 + 1e-6)
    
    # Combine features
    features = [
        throttle, voltage, current, speed, torque, thrust,
        p_elec, p_mech, efficiency,
        power_ratio, thrust_per_power, current_per_torque,
        voltage_drop_pct, speed_efficiency
    ]
    
    # Scale and predict
    X = np.array(features).reshape(1, -1)
    X_scaled = scaler.transform(X)
    
    fault_label = final_model.predict(X_scaled)[0]
    probabilities = final_model.predict_proba(X_scaled)[0]
    
    return {
        'fault_label': int(fault_label),
        'fault_name': class_names[int(fault_label)],
        'confidence': float(max(probabilities)),
        'probabilities': {name: f"{prob:.1%}" for name, prob in zip(class_names, probabilities)}
    }

print("‚úÖ Prediction function ready!")

In [None]:
# Interactive Demo - Test different scenarios
print("="*60)
print("  üéÆ FAULT PREDICTION DEMO")
print("="*60)

# Test scenarios
test_cases = [
    {
        'name': '‚úÖ Healthy System',
        'expected': 'Healthy',
        'data': {'Throttle': 0.75, 'Voltage': 196.0, 'Current': 95.0, 'Speed_RPM': 2350,
                 'Torque': -68.0, 'Thrust': 125.0, 'Power_Elec': 18620, 'Power_Mech': 16010, 'Efficiency': 86.0}
    },
    {
        'name': 'üîã Battery Fault (Low Voltage)',
        'expected': 'Battery_Fault',
        'data': {'Throttle': 0.75, 'Voltage': 183.0, 'Current': 120.0, 'Speed_RPM': 2400,
                 'Torque': -72.0, 'Thrust': 130.0, 'Power_Elec': 21960, 'Power_Mech': 18100, 'Efficiency': 87.0}
    },
    {
        'name': '‚ö° Motor Fault (Low Efficiency)',
        'expected': 'Motor_Fault',
        'data': {'Throttle': 0.75, 'Voltage': 195.0, 'Current': 140.0, 'Speed_RPM': 2400,
                 'Torque': -72.0, 'Thrust': 130.0, 'Power_Elec': 27300, 'Power_Mech': 18100, 'Efficiency': 70.0}
    },
    {
        'name': 'üåÄ Propeller Fault (Low Thrust)',
        'expected': 'Propeller_Fault',
        'data': {'Throttle': 0.75, 'Voltage': 196.0, 'Current': 95.0, 'Speed_RPM': 2350,
                 'Torque': -68.0, 'Thrust': 75.0, 'Power_Elec': 18620, 'Power_Mech': 16010, 'Efficiency': 86.0}
    }
]

for test in test_cases:
    print(f"\n{test['name']}")
    print("-" * 50)
    
    result = predict_fault(test['data'])
    
    status = "‚úÖ" if result['fault_name'] == test['expected'] else "‚ùå"
    print(f"   Predicted: {result['fault_name']} {status}")
    print(f"   Confidence: {result['confidence']:.1%}")
    print(f"   Probabilities: {result['probabilities']}")

---
## üéõÔ∏è Step 13: Interactive Prediction (Try Your Own Values!)

Modify the values below and run the cell to predict faults.

In [None]:
#@title üéõÔ∏è Enter Your Sensor Measurements { run: "auto" }

# Modify these values and run the cell!
throttle = 0.75  #@param {type:"slider", min:0.3, max:1.0, step:0.05}
voltage = 196.0  #@param {type:"slider", min:175, max:202, step:1}
current = 100.0  #@param {type:"slider", min:20, max:150, step:5}
speed_rpm = 2400  #@param {type:"slider", min:1500, max:2700, step:50}
torque = -70.0  #@param {type:"slider", min:-80, max:-30, step:2}
thrust = 120.0  #@param {type:"slider", min:50, max:150, step:5}
efficiency = 85.0  #@param {type:"slider", min:65, max:92, step:1}

# Calculate powers
power_elec = voltage * current
power_mech = power_elec * (efficiency / 100)

# Create measurement dict
my_measurements = {
    'Throttle': throttle,
    'Voltage': voltage,
    'Current': current,
    'Speed_RPM': speed_rpm,
    'Torque': torque,
    'Thrust': thrust,
    'Power_Elec': power_elec,
    'Power_Mech': power_mech,
    'Efficiency': efficiency
}

# Predict
result = predict_fault(my_measurements)

# Display result
print("\n" + "="*50)
print("  üîç FAULT PREDICTION RESULT")
print("="*50)

# Color-coded result
fault_icons = {'Healthy': '‚úÖ', 'Battery_Fault': 'üîã', 'Motor_Fault': '‚ö°', 'Propeller_Fault': 'üåÄ'}
icon = fault_icons.get(result['fault_name'], '‚ùì')

print(f"\n   {icon} Detected Fault: {result['fault_name']}")
print(f"   üìä Confidence: {result['confidence']:.1%}")
print(f"\n   Probabilities:")
for fault, prob in result['probabilities'].items():
    bar = '‚ñà' * int(float(prob.strip('%')) / 5)
    print(f"      {fault:20s}: {bar} {prob}")

---
## üìù Summary

### What We Accomplished:

1. ‚úÖ Loaded and explored fault dataset from Simscape simulation
2. ‚úÖ Visualized how different faults affect sensor measurements
3. ‚úÖ Engineered additional features for better fault detection
4. ‚úÖ Trained and compared 6 different ML models
5. ‚úÖ Evaluated model performance with confusion matrix
6. ‚úÖ Analyzed feature importance
7. ‚úÖ Saved trained model for deployment
8. ‚úÖ Built interactive fault prediction demo

### Key Findings:

| Fault Type | Key Indicator |
|------------|---------------|
| Battery Fault | Low voltage at high current |
| Motor Fault | Low efficiency (high elec power, low mech power) |
| Propeller Fault | Low thrust at normal RPM |

### Next Steps:

1. Collect more training data for improved accuracy
2. Deploy model in real-time monitoring system
3. Add severity levels for each fault type
4. Implement early warning system

---
**Thank you for attending this workshop!** üéâ