# **Importing The Necessary Libraries**

In [2]:
import pandas as pd
import numpy as np
import re
from datetime import timedelta
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import IsolationForest

# **Preprocessing & Feature Engineering**

In [7]:
def load_data(file_path):
    df = pd.read_csv(file_path)
    print("After loading, shape:", df.shape)
    print("Columns:", df.columns.tolist())
    print("Sample data:\n", df.head())
    return df

def preprocess_time(df, time_col='start_time'):
    df[time_col] = pd.to_datetime(df[time_col], unit='ns', errors='coerce')
    print(f"After converting {time_col} to datetime, null count:", df[time_col].isnull().sum())
    df = df.sort_values(time_col)
    df.reset_index(drop=True, inplace=True)
    df['hour_of_day'] = df[time_col].dt.hour
    df['day_of_week'] = df[time_col].dt.dayofweek
    df['is_weekend'] = df['day_of_week'].apply(lambda x: 1 if x >= 5 else 0)
    if 'entity_id' in df.columns:
        df['time_since_last'] = df.groupby('entity_id')[time_col].diff().dt.total_seconds().fillna(0)
    else:
        df['time_since_last'] = df[time_col].diff().dt.total_seconds().fillna(0)
    print("After time preprocessing, shape:", df.shape)
    return df

def map_to_kubernetes(df):
    mapping = {
        'average_usage': 'container_memory_usage_bytes',
        'maximum_usage': 'container_memory_max_usage_bytes',
        'assigned_memory': 'container_spec_memory_limit_bytes',
        'page_cache_memory': 'container_memory_cache',
        'sample_rate': 'scrape_interval'
    }
    k8s_mapping = {
        'pod': 'The smallest deployable unit in Kubernetes',
        'node': 'A worker machine in Kubernetes',
        'container': 'A lightweight and portable executable image',
        'namespace': 'Virtual cluster in Kubernetes',
        'deployment': 'Manages replica sets and provides declarative updates to pods'
    }
    print("Kubernetes concept mapping for reference:")
    for k, v in k8s_mapping.items():
        print(f"  {k}: {v}")
    return df, mapping

def engineer_features(df):
    base_features = ['average_usage','maximum_usage','assigned_memory','page_cache_memory','sample_rate']
    # Convert to numeric values
    for col in base_features:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    derived_features = []
    # Process cpu_usage_distribution if available
    if 'cpu_usage_distribution' in df.columns:
        def parse_cpu_usage(dist_str):
            try:
                s = dist_str.strip("[]").strip()
                parts = re.split(r'\s+', s)
                values = [float(p) for p in parts if p]
                if values:
                    return np.mean(values), np.std(values), np.max(values), np.percentile(values,95)
                else:
                    return np.nan, np.nan, np.nan, np.nan
            except:
                return np.nan, np.nan, np.nan, np.nan
        cpu_stats = df['cpu_usage_distribution'].apply(parse_cpu_usage)
        df['cpu_usage_mean'] = [x[0] for x in cpu_stats]
        df['cpu_usage_std'] = [x[1] for x in cpu_stats]
        df['cpu_usage_max'] = [x[2] for x in cpu_stats]
        df['cpu_usage_p95'] = [x[3] for x in cpu_stats]
        derived_features.extend(['cpu_usage_mean','cpu_usage_std','cpu_usage_max','cpu_usage_p95'])

    # Create derived memory metrics
    df['memory_utilization'] = df['average_usage'] / df['assigned_memory']
    df['memory_pressure'] = df['maximum_usage'] / df['assigned_memory']
    df['cache_ratio'] = df['page_cache_memory'] / df['assigned_memory']

    # Calculate changes over time if available
    if 'time_since_last' in df.columns and df['time_since_last'].max() > 0:
        df.loc[df['time_since_last'] == 0, 'time_since_last'] = np.nan
        for feature in base_features:
            if feature in df.columns:
                df[f'{feature}_change'] = df[feature].diff() / df['time_since_last']
                derived_features.append(f'{feature}_change')

    # Rolling statistics over different windows
    windows = [3, 5, 10]
    if 'entity_id' in df.columns:
        for window in windows:
            for feature in ['memory_utilization', 'memory_pressure']:
                df[f'{feature}rolling_mean{window}'] = df.groupby('entity_id')[feature].rolling(window=window, min_periods=1).mean().reset_index(level=0, drop=True)
                df[f'{feature}rolling_std{window}'] = df.groupby('entity_id')[feature].rolling(window=window, min_periods=1).std().reset_index(level=0, drop=True)
                derived_features.extend([f'{feature}rolling_mean{window}', f'{feature}rolling_std{window}'])
    else:
        for window in windows:
            for feature in ['memory_utilization', 'memory_pressure']:
                df[f'{feature}rolling_mean{window}'] = df[feature].rolling(window=window, min_periods=1).mean()
                df[f'{feature}rolling_std{window}'] = df[feature].rolling(window=window, min_periods=1).std()
                derived_features.extend([f'{feature}rolling_mean{window}', f'{feature}rolling_std{window}'])

    all_features = base_features + derived_features + ['memory_utilization', 'memory_pressure', 'cache_ratio', 'hour_of_day', 'day_of_week', 'is_weekend']
    df.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Ensure 'failed' is in the dataframe
    if 'failed' not in df.columns:
        raise ValueError("Target column 'failed' not found in the dataset.")
    df['failed'] = df['failed'].astype(int)

    # Derive additional failure flags
    df['resource_exhaustion'] = ((df['maximum_usage'] > df['assigned_memory'] * 0.9) & (df['failed'] == 1)).astype(int)
    df['memory_pressure_failure'] = ((df['memory_pressure'] > 0.85) & (df['failed'] == 1)).astype(int)
    df['other_failure'] = ((df['failed'] == 1) & (df['resource_exhaustion'] == 0) & (df['memory_pressure_failure'] == 0)).astype(int)

    df.dropna(subset=['failed'], inplace=True)
    columns_to_fill = list(set(all_features + ['resource_exhaustion','memory_pressure_failure','other_failure']) - {'failed'})
    df[columns_to_fill] = df[columns_to_fill].fillna(0)
    print(f"After handling NaNs, shape is now {df.shape}")
    print("Remaining columns:", df.columns.tolist())
    return df, all_features

