# Data Merge and labelled

In [1]:
import pandas as pd
import re
import os

# Merge 
folder = "data/sourcedata"
files = [os.path.join(folder, f) for f in os.listdir(folder) if f.startswith("messages-")]
records = []
for f in files:
    with open(f, 'r', encoding='utf-8') as fh:
        for line in fh:
            ts, host, text = line.strip().split(' ', 2)
            records.append({'timestamp': ts, 'text': text})

df = pd.DataFrame(records)
print(f"Data merge successful, total {len(df)} records")

# Label (normal/abnormal/warning)
def label_fn(txt):
    if re.search(r'(?i)error', txt):
        return 'abnormal'
    elif re.search(r'(?i)(alert|fail|warning)', txt):
        return 'warning'
    else:
        return 'normal'

df['label'] = df['text'].apply(label_fn)

# Statistics for normal, abnormal, and warning
total = len(df)
n_abnormal = (df['label'] == 'abnormal').sum()
n_warning = (df['label'] == 'warning').sum()
n_normal = (df['label'] == 'normal').sum()
abnormal_ratio = n_abnormal / total if total > 0 else 0
warning_ratio = n_warning / total if total > 0 else 0

print(f"Abnormal ratio: {abnormal_ratio:.2%}, Warning ratio: {warning_ratio:.2%}, Normal: {n_normal}, Warning: {n_warning}, Abnormal: {n_abnormal}")

# Get 500 samples with the same abnormal ratio and warning ratio
n_sample = 2000
n_abnormal = int(n_sample * abnormal_ratio)
n_warning = int(n_sample * warning_ratio)
n_normal = n_sample - n_abnormal - n_warning

# Sampled data
abnormal = df[df['label'] == 'abnormal']
warning = df[df['label'] == 'warning']
normal = df[df['label'] == 'normal']

abnormal_sample = abnormal.sample(n=min(n_abnormal, len(abnormal)), random_state=42)
warning_sample = warning.sample(n=min(n_warning, len(warning)), random_state=42)
normal_sample = normal.sample(n=min(n_normal, len(normal)), random_state=42)

# Sort by timestamp
abnormal_sample = abnormal_sample.sort_values(by='timestamp')
warning_sample = warning_sample.sort_values(by='timestamp')
normal_sample = normal_sample.sort_values(by='timestamp')

# Combine into a new dataset
sampled = pd.concat([abnormal_sample, warning_sample, normal_sample]).sort_values(by='timestamp').reset_index(drop=True)

print(f"abnormal_sample: {len(abnormal_sample)}, warning_sample: {len(warning_sample)}, normal_sample: {len(normal_sample)}")
print(f"Sampled {len(sampled)} records (abnormal + warning + normal)")

# Save as JSONL
sampled.to_json('data/sourcedata/messages-sampled.jsonl', orient='records', lines=True, force_ascii=False)
print("Sampled data saved to 'data/sourcedata/messages-sampled.jsonl'")

Data merge successful, total 15921814 records
Sampled data saved to 'data/sourcedata/messages-sampled.jsonl'


In [2]:
import pandas as pd

df = pd.read_json("data/sourcedata/messages-sampled.jsonl", lines=True)
print(df[df['label'] == 'normal'])
print(df[df['label'] == 'warning'])
print(df[df['label'] == 'abnormal'])

                            timestamp  \
0    2025-05-12 03:49:22.535042+08:00   
1    2025-05-12 04:17:49.646275+08:00   
3    2025-05-12 04:30:22.240842+08:00   
4    2025-05-12 04:44:35.856701+08:00   
5    2025-05-12 04:51:10.625776+08:00   
...                               ...   
1995 2025-06-02 02:02:43.864290+08:00   
1996 2025-06-02 02:24:13.065783+08:00   
1997 2025-06-02 02:40:38.274371+08:00   
1998 2025-06-02 02:52:19.383783+08:00   
1999 2025-06-02 02:57:20.693974+08:00   

                                                   text   label  
