## This notebook develops all the components required to run the anomaly detection job.

In [212]:
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.experimental.custom_job import utils
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component

In [213]:
# This component detects anomalies in netscout data.
"""
TODO: Eventually we should maintain our own table: query only the most recent rows from
`prj-netw-prod-npda-dataeng-03.wlssrtsv_netscout_raw.wireless-netscout-control-plane`
and store them there. A data-retention policy on that table (10 days or less) would also keep costs down.
"""
@component(
    packages_to_install=[
        "pandas", "fsspec", "gcsfs", "prophet", "google-cloud-bigquery",
        "google-cloud-storage", "rrcf",
    ],
    base_image="python:3.9",
    output_component_file="netscout-ml.yaml"
)
def netscout_ml_w6017_amf_server(project_id: str, dataset: str, table: str, storage_gcs: str):
    """Detect anomalies in the most recent netscout data.

    Pulls recent rows from BigQuery and loads a 30-day history CSV from GCS.
    It checks the earliest new point against a Prophet forecast interval and
    round-trips the serialized RRCF forest through GCS.

    Args:
        project_id: GCP project used for the BigQuery client.
        dataset: BigQuery dataset name (not yet used by the query).
        table: BigQuery table name (not yet used by the query).
        storage_gcs: GCS prefix that holds the history CSV, e.g. ``gs://bucket``
            or ``gs://bucket/dir/`` (a trailing slash is optional).
    """
    # Imports live inside the function because a KFP lightweight component runs
    # only this function's own source inside the container.
    from google.cloud import bigquery
    from google.cloud import storage
    import pandas as pd
    from prophet import Prophet
    import rrcf
    import json
    from typing import Optional, Tuple
    import io

    def get_recent_transaction(project_id: str, dataset: str, table: str) -> pd.DataFrame:
        """Fetch the most recent netscout rows from BigQuery.

        Intended to return the last 10 minutes of data (see the NOTE on the query).

        Args:
            project_id: GCP project for the BigQuery client.
            dataset: Dataset name (currently unused by the query).
            table: Table name (currently unused by the query).

        Returns:
            A DataFrame with columns ``ds`` (tz-naive timestamp) and ``y`` (float),
            sorted by ``ds``. It is empty if the query returned no rows.

        Raises:
            ValueError: if the query does not return exactly two columns.
        """
        client = bigquery.Client(project=project_id)

        # NOTE(review): placeholder query. It ignores ``dataset``/``table``, has no
        # time filter, and selects a single column. The code below needs exactly
        # (timestamp, value).
        query = "select destination_host_ip_address from `table_xxxx` limit 10"
        rows = client.query(query).result()  # blocks until the query finishes

        df_recent_trans = pd.DataFrame([dict(row) for row in rows])
        print(df_recent_trans.head())
        if df_recent_trans.empty:
            return df_recent_trans

        if df_recent_trans.shape[1] != 2:
            raise ValueError(
                "Expected the recent-data query to return 2 columns (timestamp, value), "
                f"got {list(df_recent_trans.columns)}"
            )
        df_recent_trans.columns = ['ds', 'y']
        df_recent_trans['ds'] = df_recent_trans['ds'].dt.tz_localize(None)
        df_recent_trans['y'] = df_recent_trans['y'].astype(float)
        # The query has no ORDER BY, so sort here. Row 0 is then the earliest new
        # point, which train_prophet compares with the first forecast step.
        df_recent_trans = df_recent_trans.sort_values('ds', ignore_index=True)
        print(df_recent_trans.head())
        return df_recent_trans

    def get_prior_30_days(storage_gcs: str,
                          file_name: str = 'preprocessed_success_trans0725_0824.csv') -> pd.DataFrame:
        """Load the 30-day history CSV from GCS as a (ds, y) DataFrame.

        Args:
            storage_gcs: GCS prefix (``gs://bucket`` or ``gs://bucket/dir/``).
            file_name: CSV object name under ``storage_gcs``. The file must have
                exactly two columns: a ``Datetime`` column followed by the value.

        Returns:
            A DataFrame with a tz-naive ``ds`` column and the value column as ``y``.
        """
        # Join with exactly one '/'. The pipeline default 'gs://xxx_bucket' has no
        # trailing slash, and plain concatenation produced an invalid object path.
        path = storage_gcs.rstrip('/') + '/' + file_name
        df_past_30_days = pd.read_csv(path, parse_dates=['Datetime'])
        df_past_30_days.columns = ['ds', 'y']
        df_past_30_days['ds'] = df_past_30_days['ds'].dt.tz_localize(None)
        return df_past_30_days

    def train_prophet(historical_df: pd.DataFrame,
                      recent_df: pd.DataFrame) -> Tuple[bool, pd.DataFrame]:
        """Flag the recent point if it falls outside Prophet's forecast interval.

        Fits Prophet on ``historical_df`` and forecasts the step right after the
        history at a 5-minute frequency. It then compares row 0 of ``recent_df``
        with the 70% uncertainty interval, whose lower bound is clipped at 0.

        Args:
            historical_df: History with columns ``ds`` and ``y``.
            recent_df: Non-empty recent data with columns ``ds`` and ``y``.

        Returns:
            ``(alarm_notification, updated_df)``. ``updated_df`` is the history
            with ``recent_df`` appended and the same number of oldest rows
            dropped, so the window length stays constant.
        """
        model = Prophet(growth='flat', seasonality_mode='additive', interval_width=0.7)
        model.fit(historical_df)
        # Only future rows are read below. Predicting the full history as well
        # (include_history=True) ran uncertainty sampling over every past point.
        future = model.make_future_dataframe(periods=12, freq='5min', include_history=False)
        forecast = model.predict(future)
        print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail())
        print(historical_df.tail())

        # Roll the window: append the new rows and drop as many of the oldest rows.
        updated_df = pd.concat([historical_df, recent_df], ignore_index=True)
        print("before ", historical_df.shape, updated_df.shape)
        updated_df = updated_df.iloc[len(recent_df):].reset_index(drop=True)
        print(updated_df.shape, " after")
        print(updated_df.tail())
        print(updated_df.head())

        # Clip negative lower bounds to 0 before the comparison.
        neg_yhat_lower = forecast['yhat_lower'] < 0
        print('rows with negative yhat_lower:', int(neg_yhat_lower.sum()))
        forecast.loc[neg_yhat_lower, 'yhat_lower'] = 0

        # Row 0 of the future-only forecast is the first step after the history.
        next_step = forecast.iloc[0]
        lower_level = next_step['yhat_lower']
        upper_level = next_step['yhat_upper']
        observed = recent_df.iloc[0]['y']
        alarm_notification = bool(observed <= lower_level or upper_level < observed)

        print('alarm notification')
        print(lower_level, observed, upper_level)
        return alarm_notification, updated_df

    def incremental_train_rrcf(recent_df: pd.DataFrame, storage_gcs: str, rrcf_threshold: float,
                               num_trees: int = 80, max_uploads: Optional[int] = 1) -> None:
        """Load the serialized RRCF forest from GCS and write trees back.

        Per its original description, this should insert ``recent_df`` into each
        tree and compare the new point's anomaly score with ``rrcf_threshold``.
        It should then persist the updated trees.
        NOTE(review): the insert and scoring steps are not implemented yet. The
        trees are written back unchanged.

        Args:
            recent_df: Most recent data (unused for now).
            storage_gcs: GCS prefix (unused; the bucket name is hard-coded below).
            rrcf_threshold: Anomaly-score threshold (unused for now).
            num_trees: Number of ``tree_<i>.json`` files to load.
            max_uploads: Upload at most this many trees; ``None`` uploads all.
                The default of 1 keeps the earlier behavior, which stopped after
                the first upload.

        Returns:
            None. TODO: return the alarm notification once scoring exists.
        """
        bucket_name = 'xxx_bucket'
        source_prefix = 'serialized_files/'
        target_prefix = 'updated_xxx/'

        # One client and bucket handle for both reading and writing.
        # get_bucket() performs an API request, so call it once.
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)

        print('Beginning to Retrieve Pickled Files')
        forest_retrieved = []
        for i in range(num_trees):
            blob = bucket.blob(source_prefix + 'tree_' + str(i) + '.json')
            tree = rrcf.RCTree()
            tree.load_dict(json.loads(blob.download_as_bytes()))
            forest_retrieved.append(tree)
        print(len(forest_retrieved))

        if max_uploads is None:
            trees_to_upload = forest_retrieved
        else:
            trees_to_upload = forest_retrieved[:max_uploads]

        for i, tree in enumerate(trees_to_upload):
            blob_name = target_prefix + 'tree_' + str(i) + '.json'
            payload = io.BytesIO(json.dumps(tree.to_dict()).encode())
            bucket.blob(blob_name).upload_from_file(payload, content_type='application/json')
            print(f'File uploaded to {bucket_name}/{blob_name}')

    def eval_n_alarm_notification(prophet_result: Tuple[str, str],
                                  rrcf_result: Tuple[str, str]) -> None:
        """Send an alarm notification when both detectors flag an anomaly.

        TODO: not implemented yet, and not called below. The ``Tuple[str, str]``
        annotations look provisional, because ``train_prophet`` produces a bool.

        Args:
            prophet_result: Result of the Prophet check.
            rrcf_result: Result of the RRCF check.
        """
        return

    df_recent_transactions = get_recent_transaction(project_id=project_id, dataset=dataset, table=table)
    if df_recent_transactions.empty:
        return  # nothing new to evaluate

    df_past_30_days = get_prior_30_days(storage_gcs)
    print(df_past_30_days.head())
    prophet_alarm_notification, df_30_days_recent = train_prophet(df_past_30_days, df_recent_transactions)
    print(prophet_alarm_notification)
    incremental_train_rrcf(df_recent_transactions, storage_gcs, 10.0)

