This is a helper script that allows you to fetch Prometheus data after the fact
(e.g., when the data you collected while running the experiments was incorrect).

In [1]:
# Forces the notebook to always reload packages.
%reload_ext autoreload
%autoreload 2

import itertools
import json
import pandas as pd
from datetime import datetime, timedelta
import os
import math

from gssi_experiment.util.prometheus_helper import (
    LatestCpuUtilizationFetcher,
    TIME_FORMAT,
)
from gssi_experiment.util.util import iterate_through_nested_folders


In [2]:
# Configures the Prometheus fetcher; the actual fetch call is disabled below.

# The CSV file that the later cells read the CPU utilization data from.
output_path = "./posterior_prometheus_fetcher_data.csv"

# Four days, expressed in minutes.
time_window_in_minutes = math.floor(4 * 24 * 60)
print(f"{time_window_in_minutes=}")

# The start time lies two full time windows before the current moment.
start_time = datetime.now() - timedelta(minutes=2 * time_window_in_minutes)
fetcher = LatestCpuUtilizationFetcher(
    output_path,
    time_window_in_minutes,
    start_time,
)
# Uncomment to query Prometheus again
# (presumably this writes its result to output_path -- TODO confirm).
# fetcher.fetch_latest_cpu_utilization()

time_window_in_minutes=5760
Prometheus response status: 200
Found containers=['alertmanager', 'config-reloader', 'config-reloader', 'coredns', 'discovery', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'gateway-aggregator', 'grafana', 'grafana-sc-dashboard', 'grafana-sc-datas

In [3]:
# Loads the dataset chunk by chunk so that progress is visible while reading.
# The chunks are collected first and concatenated exactly once; concatenating
# inside the loop would re-copy the already loaded rows on every iteration.

chunks = []
for i, chunk in enumerate(pd.read_csv(output_path, header=0, chunksize=30000)):
    print(f'loaded chunk {i + 1}')
    chunks.append(chunk)

# Stays None when the reader yielded no chunks at all.
df: pd.DataFrame = pd.concat(chunks) if chunks else None

# Releases the extra references to the individual chunks.
del chunks

loaded chunk 1
loaded chunk 2
loaded chunk 3
loaded chunk 4
loaded chunk 5
loaded chunk 6
loaded chunk 7
loaded chunk 8


In [None]:
# Renumbers duplicated column names: the first occurrence of a base name keeps
# the plain name, every later occurrence gets a ".<n>" suffix (n = 1, 2, ...).
columns = df.columns
unique_columns = {col.split(".")[0] for col in columns}
print(unique_columns)

# Number of times each base name has been emitted so far.
counter = dict.fromkeys(unique_columns, 0)

new_cols = []
for col in columns:
    name = col.split(".")[0]
    seen = counter[name]
    new_cols.append(name if seen == 0 else f'{name}.{seen}')
    counter[name] = seen + 1

print(f'{new_cols=}')
df.columns = new_cols

{'metrics-server', 's2b', 'kube-prometheus-stack', 's2', 's1a', 'gw-nginx', 'traefik', 'node-exporter', 'istio-init', 'gateway-aggregator', 'config-reloader', 'alertmanager', 'timestamp', 'grafana-sc-datasources', 'gw', 'prometheus', 'istio-proxy', 'lb-tcp-443', 's1', 'local-path-provisioner', 's2a', 'kiali', 's4', 'discovery', 'grafana', 'jaeger', 's1b', 'coredns', 'kube-state-metrics', 'lb-tcp-80', 's3', 'grafana-sc-dashboard'}
new_cols=['timestamp', 'alertmanager', 'config-reloader', 'config-reloader.1', 'coredns', 'discovery', 'gateway-aggregator', 'gateway-aggregator.1', 'gateway-aggregator.2', 'gateway-aggregator.3', 'gateway-aggregator.4', 'gateway-aggregator.5', 'gateway-aggregator.6', 'gateway-aggregator.7', 'gateway-aggregator.8', 'gateway-aggregator.9', 'gateway-aggregator.10', 'gateway-aggregator.11', 'gateway-aggregator.12', 'gateway-aggregator.13', 'gateway-aggregator.14', 'gateway-aggregator.15', 'gateway-aggregator.16', 'gateway-aggregator.17', 'grafana', 'grafana-sc-da

In [None]:
def reshape_timestamp(series: pd.Series) -> pd.Series:
    """Parses the raw timestamp values of ``series`` into ``datetime`` objects.

    Each value is parsed with the format ``"%Y-%m-%d %H:%M:%S.%f"``. Values
    that cannot be parsed (strings in another format, or non-string values
    such as ``None``/NaN) are replaced by an empty string, which acts as the
    marker for an invalid timestamp.

    Args:
        series: The series holding the raw timestamp values.

    Returns:
        A new series of the same length that carries the index of ``series``,
        so the result stays aligned when it is assigned back to a data frame.
    """
    data = []
    for ele in series.values:
        try:
            q = datetime.strptime(ele, "%Y-%m-%d %H:%M:%S.%f")
        except (ValueError, TypeError):
            # ValueError: the string does not match the expected format.
            # TypeError: the value is not a string at all (e.g., NaN).
            q = ""
        data.append(q)
    # Reuses the input index; a fresh 0..n-1 index would misalign the result
    # whenever the input is not indexed by a default range.
    return pd.Series(data, index=series.index)


# Adds a column with the parsed timestamps; values that could not be parsed
# show up as empty strings.
df["parsed_timestamp"] = df["timestamp"].transform(reshape_timestamp)

# Selects the rows that carry the empty-string marker and reports their count.
unparsable = df["parsed_timestamp"] == ""
invalid_dates = df.loc[unparsable]
broken_entries_count = invalid_dates.shape[0]
print(f"{broken_entries_count=}")

# Removes those rows from the data set.
df = df.drop(index=invalid_dates.index)
print(f'{len(df)=}')

broken_entries_count=238
len(df)=229849


In [None]:
def reset_column_counters(df: pd.DataFrame):
    """Renumbers the ``.<n>`` suffixes of the column names in place.

    Everything after the first ``.`` of a column name is discarded. The first
    column with a given base name keeps the bare name; each following column
    with that base name is suffixed with ``.1``, ``.2``, ... in column order.
    """
    occurrences = {}
    renamed = []
    for raw_name in df.columns:
        stem = raw_name.split(".")[0]
        seen = occurrences.get(stem, 0)
        # Only repeated names receive a numeric suffix.
        renamed.append(stem if seen == 0 else f"{stem}.{seen}")
        occurrences[stem] = seen + 1
    df.columns = renamed


def sample_relevant_data(experiment_folder: str):
    """Writes the CPU utilization data that belongs to one experiment.

    Reads ``start_time`` and ``end_time`` from
    ``<experiment_folder>/metadata.json``, selects the rows of the notebook's
    global ``df`` whose ``parsed_timestamp`` lies within that window (both
    bounds inclusive), drops the columns that are empty for the whole window,
    renumbers the column suffixes and writes the result to
    ``<experiment_folder>/cpu_utilization_raw.csv`` (overwriting it).

    Args:
        experiment_folder: Path of the experiment folder; it must contain a
            ``metadata.json`` file.
    """
    # Identifies the start and end time of the experiment.
    metadata_path = f"{experiment_folder}/metadata.json"
    with open(metadata_path, "r", encoding="utf-8") as metadata_file:
        metadata = json.load(metadata_file)
    start_time = datetime.strptime(metadata["start_time"], TIME_FORMAT)
    end_time = datetime.strptime(metadata["end_time"], TIME_FORMAT)

    # Samples all data that lies within the experiment's time window.
    timestamps = df["parsed_timestamp"]
    in_window = (timestamps >= start_time) & (timestamps <= end_time)
    sub_df: pd.DataFrame = df[in_window]

    # Drops the columns without any value in this window. A new frame is
    # created instead of modifying the filtered selection in place.
    sub_df = sub_df.dropna(how="all", axis=1)
    # The "parsed_timestamp" column is kept in the output; dropping it is
    # currently disabled:
    # if "parsed_timestamp" in df.columns:
    #     sub_df = sub_df.drop(["parsed_timestamp"], axis=1)

    # Resets column names and row indices. The result of reset_index must be
    # assigned (it is not in-place); drop=True prevents the old index from
    # being added as an extra column.
    reset_column_counters(sub_df)
    sub_df = sub_df.reset_index(drop=True)

    # Writes it to the output.
    cpu_utilization_path = f"{experiment_folder}/cpu_utilization_raw.csv"
    sub_df.to_csv(cpu_utilization_path, index=False)


# Name of the results sub-folder that is processed for every experiment type.
exp_name = "pinciroli_replication_2"

# Result roots of the four experiment types.
ga_folder = os.path.abspath(f"../gateway_aggregator/results/{exp_name}")
go_folder = os.path.abspath(f"../gateway_offloading/results/{exp_name}")
pnfj_folder = os.path.abspath(
    f"../pipes_and_filters/pipes_and_filters_joint/results/{exp_name}"
)
pnfs_folder = os.path.abspath(
    f"../pipes_and_filters/pipes_and_filters_separated/results/{exp_name}"
)

# NOTE(review): the gateway aggregator root is traversed with max_depth=2,
# the other roots with max_depth=3 -- confirm that this is intentional.
experiment_folders = itertools.chain(
    iterate_through_nested_folders(ga_folder, max_depth=2),
    *(
        iterate_through_nested_folders(root, max_depth=3)
        for root in (go_folder, pnfj_folder, pnfs_folder)
    ),
)

# Rewrites the CPU utilization file of every experiment folder that is found.
count = 0
for experiment_folder in experiment_folders:
    print(experiment_folder)
    sample_relevant_data(experiment_folder)
    count += 1
print(f"Replaced {count} cpu utilization files.")

/workspaces/muBench-experiment/gssi_experiment/gateway_aggregator/results/pinciroli_replication_2/experiment_1/2023_11_26/0_steps
/workspaces/muBench-experiment/gssi_experiment/gateway_aggregator/results/pinciroli_replication_2/experiment_1/2023_11_26/1_steps
/workspaces/muBench-experiment/gssi_experiment/gateway_aggregator/results/pinciroli_replication_2/experiment_1/2023_11_26/2_steps
/workspaces/muBench-experiment/gssi_experiment/gateway_aggregator/results/pinciroli_replication_2/experiment_1/2023_11_26/3_steps
/workspaces/muBench-experiment/gssi_experiment/gateway_aggregator/results/pinciroli_replication_2/experiment_1/2023_11_26/4_steps
/workspaces/muBench-experiment/gssi_experiment/gateway_aggregator/results/pinciroli_replication_2/experiment_1/2023_11_26/5_steps
/workspaces/muBench-experiment/gssi_experiment/gateway_aggregator/results/pinciroli_replication_2/experiment_1/2023_11_28/0_steps
/workspaces/muBench-experiment/gssi_experiment/gateway_aggregator/results/pinciroli_replic