In [1]:
import pandas as pd
import numpy as np
import os
import yaml
import pickle
from sklearn.preprocessing import MinMaxScaler

In [12]:
RAW_DIR = "/home/dds/yjq/data/data-TrainTicket"
TARGET_DIR = '/home/dds/yjq/DejaVu/data/TrainTicket'
DATES = ["2024-11-01", "2024-11-02", "2024-11-03"]

In [100]:
def get_pod_service_name(target_date):
    pod_names = []
    service_names = []
    for file in os.listdir(os.path.join(RAW_DIR, target_date, "metric")):
        pod = file.split('.')[0]
        service = pod.split("service")[0]+"service"
        pod_names.append(pod)
        service_names.append(service)
    return pod_names, service_names

In [101]:
pod_names, service_names = get_pod_service_name(DATES[0])
print(len(pod_names))
print(len(service_names))

40
40


In [102]:
yaml_data = []
# 构造 YAML 数据结构
data1 = {
    "class": "global_params",
    "pod": pod_names,
    'service': service_names
}

## 处理节点

In [103]:
metrics = {
    "cpu": ["cpu"],
    "memory": ["memory"],
    "io": ["io"],
    "syscall" :["syscall_read","syscall_write"],
    "network": ["net_receive","net_send","net_latency"]
}

In [104]:
nodes_data = []
for metric, values in metrics.items():
    node_data = {
        "class": "node",
        "global_params": ["pod"],
        "id": r"{pod} "+metric,
        "metrics":[r'{pod}##'+name for name in values],
        "type": f"Pod {metric}"        
    }
    nodes_data.append(node_data)
nodes_data

[{'class': 'node',
  'global_params': ['pod'],
  'id': '{pod} cpu',
  'metrics': ['{pod}##cpu'],
  'type': 'Pod cpu'},
 {'class': 'node',
  'global_params': ['pod'],
  'id': '{pod} memory',
  'metrics': ['{pod}##memory'],
  'type': 'Pod memory'},
 {'class': 'node',
  'global_params': ['pod'],
  'id': '{pod} io',
  'metrics': ['{pod}##io'],
  'type': 'Pod io'},
 {'class': 'node',
  'global_params': ['pod'],
  'id': '{pod} syscall',
  'metrics': ['{pod}##syscall_read', '{pod}##syscall_write'],
  'type': 'Pod syscall'},
 {'class': 'node',
  'global_params': ['pod'],
  'id': '{pod} network',
  'metrics': ['{pod}##net_receive', '{pod}##net_send', '{pod}##net_latency'],
  'type': 'Pod network'}]

## 处理边

In [105]:
def get_edges_from_trace(target_date):
    edges = set()
    for file in os.listdir(os.path.join(RAW_DIR, target_date, "trace"))[:1]:
        trace_file = os.path.join(RAW_DIR, target_date, "trace", file)
        print(trace_file)
        trace_df = pd.read_csv(trace_file)
        trace_df.head()
        traceid_spans = {}
        for trace_id, trace_spans in trace_df.groupby('TraceID'):
            traceid_spans[trace_id] = trace_spans
            for spans in traceid_spans.values():
                for span1 in spans.iterrows():
                    span_id = span1[1]['SpanID']
                    service1 = span1[1]['PodName'].split("service")[0] + "service"
                    for span2 in spans.iterrows():
                        parent_id = span2[1]['ParentID']
                        if span_id == parent_id:
                            service2 = span2[1]['PodName'].split("service")[0] + "service"
                            if service1 != service2:
                                edges.add((service1, service2))
        return edges

In [106]:
edges = get_edges_from_trace(DATES[0])

/home/dds/yjq/data/data-TrainTicket/2024-11-01/trace/22_28_trace.csv


In [None]:

dst = [edge[1] for edge in edges]
src = [edge[0] for edge in edges]
edge_data = {
    'class': 'edge',
    'dst': r'{dst}',
    'params': {
        'dst': dst,
        'src': src
    },
    'src': r'{src}',
    'type': 'service-service'
}

## 整合