def detect_anomalies(df, features, contamination=0.05):
    print("Running anomaly detection...")
    X = df[features].copy().fillna(0)
    isolation_forest = IsolationForest(contamination=contamination, random_state=42)
    df['anomaly_score'] = isolation_forest.fit_predict(X)
    df['is_anomaly'] = (df['anomaly_score'] == -1).astype(int)
    print(f"Detected {df['is_anomaly'].sum()} anomalies out of {len(df)} records ({df['is_anomaly'].mean() * 100:.2f}%)")
    if 'failed' in df.columns:
        correlation = df['is_anomaly'].corr(df['failed'])
        print(f"Correlation between anomalies and failures: {correlation:.4f}")
    return df

def normalize_features(df, features):
    scaler = MinMaxScaler()
    print("Before normalization, shape of features:", df[features].shape)
    df_scaled = df.copy()
    df_scaled[features] = scaler.fit_transform(df[features])
    print("After normalization, shape of features:", df_scaled[features].shape)
    return df_scaled, scaler

def create_sequences(data, features, seq_length, target_cols):
    X_list = []
    y_dict = {target: [] for target in target_cols}

    def create_sliding_sequences(arr, targets_arr):
        windows = np.lib.stride_tricks.sliding_window_view(arr, window_shape=(seq_length, arr.shape[1]))
        windows = windows.squeeze(1)
        windows = windows[:-1]
        targets = targets_arr[seq_length:]
        return windows, targets

    if 'entity_id' in data.columns:
        grouped = data.groupby('entity_id')
        for name, group in grouped:
            if len(group) <= seq_length:
                continue
            arr = group[features].values
            targets_arr = group[target_cols].values
            if len(arr) > seq_length:
                seq_X, seq_targets = create_sliding_sequences(arr, targets_arr)
                X_list.append(seq_X)
                for idx, target in enumerate(target_cols):
                    y_dict[target].append(seq_targets[:, idx])
    else:
        arr = data[features].values
        targets_arr = data[target_cols].values
        if len(arr) > seq_length:
            X_arr, targets = create_sliding_sequences(arr, targets_arr)
            X_list.append(X_arr)
            for idx, target in enumerate(target_cols):
                y_dict[target] = targets[:, idx]
    if not X_list:
        print("No sequences could be created. Possibly not enough data per entity.")
        return np.array([]), {target: np.array([]) for target in target_cols}
    X = np.concatenate(X_list, axis=0)
    for target in target_cols:
        if isinstance(y_dict[target], list):
            y_dict[target] = np.concatenate(y_dict[target], axis=0)
    print("Created sequences: X shape =", X.shape)
    for target, y in y_dict.items():
        if len(y) > 0:
            print(f"y_{target} shape =", y.shape)
            print(f"Positive samples for {target}: {np.sum(y)} ({np.sum(y)/len(y)*100:.2f}%)")
        else:
            print(f"No targets for {target}.")
    return X, y_dict