0       systemd: Starting Session 1237560 of user root.  normal  
1     python: - ** ---------- .> results:     elasti...  normal  
3        systemd: Started Session 1239069 of user root.  normal  
4                                       python: [tasks]  normal  
5     python: --- * ***  * -- Linux-3.10.0-1160.el7....  normal  
...                                                 ...     ...  
1995                                    pytho

# Data preprocessing

In [3]:
import os
import pandas as pd

df = pd.read_json('data/sourcedata/messages-sampled.jsonl', lines=True)

# Time order
df = df.sort_values(by='timestamp').reset_index(drop=True)

# train/test 8:2
train_size = int(len(df) * 0.5)
train_df = df.iloc[:train_size].reset_index(drop=True)
test_df = df.iloc[train_size:].reset_index(drop=True)

print(f"Number of training samples: {len(train_df)}")
print(f"Number of test samples: {len(test_df)}")

os.makedirs('data/sampledatasets', exist_ok=True)
train_df.to_json('data/sampledatasets/messages-train.jsonl', orient='records', lines=True, force_ascii=False)
test_df.to_json('data/sampledatasets/messages-test.jsonl', orient='records', lines=True, force_ascii=False)

print("Train set label counts:")
print(train_df['label'].value_counts())
print("Test set label counts:")
print(test_df['label'].value_counts())


Number of training samples: 1000
Number of test samples: 1000
Train set label counts:
label
normal      935
abnormal      6
Name: count, dtype: int64
Test set label counts:
label
normal      928
abnormal      5
Name: count, dtype: int64


# Inbalance Data -- Synthetic Minority Oversampling Technique(SMOTE)

In [None]:
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from imblearn.over_sampling import SMOTE
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# Load the original dataset and training set
df = pd.read_json('data/sourcedata/messages-sampled.jsonl', lines=True)
train_df = pd.read_json('data/sampledatasets/messages-train.jsonl', lines=True)

# Label mapping
label_to_id = {'normal': 0, 'warning': 1, 'abnormal': 2}
if train_df['label'].dtype == 'object':
    train_df['label'] = train_df['label'].map(label_to_id)
if df['label'].dtype == 'object':
    df['label'] = df['label'].map(label_to_id)

# Get the number of normal samples as target
target_count = train_df['label'].value_counts().max()  

# Step 1: Handle abnormal class (random sampling from original data)
abnormal_count = train_df['label'].value_counts().get(2, 0) 
warning_count = train_df['label'].value_counts().get(1, 0)
if abnormal_count < warning_count:
    additional_abnormal = df[df['label'] == 2].sample(n=target_count - abnormal_count, random_state=42, replace=False)
    train_df = pd.concat([train_df, additional_abnormal]).reset_index(drop=True)

# Step 2: Apply SMOTE to warning class
vectorizer = TfidfVectorizer(max_features=500)
X = vectorizer.fit_transform(train_df['text']).toarray()
y = train_df['label'].values

smote = SMOTE(random_state=42, sampling_strategy={1: target_count}, k_neighbors=1)
X_res, y_res = smote.fit_resample(X, y)

# Step 3: Convert TF-IDF features back to text
embedder = SentenceTransformer('all-MiniLM-L6-v2')
original_embeddings = embedder.encode(train_df['text'].tolist())
resampled_texts = vectorizer.inverse_transform(X_res)
resampled_texts = [' '.join(text) for text in resampled_texts]
resampled_embeddings = embedder.encode(resampled_texts)

# Find the most similar original text for each synthetic sample
similarities = cosine_similarity(resampled_embeddings, original_embeddings)
most_similar_indices = similarities.argmax(axis=1)
train_df_resampled = train_df.iloc[most_similar_indices].copy()
train_df_resampled['label'] = y_res

# Step 4: Ensure abnormal class is fully supplemented with random samples
abnormal_indices = train_df_resampled[train_df_resampled['label'] == 2].index
if len(abnormal_indices) < target_count:
    additional_abnormal_resampled = df[df['label'] == 2].sample(n=target_count - len(abnormal_indices), random_state=42, replace=True)
    train_df_resampled = pd.concat([train_df_resampled, additional_abnormal_resampled]).reset_index(drop=True)