In [None]:
# 将 YAML 数据写入文件
yaml_data.append(data1)
yaml_data.extend(nodes_data)
yaml_data.append(edge_data)
with open("output.yaml", "w") as yaml_file:
    yaml.dump(yaml_data, yaml_file, default_flow_style=False, sort_keys=False)
    print("YAML 文件已生成：output.yaml")

## 处理metric

In [50]:
with open('/home/dds/yjq/DejaVu/Data/D/metrics.norm.pkl', 'rb') as f:
    metric_data = pickle.load(f)

In [51]:
metric_data

Unnamed: 0,serviceName,timestamp,value,metric_kind,name,pod,metric_type
4189,ts-admin-basic-info-service,1645596000,0.390379,cost,ts-admin-basic-info-service##cost,,cost
4190,ts-admin-basic-info-service,1645596060,0.371458,cost,ts-admin-basic-info-service##cost,,cost
4191,ts-admin-basic-info-service,1645596120,-0.080145,cost,ts-admin-basic-info-service##cost,,cost
4192,ts-admin-basic-info-service,1645596180,-0.069390,cost,ts-admin-basic-info-service##cost,,cost
4193,ts-admin-basic-info-service,1645596240,-0.063645,cost,ts-admin-basic-info-service##cost,,cost
...,...,...,...,...,...,...,...
58051597,,1646510640,0.000000,succ_rate,ts-verification-code-service-1##succ_rate,ts-verification-code-service-1,succ_rate
58051598,,1646510760,0.000000,succ_rate,ts-verification-code-service-1##succ_rate,ts-verification-code-service-1,succ_rate
58051599,,1646510880,0.000000,succ_rate,ts-verification-code-service-1##succ_rate,ts-verification-code-service-1,succ_rate
58051600,,1646510940,0.000000,succ_rate,ts-verification-code-service-1##succ_rate,ts-verification-code-service-1,succ_rate


In [58]:
processed_data = []
for dt in DATES:
    for file in sorted(os.listdir(os.path.join(RAW_DIR, "2024-11-01", "metric"))):
        pod_metric = pd.read_csv(os.path.join(RAW_DIR, "2024-11-01", "metric", file))
        pod = file.split('.')[0]
        service = pod.split("service")[0]+"service"
        pod_metric['time'] = pd.to_datetime(pod_metric['time'])
        # 将 datetime 对象四舍五入到最近的分钟
        pod_metric['time'] = pod_metric['time'].dt.floor('min')
        pod_metric['timestamp'] = pod_metric['time'].astype('int64') // 10**9 - 60 * 60 * 8  # 转换为秒级时间戳
        pod_metric['serviceName'] = service
        pod_metric = pod_metric.drop(columns=['time', 'run_time'])
        pod_metric.rename(columns={'pod_name':'pod'})

        metrics = ['cpu', 'memory', 'io', 'syscall_read', 'syscall_write', 'net_receive', 'net_send', 'net_latency']


        # 对每个指标进行处理
        for metric in metrics:
            # 选择当前指标列
            metric_df = pod_metric[['serviceName', 'timestamp', 'pod_name', metric]].copy()
            
            # 重命名列以符合输出格式
            metric_df = metric_df.rename(columns={metric: 'value'})
            metric_df['metric_kind'] = metric  # 假设这些指标都是 gauge 类型
            metric_df['name'] = metric_df.apply(lambda row: f"{row['pod_name']}##{metric}", axis=1)
            metric_df['pod'] = metric_df['pod_name']
            metric_df['metric_type'] = metric  # 同上
            
            # 删除不再需要的列
            metric_df = metric_df.drop(columns=['pod_name'])
            
            # 将处理后的数据添加到列表中
            processed_data.append(metric_df)

        # 将处理后的数据转换为 DataFrame
processed_df = pd.concat(processed_data, ignore_index=True)
processed_df.to_csv(os.path.join(TARGET_DIR, 'metrics.csv'), index=False)

In [59]:
# 按照 name 归一化
scaler = {}
for name, group in processed_df.groupby('name'):
    scaler[name] = MinMaxScaler()
    processed_df.loc[group.index, 'value'] = scaler[name].fit_transform(group[['value']])