In [12]:
def engineer_features(df):
    base_features = ['average_usage','maximum_usage','assigned_memory','page_cache_memory','sample_rate']
    for col in base_features:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    derived_features = []
    if 'cpu_usage_distribution' in df.columns:
        def parse_cpu_usage(dist_str):
            try:
                s = dist_str.strip("[]").strip()
                parts = re.split(r'\s+', s)
                values = [float(p) for p in parts if p]
                if values:
                    return np.mean(values), np.std(values), np.max(values), np.percentile(values,95)
                else:
                    return np.nan, np.nan, np.nan, np.nan
            except:
                return np.nan, np.nan, np.nan, np.nan
        cpu_stats = df['cpu_usage_distribution'].apply(parse_cpu_usage)
        df['cpu_usage_mean'] = [x[0] for x in cpu_stats]
        df['cpu_usage_std'] = [x[1] for x in cpu_stats]
        df['cpu_usage_max'] = [x[2] for x in cpu_stats]
        df['cpu_usage_p95'] = [x[3] for x in cpu_stats]
        derived_features.extend(['cpu_usage_mean','cpu_usage_std','cpu_usage_max','cpu_usage_p95'])

    df['memory_utilization'] = df['average_usage'] / df['assigned_memory']
    df['memory_pressure'] = df['maximum_usage'] / df['assigned_memory']
    df['cache_ratio'] = df['page_cache_memory'] / df['assigned_memory']

    if 'time_since_last' in df.columns and df['time_since_last'].max() > 0:
        df.loc[df['time_since_last'] == 0, 'time_since_last'] = np.nan
        for feature in base_features:
            if feature in df.columns:
                df[f'{feature}_change'] = df[feature].diff() / df['time_since_last']
                derived_features.append(f'{feature}_change')

    windows = [3, 5, 10]
    if 'entity_id' in df.columns:
        for window in windows:
            for feature in ['memory_utilization', 'memory_pressure']:
                df[f'{feature}rolling_mean{window}'] = df.groupby('entity_id')[feature].rolling(window=window, min_periods=1).mean().reset_index(level=0, drop=True)
                df[f'{feature}rolling_std{window}'] = df.groupby('entity_id')[feature].rolling(window=window, min_periods=1).std().reset_index(level=0, drop=True)
                derived_features.extend([f'{feature}rolling_mean{window}', f'{feature}rolling_std{window}'])
    else:
        for window in windows:
            for feature in ['memory_utilization', 'memory_pressure']:
                df[f'{feature}rolling_mean{window}'] = df[feature].rolling(window=window, min_periods=1).mean()
                df[f'{feature}rolling_std{window}'] = df[feature].rolling(window=window, min_periods=1).std()
                derived_features.extend([f'{feature}rolling_mean{window}', f'{feature}rolling_std{window}'])

    all_features = base_features + derived_features + ['memory_utilization', 'memory_pressure', 'cache_ratio', 'hour_of_day', 'day_of_week', 'is_weekend']
    df.replace([np.inf, -np.inf], np.nan, inplace=True)

    if 'failed' not in df.columns:
        raise ValueError("Target column 'failed' not found in the dataset.")
    df['failed'] = df['failed'].astype(int)

    # Updated thresholds:
    df['resource_exhaustion'] = ((df['maximum_usage'] >= df['assigned_memory'] * 0.5) & (df['failed'] == 1)).astype(int)
    df['memory_pressure_failure'] = ((df['memory_pressure'] >= 0.5) & (df['failed'] == 1)).astype(int)
    df['other_failure'] = ((df['failed'] == 1) & (df['resource_exhaustion'] == 0) & (df['memory_pressure_failure'] == 0)).astype(int)

    df.dropna(subset=['failed'], inplace=True)
    columns_to_fill = list(set(all_features + ['resource_exhaustion','memory_pressure_failure','other_failure']) - {'failed'})
    df[columns_to_fill] = df[columns_to_fill].fillna(0)
    print(f"After handling NaNs, shape is now {df.shape}")
    print("Remaining columns:", df.columns.tolist())
    return df, all_features


In [13]:
# Step 1: Load the data
df = load_data("/content/google-cluster-dataset.csv")

# Step 2: Preprocess time columns (using 'start_time' as the time column)
df = preprocess_time(df, time_col='start_time')

# Step 3: Map to Kubernetes (for reference)
df, k8s_mapping = map_to_kubernetes(df)

# Step 4: Engineer features and add failure flags
df, features = engineer_features(df)

# Step 5: Detect anomalies using the engineered features
df = detect_anomalies(df, features)