# Save the resampled training set
train_df_resampled.to_json('data/sampledatasets/messages-train-resampled.jsonl', orient='records', lines=True, force_ascii=False)

# Print the new label distribution
print("Resampled training set label counts:")
print(train_df_resampled['label'].value_counts())

Resampled training set label counts:
label
0    935
1    935
2    935
Name: count, dtype: int64


# LLM & Fine-Tunning & RAG

In [1]:
import pandas as pd
import numpy as np
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    EarlyStoppingCallback
)
from peft import LoraConfig, get_peft_model
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
from datasets import Dataset
from sentence_transformers import SentenceTransformer
import faiss

# 1. Data Loading
train_df = pd.read_json('data/sampledatasets/messages-train-resampled.jsonl', lines=True)
test_df = pd.read_json('data/sampledatasets/messages-test.jsonl', lines=True)

# Verify format
def verify(df):
    assert 'text' in df.columns and 'label' in df.columns, "JSONL must contain 'text' and 'label'"

verify(train_df)
verify(test_df)

# Label mapping
label_to_id = {'normal': 0, 'warning': 1, 'abnormal': 2}
id_to_label = {v: k for k, v in label_to_id.items()}
for df in (train_df, test_df):
    if df['label'].dtype == 'object':
        df['label'] = df['label'].map(label_to_id)
num_labels = len(label_to_id)

# Class weights
y = train_df['label'].values
classes = np.unique(y)
weights = compute_class_weight('balanced', classes=classes, y=y)
class_weights = torch.tensor(weights, dtype=torch.float).to('cuda')

# 2. Build FAISS index for RAG KB
embedder = SentenceTransformer('all-MiniLM-L6-v2')
kb = [
    {"text": item['text'], "solution": item['solution']} 
    for item in [
        {
        "text": "systemd: infi-celery.service: main process exited, code=exited, status=1/FAILURE",
        "solution": "Check the infi-celery service log, verify configuration and dependencies, and try restarting the service."
    },
    {
        "text": "systemd: infi-celery.service holdoff time over, scheduling restart.",
        "solution": "The service will automatically restart after an exception. Investigate the root cause of the abnormal exit."
    },
    {
        "text": "systemd: infi-celery.service: control process exited, code=exited status=1",
        "solution": "Check the control process log and verify service configuration and permissions."
    },
    {
        "text": "systemd: Started Infinity Celery Worker Service.",
        "solution": "The service started successfully. No action required."
    },
    {
        "text": "systemd-logind: Removed session",
        "solution": "User session was removed. This is usually a normal operation."
    },
    {
        "text": "xinetd[4580]: EXIT: mysql_status status=0",
        "solution": "MySQL status check is normal. No action required."
    },
    {
        "text": "xinetd[4580]: EXIT: zk_status status=0",
        "solution": "Zookeeper status check is normal. No action required."
    },
    {
        "text": "python: /usr/lib/python2.7/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is",
        "solution": "It is not recommended to run celery worker as root. Please use a regular user."
    },
    {
        "text": "infinity[4139103]: an error occurred while requesting bindings <urlopen error [Errno 111] Connection refused>",
        "solution": "Check network connectivity and ensure the target service port is open."
    },
    {
        "text": "kill: kill: cannot find process",
        "solution": "The target process does not exist. Please verify the process ID."
    }
    ]
]
kb_texts = [entry['text'] for entry in kb]
kb_embs = embedder.encode(kb_texts)
dim = kb_embs.shape[1]
index = faiss.IndexFlatL2(dim)
index.add(kb_embs)

def retrieve_solutions(text, k=3):
    emb = embedder.encode([text])
    D, I = index.search(np.array(emb), k)
    return " ".join(kb[i]['solution'] for i in I[0])

# 3. Prepare datasets
d_train = Dataset.from_pandas(train_df)
d_test = Dataset.from_pandas(test_df)

tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen3-0.6B')
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Tokenize function
def tokenize_fn(examples):
    return tokenizer(examples['text'], padding='max_length', truncation=True, max_length=128)

