In [6]:
import os
import pandas as pd
import json

# Source tree holding the Parquet files, and the root under which results are written.
# NOTE(review): the window below falls on 2025-06-05 (UTC) while the input
# directory is dated 2025-06-06 — presumably the directory name follows a
# UTC+8 local date; confirm.
input_dir = "data/2025-06-06"
output_dir = "test_data"

# Create the output root up front; exist_ok makes re-running the cell harmless.
os.makedirs(output_dir, exist_ok=True)

# Inclusive time window used by the process_* functions below. The trailing
# "Z" makes both bounds timezone-aware (UTC) pandas Timestamps.
start_time = pd.Timestamp("2025-06-05T16:10:02Z")
end_time = pd.Timestamp("2025-06-05T16:31:02Z")

# Process a log file
def process_log_file(file_path, output_path, start=None, end=None):
    """Filter one log Parquet file to a time window and save it as JSON Lines.

    The ``@timestamp`` column is parsed to UTC-aware datetimes, and rows with
    ``start <= @timestamp <= end`` (both ends inclusive) are written to
    ``output_path``, one JSON record per line. No file is written when no row
    falls inside the window or when the column is absent.

    Args:
        file_path: Path of the Parquet file to read.
        output_path: Destination file. Parent directories are not created here.
        start: Lower bound of the window (anything ``pd.to_datetime`` accepts).
            Defaults to the module-level ``start_time``.
        end: Upper bound of the window. Defaults to the module-level
            ``end_time``.

    Returns:
        None. Any exception is reported with ``print`` rather than raised, so
        one bad file does not abort a batch run.
    """
    try:
        # Normalise the bounds to UTC so they are always comparable with the
        # UTC-parsed column, even if a bound was given without a timezone.
        lower = pd.to_datetime(start_time if start is None else start, utc=True)
        upper = pd.to_datetime(end_time if end is None else end, utc=True)

        df = pd.read_parquet(file_path)
        if '@timestamp' not in df.columns:
            print(f"Skipping log file {file_path}: no '@timestamp' column")
            return

        # utc=True keeps naive and aware inputs comparable with the bounds.
        # Without it, a timezone-naive column raises TypeError when compared
        # with tz-aware bounds and the file is skipped as an "error".
        # NOTE(review): naive values are interpreted as UTC — confirm this
        # matches how the logs were written.
        df['@timestamp'] = pd.to_datetime(df['@timestamp'], utc=True)
        filtered_df = df[df['@timestamp'].between(lower, upper)]
        if not filtered_df.empty:
            filtered_df.to_json(output_path, orient="records", lines=True)
            print(f"Filtered log data saved to {output_path}")
    except Exception as e:
        # Deliberately best-effort: report and continue with the next file.
        print(f"Error processing log file {file_path}: {e}")

# Process a trace file
def process_trace_file(file_path, output_path, start=None, end=None):
    """Filter one trace Parquet file to a time window and save it as JSON Lines.

    Rows whose ``startTimeMillis`` lies between the window bounds (both ends
    inclusive), with the bounds expressed as integer milliseconds since the
    Unix epoch, are written to ``output_path``, one JSON record per line. No
    file is written when no row matches or when the column is absent.

    Args:
        file_path: Path of the Parquet file to read.
        output_path: Destination file. Parent directories are not created here.
        start: Lower bound of the window (anything ``pd.Timestamp`` accepts).
            Defaults to the module-level ``start_time``.
        end: Upper bound of the window. Defaults to the module-level
            ``end_time``.

    Returns:
        None. Any exception is reported with ``print`` rather than raised, so
        one bad file does not abort a batch run.
    """
    try:
        # Timestamp.value is nanoseconds since the Unix epoch; floor-dividing
        # by 10**6 gives milliseconds. Computed once, outside the row filter.
        lower_ms = pd.Timestamp(start_time if start is None else start).value // 10**6
        upper_ms = pd.Timestamp(end_time if end is None else end).value // 10**6

        df = pd.read_parquet(file_path)
        if 'startTimeMillis' not in df.columns:
            print(f"Skipping trace file {file_path}: no 'startTimeMillis' column")
            return

        # NOTE(review): assumes startTimeMillis holds numeric epoch
        # milliseconds, as the column name suggests — confirm.
        filtered_df = df[df['startTimeMillis'].between(lower_ms, upper_ms)]
        if not filtered_df.empty:
            filtered_df.to_json(output_path, orient="records", lines=True)
            print(f"Filtered trace data saved to {output_path}")
    except Exception as e:
        # Deliberately best-effort: report and continue with the next file.
        print(f"Error processing trace file {file_path}: {e}")

