Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

firehose #2

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ edition = "2021"
[dependencies]
async-trait = "0.1.68"
chrono = { version = "0.4.24", features = ["serde"] }
ciborium = "0.2.0"
cid = { version = "0.10.1", features = ["serde-codec"] }
derive_builder = "0.12.0"
miette = "5.8.0"
parking_lot = "0.12.1"
reqwest = { version = "0.11.16", features = ["json", "rustls"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_bytes = "0.11.9"
serde_ipld_dagcbor = "0.3.0"
serde_json = "1.0.96"
thiserror = "1.0.40"
tokio = { version = "1.27.0", features = ["fs"] }
14 changes: 14 additions & 0 deletions examples/firehose/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "firehose"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bisky = { path = "../../" }
ciborium = "0.2.0"
futures = "0.3.28"
tokio = { version = "1.28.0", features = ["full"] }
tokio-tungstenite = { version = "0.18.0", features = ["native-tls"] }
url = "2.3.1"
41 changes: 41 additions & 0 deletions examples/firehose/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use bisky::lexicon::app::bsky::feed::Post;
use bisky::lexicon::com::atproto::sync::SubscribeRepos;
use futures::StreamExt as _;
use std::io::Cursor;
use tokio_tungstenite::tungstenite::protocol::Message;
use url::Url;

#[tokio::main]
async fn main() {
let (mut socket, _response) = tokio_tungstenite::connect_async(
Url::parse("wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos").unwrap(),
)
.await
.unwrap();

while let Some(Ok(Message::Binary(message))) = socket.next().await {
let (header, body) = bisky::firehose::read(&message).unwrap();
println!("{header:?} {}", message.len());
match body {
SubscribeRepos::Commit(commit) => {
if commit.operations.is_empty() {
continue;
}
let operation = &commit.operations[0];
if !operation.path.starts_with("app.bsky.feed.post/") {
continue;
}
if let Some(cid) = operation.cid {
let mut car_reader = Cursor::new(commit.blocks);
let _car_header = bisky::car::read_header(&mut car_reader).unwrap();
let car_blocks = bisky::car::read_blocks(&mut car_reader).unwrap();

let record_reader = Cursor::new(car_blocks.get(&cid).unwrap());
let post = ciborium::de::from_reader::<Post, _>(record_reader);
println!("{post:?}");
}
}
_ => {}
}
}
}
76 changes: 76 additions & 0 deletions src/car.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use cid::Cid;
use serde::Deserialize;
use std::collections::HashMap;
use std::io::{Cursor, Read};

/// Errors produced while decoding CAR data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// The input ended before a length varint could be read completely.
    UvarintEof,
    /// A length varint was malformed (for example, not minimally encoded).
    UvarintBad,
    /// The input ended before a length-prefixed chunk was complete.
    ChunkEof,
    /// The header chunk could not be decoded as DAG-CBOR.
    HeaderCbor,
    /// The CID at the start of a block chunk could not be parsed.
    BlockCid,
    /// The data following a block's CID could not be read.
    BlockData,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Error::UvarintEof => "unexpected end of input while reading a varint",
            Error::UvarintBad => "malformed varint",
            Error::ChunkEof => "unexpected end of input while reading a chunk",
            Error::HeaderCbor => "CAR header is not valid DAG-CBOR",
            Error::BlockCid => "block does not start with a valid CID",
            Error::BlockData => "block data could not be read",
        };
        f.write_str(message)
    }
}

impl std::error::Error for Error {}

fn read_uvarint64<T: Read>(reader: &mut T) -> Result<u64, Error> {
let mut out = 0;
let mut buf = [0; 1];

let mut i = 0;
while let Ok(_) = reader.read_exact(&mut buf) {
let b = buf[0] as u64;
let k = b & 0x7F;
out |= k << (i * 7);

if b & 0x80 != 0 {
// not final byte
} else if b == 0 && i > 0 {
// invalid data; "more minimally"
return Err(Error::UvarintBad);
} else {
return Ok(out);
}

i += 1;
}

Err(Error::UvarintEof)
}

fn read_chunk<T: Read>(reader: &mut T) -> Result<Vec<u8>, Error> {
let chunk_size = read_uvarint64(reader)? as usize;
let mut buf = vec![0; chunk_size];
reader.read_exact(&mut buf).map_err(|_| Error::ChunkEof)?;
Ok(buf)
}

