Skip to content

Commit

Permalink
implement streaming new posts from a user
Browse files Browse the repository at this point in the history
  • Loading branch information
jesopo committed Apr 18, 2023
1 parent 71d099f commit 73854e3
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 4 deletions.
11 changes: 11 additions & 0 deletions examples/stream_new_posts/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "get_profile"
version = "0.1.0"
edition = "2021"

[dependencies]
bisky = { path = "../../" }
clap = { version = "4.2.2", features = ["derive"] }
serde_json = "1.0.96"
tokio = { version = "1.27.0", features = ["macros", "rt-multi-thread"] }
url = "2.3.1"
39 changes: 39 additions & 0 deletions examples/stream_new_posts/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use bisky::atproto::{Client, Session};
use bisky::bluesky::Bluesky;
use bisky::storage::{File, Storage as _};
use clap::Parser;
use std::path::PathBuf;
use url::Url;

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Arguments {
#[clap(index = 1)]
storage: PathBuf,
#[clap(index = 2)]
service: Url,
#[clap(index = 3)]
username: String,
#[clap(index = 4)]
password: String,
}

#[tokio::main]
async fn main() {
let args = Arguments::parse();

let mut storage = File::<Session>::new(args.storage);
if storage.get().await.is_err() {
Client::login(&args.service, &args.username, &args.password, &mut storage)
.await
.unwrap();
}

let mut client = Bluesky::new(Client::new(args.service, storage).await.unwrap());
let mut profile = client.user(args.username);
let mut stream = profile.stream_posts().await.unwrap();

while let Ok(record) = stream.next().await {
println!("{:#?}", record);
}
}
10 changes: 7 additions & 3 deletions src/atproto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,15 +312,19 @@ impl<T: Storage<Session>> Client<T> {
repo: &str,
collection: &str,
mut limit: usize,
) -> Result<Vec<Record<D>>, GetError<T>> {
reverse: bool,
mut cursor: Option<String>,
) -> Result<(Vec<Record<D>>, Option<String>), GetError<T>> {
let reverse = reverse.to_string();

let mut records = Vec::new();
let mut cursor: Option<String> = None;

while limit > 0 {
let query_limit = std::cmp::min(limit, 100).to_string();
let mut query = Vec::from([
("repo", repo),
("collection", collection),
("reverse", &reverse),
("limit", &query_limit),
]);

Expand All @@ -343,7 +347,7 @@ impl<T: Storage<Session>> Client<T> {
records.append(&mut response.records);
}

Ok(records)
Ok((records, cursor))
}

pub async fn repo_create_record<D: DeserializeOwned, S: Serialize>(
Expand Down
84 changes: 83 additions & 1 deletion src/bluesky.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use crate::lexicon::app::bsky::feed::Post;
use crate::lexicon::com::atproto::repo::{CreateRecordOutput, Record};
use crate::storage::Storage;

use std::collections::VecDeque;
use std::time::Duration;

pub struct Bluesky<T: Storage<Session>> {
client: Client<T>,
}
Expand All @@ -26,6 +29,60 @@ pub struct BlueskyUser<'a, T: Storage<Session>> {
username: String,
}

pub struct BlueskyPostStream<'a, T: Storage<Session>> {
client: &'a mut Client<T>,
username: &'a str,
queue: VecDeque<Record<Post>>,
cursor: String,
}

#[derive(Debug)]
pub enum StreamError<T: Storage<Session>> {
Get(GetError<T>),
NoCursor,
}

impl<T: Storage<Session>> From<GetError<T>> for StreamError<T> {
fn from(error: GetError<T>) -> Self {
Self::Get(error)
}
}

impl<'a, T: Storage<Session>> BlueskyPostStream<'a, T> {
pub async fn next(&mut self) -> Result<Record<Post>, StreamError<T>> {
if let Some(post) = self.queue.pop_front() {
Ok(post)
} else {
loop {
let (records, cursor) = self
.client
.repo_list_records(
self.username,
"app.bsky.feed.post",
100,
true,
Some(self.cursor.clone()),
)
.await?;

let mut records = VecDeque::from(records);
if let Some(first_record) = records.pop_front() {
if let Some(cursor) = cursor {
self.cursor = cursor;
} else {
return Err(StreamError::NoCursor);
}

self.queue.append(&mut records);
return Ok(first_record);
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
}

impl<'a, T: Storage<Session>> BlueskyUser<'a, T> {
pub async fn get_profile(&mut self) -> Result<ProfileViewDetailed, GetError<T>> {
self.client
Expand All @@ -38,8 +95,33 @@ impl<'a, T: Storage<Session>> BlueskyUser<'a, T> {

pub async fn list_posts(&mut self) -> Result<Vec<Record<Post>>, GetError<T>> {
self.client
.repo_list_records(&self.username, "app.bsky.feed.post", usize::MAX)
.repo_list_records(
&self.username,
"app.bsky.feed.post",
usize::MAX,
false,
None,
)
.await
.map(|l| l.0)
}

pub async fn stream_posts(&'a mut self) -> Result<BlueskyPostStream<'a, T>, StreamError<T>> {
let (_, cursor) = self
.client
.repo_list_records::<Post>(&self.username, "app.bsky.feed.post", 1, false, None)
.await?;

if let Some(cursor) = cursor {
Ok(BlueskyPostStream {
client: self.client,
username: &self.username,
queue: VecDeque::new(),
cursor,
})
} else {
Err(StreamError::NoCursor)
}
}
}

Expand Down

0 comments on commit 73854e3

Please sign in to comment.