d_train = d_train.map(tokenize_fn, batched=True)
d_test = d_test.map(tokenize_fn, batched=True)

# Rename & format
d_train = d_train.rename_column('label', 'labels')
d_test = d_test.rename_column('label', 'labels')
for ds in (d_train, d_test):
    ds.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])

# 4. Model & LoRA setup
model = AutoModelForSequenceClassification.from_pretrained(
    'Qwen/Qwen3-0.6B', num_labels=num_labels
)
model.config.pad_token_id = tokenizer.pad_token_id

lora_cfg = LoraConfig(
    r=8,
    lora_alpha=16,
    target_modules=['q_proj', 'v_proj'],
    lora_dropout=0.1,
    bias='none'
)
model = get_peft_model(model, lora_cfg)
model.to('cuda')

# 5. 修复的Metrics函数
def compute_metrics(eval_pred):
    """计算评估指标"""
    try:
        predictions, labels = eval_pred
        print(f"Debug: predictions shape: {predictions.shape if hasattr(predictions, 'shape') else type(predictions)}")
        print(f"Debug: labels shape: {labels.shape if hasattr(labels, 'shape') else type(labels)}")
        
        # 确保predictions是numpy数组
        if isinstance(predictions, tuple):
            predictions = predictions[0]
        
        # 转换为numpy数组
        if not isinstance(predictions, np.ndarray):
            predictions = np.array(predictions)
        if not isinstance(labels, np.ndarray):
            labels = np.array(labels)
        
        # 获取预测结果
        predictions = np.argmax(predictions, axis=-1)
        
        # 计算指标
        accuracy = accuracy_score(labels, predictions)
        f1 = f1_score(labels, predictions, average='weighted', zero_division=0)
        precision = precision_score(labels, predictions, average='weighted', zero_division=0)
        recall = recall_score(labels, predictions, average='weighted', zero_division=0)
        
        result = {
            'accuracy': float(accuracy),
            'f1': float(f1),
            'precision': float(precision),
            'recall': float(recall)
        }
        
        print(f"Debug: computed metrics: {result}")
        return result
        
    except Exception as e:
        print(f"Error in compute_metrics: {e}")
        # 返回默认指标以避免训练中断
        return {
            'accuracy': 0.0,
            'f1': 0.0,
            'precision': 0.0,
            'recall': 0.0
        }

# 6. Custom Trainer
class WeightedTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        labels = inputs['labels']
        outputs = model(**inputs)
        logits = outputs.logits
        loss_fn = torch.nn.CrossEntropyLoss(weight=class_weights)
        loss = loss_fn(logits, labels)
        return (loss, outputs) if return_outputs else loss

# 7. 修改后的Training Arguments - 使用更保守的设置
training_args = TrainingArguments(
    output_dir='./results',
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,
    learning_rate=1e-5,
    weight_decay=0.01,
    warmup_steps=100,
    logging_dir='./logs',
    logging_steps=10,
    eval_strategy='steps',
    eval_steps=100,
    save_steps=100,
    # 临时禁用最佳模型加载以避免指标问题
    load_best_model_at_end=False,
    # metric_for_best_model='eval_f1',  # 暂时注释掉
    # greater_is_better=True,  # 暂时注释掉
    fp16=True,
    dataloader_drop_last=False,
    # 确保评估数据集被使用
    do_eval=True,
    # 添加更多调试信息
    report_to=None,  # 禁用wandb等报告
)

# 8. Initialize Trainer - 添加调试信息
print("初始化Trainer...")
print(f"训练数据集大小: {len(d_train)}")
print(f"测试数据集大小: {len(d_test)}")
print(f"数据集列: {d_train.column_names}")

# 检查数据集格式
sample_batch = d_train[:2]
print(f"样本批次键: {sample_batch.keys()}")
for key, value in sample_batch.items():
    if hasattr(value, 'shape'):
        print(f"{key} shape: {value.shape}")
    else:
        print(f"{key} type: {type(value)}")