# Step 6: Normalize the features for modeling
df_scaled, scaler = normalize_features(df, features)

# You can now inspect the final DataFrame:
df_scaled.head()


After loading, shape: (405894, 34)
Columns: ['Unnamed: 0', 'time', 'instance_events_type', 'collection_id', 'scheduling_class', 'collection_type', 'priority', 'alloc_collection_id', 'instance_index', 'machine_id', 'resource_request', 'constraint', 'collections_events_type', 'user', 'collection_name', 'collection_logical_name', 'start_after_collection_ids', 'vertical_scaling', 'scheduler', 'start_time', 'end_time', 'average_usage', 'maximum_usage', 'random_sample_usage', 'assigned_memory', 'page_cache_memory', 'cycles_per_instruction', 'memory_accesses_per_instruction', 'sample_rate', 'cpu_usage_distribution', 'tail_cpu_usage_distribution', 'cluster', 'event', 'failed']
Sample data:
    Unnamed: 0           time  instance_events_type  collection_id  \
0           0              0                     2    94591244395   
1           1  2517305308183                     2   260697606809   
2           2   195684022913                     6   276227177776   
3           3              0    

Unnamed: 0.1,Unnamed: 0,time,instance_events_type,collection_id,scheduling_class,collection_type,priority,alloc_collection_id,instance_index,machine_id,...,memory_pressurerolling_std5,memory_utilizationrolling_mean10,memory_utilizationrolling_std10,memory_pressurerolling_mean10,memory_pressurerolling_std10,resource_exhaustion,memory_pressure_failure,other_failure,anomaly_score,is_anomaly
0,183421,0,2,152917495628,3,1,200,0,47,17377689713,...,0.0,0.0,0.0,0.0,0.0,0,0,1,1,0
1,256265,352321459944,3,261561475113,2,1,101,0,425,178160671591,...,0.0,0.0,0.0,0.0,0.0,0,0,0,1,0
2,44099,365626782748,3,261561475113,2,1,101,0,257,813863542,...,0.0,0.0,0.0,0.0,0.0,0,0,0,1,0
3,145811,1846171586901,3,261561475113,2,1,101,0,263,527532269,...,0.0,0.0,0.0,0.0,0.0,0,0,0,1,0
4,154472,2571380789305,3,261561475113,2,1,101,0,135,559903004,...,0.0,0.0,0.0,0.0,0.0,0,0,0,1,0


In [16]:
for target in target_cols:
    print(f"{target} distribution:\n", df_scaled[target].value_counts())


failed distribution:
 failed
0    313216
1     92678
Name: count, dtype: int64
resource_exhaustion distribution:
 resource_exhaustion
0    405894
Name: count, dtype: int64
memory_pressure_failure distribution:
 memory_pressure_failure
0    405894
Name: count, dtype: int64
other_failure distribution:
 other_failure
0    313216
1     92678
Name: count, dtype: int64
is_anomaly distribution:
 is_anomaly
0    385876
1     20018
Name: count, dtype: int64


In [15]:
sequence_length = 10
target_cols = ['failed', 'resource_exhaustion', 'memory_pressure_failure', 'other_failure', 'is_anomaly']
X, y_dict = create_sequences(df_scaled, features, sequence_length, target_cols)

Created sequences: X shape = (405884, 10, 32)
y_failed shape = (405884,)
Positive samples for failed: 92677 (22.83%)
y_resource_exhaustion shape = (405884,)
Positive samples for resource_exhaustion: 0 (0.00%)
y_memory_pressure_failure shape = (405884,)
Positive samples for memory_pressure_failure: 0 (0.00%)
y_other_failure shape = (405884,)
Positive samples for other_failure: 92677 (22.83%)
y_is_anomaly shape = (405884,)
Positive samples for is_anomaly: 20018 (4.93%)


# **Model Training**

In [18]:
import pickle
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

