In [None]:
pip install -r requirements.txt

In [18]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, accuracy_score
from xgboost import XGBClassifier
from xgboost import XGBRegressor
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import IsolationForest

In [None]:
import xgboost as xgb
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt

# Sensors that receive rolling, rate-of-change and deviation features, mapped to
# the (low, high) bounds of their normal operating range.
_SENSOR_NORMAL_RANGES = {
    'temperature': (60, 70),  # for CNC machines
    'vibration': (0.5, 0.9),
    'noise_level': (65, 75),
}

# Rolling window length in rows per machine. The derived column names keep the
# historical '7day' tag; that reading is only accurate if every machine logs one
# row per day -- NOTE(review): confirm the sampling rate of the data.
_ROLLING_WINDOW = 7


def _add_rolling_stats(frame, column):
    """Add `<column>_7day_mean` and `<column>_7day_std`, computed per machine, to `frame` in place."""
    windows = frame.groupby('machine_id')[column].rolling(
        window=_ROLLING_WINDOW, min_periods=1)
    # Dropping the group level leaves the row labels, so assignment aligns by row.
    frame[f'{column}_7day_mean'] = windows.mean().reset_index(level=0, drop=True)
    frame[f'{column}_7day_std'] = windows.std().reset_index(level=0, drop=True)


def _add_engineered_features(df):
    """
    Return a copy of `df` with every engineered feature column added.

    Shared by training and inference so both always see identical features.

    The result is ordered chronologically (stable sort on 'timestamp') because
    the rolling windows and the row-to-row difference are only meaningful on
    time-ordered rows. Its index holds each row's original position, so
    `.sort_index()` restores the caller's row order.

    Requires the columns 'timestamp', 'machine_id', 'hours_run' and one column
    per sensor in `_SENSOR_NORMAL_RANGES`.
    """
    # A fresh positional index guarantees unique labels, which the label-based
    # alignment in `_add_rolling_stats` depends on.
    out = df.copy().reset_index(drop=True)
    out['timestamp'] = pd.to_datetime(out['timestamp'])
    out = out.sort_values('timestamp', kind='mergesort')

    # 1. Calendar features
    out['day_of_month'] = out['timestamp'].dt.day
    out['month'] = out['timestamp'].dt.month

    # 2. Per-sensor features
    for sensor, (low, high) in _SENSOR_NORMAL_RANGES.items():
        _add_rolling_stats(out, sensor)

        # Rate of change: difference from the machine's previous reading
        out[f'{sensor}_change'] = out.groupby('machine_id')[sensor].diff()

        # Signed distance outside the normal range: positive above `high`,
        # negative below `low`, zero inside. Missing readings stay missing (NaN)
        # instead of being reported as zero deviation.
        readings = out[sensor]
        out[f'{sensor}_deviation'] = (
            (readings - high).clip(lower=0) + (readings - low).clip(upper=0)
        )

    # 3. Running-hours statistics
    _add_rolling_stats(out, 'hours_run')

    return out


