# Compute Disruption Point

In [87]:
import pandas as pd
import numpy as np
import os
from datetime import datetime
from scipy.stats import mannwhitneyu, shapiro

In [88]:
# Thresholds
# Compared against the "lm-95%" response-time series of a faulty run.
RESPONSE_TIME_THRESHOLD = 50_000
# Compared against the "lm-Failures/s" series of a faulty run.
FAILURE_RATE_THRESHOLD = 0.05
# Paths
# All dataset locations hang off a single root so the notebook can be run on
# another machine by setting TRAIN_TICKET_DATASET_DIR; the default keeps the
# original locations.
DATASET_DIR = os.environ.get(
    "TRAIN_TICKET_DATASET_DIR", "/Users/ketaiqiu/Projects/train-ticket/dataset"
)
NORMAL_PATH = os.path.join(DATASET_DIR, "normal-2weeks", "normal-2weeks.csv")
FAILURE_DATASET_PATH = os.path.join(DATASET_DIR, "failure-experiments")
FAILURE_INJECTION_LOG_PATH = os.path.join(
    FAILURE_DATASET_PATH, "failure-injection-logs.csv"
)

In [89]:
# Load the baseline ("normal") run and the failure-injection log, in that order.
df_normal, df_failure_injection_log = (
    pd.read_csv(csv_path) for csv_path in (NORMAL_PATH, FAILURE_INJECTION_LOG_PATH)
)

In [90]:
def a12_unpaired(lst1, lst2):
    """Return the unpaired A12 effect size of ``lst1`` versus ``lst2``.

    The statistic is the fraction of all ``(x, y)`` pairs, with ``x`` drawn
    from ``lst1`` and ``y`` from ``lst2``, in which ``x > y``; ties count as
    half a win. A value of 0.5 means neither sample tends to be larger, 1.0
    means every element of ``lst1`` exceeds every element of ``lst2`` and 0.0
    means the opposite.

    This is a plain module-level function: it must not be wrapped in
    ``@staticmethod``, because outside a class body that produces an object
    that is not callable on Python versions before 3.10.

    Args:
        lst1: Sized iterable of mutually comparable values.
        lst2: Sized iterable of mutually comparable values.

    Returns:
        The A12 statistic as a float in ``[0, 1]``.

    Raises:
        ZeroDivisionError: If either input is empty.
    """
    more = same = 0.0
    for x in lst1:
        for y in lst2:
            if x == y:
                same += 1
            elif x > y:
                more += 1
    return (more + 0.5 * same) / (len(lst1) * len(lst2))

