diff --git a/client/src/api/index.ts b/client/src/api/index.ts index 52d1887..aa5eb29 100644 --- a/client/src/api/index.ts +++ b/client/src/api/index.ts @@ -2,6 +2,7 @@ import axios from 'axios' // 创建 axios 实例 const api = axios.create({ + // baseURL: (import.meta as any).env?.VITE_API_BASE_URL || 'http://10.210.10.33:8080', timeout: 10000, headers: { 'Content-Type': 'application/json' diff --git a/client/src/mock/services.ts b/client/src/mock/services.ts index 1894f22..46ce50d 100644 --- a/client/src/mock/services.ts +++ b/client/src/mock/services.ts @@ -1845,27 +1845,9 @@ loadServiceAlertStatus() */ export const serviceVersionAlertStatusMap: Record> = {} -const saveServiceVersionAlertStatus = () => { - try { - localStorage.setItem('serviceVersionAlertStatusMap', JSON.stringify(serviceVersionAlertStatusMap)) - console.log('服务版本告警状态已保存到 localStorage') - } catch (error) { - console.error('保存服务版本告警状态失败:', error) - } -} +const saveServiceVersionAlertStatus = () => {} -const loadServiceVersionAlertStatus = () => { - try { - const data = localStorage.getItem('serviceVersionAlertStatusMap') - if (data) { - const parsed = JSON.parse(data) - Object.assign(serviceVersionAlertStatusMap, parsed) - console.log('已从 localStorage 加载服务版本告警状态') - } - } catch (error) { - console.error('从 localStorage 加载服务版本告警状态失败:', error) - } -} +const loadServiceVersionAlertStatus = () => {} /** * 根据告警状态更新服务版本状态 @@ -1919,8 +1901,7 @@ export const clearServiceVersionAlertStatus = (serviceName: string, version?: st console.log(`已清除服务 ${serviceName} ${version ? '版本 ' + version : '所有版本'} 的告警状态`) } -// 页面加载时恢复服务版本告警状态 -loadServiceVersionAlertStatus() +// 页面加载时不再从 localStorage 恢复服务版本告警状态(禁用持久化) // ==================== 发布任务状态管理 ==================== // 管理服务的发布任务状态,用于显示发布指示器 diff --git a/client/src/views/ChangeLogView.vue b/client/src/views/ChangeLogView.vue index 98c4aff..3c39d73 100644 --- a/client/src/views/ChangeLogView.vue +++ b/client/src/views/ChangeLogView.vue @@ -84,11 +84,31 @@ import { ref, computed, onMounted, watch } from 'vue' import { useAppStore, type ChangeItem, type AlarmChangeItem } from '@/stores/app' import { mockApi } from '@/mock/api' -import type { DeploymentChangelogResponse, DeploymentChangelogItem, AlertRuleChangelogResponse, AlertRuleChangeItem } from '@/mock/services' +import { apiService } from '@/api' +import type { DeploymentChangelogResponse } from '@/mock/services' import ChangeCard from '@/components/ChangeCard.vue' import AlarmChangeCard from '@/components/AlarmChangeCard.vue' import { ArrowLeft, Loading } from '@element-plus/icons-vue' +interface AlertRuleChangeValue { + name: string + old: string + new: string +} + +interface AlertRuleChangeItem { + name: string + editTime: string + scope: string + values: AlertRuleChangeValue[] + reason: string +} + +interface AlertRuleChangelogResponse { + items: AlertRuleChangeItem[] + next?: string +} + const appStore = useAppStore() const activeTab = ref('service') @@ -160,7 +180,7 @@ const transformAlertRuleChangelogToAlarmChangeItems = (changelogData: AlertRuleC const serviceName = item.scope?.startsWith('service:') ? 
item.scope.slice('service:'.length) + '服务' : '全局服务' // 构建变更描述 - const changeDescription = item.values.map(value => { + const changeDescription = item.values.map((value) => { return `${value.name}: ${value.old} -> ${value.new}` }).join(', ') @@ -200,7 +220,7 @@ const loadDeploymentChangelog = async (start?: string, limit?: number) => { } } -// 加载告警规则变更记录 +// 加载告警规则变更记录(使用真实 API) const loadAlertRuleChangelog = async (start?: string, limit?: number) => { if (alertRuleLoading.value) return // 防止重复加载 @@ -208,13 +228,13 @@ const loadAlertRuleChangelog = async (start?: string, limit?: number) => { alertRuleLoading.value = true error.value = null - const response = await mockApi.getAlertRuleChangelog(start, limit) - alertRuleChangelog.value = response + const response = await apiService.getAlertRuleChangelog(start, limit ?? 10) + alertRuleChangelog.value = response.data // 转换数据格式 - alarmChangeItems.value = transformAlertRuleChangelogToAlarmChangeItems(response.items) + alarmChangeItems.value = transformAlertRuleChangelogToAlarmChangeItems(response.data.items) - console.log('告警规则变更记录加载成功:', response) + console.log('告警规则变更记录加载成功:', response.data) } catch (err) { error.value = '加载告警规则变更记录失败' console.error('加载告警规则变更记录失败:', err) diff --git a/client/vite.config.ts b/client/vite.config.ts index cf2a4e3..bb2aaf8 100644 --- a/client/vite.config.ts +++ b/client/vite.config.ts @@ -18,7 +18,7 @@ export default defineConfig({ server: { proxy: { '/v1': { - target: 'http://127.0.0.1:8080', + target: 'http://10.210.10.33:8080', changeOrigin: true, secure: false, } diff --git a/cmd/zeroops/Dockerfile b/cmd/zeroops/Dockerfile index 50e0f3e..9c4b523 100644 --- a/cmd/zeroops/Dockerfile +++ b/cmd/zeroops/Dockerfile @@ -1,11 +1,11 @@ -FROM golang:1.24-alpine AS builder +FROM docker.m.daocloud.io/library/golang:1.24-alpine AS builder WORKDIR /src COPY go.mod go.sum ./ RUN go mod download COPY . . 
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /out/zeroops ./cmd/zeroops -FROM gcr.io/distroless/base-debian12 +FROM gcr.m.daocloud.io/distroless/base-debian12:nonroot WORKDIR /app COPY --from=builder /out/zeroops /app/zeroops # 复制配置文件目录 diff --git a/cmd/zeroops/main.go b/cmd/zeroops/main.go index f779473..9e8aae4 100644 --- a/cmd/zeroops/main.go +++ b/cmd/zeroops/main.go @@ -6,7 +6,7 @@ import ( "strconv" "time" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" alertapi "github.com/qiniu/zeroops/internal/alerting/api" adb "github.com/qiniu/zeroops/internal/alerting/database" "github.com/qiniu/zeroops/internal/alerting/service/healthcheck" @@ -96,22 +96,24 @@ func main() { go rem.Start(ctx, alertCh) // start Prometheus anomaly detection scheduler - promInterval := parseDuration(cfg.Alerting.Prometheus.SchedulerInterval, 6*time.Hour) + promInterval := parseDuration(cfg.Alerting.Prometheus.SchedulerInterval, 5*time.Minute) promStep := parseDuration(cfg.Alerting.Prometheus.QueryStep, time.Minute) promRange := parseDuration(cfg.Alerting.Prometheus.QueryRange, 6*time.Hour) promCfg := healthcheck.NewPrometheusConfigFromApp(&cfg.Alerting.Prometheus) - promClient := healthcheck.NewPrometheusClient(promCfg) + anomalyDetectClient := healthcheck.NewAnomalyDetectClient(promCfg) go healthcheck.StartPrometheusScheduler(ctx, healthcheck.PrometheusDeps{ - DB: alertDB, - PrometheusClient: promClient, - Interval: promInterval, - QueryStep: promStep, - QueryRange: promRange, - RulesetBase: cfg.Alerting.Ruleset.APIBase, - RulesetTimeout: parseDuration(cfg.Alerting.Ruleset.APITimeout, 10*time.Second), + DB: alertDB, + AnomalyDetectClient: anomalyDetectClient, + Interval: promInterval, + QueryStep: promStep, + QueryRange: promRange, + RulesetBase: cfg.Alerting.Ruleset.APIBase, + RulesetTimeout: parseDuration(cfg.Alerting.Ruleset.APITimeout, 10*time.Second), }) - router := fox.New() + router := gin.New() + router.Use(gin.Logger()) + router.Use(gin.Recovery()) router.Use(middleware.Authentication) alertapi.NewApiWithConfig(router, cfg) if err := serviceManagerSrv.UseApi(router); err != nil { diff --git a/configs/alerting/rules.json b/configs/alerting/rules.json index ca5e571..488d7f2 100644 --- a/configs/alerting/rules.json +++ b/configs/alerting/rules.json @@ -6,7 +6,7 @@ "expr": "histogram_quantile(0.98, sum(rate(http_latency_seconds_bucket{}[2m])) by (service, service_version, le))", "op": ">", "severity": "P0", - "watch_time": "5 minutes", + "watch_time": "5m", "metas": [ { "labels": { "service": "storage-service", "service_version": "1.0.0" }, "threshold": 1000 }, { "labels": { "service": "queue-service", "service_version": "1.0.0" }, "threshold": 1000 }, @@ -19,7 +19,7 @@ "expr": "histogram_quantile(0.98, sum(rate(http_latency_seconds_bucket{}[2m])) by (service, service_version, le))", "op": ">", "severity": "P1", - "watch_time": "4 minutes", + "watch_time": "4m", "metas": [ { "labels": { "service": "storage-service", "service_version": "1.0.0" }, "threshold": 500 }, { "labels": { "service": "queue-service", "service_version": "1.0.0" }, "threshold": 500 }, @@ -31,7 +31,7 @@ "description":"HTTP error rate by service P0", "op":">", "severity":"P0", - "watch_time":"5 minutes", + "watch_time":"5m", "expr":"sum(rate(http_latency_seconds_count{\"http.status_code\"=~\"4..|5..\", \"http.route\"!=\"/metrics\"}[2m])) by (service, service_version) / sum(rate(http_latency_seconds_count{\"http.route\"!=\"/metrics\"}[2m])) by (service, service_version)", "metas":[ 
{"labels":{"service":"storage-service","service_version":"1.0.0"},"threshold":5}, @@ -44,7 +44,7 @@ "description":"HTTP error rate by service P1", "op":">", "severity":"P1", - "watch_time":"5 minutes", + "watch_time":"5m", "expr":"sum(rate(http_latency_seconds_count{\"http.status_code\"=~\"4..|5..\", \"http.route\"!=\"/metrics\"}[2m])) by (service, service_version) / sum(rate(http_latency_seconds_count{\"http.route\"!=\"/metrics\"}[2m])) by (service, service_version)", "metas":[ {"labels":{"service":"storage-service","service_version":"1.0.0"},"threshold":3}, diff --git a/docker-compose.override.yml b/docker-compose.override.yml similarity index 100% rename from docker-compose.override.yml rename to docker-compose.override.yml diff --git a/docs/alerting/api.md b/docs/alerting/api.md index 80d4735..a5f1b56 100644 --- a/docs/alerting/api.md +++ b/docs/alerting/api.md @@ -315,6 +315,49 @@ curl -X POST http://localhost:8080/v1/integrations/alertmanager/webhook \ }' ``` +### 4. 获取告警规则变更记录 + +用于查询统一化告警规则的变更记录(阈值、观察窗口等),支持按时间游标分页。 + +**请求:** +```http +GET /v1/changelog/alertrules?start={start}&limit={limit} +``` + +**查询参数:** + +| 参数名 | 类型 | 必填 | 说明 | +|--------|------|------|------| +| start | string | 否 | 游标时间(ISO 8601)。第一页可不传;翻页使用上次响应的 `next` | +| limit | integer | 是 | 返回数量,范围 1-100 | + +分页说明:按 `change_time` 倒序返回,`start` 为上界(`<= start`)。响应中的 `next` 为当前页最后一条的 `editTime`。 + +**响应示例:** +```json +{ + "items": [ + { + "name": "http_request_latency_p98_seconds_P1", + "editTime": "2024-01-03T03:00:00Z", + "scope": "", + "values": [ + {"name": "threshold", "old": "10", "new": "15"} + ], + "reason": "Update" + } + ], + "next": "2024-01-03T03:00:00Z" +} +``` + +**状态码:** +- `200 OK`: 成功 +- `400 Bad Request`: 参数错误 +- `401 Unauthorized`: 认证失败 +- `500 Internal Server Error`: 服务器内部错误 + ## 版本历史 +- **v1.1** (2025-10-07): 新增 `GET /v1/changelog/alertrules` - **v1.0** (2025-09-11): 初始版本,支持基础的告警列表和详情查询 diff --git a/docs/alerting/database-design.md b/docs/alerting/database-design.md index 5de6ef7..7b2ff1b 100644 --- a/docs/alerting/database-design.md +++ b/docs/alerting/database-design.md @@ -13,7 +13,7 @@ ## 数据表设计 -### 1) talert_issues(告警问题表) +### 1) alert_issues(告警问题表) 存储告警问题的主要信息。 @@ -23,7 +23,7 @@ | state | enum(Closed, Open) | 问题状态 | | level | varchar(32) | 告警等级:如 P0/P1/Px | | alert_state | enum(Pending, Restored, AutoRestored, InProcessing) | 处理状态 | -| title | varchar(255) | 告警标题 | +| title | varchar(255) | 告警标题 | labels | json | 标签,格式:[{key, value}] | | alert_since | TIMESTAMP(6) | 告警发生时间 | | resolved_at | TIMESTAMP(6) | 告警结束时间 | @@ -64,8 +64,6 @@ | labels | text | labels 的 JSON 字符串表示(规范化后) | | old_threshold | numeric | 旧阈值(可空) | | new_threshold | numeric | 新阈值(可空) | -| old_watch | interval | 旧观察窗口(可空) | -| new_watch | interval | 新观察窗口(可空) | **索引建议:** diff --git a/go.mod b/go.mod index 6094f9c..65daf58 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/qiniu/zeroops go 1.24 require ( - github.com/fox-gonic/fox v0.0.6 + github.com/gin-gonic/gin v1.10.1 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.5.5 github.com/lib/pq v1.10.9 diff --git a/internal/alerting/README.md b/internal/alerting/README.md index 68adbfd..af756db 100644 --- a/internal/alerting/README.md +++ b/internal/alerting/README.md @@ -170,9 +170,9 @@ docker exec -i zeroops-postgres-1 psql -U postgres -d zeroops -c \ "CREATE TABLE IF NOT EXISTS alert_issue_comments (issue_id text, create_at timestamp, content text, PRIMARY KEY(issue_id, create_at));" ``` -### 2) 初始化/重置规则表(alert_rules / alert_rule_metas) +### 2) 
初始化/重置规则表(alert_rules / alert_rule_metas / alert_meta_change_logs) -注意:该脚本会 DROP 并重建 `alert_rules` 与 `alert_rule_metas`,仅用于本地/开发环境。 +注意:该脚本会 DROP 并重建 `alert_rules`、`alert_rule_metas` 和 `alert_meta_change_logs`,仅用于本地/开发环境。 脚本位置:`scripts/sql/alert_rules_bootstrap.sql` @@ -191,12 +191,13 @@ psql -U postgres -d zeroops -f scripts/sql/alert_rules_bootstrap.sql ```bash docker exec -i zeroops-postgres-1 psql -U postgres -d zeroops -c "SELECT name, severity FROM alert_rules;" docker exec -i zeroops-postgres-1 psql -U postgres -d zeroops -c "SELECT alert_name, labels, threshold FROM alert_rule_metas;" +docker exec -i zeroops-postgres-1 psql -U postgres -d zeroops -c "SELECT alert_name, change_type, change_time FROM alert_meta_change_logs;" ``` ### 2) 清空数据库与缓存(可选,保证从空开始) ```bash -docker exec -i zeroops-pg psql -U postgres -d zeroops -c "TRUNCATE TABLE alert_issue_comments, service_states, alert_issues;" +docker exec -i zeroops-pg psql -U postgres -d zeroops -c "TRUNCATE TABLE alert_issue_comments, service_states, alert_issues, alert_meta_change_logs;" docker exec -i zeroops-redis redis-cli --raw DEL $(docker exec -i zeroops-redis redis-cli --raw KEYS 'alert:*' | tr '\n' ' ') 2>/dev/null || true docker exec -i zeroops-redis redis-cli --raw DEL $(docker exec -i zeroops-redis redis-cli --raw KEYS 'service_state:*' | tr '\n' ' ') 2>/dev/null || true ``` diff --git a/internal/alerting/api/api.go b/internal/alerting/api/api.go index 51a3224..dfedcb8 100644 --- a/internal/alerting/api/api.go +++ b/internal/alerting/api/api.go @@ -3,7 +3,7 @@ package api import ( "fmt" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" adb "github.com/qiniu/zeroops/internal/alerting/database" "github.com/qiniu/zeroops/internal/alerting/service/healthcheck" receiver "github.com/qiniu/zeroops/internal/alerting/service/receiver" @@ -12,15 +12,15 @@ import ( type Api struct{} -func NewApi(router *fox.Engine) *Api { return NewApiWithConfig(router, nil) } +func NewApi(router *gin.Engine) *Api { return NewApiWithConfig(router, nil) } -func NewApiWithConfig(router *fox.Engine, cfg *config.Config) *Api { +func NewApiWithConfig(router *gin.Engine, cfg *config.Config) *Api { api := &Api{} api.setupRouters(router, cfg) return api } -func (api *Api) setupRouters(router *fox.Engine, cfg *config.Config) { +func (api *Api) setupRouters(router *gin.Engine, cfg *config.Config) { var h *receiver.Handler var alertDB *adb.Database if cfg != nil { diff --git a/internal/alerting/api/issues_api.go b/internal/alerting/api/issues_api.go index 043a84a..83e029f 100644 --- a/internal/alerting/api/issues_api.go +++ b/internal/alerting/api/issues_api.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" adb "github.com/qiniu/zeroops/internal/alerting/database" "github.com/qiniu/zeroops/internal/config" "github.com/redis/go-redis/v9" @@ -21,13 +21,14 @@ type IssueAPI struct { // RegisterIssueRoutes registers issue query routes. If rdb is nil, a client is created from env. // db can be nil; when nil, comments will be empty. 
-func RegisterIssueRoutes(router *fox.Engine, rdb *redis.Client, db *adb.Database) { +func RegisterIssueRoutes(router *gin.Engine, rdb *redis.Client, db *adb.Database) { if rdb == nil { rdb = newRedisFromEnv() } api := &IssueAPI{R: rdb, DB: db} router.GET("/v1/issues/:issueID", api.GetIssueByID) router.GET("/v1/issues", api.ListIssues) + router.GET("/v1/changelog/alertrules", api.ListAlertRuleChangeLogs) } func newRedisFromEnv() *redis.Client { return nil } @@ -70,7 +71,7 @@ type comment struct { Content string `json:"content"` } -func (api *IssueAPI) GetIssueByID(c *fox.Context) { +func (api *IssueAPI) GetIssueByID(c *gin.Context) { issueID := c.Param("issueID") if issueID == "" { c.JSON(http.StatusBadRequest, map[string]any{"error": map[string]any{"code": "INVALID_PARAMETER", "message": "missing issueID"}}) @@ -162,7 +163,7 @@ type issueListItem struct { AlertSince string `json:"alertSince"` } -func (api *IssueAPI) ListIssues(c *fox.Context) { +func (api *IssueAPI) ListIssues(c *gin.Context) { start := strings.TrimSpace(c.Query("start")) limitStr := strings.TrimSpace(c.Query("limit")) if limitStr == "" { @@ -257,3 +258,138 @@ func (api *IssueAPI) ListIssues(c *fox.Context) { } c.JSON(http.StatusOK, resp) } + +// ===== Alert Rule ChangeLog ===== + +type alertRuleChangeValue struct { + Name string `json:"name"` + Old string `json:"old"` + New string `json:"new"` +} + +type alertRuleChangeItem struct { + Name string `json:"name"` + EditTime string `json:"editTime"` + Scope string `json:"scope"` + Values []alertRuleChangeValue `json:"values"` + Reason string `json:"reason"` +} + +type alertRuleChangeListResponse struct { + Items []alertRuleChangeItem `json:"items"` + Next string `json:"next,omitempty"` +} + +// ListAlertRuleChangeLogs implements GET /v1/changelog/alertrules?start=...&limit=... +func (api *IssueAPI) ListAlertRuleChangeLogs(c *gin.Context) { + if api.DB == nil { + c.JSON(http.StatusInternalServerError, map[string]any{"error": map[string]any{"code": "INTERNAL_ERROR", "message": "database not configured"}}) + return + } + + start := strings.TrimSpace(c.Query("start")) + limitStr := strings.TrimSpace(c.Query("limit")) + if limitStr == "" { + c.JSON(http.StatusBadRequest, map[string]any{"error": map[string]any{"code": "INVALID_PARAMETER", "message": "limit is required"}}) + return + } + limit, err := strconv.Atoi(limitStr) + if err != nil || limit < 1 || limit > 100 { + c.JSON(http.StatusBadRequest, map[string]any{"error": map[string]any{"code": "INVALID_PARAMETER", "message": "limit must be 1-100"}}) + return + } + + var ( + q string + args []any + ) + if start == "" { + q = ` +SELECT alert_name, change_time, labels, old_threshold, new_threshold, change_type +FROM alert_meta_change_logs +WHERE change_type = 'Update' +ORDER BY change_time DESC +LIMIT $1` + args = append(args, limit) + } else { + if _, err := time.Parse(time.RFC3339, start); err != nil { + if _, err2 := time.Parse(time.RFC3339Nano, start); err2 != nil { + c.JSON(http.StatusBadRequest, map[string]any{"error": map[string]any{"code": "INVALID_PARAMETER", "message": "start must be ISO 8601 time"}}) + return + } + } + q = ` +SELECT alert_name, change_time, labels, old_threshold, new_threshold, change_type +FROM alert_meta_change_logs +WHERE change_time <= $1 AND change_type = 'Update' +ORDER BY change_time DESC +LIMIT $2` + args = append(args, start, limit) + } + + rows, err := api.DB.QueryContext(c.Request.Context(), q, args...) 
+ if err != nil { + c.JSON(http.StatusInternalServerError, map[string]any{"error": map[string]any{"code": "INTERNAL_ERROR", "message": err.Error()}}) + return + } + defer rows.Close() + + items := make([]alertRuleChangeItem, 0, limit) + var lastTime string + for rows.Next() { + var ( + name string + changeTime time.Time + labelsRaw string + oldTh *float64 + newTh *float64 + changeType string + ) + if err := rows.Scan(&name, &changeTime, &labelsRaw, &oldTh, &newTh, &changeType); err != nil { + continue + } + scope := "" + var lm map[string]any + if err := json.Unmarshal([]byte(labelsRaw), &lm); err == nil { + if svc, ok := lm["service"].(string); ok && svc != "" { + scope = "service:" + svc + if ver, ok := lm["service_version"].(string); ok && ver != "" { + scope = scope + "v" + ver + } + } + } + + values := make([]alertRuleChangeValue, 0, 2) + if oldTh != nil || newTh != nil { + values = append(values, alertRuleChangeValue{ + Name: "threshold", + Old: floatToString(oldTh), + New: floatToString(newTh), + }) + } + + item := alertRuleChangeItem{ + Name: name, + EditTime: changeTime.UTC().Format(time.RFC3339), + Scope: scope, + Values: values, + Reason: "检测到异常且未发生告警,降低阈值以尽早发现问题", + } + items = append(items, item) + lastTime = item.EditTime + } + + resp := alertRuleChangeListResponse{Items: items} + if lastTime != "" { + resp.Next = lastTime + } + c.JSON(http.StatusOK, resp) +} + +func floatToString(p *float64) string { + if p == nil { + return "" + } + s := strconv.FormatFloat(*p, 'f', -1, 64) + return s +} diff --git a/internal/alerting/service/healthcheck/bootstrap.go b/internal/alerting/service/healthcheck/bootstrap.go index 1294485..f55f76a 100644 --- a/internal/alerting/service/healthcheck/bootstrap.go +++ b/internal/alerting/service/healthcheck/bootstrap.go @@ -83,9 +83,14 @@ func BootstrapRulesFromConfigWithApp(ctx context.Context, db *adb.Database, c *c for _, m := range r.Metas { labelsJSON, _ := json.Marshal(m.Labels) metaUpdates = append(metaUpdates, ruleMetaUpdate{Labels: string(labelsJSON), Threshold: m.Threshold}) + log.Debug().Str("rule", r.Name). + Str("bootstrap_labels", string(labelsJSON)). + Str("bootstrap_labels_ckey", canonicalKeyFromLabelsJSON(string(labelsJSON))). + Float64("bootstrap_threshold", m.Threshold). 
+ Msg("bootstrap meta item") } if len(metaUpdates) > 0 { - if err := putRuleMetasBootstrap(ctx, client, base, r.Name, metaUpdates); err != nil { + if err := putRuleMetas(ctx, client, base, r.Name, metaUpdates); err != nil { log.Error().Err(err).Str("rule", r.Name).Msg("external PUT rule metas failed") continue } @@ -147,13 +152,16 @@ func parseWatchTimeToSeconds(watchTime string) (int, error) { return int(duration.Seconds()), nil } -// putRuleMetasBootstrap calls PUT /v1/alert-rules-meta/{rule_name} with the correct format -func putRuleMetasBootstrap(ctx context.Context, client *http.Client, base, ruleName string, metas []ruleMetaUpdate) error { +// putRuleMetas calls PUT /v1/alert-rules-meta/{rule_name} with the correct format +func putRuleMetas(ctx context.Context, client *http.Client, base, ruleName string, metas []ruleMetaUpdate) error { endpoint := base + "/v1/alert-rules-meta/" + url.PathEscape(ruleName) + log.Debug().Str("endpoint", endpoint).Str("rule_name", ruleName).Int("meta_count", len(metas)).Msg("prepared bootstrap ruleset meta PUT endpoint") body := map[string]interface{}{ "metas": metas, } + log.Debug().Any("body", body).Msg("bootstrap ruleset meta PUT body") bs, _ := json.Marshal(body) + log.Debug().Str("bs", string(bs)).Msg("bootstrap ruleset meta PUT bs") req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, io.NopCloser(strings.NewReader(string(bs)))) if err != nil { return err diff --git a/internal/alerting/service/healthcheck/prometheus.go b/internal/alerting/service/healthcheck/prometheus.go index 4059fad..e8f57fc 100644 --- a/internal/alerting/service/healthcheck/prometheus.go +++ b/internal/alerting/service/healthcheck/prometheus.go @@ -17,17 +17,17 @@ import ( "github.com/rs/zerolog/log" ) -// PrometheusConfig holds configuration for Prometheus client -type PrometheusConfig struct { +// AnomalyDetectConfig holds configuration for Prometheus client +type AnomalyDetectConfig struct { BaseURL string QueryTimeout time.Duration AnomalyAPIURL string AnomalyAPITimeout time.Duration } -// NewPrometheusConfigFromEnv creates PrometheusConfig from environment variables -func NewPrometheusConfigFromEnv() *PrometheusConfig { - return &PrometheusConfig{ +// NewAnomalyDetectConfigFromEnv creates PrometheusConfig from environment variables +func NewAnomalyDetectConfigFromEnv() *AnomalyDetectConfig { + return &AnomalyDetectConfig{ BaseURL: getEnvOrDefault("PROMETHEUS_URL", "http://localhost:9090"), QueryTimeout: getDurationFromEnv("PROMETHEUS_QUERY_TIMEOUT", 30*time.Second), AnomalyAPIURL: getEnvOrDefault("ANOMALY_DETECTION_API_URL", "http://localhost:8081/api/v1/anomaly/detect"), @@ -36,13 +36,13 @@ func NewPrometheusConfigFromEnv() *PrometheusConfig { } // NewPrometheusConfigFromApp converts app config to runtime PrometheusConfig -func NewPrometheusConfigFromApp(c *config.PrometheusConfig) *PrometheusConfig { +func NewPrometheusConfigFromApp(c *config.PrometheusConfig) *AnomalyDetectConfig { if c == nil { - return NewPrometheusConfigFromEnv() + return NewAnomalyDetectConfigFromEnv() } qt, _ := time.ParseDuration(getNonEmpty(c.QueryTimeout, "30s")) at, _ := time.ParseDuration(getNonEmpty(c.AnomalyAPITimeout, "10s")) - return &PrometheusConfig{ + return &AnomalyDetectConfig{ BaseURL: getNonEmpty(c.URL, "http://localhost:9090"), QueryTimeout: qt, AnomalyAPIURL: getNonEmpty(c.AnomalyAPIURL, "http://localhost:8081/api/v1/anomaly/detect"), @@ -50,15 +50,15 @@ func NewPrometheusConfigFromApp(c *config.PrometheusConfig) *PrometheusConfig { } } -// PrometheusClient 
handles communication with Prometheus -type PrometheusClient struct { - config *PrometheusConfig +// AnomalyDetectClient handles communication with Prometheus +type AnomalyDetectClient struct { + config *AnomalyDetectConfig httpClient *http.Client } -// NewPrometheusClient creates a new Prometheus client -func NewPrometheusClient(config *PrometheusConfig) *PrometheusClient { - return &PrometheusClient{ +// NewAnomalyDetectClient creates a new Prometheus client +func NewAnomalyDetectClient(config *AnomalyDetectConfig) *AnomalyDetectClient { + return &AnomalyDetectClient{ config: config, httpClient: &http.Client{ Timeout: config.QueryTimeout, @@ -67,7 +67,7 @@ func NewPrometheusClient(config *PrometheusConfig) *PrometheusClient { } // QueryRange executes a Prometheus query_range request -func (c *PrometheusClient) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (*PrometheusResponse, error) { +func (c *AnomalyDetectClient) QueryRange(ctx context.Context, query string, start, end time.Time, step time.Duration) (*PrometheusResponse, error) { params := url.Values{} params.Set("query", query) params.Set("start", strconv.FormatInt(start.Unix(), 10)) @@ -105,7 +105,7 @@ func (c *PrometheusClient) QueryRange(ctx context.Context, query string, start, } // DetectAnomalies calls the anomaly detection API for each time series individually -func (c *PrometheusClient) DetectAnomalies(ctx context.Context, timeSeries []PrometheusTimeSeries, queries []PrometheusQuery) (*AnomalyDetectionResponse, error) { +func (c *AnomalyDetectClient) DetectAnomalies(ctx context.Context, timeSeries []PrometheusTimeSeries, queries []PrometheusQuery) (*AnomalyDetectionResponse, error) { var allAnomalies []Anomaly // Process each time series individually @@ -135,7 +135,7 @@ func (c *PrometheusClient) DetectAnomalies(ctx context.Context, timeSeries []Pro // detectAnomaliesForSingleTimeSeries calls the anomaly detection API for a single time series -func (c *PrometheusClient) detectAnomaliesForSingleTimeSeries(ctx context.Context, timeSeries PrometheusTimeSeries, query *PrometheusQuery) ([]Anomaly, error) { +func (c *AnomalyDetectClient) detectAnomaliesForSingleTimeSeries(ctx context.Context, timeSeries PrometheusTimeSeries, query *PrometheusQuery) ([]Anomaly, error) { // Build request body with metadata and time series data in the new format dataPoints := make([]map[string]interface{}, 0, len(timeSeries.Values)) for _, pair := range timeSeries.Values { diff --git a/internal/alerting/service/healthcheck/scheduler.go b/internal/alerting/service/healthcheck/scheduler.go index aed48ea..7fc603d 100644 --- a/internal/alerting/service/healthcheck/scheduler.go +++ b/internal/alerting/service/healthcheck/scheduler.go @@ -1,13 +1,12 @@ package healthcheck import ( - "bytes" "context" "encoding/json" "fmt" "math" "net/http" - "net/url" + "sort" "strings" "time" @@ -27,11 +26,11 @@ type Deps struct { // PrometheusDeps holds dependencies for Prometheus anomaly detection task type PrometheusDeps struct { - DB *adb.Database - PrometheusClient *PrometheusClient - Interval time.Duration - QueryStep time.Duration - QueryRange time.Duration + DB *adb.Database + AnomalyDetectClient *AnomalyDetectClient + Interval time.Duration + QueryStep time.Duration + QueryRange time.Duration // ruleset API integration RulesetBase string RulesetTimeout time.Duration @@ -76,7 +75,7 @@ func StartScheduler(ctx context.Context, deps Deps) { // StartPrometheusScheduler starts the Prometheus anomaly detection scheduler func 
StartPrometheusScheduler(ctx context.Context, deps PrometheusDeps) { if deps.Interval <= 0 { - deps.Interval = 6 * time.Hour // Default 6 hours + deps.Interval = 5 * time.Minute // Default 6 hours } if deps.QueryStep <= 0 { deps.QueryStep = 1 * time.Minute // Default 1 minute step @@ -274,7 +273,7 @@ func runPrometheusAnomalyDetection(ctx context.Context, deps PrometheusDeps) err for _, query := range queries { log.Debug().Str("query", query.Expr).Interface("labels", query.Labels).Msg("executing promql") - resp, err := deps.PrometheusClient.QueryRange(ctx, query.Expr, start, end, deps.QueryStep) + resp, err := deps.AnomalyDetectClient.QueryRange(ctx, query.Expr, start, end, deps.QueryStep) if err != nil { log.Error().Err(err).Str("query", query.Expr).Msg("Failed to execute PromQL query") continue @@ -285,8 +284,10 @@ func runPrometheusAnomalyDetection(ctx context.Context, deps PrometheusDeps) err allQueries = append(allQueries, query) } allTimeSeries = append(allTimeSeries, resp.Data.Result...) - log.Debug().Int("series", len(resp.Data.Result)).Str("query", query.Expr).Msg("promql result series appended") + for _, ts := range resp.Data.Result { + log.Debug().Interface("metric", ts.Metric).Int("value", len(ts.Values)).Msg("promql result series appended") + } // no file exports } @@ -313,18 +314,22 @@ func runPrometheusAnomalyDetection(ctx context.Context, deps PrometheusDeps) err break } q := allQueries[i] + log.Debug().Any("q", q).Msg("per-series anomaly detection query") // per-series anomaly detection - anomalies, derr := deps.PrometheusClient.detectAnomaliesForSingleTimeSeries(ctx, ts, &q) + anomalies, derr := deps.AnomalyDetectClient.detectAnomaliesForSingleTimeSeries(ctx, ts, &q) if derr != nil { log.Error().Err(derr).Msg("per-series anomaly detection failed") continue } + log.Debug().Any("anomalies", anomalies).Msg("per-series anomaly detection completed") totalDetected += len(anomalies) // filter by existing alert time ranges filtered := FilterAnomaliesByAlertTimeRanges(anomalies, alertIssues) + log.Debug().Any("filtered", filtered).Msg("per-series anomaly detection filtered") totalFiltered += len(filtered) if len(filtered) == 0 { + log.Debug().Msg("no anomalies filtered") continue } @@ -334,13 +339,16 @@ func runPrometheusAnomalyDetection(ctx context.Context, deps PrometheusDeps) err log.Debug().Msg("skip threshold update due to missing service or alert_name") continue } + log.Debug().Str("alert_name", q.AlertName).Str("service", service).Msg("adjusting thresholds") versionKey, version := detectVersionFromLabels(q.Labels) + log.Debug().Str("alert_name", q.AlertName).Str("service", service).Str("version_key", versionKey).Str("version", version).Msg("detect version from labels") if version == "" { log.Debug().Str("alert_name", q.AlertName).Str("service", service).Msg("skip update: version is empty in labels") continue } baseTh, ok, terr := fetchExactThreshold(ctx, deps.DB, q.AlertName, service, versionKey, version) + log.Debug().Str("alert_name", q.AlertName).Str("service", service).Str("version", version).Float64("base_threshold", baseTh).Bool("ok", ok).Err(terr).Msg("fetch exact threshold") if terr != nil { log.Error().Err(terr).Str("alert_name", q.AlertName).Str("service", service).Str("version", version).Msg("fetch exact threshold failed") continue @@ -353,6 +361,7 @@ func runPrometheusAnomalyDetection(ctx context.Context, deps PrometheusDeps) err // fetch all metas for this service (across versions) metas, ferr := fetchMetasForService(ctx, deps.DB, q.AlertName, service) + 
log.Debug().Str("alert_name", q.AlertName).Str("service", service).Any("metas", metas).Msg("fetch metas for service") if ferr != nil { log.Error().Err(ferr).Str("alert_name", q.AlertName).Str("service", service).Msg("fetch metas failed") continue @@ -372,38 +381,59 @@ func runPrometheusAnomalyDetection(ctx context.Context, deps PrometheusDeps) err }, 0, 1) targetMatched := false for _, m := range metas { - isTarget := strings.Contains(m.LabelsJSON, fmt.Sprintf(`"%s":"%s"`, versionKey, version)) + bServiceMatch := strings.Contains(m.LabelsJSON, service) + bVersionMatch := strings.Contains(m.LabelsJSON, version) + isTarget := bServiceMatch && bVersionMatch + log.Debug().Bool("bServiceMatch", bServiceMatch).Bool("bVersionMatch", bVersionMatch).Bool("isTarget", isTarget).Any("m.LabelsJSON", m.LabelsJSON) + compact := compactLabelsJSON(m.LabelsJSON) + log.Debug().Str("labels", compact).Str("labels_ckey", canonicalKeyFromLabelsJSON(compact)).Bool("is_target", isTarget).Msg("metas appended") if isTarget { targetMatched = true - updates = append(updates, ruleMetaUpdate{Labels: m.LabelsJSON, Threshold: newThreshold}) + updates = append(updates, ruleMetaUpdate{Labels: compact, Threshold: newThreshold}) + log.Debug().Str("labels", compact).Float64("1.threshold", m.Threshold).Float64("new_threshold", newThreshold).Msg("metas appended") if math.Abs(m.Threshold-newThreshold) > eps { changed = append(changed, struct { LabelsJSON string Old float64 New float64 - }{LabelsJSON: m.LabelsJSON, Old: m.Threshold, New: newThreshold}) + }{LabelsJSON: compact, Old: m.Threshold, New: newThreshold}) } } else { - updates = append(updates, ruleMetaUpdate{Labels: m.LabelsJSON, Threshold: m.Threshold}) + updates = append(updates, ruleMetaUpdate{Labels: compact, Threshold: m.Threshold}) + log.Debug().Str("labels", compact).Float64("2.threshold", m.Threshold).Float64("new_threshold", newThreshold).Msg("metas appended") } + log.Debug().Str("labels", compact).Float64("3.threshold", m.Threshold).Float64("new_threshold", newThreshold).Msg("metas appended") } if !targetMatched || len(changed) == 0 { - log.Debug().Str("alert_name", q.AlertName).Str("service", service).Str("version", version).Msg("no threshold change for exact version; skip") + log.Debug().Str("alert_name", q.AlertName).Str("service", service).Str("version", version).Bool("target_matched", targetMatched).Int("changed_count", len(changed)).Msg("no threshold change for exact version; skip") continue } if rulesetBase != "" { + log.Debug().Str("alert_name", q.AlertName).Str("rulesetBase", rulesetBase).Any("meta", updates).Msg("ruleset meta PUT") + // extra diagnostics: log labels and canonical keys for each meta before PUT + for idx, u := range updates { + log.Debug().Int("meta_index", idx). + Str("labels", u.Labels). + Str("labels_ckey", canonicalKeyFromLabelsJSON(u.Labels)). + Float64("threshold", u.Threshold). 
+ Msg("ruleset meta PUT item") + } if err := putRuleMetas(ctx, httpClient, rulesetBase, q.AlertName, updates); err != nil { log.Error().Err(err).Str("alert_name", q.AlertName).Str("service", service).Msg("ruleset meta PUT failed") } else { log.Info().Str("alert_name", q.AlertName).Str("service", service).Int("meta_count", len(updates)).Msg("ruleset meta updated") - // Log change records only after successful external update, only for changed metas + // Persist changes into DB and log change records, only for changed metas for _, c := range changed { - _ = insertMetaChangeLog(ctx, deps.DB, "Update", q.AlertName, c.LabelsJSON, c.Old, c.New) + if err := updateMetaThreshold(ctx, deps.DB, q.AlertName, c.LabelsJSON, c.New); err != nil { + log.Error().Err(err).Str("alert_name", q.AlertName).Str("labels", c.LabelsJSON).Msg("update alert_rule_metas threshold failed") + } else { + _ = insertMetaChangeLog(ctx, deps.DB, "Update", q.AlertName, c.LabelsJSON, c.Old, c.New) + } } } } else { - log.Warn().Msg("RULESET_API_BASE not set; skip PUT /v1/alert-rule-metas/{rule_name}") + log.Warn().Msg("RULESET_API_BASE not set; skip PUT /v1/alert-rules-meta/{rule_name}") } } @@ -468,9 +498,12 @@ func fetchExactThreshold(ctx context.Context, db *adb.Database, alertName, servi // detectVersionFromLabels tries common keys for version and returns (key, value) func detectVersionFromLabels(labels map[string]string) (string, string) { if v := labels["service_version"]; v != "" { + log.Debug().Str("labels", fmt.Sprintf("%v", labels)).Str("service_version", v).Msg("detect version from labels") return "service_version", v } + if v := labels["version"]; v != "" { + log.Debug().Str("labels", fmt.Sprintf("%v", labels)).Str("version", v).Msg("detect version from labels") return "version", v } return "", "" @@ -486,28 +519,29 @@ type ruleMetaPutReq struct { Metas []ruleMetaUpdate `json:"metas"` } -// putRuleMetas calls PUT /v1/alert-rule-metas/{rule_name} -func putRuleMetas(ctx context.Context, client *http.Client, base, ruleName string, metas []ruleMetaUpdate) error { - if client == nil { - client = http.DefaultClient - } - endpoint := strings.TrimSuffix(base, "/") + "/v1/alert-rule-metas/" + url.PathEscape(ruleName) - body, _ := json.Marshal(ruleMetaPutReq{RuleName: ruleName, Metas: metas}) - req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, bytes.NewReader(body)) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - resp, err := client.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("ruleset api status %d", resp.StatusCode) - } - return nil -} +// // putRuleMetas calls PUT /v1/alert-rules-meta/{rule_name} +// func putRuleMetas(ctx context.Context, client *http.Client, base, ruleName string, metas []ruleMetaUpdate) error { +// if client == nil { +// client = http.DefaultClient +// } +// endpoint := strings.TrimSuffix(base, "/") + "/v1/alert-rules-meta/" + url.PathEscape(ruleName) +// log.Debug().Str("endpoint", endpoint).Str("rule_name", ruleName).Int("meta_count", len(metas)).Msg("prepared ruleset meta PUT endpoint") +// body, _ := json.Marshal(ruleMetaPutReq{RuleName: ruleName, Metas: metas}) +// req, err := http.NewRequestWithContext(ctx, http.MethodPut, endpoint, bytes.NewReader(body)) +// if err != nil { +// return err +// } +// req.Header.Set("Content-Type", "application/json") +// resp, err := client.Do(req) +// if err != nil { +// return err +// } +// defer 
resp.Body.Close() +// if resp.StatusCode < 200 || resp.StatusCode >= 300 { +// return fmt.Errorf("ruleset api status %d", resp.StatusCode) +// } +// return nil +// } // insertMetaChangeLog writes a change record to alert_meta_change_logs (best-effort) func insertMetaChangeLog(ctx context.Context, db *adb.Database, changeType, alertName, labels string, oldTh, newTh float64) error { @@ -523,3 +557,75 @@ func insertMetaChangeLog(ctx context.Context, db *adb.Database, changeType, aler } return nil } + +// updateMetaThreshold updates alert_rule_metas.threshold for a specific rule+labels +func updateMetaThreshold(ctx context.Context, db *adb.Database, alertName, labelsJSON string, newThreshold float64) error { + if db == nil { + return nil + } + const q = `UPDATE alert_rule_metas SET threshold=$1, updated_at=NOW() WHERE alert_name=$2 AND labels=$3::jsonb` + if _, err := db.ExecContext(ctx, q, newThreshold, alertName, labelsJSON); err != nil { + return err + } + return nil +} + +// canonicalKeyFromLabelsJSON parses labels JSON and returns a stable key "k=v|..." +func canonicalKeyFromLabelsJSON(s string) string { + // try map form {"k":"v"} + m := map[string]string{} + if err := json.Unmarshal([]byte(s), &m); err == nil && len(m) > 0 { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, strings.ToLower(strings.TrimSpace(k))) + } + sort.Strings(keys) + var b strings.Builder + for i, k := range keys { + if i > 0 { + b.WriteByte('|') + } + b.WriteString(k) + b.WriteByte('=') + b.WriteString(strings.TrimSpace(m[k])) + } + return b.String() + } + // try array form [{"key":"k","value":"v"}] + var arr []struct{ Key, Value string } + if err := json.Unmarshal([]byte(s), &arr); err == nil && len(arr) > 0 { + m2 := make(map[string]string, len(arr)) + for _, kv := range arr { + m2[strings.ToLower(strings.TrimSpace(kv.Key))] = strings.TrimSpace(kv.Value) + } + keys := make([]string, 0, len(m2)) + for k := range m2 { + keys = append(keys, k) + } + sort.Strings(keys) + var b strings.Builder + for i, k := range keys { + if i > 0 { + b.WriteByte('|') + } + b.WriteString(k) + b.WriteByte('=') + b.WriteString(m2[k]) + } + return b.String() + } + return "{}" +} + +// compactLabelsJSON removes spaces from a JSON object string deterministically. +func compactLabelsJSON(s string) string { + var m map[string]string + if err := json.Unmarshal([]byte(s), &m); err == nil && len(m) > 0 { + b, err := json.Marshal(m) + if err == nil { + return string(b) + } + } + // fallback keep original + return s +} diff --git a/internal/alerting/service/receiver/auth.go b/internal/alerting/service/receiver/auth.go index 5b6ad1b..f275894 100644 --- a/internal/alerting/service/receiver/auth.go +++ b/internal/alerting/service/receiver/auth.go @@ -4,7 +4,7 @@ import ( "net/http" "os" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" ) var ( @@ -32,7 +32,7 @@ func authEnabled() bool { } // AuthMiddleware returns false if unauthorized and writes a 401 response. 
-func AuthMiddleware(c *fox.Context) bool { +func AuthMiddleware(c *gin.Context) bool { if !authEnabled() { return true } diff --git a/internal/alerting/service/receiver/handler.go b/internal/alerting/service/receiver/handler.go index c69d924..56894e8 100644 --- a/internal/alerting/service/receiver/handler.go +++ b/internal/alerting/service/receiver/handler.go @@ -5,7 +5,8 @@ import ( "strings" "time" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" + "github.com/rs/zerolog/log" ) type Handler struct { @@ -24,43 +25,64 @@ func NewHandlerWithCache(dao AlertIssueDAO, cache AlertIssueCache) *Handler { return &Handler{dao: dao, cache: cache} } -func (h *Handler) AlertmanagerWebhook(c *fox.Context) { +func (h *Handler) AlertmanagerWebhook(c *gin.Context) { + log.Info().Msg("AlertmanagerWebhook: starting webhook processing") + if !AuthMiddleware(c) { + log.Warn().Msg("AlertmanagerWebhook: authentication failed") return } + log.Debug().Msg("AlertmanagerWebhook: authentication successful") + var req AMWebhook if err := c.ShouldBindJSON(&req); err != nil { + log.Error().Err(err).Msg("AlertmanagerWebhook: failed to parse JSON request") c.JSON(http.StatusBadRequest, map[string]any{"ok": false, "error": "invalid JSON"}) return } + log.Debug().Int("alert_count", len(req.Alerts)).Str("status", req.Status).Msg("AlertmanagerWebhook: JSON parsed successfully") if err := ValidateAMWebhook(&req); err != nil { + log.Error().Err(err).Msg("AlertmanagerWebhook: webhook validation failed") c.JSON(http.StatusBadRequest, map[string]any{"ok": false, "error": err.Error()}) return } + log.Debug().Msg("AlertmanagerWebhook: webhook validation passed") if strings.ToLower(req.Status) != "firing" { + log.Info().Str("status", req.Status).Msg("AlertmanagerWebhook: ignoring non-firing alert") c.JSON(http.StatusOK, map[string]any{"ok": true, "msg": "ignored (not firing)"}) return } + log.Info().Int("alert_count", len(req.Alerts)).Msg("AlertmanagerWebhook: processing firing alerts") created := 0 - for _, a := range req.Alerts { + for i, a := range req.Alerts { + log.Debug().Int("alert_index", i).Str("alert_name", a.Labels["alertname"]).Msg("AlertmanagerWebhook: processing alert") + key := BuildIdempotencyKey(a) // Distributed idempotency (best-effort). If key exists, skip. 
if ok, _ := h.cache.TryMarkIdempotent(c.Request.Context(), a); !ok { + log.Debug().Str("idempotency_key", key).Msg("AlertmanagerWebhook: alert already processed (distributed cache)") continue } if AlreadySeen(key) { + log.Debug().Str("idempotency_key", key).Msg("AlertmanagerWebhook: alert already processed (local cache)") continue } + row, err := MapToAlertIssueRow(&req, &a) if err != nil { + log.Error().Err(err).Str("alert_name", a.Labels["alertname"]).Msg("AlertmanagerWebhook: failed to map alert to issue row") continue } + log.Debug().Str("issue_id", row.ID).Str("level", row.Level).Msg("AlertmanagerWebhook: alert mapped to issue row") + if err := h.dao.InsertAlertIssue(c.Request.Context(), row); err != nil { + log.Error().Err(err).Str("issue_id", row.ID).Msg("AlertmanagerWebhook: failed to insert alert issue to database") continue } + log.Info().Str("issue_id", row.ID).Str("level", row.Level).Msg("AlertmanagerWebhook: alert issue inserted to database") if w, ok := h.dao.(ServiceStateWriter); ok { service := strings.TrimSpace(a.Labels["service"]) @@ -72,15 +94,36 @@ func (h *Handler) AlertmanagerWebhook(c *fox.Context) { } else if row.Level == "P1" || row.Level == "P2" { derived = "Warning" } - _ = w.UpsertServiceState(c.Request.Context(), service, version, nil, derived, row.ID) - _ = h.cache.WriteServiceState(c.Request.Context(), service, version, time.Time{}, derived) + log.Debug().Str("service", service).Str("version", version).Str("derived_state", derived).Msg("AlertmanagerWebhook: updating service state") + + if err := w.UpsertServiceState(c.Request.Context(), service, version, nil, derived, row.ID); err != nil { + log.Error().Err(err).Str("service", service).Str("version", version).Msg("AlertmanagerWebhook: failed to upsert service state") + } else { + log.Info().Str("service", service).Str("version", version).Str("state", derived).Msg("AlertmanagerWebhook: service state updated") + } + + if err := h.cache.WriteServiceState(c.Request.Context(), service, version, time.Time{}, derived); err != nil { + log.Error().Err(err).Str("service", service).Str("version", version).Msg("AlertmanagerWebhook: failed to write service state to cache") + } else { + log.Debug().Str("service", service).Str("version", version).Msg("AlertmanagerWebhook: service state written to cache") + } + } else { + log.Debug().Msg("AlertmanagerWebhook: no service label found, skipping service state update") } } + // Write-through to cache. Errors are ignored to avoid impacting webhook ack. 
- _ = h.cache.WriteIssue(c.Request.Context(), row, a) + if err := h.cache.WriteIssue(c.Request.Context(), row, a); err != nil { + log.Error().Err(err).Str("issue_id", row.ID).Msg("AlertmanagerWebhook: failed to write issue to cache") + } else { + log.Debug().Str("issue_id", row.ID).Msg("AlertmanagerWebhook: issue written to cache") + } + MarkSeen(key) created++ + log.Info().Str("issue_id", row.ID).Int("total_created", created).Msg("AlertmanagerWebhook: alert processing completed") } + log.Info().Int("total_alerts", len(req.Alerts)).Int("created_issues", created).Msg("AlertmanagerWebhook: webhook processing completed") c.JSON(http.StatusOK, map[string]any{"ok": true, "created": created}) } diff --git a/internal/alerting/service/receiver/handler_test.go b/internal/alerting/service/receiver/handler_test.go index 74ef05e..f38ef1e 100644 --- a/internal/alerting/service/receiver/handler_test.go +++ b/internal/alerting/service/receiver/handler_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" ) type mockDAO struct{ calls int } @@ -17,7 +17,7 @@ type mockDAO struct{ calls int } func (m *mockDAO) InsertAlertIssue(_ context.Context, _ *AlertIssueRow) error { m.calls++; return nil } func TestHandlerCreatesIssues(t *testing.T) { - r := fox.New() + r := gin.New() m := &mockDAO{} h := NewHandler(m) RegisterReceiverRoutes(r, h) diff --git a/internal/alerting/service/receiver/router.go b/internal/alerting/service/receiver/router.go index a668ba5..2bc5e18 100644 --- a/internal/alerting/service/receiver/router.go +++ b/internal/alerting/service/receiver/router.go @@ -1,7 +1,7 @@ package receiver -import "github.com/fox-gonic/fox" +import "github.com/gin-gonic/gin" -func RegisterReceiverRoutes(r *fox.Engine, h *Handler) { +func RegisterReceiverRoutes(r *gin.Engine, h *Handler) { r.POST("/v1/integrations/alertmanager/webhook", h.AlertmanagerWebhook) } diff --git a/internal/alerting/service/ruleset/manager.go b/internal/alerting/service/ruleset/manager.go index f00785d..0f15b72 100644 --- a/internal/alerting/service/ruleset/manager.go +++ b/internal/alerting/service/ruleset/manager.go @@ -2,9 +2,12 @@ package ruleset import ( "context" + "encoding/json" "errors" "fmt" "time" + + "github.com/rs/zerolog/log" ) var ( @@ -77,7 +80,23 @@ func (m *Manager) UpsertRuleMetas(ctx context.Context, meta *AlertRuleMeta) erro if meta == nil { return ErrInvalidMeta } + // diagnostics: log labels before/after normalization and canonical key + beforeLabels := meta.Labels meta.Labels = NormalizeLabels(meta.Labels, m.aliasMap) + var beforeJSON, afterJSON string + if b, err := json.Marshal(beforeLabels); err == nil { + beforeJSON = string(b) + } + if a, err := json.Marshal(meta.Labels); err == nil { + afterJSON = string(a) + } + log.Debug(). + Str("alert_name", meta.AlertName). + Str("labels_before", beforeJSON). + Str("labels_after", afterJSON). + Str("labels_ckey", CanonicalLabelKey(meta.Labels)). + Float64("threshold", meta.Threshold). 
+ Msg("ruleset UpsertRuleMetas normalized labels") if err := validateMeta(meta); err != nil { return err } diff --git a/internal/alerting/service/ruleset/promsync_exporter.go b/internal/alerting/service/ruleset/promsync_exporter.go index 2debc12..539c38a 100644 --- a/internal/alerting/service/ruleset/promsync_exporter.go +++ b/internal/alerting/service/ruleset/promsync_exporter.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "sync" + + "github.com/rs/zerolog/log" ) // ExporterSync is an in-memory PromSync implementation that maintains threshold/watch values @@ -47,6 +49,8 @@ func (e *ExporterSync) SyncMetaToPrometheus(ctx context.Context, m *AlertRuleMet e.mu.Lock() defer e.mu.Unlock() key := e.keyFor(m.AlertName, m.Labels) + // diagnostic: log rule and canonical key + log.Debug().Str("rule", m.AlertName).Str("labels_ckey", key).Float64("threshold", m.Threshold).Msg("promsync sync meta") e.thresholds[key] = m.Threshold return nil } diff --git a/internal/middleware/auth.go b/internal/middleware/auth.go index 582ce3d..fd976e4 100644 --- a/internal/middleware/auth.go +++ b/internal/middleware/auth.go @@ -1,11 +1,11 @@ package middleware import ( - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" ) // Authentication is a placeholder global middleware. It currently allows all requests. // Per-alerting webhook uses its own path-scoped auth. -func Authentication(c *fox.Context) { +func Authentication(c *gin.Context) { c.Next() } diff --git a/internal/service_manager/api/api.go b/internal/service_manager/api/api.go index 46b710c..06632d8 100644 --- a/internal/service_manager/api/api.go +++ b/internal/service_manager/api/api.go @@ -1,7 +1,7 @@ package api import ( - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" "github.com/qiniu/zeroops/internal/service_manager/database" "github.com/qiniu/zeroops/internal/service_manager/service" ) @@ -9,10 +9,10 @@ import ( type Api struct { db *database.Database service *service.Service - router *fox.Engine + router *gin.Engine } -func NewApi(db *database.Database, service *service.Service, router *fox.Engine) (*Api, error) { +func NewApi(db *database.Database, service *service.Service, router *gin.Engine) (*Api, error) { api := &Api{ db: db, service: service, @@ -23,7 +23,7 @@ func NewApi(db *database.Database, service *service.Service, router *fox.Engine) return api, nil } -func (api *Api) setupRouters(router *fox.Engine) { +func (api *Api) setupRouters(router *gin.Engine) { // 服务信息相关路由 api.setupInfoRouters(router) diff --git a/internal/service_manager/api/deploy_api.go b/internal/service_manager/api/deploy_api.go index 126a2ff..f5b0c7f 100644 --- a/internal/service_manager/api/deploy_api.go +++ b/internal/service_manager/api/deploy_api.go @@ -4,14 +4,14 @@ import ( "net/http" "strconv" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" "github.com/qiniu/zeroops/internal/service_manager/model" "github.com/qiniu/zeroops/internal/service_manager/service" "github.com/rs/zerolog/log" ) // setupDeployRouters 设置部署管理相关路由 -func (api *Api) setupDeployRouters(router *fox.Engine) { +func (api *Api) setupDeployRouters(router *gin.Engine) { // 部署任务基本操作 router.POST("/v1/deployments", api.CreateDeployment) router.GET("/v1/deployments", api.GetDeployments) @@ -28,7 +28,7 @@ func (api *Api) setupDeployRouters(router *fox.Engine) { // ===== 部署管理相关API ===== // CreateDeployment 创建发布任务(POST /v1/deployments) -func (api *Api) CreateDeployment(c *fox.Context) { +func (api *Api) CreateDeployment(c *gin.Context) { ctx := c.Request.Context() var req 
model.CreateDeploymentRequest @@ -82,7 +82,7 @@ func (api *Api) CreateDeployment(c *fox.Context) { } // GetDeploymentByID 获取发布任务详情(GET /v1/deployments/:deployID) -func (api *Api) GetDeploymentByID(c *fox.Context) { +func (api *Api) GetDeploymentByID(c *gin.Context) { ctx := c.Request.Context() deployID := c.Param("deployID") @@ -115,7 +115,7 @@ func (api *Api) GetDeploymentByID(c *fox.Context) { } // GetDeployments 获取发布任务列表(GET /v1/deployments) -func (api *Api) GetDeployments(c *fox.Context) { +func (api *Api) GetDeployments(c *gin.Context) { ctx := c.Request.Context() query := &model.DeploymentQuery{ @@ -146,7 +146,7 @@ func (api *Api) GetDeployments(c *fox.Context) { } // UpdateDeployment 修改发布任务(POST /v1/deployments/:deployID) -func (api *Api) UpdateDeployment(c *fox.Context) { +func (api *Api) UpdateDeployment(c *gin.Context) { ctx := c.Request.Context() deployID := c.Param("deployID") @@ -197,7 +197,7 @@ func (api *Api) UpdateDeployment(c *fox.Context) { } // DeleteDeployment 删除发布任务(DELETE /v1/deployments/:deployID) -func (api *Api) DeleteDeployment(c *fox.Context) { +func (api *Api) DeleteDeployment(c *gin.Context) { ctx := c.Request.Context() deployID := c.Param("deployID") @@ -239,7 +239,7 @@ func (api *Api) DeleteDeployment(c *fox.Context) { } // PauseDeployment 暂停发布任务(POST /v1/deployments/:deployID/pause) -func (api *Api) PauseDeployment(c *fox.Context) { +func (api *Api) PauseDeployment(c *gin.Context) { ctx := c.Request.Context() deployID := c.Param("deployID") @@ -281,7 +281,7 @@ func (api *Api) PauseDeployment(c *fox.Context) { } // ContinueDeployment 继续发布任务(POST /v1/deployments/:deployID/continue) -func (api *Api) ContinueDeployment(c *fox.Context) { +func (api *Api) ContinueDeployment(c *gin.Context) { ctx := c.Request.Context() deployID := c.Param("deployID") @@ -323,7 +323,7 @@ func (api *Api) ContinueDeployment(c *fox.Context) { } // RollbackDeployment 回滚发布任务(POST /v1/deployments/:deployID/rollback) -func (api *Api) RollbackDeployment(c *fox.Context) { +func (api *Api) RollbackDeployment(c *gin.Context) { ctx := c.Request.Context() deployID := c.Param("deployID") diff --git a/internal/service_manager/api/info_api.go b/internal/service_manager/api/info_api.go index 9a8a9fd..89fbfa4 100644 --- a/internal/service_manager/api/info_api.go +++ b/internal/service_manager/api/info_api.go @@ -3,14 +3,14 @@ package api import ( "net/http" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" "github.com/qiniu/zeroops/internal/service_manager/model" "github.com/qiniu/zeroops/internal/service_manager/service" "github.com/rs/zerolog/log" ) // setupInfoRouters 设置服务信息相关路由 -func (api *Api) setupInfoRouters(router *fox.Engine) { +func (api *Api) setupInfoRouters(router *gin.Engine) { // 服务列表和信息查询 router.GET("/v1/services", api.GetServices) router.GET("/v1/services/:service/activeVersions", api.GetServiceActiveVersions) @@ -26,7 +26,7 @@ func (api *Api) setupInfoRouters(router *fox.Engine) { // ===== 服务信息相关API ===== // GetServices 获取所有服务列表(GET /v1/services) -func (api *Api) GetServices(c *fox.Context) { +func (api *Api) GetServices(c *gin.Context) { ctx := c.Request.Context() response, err := api.service.GetServicesResponse(ctx) @@ -43,7 +43,7 @@ func (api *Api) GetServices(c *fox.Context) { } // GetServiceActiveVersions 获取服务活跃版本(GET /v1/services/:service/activeVersions) -func (api *Api) GetServiceActiveVersions(c *fox.Context) { +func (api *Api) GetServiceActiveVersions(c *gin.Context) { ctx := c.Request.Context() serviceName := c.Param("service") @@ -71,7 +71,7 @@ func (api *Api) 
GetServiceActiveVersions(c *fox.Context) { } // GetServiceAvailableVersions 获取可用服务版本(GET /v1/services/:service/availableVersions) -func (api *Api) GetServiceAvailableVersions(c *fox.Context) { +func (api *Api) GetServiceAvailableVersions(c *gin.Context) { ctx := c.Request.Context() serviceName := c.Param("service") versionType := c.Query("type") @@ -103,7 +103,7 @@ func (api *Api) GetServiceAvailableVersions(c *fox.Context) { } // GetServiceMetricTimeSeries 获取服务时序指标数据(GET /v1/metrics/:service/:name) -func (api *Api) GetServiceMetricTimeSeries(c *fox.Context) { +func (api *Api) GetServiceMetricTimeSeries(c *gin.Context) { ctx := c.Request.Context() serviceName := c.Param("service") metricName := c.Param("name") @@ -149,7 +149,7 @@ func (api *Api) GetServiceMetricTimeSeries(c *fox.Context) { // ===== 服务管理API(CRUD操作) ===== // CreateService 创建服务(POST /v1/services) -func (api *Api) CreateService(c *fox.Context) { +func (api *Api) CreateService(c *gin.Context) { ctx := c.Request.Context() var service model.Service @@ -185,7 +185,7 @@ func (api *Api) CreateService(c *fox.Context) { } // UpdateService 更新服务信息(PUT /v1/services/:service) -func (api *Api) UpdateService(c *fox.Context) { +func (api *Api) UpdateService(c *gin.Context) { ctx := c.Request.Context() serviceName := c.Param("service") @@ -232,7 +232,7 @@ func (api *Api) UpdateService(c *fox.Context) { } // DeleteService 删除服务(DELETE /v1/services/:service) -func (api *Api) DeleteService(c *fox.Context) { +func (api *Api) DeleteService(c *gin.Context) { ctx := c.Request.Context() serviceName := c.Param("service") diff --git a/internal/service_manager/server.go b/internal/service_manager/server.go index bd760cf..7d52502 100644 --- a/internal/service_manager/server.go +++ b/internal/service_manager/server.go @@ -3,7 +3,7 @@ package servicemanager import ( "fmt" - "github.com/fox-gonic/fox" + "github.com/gin-gonic/gin" "github.com/qiniu/zeroops/internal/config" "github.com/qiniu/zeroops/internal/service_manager/api" "github.com/qiniu/zeroops/internal/service_manager/database" @@ -35,7 +35,7 @@ func NewServiceManagerServer(cfg *config.Config) (*ServiceManagerServer, error) return server, nil } -func (s *ServiceManagerServer) UseApi(router *fox.Engine) error { +func (s *ServiceManagerServer) UseApi(router *gin.Engine) error { _, err := api.NewApi(s.db, s.service, router) if err != nil { return err diff --git a/scripts/sql/alert_rules_bootstrap.sql b/scripts/sql/alert_rules_bootstrap.sql index b260e31..821bc47 100644 --- a/scripts/sql/alert_rules_bootstrap.sql +++ b/scripts/sql/alert_rules_bootstrap.sql @@ -6,6 +6,7 @@ BEGIN; -- Drop tables if exist (order matters due to FK) DROP TABLE IF EXISTS alert_rule_metas CASCADE; DROP TABLE IF EXISTS alert_rules CASCADE; +DROP TABLE IF EXISTS alert_meta_change_logs CASCADE; -- Create alert_rules CREATE TABLE alert_rules ( @@ -26,6 +27,21 @@ CREATE TABLE alert_rule_metas ( PRIMARY KEY (alert_name, labels) ); +-- Create alert_meta_change_logs +CREATE TABLE alert_meta_change_logs ( + id varchar(64) PRIMARY KEY, + change_type varchar(16) NOT NULL CHECK (change_type IN ('Create', 'Update', 'Delete', 'Rollback')), + change_time timestamptz NOT NULL DEFAULT now(), + alert_name varchar(255) NOT NULL, + labels text NOT NULL, + old_threshold numeric, + new_threshold numeric +); + +-- Create indexes for alert_meta_change_logs +CREATE INDEX idx_alert_meta_change_logs_change_time ON alert_meta_change_logs (change_time); +CREATE INDEX idx_alert_meta_change_logs_alert_name_time ON alert_meta_change_logs (alert_name, 
change_time); + -- Seed sample data per request INSERT INTO alert_rules(name, description, expr, op, severity, watch_time) VALUES (
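
The centerpiece of this change set is the new `GET /v1/changelog/alertrules` endpoint (handler `ListAlertRuleChangeLogs` in `internal/alerting/api/issues_api.go`, documented in `docs/alerting/api.md`). As a companion to the frontend call added in `ChangeLogView.vue`, here is a minimal Go sketch of a client that pages through the endpoint using the documented `start`/`limit` parameters and the `next` cursor. It is a sketch under stated assumptions, not part of the patch: the base URL, the `fetchPage` helper, and the page-size value are illustrative, and no auth header is sent because `middleware.Authentication` in this diff currently allows all requests (a real deployment may differ). The JSON field names mirror the `alertRuleChangeListResponse` struct added in the handler.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// Shapes mirror alertRuleChangeListResponse / alertRuleChangeItem in issues_api.go.
type changeValue struct {
	Name string `json:"name"`
	Old  string `json:"old"`
	New  string `json:"new"`
}

type changeItem struct {
	Name     string        `json:"name"`
	EditTime string        `json:"editTime"`
	Scope    string        `json:"scope"`
	Values   []changeValue `json:"values"`
	Reason   string        `json:"reason"`
}

type changePage struct {
	Items []changeItem `json:"items"`
	Next  string       `json:"next,omitempty"`
}

// fetchPage is a hypothetical helper: one GET /v1/changelog/alertrules call.
func fetchPage(base, start string, limit int) (*changePage, error) {
	q := url.Values{}
	q.Set("limit", fmt.Sprintf("%d", limit)) // required by the API, range 1-100
	if start != "" {
		q.Set("start", start) // ISO 8601 cursor taken from the previous page's "next"
	}
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Get(base + "/v1/changelog/alertrules?" + q.Encode())
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("changelog api status %d", resp.StatusCode)
	}
	var page changePage
	if err := json.NewDecoder(resp.Body).Decode(&page); err != nil {
		return nil, err
	}
	return &page, nil
}

func main() {
	base := "http://localhost:8080" // assumed; matches the curl examples in docs/alerting/api.md
	cursor := ""
	for {
		page, err := fetchPage(base, cursor, 10)
		if err != nil {
			fmt.Println("fetch failed:", err)
			return
		}
		for _, it := range page.Items {
			fmt.Printf("%s %s scope=%q changes=%d\n", it.EditTime, it.Name, it.Scope, len(it.Values))
		}
		// Stop on an empty page or when the cursor no longer advances.
		if len(page.Items) == 0 || page.Next == "" || page.Next == cursor {
			break
		}
		cursor = page.Next
	}
}
```

One pagination detail worth noting for callers: per the added docs, `start` is an inclusive upper bound (`change_time <= start`) and `next` is the `editTime` of the last item on the current page, so the boundary record can appear again at the top of the following page; a client that stores the records may want to de-duplicate on `name` plus `editTime`.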