# Init Testing Environment

In [2]:
import requests
import logging
import argparse
import os
from rxoms.utils import rxoms_utils as rutils

# Path fragments that later cells append to RXOMS_PATH to locate the test data
# and the service configuration files.
DEFAULT_DATA_PATH = "/data/anomaly_data/"
DEFAULT_CONFIG_FOLDER = "/configuration/service/"
# Level argument handed to rutils.get_parent_directory
# (presumably how many directories above the working directory the repository
# root sits — TODO confirm against rxoms_utils).
DEFAULT_PATH_LEVEL = 2

# RXOMS_PATH is derived from the notebook's current working directory; this
# cell does not read it from an environment variable.
RXOMS_PATH = rutils.get_parent_directory(os.getcwd(), DEFAULT_PATH_LEVEL)

# Timestamped INFO-level logging is used for all output in this notebook.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s:%(levelname)s -- %(message)s",
)
logging.info(f"RXOMS_PATH: {RXOMS_PATH}")

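# Known issue on Python 3.12 -- "AttributeError: module 'pkgutil' has no attribute 'ImpImporter'":
# https://stackoverflow.com/questions/77364550/attributeerror-module-pkgutil-has-no-attribute-impimporter-did-you-mean
# Workaround: refresh pip and setuptools in the active environment:
# python -m ensurepip --upgrade
# python -m pip install --upgrade setuptools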

2024-10-21 22:02:03,234:INFO -- RXOMS_PATH: /Users/tringuyen/workplace/Study/PhD/Github/rdsea/RXOMS


# Test APIs of the Knowledge Graph 

In [95]:
# Test Knowledge Graph APIs
from rxoms.utils import rxoms_utils as rutils


def test_kg_api(url, method, payload=None, log=False):
    try:
        if method == "GET":
            response = requests.get(url)
        elif method == "POST":
            response = requests.post(url, json=payload)
        else:
            logging.error("Method not supported")
            return
        logging.info(f"Status Code: {response.status_code}")
        if log:
            logging.info(f"Response: {response.json()}")
    except Exception as e:
        logging.error(f"Error: {e}")


# Endpoint URLs, HTTP methods and payloads for the Knowledge Graph checks are
# loaded from a YAML file in the service configuration folder.
config = rutils.load_config(
    f"{RXOMS_PATH}{DEFAULT_CONFIG_FOLDER}test_kg_api.yaml"
)

# Call test_kg() with api=-1 to test all APIs, or pass an API number to test a
# specific one. API numbers as listed by the notebook author (they are used as
# indexes into the lists below -- verify against test_kg_api.yaml):
#     /graph                                   api:0
#     /get_cti_by_id                           api:1
#     /get_dt_by_mac  00:00:00:00:00:07        api:2
#     /get_flow                                api:3
#     /get_incident_metadata                   api:4
#     /update_cti_by_id                        api:5
#     /update_asset_by_mac                     api:6
#     /update_switch                           api:7
#     /reset_graph                             api:8
#     /update_flow                             api:9
#     /trace_root_cause_by_flow                api:10
#     /trace_root_cause_by_switch              api:11
#     /trace_consequence_by_switch             api:12
#     /get_detector                            api:13

# Three parallel lists: entry i of each describes one API call.
url_kg_list = config["url_kg_list"]
payload_kg_list = config["payload_kg_list"]
method_kg_list = config["method_kg_list"]


def test_kg(api=-1, log=False):
    if api is -1:
        for i in range(len(url_kg_list)):
            logging.info(f"Testing API: {i}")
            test_kg_api(
                url=url_kg_list[i],
                method=method_kg_list[i],
                payload=payload_kg_list[i],
                log=log,
            )
    else:
        test_kg_api(
            url=url_kg_list[api],
            method=method_kg_list[api],
            payload=payload_kg_list[api],
            log=log,
        )


# Sweep every configured Knowledge Graph endpoint; response bodies are not logged.
test_kg(api=-1, log=False)

  if api is -1:
2024-10-22 12:18:33,601:INFO -- Testing API: 0
2024-10-22 12:18:33,663:INFO -- Status Code: 200
2024-10-22 12:18:33,664:INFO -- Testing API: 1
2024-10-22 12:18:33,688:INFO -- Status Code: 200
2024-10-22 12:18:33,689:INFO -- Testing API: 2
2024-10-22 12:18:33,713:INFO -- Status Code: 200
2024-10-22 12:18:33,714:INFO -- Testing API: 3
2024-10-22 12:18:33,737:INFO -- Status Code: 200
2024-10-22 12:18:33,738:INFO -- Testing API: 4
2024-10-22 12:18:33,759:INFO -- Status Code: 200
2024-10-22 12:18:33,761:INFO -- Testing API: 5
2024-10-22 12:18:33,784:INFO -- Status Code: 200
2024-10-22 12:18:33,784:INFO -- Testing API: 6
2024-10-22 12:18:33,809:INFO -- Status Code: 200
2024-10-22 12:18:33,810:INFO -- Testing API: 7
2024-10-22 12:18:33,836:INFO -- Status Code: 200
2024-10-22 12:18:33,837:INFO -- Testing API: 8
2024-10-22 12:18:33,921:INFO -- Status Code: 200
2024-10-22 12:18:33,922:INFO -- Testing API: 9
2024-10-22 12:18:33,947:INFO -- Status Code: 200
2024-10-22 12:18:33,948:

# Test the whole RXOMS services with anomaly data

In [100]:
# Test Data Enrichment APIs
import pandas as pd
import time

# Replay every recorded flow from all_flow.csv against the anomaly-report
# endpoint, in ascending "runtime" order.
data_path = str(RXOMS_PATH) + DEFAULT_DATA_PATH + "all_flow.csv"

# NOTE(review): 0.0.0.0 is a bind address; using it as a client destination is
# platform-dependent. Consider 127.0.0.1 if this fails on another machine.
url = "http://0.0.0.0:5051/anomaly_report"

# Seconds to wait for the service before a request is abandoned; without a
# timeout a stalled service would block this cell indefinitely.
REQUEST_TIMEOUT = 10
# Pause (seconds) between consecutive reports.
SEND_INTERVAL = 0.1

# Read the data using pandas
df = pd.read_csv(data_path)

# Sort the data by the 'runtime' column
df_sorted = df.sort_values(by="runtime")

# Iterate over each row, convert it to a dictionary, and send it as a payload
for _, row in df_sorted.iterrows():
    pay_load = row.to_dict()
    logging.info(f"Sending payload: {pay_load}")
    response = requests.post(url, json=pay_load, timeout=REQUEST_TIMEOUT)
    logging.info(f"Response status code: {response.status_code}")
    logging.info(f"Response content: {response.content}")
    time.sleep(SEND_INTERVAL)

2024-10-22 12:39:43,079:INFO -- Sending payload: {'switch': 1, 'in_port': 1, 'eth_dst': '00:00:00:00:00:04', 'packet_count': 368, 'byte_count': 553650, 'user_id': 'aaltosea1', 'instance_id': 'rohe_test_isntance', 'run_id': 'experiment1', 'stage_id': 'rohe_test_stage', 'application_name': 'sdn', 'runtime': 10, 'packet_count_shifted': 328, 'byte_count_shifted': 493170, 'packet_count_average': 6958.2, 'byte_count_average': 10548648.4, 'byte_count_average_norm': 0.9793685741094618, 'packet_count_average_norm': 0.975857500368026, 'byte_scores': 0.5, 'packet_scores': 0.499999445188199, 'byte_anomaly': 1, 'packet_anomaly': 1}
2024-10-22 12:39:43,178:INFO -- Response status code: 200
2024-10-22 12:39:43,178:INFO -- Response content: b'{"status":"success"}'
2024-10-22 12:39:43,281:INFO -- Sending payload: {'switch': 2, 'in_port': 4, 'eth_dst': '00:00:00:00:00:05', 'packet_count': 9083, 'byte_count': 13729086, 'user_id': 'aaltosea1', 'instance_id': 'rohe_test_isntance', 'run_id': 'experiment1', 

KeyboardInterrupt: 

# Test with specific data records

In [None]:
# Test Data Enrichment APIs
import pandas as pd
import time

# Location of the recorded flows; note that the hand-written payload loop in
# this cell does not read data_path.
data_path = "".join((str(RXOMS_PATH), DEFAULT_DATA_PATH, "all_flow.csv"))

# Endpoint that receives the anomaly reports.
url = "http://0.0.0.0:5051/anomaly_report"

# Fields that are identical in every hand-written record, kept in the order in
# which they follow "byte_count" in each payload.
_COMMON_FLOW_FIELDS = {
    "user_id": "aaltosea1",
    "instance_id": "rohe_test_isntance",
    "run_id": "experiment1",
    "stage_id": "rohe_test_stage",
    "application_name": "sdn",
    "runtime": 10,
    "packet_count_shifted": 328,
    "byte_count_shifted": 493170,
    "packet_count_average": 6958.2,
    "byte_count_average": 10548648.4,
    "byte_count_average_norm": 0.9793685741094618,
    "packet_count_average_norm": 0.975857500368026,
    "byte_scores": 0.7,
    "packet_scores": 0.799999445188199,
    "byte_anomaly": -1,
    "packet_anomaly": -1,
}


def _flow_record(switch, in_port, eth_dst, byte_count):
    """Build one anomaly-report payload.

    Only the four arguments differ between the records below; every other
    field (including "packet_count") has the same value in all of them.
    """
    return {
        "switch": switch,
        "in_port": in_port,
        "eth_dst": eth_dst,
        "packet_count": 368,
        "byte_count": byte_count,
        **_COMMON_FLOW_FIELDS,
    }


# One tuple per record: (switch, in_port, eth_dst, byte_count).
pay_loads = [
    _flow_record(switch, in_port, eth_dst, byte_count)
    for switch, in_port, eth_dst, byte_count in (
        (1, 5, "00:00:00:00:00:04", 5739366),
        (2, 5, "00:00:00:00:00:04", 6846438),
        (2, 2, "00:00:00:00:00:02", 2),
        (2, 1, "00:00:00:00:00:02", 2),
        (1, 4, "00:00:00:00:00:02", 2),
    )
]
# Post each hand-written record to the anomaly-report endpoint and log the
# reply. The timeout (seconds) keeps the cell from blocking indefinitely when
# the service is unreachable or stalls.
for pay_load in pay_loads:
    logging.info(f"Sending payload: {pay_load}")
    response = requests.post(url, json=pay_load, timeout=10)
    logging.info(f"Response status code: {response.status_code}")
    logging.info(f"Response content: {response.content}")