# Chrysalis CFG Side-by-Side (Bokeh)

This notebook visualizes the control flow of the function at the **same address** in two databases:

- legit DB (`BluetoothService.exe.sqlite`)
- patched DB (`main_module_patched.exe.sqlite`)

It renders:

1. side-by-side CFG graphs (Bokeh)
2. side-by-side assembly listing diff

Reports for the selected function are written to `notebooks/tables/db_diff_reports/`.


In [None]:
from __future__ import annotations

import sqlite3
from collections import defaultdict, deque
from pathlib import Path

import pandas as pd
from bokeh.io import output_notebook, show
from bokeh.layouts import row
from bokeh.models import ColumnDataSource, HoverTool, LabelSet
from bokeh.plotting import figure

# Route Bokeh output to the notebook instead of a standalone HTML file.
output_notebook()

# Resolve the project root: use the current directory if it contains
# "databases", otherwise fall back to the parent directory when that one does.
ROOT = Path.cwd()
if not (ROOT / "databases").exists() and (ROOT.parent / "databases").exists():
    ROOT = ROOT.parent

DB_DIR = ROOT / "databases"
REPORT_DIR = ROOT / "notebooks" / "tables" / "db_diff_reports"
REPORT_DIR.mkdir(parents=True, exist_ok=True)

dbs = sorted(DB_DIR.glob("*.sqlite"))
if len(dbs) < 2:
    raise RuntimeError(f"Need two .sqlite files in {DB_DIR}")

# Pick each role by file name first; any role that has no name match falls
# back to the first database that is not already taken by the other role.
# This guarantees DB_LEGIT and DB_PATCHED never point at the same file.
_legit_named = next((p for p in dbs if "bluetoothservice" in p.name.lower()), None)
_patched_named = next(
    (
        p
        for p in dbs
        if p != _legit_named
        and ("patched" in p.name.lower() or "main_module" in p.name.lower())
    ),
    None,
)
DB_LEGIT = _legit_named if _legit_named is not None else next(p for p in dbs if p != _patched_named)
DB_PATCHED = _patched_named if _patched_named is not None else next(p for p in dbs if p != DB_LEGIT)

# The candidate list is read from the same directory the reports are written to.
PATCHED_CSV = REPORT_DIR / "patched_functions.csv"
if not PATCHED_CSV.exists():
    raise RuntimeError(f"Missing {PATCHED_CSV}. Run chrysalis_sqlite_diff_visual.ipynb first.")

# Largest instruction-count delta first, ties broken by size delta.
patched_df = pd.read_csv(PATCHED_CSV)
patched_df = patched_df.sort_values(["inst_delta", "size_delta"], ascending=False).reset_index(drop=True)

print("LEGIT:", DB_LEGIT)
print("PATCHED:", DB_PATCHED)
print("Patched candidates:", len(patched_df))
display(patched_df[["address", "name_legit", "name_patched", "inst_delta", "size_delta"]].head(20))


In [None]:
# Pick target by address (default = top patched by instruction delta)

# An empty candidate table yields no default; a manual override below can
# still supply the address in that case.
TARGET_ADDRESS = int(patched_df.iloc[0]["address"]) if not patched_df.empty else None
# Example manual override:
# TARGET_ADDRESS = 0x004863A0

if TARGET_ADDRESS is None:
    raise RuntimeError(
        f"No patched candidates in {PATCHED_CSV}; set TARGET_ADDRESS manually."
    )

print(f"TARGET_ADDRESS = 0x{TARGET_ADDRESS:08X}")


In [None]:
def get_func_by_address(conn: sqlite3.Connection, address: int):
    """Look up the row of the ``functions`` table whose ``address`` matches.

    Args:
        conn: Open SQLite connection to query.
        address: Function address to search for (coerced with ``int``).

    Returns:
        A dict with keys ``id``, ``name``, ``address`` and ``size`` for the
        first matching row, or ``None`` when no row matches. NULL ``name``,
        ``address`` and ``size`` values come back as ``''``, ``0`` and ``0``.
    """
    cursor = conn.execute(
        "SELECT id, COALESCE(name,''), COALESCE(address,0), COALESCE(size,0) FROM functions WHERE address=?",
        (int(address),),
    )
    record = cursor.fetchone()
    if not record:
        return None

    func_id, func_name, func_addr, func_size = record
    return {
        "id": int(func_id),
        "name": str(func_name),
        "address": int(func_addr),
        "size": int(func_size),
    }