def prepare_and_train_model(data_path, test_size=0.2, validation_size=0.2):
    """
    Prepare data and train model for maintenance prediction.

    Reads the CSV at `data_path`, engineers features, and fits an XGBoost
    regressor for the 'machine_health' column. Rows are split in chronological
    order without shuffling: the last `test_size` fraction is held out for the
    reported metrics, and the last `validation_size` fraction of the remaining
    rows is used only to drive early stopping. Keeping those two sets separate
    means the reported RMSE / R² come from rows the training procedure never
    looked at.

    Side effects: prints the training log and a five-row comparison table, and
    shows a bar chart of the ten most important features.

    Parameters
    ----------
    data_path : path or buffer accepted by `pd.read_csv`
    test_size : fraction (or row count) held out for the final metrics
    validation_size : fraction (or row count) of the training rows used for
        early stopping

    Returns
    -------
    dict with keys 'model', 'scaler', 'label_encoder', 'feature_columns',
    'metrics' (a dict with 'rmse' and 'r2') and 'feature_importance'
    (DataFrame sorted by descending importance).
    """
    df = _add_engineered_features(pd.read_csv(data_path))

    # Encode machine type
    le = LabelEncoder()
    df['machine_type_encoded'] = le.fit_transform(df['machine_type'])

    # Define features for the model
    feature_columns = [
        'temperature', 'vibration', 'noise_level', 'hours_run',
        'machine_type_encoded', 'day_of_month', 'month',
        'temperature_7day_mean', 'temperature_7day_std', 'temperature_change', 'temperature_deviation',
        'vibration_7day_mean', 'vibration_7day_std', 'vibration_change', 'vibration_deviation',
        'noise_level_7day_mean', 'noise_level_7day_std', 'noise_level_change', 'noise_level_deviation',
        'hours_run_7day_mean', 'hours_run_7day_std'
    ]

    # Drop incomplete rows. Each machine's first row always goes, because its
    # rolling std and its change have no previous reading to compare against.
    df = df.dropna(subset=feature_columns + ['machine_health'])

    X = df[feature_columns]
    y = df['machine_health']

    # Chronological hold-out for the final metrics
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, shuffle=False
    )
    # Chronological validation slice, used only for early stopping
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train, test_size=validation_size, random_state=42, shuffle=False
    )

    # Scale with statistics learned from the fitting rows only
    scaler = StandardScaler()
    X_fit_scaled = scaler.fit_transform(X_fit)
    X_val_scaled = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)

    # Create and train XGBoost model
    model = xgb.XGBRegressor(
        n_estimators=100,
        learning_rate=0.05,
        max_depth=5,
        min_child_weight=1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        callbacks=[xgb.callback.EarlyStopping(rounds=10)]
    )
    model.fit(
        X_fit_scaled,
        y_fit,
        eval_set=[(X_val_scaled, y_val)],
        verbose=True
    )

    # Evaluate on the untouched test rows
    y_pred = model.predict(X_test_scaled)
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, y_pred)

    # Create feature importance dataframe
    importance_df = pd.DataFrame({
        'feature': feature_columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)

    # Plot feature importance
    top_features = importance_df.head(10)
    plt.figure(figsize=(12, 6))
    plt.bar(top_features['feature'], top_features['importance'])
    plt.xticks(rotation=45, ha='right')
    plt.title('Top 10 Most Important Features')
    plt.tight_layout()
    plt.show()

    print("\nActual vs Predicted Health Scores (Sample):")
    sample_actual = y_test.iloc[:5]
    sample_predicted = y_pred[:5]
    comparison_df = pd.DataFrame({
        'Actual': sample_actual,
        'Predicted': sample_predicted,
        'Difference': (sample_actual - sample_predicted).abs()
    })
    print(comparison_df)

    return {
        'model': model,
        'scaler': scaler,
        'label_encoder': le,
        'feature_columns': feature_columns,
        'metrics': {
            'rmse': rmse,
            'r2': r2
        },
        'feature_importance': importance_df
    }


def predict_health(model_artifacts, new_data):
    """
    Predict machine health for new data.

    Parameters
    ----------
    model_artifacts : dict returned by `prepare_and_train_model`; the keys
        'label_encoder', 'scaler', 'feature_columns' and 'model' are used.
    new_data : DataFrame with the same raw columns as the training CSV
        ('machine_health' is not needed). It is not modified, and its rows may
        be in any order and carry any index.

    Returns
    -------
    Array of predictions, one per row of `new_data`, in the caller's row order.
    An empty input yields an empty array.

    Rows lacking history (for example a machine's first reading) have NaN in the
    rolling-std and change features; those values are passed to the scaler and
    the model unchanged.
    """
    if len(new_data) == 0:
        return np.empty(0, dtype=np.float32)

    # Features are computed on time-ordered rows; `sort_index` then puts the
    # rows back in the order the caller supplied them.
    df_pred = _add_engineered_features(new_data).sort_index()

    # Encode machine type with the encoder fitted during training
    df_pred['machine_type_encoded'] = model_artifacts['label_encoder'].transform(df_pred['machine_type'])

    # Select and scale features
    X_pred = df_pred[model_artifacts['feature_columns']]
    X_pred_scaled = model_artifacts['scaler'].transform(X_pred)

    # Make predictions
    return model_artifacts['model'].predict(X_pred_scaled)

# Example usage: train on the bundled CSV, then report the hold-out metrics and
# the strongest features.
if __name__ == "__main__":
    model_results = prepare_and_train_model('maintenance_data.csv')

    print(
        "\nModel Performance:",
        f"RMSE: {model_results['metrics']['rmse']:.2f}",
        f"R² Score: {model_results['metrics']['r2']:.2f}",
        sep="\n",
    )

    print(
        "\nTop 5 Most Important Features:",
        model_results['feature_importance'].head(),
        sep="\n",
    )

In [None]:
# Read the raw maintenance records into the notebook-wide `data` frame.
file_path = 'maintenance_data.csv'  # Replace with the actual path
data = pd.read_csv(file_path)

# The dataset has no health label of its own, so one is simulated: every row
# receives one of three statuses drawn at random. The seed is fixed so the same
# labels come out on every run.
health_status_options = ['Healthy', 'Warning', 'Critical']
np.random.seed(42)
data['Health_Status'] = np.random.choice(health_status_options, size=data.shape[0])

# Preview the first rows, including the new column.
print(data.head(5))


In [14]:
# Replace every text (object-dtype) column of `data` with integer codes.
# Each column gets a fresh encoder; after the loop `label_encoder` refers to the
# encoder of the last column processed.
for column in data.select_dtypes(include=['object']).columns:
    label_encoder = LabelEncoder()
    label_encoder.fit(data[column])
    data[column] = label_encoder.transform(data[column])

In [None]:
# ---------------------------------------------------------------------------
# Part 1 - health-status classification
# ---------------------------------------------------------------------------
# 'Health_Status' is drawn at random (fixed seed), so this part only exercises
# the pipeline end to end.
health_status_options = ['Healthy', 'Warning', 'Critical']
np.random.seed(42)
data['Health_Status'] = np.random.choice(health_status_options, size=len(data))

target_column = 'Health_Status'

# Give the target its own encoder. Predictions are decoded with it further
# down, so it must not be whichever encoder the generic loop touched last.
target_encoder = LabelEncoder()
data[target_column] = target_encoder.fit_transform(data[target_column])

# Convert the remaining categorical columns to numeric using LabelEncoder
for column in data.select_dtypes(include=['object']).columns:
    label_encoder = LabelEncoder()
    data[column] = label_encoder.fit_transform(data[column])

# Kept so that code reading the old name still gets the target's encoder.
label_encoder = target_encoder

features = data.drop(columns=[target_column])
target = data[target_column]

# Handle missing values
features = features.fillna(features.mean())

# Scale the features
# NOTE(review): the scaler is fit on every row before the split; fit it on the
# training rows only if a scale-sensitive model is ever used here.
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)