def train_and_select_best_model(df_scaled, features, target_cols, model_constructors):
    """
    Trains multiple models on each target variable, selects the best-performing model based on weighted F1 score,
    and saves that model to disk.

    Parameters:
        df_scaled (DataFrame): Preprocessed and normalized dataset.
        features (list): List of feature column names.
        target_cols (list): List of target column names (e.g., ['failed', 'is_anomaly']).
        model_constructors (dict): Dictionary of model names and their corresponding constructors.

    Returns:
        best_models (dict): A dictionary where for each target, the best model is stored along with its name and F1 score.
    """
    best_models = {}

    for target in target_cols:
        best_f1 = -1
        best_model = None
        best_model_name = ""
        print(f"\n=== Evaluating models for target: {target} ===")

        # Extract features and target
        X = df_scaled[features].values
        y = df_scaled[target].values

        # Split data into training and testing sets (80/20 split)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        for model_name, model_constructor in model_constructors.items():
            print(f"\nTraining model: {model_name}")
            model = model_constructor
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            f1 = f1_score(y_test, y_pred, average='weighted')
            print(f"{model_name} weighted F1 score: {f1:.4f}")

            if f1 > best_f1:
                best_f1 = f1
                best_model = model
                best_model_name = model_name

        best_models[target] = {"model": best_model, "model_name": best_model_name, "f1_score": best_f1}
        print(f"\nBest model for {target}: {best_model_name} with weighted F1 score: {best_f1:.4f}")
        # Save the best model to disk using pickle
        with open(f"best_model_{target}.pkl", "wb") as f:
            pickle.dump(best_model, f)

    return best_models

# Define model constructors for various models (without SVM)
model_constructors = {
    'Logistic Regression': LogisticRegression(max_iter=1000, random_state=42),
    'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
    'Decision Tree': DecisionTreeClassifier(random_state=42),
    'XGBoost': XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=42)
}

# Specify the targets you want to evaluate:
anomaly_targets = ['failed', 'is_anomaly']

# Now, assuming you've already prepared and scaled your data:
# df = load_data("borg_traces_data.csv")
# df = preprocess_time(df, time_col='start_time')
# df, k8s_mapping = map_to_kubernetes(df)
# df, features = engineer_features(df)
# df = detect_anomalies(df, features)
# df_scaled, scaler = normalize_features(df, features)

best_models = train_and_select_best_model(df_scaled, features, anomaly_targets, model_constructors)



=== Evaluating models for target: failed ===

Training model: Logistic Regression
Logistic Regression weighted F1 score: 0.6699

Training model: Random Forest
Random Forest weighted F1 score: 0.9979

Training model: Decision Tree
Decision Tree weighted F1 score: 0.9976

Training model: XGBoost


Parameters: { "use_label_encoder" } are not used.



XGBoost weighted F1 score: 0.9905

Best model for failed: Random Forest with weighted F1 score: 0.9979

=== Evaluating models for target: is_anomaly ===

Training model: Logistic Regression
Logistic Regression weighted F1 score: 0.9926

Training model: Random Forest
Random Forest weighted F1 score: 0.9995

Training model: Decision Tree
Decision Tree weighted F1 score: 0.9993

Training model: XGBoost


Parameters: { "use_label_encoder" } are not used.



XGBoost weighted F1 score: 0.9995

Best model for is_anomaly: XGBoost with weighted F1 score: 0.9995


# **LSTM-Model**

In [5]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout

def build_lstm_model(seq_length, num_features):
    model = Sequential()
    model.add(LSTM(64, input_shape=(seq_length, num_features), return_sequences=True))
    model.add(Dropout(0.3))
    model.add(LSTM(32))
    model.add(Dropout(0.3))
    model.add(Dense(16, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))  # For binary classification
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model

model = build_lstm_model(seq_length=10, num_features=len(features))
model.summary()


  super().__init__(**kwargs)


In [6]:
history = model.fit(
    X_train,
    y_train,
    epochs=10,
    batch_size=32,
    validation_split=0.1
)

Epoch 1/10
[1m9133/9133[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m135s[0m 14ms/step - accuracy: 0.7929 - loss: 0.5009 - val_accuracy: 0.7914 - val_loss: 0.5143
Epoch 2/10
[1m9133/9133[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m130s[0m 13ms/step - accuracy: 0.8030 - loss: 0.4584 - val_accuracy: 0.7394 - val_loss: 0.6065
Epoch 3/10
[1m9133/9133[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m182s[0m 17ms/step - accuracy: 0.8228 - loss: 0.4073 - val_accuracy: 0.6166 - val_loss: 0.6338
Epoch 4/10
[1m9133/9133[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m124s[0m 14ms/step - accuracy: 0.8348 - loss: 0.3757 - val_accuracy: 0.6437 - val_loss: 0.7491
Epoch 5/10
[1m9133/9133[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m138s[0m 13ms/step - accuracy: 0.8483 - loss: 0.3438 - val_accuracy: 0.6358 - val_loss: 0.7754
Epoch 6/10
[1m9133/9133[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m141s[0m 13ms/step - accuracy: 0.8572 - loss: 0.3238 - val_accuracy: 0.6384 - val_loss: 0.711

In [7]:
loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
print(f"Test Loss: {loss:.4f}, Test Accuracy: {accuracy:.4f}")

Test Loss: 0.9150, Test Accuracy: 0.6252
