# FileCharCountDistributionStatistic 开发笔记

本 notebook 演示如何开发 FileCharCount 字符区间统计卡片，
并在快速上手框架中调试单次运行与趋势分析的呈现效果。

## 数据准备

运行下方单元以加载示例运行数据。单元默认选取 `list_repos()` 返回的第一个项目及其 `list_runs()` 返回的第一条运行记录；如需查看其他执行结果，请在单元中修改 `repo_name` 和 `run_name` 的取值后重新运行。

In [None]:
import sys
from pathlib import Path

# Parent of the current working directory; put it on the import path (once)
# so the ``quickstart_dashboard`` import below can be resolved from there.
NOTEBOOKS_DIR = Path.cwd().resolve().parent
notebooks_path = str(NOTEBOOKS_DIR)
already_importable = notebooks_path in sys.path
if not already_importable:
    sys.path.insert(0, notebooks_path)

from quickstart_dashboard import RunDataLoader

loader = RunDataLoader(base_dir="../../artifacts")
repos = loader.list_repos()

# Default to "nothing available"; each name is filled in only when the
# corresponding lookup succeeds, so later cells can simply test for None.
repo_name = None
run_name = None
sample_run = None
sample_history = None

if repos:
    # Use the first repository the loader reports as the sample project.
    repo_name = repos[0]
    print(f"使用示例项目: {repo_name}")
    runs = loader.list_runs(repo_name)
    if runs:
        # Use the first run the loader reports, plus up to 20 history entries.
        run_name = runs[0]
        print(f"使用示例运行: {run_name}")
        sample_run = loader.load_run(repo_name, run_name)
        sample_history = loader.load_history(repo_name, limit=20)
    else:
        print("⚠️ 项目下暂未找到运行记录。")
else:
    print("⚠️ 未找到任何项目，请确认 ../../artifacts 目录存在分析结果。")


## 定义统计卡片

在下方代码单元中实现字符区间统计逻辑。运行后即可在当前会话中使用该类。

In [None]:
from typing import List, Optional, Sequence

import pandas as pd
import ipywidgets as widgets
from IPython.display import display

from quickstart_dashboard import BaseStatistic, RunData, RunHistory, _card_container


