-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathmain.rs
88 lines (82 loc) · 3.22 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use anyhow::{anyhow, Result};
use atrium_api::app::bsky::feed::post::Record;
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
use atrium_api::types::{CidLink, Collection};
use chrono::Local;
use firehose::cid_compat::CidOld;
use firehose::stream::frames::Frame;
use firehose::subscription::{CommitHandler, Subscription};
use futures::StreamExt;
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
/// A repository-event subscription backed by a single WebSocket connection.
///
/// The connection is opened by `RepoSubscription::new` and read by the
/// `Subscription::next` implementation for this type.
struct RepoSubscription {
// The (optionally TLS-wrapped) WebSocket stream that frames are read from.
stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
impl RepoSubscription {
    /// Connects to `wss://{bgs}/xrpc/{NSID}` and wraps the resulting stream.
    ///
    /// # Errors
    /// Returns the connection error if the WebSocket handshake fails.
    async fn new(bgs: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let url = format!("wss://{bgs}/xrpc/{NSID}");
        let (stream, _response) = connect_async(url).await?;
        Ok(Self { stream })
    }

    /// Pumps frames from the subscription into `handler` until `next` yields `None`.
    ///
    /// Only successfully decoded message frames whose type is `#commit` are
    /// processed; every other frame (including frames that failed to decode) is
    /// ignored. A failure reported by the handler is printed to stderr and does
    /// not stop the loop.
    ///
    /// # Errors
    /// Returns early if a `#commit` body cannot be decoded as DAG-CBOR.
    async fn run(&mut self, handler: impl CommitHandler) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(frame) = self.next().await {
            // Guard: anything that is not a typed message frame is skipped.
            let Ok(Frame::Message(Some(kind), message)) = frame else {
                continue;
            };
            if kind.as_str() != "#commit" {
                continue;
            }
            let commit = serde_ipld_dagcbor::from_reader(message.body.as_slice())?;
            if let Err(err) = handler.handle_commit(&commit).await {
                eprintln!("FAILED: {err:?}");
            }
        }
        Ok(())
    }
}
impl Subscription for RepoSubscription {
    /// Waits for the next binary WebSocket message and decodes it into a [`Frame`].
    ///
    /// Non-binary messages (for example pings, pongs or text) are skipped rather
    /// than treated as end-of-stream, so a keep-alive ping from the server no
    /// longer terminates the subscription.
    ///
    /// Returns `None` when the underlying stream has ended or reported a
    /// transport error; the error is printed to stderr because the return type
    /// can only carry frame-decoding errors.
    async fn next(&mut self) -> Option<Result<Frame, <Frame as TryFrom<&[u8]>>::Error>> {
        loop {
            match self.stream.next().await {
                Some(Ok(Message::Binary(data))) => {
                    return Some(Frame::try_from(data.as_slice()));
                }
                // Control/text messages carry no frame; keep reading.
                Some(Ok(_)) => continue,
                Some(Err(err)) => {
                    eprintln!("FAILED: websocket error: {err}");
                    return None;
                }
                None => return None,
            }
        }
    }
}
/// Stateless commit handler; its `CommitHandler` implementation prints
/// created post records to stdout.
struct Firehose;
impl CommitHandler for Firehose {
    /// Prints every post record created by `commit`.
    ///
    /// An operation is considered when its action is `"create"` and the first
    /// segment of its path equals `Post::NSID`. For each such operation the
    /// record is looked up by CID in the commit's CAR blocks, decoded, and
    /// printed as `<created_at in local time> - <repo>` followed by the post
    /// text, one indented line per text line.
    ///
    /// The CAR blocks are parsed at most once per commit, and only when at
    /// least one matching operation exists.
    ///
    /// # Errors
    /// Fails if the CAR blocks cannot be read, if no block matches an
    /// operation's CID, or if a matching block cannot be decoded as a record.
    async fn handle_commit(&self, commit: &Commit) -> Result<()> {
        let mut post_creates = commit
            .ops
            .iter()
            .filter(|op| {
                // `split` always yields at least one (possibly empty) segment.
                let collection = op.path.split('/').next().unwrap_or_default();
                op.action == "create" && collection == atrium_api::app::bsky::feed::Post::NSID
            })
            .peekable();

        // Nothing to print: skip parsing the CAR payload entirely.
        if post_creates.peek().is_none() {
            return Ok(());
        }

        let (items, _) = rs_car::car_read_all(&mut commit.blocks.as_slice(), true).await?;

        for op in post_creates {
            let Some((_, item)) = items.iter().find(|(cid, _)| {
                // convert cid from v0.10.1 to v0.11.1; a CID that cannot be
                // converted simply does not match instead of panicking.
                match CidOld::from(*cid).try_into() {
                    Ok(cid) => Some(CidLink(cid)) == op.cid,
                    Err(_) => false,
                }
            }) else {
                return Err(anyhow!(
                    "FAILED: could not find item with operation cid {:?} out of {} items",
                    op.cid,
                    items.len()
                ));
            };

            let record = serde_ipld_dagcbor::from_reader::<Record, _>(&mut item.as_slice())?;
            println!(
                "{} - {}",
                record.created_at.as_ref().with_timezone(&Local),
                commit.repo.as_str()
            );
            for line in record.text.split('\n') {
                println!("  {line}");
            }
        }
        Ok(())
    }
}
/// Entry point: subscribes to `bsky.network` and feeds every commit to
/// [`Firehose`] until the subscription ends or fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut subscription = RepoSubscription::new("bsky.network").await?;
    subscription.run(Firehose).await
}