In [214]:
# Build a unique, timestamped display name for this pipeline run.
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")  # e.g. 20230913141236
DISPLAY_NAME = f"pipeline-anomaly-detection-job{TIMESTAMP}"

In [215]:
# Project-level settings for Vertex AI; aip.init makes them the defaults for
# later aip.* calls such as PipelineJob.
BUCKET_URI = "gs://xxx_bucket"
PIPELINE_ROOT = f"{BUCKET_URI}/alarm-detection"
SERVICE_ACCOUNT = "name@xxx.iam.gserviceaccount.com"
PROJECT_ID = "xxx_project_id"
# Vertex AI location IDs look like "us-central1". The previous value
# "central-us" is not a valid Google Cloud location and is rejected by aip.init.
REGION = "us-central1"
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI, location=REGION)

In [216]:
@dsl.pipeline(
    name=DISPLAY_NAME,
    description="Netscout ML Pipeline Test",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    project_id: str = PROJECT_ID,
    dataset: str = 'xxx_dataset',
    table: str = 'xxx_table',
    storage_gcs: str = 'gs://xxx_bucket',
):
    # Single-step pipeline: run the netscout anomaly-detection component,
    # passing each pipeline parameter through unchanged.
    netscout_ml_w6017_amf_server(
        project_id=project_id,
        dataset=dataset,
        table=table,
        storage_gcs=storage_gcs,
    )


In [217]:
from kfp.v2 import compiler  # noqa: F811  (re-imported so this cell can run standalone)

# Serialize the pipeline definition into the JSON spec that the PipelineJob
# cell submits.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="pipeline-anomaly-detection-job.json",
)

In [1]:
# Submit the compiled spec to Vertex AI Pipelines. Project and location come
# from the earlier aip.init call. run() waits for completion by default (sync=True).
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="pipeline-anomaly-detection-job.json",
    pipeline_root=PIPELINE_ROOT,
)
job.run(service_account=SERVICE_ACCOUNT)