Skip to content

Commit deadb51

Browse files
committed
Maintain multiple websockets with smaller queries
1 parent 22c1bf3 commit deadb51

File tree

2 files changed

+160
-120
lines changed

2 files changed

+160
-120
lines changed

ui/public/topologies/prototype.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ nodes:
1212
- 100
1313
producers:
1414
UpstreamNode:
15-
latency-ms: 17.0
15+
latency-ms: 200.0
1616
DownstreamNode:
1717
stake: 0
1818
location:
1919
- 0
2020
- 200
2121
producers:
2222
Node0:
23-
latency-ms: 32.0
23+
latency-ms: 200.0

ui/src/components/Sim/hooks/useLokiWebSocket.ts

Lines changed: 158 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,18 @@ import {
44
EServerMessageType,
55
IRankingBlockSent,
66
} from "@/components/Sim/types";
7-
import { useCallback, useEffect, useRef, useState } from "react";
7+
import { useRef } from "react";
8+
9+
interface QueryConfig {
10+
query: string;
11+
parser: (
12+
streamLabels: any,
13+
timestamp: number,
14+
logLine: string,
15+
) => IServerMessage | null;
16+
}
17+
18+
// FIXME: latency in topology is wrong
819

920
// TODO: Replace with topology-based mapping
1021
const HOST_PORT_TO_NODE: Record<string, string> = {
@@ -14,22 +25,22 @@ const HOST_PORT_TO_NODE: Record<string, string> = {
1425
// Add more mappings as needed
1526
};
1627

17-
const parseCardanoNodeLog = (
28+
const parseBlockFetchServerLog = (
1829
streamLabels: any,
1930
timestamp: number,
2031
logLine: string,
2132
): IServerMessage | null => {
2233
try {
2334
const logData = JSON.parse(logLine);
2435

25-
// Handle MsgBlock with Send direction
26-
if (logData.msg === "MsgBlock" && logData.direction === "Send") {
36+
// Handle BlockFetchServer kind
37+
if (logData.kind === "BlockFetchServer" && logData.peer && logData.block) {
2738
// Extract sender from stream labels (process name)
2839
const sender = streamLabels.process;
2940

3041
// Parse connection to extract recipient
31-
// connectionId format: "127.0.0.1:3001 127.0.0.1:3002"
32-
const connectionId = logData.connectionId;
42+
// connectionId format: "127.0.0.1:3002 127.0.0.1:3003"
43+
const connectionId = logData.peer.connectionId;
3344
let recipient = "Node0"; // fallback
3445

3546
if (connectionId) {
@@ -44,8 +55,8 @@ const parseCardanoNodeLog = (
4455

4556
const message: IRankingBlockSent = {
4657
type: EServerMessageType.RBSent,
47-
slot: logData.prevCount || 0, // FIXME: Use proper slot number
48-
id: `rb-${logData.prevCount + 1}`, // FIXME: use proper block hash
58+
slot: 0, // FIXME: Use proper slot number
59+
id: `rb-blockfetch-${logData.block.substring(0, 8)}`,
4960
sender,
5061
recipient,
5162
};
@@ -55,15 +66,29 @@ const parseCardanoNodeLog = (
5566
message,
5667
};
5768
}
69+
} catch (error) {
70+
console.warn("Failed to parse BlockFetchServer log line:", logLine, error);
71+
}
5872

59-
// Handle BlockFetchServer kind
60-
if (logData.kind === "BlockFetchServer" && logData.peer && logData.block) {
73+
return null;
74+
};
75+
76+
const parseUpstreamNodeLog = (
77+
streamLabels: any,
78+
timestamp: number,
79+
logLine: string,
80+
): IServerMessage | null => {
81+
try {
82+
const logData = JSON.parse(logLine);
83+
84+
// Handle MsgBlock with Send direction
85+
if (logData.msg === "MsgBlock" && logData.direction === "Send") {
6186
// Extract sender from stream labels (process name)
6287
const sender = streamLabels.process;
6388

6489
// Parse connection to extract recipient
65-
// connectionId format: "127.0.0.1:3002 127.0.0.1:3003"
66-
const connectionId = logData.peer.connectionId;
90+
// connectionId format: "127.0.0.1:3001 127.0.0.1:3002"
91+
const connectionId = logData.connectionId;
6792
let recipient = "Node0"; // fallback
6893

6994
if (connectionId) {
@@ -78,8 +103,8 @@ const parseCardanoNodeLog = (
78103

79104
const message: IRankingBlockSent = {
80105
type: EServerMessageType.RBSent,
81-
slot: 0, // FIXME: Use proper slot number
82-
id: `rb-${logData.block.substring(0, 8)}`,
106+
slot: logData.prevCount || 0, // FIXME: Use proper slot number
107+
id: `rb-upstream-${logData.prevCount + 1}`, // FIXME: use proper block hash
83108
sender,
84109
recipient,
85110
};
@@ -90,130 +115,145 @@ const parseCardanoNodeLog = (
90115
};
91116
}
92117
} catch (error) {
93-
console.warn("Failed to parse log line:", logLine, error);
118+
console.warn("Failed to parse UpstreamNode log line:", logLine, error);
94119
}
95120

96121
return null;
97122
};
98123

99-
export const useLokiWebSocket = () => {
100-
const {
101-
state: { lokiHost },
102-
dispatch,
103-
} = useSimContext();
104-
const [connecting, setConnecting] = useState(false);
105-
const [connected, setConnected] = useState(false);
106-
const wsRef = useRef<WebSocket | null>(null);
124+
// Query configurations
125+
const QUERY_CONFIGS: QueryConfig[] = [
126+
{
127+
query: '{service="cardano-node", ns="BlockFetch.Server.SendBlock"}',
128+
parser: parseBlockFetchServerLog,
129+
},
130+
{
131+
query: '{service="cardano-node", process="UpstreamNode"} |= `MsgBlock`',
132+
parser: parseUpstreamNodeLog,
133+
},
134+
];
107135

108-
const connect = useCallback(() => {
109-
if (!lokiHost || connecting || connected) return;
136+
function connectLokiWebSockets(lokiHost: string, dispatch: any): () => void {
137+
const websockets: WebSocket[] = [];
138+
let connectedCount = 0;
110139

111-
setConnecting(true);
112-
dispatch({ type: "RESET_TIMELINE" });
140+
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
113141

114-
try {
115-
// TODO: Multiple websockets instead? e.g. query={ns="BlockFetch.Client.CompletedBlockFetch"}
116-
const query = encodeURIComponent('{service="cardano-node"}');
117-
const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}&limit=10000`;
118-
console.log("Connecting to ", wsUrl);
119-
const ws = new WebSocket(wsUrl);
120-
wsRef.current = ws;
121-
122-
let count = 0;
123-
ws.onopen = () => {
124-
setConnecting(false);
125-
setConnected(true);
142+
const createWebSocket = (config: QueryConfig, index: number): WebSocket => {
143+
const query = encodeURIComponent(config.query);
144+
const wsUrl = `ws://${lokiHost}/loki/api/v1/tail?query=${query}`;
145+
console.log(`Connecting with query ${index}:`, wsUrl);
146+
const ws = new WebSocket(wsUrl);
147+
148+
let count = 0;
149+
ws.onopen = () => {
150+
connectedCount += 1;
151+
if (connectedCount === QUERY_CONFIGS.length) {
126152
dispatch({ type: "SET_LOKI_CONNECTED", payload: true });
127-
};
153+
}
154+
};
155+
156+
ws.onmessage = (event) => {
157+
try {
158+
const data = JSON.parse(event.data);
159+
console.debug(`Received Loki streams from query ${index}:`, data);
128160

129-
ws.onmessage = (event) => {
130-
try {
131-
const data = JSON.parse(event.data);
132-
console.debug("Received Loki streams:", data);
133-
134-
if (data.streams && Array.isArray(data.streams)) {
135-
const events: IServerMessage[] = [];
136-
137-
data.streams.forEach((stream: any) => {
138-
console.debug("Stream labels:", stream.stream);
139-
if (stream.values && Array.isArray(stream.values)) {
140-
stream.values.forEach(
141-
([timestamp, logLine]: [string, string]) => {
142-
count++;
143-
console.debug("Stream value:", count, {
144-
timestamp,
145-
logLine,
146-
});
147-
148-
const timestampSeconds = parseFloat(timestamp) / 1000000000;
149-
const event = parseCardanoNodeLog(
150-
stream.stream,
151-
timestampSeconds,
152-
logLine,
153-
);
154-
if (event) {
155-
console.debug("Parsed", event.time_s, event.message);
156-
events.push(event);
157-
}
158-
},
159-
);
160-
}
161-
});
162-
163-
if (events.length > 0) {
164-
dispatch({
165-
type: "ADD_TIMELINE_EVENT_BATCH",
166-
payload: events,
167-
});
161+
if (data.streams && Array.isArray(data.streams)) {
162+
const events: IServerMessage[] = [];
163+
164+
data.streams.forEach((stream: any) => {
165+
console.debug("Stream labels:", stream.stream);
166+
if (stream.values && Array.isArray(stream.values)) {
167+
stream.values.forEach(
168+
([timestamp, logLine]: [string, string]) => {
169+
count++;
170+
console.debug(`Stream value from query ${index}:`, count, {
171+
timestamp,
172+
logLine,
173+
});
174+
175+
const timestampSeconds = parseFloat(timestamp) / 1000000000;
176+
const event = config.parser(
177+
stream.stream,
178+
timestampSeconds,
179+
logLine,
180+
);
181+
if (event) {
182+
console.debug("Parsed", event.time_s, event.message);
183+
events.push(event);
184+
}
185+
},
186+
);
168187
}
188+
});
189+
190+
if (events.length > 0) {
191+
dispatch({ type: "ADD_TIMELINE_EVENT_BATCH", payload: events });
169192
}
170-
} catch (error) {
171-
console.error("Error processing Loki message:", error);
172193
}
173-
};
194+
} catch (error) {
195+
console.error(
196+
`Error processing Loki message from query ${index}:`,
197+
error,
198+
);
199+
}
200+
};
174201

175-
ws.onerror = (error) => {
176-
console.error("WebSocket error:", error);
177-
setConnecting(false);
178-
setConnected(false);
179-
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
180-
};
202+
ws.onerror = (error) => {
203+
console.error(`WebSocket error for query ${index}:`, error);
204+
connectedCount = 0;
205+
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
206+
};
181207

182-
ws.onclose = () => {
183-
setConnecting(false);
184-
setConnected(false);
208+
ws.onclose = () => {
209+
connectedCount = Math.max(0, connectedCount - 1);
210+
if (connectedCount === 0) {
185211
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
186-
wsRef.current = null;
187-
};
188-
} catch (error) {
189-
console.error("Failed to create WebSocket connection:", error);
190-
setConnecting(false);
191-
setConnected(false);
192-
}
193-
}, [lokiHost, connecting, connected, dispatch]);
212+
}
213+
};
194214

195-
const disconnect = useCallback(() => {
196-
if (wsRef.current) {
197-
wsRef.current.close();
198-
wsRef.current = null;
199-
}
200-
setConnecting(false);
201-
setConnected(false);
215+
return ws;
216+
};
217+
218+
try {
219+
QUERY_CONFIGS.forEach((config, index) => {
220+
websockets.push(createWebSocket(config, index));
221+
});
222+
} catch (error) {
223+
console.error("Failed to create WebSocket connections:", error);
202224
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
203-
}, [dispatch]);
225+
}
204226

205-
useEffect(() => {
206-
return () => {
207-
if (wsRef.current) {
208-
wsRef.current.close();
227+
// Return cleanup function
228+
return () => {
229+
websockets.forEach((ws) => {
230+
if (ws) {
231+
ws.close();
209232
}
210-
};
211-
}, []);
233+
});
234+
};
235+
}
212236

213-
return {
214-
connect,
215-
disconnect,
216-
connecting,
217-
connected,
237+
export const useLokiWebSocket = () => {
238+
const {
239+
state: { lokiHost, lokiConnected },
240+
dispatch,
241+
} = useSimContext();
242+
const cleanupRef = useRef<(() => void) | null>(null);
243+
244+
const connect = () => {
245+
if (!lokiHost || lokiConnected) return;
246+
247+
dispatch({ type: "RESET_TIMELINE" });
248+
249+
cleanupRef.current = connectLokiWebSockets(lokiHost, dispatch);
218250
};
251+
252+
const disconnect = () => {
253+
cleanupRef.current?.();
254+
cleanupRef.current = null;
255+
dispatch({ type: "SET_LOKI_CONNECTED", payload: false });
256+
};
257+
258+
return { connect, disconnect };
219259
};

0 commit comments

Comments
 (0)