X_train, X_test, y_train, y_test = train_test_split(
    features_scaled, target, test_size=0.2, random_state=42
)

# Train the XGBoost model
model = XGBClassifier()
model.fit(X_train, y_train)

# Predict health status for the held-out rows and decode back to the labels
health_predictions = target_encoder.inverse_transform(model.predict(X_test))
print("Predicted Health Status:", health_predictions)

# ---------------------------------------------------------------------------
# Part 2 - remaining useful life (RUL) regression
# ---------------------------------------------------------------------------
# Handle unexpected non-numeric values
data = data.apply(pd.to_numeric, errors='coerce')

# Check for NaN values after conversion and fill them with the mean
data = data.fillna(data.mean())

if {'unit', 'cycle'}.issubset(data.columns):
    # RUL = cycles left until the unit's last recorded cycle. Both columns used
    # by this expression are checked for, so it cannot raise KeyError.
    data['RUL'] = data.groupby('unit')['cycle'].transform('max') - data['cycle']
else:
    # RUL cannot be derived, so the missing columns are simulated. Columns that
    # already exist are left alone so that real data is never overwritten.
    if 'cycle' not in data.columns:
        data['cycle'] = np.random.randint(1, 100, size=len(data))
    if 'failure' not in data.columns:
        data['failure'] = np.random.choice([0, 1], size=len(data))
    if 'RUL' not in data.columns:
        data['RUL'] = np.random.randint(1, 100, size=len(data))