# 打印处理后的 DataFrame
processed_df

Unnamed: 0,serviceName,timestamp,value,metric_kind,name,pod,metric_type
0,ts-admin-basic-info-service,1730451120,0.059664,cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6##cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6,cpu
1,ts-admin-basic-info-service,1730451180,0.065265,cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6##cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6,cpu
2,ts-admin-basic-info-service,1730451240,0.055786,cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6##cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6,cpu
3,ts-admin-basic-info-service,1730451300,0.062432,cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6##cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6,cpu
4,ts-admin-basic-info-service,1730451360,0.197233,cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6##cpu,ts-admin-basic-info-service-5f44d7855b-nrbk6,cpu
...,...,...,...,...,...,...,...
409915,ts-voucher-service,1730476500,0.000000,net_latency,ts-voucher-service-5658485d65-d44dx##net_latency,ts-voucher-service-5658485d65-d44dx,net_latency
409916,ts-voucher-service,1730476560,0.000000,net_latency,ts-voucher-service-5658485d65-d44dx##net_latency,ts-voucher-service-5658485d65-d44dx,net_latency
409917,ts-voucher-service,1730476620,0.000000,net_latency,ts-voucher-service-5658485d65-d44dx##net_latency,ts-voucher-service-5658485d65-d44dx,net_latency
409918,ts-voucher-service,1730476680,0.000000,net_latency,ts-voucher-service-5658485d65-d44dx##net_latency,ts-voucher-service-5658485d65-d44dx,net_latency


In [60]:
processed_df.to_csv(os.path.join(TARGET_DIR, 'metrics.norm.csv'), index=False)

In [63]:
with open(os.path.join(TARGET_DIR, 'metrics.norm.pkl'), 'wb') as f:
    pickle.dump(processed_df,f)

## 转换标签

In [8]:
import json
temp = []
for dt in DATES:
    label_json_file = os.path.join(RAW_DIR, dt, dt+'-fault_list.json')
    with open(label_json_file, 'r') as f:
        faults = json.load(f)
    temp.extend(faults.values())

In [9]:
fault_list = []
for tmp in temp:
    fault_list.extend(tmp)
fault_list

[{'inject_time': '2024-11-01 17:30:09',
  'inject_timestamp': '1730453409',
  'inject_pod': 'ts-contacts-service-6b9f8dcd78-m68rp',
  'inject_type': 'cpu_contention'},
 {'inject_time': '2024-11-01 17:42:21',
  'inject_timestamp': '1730454141',
  'inject_pod': 'ts-contacts-service-6b9f8dcd78-m68rp',
  'inject_type': 'network_delay'},
 {'inject_time': '2024-11-01 17:54:33',
  'inject_timestamp': '1730454873',
  'inject_pod': 'ts-contacts-service-6b9f8dcd78-m68rp',
  'inject_type': 'io_contention'},
 {'inject_time': '2024-11-01 18:06:42',
  'inject_timestamp': '1730455602',
  'inject_pod': 'ts-contacts-service-6b9f8dcd78-m68rp',
  'inject_type': 'network_loss'},
 {'inject_time': '2024-11-01 18:18:51',
  'inject_timestamp': '1730456331',
  'inject_pod': 'ts-contacts-service-6b9f8dcd78-m68rp',
  'inject_type': 'memory_stress'},
 {'inject_time': '2024-11-01 18:31:02',
  'inject_timestamp': '1730457062',
  'inject_pod': 'ts-basic-service-5958769c99-mj82d',
  'inject_type': 'cpu_contention'},


In [13]:
import csv
converted_data = []
for item in fault_list:
    timestamp = int(item['inject_timestamp'])
    duration = '3m'
    root_cause_node = item['inject_pod']
    experiment_type = item['inject_type'].replace('_', '-').replace('cpu', 'CPU').replace('network', 'pod-network')
    converted_data.append([timestamp, duration, root_cause_node, experiment_type])

# 写入CSV文件
csv_file = os.path.join(TARGET_DIR, 'faults.csv')
with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['timestamp', 'duration', 'root_cause_node', 'experiment_type'])
    writer.writerows(converted_data)