-
Notifications
You must be signed in to change notification settings - Fork 0
Codex-generated pull request #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,182 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import csv | ||
| import re | ||
| from datetime import datetime | ||
| from pathlib import Path | ||
| from typing import Any | ||
|
|
||
| _DATE_FORMATS = ["%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y-%m-%d %H:%M:%S"] | ||
|
|
||
|
|
||
| def _parse_date(value: str) -> datetime | None: | ||
| text = str(value or '').strip() | ||
| if not text: | ||
| return None | ||
| for fmt in _DATE_FORMATS: | ||
| try: | ||
| return datetime.strptime(text, fmt) | ||
| except ValueError: | ||
| continue | ||
| return None | ||
|
|
||
|
|
||
| def _extract_unit(value: str) -> str | None: | ||
| text = str(value or '').strip() | ||
| if not text or not any(ch.isdigit() for ch in text): | ||
| return None | ||
| m = re.search(r'([A-Za-z가-힣/%]+)$', text) | ||
| if not m: | ||
| return None | ||
| unit = m.group(1) | ||
| if len(unit) > 8: | ||
| return None | ||
| return unit.lower() | ||
|
|
||
|
|
||
| def _rule_missing_concentration(path: str, profiles: dict[str, dict[str, Any]]) -> dict[str, Any] | None: | ||
| targets = [(col, p.get('missing_ratio', 0.0)) for col, p in profiles.items() if p.get('missing_ratio', 0.0) >= 0.2] | ||
| if not targets: | ||
| return None | ||
| col, ratio = max(targets, key=lambda x: x[1]) | ||
| score = round(min(100.0, ratio * 100.0), 2) | ||
| return { | ||
| 'rule': '결측 집중', | ||
| 'score': score, | ||
| 'reason': f"{Path(path).name}:{col} 결측이 집중됨 (결측비율 {ratio:.1%})", | ||
| 'evidence': {'column': col, 'missing_ratio': ratio}, | ||
| } | ||
|
|
||
|
|
||
| def _rule_category_bias(path: str, profiles: dict[str, dict[str, Any]]) -> dict[str, Any] | None: | ||
| best: tuple[str, dict[str, Any]] | None = None | ||
| for col, p in profiles.items(): | ||
| if p.get('dtype') != 'string' and p.get('semantic_type') != 'category': | ||
| continue | ||
| dom = float(p.get('dominant_value_ratio', 0.0)) | ||
| if dom < 0.65: | ||
| continue | ||
| if best is None or dom > float(best[1].get('dominant_value_ratio', 0.0)): | ||
| best = (col, p) | ||
| if best is None: | ||
| return None | ||
| col, p = best | ||
| top_values = p.get('top_values') or [] | ||
| top_val = top_values[0]['value'] if top_values else '(unknown)' | ||
| dom = float(p.get('dominant_value_ratio', 0.0)) | ||
| score = round(min(100.0, dom * 100.0), 2) | ||
| return { | ||
| 'rule': '특정 카테고리 편중', | ||
| 'score': score, | ||
| 'reason': f"{Path(path).name}:{col} 값 '{top_val}' 편중 ({dom:.1%})", | ||
| 'evidence': {'column': col, 'dominant_value_ratio': dom, 'top_value': top_val}, | ||
| } | ||
|
|
||
|
|
||
def _rule_unit_mismatch(path: str, profiles: dict[str, dict[str, Any]]) -> dict[str, Any] | None:
    """Flag the column whose top values mix the most measurement units.

    For each column, units are extracted from its top values and their
    ratios accumulated per unit. A column is a candidate only when at
    least two distinct units are present and they jointly cover >= 20%
    of the top values. Returns the highest-scoring candidate or None.
    """
    winner: dict[str, Any] | None = None

    for name, profile in profiles.items():
        ratios_by_unit: dict[str, float] = {}
        for item in profile.get('top_values') or []:
            token = _extract_unit(item.get('value', ''))
            if token:
                ratios_by_unit[token] = ratios_by_unit.get(token, 0.0) + float(item.get('ratio', 0.0))

        if len(ratios_by_unit) < 2:
            continue
        covered = sum(ratios_by_unit.values())
        if covered < 0.2:
            continue

        # Score grows with the number of extra units and their coverage.
        points = round(min(100.0, (len(ratios_by_unit) - 1) * 18 + covered * 50), 2)
        unit_names = sorted(ratios_by_unit)
        candidate = {
            'rule': '단위 불일치',
            'score': points,
            'reason': f"{Path(path).name}:{name} 다중 단위 혼재 ({', '.join(unit_names)})",
            'evidence': {'column': name, 'units': unit_names, 'coverage': round(covered, 4)},
        }
        if winner is None or candidate['score'] > winner['score']:
            winner = candidate

    return winner
