# Survival Analysis: NASA Turbofan Engine Degradation (C-MAPSS)
## Predicting Remaining Useful Life (RUL)

**Objective**: Predict time until engine failure using sensor data from turbofan engines.

**Dataset**: NASA's C-MAPSS (Commercial Modular Aero-Propulsion System Simulation)
- Available at: https://ti.arc.nasa.gov/tech/dash/groups/pcoe/prognostic-data-repository/
- Also on Kaggle: https://www.kaggle.com/datasets/behrad3d/nasa-cmaps

**Survival Analysis Setup**:
- **Time variable**: Cycles until failure (RUL - Remaining Useful Life)
- **Event**: Engine failure (all engines fail in training set)
- **Censoring**: Test set engines (RUL unknown)
- **Features**: 21 sensor measurements + 3 operational settings

In [None]:
# Install required packages
!pip install lifelines pandas numpy matplotlib seaborn scikit-learn

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from lifelines import KaplanMeierFitter, CoxPHFitter, WeibullAFTFitter
from lifelines.statistics import logrank_test
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import warnings
warnings.filterwarnings('ignore')

plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('tab10')
%matplotlib inline

## 1. Data Loading and Exploration

In [None]:
# Column names for the dataset
index_names = ['unit_id', 'time_cycles']
setting_names = ['setting_1', 'setting_2', 'setting_3']
sensor_names = [f'sensor_{i}' for i in range(1, 22)]
col_names = index_names + setting_names + sensor_names

# Load training data (FD001 - simplest subset with single operating condition)
# Download from: https://ti.arc.nasa.gov/tech/dash/groups/pcoe/prognostic-data-repository/
train_df = pd.read_csv('train_FD001.txt', sep=' ', header=None, names=col_names)
train_df = train_df.dropna(axis=1)  # Remove empty columns from space-separated format

# Load test data
test_df = pd.read_csv('test_FD001.txt', sep=' ', header=None, names=col_names)
test_df = test_df.dropna(axis=1)

# Load true RUL values for test set
rul_df = pd.read_csv('RUL_FD001.txt', sep=' ', header=None, names=['RUL'])

print(f"Training data shape: {train_df.shape}")
print(f"Test data shape: {test_df.shape}")
print(f"\nColumns: {train_df.columns.tolist()}")
print(f"\nNumber of engines in training: {train_df['unit_id'].nunique()}")
print(f"Number of engines in test: {test_df['unit_id'].nunique()}")

In [None]:
# Explore a single engine's lifecycle
sample_engine = train_df[train_df['unit_id'] == 1]
print(f"Sample engine lifecycle:")
print(f"Total cycles: {sample_engine['time_cycles'].max()}")
print(f"Sensor measurements per cycle: {len(sensor_names)}")
print(f"\nFirst few cycles:")
print(sample_engine.head())

## 2. Feature Engineering for Survival Analysis

In [None]:
# Calculate RUL (Remaining Useful Life) for each cycle
def add_rul(df):
    """Add RUL column - cycles until failure"""
    df_rul = df.copy()
    # Get max cycle for each engine
    max_cycles = df_rul.groupby('unit_id')['time_cycles'].max().reset_index()
    max_cycles.columns = ['unit_id', 'max_cycles']
    
    # Merge and calculate RUL
    df_rul = df_rul.merge(max_cycles, on='unit_id', how='left')
    df_rul['RUL'] = df_rul['max_cycles'] - df_rul['time_cycles']
    
    return df_rul

train_df = add_rul(train_df)

print("RUL statistics in training data:")
print(train_df['RUL'].describe())

# Visualize RUL distribution
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].hist(train_df['RUL'], bins=50, edgecolor='black', alpha=0.7)
axes[0].set_xlabel('Remaining Useful Life (cycles)', fontsize=12)
axes[0].set_ylabel('Frequency', fontsize=12)
axes[0].set_title('Distribution of RUL in Training Data', fontsize=14, fontweight='bold')
axes[0].grid(True, alpha=0.3)