def load_cfg(conn: sqlite3.Connection, function_id: int, max_asm_lines: int = 25):
    """Load the control-flow graph of one function from the database.

    Args:
        conn: Open SQLite connection to query.
        function_id: Value matched against ``function_bblocks.function_id``.
        max_asm_lines: Maximum number of instruction lines kept in each
            block's assembly preview (default 25).

    Returns:
        A tuple ``(nodes_df, edges_df, asm_map)``:

        * ``nodes_df`` has columns ``bb_id``, ``bb_num`` and ``bb_addr``,
          one row per basic block of the function.
        * ``edges_df`` has columns ``src`` and ``dst`` and holds only the
          ``bb_relations`` rows whose both ends belong to this function.
        * ``asm_map`` maps each block id to a newline-joined preview of
          ``"0x<address>: <disasm>"`` lines ordered by instruction address;
          blocks without instructions map to ``""``.

        When the function has no basic blocks, ``edges_df`` is an empty
        frame with the ``src``/``dst`` columns and ``asm_map`` is ``{}``.
    """
    q_nodes = """
    SELECT fb.basic_block_id AS bb_id,
           COALESCE(bb.num, fb.basic_block_id) AS bb_num,
           COALESCE(bb.address, 0) AS bb_addr
    FROM function_bblocks fb
    LEFT JOIN basic_blocks bb ON bb.id = fb.basic_block_id
    WHERE fb.function_id = ?
    ORDER BY bb_num, bb_addr, bb_id
    """
    nodes_df = pd.read_sql_query(q_nodes, conn, params=[int(function_id)])
    if nodes_df.empty:
        return nodes_df, pd.DataFrame(columns=["src", "dst"]), {}

    bb_ids = [int(x) for x in nodes_df["bb_id"].tolist()]
    # Inlining the ids into the SQL text is safe here: every element was
    # coerced to int on the line above, so no foreign text can reach the query.
    id_list = ",".join(map(str, bb_ids))
    q_edges = f"""
    SELECT parent_id AS src, child_id AS dst
    FROM bb_relations
    WHERE parent_id IN ({id_list}) AND child_id IN ({id_list})
    """
    edges_df = pd.read_sql_query(q_edges, conn)

    # Fetch the instructions of all blocks in a single query rather than
    # issuing one query (and building one DataFrame) per basic block.
    q_asm = f"""
    SELECT bi.basic_block_id AS bb_id,
           COALESCE(i.address,0) AS address,
           COALESCE(i.disasm,'') AS disasm
    FROM bb_instructions bi
    JOIN instructions i ON i.id = bi.instruction_id
    WHERE bi.basic_block_id IN ({id_list})
    ORDER BY bi.basic_block_id, i.address
    """
    lines_by_block = defaultdict(list)
    for bb_id, address, disasm in conn.execute(q_asm):
        lines_by_block[int(bb_id)].append(f"0x{int(address):08X}: {disasm}")

    # Every block gets an entry, even when it has no instructions.
    asm_map = {
        bb_id: "\n".join(lines_by_block.get(bb_id, [])[:max_asm_lines])
        for bb_id in bb_ids
    }

    return nodes_df, edges_df, asm_map


def load_function_instructions(conn: sqlite3.Connection, function_id: int):
    """Return the instructions of one function, ordered by address.

    Args:
        conn: Open SQLite connection to query.
        function_id: Value matched against ``instructions.func_id``.

    Returns:
        A DataFrame with columns ``address`` and ``disasm``; NULL values are
        replaced by ``0`` and ``''`` respectively.
    """
    listing_query = """
    SELECT COALESCE(address,0) AS address, COALESCE(disasm,'') AS disasm
    FROM instructions
    WHERE func_id = ?
    ORDER BY address
    """
    bound_params = [int(function_id)]
    listing_df = pd.read_sql_query(listing_query, conn, params=bound_params)
    return listing_df


