Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase broadcast channel sizes #257

Merged
merged 2 commits into from
Sep 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Remove `Scalar` suffix from scalar types in GraphQL schema [#231](https://github.com/p2panda/aquadoggo/pull/231)
- Implement new API for untagged operations [#245](https://github.com/p2panda/aquadoggo/pull/235)
- Use `DocumentStore` trait from `p2panda_rs` [#249](https://github.com/p2panda/aquadoggo/pull/249)
- Increase broadcast channel sizes [#257](https://github.com/p2panda/aquadoggo/pull/257)

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion aquadoggo/src/graphql/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl GraphQLSchemaManager {
let shared = self.shared.clone();
let schemas = self.schemas.clone();

info!("Subscribing Graphql manager to schema provider");
info!("Subscribing GraphQL manager to schema provider");
let mut on_schema_added = shared.schema_provider.on_schema_added();

// Create the new GraphQL based on the current state of known p2panda application schemas
Expand Down
2 changes: 1 addition & 1 deletion aquadoggo/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where
/// which get broadcasted across all services.
pub fn new(capacity: usize, context: D) -> Self {
let (tx, _) = broadcast::channel(capacity);
let (shutdown_signal, _) = broadcast::channel(120);
let (shutdown_signal, _) = broadcast::channel(128);
let (exit_signal, exit_handle) = triggered::trigger();

Self {
Expand Down
2 changes: 1 addition & 1 deletion aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::materializer::TaskInput;
///
/// This gives an upper bound to maximum status messages and incoming tasks being moved into worker
/// queues the channels can handle at once.
const CHANNEL_CAPACITY: usize = 1024;
const CHANNEL_CAPACITY: usize = 512_000;

/// The materializer service waits for incoming new operations to transform them into actual useful
/// application- and system data, like document views or schemas.
Expand Down
6 changes: 5 additions & 1 deletion aquadoggo/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use crate::materializer::materializer_service;
use crate::replication::replication_service;
use crate::schema::SchemaProvider;

/// Capacity of the internal broadcast channel used to communicate between services.
const SERVICE_BUS_CAPACITY: usize = 512_000;

/// Makes sure database is created and migrated before returning connection pool.
async fn initialize_db(config: &Configuration) -> Result<Pool> {
// Find SSL certificate locations on the system for OpenSSL for TLS
Expand Down Expand Up @@ -57,7 +60,8 @@ impl Node {

// Create service manager with shared data between services.
let context = Context::new(store, config, schemas);
let mut manager = ServiceManager::<Context, ServiceMessage>::new(1024, context);
let mut manager =
ServiceManager::<Context, ServiceMessage>::new(SERVICE_BUS_CAPACITY, context);

// Start materializer service.
if manager
Expand Down