trainer = WeightedTrainer(
    model=model,
    args=training_args,
    train_dataset=d_train,
    eval_dataset=d_test,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

# 验证compute_metrics函数
print("测试compute_metrics函数...")
try:
    # 创建一个测试样本
    test_logits = np.random.rand(4, num_labels)  # 4个样本，3个类别
    test_labels = np.array([0, 1, 2, 1])
    test_result = compute_metrics((test_logits, test_labels))
    print(f"compute_metrics测试成功: {test_result}")
except Exception as e:
    print(f"compute_metrics测试失败: {e}")

# 9. Train
print("开始训练...")
trainer.train()

# 10. RAG Inference
def rag_predict(texts, model, tokenizer, k=3):
    model.eval()
    preds = []
    for txt in texts:
        context = retrieve_solutions(txt, k=k)
        inp = tokenizer(f"{txt} [CONTEXT] {context}",
                        return_tensors='pt',
                        padding=True,
                        truncation=True,
                        max_length=128)
        inp = {k: v.to('cuda') for k, v in inp.items()}
        with torch.no_grad():
            out = model(**inp)
        pred = torch.argmax(out.logits, dim=-1).cpu().item()
        preds.append(pred)
    return preds

# 11. 修复的评估部分
print("开始评估...")
test_texts = test_df['text'].tolist()
y_true = test_df['label'].tolist()

# 使用RAG预测
y_pred_rag = rag_predict(test_texts, model, tokenizer)

# 计算指标
accuracy = accuracy_score(y_true, y_pred_rag)
f1 = f1_score(y_true, y_pred_rag, average='weighted')
precision = precision_score(y_true, y_pred_rag, average='weighted')
recall = recall_score(y_true, y_pred_rag, average='weighted')

print("RAG预测结果:")
print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

# 混淆矩阵
cm = confusion_matrix(y_true, y_pred_rag)
print("\n混淆矩阵:")
print(cm)

# 按类别显示结果
print(f"\n标签映射: {id_to_label}")
for i, label in id_to_label.items():
    mask = np.array(y_true) == i
    if mask.sum() > 0:
        class_acc = accuracy_score(np.array(y_true)[mask], np.array(y_pred_rag)[mask])
        print(f"{label} 类准确率: {class_acc:.4f}")

Map:   0%|          | 0/2805 [00:00<?, ? examples/s]

Map:   0%|          | 0/1000 [00:00<?, ? examples/s]

Some weights of Qwen3ForSequenceClassification were not initialized from the model checkpoint at Qwen/Qwen3-0.6B and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = WeightedTrainer(
No label_names provided for model class `PeftModel`. Since `PeftModel` hides base models input arguments, if label_names is not given, label_names can't be set automatically within `Trainer`. Note that empty label_names list will be used instead.


初始化Trainer...
训练数据集大小: 2805
测试数据集大小: 1000
数据集列: ['timestamp', 'text', 'labels', 'input_ids', 'attention_mask']
样本批次键: dict_keys(['labels', 'input_ids', 'attention_mask'])
labels shape: torch.Size([2])
input_ids shape: torch.Size([2, 128])
attention_mask shape: torch.Size([2, 128])
测试compute_metrics函数...
Debug: predictions shape: (4, 3)
Debug: labels shape: (4,)
Debug: computed metrics: {'accuracy': 0.0, 'f1': 0.0, 'precision': 0.0, 'recall': 0.0}
compute_metrics测试成功: {'accuracy': 0.0, 'f1': 0.0, 'precision': 0.0, 'recall': 0.0}
开始训练...


Step,Training Loss,Validation Loss
100,1.6017,No log
200,0.4997,No log
300,0.1174,No log
400,0.0325,No log
500,0.0054,No log
600,0.0013,No log
700,0.0009,No log
800,0.0006,No log
900,0.001,No log
1000,0.0007,No log


开始评估...
RAG预测结果:
Accuracy: 0.9290
F1 Score: 0.8958
Precision: 0.9290
Recall: 0.9290

混淆矩阵:
[[928   0   0]
 [ 66   1   0]
 [  5   0   0]]

normal 类准确率: 1.0000
abnormal 类准确率: 0.0000


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