|
|
||
|
|
||
def _rule_recent_change(path: str, csv_path: Path, profiles: dict[str, dict[str, Any]]) -> dict[str, Any] | None:
    """Flag a numeric column whose recent windowed mean jumped >= 50%.

    Streams the CSV in a single pass, keeping only the parsed
    (timestamp, numeric-values) pairs per date column instead of
    materializing every raw row in memory (the previous
    ``rows = list(csv.DictReader(f))`` copied the whole file before any
    windowing, which could exhaust memory on large datasets).

    For each date column the series is sorted chronologically and split
    into a "previous" and a "recent" window of equal size; numeric
    columns whose mean shifted by at least 50% relative to the previous
    window's mean become candidates. Returns the highest-scoring
    candidate dict (rule/score/reason/evidence) or None.
    """
    date_cols = [col for col, p in profiles.items() if p.get('semantic_type') == 'date']
    num_cols = [col for col, p in profiles.items() if p.get('dtype') == 'float']
    if not date_cols or not num_cols:
        return None

    # Single streaming pass: per date column, collect (timestamp, values).
    series_by_date_col: dict[str, list[tuple[datetime, dict[str, float]]]] = {
        col: [] for col in date_cols
    }
    with csv_path.open('r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            values: dict[str, float] = {}
            parsed_numerics = False  # parse numeric cells at most once per row
            for date_col in date_cols:
                dt = _parse_date(row.get(date_col, ''))
                if dt is None:
                    continue
                if not parsed_numerics:
                    for ncol in num_cols:
                        raw = str(row.get(ncol, '')).strip().replace(',', '')
                        if not raw:
                            continue
                        try:
                            values[ncol] = float(raw)
                        except ValueError:
                            continue
                    parsed_numerics = True
                if values:
                    # The values dict is read-only afterwards, so sharing the
                    # same object across date columns is safe.
                    series_by_date_col[date_col].append((dt, values))

    best: dict[str, Any] | None = None
    for date_col in date_cols:
        series = series_by_date_col[date_col]
        # Need at least 6 dated points to form two comparable windows.
        if len(series) < 6:
            continue

        series.sort(key=lambda x: x[0])
        window = max(3, len(series) // 5)
        prev = series[-2 * window:-window]
        recent = series[-window:]
        if not prev or not recent:
            continue

        for ncol in num_cols:
            prev_vals = [v[ncol] for _, v in prev if ncol in v]
            recent_vals = [v[ncol] for _, v in recent if ncol in v]
            # Require at least two observations per window for a stable mean.
            if len(prev_vals) < 2 or len(recent_vals) < 2:
                continue
            prev_mean = sum(prev_vals) / len(prev_vals)
            recent_mean = sum(recent_vals) / len(recent_vals)
            # Guard against division by ~zero baselines.
            baseline = max(abs(prev_mean), 1e-9)
            change_ratio = abs(recent_mean - prev_mean) / baseline
            if change_ratio < 0.5:
                continue
            score = round(min(100.0, change_ratio * 100.0), 2)
            candidate = {
                'rule': '최근 급변',
                'score': score,
                'reason': f"{Path(path).name}:{ncol} 최근 평균 급변 ({change_ratio:.1%})",
                'evidence': {
                    'date_column': date_col,
                    'column': ncol,
                    'prev_mean': round(prev_mean, 4),
                    'recent_mean': round(recent_mean, 4),
                    'change_ratio': round(change_ratio, 4),
                },
            }
            if best is None or candidate['score'] > best['score']:
                best = candidate
    return best
|
|
||
|
|
||
def generate_reason_candidates(path: str, csv_path: Path, profiles: dict[str, dict[str, Any]], top_k: int = 3) -> list[dict[str, Any]]:
    """Run every reason rule and return up to *top_k* hits, best score first.

    Rules that do not fire return None and are dropped. Ties on score are
    broken by rule name, descending.
    """
    produced = (
        _rule_missing_concentration(path, profiles),
        _rule_category_bias(path, profiles),
        _rule_unit_mismatch(path, profiles),
        _rule_recent_change(path, csv_path, profiles),
    )
    hits = [hit for hit in produced if hit is not None]
    ranked = sorted(hits, key=lambda hit: (hit.get('score', 0.0), hit.get('rule', '')), reverse=True)
    return ranked[:top_k]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| from bitnet_tools.multi_csv import analyze_multiple_csv | ||
|
|
||
|
|
||
def test_reason_candidates_score_sort_and_reason_text(tmp_path):
    """Reason candidates are score-sorted and carry non-empty reason text."""
    csv_file = tmp_path / 'reason_case.csv'
    rows = [
        'dt,cat,amount,desc',
        '2024-01-01,A,100,10kg',
        '2024-01-02,A,110,12kg',
        '2024-01-03,A,120,1lb',
        '2024-01-04,A,,3kg',
        '2024-01-05,B,105,2lb',
        '2024-01-06,A,,5kg',
        '2024-01-07,A,1000,2lb',
        '2024-01-08,A,1200,6kg',
        '2024-01-09,A,1300,7kg',
        '2024-01-10,A,1400,8kg',
    ]
    csv_file.write_text(''.join(line + '\n' for line in rows), encoding='utf-8')

    result = analyze_multiple_csv([csv_file], '이상치 이유를 보여줘')
    reasons = result['reason_candidates']

    # At most top-3 candidates, sorted by descending score.
    assert 1 <= len(reasons) <= 3
    assert reasons == sorted(reasons, key=lambda item: item['score'], reverse=True)

    for candidate in reasons:
        assert candidate['score'] > 0
        assert candidate['rule']
        assert candidate['reason']

    # At least one rule-specific Korean keyword must appear in the reasons.
    combined = ' '.join(candidate['reason'] for candidate in reasons)
    assert any(token in combined for token in ['결측', '편중', '단위', '급변'])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line loads the entire file into memory before computing recent-change signals, so every dataset with both a date and numeric column incurs an O(n) in-memory copy in addition to profiling; on large CSVs this can significantly slow analysis or trigger memory failures. Since
analyze_multiple_csvis otherwise stream-oriented, this regression can break large-file workflows and should be replaced with incremental windowed processing.Useful? React with 👍 / 👎.