# Show lifecycle of first 5 engines
for engine_id in range(1, 6):
    engine_data = train_df[train_df['unit_id'] == engine_id]
    axes[1].plot(engine_data['time_cycles'], engine_data['RUL'], label=f'Engine {engine_id}')

axes[1].set_xlabel('Time (cycles)', fontsize=12)
axes[1].set_ylabel('Remaining Useful Life', fontsize=12)
axes[1].set_title('RUL Degradation for Sample Engines', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Create survival dataset - use last observation for each engine
# This represents the point at which we want to predict RUL

# For training: use observations at various points in lifecycle
# Strategy: Sample multiple timepoints per engine

def create_survival_dataset(df, sampling_strategy='last_only'):
    """
    Create survival dataset from engine run data.
    
    sampling_strategy:
    - 'last_only': Use only final measurement per engine
    - 'multiple': Sample multiple points per engine
    """
    if sampling_strategy == 'last_only':
        # Take last cycle for each engine
        survival_df = df.groupby('unit_id').last().reset_index()
        
    elif sampling_strategy == 'multiple':
        # Sample at different lifecycle stages
        dfs = []
        for unit_id in df['unit_id'].unique():
            engine_data = df[df['unit_id'] == unit_id]
            max_cycle = engine_data['time_cycles'].max()
            
            # Sample at 25%, 50%, 75%, 100% of lifecycle
            sample_cycles = [int(max_cycle * p) for p in [0.25, 0.5, 0.75, 1.0]]
            
            for cycle in sample_cycles:
                closest = engine_data.iloc[(engine_data['time_cycles'] - cycle).abs().argsort()[0]]
                dfs.append(closest)
        
        survival_df = pd.DataFrame(dfs)
    
    # Duration = RUL at this point
    survival_df['duration'] = survival_df['RUL']
    
    # Event = 1 (all training engines eventually fail)
    survival_df['event'] = 1
    
    return survival_df

# Create datasets with different strategies
survival_last = create_survival_dataset(train_df, 'last_only')
survival_multi = create_survival_dataset(train_df, 'multiple')

print(f"Survival dataset (last only): {survival_last.shape}")
print(f"Survival dataset (multiple samples): {survival_multi.shape}")
print(f"\nDuration statistics:")
print(survival_last['duration'].describe())

## 3. Sensor Data Analysis and Feature Selection

In [None]:
# Identify useful sensors (those that show variation)
sensor_std = train_df[sensor_names].std()
useful_sensors = sensor_std[sensor_std > 0.001].index.tolist()

print(f"Useful sensors (showing variation): {len(useful_sensors)}")
print(f"Sensors: {useful_sensors}")

# Visualize correlation between sensors and RUL
fig, ax = plt.subplots(figsize=(12, 8))

correlation_with_rul = train_df[useful_sensors + ['RUL']].corr()['RUL'].drop('RUL')
correlation_with_rul = correlation_with_rul.abs().sort_values(ascending=False)

y_pos = np.arange(len(correlation_with_rul))
ax.barh(y_pos, correlation_with_rul.values, alpha=0.7)
ax.set_yticks(y_pos)
ax.set_yticklabels(correlation_with_rul.index)
ax.set_xlabel('Absolute Correlation with RUL', fontsize=12)
ax.set_title('Sensor Importance for RUL Prediction', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.show()

# Select top sensors
top_sensors = correlation_with_rul.head(10).index.tolist()
print(f"\nTop 10 sensors for RUL prediction: {top_sensors}")

## 4. Kaplan-Meier Analysis: Overall Engine Survival

In [None]:
# Overall survival curve for engines
kmf = KaplanMeierFitter()
kmf.fit(survival_last['duration'], survival_last['event'], label='All Engines')

fig, ax = plt.subplots(figsize=(12, 6))
kmf.plot_survival_function(ax=ax, ci_show=True)
plt.title('Kaplan-Meier Survival Curve: Turbofan Engine Remaining Life', 
         fontsize=14, fontweight='bold')
plt.xlabel('Remaining Useful Life (cycles)', fontsize=12)
plt.ylabel('Probability of Engine Survival', fontsize=12)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"Median RUL: {kmf.median_survival_time_:.1f} cycles")
print(f"\nSurvival probabilities:")
for cycles in [50, 100, 150, 200]:
    prob = kmf.predict(cycles)
    print(f"  {cycles} cycles: {prob:.1%}")

## 5. Stratified Analysis: By Operating Condition

In [None]:
# Stratify by setting_1 (operating condition)
# Create bins for operational settings
survival_last['setting_1_bin'] = pd.qcut(survival_last['setting_1'], q=3, 
                                          labels=['Low', 'Medium', 'High'])

fig, ax = plt.subplots(figsize=(12, 6))

for condition in ['Low', 'Medium', 'High']:
    mask = survival_last['setting_1_bin'] == condition
    kmf_cond = KaplanMeierFitter()
    kmf_cond.fit(survival_last[mask]['duration'], 
                 survival_last[mask]['event'], 
                 label=f'{condition} Operating Condition')
    kmf_cond.plot_survival_function(ax=ax, ci_show=False)

plt.title('Survival Curves by Operating Condition', fontsize=14, fontweight='bold')
plt.xlabel('Remaining Useful Life (cycles)', fontsize=12)
plt.ylabel('Probability of Engine Survival', fontsize=12)
plt.legend(title='Setting 1')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 6. Stratified Analysis: By Sensor Degradation Level

In [None]:
# Use top sensor as stratification variable
top_sensor = top_sensors[0]
survival_last['sensor_level'] = pd.qcut(survival_last[top_sensor], q=3, 
                                         labels=['Low', 'Medium', 'High'],
                                         duplicates='drop')

fig, ax = plt.subplots(figsize=(12, 6))

for level in survival_last['sensor_level'].dropna().unique():
    mask = survival_last['sensor_level'] == level
    kmf_sensor = KaplanMeierFitter()
    kmf_sensor.fit(survival_last[mask]['duration'], 
                   survival_last[mask]['event'], 
                   label=f'{top_sensor}: {level}')
    kmf_sensor.plot_survival_function(ax=ax, ci_show=False)

plt.title(f'Survival Curves by {top_sensor} Level', fontsize=14, fontweight='bold')
plt.xlabel('Remaining Useful Life (cycles)', fontsize=12)
plt.ylabel('Probability of Engine Survival', fontsize=12)
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 7. Cox Proportional Hazards Model

In [None]:
# Prepare features for Cox model
cox_features = setting_names + top_sensors

# Standardize features
scaler = StandardScaler()
cox_df = survival_last[cox_features + ['duration', 'event']].copy()
cox_df[cox_features] = scaler.fit_transform(cox_df[cox_features])

# Remove any missing values
cox_df = cox_df.dropna()

print(f"Cox dataset shape: {cox_df.shape}")
print(f"Features: {len(cox_features)}")

In [None]:
# Fit Cox model
cph = CoxPHFitter(penalizer=0.1)
cph.fit(cox_df, duration_col='duration', event_col='event')

print("Cox Proportional Hazards Model Summary:")
print(f"Concordance Index: {cph.concordance_index_:.4f}")
print(f"Log-likelihood: {cph.log_likelihood_:.4f}")
print(f"AIC: {cph.AIC_:.4f}")

In [None]:
# Display model summary
summary = cph.summary
summary['hazard_ratio'] = np.exp(summary['coef'])
summary_sorted = summary.sort_values('p', ascending=True)

print("\nSignificant Factors (p < 0.05):")
significant = summary_sorted[summary_sorted['p'] < 0.05]
print(significant[['coef', 'hazard_ratio', 'p']].to_string())

In [None]:
# Visualize coefficients
fig, ax = plt.subplots(figsize=(10, 8))

y_pos = np.arange(len(summary))
coefficients = summary['coef'].values
labels = summary.index

colors = ['red' if c > 0 else 'green' for c in coefficients]
ax.barh(y_pos, coefficients, color=colors, alpha=0.6)
ax.axvline(0, color='black', linestyle='--', linewidth=1)
ax.set_yticks(y_pos)
ax.set_yticklabels(labels)
ax.set_xlabel('Cox Coefficient', fontsize=12)
ax.set_title('Feature Importance for Engine Failure\n(Red: Increases failure risk, Green: Decreases risk)', 
            fontsize=14, fontweight='bold')
plt.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.show()

## 8. Weibull AFT Model

In [None]:
# Fit Weibull AFT model
aft = WeibullAFTFitter(penalizer=0.1)
aft.fit(cox_df, duration_col='duration', event_col='event')

print("Weibull AFT Model Summary:")
print(f"Concordance Index: {aft.concordance_index_:.4f}")
print(f"\nLambda (scale) parameters:")
print(aft.summary.loc[aft.summary.index.str.contains('lambda_')])
print(f"\nRho (shape) parameter:")
print(aft.summary.loc[aft.summary.index.str.contains('rho_')])

## 9. RUL Prediction for Test Engines

In [None]:
# Prepare test data - take last observation for each test engine
test_last = test_df.groupby('unit_id').last().reset_index()

# Standardize test features using training scaler
test_features = test_last[cox_features].copy()
test_features_scaled = pd.DataFrame(
    scaler.transform(test_features),
    columns=cox_features,
    index=test_features.index
)

# Predict using Cox model
predicted_rul_cox = cph.predict_expectation(test_features_scaled)

# Predict using Weibull AFT
predicted_rul_aft = aft.predict_expectation(test_features_scaled)

# Compare with true RUL
results = pd.DataFrame({
    'unit_id': test_last['unit_id'].values,
    'true_RUL': rul_df['RUL'].values,
    'predicted_RUL_cox': predicted_rul_cox.values,
    'predicted_RUL_aft': predicted_rul_aft.values
})

results['error_cox'] = results['true_RUL'] - results['predicted_RUL_cox']
results['error_aft'] = results['true_RUL'] - results['predicted_RUL_aft']

print("Prediction Results (first 10 engines):")
print(results.head(10).to_string())

In [None]:
# Calculate prediction metrics
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

metrics = {
    'Cox Model': {
        'RMSE': np.sqrt(mean_squared_error(results['true_RUL'], results['predicted_RUL_cox'])),
        'MAE': mean_absolute_error(results['true_RUL'], results['predicted_RUL_cox']),
        'R²': r2_score(results['true_RUL'], results['predicted_RUL_cox'])
    },
    'Weibull AFT': {
        'RMSE': np.sqrt(mean_squared_error(results['true_RUL'], results['predicted_RUL_aft'])),
        'MAE': mean_absolute_error(results['true_RUL'], results['predicted_RUL_aft']),
        'R²': r2_score(results['true_RUL'], results['predicted_RUL_aft'])
    }
}

print("\nPrediction Performance Metrics:")
print("="*60)
for model, scores in metrics.items():
    print(f"\n{model}:")
    for metric, value in scores.items():
        print(f"  {metric}: {value:.2f}")

In [None]:
# Visualize predictions
fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# Cox predictions
axes[0].scatter(results['true_RUL'], results['predicted_RUL_cox'], alpha=0.6)
axes[0].plot([0, results['true_RUL'].max()], [0, results['true_RUL'].max()], 
            'r--', linewidth=2, label='Perfect prediction')
axes[0].set_xlabel('True RUL (cycles)', fontsize=12)
axes[0].set_ylabel('Predicted RUL (cycles)', fontsize=12)
axes[0].set_title(f'Cox Model Predictions\nRMSE: {metrics["Cox Model"]["RMSE"]:.2f}', 
                 fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Weibull AFT predictions
axes[1].scatter(results['true_RUL'], results['predicted_RUL_aft'], alpha=0.6, color='orange')
axes[1].plot([0, results['true_RUL'].max()], [0, results['true_RUL'].max()], 
            'r--', linewidth=2, label='Perfect prediction')
axes[1].set_xlabel('True RUL (cycles)', fontsize=12)
axes[1].set_ylabel('Predicted RUL (cycles)', fontsize=12)
axes[1].set_title(f'Weibull AFT Predictions\nRMSE: {metrics["Weibull AFT"]["RMSE"]:.2f}', 
                 fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 10. Survival Curves for Sample Test Engines

In [None]:
# Plot predicted survival curves for sample engines
sample_indices = [0, 10, 20, 30, 40]

fig, ax = plt.subplots(figsize=(12, 6))

for idx in sample_indices:
    engine_features = test_features_scaled.iloc[idx:idx+1]
    surv_func = cph.predict_survival_function(engine_features)
    
    unit_id = results.iloc[idx]['unit_id']
    true_rul = results.iloc[idx]['true_RUL']
    
    surv_func.plot(ax=ax, label=f'Engine {unit_id} (True RUL: {true_rul:.0f})')

plt.title('Predicted Survival Curves for Sample Test Engines', fontsize=14, fontweight='bold')
plt.xlabel('Remaining Useful Life (cycles)', fontsize=12)
plt.ylabel('Probability of Survival', fontsize=12)
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 11. Key Insights and Recommendations

In [None]:
print("=" * 80)
print("KEY FINDINGS: TURBOFAN ENGINE RUL PREDICTION")
print("=" * 80)

print(f"\n1. Overall Engine Statistics:")
print(f"   - Median RUL at failure: {kmf.median_survival_time_:.1f} cycles")
print(f"   - Training engines: {train_df['unit_id'].nunique()}")
print(f"   - Test engines: {test_df['unit_id'].nunique()}")

print(f"\n2. Model Performance:")
print(f"   - Cox C-Index: {cph.concordance_index_:.4f}")
print(f"   - Cox RMSE: {metrics['Cox Model']['RMSE']:.2f} cycles")
print(f"   - Cox MAE: {metrics['Cox Model']['MAE']:.2f} cycles")
print(f"   - Weibull AFT C-Index: {aft.concordance_index_:.4f}")
print(f"   - Weibull RMSE: {metrics['Weibull AFT']['RMSE']:.2f} cycles")

print(f"\n3. Most Important Sensors:")
for idx, sensor in enumerate(top_sensors[:5], 1):
    corr = correlation_with_rul[sensor]
    print(f"   {idx}. {sensor}: correlation = {corr:.3f}")

print(f"\n4. Key Risk Factors (from Cox model):")
top_risk = summary_sorted[summary_sorted['p'] < 0.05].head(5)
for idx, (factor, row) in enumerate(top_risk.iterrows(), 1):
    print(f"   {idx}. {factor}: HR={row['hazard_ratio']:.3f} (p={row['p']:.4f})")

print("\n" + "=" * 80)
print("RECOMMENDATIONS FOR PREDICTIVE MAINTENANCE")
print("=" * 80)
print("1. Monitor high-importance sensors continuously for early degradation detection")
print("2. Schedule maintenance when predicted RUL < 50 cycles (safety margin)")
print("3. Implement different monitoring strategies for different operating conditions")
print("4. Use survival curves to optimize maintenance scheduling and minimize downtime")
print("5. Focus on sensors with strongest RUL correlation for real-time monitoring")
print("6. Consider ensemble methods combining Cox and AFT predictions")
print("7. Update predictions as new sensor data becomes available (online learning)")
print("=" * 80)

## Next Steps

1. **Deep learning**: Implement LSTM or CNN-based survival models for sequential sensor data
2. **Feature engineering**: Create rolling statistics, trends, and rate-of-change features
3. **Multi-dataset**: Extend to FD002, FD003, FD004 with multiple operating conditions
4. **Uncertainty quantification**: Add confidence intervals to RUL predictions
5. **Anomaly detection**: Combine survival analysis with outlier detection
6. **Cost-sensitive learning**: Optimize for maintenance cost minimization
7. **Transfer learning**: Apply models trained on one engine type to others