Skip to content

Commit

Permalink
Ingress dispatcher is concurrent and integrated with bifrost
Browse files Browse the repository at this point in the history
- IngressDispatcher is not a coroutine anymore
- Handles inputs concurrently
- Writes to bifrost and receives responses from network
  • Loading branch information
AhmedSoliman committed Mar 7, 2024
1 parent 02c65dc commit ad55dda
Show file tree
Hide file tree
Showing 32 changed files with 775 additions and 804 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ bytes-utils = "0.1.3"
bytestring = { version = "1.2", features = ["serde"] }
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
criterion = "0.5"
dashmap = { version = "5.5.3" }
datafusion = { version = "35.0.0" }
datafusion-expr = { version = "35.0.0" }
derive_builder = "0.12.0"
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/src/loglets/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use async_trait::async_trait;
use restate_types::logs::{Payload, SequenceNumber};
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex as AsyncMutex;
use tracing::info;
use tracing::{debug, info};

use crate::loglet::{Loglet, LogletBase, LogletOffset, LogletProvider};
use crate::metadata::LogletParams;
Expand Down Expand Up @@ -208,7 +208,7 @@ impl LogletBase for MemoryLoglet {
async fn append(&self, payload: Payload) -> Result<LogletOffset, Error> {
let mut log = self.log.lock().unwrap();
let offset = self.index_to_offset(log.len());
info!(
debug!(
"Appending record to in-memory loglet {:?} at offset {}",
self.params, offset,
);
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ pub use task_center_types::*;
mod test_env;

#[cfg(any(test, feature = "test-util"))]
pub use test_env::{create_mock_nodes_config, TestCoreEnv, TestCoreEnvBuilder};
pub use test_env::{create_mock_nodes_config, MockNetworkSender, TestCoreEnv, TestCoreEnvBuilder};
2 changes: 1 addition & 1 deletion crates/core/src/task_center.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl TaskCenter {
future: F,
) -> O
where
F: Future<Output = O> + Send,
F: Future<Output = O>,
{
let cancel_token = CancellationToken::new();
let id = TaskId::from(NEXT_TASK_ID.fetch_add(1, Ordering::SeqCst));
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/task_center_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ pub enum TaskKind {
#[strum(props(OnCancel = "abort"))]
MetadataBackgroundSync,
RpcServer,
/// A type for ingress until we start enforcing timeouts for inflight requests. This enables us
/// to shutdown cleanly without waiting indefinitely.
#[strum(props(OnCancel = "abort"))]
IngressServer,
RoleRunner,
SystemService,
Ingress,
Expand Down
4 changes: 2 additions & 2 deletions crates/ingress-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ mocks = ["dep:restate-test-util"]
[dependencies]
restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-futures-util = { workspace = true }
restate-node-protocol = { workspace = true }
restate-pb = { workspace = true, features = ["restate-types"] }
restate-schema-api = { workspace = true, features = ["subscription"] }
restate-test-util = { workspace = true, optional = true }
restate-types = { workspace = true }
restate-wal-protocol = { workspace = true }

anyhow = { workspace = true }
assert2 = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
dashmap = { workspace = true }
drain = { workspace = true }
prost = { workspace = true }
thiserror = { workspace = true }
Expand Down
Loading

0 comments on commit ad55dda

Please sign in to comment.