In [34]:
# Notebook-wide setup: import pandas and silence warnings so cell outputs stay readable.
import pandas as pd
import warnings

# Ignore every FutureWarning raised from here on.
warnings.simplefilter(action='ignore', category=FutureWarning)


# Turn off pandas' chained-assignment check (no SettingWithCopyWarning is emitted).
# NOTE(review): this also hides genuine chained-assignment mistakes — confirm it is still needed.
pd.options.mode.chained_assignment = None

In [35]:
# Module-level accumulator: get_data_from_file() appends one parsed record per input line.
DATA = []
# Upper bounds of the latency buckets, compared against `duration_time` in bucketing();
# the catch-all "+Inf" label is added separately by the code that uses these bounds.
BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10]
# NOTE(review): not referenced by any cell visible here (the final aggregation
# uses the literal "30s") — confirm whether it is still needed.
TIME_VECTOR = 5  # seconds

In [36]:
import json


def get_data_from_file(filename: str):
    """Load a JSON Lines file into the module-level ``DATA`` list.

    Each non-blank line of ``filename`` is parsed with ``json.loads`` and the
    resulting object is appended to ``DATA`` in file order.

    Args:
        filename: Path of the file to read; one JSON document per line.

    Returns:
        None. Records are accumulated in ``DATA`` as a side effect, so calling
        this function twice appends the file's records twice.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # The context manager closes the handle even when a line fails to parse;
    # the explicit encoding avoids depending on the platform default.
    with open(filename, "r", encoding="utf-8") as f:
        for line in f:
            # Blank or whitespace-only lines (e.g. a trailing newline at the
            # end of the file) carry no record and would make json.loads fail.
            if not line.strip():
                continue
            # Standardize json to dict and append it to the accumulator.
            DATA.append(json.loads(line))

In [37]:
# Load the input file into DATA (the path is relative to the working directory).
# Re-running this cell appends the same records to DATA again.
get_data_from_file("../statics/request_data_with_anomaly_pod_instance.json")

In [38]:
# Columns whose combined values identify one group; used as the group-by keys later on.
group_by_el = ["path", "method", "service_name", "controller_pod"]

In [39]:
# Build the base DataFrame from the accumulated records (one row per entry in DATA).
root_df = pd.DataFrame(DATA)

In [40]:
# Preview the first rows to sanity-check the loaded columns.
root_df.head()

Unnamed: 0,timestamp,path,method,service_name,controller_pod,duration_time
0,"04/02/2024, 22:06:03",/users,GET,process_log,pod-2,0.328
1,"04/02/2024, 22:06:04",/users,GET,process_log,pod-2,0.77
2,"04/02/2024, 22:06:04",/users,GET,process_log,pod-1,0.349
3,"04/02/2024, 22:06:04",/users,GET,process_log,pod-2,0.722
4,"04/02/2024, 22:06:05",/users,GET,process_log,pod-1,0.64


In [41]:
# Map a request latency onto the bucket bounds it fits under.
def bucketing(duration_time):
    """Return every bucket bound that ``duration_time`` does not exceed.

    The result lists, in ``BUCKETS`` order, each bound ``b`` for which
    ``duration_time <= b`` holds, followed by the catch-all label ``"+Inf"``,
    which is always present.
    """
    matched = [upper for upper in BUCKETS if duration_time <= upper]
    return matched + ["+Inf"]

# Attach to every request the list of bucket labels its duration falls under.
root_df["le"] = root_df["duration_time"].apply(bucketing)

In [42]:
# Spot-check: bucket labels assigned to '/users' requests handled by 'pod-1'.
root_df.query("path == '/users' and controller_pod == 'pod-1'")['le']

2            [0.5, 1, 2, 5, 10, +Inf]
4                 [1, 2, 5, 10, +Inf]
9                 [1, 2, 5, 10, +Inf]
12           [0.5, 1, 2, 5, 10, +Inf]
14                [1, 2, 5, 10, +Inf]
                    ...              
723    [0.25, 0.5, 1, 2, 5, 10, +Inf]
724          [0.5, 1, 2, 5, 10, +Inf]
725               [1, 2, 5, 10, +Inf]
727          [0.5, 1, 2, 5, 10, +Inf]
729          [0.5, 1, 2, 5, 10, +Inf]
Name: le, Length: 178, dtype: object

In [43]:
# One row per distinct combination of the group_by_el columns; the counts
# themselves are discarded, only the key columns are kept.
group_dataset = (
    root_df
    .groupby(group_by_el)
    .count()
    .reset_index()[group_by_el]
)

In [44]:
# Display the distinct groups found in the data.
group_dataset

Unnamed: 0,path,method,service_name,controller_pod
0,/home,GET,process_log,pod-1
1,/home,GET,process_log,pod-2
2,/users,GET,process_log,pod-1
3,/users,GET,process_log,pod-2


In [45]:
from typing import Dict


def bins_value(row, bins_dict: Dict):
    """Update the running bucket counters with one row and attach them to it.

    Every label in ``row['le']`` that is a key of ``bins_dict`` has its counter
    incremented in place; labels that are not keys are ignored. Because
    ``bins_dict`` is mutated, passing the same dict for successive rows yields
    cumulative counts.

    Args:
        row: Series holding an ``'le'`` entry with the row's bucket labels.
        bins_dict: Mutable mapping of bucket label -> running count.

    Returns:
        A new Series: the entries of ``row`` followed by one entry per key of
        ``bins_dict`` holding its count after this row was processed.
    """
    for label in row['le']:
        if label not in bins_dict:
            continue
        bins_dict[label] += 1
    # Snapshot the counters as they stand after this row.
    counters = pd.Series(bins_dict, index=list(bins_dict))
    # Original fields first, bucket counters appended after them.
    return pd.concat([row, counters])

In [46]:
# For every group, walk its rows in order and attach the running bucket counters.
df_by_group = {}
for idx, dataset in group_dataset.iterrows():
    # Fresh counters per group; bins_value mutates this dict row by row.
    bins_dict = dict.fromkeys(BUCKETS, 0)
    bins_dict['+Inf'] = 0  # type: ignore
    # Select the rows of this group (the query string refers to the loop
    # variable `dataset` by name) and store the processed frame under the
    # group's index.
    df_by_group[idx] = (
        root_df
        .query(
            "path == @dataset['path'] \
                and method == @dataset['method'] \
                and service_name == @dataset['service_name'] \
                and controller_pod == @dataset['controller_pod']")
        .apply(lambda row: bins_value(row, bins_dict), axis=1)
    )

# Stack the per-group frames into a single dataframe.
df_by_group_incl_latency = pd.concat(df_by_group.values())

In [47]:
from datetime import datetime

# Parse the day-first timestamp strings ("%d/%m/%Y, %H:%M:%S") in one vectorized pass.
# The explicit format matters: a format-less pd.to_datetime reads "04/02/2024"
# month-first (2 April instead of 4 February), so the parsed column must not be
# overwritten by a second, format-less conversion of the raw strings.
df_by_group_incl_latency['timestamp'] = pd.to_datetime(
    df_by_group_incl_latency['timestamp'], format="%d/%m/%Y, %H:%M:%S")
# Use the parsed timestamps as the index of the dataframe.
df_by_group_incl_latency = df_by_group_incl_latency.set_index('timestamp')

In [48]:
# Relabel the numeric bucket columns with their string form (e.g. 0.5 -> "0.5").
df_by_group_incl_latency = df_by_group_incl_latency.rename(
    columns=dict(zip(BUCKETS, map(str, BUCKETS))))

In [49]:
# Split each group into 30-second windows, keep the last value of every bucket
# column per window, and write the result to CSV with the timestamp as index.
# NOTE(review): the window is the literal "30s"; TIME_VECTOR is not used here — confirm intent.
(
    df_by_group_incl_latency
    .groupby([pd.Grouper(freq="30s")] + group_by_el)
    .agg(dict.fromkeys([*map(str, BUCKETS), "+Inf"], "last"))
    .reset_index()
    .set_index('timestamp')
    .to_csv("request_data_with_anomaly_pod_instance_2.csv")
)