Skip to content

Commit a72cc21

Browse files
committed
Add graph reachability example, e2e test, and clarify paper assumptions
1 parent a89264d commit a72cc21

File tree

7 files changed

+598
-7
lines changed

7 files changed

+598
-7
lines changed

examples/LiveHarness.res

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ module Client = {
4545
let updateInput = (broker, entries) =>
4646
SkipruntimeHelpers.update(broker, "numbers", entries)
4747

48+
let updateEdges = (broker, entries) =>
49+
SkipruntimeHelpers.update(broker, "edges", entries)
50+
4851
let getStreamUrl = async (opts: SkipruntimeServer.runOptions, broker, resource) => {
4952
let uuid = await SkipruntimeHelpers.getStreamUUID(broker, resource, None)
5053
`http://${localhost}:${opts.streaming_port->Int.toString}/v1/streams/${uuid}`
@@ -138,16 +141,30 @@ let run = async () => {
138141
await Client.snapshot(broker, "numbers", "harness: numbers")
139142
await Client.snapshot(broker, "doubled", "harness: doubled")
140143
await Client.snapshot(broker, "sum", "harness: sum")
144+
await Client.snapshot(broker, "dead", "harness: dead code (unreachable nodes)")
141145
Console.log2("harness: counters after initial snapshot", Server.getRunStats())
142146

143147
// Phase 2: Update c from 3 to 5.
144148
await Client.updateInput(broker, [(JSON.String("c"), [JSON.Number(5.)])])
145149
await Client.snapshot(broker, "numbers", "harness: numbers after c→5")
146150
await Client.snapshot(broker, "doubled", "harness: doubled after c→5")
147151
await Client.snapshot(broker, "sum", "harness: sum after c→5")
152+
await Client.snapshot(broker, "dead", "harness: dead code after c→5 (unchanged)")
148153
Console.log2("harness: client sum after c→5 (from SSE)", ClientSum.getTotal())
149154
Console.log2("harness: counters after c→5", Server.getRunStats())
150155

156+
// Phase 3: Remove an edge to create dead code.
157+
await Client.updateEdges(broker, [
158+
(
159+
JSON.String("fileA"),
160+
[
161+
// drop util -> lib edge to make lib/helper unreachable
162+
JSON.Array([JSON.String("main"), JSON.String("util")]),
163+
],
164+
),
165+
])
166+
await Client.snapshot(broker, "dead", "harness: dead code after dropping util→lib")
167+
151168
ClientSum.close()
152169
await Server.stop(server)
153170
Console.log("harness: service closed")

examples/LiveHarness.res.js

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/LiveHarnessService.js

Lines changed: 228 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,198 @@ const log = (...args) => {
33
if (ENABLE_LOGS)
44
console.log(...args);
55
};
6+
// Utilities for graph state manipulation (JSON-friendly in the accumulator; Sets/Maps for computation).
7+
const toSets = (state) => {
8+
const nodes = new Set(state.nodes);
9+
const roots = new Set(state.roots);
10+
const reachable = new Set(state.reachable);
11+
const edgesOut = new Map();
12+
for (const [k, vs] of Object.entries(state.edgesOut)) {
13+
edgesOut.set(k, new Set(vs));
14+
}
15+
const incoming = new Map();
16+
for (const [k, v] of Object.entries(state.incoming)) {
17+
incoming.set(k, v);
18+
}
19+
return { nodes, roots, reachable, edgesOut, incoming };
20+
};
21+
const fromSets = (s) => ({
22+
nodes: Array.from(s.nodes),
23+
roots: Array.from(s.roots),
24+
reachable: Array.from(s.reachable),
25+
edgesOut: Object.fromEntries(Array.from(s.edgesOut.entries()).map(([k, vs]) => [k, Array.from(vs)])),
26+
incoming: Object.fromEntries(Array.from(s.incoming.entries())),
27+
});
28+
const ensureNode = (s, id) => {
29+
s.nodes.add(id);
30+
if (!s.edgesOut.has(id))
31+
s.edgesOut.set(id, new Set());
32+
if (!s.incoming.has(id))
33+
s.incoming.set(id, 0);
34+
};
35+
const propagateReachable = (s, start) => {
36+
const queue = [...start];
37+
while (queue.length > 0) {
38+
const u = queue.pop();
39+
if (s.reachable.has(u))
40+
continue;
41+
s.reachable.add(u);
42+
const outs = s.edgesOut.get(u);
43+
if (!outs)
44+
continue;
45+
for (const v of outs) {
46+
ensureNode(s, v);
47+
const newCount = (s.incoming.get(v) ?? 0) + 1;
48+
s.incoming.set(v, newCount);
49+
if (!s.reachable.has(v)) {
50+
queue.push(v);
51+
}
52+
}
53+
}
54+
};
55+
const propagateUnreachable = (s, start) => {
56+
const queue = [...start];
57+
while (queue.length > 0) {
58+
const u = queue.pop();
59+
if (!s.reachable.has(u))
60+
continue;
61+
if (s.roots.has(u))
62+
continue; // roots stay reachable
63+
const incoming = s.incoming.get(u) ?? 0;
64+
if (incoming > 0)
65+
continue;
66+
s.reachable.delete(u);
67+
const outs = s.edgesOut.get(u);
68+
if (!outs)
69+
continue;
70+
for (const v of outs) {
71+
const newCount = Math.max(0, (s.incoming.get(v) ?? 0) - 1);
72+
s.incoming.set(v, newCount);
73+
if (newCount === 0 && !s.roots.has(v)) {
74+
queue.push(v);
75+
}
76+
}
77+
}
78+
};
79+
class GraphReducer {
80+
constructor() {
81+
this.initial = {
82+
nodes: [],
83+
roots: [],
84+
reachable: [],
85+
edgesOut: {},
86+
incoming: {},
87+
};
88+
}
89+
add(acc, value) {
90+
const state = toSets(acc ?? this.initial);
91+
if (value.kind === "node") {
92+
ensureNode(state, value.id);
93+
}
94+
else if (value.kind === "root") {
95+
ensureNode(state, value.id);
96+
if (!state.roots.has(value.id)) {
97+
state.roots.add(value.id);
98+
propagateReachable(state, [value.id]);
99+
}
100+
}
101+
else {
102+
ensureNode(state, value.from);
103+
ensureNode(state, value.to);
104+
const outs = state.edgesOut.get(value.from);
105+
if (!outs.has(value.to)) {
106+
outs.add(value.to);
107+
if (state.reachable.has(value.from)) {
108+
const newCount = (state.incoming.get(value.to) ?? 0) + 1;
109+
state.incoming.set(value.to, newCount);
110+
if (!state.reachable.has(value.to)) {
111+
propagateReachable(state, [value.to]);
112+
}
113+
}
114+
}
115+
}
116+
return fromSets(state);
117+
}
118+
remove(acc, value) {
119+
const state = toSets(acc ?? this.initial);
120+
if (value.kind === "node") {
121+
// Removing a node: drop from sets; edges incident to it are left intact for simplicity.
122+
state.nodes.delete(value.id);
123+
state.roots.delete(value.id);
124+
if (state.reachable.has(value.id)) {
125+
state.reachable.delete(value.id);
126+
propagateUnreachable(state, Array.from(state.edgesOut.get(value.id) ?? []));
127+
}
128+
state.edgesOut.delete(value.id);
129+
state.incoming.delete(value.id);
130+
}
131+
else if (value.kind === "root") {
132+
state.roots.delete(value.id);
133+
if (state.roots.has(value.id)) {
134+
// no-op if still a root via another contribution
135+
}
136+
else if (state.reachable.has(value.id)) {
137+
// Remove reachability if no other incoming
138+
if ((state.incoming.get(value.id) ?? 0) === 0) {
139+
propagateUnreachable(state, [value.id]);
140+
}
141+
}
142+
}
143+
else {
144+
const outs = state.edgesOut.get(value.from);
145+
if (outs && outs.has(value.to)) {
146+
outs.delete(value.to);
147+
if (state.reachable.has(value.from)) {
148+
const newCount = Math.max(0, (state.incoming.get(value.to) ?? 0) - 1);
149+
state.incoming.set(value.to, newCount);
150+
if (newCount === 0 && !state.roots.has(value.to)) {
151+
propagateUnreachable(state, [value.to]);
152+
}
153+
}
154+
}
155+
}
156+
return fromSets(state);
157+
}
158+
}
159+
class EdgeToGraphMapper {
160+
mapEntry(_key, values, _ctx) {
161+
return values.toArray().map(([from, to]) => ["graph", { kind: "edge", from, to }]);
162+
}
163+
}
164+
class NodeToGraphMapper {
165+
mapEntry(_key, values, _ctx) {
166+
return values.toArray().map((id) => ["graph", { kind: "node", id }]);
167+
}
168+
}
169+
class RootToGraphMapper {
170+
mapEntry(_key, values, _ctx) {
171+
return values.toArray().map((id) => ["graph", { kind: "root", id }]);
172+
}
173+
}
6174
// Mapper: multiply numeric values by 2, keep the same key.
7175
class DoubleMapper {
8-
static runs = 0;
9176
mapEntry(key, values, _ctx) {
10177
DoubleMapper.runs += 1;
11178
log("mapper:doubled run", DoubleMapper.runs, "key", key);
12179
const n = values.getUnique();
13180
return [[key, n * 2]];
14181
}
15182
}
183+
DoubleMapper.runs = 0;
16184
// Mapper for sum: emit all values under a single "total" key.
17185
class TotalMapper {
18-
static runs = 0;
19186
mapEntry(_key, values, _ctx) {
20187
TotalMapper.runs += 1;
21188
log("mapper:total run", TotalMapper.runs);
22189
return values.toArray().map((v) => ["total", v]);
23190
}
24191
}
192+
TotalMapper.runs = 0;
25193
// Reducer for sum: correctly implements add/remove.
26194
class SumReducer {
27-
static runsAdd = 0;
28-
static runsRemove = 0;
29-
initial = 0;
195+
constructor() {
196+
this.initial = 0;
197+
}
30198
add(acc, value) {
31199
SumReducer.runsAdd += 1;
32200
log("reducer:sum add", SumReducer.runsAdd);
@@ -38,6 +206,8 @@ class SumReducer {
38206
return acc - value;
39207
}
40208
}
209+
SumReducer.runsAdd = 0;
210+
SumReducer.runsRemove = 0;
41211
class NumbersResource {
42212
instantiate(collections) {
43213
return collections.numbers;
@@ -53,6 +223,20 @@ class SumResource {
53223
return collections.numbers.map(TotalMapper).reduce(SumReducer);
54224
}
55225
}
226+
class DeadNodesResource {
227+
instantiate(collections) {
228+
// Map graph state to dead-node list: nodes minus reachable.
229+
class DeadMapper {
230+
mapEntry(key, values, _ctx) {
231+
const state = values.getUnique();
232+
const reachable = new Set(state.reachable);
233+
const dead = state.nodes.filter((n) => !reachable.has(n));
234+
return [[key, dead]];
235+
}
236+
}
237+
return collections.graphState.map(DeadMapper);
238+
}
239+
}
56240
export const service = {
57241
initialData: {
58242
numbers: [
@@ -67,13 +251,51 @@ export const service = {
67251
["i", [9]],
68252
["j", [10]],
69253
],
254+
edges: [
255+
["fileA", [
256+
["main", "util"],
257+
["util", "lib"],
258+
]],
259+
["fileB", [["lib", "helper"]]],
260+
["fileC", []],
261+
],
262+
nodes: [
263+
["fileA", ["main", "util"]],
264+
["fileB", ["lib", "helper"]],
265+
["fileC", ["unused"]],
266+
],
267+
roots: [
268+
["fileA", ["main"]],
269+
["fileB", []],
270+
["fileC", []],
271+
],
70272
},
71273
resources: {
72274
numbers: NumbersResource,
73275
doubled: DoubledResource,
74276
sum: SumResource,
277+
dead: DeadNodesResource,
278+
},
279+
createGraph: (inputs) => {
280+
const toGraphEdges = inputs.edges.map(EdgeToGraphMapper);
281+
const toGraphNodes = inputs.nodes.map(NodeToGraphMapper);
282+
const toGraphRoots = inputs.roots.map(RootToGraphMapper);
283+
const graphInputs = toGraphEdges.merge(toGraphNodes, toGraphRoots);
284+
const graphState = graphInputs.reduce(GraphReducer);
285+
return {
286+
...inputs,
287+
graphInputs,
288+
graphState,
289+
deadNodes: graphState.map(class DeadMapper {
290+
mapEntry(key, values, _ctx) {
291+
const state = values.getUnique();
292+
const reachable = new Set(state.reachable);
293+
const dead = state.nodes.filter((n) => !reachable.has(n));
294+
return [[key, dead]];
295+
}
296+
}),
297+
};
75298
},
76-
createGraph: (inputs) => inputs,
77299
};
78300
export const resetRunStats = () => {
79301
DoubleMapper.runs = 0;

0 commit comments

Comments
 (0)