class FileCharCountDistributionStatistic(BaseStatistic):
    """Bucket files by character count and highlight diff additions/modifications.

    The statistic reads the ``FileCharCount`` rows of a run's
    ``analysis_results_df``, assigns every row to one of the ranges in
    ``BINS`` and counts the rows per range.  For runs whose metadata
    ``run_type`` is ``"diff"`` it additionally joins ``diff_results_df`` on
    the file path to count added (``"A"``) and changed (``"M"``/``"R"``)
    files per range.

    ``render_single`` shows one run as a table; ``render_trend`` shows one
    column per run label for a run history.
    """

    # (label, inclusive lower bound, exclusive upper bound); an upper bound of
    # ``None`` marks the open-ended last range.
    BINS: Sequence[tuple[str, int, Optional[int]]] = (
        ("<1k", 0, 1_000),
        ("1k-2k", 1_000, 2_000),
        ("2k-3k", 2_000, 3_000),
        ("3k-10k", 3_000, 10_000),
        ("10k-50k", 10_000, 50_000),
        (">=50k", 50_000, None),
    )

    def __init__(self) -> None:
        self.name = "文件字符区间"
        self.description = "基于 FileCharCount 分析结果统计字符区间，并区分新增/变更文件数量"

    # -- helpers -----------------------------------------------------------

    @staticmethod
    def _html_text(value: object) -> str:
        """Return ``str(value)`` escaped for use as HTML element content.

        Bin labels such as ``"<1k"`` and ``">=50k"`` as well as run labels
        contain (or may contain) ``<``, ``>`` and ``&``, which must not be
        written into markup verbatim.
        """
        # Imported here so the class does not rely on an additional
        # module-level import in the surrounding notebook cell.
        from html import escape

        return escape(str(value))

    def _load_char_counts(self, run: RunData) -> Optional[pd.DataFrame]:
        """Return the run's ``FileCharCount`` rows with a numeric ``count``.

        Returns ``None`` when the analysis frame is missing or empty, lacks
        the ``analyzer_type``/``count`` columns, or contains no
        ``FileCharCount`` row with a count that parses as a number.  The
        returned frame is a copy that always has string ``commit_hash`` and
        ``path`` columns (empty string where the value is unknown).
        """
        df = run.dataframes.get("analysis_results_df")
        if df is None or df.empty:
            return None

        if "analyzer_type" not in df.columns or "count" not in df.columns:
            return None

        filtered = df[df["analyzer_type"] == "FileCharCount"].copy()
        if filtered.empty:
            return None

        # Unparseable counts become NaN and are discarded.
        filtered["count"] = pd.to_numeric(filtered["count"], errors="coerce")
        filtered = filtered.dropna(subset=["count"])
        if filtered.empty:
            return None

        for column in ("commit_hash", "path"):
            if column in filtered.columns:
                filtered[column] = filtered[column].fillna("").astype(str)
            else:
                filtered[column] = ""
        return filtered

    def _attach_diff_metadata(self, run: RunData, df: pd.DataFrame) -> tuple[pd.DataFrame, str, bool]:
        """Add a ``diff_change_type`` column to a copy of *df*.

        For ``"diff"`` runs with a non-empty ``diff_results_df`` the change
        type is looked up by path (``target_path``, falling back to
        ``source_path``); in every other case the column is all ``pd.NA``.

        Returns:
            ``(frame, run_type, diff_info_available)`` where
            ``diff_info_available`` is ``True`` only if at least one row
            received a non-empty change type.
        """
        run_type = (run.metadata or {}).get("run_type", "")
        df = df.copy()
        # A pre-existing column of the same name would make ``merge`` emit
        # ``_x``/``_y`` suffixed columns and the lookup below would raise
        # KeyError; its values are never used, so drop it up front.
        if "diff_change_type" in df.columns:
            df = df.drop(columns=["diff_change_type"])

        diff_df = run.dataframes.get("diff_results_df") if run_type == "diff" else None
        if diff_df is None or diff_df.empty:
            df["diff_change_type"] = pd.NA
            return df, str(run_type), False

        diff_df = diff_df.copy()
        for column in ("target_path", "source_path", "diff_change_type"):
            if column not in diff_df.columns:
                diff_df[column] = ""
            diff_df[column] = diff_df[column].fillna("").astype(str)

        # Prefer the target path; use the source path only when no target
        # path is recorded for the row.
        merge_path = diff_df["target_path"].where(
            diff_df["target_path"] != "",
            diff_df["source_path"],
        )
        diff_map = pd.DataFrame(
            {"path": merge_path, "diff_change_type": diff_df["diff_change_type"]}
        )
        # An empty key is not a real path: it would match every analysis row
        # whose path was defaulted to "".  Keep the first entry per path.
        diff_map = diff_map[diff_map["path"] != ""].drop_duplicates(subset=["path"])

        df = df.merge(diff_map, on="path", how="left")
        df["diff_change_type"] = df["diff_change_type"].replace("", pd.NA)
        diff_info_available = bool(df["diff_change_type"].notna().any())
        return df, str(run_type), diff_info_available

    def _bucket_label(self, value: float) -> str:
        """Return the label of the ``BINS`` range that contains *value*.

        A value that matches no range (for example a negative number) falls
        back to the label of the last range.
        """
        for label, lower, upper in self.BINS:
            if value >= lower and (upper is None or value < upper):
                return label
        return self.BINS[-1][0]

    def _summaries(self, run: RunData) -> Optional[dict[str, object]]:
        """Summarise one run as per-range counts.

        Returns ``None`` when the run has no usable ``FileCharCount`` data.
        Otherwise returns a dict with ``rows`` (one dict per range with
        ``range``, ``total``, ``added``, ``changed``), ``run_type`` and
        ``diff_info``.  ``added``/``changed`` are ``None`` unless the run is
        a diff run with matched diff information.
        """
        base_df = self._load_char_counts(run)
        if base_df is None:
            return None

        df, run_type, diff_info_available = self._attach_diff_metadata(run, base_df)
        if df.empty:
            return None

        df["range_label"] = df["count"].apply(self._bucket_label)
        with_diff = run_type == "diff" and diff_info_available

        rows: List[dict[str, object]] = []
        for label, _, _ in self.BINS:
            subset = df[df["range_label"] == label]
            added: Optional[int] = None
            changed: Optional[int] = None
            if with_diff:
                added = int(subset["diff_change_type"].isin({"A"}).sum())
                changed = int(subset["diff_change_type"].isin({"M", "R"}).sum())

            rows.append(
                {
                    "range": label,
                    "total": int(subset.shape[0]),
                    "added": added,
                    "changed": changed,
                }
            )

        return {
            "rows": rows,
            "run_type": run_type,
            "diff_info": diff_info_available,
        }

    def _render_distribution_table(self, rows: Sequence[dict[str, object]], diff_info: bool) -> widgets.HTML:
        """Render the per-range rows of a single run as an HTML table.

        When *diff_info* is false the added/changed columns show ``-``;
        otherwise a missing (``None``) count is shown as ``0``.
        """
        cell = "<td style='padding:6px 8px;border-bottom:1px solid #f0f0f0;color:#333;'>{}</td>"
        placeholder = "<td style='padding:6px 8px;border-bottom:1px solid #f0f0f0;color:#999;'>-</td>"

        parts = [
            "<table style='border-collapse:collapse;font-size:12px;width:100%;max-width:520px;'>",
            "<thead><tr>",
        ]
        parts.extend(
            f"<th style='border-bottom:1px solid #ddd;padding:6px 8px;text-align:left;color:#555;font-weight:600;'>{header}</th>"
            for header in ("区间", "文件数", "新增", "变更")
        )
        parts.append("</tr></thead><tbody>")

        for row in rows:
            parts.append("<tr>")
            # Range labels contain "<" / ">" and must be escaped.
            parts.append(cell.format(self._html_text(row["range"])))
            parts.append(cell.format(row["total"]))
            if diff_info:
                for key in ("added", "changed"):
                    count = row.get(key)
                    parts.append(cell.format(count if count is not None else 0))
            else:
                parts.extend((placeholder, placeholder))
            parts.append("</tr>")

        parts.append("</tbody></table>")
        return widgets.HTML(value="".join(parts))

    def _render_matrix_table(
        self, title: str, df: pd.DataFrame, label_order: Sequence[str]
    ) -> widgets.HTML:
        """Render a range-by-run pivot as a titled HTML table.

        Rows follow the order of ``BINS`` (ranges missing from *df* are
        filled with ``0``); columns follow *label_order*, restricted to the
        labels present in *df*.  An empty *df* yields an empty widget.
        """
        if df.empty:
            return widgets.HTML(value="")

        df = df.reindex(self._bin_labels(), axis=0, fill_value=0)
        df = df[[label for label in label_order if label in df.columns]]

        parts = [
            f"<div style='font-size:12px;color:#555;font-weight:600;margin:8px 0 4px;'>{self._html_text(title)}</div>",
            "<table style='border-collapse:collapse;font-size:12px;width:100%;max-width:720px;'>",
            "<thead><tr>",
            "<th style='border-bottom:1px solid #ddd;padding:6px 8px;text-align:left;color:#555;font-weight:600;'>区间</th>",
        ]
        # Column headers are run labels taken from the run data: escape them.
        parts.extend(
            f"<th style='border-bottom:1px solid #ddd;padding:6px 8px;text-align:right;color:#555;font-weight:600;'>{self._html_text(col)}</th>"
            for col in df.columns
        )
        parts.append("</tr></thead><tbody>")

        for idx in df.index:
            parts.append("<tr>")
            parts.append(
                f"<td style='padding:6px 8px;border-bottom:1px solid #f0f0f0;color:#333;'>{self._html_text(idx)}</td>"
            )
            for col in df.columns:
                raw = df.loc[idx, col]
                value = int(raw) if pd.notna(raw) else ""
                parts.append(
                    f"<td style='padding:6px 8px;border-bottom:1px solid #f0f0f0;color:#333;text-align:right;'>{value}</td>"
                )
            parts.append("</tr>")

        parts.append("</tbody></table>")
        return widgets.HTML(value="".join(parts))

    def _bin_labels(self) -> List[str]:
        """Return the range labels in ``BINS`` order."""
        return [label for label, _, _ in self.BINS]

    @staticmethod
    def _pivot_by_range(frame: pd.DataFrame, values: str) -> pd.DataFrame:
        """Sum the *values* column of *frame* per range (rows) and run label (columns)."""
        return frame.pivot_table(
            index="range",
            columns="label",
            values=values,
            aggfunc="sum",
            fill_value=0,
            observed=False,
        )

    # -- rendering ---------------------------------------------------------

    def render_single(self, run: RunData) -> widgets.Widget:
        """Build the card for a single run.

        Shows a placeholder card when the run has no ``FileCharCount`` data;
        otherwise a short note explaining how the added/changed columns are
        populated, followed by the distribution table.
        """
        summary = self._summaries(run)
        if summary is None:
            return _card_container(
                self.name,
                description=self.description,
                body_html="<div style='color:#666;'>暂无 FileCharCount 分析结果可用于统计。</div>",
                min_width="360px",
            )

        diff_run = summary["run_type"] == "diff"
        diff_info = diff_run and summary["diff_info"]
        if diff_info:
            note = "统计基于 diff 运行匹配的 FileCharCount 结果。"
        elif diff_run:
            note = "diff 运行未匹配到文件级变更，新增/变更列以 “-” 显示。"
        else:
            # The run type comes from run metadata, so escape it before it is
            # embedded in the note's markup.
            run_label = self._html_text(summary["run_type"] or "未知")
            note = f"当前运行类型为 {run_label}，新增/变更列以 “-” 显示。"

        table = self._render_distribution_table(summary["rows"], diff_info)
        return _card_container(
            self.name,
            description=self.description,
            body_widgets=[widgets.HTML(value=f"<div style='color:#666;font-size:12px;'>{note}</div>"), table],
            min_width="360px",
        )

    def render_trend(self, history: RunHistory) -> widgets.Widget:
        """Build the card comparing the distribution across a run history.

        Runs without usable data are skipped.  The card always contains the
        total-files matrix when any run has data; the added/changed matrices
        are included only for diff runs with matched diff information.
        Columns appear in the order in which run labels are first seen.
        """
        records: List[dict[str, object]] = []
        label_order: List[str] = []
        seen_labels: set[str] = set()
        diff_info_available = False

        for run in history.runs:
            summary = self._summaries(run)
            if summary is None:
                continue

            label = run.trend_label
            if label not in seen_labels:
                label_order.append(label)
                seen_labels.add(label)

            records.extend(
                {
                    "range": row["range"],
                    "total": row["total"],
                    "added": row.get("added"),
                    "changed": row.get("changed"),
                    "label": label,
                    "run_type": summary["run_type"],
                    "diff_info": summary["diff_info"],
                }
                for row in summary["rows"]
            )

            if summary["run_type"] == "diff" and summary["diff_info"]:
                diff_info_available = True

        if not records:
            return _card_container(
                self.name,
                description=self.description,
                body_html="<div style='color:#666;'>暂无 FileCharCount 历史数据可以展示。</div>",
                min_width="360px",
                flex="1 1 100%",
            )

        df = pd.DataFrame(records)
        # Ordered categorical so every range keeps its ``BINS`` position.
        df["range"] = pd.Categorical(df["range"], categories=self._bin_labels(), ordered=True)

        children: List[widgets.Widget] = []

        pivot_total = self._pivot_by_range(df, "total")
        if not pivot_total.empty:
            children.append(self._render_matrix_table("各运行的文件数量分布", pivot_total, label_order))

        if diff_info_available:
            # Only diff runs with matched diff data carry added/changed counts.
            diff_df = df[(df["run_type"] == "diff") & (df["diff_info"])]
            if not diff_df.empty:
                for title, column in (("新增文件分布", "added"), ("变更文件分布", "changed")):
                    pivot = self._pivot_by_range(diff_df, column)
                    if not pivot.empty:
                        children.append(self._render_matrix_table(title, pivot, label_order))

        if not children:
            children.append(
                widgets.HTML(value="<div style='color:#666;'>暂无可视化数据。</div>")
            )

        return _card_container(
            self.name,
            description=self.description,
            body_widgets=children,
            min_width="360px",
            flex="1 1 100%",
        )


## 调试与预览

运行以下单元，在 notebook 中直接查看单次运行卡片和趋势统计，便于上线前快速验证。

In [None]:
# Preview the single-run card; requires the data-loading cell to have found a run.
if sample_run is not None:
    stat = FileCharCountDistributionStatistic()
    widget = stat.render_single(sample_run)
    display(widget)
else:
    print("⚠️ 没有可用的运行数据，无法预览单次统计卡片。")


In [None]:
# Preview the trend card; requires a loaded history that contains at least one run.
if sample_history is not None and sample_history.runs:
    stat = FileCharCountDistributionStatistic()
    widget = stat.render_trend(sample_history)
    display(widget)
else:
    print("⚠️ 没有足够的历史数据，无法绘制趋势图。")


## 导出到仪表盘

确认逻辑后，可以在 `quickstart_dashboard.ipynb` 中通过
`load_statistic_from_notebook("custom_statistics/file_char_count_distribution_statistic.ipynb")`
将此统计类加载并注册到仪表盘。