/// Header of a CAR stream, as decoded by `read_header` from the first
/// length-prefixed chunk of the input.
#[derive(Debug, Deserialize)]
pub struct Header {
/// Value of the header's `version` key.
pub version: u8,
/// Value of the header's `roots` key: a list of CIDs.
pub roots: Vec<Cid>,
}

pub fn read_header<T: Read>(reader: &mut T) -> Result<Header, Error> {
let mut reader = Cursor::new(read_chunk(reader)?);
let header =
serde_ipld_dagcbor::from_reader::<Header, _>(&mut reader).map_err(|_| Error::HeaderCbor)?;
Ok(header)
}

pub fn read_blocks<T: Read>(mut reader: &mut T) -> Result<HashMap<Cid, Vec<u8>>, Error> {
let mut blocks = HashMap::new();

while let Ok(chunk) = read_chunk(&mut reader) {
let mut block_reader = Cursor::new(chunk);

let cid = Cid::read_bytes(&mut block_reader).map_err(|_| Error::BlockCid)?;
let mut block = Vec::new();
block_reader
.read_to_end(&mut block)
.map_err(|_| Error::BlockData)?;
blocks.insert(cid, block);
}

Ok(blocks)
}
43 changes: 43 additions & 0 deletions src/firehose.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::lexicon::com::atproto::sync::SubscribeRepos;
use serde::Deserialize;
use std::io::Cursor;

/// Header that precedes the body of every firehose frame; `read` decodes it
/// first and uses `type_` to choose how the body is decoded.
///
/// NOTE(review): the atproto event-stream framing appears to also define error
/// frames (`op = -1`, with no `t` key); those would fail to deserialize into
/// this struct as written — confirm against the specification.
#[derive(Debug, Deserialize)]
pub struct Header {
/// Message type tag, read from the `t` key (e.g. `"#commit"`).
#[serde(rename(deserialize = "t"))]
pub type_: String,
/// Value of the `op` key.
#[serde(rename(deserialize = "op"))]
pub operation: u8,
}

/// Errors returned by `read` when a firehose frame cannot be decoded.
#[derive(Debug)]
pub enum Error {
/// The frame header could not be decoded (ciborium error).
Header(ciborium::de::Error<std::io::Error>),
/// The frame body could not be decoded (serde_ipld_dagcbor error).
Body(serde_ipld_dagcbor::DecodeError<std::io::Error>),
}

impl From<ciborium::de::Error<std::io::Error>> for Error {
fn from(e: ciborium::de::Error<std::io::Error>) -> Self {
Self::Header(e)
}
}

impl From<serde_ipld_dagcbor::DecodeError<std::io::Error>> for Error {
fn from(e: serde_ipld_dagcbor::DecodeError<std::io::Error>) -> Self {
Self::Body(e)
}
}

pub fn read(data: &[u8]) -> Result<(Header, SubscribeRepos), Error> {
let mut reader = Cursor::new(data);

let header = ciborium::de::from_reader::<Header, _>(&mut reader)?;
let body = match header.type_.as_str() {
"#commit" => SubscribeRepos::Commit(serde_ipld_dagcbor::from_reader(&mut reader)?),
"#handle" => SubscribeRepos::Handle(serde_ipld_dagcbor::from_reader(&mut reader)?),
"#tombstone" => SubscribeRepos::Handle(serde_ipld_dagcbor::from_reader(&mut reader)?),
_ => unreachable!(),
};

Ok((header, body))
}
1 change: 1 addition & 0 deletions src/lexicon/com/atproto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod repo;
pub mod server;
pub mod sync;
50 changes: 50 additions & 0 deletions src/lexicon/com/atproto/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use chrono::{DateTime, Utc};
use cid::Cid;
use serde::Deserialize;

/// One entry of a commit's `ops` list.
#[derive(Debug, Deserialize)]
pub struct SubscribeReposCommitOperation {
/// Value of the `path` key (the firehose example matches it against the
/// `app.bsky.feed.post/` prefix).
pub path: String,
/// Value of the `action` key.
pub action: String,
/// Value of the `cid` key, if present.
pub cid: Option<Cid>,
}

