Skip to content

Commit

Permalink
Use Postgres notify/listen for event queue
Browse files Browse the repository at this point in the history
  • Loading branch information
pbzweihander committed Aug 16, 2023
1 parent 183324d commit 4f9f47e
Show file tree
Hide file tree
Showing 20 changed files with 115 additions and 273 deletions.
135 changes: 2 additions & 133 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ async-trait = "0.1.72"
axum = { version = "0.6.20", features = ["headers"] }
axum-extra = { version = "0.7.7", features = ["async-read-body"] }
bcrypt = "0.15.0"
cached = { version = "0.44.0", features = ["async", "redis_store", "redis_tokio"] }
chrono = { version = "0.4.26", features = ["serde"] }
derivative = "2.2.0"
dotenvy = "0.15.7"
Expand All @@ -23,11 +22,12 @@ mime = "0.3.17"
mime_serde_shim = "0.2.2"
object_store = { version = "0.6.1", features = ["aws"] }
once_cell = "1.18.0"
redis = { version = "0.23.2", features = ["aio", "tokio-comp", "connection-manager"] }
reqwest = { version = "0.11.18", features = ["json"] }
sea-orm = { version = "0.12.2", features = ["sqlx-postgres", "runtime-tokio-native-tls", "macros", "sea-orm-internal"] }
serde = { version = "1.0.182", features = ["derive"] }
serde_json = "1.0.104"
sqlx = { version = "0.7.1", features = ["postgres"] }
sqlx-postgres = "0.7.1"
stopper = "0.2.0"
tokio = { version = "1.29.1", features = ["rt-multi-thread", "macros", "signal", "time", "sync"] }
tower-http = { version = "0.4.3", features = ["trace"] }
Expand Down
4 changes: 2 additions & 2 deletions backend/src/ap/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl ActivityHandler for Announce {
let event = Event::Update(Update::CreatePost {
post_id: post.id.into(),
});
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

if let Some(repost_id) = post.repost_id {
let local_person_reposted_count = post::Entity::find_by_id(repost_id)
Expand All @@ -84,7 +84,7 @@ impl ActivityHandler for Announce {
.into(),
post_id: repost_id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;
}
}

Expand Down
4 changes: 2 additions & 2 deletions backend/src/ap/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl ActivityHandler for Delete {
let event = Event::Update(Update::DeletePost {
post_id: post_id.into(),
});
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

return Ok(());
}
Expand All @@ -134,7 +134,7 @@ impl ActivityHandler for Delete {
let event = Event::Update(Update::DeleteUser {
user_id: user_id.into(),
});
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

return Ok(());
}
Expand Down
2 changes: 1 addition & 1 deletion backend/src/ap/flag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl ActivityHandler for Flag {
let event = Event::Notification(Notification::new(NotificationType::CreateReport {
report_id: report.id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions backend/src/ap/follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ActivityHandler for Follow {
let event = Event::Notification(Notification::new(NotificationType::CreateFollower {
user_id: follower.from_id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

Ok(())
}
Expand Down Expand Up @@ -149,7 +149,7 @@ impl ActivityHandler for FollowAccept {
let event = Event::Notification(Notification::new(NotificationType::AcceptFollow {
user_id: follow.to_id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

Ok(())
}
Expand Down Expand Up @@ -247,7 +247,7 @@ impl ActivityHandler for FollowReject {
let event = Event::Notification(Notification::new(NotificationType::RejectFollow {
user_id: follow_id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

Ok(())
} else {
Expand Down
4 changes: 2 additions & 2 deletions backend/src/ap/like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl ActivityHandler for Like {
let event = Event::Update(Update::CreateReaction {
post_id: reaction.post_id.into(),
});
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

let local_person_reacted_count = reaction
.find_related(post::Entity)
Expand All @@ -96,7 +96,7 @@ impl ActivityHandler for Like {
post_id: reaction.post_id.into(),
reaction_id: reaction.id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;
}

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions backend/src/ap/note.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl ActivityHandler for CreateNote {
let event = Event::Update(Update::CreatePost {
post_id: post.id.into(),
});
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

let local_person_mentioned_count = post
.find_related(mention::Entity)
Expand All @@ -141,7 +141,7 @@ impl ActivityHandler for CreateNote {
let event = Event::Notification(Notification::new(NotificationType::Mentioned {
post_id: post.id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;
}

if let Some(repost_id) = post.repost_id {
Expand All @@ -154,7 +154,7 @@ impl ActivityHandler for CreateNote {
let event = Event::Notification(Notification::new(NotificationType::Quoted {
post_id: post.id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;
}
}

Expand Down
2 changes: 1 addition & 1 deletion backend/src/ap/person.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl ActivityHandler for PersonUpdate {
let event = Event::Update(Update::UpdateUser {
user_id: user.id.into(),
});
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion backend/src/ap/undo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl ActivityHandler for Undo<Follow> {
let event = Event::Notification(Notification::new(NotificationType::DeleteFollower {
user_id: follower_id.into(),
}));
event.send(&*data.db, &mut data.redis()).await?;
event.send(&*data.db).await?;

Ok(())
}
Expand Down
Loading

0 comments on commit 4f9f47e

Please sign in to comment.