Skip to content

Commit

Permalink
Increase broadcast channel sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Sep 2, 2022
1 parent 0bf77f5 commit 543fd2c
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion aquadoggo/src/graphql/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl GraphQLSchemaManager {
let shared = self.shared.clone();
let schemas = self.schemas.clone();

info!("Subscribing Graphql manager to schema provider");
info!("Subscribing GraphQL manager to schema provider");
let mut on_schema_added = shared.schema_provider.on_schema_added();

// Create the new GraphQL based on the current state of known p2panda application schemas
Expand Down
2 changes: 1 addition & 1 deletion aquadoggo/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where
/// which get broadcasted across all services.
pub fn new(capacity: usize, context: D) -> Self {
let (tx, _) = broadcast::channel(capacity);
let (shutdown_signal, _) = broadcast::channel(120);
let (shutdown_signal, _) = broadcast::channel(128);
let (exit_signal, exit_handle) = triggered::trigger();

Self {
Expand Down
2 changes: 1 addition & 1 deletion aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::materializer::TaskInput;
///
/// This gives an upper bound to maximum status messages and incoming tasks being moved into worker
/// queues the channels can handle at once.
const CHANNEL_CAPACITY: usize = 1024;
const CHANNEL_CAPACITY: usize = 512_000;

/// The materializer service waits for incoming new operations to transform them into actual useful
/// application- and system data, like document views or schemas.
Expand Down
6 changes: 5 additions & 1 deletion aquadoggo/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ use crate::materializer::materializer_service;
use crate::replication::replication_service;
use crate::schema::SchemaProvider;

/// Capacity of the internal broadcast channel used to communicate between services.
const SERVICE_BUS_CAPACITY: usize = 512_000;

/// Makes sure database is created and migrated before returning connection pool.
async fn initialize_db(config: &Configuration) -> Result<Pool> {
// Find SSL certificate locations on the system for OpenSSL for TLS
Expand Down Expand Up @@ -57,7 +60,8 @@ impl Node {

// Create service manager with shared data between services.
let context = Context::new(store, config, schemas);
let mut manager = ServiceManager::<Context, ServiceMessage>::new(1024, context);
let mut manager =
ServiceManager::<Context, ServiceMessage>::new(SERVICE_BUS_CAPACITY, context);

// Start materializer service.
if manager
Expand Down

0 comments on commit 543fd2c

Please sign in to comment.