def layered_layout(nodes_df: pd.DataFrame, edges_df: pd.DataFrame, x_gap: int = 220, y_gap: int = 90):
    """Compute (x, y) plot coordinates for every basic block of a CFG.

    Blocks are placed in vertical layers by their breadth-first distance
    from the entry block; blocks not reachable from the entry each get a
    layer of their own to the right of the reachable ones.

    Args:
        nodes_df: One row per block with a ``bb_id`` column and, optionally,
            ``bb_num`` and ``bb_addr`` columns.
        edges_df: Directed edges with ``src`` and ``dst`` columns.
        x_gap: Horizontal distance between consecutive layers.
        y_gap: Vertical distance between blocks of the same layer.

    Returns:
        A dict mapping block id to an ``(x, y)`` tuple; ``{}`` when
        ``nodes_df`` has no rows.
    """
    # Nothing to lay out; also avoids indexing row 0 of an empty frame below.
    if nodes_df.empty:
        return {}

    bb_ids = [int(x) for x in nodes_df["bb_id"].tolist()]
    if "bb_addr" in nodes_df.columns:
        bb_addrs = [int(a) for a in nodes_df["bb_addr"].tolist()]
    else:
        bb_addrs = [0] * len(bb_ids)

    successors = defaultdict(list)
    for src, dst in edges_df[["src", "dst"]].itertuples(index=False):
        successors[int(src)].append(int(dst))

    # Entry block: lowest bb_num, then lowest address, then lowest id, using
    # whichever of those columns are present.
    sort_cols = [c for c in ("bb_num", "bb_addr", "bb_id") if c in nodes_df.columns]
    entry = int(nodes_df.sort_values(sort_cols).iloc[0]["bb_id"])

    # Breadth-first search assigns each reachable block its layer index.
    dist = {entry: 0}
    queue = deque([entry])
    while queue:
        current = queue.popleft()
        for nxt in successors.get(current, []):
            if nxt not in dist:
                dist[nxt] = dist[current] + 1
                queue.append(nxt)

    # Unreachable blocks are appended one per layer after the deepest layer.
    deepest = max(dist.values())
    for bb in bb_ids:
        if bb not in dist:
            deepest += 1
            dist[bb] = deepest

    layers = defaultdict(list)
    for bb, addr in zip(bb_ids, bb_addrs):
        layers[dist[bb]].append((bb, addr))

    coords = {}
    for layer in sorted(layers):
        # Within a layer, blocks are stacked downwards in address order.
        ordered = sorted(layers[layer], key=lambda item: item[1])
        for idx, (bb, _addr) in enumerate(ordered):
            coords[bb] = (layer * x_gap, -idx * y_gap)
    return coords


def build_cfg_plot(title: str, nodes_df: pd.DataFrame, edges_df: pd.DataFrame, asm_map: dict, color: str):
    """Build a Bokeh figure that draws one function's CFG.

    Args:
        title: Figure title.
        nodes_df: One row per block with ``bb_id`` and, optionally,
            ``bb_num`` and ``bb_addr`` columns.
        edges_df: Directed edges with ``src`` and ``dst`` columns.
        asm_map: Maps block id to the assembly preview text shown on hover.
        color: Fill/line color of the node markers.

    Returns:
        The Bokeh figure with edges, node markers, node labels and a hover
        tool attached to the node markers.
    """
    # An empty CFG still produces a (blank) figure instead of failing.
    coords = layered_layout(nodes_df, edges_df) if not nodes_df.empty else {}

    node_columns = ["bb_id", "bb_num", "bb_num_label", "addr_hex", "x", "y", "asm_preview"]
    node_rows = []
    for _, r in nodes_df.iterrows():
        bb = int(r["bb_id"])
        num = int(r.get("bb_num", bb))
        addr = int(r.get("bb_addr", 0))
        x, y = coords[bb]
        node_rows.append({
            "bb_id": bb,
            "bb_num": num,
            "bb_num_label": str(num),
            "addr_hex": f"0x{addr:08X}",
            "x": x,
            "y": y,
            "asm_preview": asm_map.get(bb, ""),
        })
    # Explicit columns keep the data source well-formed when there are no rows.
    node_src = ColumnDataSource(pd.DataFrame(node_rows, columns=node_columns))

    # One two-point polyline per edge whose both ends have coordinates.
    edge_xs = []
    edge_ys = []
    for s, d in edges_df[["src", "dst"]].itertuples(index=False):
        s = int(s)
        d = int(d)
        if s in coords and d in coords:
            edge_xs.append([coords[s][0], coords[d][0]])
            edge_ys.append([coords[s][1], coords[d][1]])
    edge_src = ColumnDataSource({"xs": edge_xs, "ys": edge_ys})

    p = figure(title=title, width=760, height=560, tools="pan,wheel_zoom,box_zoom,reset,save")
    p.xgrid.visible = False
    p.ygrid.visible = False
    p.axis.visible = False

    # Edges are drawn first so the node markers sit on top of them.
    p.multi_line(xs="xs", ys="ys", source=edge_src, line_width=2, alpha=0.7, color="#888888")
    # scatter() draws circle markers by default; circle(size=...) is deprecated
    # in recent Bokeh releases.
    node_renderer = p.scatter(x="x", y="y", size=22, source=node_src, color=color, alpha=0.9)

    labels = LabelSet(x="x", y="y", text="bb_num_label", source=node_src, x_offset=8, y_offset=8, text_font_size="9pt")
    p.add_layout(labels)

    # Restrict hovering to the nodes: the edge data source has none of the
    # tooltip columns.
    hover = HoverTool(
        renderers=[node_renderer],
        tooltips=[
            ("BB ID", "@bb_id"),
            ("BB #", "@bb_num"),
            ("Address", "@addr_hex"),
            ("Assembly", "@asm_preview"),
        ],
    )
    p.add_tools(hover)

    return p