# Process a metric file
def process_metric_file(file_path, output_path, start=None, end=None):
    """Filter one metric Parquet file to a time window and save it as JSON Lines.

    The ``time`` column is parsed to UTC-aware datetimes, and rows with
    ``start <= time <= end`` (both ends inclusive) are written to
    ``output_path``, one JSON record per line. No file is written when no row
    falls inside the window or when the column is absent.

    Args:
        file_path: Path of the Parquet file to read.
        output_path: Destination file. Parent directories are not created here.
        start: Lower bound of the window (anything ``pd.to_datetime`` accepts).
            Defaults to the module-level ``start_time``.
        end: Upper bound of the window. Defaults to the module-level
            ``end_time``.

    Returns:
        None. Any exception is reported with ``print`` rather than raised, so
        one bad file does not abort a batch run.
    """
    try:
        # Normalise the bounds to UTC so they are always comparable with the
        # UTC-parsed column, even if a bound was given without a timezone.
        lower = pd.to_datetime(start_time if start is None else start, utc=True)
        upper = pd.to_datetime(end_time if end is None else end, utc=True)

        df = pd.read_parquet(file_path)
        if 'time' not in df.columns:
            print(f"Skipping metric file {file_path}: no 'time' column")
            return

        # utc=True keeps naive and aware inputs comparable with the bounds.
        # Without it, a timezone-naive column raises TypeError when compared
        # with tz-aware bounds and the file is skipped as an "error".
        # NOTE(review): naive values are interpreted as UTC — confirm this
        # matches how the metrics were exported.
        df['time'] = pd.to_datetime(df['time'], utc=True)
        filtered_df = df[df['time'].between(lower, upper)]
        if not filtered_df.empty:
            filtered_df.to_json(output_path, orient="records", lines=True)
            print(f"Filtered metric data saved to {output_path}")
    except Exception as e:
        # Deliberately best-effort: report and continue with the next file.
        print(f"Error processing metric file {file_path}: {e}")

# Walk the input tree and hand every Parquet file to the handler for its category.
# The category is taken from a marker substring of the directory path; the
# first matching marker wins, and files under no known marker are left alone.
category_handlers = (
    ("log-parquet", process_log_file),
    ("trace-parquet", process_trace_file),
    ("metric-parquet", process_metric_file),
)

for current_dir, _subdirs, filenames in os.walk(input_dir):
    # Mirror the input sub-directory layout under output_dir.
    mirrored_dir = os.path.join(output_dir, os.path.relpath(current_dir, input_dir))

    for filename in filenames:
        if not filename.endswith(".parquet"):
            continue

        source_path = os.path.join(current_dir, filename)
        # The mirrored directory is created for every Parquet file, whether or
        # not a handler ends up writing into it.
        os.makedirs(mirrored_dir, exist_ok=True)
        stem = os.path.splitext(filename)[0]
        target_path = os.path.join(mirrored_dir, f"{stem}.json")

        for marker, handler in category_handlers:
            if marker in current_dir:
                handler(source_path, target_path)
                break

Filtered trace data saved to test_data/trace-parquet/trace_jaeger-span_2025-06-06_00-00-00.json
Filtered metric data saved to test_data/metric-parquet/other/infra_pd_store_low_space_count_2025-06-06.json
Filtered metric data saved to test_data/metric-parquet/other/infra_pd_learner_count_2025-06-06.json
Filtered metric data saved to test_data/metric-parquet/other/infra_tikv_memory_usage_2025-06-06.json
Filtered metric data saved to test_data/metric-parquet/other/infra_tikv_grpc_qps_2025-06-06.json
Filtered metric data saved to test_data/metric-parquet/other/infra_pd_region_health_2025-06-06.json
Filtered metric data saved to test_data/metric-parquet/other/infra_tikv_qps_2025-06-06.json
Filtered metric data saved to test_data/metric-parquet/other/infra_tikv_raft_propose_wait_2025-06-06.json
Filtered metric data saved to test_data/metric-parquet/other/infra_pd_storage_capacity_2025-06-06.json
Filtered metric data saved to test_data/metric-parquet/other/infra_tikv_cpu_usage_2025-06-06.json