In [None]:
import pandas as pd # type: ignore
import json
import glob

files = glob.glob("../phasetwo/2025-06-18/log-parquet/*.parquet")
structured = open("error.06-18.log", "w", encoding="utf-8")
for log in files:
    df = pd.read_parquet(log)
    # df = pd.read_parquet("../data/sample/normal/log-parquet/" + log)
    print(f"Processing {log} with {len(df)} records")

    # 逐行打印message列的内容
    for index, row in df.iterrows():
        if 'message' in row:
            message = row['message']
            if not message.startswith('{"'):
                continue
            message = json.loads(message)
            if 'error' in message:
                structured.write(f"Row {index}: {message}\n")
            # elif 'session' in message:
            #     print(message['session'])
        else:
            print(row)
# print(f"k8_namespace\t{row['k8_namespace']}\ntimestamp\t{row['@timestamp']}\nagent_name\t{row['agent_name']}\nk8_pod\t{row['k8_pod']}\nmessage\t{row['message']}\nk8_node_name\t{row['k8_node_name']}")
# print(row)

In [1]:
import pandas as pd
import json
import glob
import os

# Find all date folders (assuming they follow the pattern ../phasetwo/YYYY-MM-DD/)
date_folders = glob.glob("../phasetwo/*/")  # Gets all directories in phasetwo

for date_folder in date_folders:
    # Extract date from folder path (assuming format ../phasetwo/YYYY-MM-DD/)
    date = os.path.basename(os.path.normpath(date_folder))

    # Skip if not in YYYY-MM-DD format
    if len(date) != 10 or date[4] != '-' or date[7] != '-':
        continue

    # Find all parquet files for this date
    parquet_files = glob.glob(f"{date_folder}/log-parquet/*.parquet")

    if not parquet_files:
        print(f"No parquet files found for date {date}")
        continue

    # Create error log file for this date
    error_log_path = f"error-{date}.log"
    print(f"Processing date {date}, output to {error_log_path}")

    with open(error_log_path, "w", encoding="utf-8") as structured:
        for log_file in parquet_files:
            try:
                df = pd.read_parquet(log_file)
                print(f"  Processing {os.path.basename(log_file)} with {len(df)} records")

                for index, row in df.iterrows():
                    if 'message' in row:
                        message = row['message']
                        if not isinstance(message, str) or not message.startswith('{"'):
                            continue
                        try:
                            message_json = json.loads(message)
                            if 'error' in message_json:
                                structured.write(f"File: {os.path.basename(log_file)}, Row {index}: {message_json}\n")
                        except json.JSONDecodeError:
                            continue
            except Exception as e:
                print(f"Error processing {log_file}: {str(e)}")

    print(f"Finished processing date {date}\n")

Processing date 2025-06-20, output to error-2025-06-20.log
  Processing log_filebeat-server_2025-06-20_11-00-00.parquet with 458782 records
  Processing log_filebeat-server_2025-06-20_22-00-00.parquet with 155634 records
  Processing log_filebeat-server_2025-06-20_06-00-00.parquet with 127235 records
  Processing log_filebeat-server_2025-06-20_10-00-00.parquet with 379589 records
  Processing log_filebeat-server_2025-06-20_12-00-00.parquet with 320925 records
  Processing log_filebeat-server_2025-06-20_05-00-00.parquet with 156744 records
  Processing log_filebeat-server_2025-06-20_14-00-00.parquet with 358944 records
  Processing log_filebeat-server_2025-06-20_04-00-00.parquet with 158182 records
  Processing log_filebeat-server_2025-06-20_20-00-00.parquet with 382684 records
  Processing log_filebeat-server_2025-06-20_23-00-00.parquet with 152935 records
  Processing log_filebeat-server_2025-06-20_02-00-00.parquet with 141808 records
  Processing log_filebeat-server_2025-06-20_13-00-

In [3]:
import re

error_str = "failed to complete the order: rpc error: code = Internal desc = cart failure: failed to get user cart during checkout: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing dial tcp: lookup cartservice on 10.233.33.139:53: server misbehaving\""
import re
import pandas as pd

def extract_key_info(error: str):
    if not isinstance(error, str):
        return pd.Series([None, None, None])

    # 1. 提取服务名和IP
    service_ip_pattern = r'(\w+)\s+on\s+(\d+\.\d+\.\d+\.\d+:\d+)'
    service_ip_match = re.search(service_ip_pattern, error)
    service = service_ip_match.group(1) if service_ip_match else None
    ip = service_ip_match.group(2) if service_ip_match else None

    # 2. 提取操作描述，尝试匹配 "failed to ..." 到下一个冒号或句号之前的内容
    operation_pattern = r'failed to ([^:]+)'
    operation_match = re.search(operation_pattern, error)
    operation = operation_match.group(1).strip() if operation_match else None

    return pd.Series([service, ip, operation])

print(extract_key_info(error_str))

0           cartservice
1      10.233.33.139:53
2    complete the order
dtype: object


In [7]:
import pandas as pd
import json
import glob

def normalize_error(error: str) -> str:
    if not isinstance(error, str):
        return error

    # 先替换以#开头的ID
    error = re.sub(r'#\w+', '<PRODUCT_ID>', error)

    # 再替换不带#的产品ID，假设是8-12位大写字母数字组合
    error = re.sub(r'\b[A-Z0-9]{8,12}\b', '<PRODUCT_ID>', error)

    return error

error_logs = []
files = glob.glob("../phasetwo/*/log-parquet/*.parquet")  # Gets all directories in phasetwo
for file in files:
    try:
        df = pd.read_parquet(file)
        print(f"Processing {file} with {len(df)} records")

        for index, row in df.iterrows():
            if 'message' in row:
                message = row['message']
                if not isinstance(message, str) or not message.startswith('{"'):
                    continue
                try:
                    message_json = json.loads(message)
                    if 'error' in message_json:
                        msg = normalize_error(message_json['error'])
                        if msg not in error_logs:
                            error_logs.append(msg)
                            print(msg)
                except json.JSONDecodeError:
                    continue
    except Exception as e:
        print(f"Error processing {log_file}: {str(e)}")
# with open("error_logs.txt", "w", encoding="utf-8") as error_logs:
#     error_logs.write("\n".join(error_logs))

Processing ../phasetwo/2025-06-20/log-parquet/log_filebeat-server_2025-06-20_11-00-00.parquet with 458782 records
failed to complete the order: rpc error: code = Internal desc = failed to charge card: could not charge the card: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing dial tcp 10.233.55.4:50051: connect: connection refused"
Processing ../phasetwo/2025-06-20/log-parquet/log_filebeat-server_2025-06-20_22-00-00.parquet with 155634 records
Processing ../phasetwo/2025-06-20/log-parquet/log_filebeat-server_2025-06-20_06-00-00.parquet with 127235 records
could not retrieve product: rpc error: code = Unknown desc = Error 9005 (HY000): Region is unavailable
failed to complete the order: rpc error: code = Internal desc = failed to prepare order: failed to get product #"<PRODUCT_ID>"
failed to get product recommendations: failed to get recommended product info (<PRODUCT_ID>): rpc error: code = Unknown desc = Error 9005 (HY000): Region is unavai

KeyboardInterrupt: 