In [None]:
from contextlib import closing

# Using a sqlite3 connection directly as a context manager only commits or
# rolls back the transaction; it does not close the connection. closing()
# makes sure both database handles are released when the block exits.
with closing(sqlite3.connect(DB_LEGIT)) as c_legit, closing(sqlite3.connect(DB_PATCHED)) as c_patched:
    f_legit = get_func_by_address(c_legit, TARGET_ADDRESS)
    f_patched = get_func_by_address(c_patched, TARGET_ADDRESS)

    if not f_legit:
        raise RuntimeError(f"Address 0x{TARGET_ADDRESS:08X} not found in LEGIT DB")
    if not f_patched:
        raise RuntimeError(f"Address 0x{TARGET_ADDRESS:08X} not found in PATCHED DB")

    print("LEGIT FUNC:", f_legit)
    print("PATCHED FUNC:", f_patched)

    n_legit, e_legit, asm_legit = load_cfg(c_legit, f_legit["id"])
    n_patched, e_patched, asm_patched = load_cfg(c_patched, f_patched["id"])

    # Two CFG figures rendered next to each other.
    p1 = build_cfg_plot(
        f"LEGIT 0x{TARGET_ADDRESS:08X} {f_legit['name']}",
        n_legit, e_legit, asm_legit, color="#3B82F6"
    )
    p2 = build_cfg_plot(
        f"PATCHED 0x{TARGET_ADDRESS:08X} {f_patched['name']}",
        n_patched, e_patched, asm_patched, color="#EF4444"
    )
    show(row(p1, p2))

    i_legit = load_function_instructions(c_legit, f_legit["id"])
    i_patched = load_function_instructions(c_patched, f_patched["id"])

# Side-by-side listing by ordinal index: row k pairs the k-th instruction of
# each listing; the shorter listing is padded with missing values.
max_len = max(len(i_legit), len(i_patched))
asm_cmp = pd.DataFrame({"idx": range(max_len)})
asm_cmp = asm_cmp.merge(i_legit.reset_index().rename(columns={"index":"idx", "address":"legit_addr", "disasm":"legit_disasm"}), on="idx", how="left")
asm_cmp = asm_cmp.merge(i_patched.reset_index().rename(columns={"index":"idx", "address":"patched_addr", "disasm":"patched_disasm"}), on="idx", how="left")
# Missing disassembly is compared as the empty string.
asm_cmp["same_line"] = (asm_cmp["legit_disasm"].fillna("") == asm_cmp["patched_disasm"].fillna(""))

# Hex rendering of the addresses; padded rows get an empty string.
asm_cmp["legit_addr_hex"] = asm_cmp["legit_addr"].apply(lambda x: f"0x{int(x):08X}" if pd.notna(x) else "")
asm_cmp["patched_addr_hex"] = asm_cmp["patched_addr"].apply(lambda x: f"0x{int(x):08X}" if pd.notna(x) else "")

out_csv = REPORT_DIR / f"asm_side_by_side_0x{TARGET_ADDRESS:08X}.csv"
asm_cmp.to_csv(out_csv, index=False)
print("Wrote", out_csv)

display(asm_cmp[["idx", "legit_addr_hex", "legit_disasm", "patched_addr_hex", "patched_disasm", "same_line"]].head(300))
