Skip to content

Commit 9a2f2e9

Browse files
committed
realtime via jetstream.
1 parent 4e5f401 commit 9a2f2e9

File tree

3 files changed

+31
-43
lines changed

3 files changed

+31
-43
lines changed

src/lib/components/column/ColumnRefreshButton.svelte

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
export let _agent = $agent;
1717
export let unique = Symbol();
1818
export let isJunk = false;
19-
const host = _agent.agent.service.host === 'bsky.social' ? 'bsky.network' : _agent.agent.service.host;
19+
const host = _agent.agent.service.host === 'bsky.social' ? 'TOKIMEKI Stream' : _agent.agent.service.host;
2020
2121
let __columns = isJunk ? junkColumns : columns;
2222
let isRefreshing = false;

src/lib/components/realtime/RealtimeListenersObserver.svelte

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
columns.forEach(column => {
1818
const _agent = $agents.get(getAccountIdByDid($agents, column.did));
1919
if (_agent) {
20-
const host = _agent.agent.service.host === 'bsky.social' ? 'bsky.network' : _agent.agent.service.host;
20+
const host = _agent.agent.service.host === 'bsky.social' ? 'TOKIMEKI Stream' : _agent.agent.service.host;
2121
2222
if (column.settings?.autoRefresh === -1) {
2323
_listeners.add(host);

src/lib/realtime.ts

+29-41
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import {addExtension, decode, decodeMultiple} from "cbor-x";
2-
import {CID} from "multiformats";
3-
import {CarReader} from "@ipld/car";
41
import {realtime, realtimeStatuses} from '$lib/stores';
2+
import { PUBLIC_TOKIMEKI_STREAM_API } from '$env/static/public';
3+
4+
const COLLECTIONS = ['app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like', 'app.bsky.graph.follow'];
55

66
export class RealtimeClient {
77
private host: string;
@@ -13,49 +13,33 @@ export class RealtimeClient {
1313

1414
connect() {
1515
if (!this.socket) {
16-
this.socket = new WebSocket('wss://' + this.host + '/xrpc/com.atproto.sync.subscribeRepos');
16+
this.socket = new WebSocket(`${PUBLIC_TOKIMEKI_STREAM_API}/subscribe?${COLLECTIONS.map(item => `wantedCollections=${item}`).join('&')}`);
1717
}
1818

19-
addExtension({
20-
Class: CID,
21-
tag: 42,
22-
encode: () => {
23-
throw new Error('Cannot encode cids');
24-
},
25-
decode: (bytes) => {
26-
if (bytes[0] !== 0) {
27-
throw new Error('Invalid cid');
28-
}
29-
return CID.decode(bytes.subarray(1)); // ignore leading 0x00
30-
},
31-
});
32-
3319
this.socket.onmessage = async function (event) {
34-
const messageBuf = await event.data.arrayBuffer();
35-
const [header, body] = decodeMultiple(new Uint8Array(messageBuf));
36-
37-
if (header.op !== 1) {
20+
if (!event.data) {
3821
return;
3922
}
4023

41-
try {
42-
const car = await CarReader.fromBytes(body.blocks);
43-
44-
for (const op of body.ops) {
45-
if (!op.cid) continue;
46-
const block = await car.get(op.cid);
47-
const record = decode(block.bytes);
48-
realtime.set({
49-
isConnected: true,
50-
data: {
51-
record: record,
52-
op: op,
53-
body: body,
54-
}
55-
})
56-
}
57-
} catch (e) {
58-
// do nothing.
24+
const dataWrapper = JSON.parse(event.data);
25+
const data = dataWrapper.commit;
26+
27+
if (data?.type === 'c') {
28+
const record = data.record;
29+
30+
realtime.set({
31+
isConnected: true,
32+
data: {
33+
record: record,
34+
op: {
35+
path: data.rkey,
36+
collection: data.collection,
37+
},
38+
body: {
39+
repo: dataWrapper.did,
40+
},
41+
}
42+
})
5943
}
6044
};
6145

@@ -72,6 +56,10 @@ export class RealtimeClient {
7256
return [...r, this.host];
7357
})
7458
}
59+
60+
this.socket.onerror = async (event) => {
61+
console.log(event)
62+
}
7563
}
7664

7765
disconnect() {
@@ -129,7 +117,7 @@ async function getRecord(_agent, uri, repost = undefined, retryCount = 0) {
129117
export async function getPostRealtime(realtime, actors, _agent) {
130118
const path = realtime.data.op.path;
131119
const repo = realtime.data.body.repo;
132-
const uri = 'at://' + repo + '/' + path;
120+
const uri = 'at://' + repo + '/' + realtime.data.op.collection + '/' + path;
133121
const isStream: boolean = actors.some(actor => actor === repo);
134122

135123
if (realtime.data.record.$type === 'app.bsky.feed.post' && typeof realtime.data.record.text === 'string') {

0 commit comments

Comments
 (0)