## Simple incident classifier

In this lab we will create a simple incident classifier using the Random Forest algorithm from the `sklearn` library.

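A random forest classifier is a machine learning algorithm that combines the predictions of multiple decision trees for improved accuracy. It uses bagging (each tree is trained on a bootstrap sample of the rows) and feature randomness (each split considers a random subset of the features) to create a diverse set of trees. Each tree makes a prediction, and for classification the final prediction is determined by majority voting; scikit-learn's `RandomForestClassifier` implements this as an average of the trees' predicted class probabilities.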
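
As a rough illustration of the voting step, the sketch below combines per-tree predictions by majority vote. The vote lists are made up for the example, and `majority_vote` is an illustrative helper, not part of `sklearn`.

```python
from collections import Counter


def majority_vote(tree_predictions):
    """Return the label predicted by the largest number of trees."""
    return Counter(tree_predictions).most_common(1)[0][0]


# Hypothetical predictions from five trees for three samples.
votes_per_sample = [
    [1, 0, 1, 1, 0],
    [0, 0, 0, 1, 0],
    [1, 1, 1, 1, 1],
]

forest_predictions = [majority_vote(votes) for votes in votes_per_sample]
print(forest_predictions)  # [1, 0, 1]
```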

In [None]:
import sys
import os
import pandas as pd

from itertools import combinations
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

In [None]:
sys.path.append(os.path.dirname(os.getcwd()))
from utils import run_query, load_constants

We call the `load_constants()` function to define the constants used throughout this lab.

In [None]:
constants = load_constants()

GOOGLE_CLOUD_PROJECT = constants["GCP"]["GOOGLE_CLOUD_PROJECT"]
GOOGLE_CLOUD_LOCATION = constants["GCP"]["GOOGLE_CLOUD_LOCATION"]
GOOGLE_CLOUD_GCS_BUCKET = constants["GCP"]["GOOGLE_CLOUD_GCS_BUCKET"]
GOOGLE_CLOUD_SERVICE_ACCOUNT = constants["GCP"]["GOOGLE_CLOUD_SERVICE_ACCOUNT"]
GOOGLE_GEMINI_MODEL_15 = constants["VERTEX"]["GOOGLE_GEMINI_MODEL_15"]

GOOGLE_CLOUD_BIGQUERY_PROJECT = constants["BIGQUERY"]["GOOGLE_CLOUD_BIGQUERY_PROJECT"]
GOOGLE_CLOUD_BIGQUERY_DATASET = constants["BIGQUERY"]["GOOGLE_CLOUD_BIGQUERY_DATASET"]


BASE_TABLE_NAME_EVENTS = constants["BIGQUERY"]["BASE_TABLE_NAME_EVENTS"]
BASE_TABLE_NAME_INCIDENTS = constants["BIGQUERY"]["BASE_TABLE_NAME_INCIDENTS"]

In [None]:
events_query = f"""
    SELECT *
    FROM `{GOOGLE_CLOUD_BIGQUERY_PROJECT}.{GOOGLE_CLOUD_BIGQUERY_DATASET}.{BASE_TABLE_NAME_EVENTS}` TABLESAMPLE SYSTEM (10 PERCENT) 
"""
events_df = run_query(events_query)

incidents_query = f"""
    SELECT *
    FROM `{GOOGLE_CLOUD_BIGQUERY_PROJECT}.{GOOGLE_CLOUD_BIGQUERY_DATASET}.{BASE_TABLE_NAME_INCIDENTS}`
"""
incidents_df = run_query(incidents_query)

The following code snippet performs two main tasks:

1. **Feature Creation (create_features function)**

   * **Aggregation:** It calculates various statistics (mean, max, min, count) for both individual network elements and the entire network over specified time windows (default: 1 hour).
   * **Network-Wide Context:** It adds network-wide statistics to each network element's data, providing context.
   * **Pairwise Differences:** It calculates mean value differences between all possible pairs of network elements, creating additional features.

2. **Incident Labeling (join_with_incidents function)**

   * **Incident Matching:** It checks if each timestamp in the feature data falls within any incident's start and end time.
   * **Labeling:** It adds two columns:
      * `incident_occurred`: Set to 1 if an incident occurred at that timestamp, 0 otherwise.
      * `incident_name`: The name of the incident (if any) at that timestamp.

**Key Points:**

* **Time-Based Analysis:** The code uses time windows to aggregate data and align features with incidents.
* **Network Element Comparisons:** Pairwise differences help capture relationships between network elements.
* **Incident Context:** The `incident_occurred` and `incident_name` columns provide valuable context for further analysis or modeling, potentially linking network behavior to incidents.
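
The time-window aggregation described above can be tried on a few rows before running it on the full table. This is a minimal sketch on made-up data; the column names follow the ones used in this lab, and the values are hypothetical.

```python
import pandas as pd

# Hypothetical events: three for element "A", one for element "B".
toy = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            [
                "2024-01-01 00:05",
                "2024-01-01 00:40",
                "2024-01-01 01:10",
                "2024-01-01 00:20",
            ]
        ),
        "network_element_id": ["A", "A", "A", "B"],
        "value": [1.0, 3.0, 5.0, 10.0],
        "event": ["e1", "e2", "e3", "e4"],
    }
).set_index("timestamp")

# Per-element statistics for each 1-hour window.
hourly = (
    toy.groupby(["network_element_id", pd.Grouper(freq="1h")])
    .agg(mean_value=("value", "mean"), count_events=("event", "count"))
    .reset_index()
)

# Network-wide mean for the same windows, attached to every element row.
network = (
    toy.groupby(pd.Grouper(freq="1h"))
    .agg(network_mean_value=("value", "mean"))
    .reset_index()
)
hourly = hourly.merge(network, on="timestamp", how="left")
print(hourly)
```

Each output row is keyed by the start of its window, so the 00:05 and 00:40 events of element `A` collapse into a single row stamped 00:00.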

In [None]:
def create_features(df, window_size="1h"):
    """Build per-element, network-wide, and pairwise features per time window."""
    df = df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df = df.set_index("timestamp")

    # Per-element statistics for every time window.
    features = df.groupby(["network_element_id", pd.Grouper(freq=window_size)]).agg(
        mean_value=("value", "mean"),
        max_value=("value", "max"),
        min_value=("value", "min"),
        count_events=("event", "count"))
    features = features.reset_index()

    # Network-wide statistics for the same windows, attached to every element row.
    network_wide = df.groupby(pd.Grouper(freq=window_size)).agg(
        network_mean_value=("value", "mean"),
        network_max_value=("value", "max"),
        network_min_value=("value", "min"),
        network_count_events=("event", "count"))
    network_wide = network_wide.reset_index()
    features = pd.merge(features, network_wide, on="timestamp", how="left")

    # Pairwise differences of window means. Each value is stored on the rows
    # of the first element of the pair and is NaN on all other rows.
    element_ids = df["network_element_id"].unique()
    for element1, element2 in combinations(element_ids, 2):
        features_e1 = features[features['network_element_id'] == element1]
        features_e2 = features[features['network_element_id'] == element2]
        merged = pd.merge(features_e1, features_e2, on='timestamp', how='left', suffixes=('_e1', '_e2'))
        merged['mean_diff_e1_e2'] = merged['mean_value_e1'] - merged['mean_value_e2']
        merged = merged.rename(columns={'mean_diff_e1_e2': f'mean_diff_{element1}_{element2}'})
        features = pd.merge(
            features,
            merged[['timestamp', 'network_element_id_e1', f'mean_diff_{element1}_{element2}']],
            left_on=['timestamp', 'network_element_id'],
            right_on=['timestamp', 'network_element_id_e1'],
            how='left',
        )
        features = features.drop('network_element_id_e1', axis=1)

    return features



def join_with_incidents(features_df, incidents_df):
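    df = features_df.copy()
    df["incident_occurred"] = 0
    # Create the column up front so it exists even when no incident matches.
    df["incident_name"] = None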
    for _, row in incidents_df.iterrows():
        start_time = row["start_time"]
        end_time = row["end_time"]
        incident_name = row["incident_name"]
        matching_features = df[
            (df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)
        ]
        
        df.loc[matching_features.index, "incident_occurred"] = 1
        df.loc[matching_features.index, "incident_name"] = incident_name
    return df

In [None]:
events_features = create_features(events_df)
events_with_incidents = join_with_incidents(events_features, incidents_df)
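
The interval matching performed by `join_with_incidents` can be checked on a toy example. The sketch below uses hypothetical timestamps and a hypothetical incident name, and it expresses the same inclusive comparison with `Series.between`.

```python
import pandas as pd

# Hypothetical hourly feature rows, keyed by window start.
features = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2024-01-01 00:00", "2024-01-01 01:00", "2024-01-01 02:00"]
        )
    }
)

# One hypothetical incident lasting from 00:30 to 01:30.
incidents = pd.DataFrame(
    {
        "start_time": pd.to_datetime(["2024-01-01 00:30"]),
        "end_time": pd.to_datetime(["2024-01-01 01:30"]),
        "incident_name": ["link_down"],
    }
)

features["incident_occurred"] = 0
features["incident_name"] = None
for incident in incidents.itertuples(index=False):
    # Inclusive on both ends, like the >= / <= comparison in the lab code.
    in_window = features["timestamp"].between(incident.start_time, incident.end_time)
    features.loc[in_window, "incident_occurred"] = 1
    features.loc[in_window, "incident_name"] = incident.incident_name

print(features)
```

Only the 01:00 row is labeled. The 00:00 window overlaps the incident from 00:30 onwards, but its timestamp (the window start) lies before `start_time`, so it stays at 0. Whether that behavior is desirable depends on how the labels will be used.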

The next cell prepares the data for machine learning modeling:

1. **Label Encoding:**

* The `LabelEncoder` converts the categorical `network_element_id` into numerical labels. This is often necessary for machine learning algorithms that require numerical input.

2. **Feature Selection:**

* A list named `features` is created, specifying the columns that will be used as input features for the model.
* This includes:
    * Statistics for individual network elements (`mean_value`, `max_value`, `min_value`, `count_events`)
    * Network-wide statistics (`network_mean_value`, `network_max_value`, `network_min_value`, `network_count_events`)
    * Mean value differences between pairs of network elements.

3. **Target Variable Definition:**

* The `target` variable is set to `incident_occurred`. This indicates that the model will be trained to predict whether an incident occurred based on the selected features.
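
The pairwise feature names are generated with `itertools.combinations`, so their number grows quadratically with the number of network elements. A small sketch, using hypothetical element IDs:

```python
from itertools import combinations
from math import comb

# Hypothetical network element IDs.
element_ids = ["ne_a", "ne_b", "ne_c"]

pair_features = [f"mean_diff_{a}_{b}" for a, b in combinations(element_ids, 2)]
print(pair_features)
# ['mean_diff_ne_a_ne_b', 'mean_diff_ne_a_ne_c', 'mean_diff_ne_b_ne_c']

# Number of pairwise columns for a network of 100 elements.
print(comb(100, 2))  # 4950
```

For networks with many elements it may be worth restricting the pairs, for example to elements that are known to be connected.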

In [None]:
le = LabelEncoder()
events_with_incidents["network_element_id"] = le.fit_transform(
    events_with_incidents["network_element_id"]
)

features = [
    "network_element_id",
    "mean_value",
    "max_value",
    "min_value",
    "count_events",
    "network_mean_value",
    "network_max_value",
    "network_min_value",
    "network_count_events",
]

for element1, element2 in combinations(events_df["network_element_id"].unique(), 2):
    features.append(f"mean_diff_{element1}_{element2}")

target = "incident_occurred"



Next we prepare the data for training and evaluating a machine learning model:

1. **Feature and Target Separation:**

* `X`: The selected features are extracted from the `events_with_incidents` dataframe and assigned to `X`.
* `y`: The target variable (`incident_occurred`) is extracted and assigned to `y`.

2. **Handling Missing Values:**

* `X.fillna(0, inplace=True)`: Any missing values in the feature data `X` are filled with zeros. This is a common preprocessing step to ensure compatibility with many machine learning algorithms.

3. **Train-Test Split:**

* `train_test_split`: The data is split into training and testing sets. 
    * 80% of the data (`test_size=0.2`) is used for training the model (`X_train`, `y_train`).
    * 20% is held out for evaluating the model's performance on unseen data (`X_test`, `y_test`).
    * `random_state=42` ensures reproducibility of the split.

**Key Points:**

* **Model Readiness:** The data is now structured into the standard format for training and evaluating supervised machine learning models: input features (`X`) and corresponding target values (`y`).
* **Train-Test Split:** This crucial step helps detect overfitting by evaluating the model on data it hasn't seen during training, providing a more realistic estimate of its performance on new, unseen data.
* **Missing Value Handling:** Filling missing values with zeros is a simple imputation strategy. Depending on the data and the model, other strategies might be more appropriate.
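
If incident windows turn out to be rare, a purely random split can leave very few positive examples in the test set. One possible variation, not used in this lab, is to pass `stratify=y` so that both splits keep the same class proportions. The labels below are hypothetical.

```python
from sklearn.model_selection import train_test_split

# Hypothetical imbalanced labels: 10 incident windows out of 100.
y = [0] * 90 + [1] * 10
X = [[i] for i in range(100)]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Both splits keep the 10% incident rate.
print(sum(y_train), len(y_train))
print(sum(y_test), len(y_test))
```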

In [None]:
# Copy so that the in-place fillna below does not act on a slice of the original frame.
X = events_with_incidents[features].copy()
y = events_with_incidents[target]


X.fillna(0, inplace=True)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)


This code initializes and trains a Random Forest Classifier model:

1. **Model Initialization:**

* `model = RandomForestClassifier(n_estimators=100, random_state=42)`: 
   * A Random Forest Classifier model is created.
   * `n_estimators=100`: The model will consist of an ensemble of 100 decision trees.
   * `random_state=42`: Sets the random seed for reproducibility, ensuring that the same model is created each time the code is run.

2. **Model Training:**

* `model.fit(X_train, y_train)`:
   * The model is trained on the training data (`X_train`, `y_train`).
   * During training, the model learns patterns and relationships between the input features and the target variable (`incident_occurred`). 
   * These learned patterns are stored within the model's internal structure, allowing it to make predictions on new, unseen data. 

**Key Points:**

* **Random Forest:** An ensemble learning algorithm; averaging many decorrelated trees reduces variance compared with a single decision tree, which often improves accuracy and robustness.
* **Hyperparameters:** The `n_estimators` parameter controls the number of trees in the forest. Other hyperparameters, such as `max_depth`, `min_samples_leaf`, and `max_features`, can be tuned to further optimize the model's performance.
* **Training Process:** The `fit` method grows each tree on a bootstrap sample of the training data, choosing at every node the split that most reduces class impurity (Gini impurity by default).
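
One way to inspect what a fitted forest has learned is its `feature_importances_` attribute. The sketch below uses synthetic data (a hypothetical `signal` column that determines the label and a `noise` column that does not), not the lab's incident data.

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier

# Synthetic data: the label depends only on the first column.
rng = np.random.default_rng(0)
signal = rng.normal(size=300)
noise = rng.normal(size=300)
X = np.column_stack([signal, noise])
y = (signal > 0).astype(int)

model = RandomForestClassifier(n_estimators=50, random_state=42)
model.fit(X, y)

# Impurity-based importances, normalized to sum to 1.
for name, importance in zip(["signal", "noise"], model.feature_importances_):
    print(f"{name}: {importance:.3f}")

# The fitted trees themselves are available in `estimators_`.
print(len(model.estimators_))
```

Applied to the lab's model, the same attribute could show whether the pairwise difference features contribute anything beyond the per-element statistics.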

In [None]:
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)


This code evaluates the performance of the trained Random Forest Classifier:

1. **Prediction:**

* `y_pred = model.predict(X_test)`:
    * The trained model is used to make predictions on the test data (`X_test`).
    * The `predict` method applies the learned patterns from the training phase to generate predicted labels (`y_pred`) for each instance in the test set.

2. **Accuracy Calculation:**

* `accuracy = accuracy_score(y_test, y_pred)`:
    * The `accuracy_score` function compares the predicted labels (`y_pred`) with the true labels (`y_test`).
    * It calculates the accuracy, which is the proportion of correctly predicted instances out of the total number of instances in the test set.

3. **Result Output:**

* `print(f"Accuracy: {accuracy}")`:
    * The calculated accuracy is printed to the console.

**Key Points:**

* **Model Performance:** The accuracy score provides a basic measure of how well the model generalizes to unseen data. 
* **Evaluation Metric:** Accuracy is suitable for balanced datasets where the classes are roughly equally represented. For imbalanced datasets, other metrics like precision, recall, or F1-score might be more informative.
* **Further Analysis:** Depending on the specific problem and requirements, further evaluation and analysis might involve techniques like confusion matrices, ROC curves, or cross-validation.
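
The point about imbalanced data can be made concrete with a small calculation. The sketch below uses hypothetical labels and a deliberately useless model that never predicts an incident. The metrics are computed by hand here; `sklearn.metrics` provides `precision_score`, `recall_score`, `f1_score`, and `confusion_matrix` for real use.

```python
# Hypothetical test set: 5 incident windows out of 100.
y_true = [0] * 95 + [1] * 5
y_pred = [0] * 100  # a model that never predicts an incident

pairs = list(zip(y_true, y_pred))
tp = sum(1 for t, p in pairs if t == 1 and p == 1)
fp = sum(1 for t, p in pairs if t == 0 and p == 1)
fn = sum(1 for t, p in pairs if t == 1 and p == 0)

accuracy = sum(1 for t, p in pairs if t == p) / len(pairs)
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0

print(f"accuracy={accuracy:.2f} precision={precision:.2f} recall={recall:.2f}")
# accuracy=0.95 precision=0.00 recall=0.00
```

The model scores 95% accuracy while missing every incident, which is why recall on the incident class is usually worth reporting alongside accuracy.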

In [None]:
y_pred = model.predict(X_test)

accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy}")

## Create a Vertex AI Pipeline

### What is Vertex AI Pipelines?

Vertex AI Pipelines is a managed service on Google Cloud Platform that simplifies the orchestration and execution of machine learning workflows. It's built on Kubeflow Pipelines, providing a scalable and reliable way to:

* **Build complex ML workflows:** Chain together multiple ML tasks (data preprocessing, feature engineering, model training, evaluation, etc.)
* **Automate & schedule pipelines:** Trigger pipelines on a schedule or based on events
* **Track experiments & lineage:** Keep track of model versions, parameters, and data used for reproducibility
* **Deploy models to production:** Integrate with Vertex AI for easy model deployment and serving

**Key Benefits**

* **Managed service:** Reduces operational overhead compared to self-managing Kubeflow
* **Integration with Google Cloud:** Seamlessly works with other GCP services (BigQuery, Cloud Storage, etc.)
* **Scalability:** Handles large-scale ML workflows

**In the context of this code:** The components defined below are designed to run as part of a Vertex AI Pipeline, leveraging its capabilities for orchestration, tracking, and potential scaling of the ML workflow.

### Kubeflow Pipeline Components for Incident Prediction

This code defines several Kubeflow Pipeline components designed to work together to build and train a machine learning model for incident prediction.

#### Components Breakdown

* **`load_data_from_bigquery_op`**

  * **Purpose:** Loads data from BigQuery using a provided SQL query.
  * **Inputs:** `query` (SQL query string), `project_id`
  * **Output:** CSV file saved to `output_data_path`

* **`create_features_op`**

  * **Purpose:** Engineers features from the loaded data.
  * **Inputs:** Data from `input_data_path`, `window_size` (time window for aggregation, default 1 hour)
  * **Output:** CSV file with engineered features saved to `output_data_path`
  * **Key operations:**
    * Time-based aggregation of metrics (mean, max, min, count) for each network element
    * Calculation of network-wide statistics
    * Creation of pairwise difference features between network elements

* **`join_features_op`**

  * **Purpose:** Joins the engineered features with incident data
  * **Inputs:** Feature data from `features_data_path`, incident data from `incidents_data_path`
  * **Output:** CSV file with joined data saved to `output_data_path`
  * **Key operation:**
    * Labels each timestamp in the feature data based on whether an incident occurred during that time

* **`train_and_evaluate_model_op`**

  * **Purpose:** Trains and evaluates a machine learning model
  * **Inputs:** Joined data from `input_data_path`
  * **Outputs:** Trained model saved to `model_output_path`
  * **Key operations:**
    * Preprocessing: Label encodes categorical variables, handles missing values, splits data into training and testing sets
    * Trains a Random Forest Classifier
    * Evaluates the model on the test set (calculates accuracy)
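
Because the components exchange data as CSV files, column types are not preserved between steps. In particular, timestamps come back as text unless they are parsed again. A minimal sketch with made-up values, using an in-memory buffer in place of the component paths:

```python
import io

import pandas as pd

# Hypothetical feature rows with a proper datetime column.
df = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
        "mean_value": [1.5, 2.5],
    }
)

# Write to CSV, as a component would write to its output path.
buffer = io.StringIO()
df.to_csv(buffer, index=False)

# Reading back without hints yields text timestamps.
buffer.seek(0)
as_text = pd.read_csv(buffer)

# Asking pandas to parse the column restores the datetime type.
buffer.seek(0)
as_time = pd.read_csv(buffer, parse_dates=["timestamp"])

print(as_text["timestamp"].dtype)
print(as_time["timestamp"].dtype)
```

Comparing text timestamps only behaves chronologically when every value uses exactly the same format, so parsing them explicitly in the downstream component is the safer option.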



In [None]:
# KFP SDK v2 exposes these directly under `kfp`; the older `kfp.v2` namespace is deprecated.
from kfp import dsl
from kfp.dsl import Dataset, InputPath, Model, OutputPath, component


@component(
    packages_to_install=[
        "pandas",
        "scikit-learn",
        "google-cloud-bigquery",
        "google-cloud-storage",
        "db-dtypes"
    ],
    base_image="python:3.10", 
)
def load_data_from_bigquery_op(query: str, project_id: str, output_data_path: OutputPath(Dataset)):
    
    from google.cloud import bigquery
    
    bq_client = bigquery.Client(project=project_id)
    df = bq_client.query(query).to_dataframe()
    df.to_csv(output_data_path, index=False)


@component(
    packages_to_install=[
        "pandas",
        "scikit-learn",
        "google-cloud-bigquery",
        "google-cloud-storage",
        "db-dtypes"
    ],
    base_image="python:3.10",
)
def create_features_op(
    input_data_path: InputPath(Dataset),
    output_data_path: OutputPath(Dataset),
    window_size: str = '1h',
):
    import pandas as pd
    from itertools import combinations

    def create_features(df, window_size="1h"):
        df = df.copy()
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        df = df.set_index("timestamp")

        features = df.groupby(["network_element_id", pd.Grouper(freq=window_size)]).agg(
            mean_value=("value", "mean"),
            max_value=("value", "max"),
            min_value=("value", "min"),
            count_events=("event", "count"),
        )
        features = features.reset_index()

        network_wide = df.groupby(pd.Grouper(freq=window_size)).agg(
            network_mean_value=("value", "mean"),
            network_max_value=("value", "max"),
            network_min_value=("value", "min"),
            network_count_events=("event", "count"),
        )
        network_wide = network_wide.reset_index()

        features = pd.merge(features, network_wide, on="timestamp", how="left")

        element_ids = df["network_element_id"].unique()
        for element1, element2 in combinations(element_ids, 2):
            features_e1 = features[features["network_element_id"] == element1]
            features_e2 = features[features["network_element_id"] == element2]
            merged = pd.merge(
                features_e1,
                features_e2,
                on="timestamp",
                how="left",
                suffixes=("_e1", "_e2"),
            )
            merged["mean_diff_e1_e2"] = merged["mean_value_e1"] - merged["mean_value_e2"]

            merged = merged.rename(
                columns={"mean_diff_e1_e2": f"mean_diff_{element1}_{element2}"}
            )

            features = pd.merge(
                features,
                merged[
                    [
                        "timestamp",
                        "network_element_id_e1",
                        f"mean_diff_{element1}_{element2}",
                    ]
                ],
                left_on=["timestamp", "network_element_id"],
                right_on=["timestamp", "network_element_id_e1"],
                how="left",
            )
            features = features.drop("network_element_id_e1", axis=1)

        return features

    df = pd.read_csv(input_data_path)
    features_df = create_features(df, window_size)
    features_df.to_csv(output_data_path, index=False)

@component(
    packages_to_install=["pandas", "scikit-learn", "google-cloud-bigquery", "google-cloud-storage","db-dtypes"],
    base_image="python:3.10",
)
def join_features_op(
    features_data_path: InputPath(Dataset),
    incidents_data_path: InputPath(Dataset),
    output_data_path: OutputPath(Dataset),
):
    import pandas as pd

    def join_with_incidents(features_df, incidents_df):
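        df = features_df.copy()
        df["incident_occurred"] = 0
        # Create the column up front so the training component can always drop it.
        df["incident_name"] = None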
        for _, row in incidents_df.iterrows():
            start_time = row["start_time"]
            end_time = row["end_time"]
            incident_name = row["incident_name"]

            matching_features = df[
                (df["timestamp"] >= start_time) & (df["timestamp"] <= end_time)
            ]

            df.loc[matching_features.index, "incident_occurred"] = 1
            df.loc[matching_features.index, "incident_name"] = incident_name
        return df

    features_df = pd.read_csv(features_data_path)
    incidents_df = pd.read_csv(incidents_data_path)
    joined_df = join_with_incidents(features_df, incidents_df)
    joined_df.to_csv(output_data_path, index=False)

@component(
    packages_to_install=["pandas", "scikit-learn", "google-cloud-bigquery", "google-cloud-storage", "joblib", "db-dtypes"],
    base_image="python:3.10", 
)
def train_and_evaluate_model_op(
    input_data_path: InputPath(Dataset),
    model_output_path: OutputPath(Model),
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder
    from sklearn.metrics import accuracy_score
    from joblib import dump
    from itertools import combinations

    df = pd.read_csv(input_data_path)
    le = LabelEncoder()
    df['network_element_id'] = le.fit_transform(df['network_element_id'])
    target = 'incident_occurred'
    X = df.drop(columns=[target,'timestamp','incident_name'])
    y = df[target]

    X.fillna(0, inplace=True)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy}")

    dump(model, model_output_path)

In [None]:
@dsl.pipeline(
    name="telco-incident-prediction-pipeline",
    pipeline_root=f"gs://{GOOGLE_CLOUD_GCS_BUCKET}/pipeline-root-rca",
)
def telco_incident_prediction_pipeline():

    load_events_task = load_data_from_bigquery_op(
        project_id=GOOGLE_CLOUD_BIGQUERY_PROJECT,
        query=f"""
            SELECT *
            FROM `{GOOGLE_CLOUD_BIGQUERY_PROJECT}.{GOOGLE_CLOUD_BIGQUERY_DATASET}.{BASE_TABLE_NAME_EVENTS}` TABLESAMPLE SYSTEM (10 PERCENT) 
        """,
    )
    load_incidents_task = load_data_from_bigquery_op(
        project_id=GOOGLE_CLOUD_BIGQUERY_PROJECT,
        query=f"""
            SELECT *
            FROM `{GOOGLE_CLOUD_BIGQUERY_PROJECT}.{GOOGLE_CLOUD_BIGQUERY_DATASET}.{BASE_TABLE_NAME_INCIDENTS}`
        """,
    )

    create_features_task = create_features_op(
        input_data_path=load_events_task.outputs["output_data_path"],
        window_size="1h",
    )

    join_features_task = join_features_op(
        features_data_path=create_features_task.outputs["output_data_path"],
        incidents_data_path=load_incidents_task.outputs["output_data_path"],
    )

    train_model_task = train_and_evaluate_model_op(
        input_data_path=join_features_task.outputs["output_data_path"],
    )

## Telco Incident Prediction Pipeline

The cell above defines a Kubeflow pipeline named "telco-incident-prediction-pipeline" for predicting incidents in a telecommunications network.

### Pipeline Structure

The pipeline consists of the following tasks:

1. **`load_events_task`**

   * Loads events data from a BigQuery table specified by `BASE_TABLE_NAME_EVENTS`. 
   * Uses `TABLESAMPLE SYSTEM (10 PERCENT)`, which reads a random subset of roughly 10% of the table's data blocks, to speed up development and testing.

2. **`load_incidents_task`**

   * Loads incidents data from a BigQuery table specified by `BASE_TABLE_NAME_INCIDENTS`.
   * Loads the entire table.

3. **`create_features_task`**

   * Takes the output of `load_events_task` (events data) as input.
   * Creates features using a 1-hour window size.
   * The specific feature engineering steps are defined within the `create_features_op` component (see the components breakdown above).

4. **`join_features_task`**

   * Takes the outputs of `create_features_task` (engineered features) and `load_incidents_task` (incidents data) as input.
   * Joins the features with incident information to create a labeled dataset.
   * The joining logic is defined within the `join_features_op` component.

5. **`train_model_task`**

   * Takes the output of `join_features_task` (labeled dataset) as input.
   * Trains and evaluates a Random Forest Classifier model on the data.
   * The model training and evaluation steps are defined within the `train_and_evaluate_model_op` component.
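
The execution order is not written down explicitly; it follows from which task consumes which output. As a rough sketch of that idea, the dependency graph of the five tasks can be written out by hand and ordered with the standard library's `graphlib`. This only illustrates the ordering and is not how Vertex AI Pipelines is implemented.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks whose outputs it consumes.
dependencies = {
    "load_events_task": set(),
    "load_incidents_task": set(),
    "create_features_task": {"load_events_task"},
    "join_features_task": {"create_features_task", "load_incidents_task"},
    "train_model_task": {"join_features_task"},
}

# One valid execution order; the two load tasks are independent of each other.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The two load tasks have no dependency on each other, so the orchestrator is free to run them in parallel.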

### Pipeline Root

* `pipeline_root`: Specifies the Google Cloud Storage (GCS) path, here the `pipeline-root-rca` folder of the `GOOGLE_CLOUD_GCS_BUCKET` bucket, where pipeline artifacts (metadata, intermediate outputs) will be stored.

### Key Points

* **Data Loading:** The pipeline starts by loading events and incidents data from BigQuery.
* **Feature Engineering:** Features are created from the events data using time-based aggregation and pairwise difference calculations.
* **Data Labeling:** The engineered features are joined with incident information to create a labeled dataset for training a supervised model.
* **Model Training and Evaluation:** A Random Forest Classifier is trained and evaluated on the labeled data.
* **Vertex AI Pipelines:** This pipeline is designed to run on Vertex AI Pipelines, leveraging its capabilities for orchestration, scalability, and tracking of machine learning workflows.

**Important Considerations:**

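* **Constants:** The pipeline references Python constants (`GOOGLE_CLOUD_GCS_BUCKET`, `GOOGLE_CLOUD_BIGQUERY_PROJECT`, `GOOGLE_CLOUD_BIGQUERY_DATASET`, `BASE_TABLE_NAME_EVENTS`, `BASE_TABLE_NAME_INCIDENTS`) that are loaded by `load_constants()` earlier in the notebook. They are not environment variables, and they must be defined in the notebook session before the pipeline is defined and compiled, because their values are embedded in the compiled pipeline definition.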
* **Data Schema:** The pipeline assumes specific column names and data structures in the BigQuery tables and intermediate CSV files. Ensure that your data matches these expectations.
* **Model Deployment:** This pipeline focuses on training and evaluating the model. Further steps would be needed to deploy the trained model for real-time incident prediction.
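
A lightweight way to check the schema assumption before launching a run is to compare the columns of a small sample against the names the components use. The column sets below are taken from the code in this lab; `missing_columns` is an illustrative helper, and the sample frame is hypothetical.

```python
import pandas as pd

# Column names the components in this lab read from each table.
REQUIRED_EVENT_COLUMNS = {"timestamp", "network_element_id", "value", "event"}
REQUIRED_INCIDENT_COLUMNS = {"start_time", "end_time", "incident_name"}


def missing_columns(df, required):
    """Return the required column names that are absent from df, sorted."""
    return sorted(required - set(df.columns))


# Hypothetical sample that lacks the `event` column.
events_sample = pd.DataFrame(columns=["timestamp", "network_element_id", "value"])
incidents_sample = pd.DataFrame(columns=["start_time", "end_time", "incident_name"])

print(missing_columns(events_sample, REQUIRED_EVENT_COLUMNS))  # ['event']
print(missing_columns(incidents_sample, REQUIRED_INCIDENT_COLUMNS))  # []
```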


In [None]:
from kfp import compiler
compiler.Compiler().compile(
    pipeline_func=telco_incident_prediction_pipeline,
    package_path="telco_incident_prediction_pipeline.yml",
)

Finally, this code submits the defined Kubeflow pipeline for execution on Vertex AI Pipelines:

In [None]:
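from google.cloud import aiplatform

# Point the SDK at the project and region loaded from the constants.
aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_LOCATION)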

DISPLAY_NAME = "telco_incident_prediction_pipeline"

job = aiplatform.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path=f"{DISPLAY_NAME}.yml",
)

job.run(service_account=GOOGLE_CLOUD_SERVICE_ACCOUNT)