In [91]:
def find_disruption_index_by_test(
    start_failure_index,
    normal_response_time,
    faulty_response_time,
    normal_http_failure_rate,
    faulty_http_failure_rate,
):
    """Find the index at which a faulty run starts to deviate from the baseline.

    Three strategies are tried in order; the first one that succeeds wins:

    1. Response time: slide a 10-sample window over ``faulty_response_time``
       starting at ``start_failure_index``. A window qualifies when its first
       sample exceeds ``RESPONSE_TIME_THRESHOLD``, its mean exceeds a third of
       that threshold, and it differs from ``normal_response_time`` both
       significantly (Mann-Whitney U, p < 0.05) and strongly
       (``2 * |A12 - 0.5| >= 0.474``). The returned index is ``i + 4`` for a
       window starting at ``i``.
    2. HTTP failure rate: the same procedure with a 5-sample window over
       ``faulty_http_failure_rate`` and ``FAILURE_RATE_THRESHOLD`` (applied to
       both the first sample and the window mean). The returned index is
       ``i + 2``.
    3. Fallback: the later of the two positions, at or after
       ``start_failure_index``, where each faulty series reaches its maximum.

    Args:
        start_failure_index: Position in the faulty series from which to
            start searching.
        normal_response_time: Baseline response-time samples.
        faulty_response_time: Response-time samples of the faulty run (list).
        normal_http_failure_rate: Baseline failure-rate samples.
        faulty_http_failure_rate: Failure-rate samples of the faulty run (list).

    Returns:
        An integer index into the faulty series.

    Raises:
        ValueError: In the fallback, if either faulty series has no samples
            at or after ``start_failure_index``.
    """
    p_value_cutoff = 0.05
    # 2 * |A12 - 0.5| is the magnitude of Cliff's delta.
    # NOTE(review): 0.474 is the value commonly cited as the "large" effect
    # threshold for Cliff's delta - confirm that is the intended source.
    effect_size_cutoff = 0.474

    window_size = 10
    for i in range(start_failure_index, len(faulty_response_time) - window_size + 1):
        window = faulty_response_time[i : i + window_size]
        # Cheap threshold checks first: the statistical tests below are far
        # more expensive (a12_unpaired is O(len(window) * len(baseline))) and
        # cannot change the outcome once a threshold check has failed.
        if not (
            faulty_response_time[i] > RESPONSE_TIME_THRESHOLD
            and np.average(window) > RESPONSE_TIME_THRESHOLD / 3
        ):
            continue
        _, p = mannwhitneyu(window, normal_response_time, nan_policy="omit")
        a_unpaired = a12_unpaired(window, normal_response_time)
        if p < p_value_cutoff and 2 * np.abs(a_unpaired - 0.5) >= effect_size_cutoff:
            print("Find disruption index from response time.")
            return i + window_size // 2 - 1

    window_size = 5
    for i in range(
        start_failure_index, len(faulty_http_failure_rate) - window_size + 1
    ):
        window = faulty_http_failure_rate[i : i + window_size]
        if not (
            faulty_http_failure_rate[i] > FAILURE_RATE_THRESHOLD
            and np.average(window) > FAILURE_RATE_THRESHOLD
        ):
            continue
        _, p = mannwhitneyu(window, normal_http_failure_rate, nan_policy="omit")
        a_unpaired = a12_unpaired(window, normal_http_failure_rate)
        if p < p_value_cutoff and 2 * np.abs(a_unpaired - 0.5) >= effect_size_cutoff:
            print("Find disruption index from HTTP failure rate.")
            return int(i + window_size / 2 + 0.5 - 1)

    print("Find disruption index from the later point with the highest metric.")
    # Search for the peak from start_failure_index onwards. Without the start
    # argument, list.index() scans from position 0 and can return an index
    # before the failure was injected when the peak value also occurs earlier.
    max_response_index = faulty_response_time.index(
        max(faulty_response_time[start_failure_index:]), start_failure_index
    )
    max_http_failure_rate_index = faulty_http_failure_rate.index(
        max(faulty_http_failure_rate[start_failure_index:]), start_failure_index
    )
    return max(max_response_index, max_http_failure_rate_index)