# Split the data into features and target (RUL)
target_column = 'RUL'
features = data.drop(columns=[target_column])
target = data[target_column]

# Scale the features. `features` and `features_scaled` are read by later cells.
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)

X_train, X_test, y_train, y_test = train_test_split(
    features_scaled, target, test_size=0.2, random_state=42
)

# Train the XGBoost Regressor model
model = XGBRegressor()
model.fit(X_train, y_train)

# Predict RUL for the held-out rows
rul_predictions = model.predict(X_test)
print("Predicted RUL:", rul_predictions)


In [None]:
# Unsupervised outlier detection on the scaled feature matrix.
# `contamination=0.05` sets the share of rows the forest is asked to flag.
iso_forest = IsolationForest(contamination=0.05, random_state=42)
anomaly_pred = iso_forest.fit_predict(features_scaled)

# The forest answers -1 for an outlier and 1 for an inlier; recode that as
# 1 = anomaly, 0 = normal.
anomalies = (anomaly_pred == -1).astype(int)

# Store the flag next to the data so flagged rows can be inspected.
data['Anomaly'] = anomalies

# Preview the flag together with the feature columns.
print(data[['Anomaly', *features.columns]].head())


In [None]:
import matplotlib.pyplot as plt

# Train an XGBoost classifier to reproduce the Isolation Forest's anomaly flags;
# its feature importances then show which features drive those flags.

# Anomalies are rare (the forest flags about 5% of rows), so the split is
# stratified to keep the same anomaly share in the train and test rows.
# Stratification needs at least two rows per class; otherwise fall back to a
# plain random split.
class_counts = np.bincount(anomalies, minlength=2)
stratify_labels = anomalies if class_counts.min() >= 2 else None
X_train, X_test, y_train, y_test = train_test_split(
    features_scaled, anomalies, test_size=0.2, random_state=42, stratify=stratify_labels
)

# Train the XGBoost model
model = XGBClassifier()
model.fit(X_train, y_train)

# Evaluate the model's performance
y_pred = model.predict(X_test)
print("Classification Report:")
print(classification_report(y_test, y_pred))
print("Accuracy:", accuracy_score(y_test, y_pred))

importances = model.feature_importances_

# Feature positions from most to least important
sorted_idx = np.argsort(importances)[::-1]

# Plot the feature importances
fig, ax = plt.subplots(figsize=(10, 6))
bar_positions = np.arange(len(sorted_idx))
ax.barh(bar_positions, importances[sorted_idx], align='center')
ax.set_yticks(bar_positions)
ax.set_yticklabels(features.columns[sorted_idx])
# barh draws position 0 at the bottom; flip the axis so the most important
# feature is at the top.
ax.invert_yaxis()
ax.set_xlabel("Feature Importance")
ax.set_title("Feature Importance for Anomaly Detection")
plt.show()


In [None]:
# Components of a CNC machine that are tracked in this notebook
components_list = [
    'Controller',
    'Drives (Motors)',
    'Spindle',
    'Linear Guides',
    'Ball Screws',
    'Table',
    'CNC Tool',
    'Coolant System',
    'Feedback Sensors',
    'Power Supply',
    'Operator Interface',
    'Safety Systems',
]

# Every row gets the same value: the full component list as one comma-separated string.
data['components'] = ', '.join(components_list)

print(data.loc[:, ['components']].head())


In [None]:
# Flag outlying rows with an Isolation Forest configured to mark 5% of the rows.
iso_forest = IsolationForest(contamination=0.05, random_state=42)

# Fit on the scaled features and score the same rows in one step.
anomaly_pred = iso_forest.fit_predict(features_scaled)

# Recode the forest's output (-1 = outlier, 1 = inlier) as 1 = anomaly, 0 = normal.
anomalies = np.equal(anomaly_pred, -1).astype(int)

# Write the flag into the dataset.
data['Anomaly'] = anomalies

# Preview the flag next to the component list.
print(data.loc[:, ['Anomaly', 'components']].head())


