Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hub-client/src/services/attribution-runs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ describe('buildCharToByteMap', () => {
// "é" is U+00E9, 2 bytes in UTF-8 (0xc3 0xa9).
const map = buildCharToByteMap('aéb');
// 'a' at char 0 → byte 0; 'é' at char 1 → byte 1; 'b' at char 2 → byte 3.
expect(map).toEqual([0, 1, 3, 4]);
expect(Array.from(map)).toEqual([0, 1, 3, 4]);
});

it('counts 3-byte UTF-8 sequences correctly (CJK)', () => {
// "中" is U+4E2D, 3 bytes in UTF-8 (0xe4 0xb8 0xad).
const map = buildCharToByteMap('a中b');
expect(map).toEqual([0, 1, 4, 5]);
expect(Array.from(map)).toEqual([0, 1, 4, 5]);
});

it('handles surrogate-pair (4-byte) codepoints', () => {
Expand Down
173 changes: 110 additions & 63 deletions hub-client/src/services/attribution-runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@
* correct in their own coordinate space.
*/

import { diff } from '@automerge/automerge';
import type { Heads } from '@automerge/automerge';
import {
applyChanges,
decodeChange,
getAllChanges,
getChanges,
init,
} from '@automerge/automerge';
import type { Change, Doc, Patch } from '@automerge/automerge';
import { decodeHeads } from '@automerge/automerge-repo';
import type { DocHandle } from '@automerge/automerge-repo';

Expand All @@ -48,6 +54,15 @@ export interface RunListAttribution {
runs: AttributionRun[];
processedHeads: unknown[];
processedHistoryIndex: number;
/**
* Internal: forward-replay doc held at `processedHeads`, fed to
* `A.applyChanges` so each incremental update only pays for new
* changes (not a full doc-state load per step like `A.diff` did).
* Absent when state is hand-constructed (tests), in which case
* `updateRunListAttribution` throws `HistoryCompactedError` and
* the caller (`useAttribution`) falls back to a full rebuild.
*/
_workDoc?: Doc<unknown>;
}

interface SplicePatch {
Expand Down Expand Up @@ -96,10 +111,12 @@ export function extractChangeHash(heads: unknown): string | null {

/**
* History entries processed between idle-callback yields. Larger
* values reduce the number of rIC round trips (faster
* time-to-attribution) but make each slice's CPU block bigger (more
* frame jank risk). 500 gives ~2.5 ms of CPU per slice at the
* prototype's bench-measured ~5 µs/entry.
* values cut yield overhead at the cost of bigger per-slice CPU.
*
* Per-entry CPU is ~15 µs (applyChanges forward-replay, roughly
* constant in N), so a 500-entry slice is ≈7-8 ms — comfortably
* under one frame (16.67 ms). Slices above ~1000 risk overrunning a
* frame in busier browsers.
*/
export const CHUNK_SIZE = 500;

Expand Down Expand Up @@ -227,6 +244,45 @@ function applyPatchToRuns(
}
}

// ---------------------------------------------------------------------------
// Internal: shared replay loop
// ---------------------------------------------------------------------------

/**
* The new change introduced at this history step is whichever hash is in
* `currHeads` but not in `prevHeads`. For the first step, take the first
* head. Returns null if no new change can be identified (defensive).
*/
function newChangeHashAt(prevHeads: string[] | null, currHeads: string[]): string | null {
if (currHeads.length === 0) return null;
if (prevHeads === null) return currHeads[0];
const prevSet = new Set(prevHeads);
return currHeads.find(h => !prevSet.has(h)) ?? null;
}

/**
* Apply one change to `workDoc`, collect any patches via patchCallback,
* and fold them into the running runs list using the change's own
* actor/time. Returns the advanced workDoc.
*/
function replayChange(
workDoc: Doc<unknown>,
change: Change,
textFieldName: string,
runs: AttributionRun[],
): Doc<unknown> {
const decoded = decodeChange(change);
const attribution: CharAttribution = { actor: decoded.actor, time: decoded.time };
let collected: Patch[] = [];
const [next] = applyChanges(workDoc, [change], {
patchCallback: (patches: Patch[]) => { collected = patches; },
});
for (const patch of collected) {
if (isTextPatch(patch, textFieldName)) applyPatchToRuns(runs, patch, attribution);
}
return next;
}

// ---------------------------------------------------------------------------
// buildRunListAttribution — full history processing
// ---------------------------------------------------------------------------
Expand All @@ -241,59 +297,46 @@ export async function buildRunListAttribution(
if (!history) return null;

if (history.length === 0) {
return { runs: [], processedHeads: [], processedHistoryIndex: 0 };
return { runs: [], processedHeads: [], processedHistoryIndex: 0, _workDoc: init() };
}

// Pre-index every change in the doc by hash so each history step can
// look up its corresponding change in O(1).
const doc = viewable.doc() as Doc<unknown>;
const changeByHash = new Map<string, Change>();
for (const c of getAllChanges(doc)) {
changeByHash.set(decodeChange(c).hash, c);
}

const runs: AttributionRun[] = [];
let prevHeads: unknown = null;
let prevHeads: string[] | null = null;
let lastHeads: unknown[] = [];
let workDoc: Doc<unknown> = init();

for (let chunkStart = 0; chunkStart < history.length; chunkStart += CHUNK_SIZE) {
await waitForIdle();
if (signal?.aborted) return null;

const chunkEnd = Math.min(chunkStart + CHUNK_SIZE, history.length);
for (let i = chunkStart; i < chunkEnd; i++) {
const currHeads = history[i];
const changeHash = extractChangeHash(currHeads);
const meta = changeHash ? viewable.metadata(changeHash) : undefined;
const attribution: CharAttribution = {
actor: meta?.actor ?? 'unknown',
time: meta?.time ?? 0,
};

const decodedCurr = decodeHeads(currHeads as Parameters<typeof decodeHeads>[0]);
let patches: unknown[];
if (prevHeads === null) {
patches = diff(
viewable.doc() as Parameters<typeof diff>[0],
[] as unknown as Heads,
decodedCurr as unknown as Heads,
);
} else {
const decodedPrev = decodeHeads(prevHeads as Parameters<typeof decodeHeads>[0]);
patches = diff(
viewable.doc() as Parameters<typeof diff>[0],
decodedPrev as unknown as Heads,
decodedCurr as unknown as Heads,
);
const newHash = newChangeHashAt(prevHeads, decodedCurr);
const change = newHash ? changeByHash.get(newHash) : undefined;
if (change) {
workDoc = replayChange(workDoc, change, textFieldName, runs);
}

for (const patch of patches) {
if (isTextPatch(patch, textFieldName)) {
applyPatchToRuns(runs, patch, attribution);
}
}

prevHeads = currHeads;
prevHeads = decodedCurr;
lastHeads = Array.isArray(currHeads) ? currHeads : [currHeads];
}

if (chunkEnd < history.length) await waitForIdle();
}

return {
runs,
processedHeads: lastHeads as unknown[],
processedHistoryIndex: history.length,
_workDoc: workDoc,
};
}

Expand All @@ -310,42 +353,42 @@ export function updateRunListAttribution(
const history = viewable.history();
if (!history) throw new HistoryCompactedError();
if (state.processedHistoryIndex > history.length) throw new HistoryCompactedError();
if (!state._workDoc) throw new HistoryCompactedError();

if (state.processedHistoryIndex === history.length) {
return state;
}

// Pull just the new changes (since workDoc's heads), index by hash.
const doc = viewable.doc() as Doc<unknown>;
const newChanges = getChanges(state._workDoc, doc);
const changeByHash = new Map<string, Change>();
for (const c of newChanges) {
changeByHash.set(decodeChange(c).hash, c);
}

const runs = state.runs.map(r => ({ ...r }));
let prevHeads = state.processedHeads;
let lastHeads = state.processedHeads;
let prevHeads = decodeHeads(state.processedHeads as Parameters<typeof decodeHeads>[0]);
let lastHeads: unknown[] = state.processedHeads;
let workDoc: Doc<unknown> = state._workDoc;

for (let i = state.processedHistoryIndex; i < history.length; i++) {
const currHeads = history[i];
const changeHash = extractChangeHash(currHeads);
const meta = changeHash ? viewable.metadata(changeHash) : undefined;
const attribution: CharAttribution = {
actor: meta?.actor ?? 'unknown',
time: meta?.time ?? 0,
};

const decodedPrev = decodeHeads(prevHeads as Parameters<typeof decodeHeads>[0]);
const decodedCurr = decodeHeads(currHeads as Parameters<typeof decodeHeads>[0]);
const patches = diff(
viewable.doc() as Parameters<typeof diff>[0],
decodedPrev as unknown as Heads,
decodedCurr as unknown as Heads,
);

for (const patch of patches) {
if (isTextPatch(patch, textFieldName)) {
applyPatchToRuns(runs, patch, attribution);
}
const newHash = newChangeHashAt(prevHeads, decodedCurr);
const change = newHash ? changeByHash.get(newHash) : undefined;
if (change) {
workDoc = replayChange(workDoc, change, textFieldName, runs);
}

prevHeads = currHeads as unknown[];
prevHeads = decodedCurr;
lastHeads = Array.isArray(currHeads) ? currHeads : [currHeads];
}

return {
runs,
processedHeads: lastHeads as unknown[],
processedHistoryIndex: history.length,
_workDoc: workDoc,
};
}

Expand All @@ -367,9 +410,13 @@ export function updateRunListAttribution(
* ASCII-only docs: map is the identity. Non-ASCII docs require this
* translation for correctness — a missing translation would silently
* misattribute any range past the first multi-byte character.
*
* Returned as `Uint32Array` so the per-codeunit storage is a single
* contiguous buffer of 32-bit ints rather than boxed `number` slots —
* matters because this is rebuilt on every debounced payload update.
*/
export function buildCharToByteMap(text: string): number[] {
const map = new Array<number>(text.length + 1);
export function buildCharToByteMap(text: string): Uint32Array {
const map = new Uint32Array(text.length + 1);
let byteOff = 0;
for (let i = 0; i < text.length; i++) {
map[i] = byteOff;
Expand Down Expand Up @@ -399,7 +446,7 @@ export function buildCharToByteMap(text: string): number[] {
*/
export function runsCharToByteOffsets(
runs: AttributionRun[],
charToByte: number[],
charToByte: Uint32Array,
): AttributionRun[] {
return runs.map(r => ({
start: charToByte[r.start] ?? r.start,
Expand Down