In [92]:
def gen_disruptive_indices(df_failure_injection_log: pd.DataFrame) -> pd.DataFrame:
    """Compute the disruption index of every experiment in the injection log.

    For each row of ``df_failure_injection_log`` the experiment CSV at
    ``<FAILURE_DATASET_PATH>/<folder_name>/<folder_name>.csv`` is loaded,
    indexed by epoch-second timestamp, sorted, and truncated at the row's
    ``experiment_end_timestamp``. The "lm-95%" and "lm-Failures/s" series are
    then passed to ``find_disruption_index_by_test`` together with the
    baseline series taken from the module-level ``df_normal``.

    Rows in which either metric is missing are dropped *before* the position
    closest to ``failure_begin_timestamp`` is looked up. The start position
    and both metric series therefore refer to the same rows, and the returned
    disruption index is a position in that cleaned series.

    Args:
        df_failure_injection_log: Log with the columns "folder_name",
            "failure_begin_timestamp" and "experiment_end_timestamp" (the two
            timestamps as ISO-format strings).

    Returns:
        A copy of the log with an added "Disruption" column. The value is
        computed per row, so rows sharing a ``folder_name`` do not overwrite
        each other's result.

    Raises:
        ValueError: If an experiment has no row with both metrics present
            before its end timestamp.
    """
    response_col, failure_col = "lm-95%", "lm-Failures/s"

    # The baseline is identical for every experiment, so build it only once.
    normal_response_time = df_normal[response_col].dropna().to_list()
    normal_failure_rate = df_normal[failure_col].dropna().to_list()

    disruptions = []
    for exp_name, failure_begin, experiment_end in zip(
        df_failure_injection_log["folder_name"],
        df_failure_injection_log["failure_begin_timestamp"],
        df_failure_injection_log["experiment_end_timestamp"],
    ):
        # NOTE(review): datetime.timestamp() interprets tz-naive values as
        # local time, while pandas' Timestamp.timestamp() (used for the
        # experiment data below) appears to treat tz-naive values as UTC.
        # If the stored timestamps carry no UTC offset, confirm that both
        # sides end up on the same clock.
        failure_injection_time = datetime.fromisoformat(failure_begin).timestamp()
        experiment_end_time = datetime.fromisoformat(experiment_end).timestamp()

        df = pd.read_csv(
            os.path.join(FAILURE_DATASET_PATH, exp_name, exp_name + ".csv")
        )
        df["timestamp"] = pd.to_datetime(df["timestamp"]).apply(
            lambda dt: dt.timestamp()
        )
        metrics = (
            df.set_index("timestamp")
            .sort_index()
            .loc[:experiment_end_time]
            # Drop incomplete rows here, once, so that positions in `metrics`
            # line up with positions in the two lists built from it.
            .dropna(subset=[response_col, failure_col])
        )
        if metrics.empty:
            raise ValueError(
                f"Experiment {exp_name!r} has no rows with both "
                f"{response_col!r} and {failure_col!r} before its end timestamp."
            )

        # Position of the sample closest in time to the failure injection.
        start_failure_index = int(
            np.abs(metrics.index.to_numpy() - failure_injection_time).argmin()
        )
        disruptions.append(
            find_disruption_index_by_test(
                start_failure_index,
                normal_response_time,
                metrics[response_col].to_list(),
                normal_failure_rate,
                metrics[failure_col].to_list(),
            )
        )

    df_failure_disruption = df_failure_injection_log.copy()
    # `disruptions` is in log-row order, so positional assignment is correct.
    df_failure_disruption["Disruption"] = disruptions
    return df_failure_disruption

In [93]:
# Compute the disruption index for every experiment listed in the injection log
df_failure_disruption = df_failure_injection_log.pipe(gen_disruptive_indices)

Find disruption index from response time.
Find disruption index from HTTP failure rate.
Find disruption index from the later point with the highest metric.
Find disruption index from the later point with the highest metric.
Find disruption index from the later point with the highest metric.
Find disruption index from response time.
Find disruption index from response time.
Find disruption index from response time.
Find disruption index from HTTP failure rate.
Find disruption index from the later point with the highest metric.
Find disruption index from the later point with the highest metric.
Find disruption index from the later point with the highest metric.
Find disruption index from HTTP failure rate.
Find disruption index from HTTP failure rate.
Find disruption index from HTTP failure rate.
Find disruption index from HTTP failure rate.
Find disruption index from response time.
Find disruption index from response time.
Find disruption index from HTTP failure rate.
Find disruption in

In [94]:
# Show only the experiment name next to its computed disruption index
df_failure_disruption.loc[:, ["folder_name", "Disruption"]]

Unnamed: 0,folder_name,Disruption
0,linear-cpu-stress-ts-auth-service-012416,162
1,linear-cpu-station-memory-train-020810,109
2,linear-cpu-stress-ts-basic-service-020616,169
3,linear-cpu-stress-ts-station-service-020211,114
4,linear-cpu-stress-ts-train-service-020713,105
5,linear-cpu-train-delay-station-021212,75
6,linear-cpu-station-delay-train-021310,84
7,linear-cpu-station-delay-train-021310,84
8,linear-cpu-train-memory-station-020912,126
9,linear-cpu-stress-ts-station-service-022114,129


In [95]:
# Persist the injection log, extended with the "Disruption" column, next to
# the experiment folders (row index is not written).
disruption_csv_path = os.path.join(
    FAILURE_DATASET_PATH, "failure-injection-logs-with-disruption.csv"
)
df_failure_disruption.to_csv(disruption_csv_path, index=False)