In [None]:
# CNC machine components, in the order used to pair them with feature importances
components_list = [
    'Controller', 'Drives (Motors)', 'Spindle',
    'Linear Guides', 'Ball Screws', 'Table',
    'CNC Tool', 'Coolant System', 'Feedback Sensors',
    'Power Supply', 'Operator Interface', 'Safety Systems',
]

# Function to identify components likely to fail based on anomaly detection and feature importance
def get_failing_components_per_machine(anomaly_labels, feature_importances, components_list, importance_threshold=0.1):
    """
    Name the components considered likely to fail for one row.

    Importances and components are paired by position: the i-th importance is
    attributed to the i-th component. If the two sequences differ in length the
    surplus entries of the longer one are ignored, so having more features than
    components no longer raises IndexError.
    NOTE(review): the pairing is purely positional; confirm that a real
    feature-to-component mapping is intended.

    Parameters
    ----------
    anomaly_labels : scalar flag for the row; 1 means an anomaly was detected.
    feature_importances : sequence of numeric importances.
    components_list : sequence of component names.
    importance_threshold : a component is reported when its importance is
        strictly greater than this value.

    Returns
    -------
    str with the reported components joined by ', ' (empty when none exceeds the
    threshold) if the row is an anomaly; otherwise `np.nan`.
    """
    # Non-anomalous rows need no work at all.
    if anomaly_labels != 1:
        return np.nan

    failing_components = [
        component
        for importance, component in zip(feature_importances, components_list)
        if importance > importance_threshold
    ]
    return ', '.join(failing_components)

# Apply the function to each row of the dataset.
# The importances are read from the model once, up front, rather than once per
# row, and only the 'Anomaly' column is iterated because it is the only row
# value the function uses.
anomaly_feature_importances = model.feature_importances_
data['components_about_to_fail'] = data['Anomaly'].apply(
    lambda anomaly_flag: get_failing_components_per_machine(
        anomaly_flag, anomaly_feature_importances, components_list, importance_threshold=0.1
    )
)

print(data[['components', 'Anomaly', 'components_about_to_fail']].head())

In [None]:
from sklearn.preprocessing import LabelEncoder

# All simulated values in this cell come from one dedicated, seeded generator,
# so re-running the cell reproduces the same numbers and the global NumPy
# random state is left untouched.
simulation_rng = np.random.default_rng(42)

# Simulated Remaining Life: integers from 1 to 99
data['Simulated_Remaining_Life'] = simulation_rng.integers(1, 100, size=len(data))

# Failure Risk Score (arbitrary, for demonstration purposes): integers from 1 to 10
data['Failure_Risk_Score'] = simulation_rng.integers(1, 11, size=len(data))

# Convert all categorical columns to numeric values
for column in data.select_dtypes(include=['object']).columns:
    label_encoder = LabelEncoder()
    data[column] = label_encoder.fit_transform(data[column])

# Split the data into features and target
target_column = 'Simulated_Remaining_Life'
features = data.drop(columns=[target_column])
target = data[target_column]

# Handle missing values if any
features = features.fillna(features.mean())

# Scale the features
# NOTE(review): the scaler is fit on every row before the split; fit it on the
# training rows only if a scale-sensitive model is ever used here.
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features)

# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(
    features_scaled, target, test_size=0.2, random_state=42
)

# Train an XGBoost Regressor to predict remaining life
model = XGBRegressor()
model.fit(X_train, y_train)

# Predict Simulated Remaining Life for the held-out rows
predicted_remaining_life = model.predict(X_test)

# Collect true and predicted values for display. No model predicts the failure
# risk score: the 'Predicted_Failure_Risk_Score' column is itself simulated.
output_df = pd.DataFrame({
    'Simulated_Remaining_Life': y_test,  # True values
    'Predicted_Simulated_Remaining_Life': predicted_remaining_life,
    'Failure_Risk_Score': data.loc[y_test.index, 'Failure_Risk_Score'],  # True failure scores
    'Predicted_Failure_Risk_Score': simulation_rng.uniform(9, 11, len(y_test))  # Simulated predictions
}).reset_index(drop=True)

# Display the result
print(output_df.head(10))