/// Body of a `#commit` firehose message.
#[derive(Debug, Deserialize)]
pub struct SubscribeReposCommit {
/// Raw byte string from the `blocks` key (the firehose example parses it as
/// CAR data).
#[serde(with = "serde_bytes")]
pub blocks: Vec<u8>,
/// Value of the `commit` key.
pub commit: Cid,
/// Operations, read from the `ops` key.
#[serde(rename(deserialize = "ops"))]
pub operations: Vec<SubscribeReposCommitOperation>,
/// Value of the `prev` key, if present.
pub prev: Option<Cid>,
/// Value of the `rebase` key.
pub rebase: bool,
/// Value of the `repo` key.
pub repo: String,
/// Read from the `seq` key.
#[serde(rename(deserialize = "seq"))]
pub sequence: u64,
/// Value of the `time` key, parsed as a UTC timestamp.
pub time: DateTime<Utc>,
/// Read from the `tooBig` key.
#[serde(rename(deserialize = "tooBig"))]
pub too_big: bool,
}

/// Body of a `#handle` firehose message.
#[derive(Debug, Deserialize)]
pub struct SubscribeReposHandle {
/// Value of the `did` key.
pub did: String,
/// Value of the `handle` key.
pub handle: String,
/// Read from the `seq` key.
#[serde(rename(deserialize = "seq"))]
pub sequence: u64,
/// Value of the `time` key, parsed as a UTC timestamp.
pub time: DateTime<Utc>,
}

/// Body of a `#tombstone` firehose message, per the type's name and the
/// `Tombstone` variant of `SubscribeRepos`.
#[derive(Debug, Deserialize)]
pub struct SubscribeReposTombstone {
/// Value of the `did` key.
pub did: String,
/// Read from the `seq` key.
#[serde(rename(deserialize = "seq"))]
pub sequence: u64,
/// Value of the `time` key, parsed as a UTC timestamp.
pub time: DateTime<Utc>,
}

/// A decoded `com.atproto.sync.subscribeRepos` message body, one variant per
/// supported message type.
///
/// Derives `Debug` like every payload type it wraps, so whole messages can be
/// logged or used in `unwrap`/`expect` diagnostics.
#[derive(Debug)]
pub enum SubscribeRepos {
    /// A `#commit` message.
    Commit(SubscribeReposCommit),
    /// A `#handle` message.
    Handle(SubscribeReposHandle),
    /// A `#tombstone` message.
    Tombstone(SubscribeReposTombstone),
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
pub mod atproto;
pub mod bluesky;
pub mod car;
pub mod errors;
pub mod firehose;
pub mod lexicon;
pub mod mst;
pub mod storage;
35 changes: 35 additions & 0 deletions src/mst.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use cid::Cid;
use serde::Deserialize;

/// Deserializable commit record.
///
/// NOTE(review): presumably the signed repository commit object that points at
/// the root of the tree described by the other types in this module — confirm.
#[derive(Deserialize, Debug)]
pub struct Commit {
/// Value of the `did` key.
pub did: String,
/// Raw byte string read from the `sig` key.
#[serde(rename(deserialize = "sig"))]
#[serde(with = "serde_bytes")]
pub signature: Vec<u8>,
/// Value of the `data` key.
pub data: Cid,
/// Read from the `prev` key, if present.
#[serde(rename(deserialize = "prev"))]
pub previous: Option<Cid>,
/// Value of the `version` key.
pub version: u8,
}

/// One element of an `MstNode`'s `entries` list.
///
/// NOTE(review): field names suggest prefix-compressed keys (a shared-prefix
/// length plus the remaining key bytes) — confirm against the MST format.
#[derive(Deserialize, Debug)]
pub struct MstEntry {
/// Read from the `p` key.
#[serde(rename(deserialize = "p"))]
pub prefix_length: usize,
/// Raw byte string read from the `k` key.
#[serde(rename(deserialize = "k"))]
#[serde(with = "serde_bytes")]
pub key_suffix: Vec<u8>,
/// Read from the `v` key.
#[serde(rename(deserialize = "v"))]
pub value: Cid,
/// Read from the `t` key, if present.
#[serde(rename(deserialize = "t"))]
pub tree: Option<Cid>,
}

/// Deserializable tree node holding an optional left link and a list of
/// entries.
///
/// NOTE(review): presumably a Merkle Search Tree node, going by the module
/// name `mst` — confirm.
#[derive(Deserialize, Debug)]
pub struct MstNode {
/// Read from the `l` key, if present.
#[serde(rename(deserialize = "l"))]
pub left: Option<Cid>,
/// Read from the `e` key.
#[serde(rename(deserialize = "e"))]
pub entries: Vec<MstEntry>,
}