From aa675e3a646b67244e833584f5e26ee94a3a064d Mon Sep 17 00:00:00 2001 From: HONGDAE KIM Date: Sun, 15 Feb 2026 14:26:16 +0900 Subject: [PATCH] Add anomaly reason candidate engine and dashboard panel --- bitnet_tools/explain.py | 182 +++++++++++++++++++++++++++++++++++++ bitnet_tools/multi_csv.py | 15 +++ bitnet_tools/ui/app.js | 8 ++ bitnet_tools/ui/index.html | 3 + tests/test_explain.py | 38 ++++++++ 5 files changed, 246 insertions(+) create mode 100644 bitnet_tools/explain.py create mode 100644 tests/test_explain.py diff --git a/bitnet_tools/explain.py b/bitnet_tools/explain.py new file mode 100644 index 0000000..c270675 --- /dev/null +++ b/bitnet_tools/explain.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +import csv +import re +from datetime import datetime +from pathlib import Path +from typing import Any + +_DATE_FORMATS = ["%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y-%m-%d %H:%M:%S"] + + +def _parse_date(value: str) -> datetime | None: + text = str(value or '').strip() + if not text: + return None + for fmt in _DATE_FORMATS: + try: + return datetime.strptime(text, fmt) + except ValueError: + continue + return None + + +def _extract_unit(value: str) -> str | None: + text = str(value or '').strip() + if not text or not any(ch.isdigit() for ch in text): + return None + m = re.search(r'([A-Za-z가-힣/%]+)$', text) + if not m: + return None + unit = m.group(1) + if len(unit) > 8: + return None + return unit.lower() + + +def _rule_missing_concentration(path: str, profiles: dict[str, dict[str, Any]]) -> dict[str, Any] | None: + targets = [(col, p.get('missing_ratio', 0.0)) for col, p in profiles.items() if p.get('missing_ratio', 0.0) >= 0.2] + if not targets: + return None + col, ratio = max(targets, key=lambda x: x[1]) + score = round(min(100.0, ratio * 100.0), 2) + return { + 'rule': '결측 집중', + 'score': score, + 'reason': f"{Path(path).name}:{col} 결측이 집중됨 (결측비율 {ratio:.1%})", + 'evidence': {'column': col, 'missing_ratio': ratio}, + } + + +def _rule_category_bias(path: str, profiles: dict[str, dict[str, Any]]) -> dict[str, Any] | None: + best: tuple[str, dict[str, Any]] | None = None + for col, p in profiles.items(): + if p.get('dtype') != 'string' and p.get('semantic_type') != 'category': + continue + dom = float(p.get('dominant_value_ratio', 0.0)) + if dom < 0.65: + continue + if best is None or dom > float(best[1].get('dominant_value_ratio', 0.0)): + best = (col, p) + if best is None: + return None + col, p = best + top_values = p.get('top_values') or [] + top_val = top_values[0]['value'] if top_values else '(unknown)' + dom = float(p.get('dominant_value_ratio', 0.0)) + score = round(min(100.0, dom * 100.0), 2) + return { + 'rule': '특정 카테고리 편중', + 'score': score, + 'reason': f"{Path(path).name}:{col} 값 '{top_val}' 편중 ({dom:.1%})", + 'evidence': {'column': col, 'dominant_value_ratio': dom, 'top_value': top_val}, + } + + +def _rule_unit_mismatch(path: str, profiles: dict[str, dict[str, Any]]) -> dict[str, Any] | None: + best_candidate: dict[str, Any] | None = None + + for col, p in profiles.items(): + top_values = p.get('top_values') or [] + units: dict[str, float] = {} + for item in top_values: + unit = _extract_unit(item.get('value', '')) + if not unit: + continue + units[unit] = units.get(unit, 0.0) + float(item.get('ratio', 0.0)) + + if len(units) < 2: + continue + coverage = sum(units.values()) + if coverage < 0.2: + continue + score = round(min(100.0, (len(units) - 1) * 18 + coverage * 50), 2) + candidate = { + 'rule': '단위 불일치', + 'score': score, + 'reason': f"{Path(path).name}:{col} 다중 단위 혼재 ({', '.join(sorted(units.keys()))})", + 'evidence': {'column': col, 'units': sorted(units.keys()), 'coverage': round(coverage, 4)}, + } + if best_candidate is None or candidate['score'] > best_candidate['score']: + best_candidate = candidate + + return best_candidate + + +def _rule_recent_change(path: str, csv_path: Path, profiles: dict[str, dict[str, Any]]) -> dict[str, Any] | None: + date_cols = [col for col, p in profiles.items() if p.get('semantic_type') == 'date'] + num_cols = [col for col, p in profiles.items() if p.get('dtype') == 'float'] + if not date_cols or not num_cols: + return None + + with csv_path.open('r', encoding='utf-8-sig', newline='') as f: + rows = list(csv.DictReader(f)) + + best: dict[str, Any] | None = None + for date_col in date_cols: + series: list[tuple[datetime, dict[str, float]]] = [] + for row in rows: + dt = _parse_date(row.get(date_col, '')) + if dt is None: + continue + values: dict[str, float] = {} + for ncol in num_cols: + raw = str(row.get(ncol, '')).strip().replace(',', '') + if not raw: + continue + try: + values[ncol] = float(raw) + except ValueError: + continue + if values: + series.append((dt, values)) + + if len(series) < 6: + continue + + series.sort(key=lambda x: x[0]) + window = max(3, len(series) // 5) + prev = series[-2 * window:-window] + recent = series[-window:] + if not prev or not recent: + continue + + for ncol in num_cols: + prev_vals = [v[ncol] for _, v in prev if ncol in v] + recent_vals = [v[ncol] for _, v in recent if ncol in v] + if len(prev_vals) < 2 or len(recent_vals) < 2: + continue + prev_mean = sum(prev_vals) / len(prev_vals) + recent_mean = sum(recent_vals) / len(recent_vals) + baseline = max(abs(prev_mean), 1e-9) + change_ratio = abs(recent_mean - prev_mean) / baseline + if change_ratio < 0.5: + continue + score = round(min(100.0, change_ratio * 100.0), 2) + candidate = { + 'rule': '최근 급변', + 'score': score, + 'reason': f"{Path(path).name}:{ncol} 최근 평균 급변 ({change_ratio:.1%})", + 'evidence': { + 'date_column': date_col, + 'column': ncol, + 'prev_mean': round(prev_mean, 4), + 'recent_mean': round(recent_mean, 4), + 'change_ratio': round(change_ratio, 4), + }, + } + if best is None or candidate['score'] > best['score']: + best = candidate + return best + + +def generate_reason_candidates(path: str, csv_path: Path, profiles: dict[str, dict[str, Any]], top_k: int = 3) -> list[dict[str, Any]]: + candidates = [ + _rule_missing_concentration(path, profiles), + _rule_category_bias(path, profiles), + _rule_unit_mismatch(path, profiles), + _rule_recent_change(path, csv_path, profiles), + ] + filtered = [c for c in candidates if c is not None] + filtered.sort(key=lambda x: (x.get('score', 0.0), x.get('rule', '')), reverse=True) + return filtered[:top_k] diff --git a/bitnet_tools/multi_csv.py b/bitnet_tools/multi_csv.py index 70c0f51..b42887b 100644 --- a/bitnet_tools/multi_csv.py +++ b/bitnet_tools/multi_csv.py @@ -12,6 +12,7 @@ from typing import Any from .analysis import _to_float +from .explain import generate_reason_candidates CACHE_DIR = Path('.bitnet_cache') UNIQUE_BITMAP_SIZE = 65536 @@ -340,6 +341,9 @@ def _generate_insights(files: list[dict[str, Any]], schema_drift: dict[str, Any] insights.append(f"공통 컬럼 {col}의 타입이 파일 간 다르게 탐지됨") if drift['mean_range'] > 0: insights.append(f"공통 컬럼 {col}의 평균 범위 변화: {drift['mean_range']:.4f}") + for f in files: + for reason in f.get('reason_candidates', [])[:3]: + insights.append(f"{f['path']} 이유후보[{reason['rule']}] {reason['reason']}") return insights[:30] @@ -402,12 +406,22 @@ def analyze_multiple_csv( 'summary': profiled['summary'], 'column_profiles': profiled['column_profiles'], 'group_target_ratio': profiled['group_target_ratio'], + 'reason_candidates': generate_reason_candidates( + str(path), + path, + profiled['column_profiles'], + ), } ) shared_columns = sorted(set.intersection(*all_columns)) if all_columns else [] union_columns = sorted(set.union(*all_columns)) if all_columns else [] schema_drift = _schema_drift(files, shared_columns) + all_reason_candidates: list[dict[str, Any]] = [] + for f in files: + for reason in f.get('reason_candidates', []): + all_reason_candidates.append({'file': f['path'], **reason}) + all_reason_candidates.sort(key=lambda x: x.get('score', 0.0), reverse=True) return { 'question': question, @@ -418,6 +432,7 @@ def analyze_multiple_csv( 'files': files, 'schema_drift': schema_drift, 'insights': _generate_insights(files, schema_drift), + 'reason_candidates': all_reason_candidates[:3], 'code_guidance': build_code_guidance(shared_columns, group_column, target_column), } diff --git a/bitnet_tools/ui/app.js b/bitnet_tools/ui/app.js index 15c3db9..9d86f3c 100644 --- a/bitnet_tools/ui/app.js +++ b/bitnet_tools/ui/app.js @@ -45,6 +45,7 @@ const UI = { filterColumn: document.getElementById('filterColumn'), filterType: document.getElementById('filterType'), insightList: document.getElementById('insightList'), + reasonCandidates: document.getElementById('reasonCandidates'), insightDrilldown: document.getElementById('insightDrilldown'), geoLatCol: document.getElementById('geoLatCol'), geoLonCol: document.getElementById('geoLonCol'), @@ -661,6 +662,13 @@ function renderDashboard(data) { ? insights.map((x, i) => `${i + 1}. ${x}`).join('\n') : '인사이트 항목이 없습니다.'; + const reasons = Array.isArray(data.reason_candidates) ? data.reason_candidates : []; + if (UI.reasonCandidates) { + UI.reasonCandidates.textContent = reasons.length + ? reasons.map((x, i) => `${i + 1}. [${x.rule}] score=${x.score}\n- 파일: ${x.file}\n- 근거: ${x.reason}`).join('\n\n') + : '이유 후보가 없습니다.'; + } + appState.structuredInsights = buildStructuredInsights(data); renderFilters(); renderInsightList(); diff --git a/bitnet_tools/ui/index.html b/bitnet_tools/ui/index.html index 230b363..8955364 100644 --- a/bitnet_tools/ui/index.html +++ b/bitnet_tools/ui/index.html @@ -186,6 +186,9 @@

인사이트 필터

인사이트 리스트

+

이유 후보 보기

+
이유 후보가 없습니다.
+

드릴다운(근거 데이터)

인사이트를 선택하면 근거 데이터가 표시됩니다.
diff --git a/tests/test_explain.py b/tests/test_explain.py new file mode 100644 index 0000000..9863a1b --- /dev/null +++ b/tests/test_explain.py @@ -0,0 +1,38 @@ +from bitnet_tools.multi_csv import analyze_multiple_csv + + +def test_reason_candidates_score_sort_and_reason_text(tmp_path): + p = tmp_path / 'reason_case.csv' + p.write_text( + '\n'.join( + [ + 'dt,cat,amount,desc', + '2024-01-01,A,100,10kg', + '2024-01-02,A,110,12kg', + '2024-01-03,A,120,1lb', + '2024-01-04,A,,3kg', + '2024-01-05,B,105,2lb', + '2024-01-06,A,,5kg', + '2024-01-07,A,1000,2lb', + '2024-01-08,A,1200,6kg', + '2024-01-09,A,1300,7kg', + '2024-01-10,A,1400,8kg', + ] + ) + + '\n', + encoding='utf-8', + ) + + result = analyze_multiple_csv([p], '이상치 이유를 보여줘') + reasons = result['reason_candidates'] + + assert 1 <= len(reasons) <= 3 + assert reasons == sorted(reasons, key=lambda x: x['score'], reverse=True) + + for reason in reasons: + assert reason['score'] > 0 + assert reason['rule'] + assert reason['reason'] + + reason_text = ' '.join(r['reason'] for r in reasons) + assert any(token in reason_text for token in ['결측', '편중', '단위', '급변'])