Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate replication manager with networking stack #387

Merged
merged 126 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from 121 commits
Commits
Show all changes
126 commits
Select commit Hold shift + click to select a range
d3bc9e7
Use SyncMessage in replication network behaviour
sandreae May 22, 2023
2d2c05c
Use target set in sync request
sandreae May 22, 2023
d812c06
Convert integer to Mode
sandreae May 22, 2023
3c2d696
Add replication to main behaviour struct
adzialocha May 22, 2023
9e522a2
Add SyncManager to replication behaviour
adzialocha May 23, 2023
e73c3a7
Add schema provider to behaviour
adzialocha May 23, 2023
3bd31c4
Move mananger again out of network behaviour, add replication service
adzialocha May 23, 2023
abdaa74
Introduce event loop to handle swarm and channel events
adzialocha May 23, 2023
3c11663
Add new service message types to enum
adzialocha May 24, 2023
9a67a46
Better method name and structure for event loop
adzialocha May 24, 2023
26d6445
Send and receive service messages on new or closed connections and re…
adzialocha May 24, 2023
b0e44eb
Have peer id on network config struct
adzialocha May 24, 2023
e7a9bab
Introduce connection manager in replication service
adzialocha May 24, 2023
93ae241
Prepare methods for finished or failing sessions
adzialocha May 24, 2023
ffd2d09
Add and remove peers in connection manager
adzialocha May 24, 2023
a113ac4
Count failed and successful sessions
adzialocha May 24, 2023
448e8f1
Initiate replication with peers
adzialocha May 24, 2023
2e12f3a
Add some basic logging
adzialocha May 24, 2023
00a32ba
Do not override with default when building config in cli
adzialocha May 24, 2023
f89be7e
Fix checking only for certain messages in async loop
adzialocha May 24, 2023
2f255e4
Clippy happy, developer happy
adzialocha May 24, 2023
414b866
Make Domain error in IngestError transparent
sandreae May 25, 2023
f60b1b2
Add logging for replication entry exchange
sandreae May 25, 2023
d561290
Sort system schema to the front of TargetSet
sandreae May 25, 2023
3d7bd86
Refactor log height diff logic
sandreae May 25, 2023
df90bbb
Don't diff over schema sub-range of target set
sandreae May 25, 2023
7214083
Introduce DuplicateSessionRequestError
sandreae May 26, 2023
f96865a
More logging and use new error type
sandreae May 26, 2023
34de908
Logging for dropping and re-initiating duplicate session requests
sandreae May 26, 2023
18f7e48
Log when re-initiating session with peer
sandreae May 26, 2023
1b350b6
Fix issue when calculating local log heights
sandreae May 26, 2023
99ad0a8
More logging in manager
sandreae May 26, 2023
4e0aacf
Improve logging message
sandreae May 26, 2023
c0b6816
Fix diff test
sandreae May 26, 2023
20e2112
Correct expect error message
sandreae May 26, 2023
22e391e
Ignore duplicate inbound sync requests
sandreae May 27, 2023
5671844
Add messaging diagram to lifetime test
sandreae May 28, 2023
c482b18
Logging in behaviour
sandreae May 28, 2023
7db4f1c
Remove re-initiating dropped duplicate sessions if they had a differe…
sandreae May 29, 2023
b035f9a
Diagram for sync lifetime test
sandreae May 29, 2023
af82212
Test for concurrent sync request handling
sandreae May 29, 2023
6270743
Remove duplicate diagram
sandreae May 29, 2023
d7e4c22
Make random target set include more
sandreae May 29, 2023
4fe8592
Small logging and improved comments
sandreae May 29, 2023
7ac01ee
Elegantly handle concurrent session requests with duplicate target set
sandreae May 29, 2023
a2b9493
Correct validation of TargetSet
sandreae May 29, 2023
ff8668f
Better naming in TargetSet fixture
sandreae May 29, 2023
897bb67
Update tests
sandreae May 29, 2023
461e676
Order log heights in Have message
sandreae May 29, 2023
2f842f3
Implement Human on Message and SyncMessage
sandreae May 29, 2023
9a6b37d
Some work on logging
sandreae May 29, 2023
2194cd6
Fix remote log height logging
sandreae May 29, 2023
848dd5e
fmt
sandreae May 29, 2023
c3d4c0e
Remove all sessions for a peer on replication error
sandreae May 30, 2023
b93efd5
Add error logging to handler
sandreae May 30, 2023
c632fd1
Add ConnectionId to peer identifier in replication service
sandreae May 30, 2023
9364ccb
Doc string for PeerConnectionIdentifier
sandreae May 30, 2023
286bad9
Add comment to PeerConnectionId defaults
sandreae May 30, 2023
f87af83
Add (very) basic replication scheduler
sandreae May 30, 2023
e5bf152
Refactor replication behaviour event triggering
sandreae May 31, 2023
b6d139b
Temp fix for UNIQUE
sandreae May 31, 2023
f7c7cda
Send SyncMessages to one handler by ConnectionId
sandreae May 31, 2023
afd24a5
Maintain list of peers and all their connections on ConnectionManager
sandreae May 31, 2023
6183ca7
Remove connection from ConnectionManager when swarm issues Connection…
sandreae May 31, 2023
f3c3f6c
Refactor ConnectionEstablished messaging in replication behaviour
sandreae May 31, 2023
86222a4
Improve error handling and logging
sandreae May 31, 2023
cd7629f
Update api in behaviour network tests
sandreae May 31, 2023
5152ff6
Error logging in replication connection handler
sandreae May 31, 2023
d620053
Cargo clippy
sandreae May 31, 2023
91ceb9e
fmt
sandreae May 31, 2023
c5373ef
More tests for TargetSet validation
sandreae May 31, 2023
8510d15
Only identify peers by their PeerId (not ConnectionId) in replication…
sandreae Jun 2, 2023
8512680
Rename ConnectionEstablished to PeerConnected etc..
sandreae Jun 2, 2023
8dad289
Poll ticking stream for scheduling replication
adzialocha Jun 3, 2023
58a024e
Dynamically retrieve target set when starting replication
adzialocha Jun 3, 2023
fc5d45b
Add some more doc strings
adzialocha Jun 3, 2023
7bb4cb2
Fix formatting
adzialocha Jun 3, 2023
e550d53
Fix missing peer id in e2e test
adzialocha Jun 3, 2023
bbc6140
Remove unnecessary type casting in entry SQL
adzialocha Jun 3, 2023
996f95e
Give error logging more context
adzialocha Jun 3, 2023
2969a91
Fix SQL query by making seq_num IN values a string
adzialocha Jun 3, 2023
a48bce7
Try different string literal
adzialocha Jun 3, 2023
19bf865
Use IntervalStream from tokio for scheduler
adzialocha Jun 3, 2023
722a4b3
Add doc strings
adzialocha Jun 3, 2023
ba05484
Fix filtering active sessions logic
adzialocha Jun 3, 2023
84587f2
Update comments
adzialocha Jun 4, 2023
8d24268
Remove repeating debug log
adzialocha Jun 4, 2023
5042593
Re-initiate dropped session if its concerning a different target set
adzialocha Jun 4, 2023
79ceacc
Allow max 3 sessions per peer and max one for the same target set
adzialocha Jun 4, 2023
c7655f4
Update test and fix bug in re-initiating session logic
adzialocha Jun 4, 2023
a3f4b89
Correct diagram
adzialocha Jun 4, 2023
ec8bdd2
Inform connection handler about replication errors, introduce timeout
adzialocha Jun 4, 2023
a5fc932
Close all connection handlers on critical errors
adzialocha Jun 4, 2023
f7b894e
Fix import style
adzialocha Jun 5, 2023
7f24e41
Fix import style
adzialocha Jun 5, 2023
dbb6546
Remove no longer relevant log message
sandreae Jun 5, 2023
445d1aa
Stop dialing peer after one address dialed successfully
sandreae Jun 8, 2023
2b904f5
Only accept one inbound and one outbound connection per peer
sandreae Jun 8, 2023
58275db
fmt x clippy
sandreae Jun 8, 2023
759aaee
Use libp2p from git main
sandreae Jun 9, 2023
24d67ed
Add network info logging on incoming connection errors
sandreae Jun 9, 2023
e4a3e75
Revert
adzialocha Jun 12, 2023
2965230
Make clippy happy
adzialocha Jun 12, 2023
7d95942
Do never actively close connections
adzialocha Jun 12, 2023
c47cfee
Remove dead code
adzialocha Jun 12, 2023
4b2885c
Check more often when using ping and mDNS discovery
adzialocha Jun 16, 2023
fdf4f4a
Close replication session on all errors
adzialocha Jun 16, 2023
4682e18
Better error logging
adzialocha Jun 16, 2023
0c3dde9
Fix issue where outbound streams could not be re-established after error
adzialocha Jun 16, 2023
bfd9def
Add behaviour logic which always uses latest healthy connection
adzialocha Jun 16, 2023
891892f
Rename to peers behaviour
adzialocha Jun 16, 2023
94378ee
Make clippy happy
adzialocha Jun 16, 2023
45a5976
Add entry to CHANGELOG.md
adzialocha Jun 16, 2023
a2c28da
Use connection ids to identify peers
adzialocha Jun 18, 2023
158ca76
Clean up logging a little bit
adzialocha Jun 18, 2023
bca3e16
A little bit less verbose logging
adzialocha Jun 18, 2023
c8e6f04
Fix tests
adzialocha Jun 18, 2023
b4a77cc
Add a test for connection manager
adzialocha Jun 18, 2023
908e4fe
Write some more doc-strings
adzialocha Jun 18, 2023
4461a19
Add more docs
adzialocha Jun 18, 2023
e3daade
Disconnect from all peers before shutdown
adzialocha Jun 18, 2023
0215c62
Dial peers by multiaddr on mdns discovery
sandreae Jun 19, 2023
1bcba23
Rename Naive -> LogHeight strategy
sandreae Jun 19, 2023
9091145
Naming improvement
sandreae Jun 19, 2023
d6ea4f4
Doc strings
sandreae Jun 19, 2023
462c4ff
fmt
sandreae Jun 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Replication protocol session manager [#363](https://github.com/p2panda/aquadoggo/pull/363)
- Replication message de- / serialization [#375](https://github.com/p2panda/aquadoggo/pull/375)
- Naive protocol replication [#380](https://github.com/p2panda/aquadoggo/pull/380)
- Integrate replication manager with networking stack [#387](https://github.com/p2panda/aquadoggo/pull/387) 🥞
adzialocha marked this conversation as resolved.
Show resolved Hide resolved

### Changed

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aquadoggo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ tokio = { version = "1.25.0", features = [
"sync",
"time",
] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tower-http = { version = "0.3.4", default-features = false, features = [
"cors",
] }
Expand Down
17 changes: 17 additions & 0 deletions aquadoggo/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
use p2panda_rs::operation::OperationId;

use crate::manager::Sender;
use crate::network::Peer;
use crate::replication::SyncMessage;

/// Sender for cross-service communication bus.
pub type ServiceSender = Sender<ServiceMessage>;
Expand All @@ -12,4 +14,19 @@ pub type ServiceSender = Sender<ServiceMessage>;
pub enum ServiceMessage {
/// A new operation arrived at the node.
NewOperation(OperationId),

/// Node established a bi-directional connection to another node.
PeerConnected(Peer),

/// Node closed a connection to another node.
PeerDisconnected(Peer),

/// Node sent a message to remote node for replication.
SentReplicationMessage(Peer, SyncMessage),

/// Node received a message from remote node for replication.
ReceivedReplicationMessage(Peer, SyncMessage),

/// Replication protocol failed with an critical error.
ReplicationFailed(Peer),
}
23 changes: 23 additions & 0 deletions aquadoggo/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,29 @@ impl Configuration {
}
};

// Derive peer id from key pair
// @TODO: This needs refactoring: https://github.com/p2panda/aquadoggo/issues/388
let key_pair = NetworkConfiguration::load_or_generate_key_pair(config.base_path.clone())?;
config.network.set_peer_id(&key_pair.public());

Ok(config)
}
}

#[cfg(test)]
impl Configuration {
/// Returns a new configuration object for a node which stores all data temporarily in memory.
pub fn new_ephemeral() -> Self {
let mut config = Configuration {
database_url: Some("sqlite::memory:".to_string()),
..Default::default()
};

// Generate a random key pair and just keep it in memory
// @TODO: This needs refactoring: https://github.com/p2panda/aquadoggo/issues/388
let key_pair: libp2p::identity::Keypair = crate::network::identity::Identity::new();
config.network.set_peer_id(&key_pair.public());

config
}
}
1 change: 1 addition & 0 deletions aquadoggo/src/db/stores/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ async fn insert_document_view(
)
VALUES
($1, $2, $3)
ON CONFLICT(document_view_id) DO NOTHING -- @TODO: temp fix for double document view insertions: https://github.com/p2panda/aquadoggo/issues/398
adzialocha marked this conversation as resolved.
Show resolved Hide resolved
adzialocha marked this conversation as resolved.
Show resolved Hide resolved
",
)
.bind(document_view.id().to_string())
Expand Down
10 changes: 6 additions & 4 deletions aquadoggo/src/db/stores/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,14 @@ impl EntryStore for SqlStore {
log_id: &LogId,
initial_seq_num: &SeqNum,
) -> Result<Vec<StorageEntry>, EntryStorageError> {
// Formatting query string in this way as `sqlx` currently doesn't support binding list
// arguments for IN queries.
let cert_pool_seq_nums = get_lipmaa_links_back_to(initial_seq_num.as_u64(), 1)
.iter()
.map(|seq_num| seq_num.to_string())
.map(|seq_num| format!("'{seq_num}'"))
.collect::<Vec<String>>()
.join(",");

// Formatting query string in this way as `sqlx` currently
// doesn't support binding list arguments for IN queries.
let sql_str = format!(
"SELECT
public_key,
Expand All @@ -299,7 +299,7 @@ impl EntryStore for SqlStore {
WHERE
public_key = $1
AND log_id = $2
AND CAST(seq_num AS NUMERIC) IN ({})
AND seq_num IN ({})
ORDER BY
CAST(seq_num AS NUMERIC) DESC
",
Expand Down Expand Up @@ -337,6 +337,8 @@ impl SqlStore {
logs.schema = $1
GROUP BY
entries.public_key, entries.log_id
ORDER BY
entries.public_key, CAST(entries.log_id AS NUMERIC)
",
)
.bind(schema_id.to_string())
Expand Down
3 changes: 2 additions & 1 deletion aquadoggo/src/graphql/scalars/document_view_id_scalar.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::{fmt::Display, str::FromStr};
use std::fmt::Display;
use std::str::FromStr;

use dynamic_graphql::{Error, Result, Scalar, ScalarValue, Value};
use p2panda_rs::document::DocumentViewId;
Expand Down
2 changes: 1 addition & 1 deletion aquadoggo/src/graphql/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl GraphQLSchemaManager {
let shared = self.shared.clone();
let schemas = self.schemas.clone();

info!("Subscribing GraphQL manager to schema provider");
debug!("Subscribing GraphQL manager to schema provider");
let mut on_schema_added = shared.schema_provider.on_schema_added();

// Create the new GraphQL based on the current state of known p2panda application schemas
Expand Down
46 changes: 24 additions & 22 deletions aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,28 +88,30 @@ pub async fn materializer_service(

// Listen to incoming new entries and operations and move them into task queue
let handle = task::spawn(async move {
while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
match context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retreiving document for operation_id {}",
operation_id
)
}) {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)));
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
loop {
if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
match context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retreiving document for operation_id {}",
operation_id
)
}) {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)));
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
// safely assure that this is due to a critical bug affecting the database
// integrity. Panicking here will close `handle` and by that signal a node
// shutdown.
panic!("Could not find document for operation_id {}", operation_id);
}
}
}
}
Expand Down
49 changes: 45 additions & 4 deletions aquadoggo/src/network/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::time::Duration;

use anyhow::Result;
use libp2p::identity::Keypair;
use libp2p::swarm::behaviour::toggle::Toggle;
Expand All @@ -8,9 +10,31 @@ use libp2p::{autonat, connection_limits, identify, mdns, ping, relay, rendezvous
use log::debug;

use crate::network::config::NODE_NAMESPACE;
use crate::network::peers;
use crate::network::NetworkConfiguration;

/// How often do we broadcast mDNS queries into the network.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(5);

/// How often do we ping other peers to check for a healthy connection.
const PING_INTERVAL: Duration = Duration::from_secs(5);

/// How long do we wait for an answer from the other peer before we consider the connection as
/// stale.
const PING_TIMEOUT: Duration = Duration::from_secs(3);

/// Network behaviour for the aquadoggo node.
///
/// In libp2p all different behaviours are "merged" into one "main behaviour" with help of the
/// `NetworkBehaviour` derive macro.
///
/// All behaviours share the same connections with each other. Together they form something we
/// could call our "custom" networking behaviour.
///
/// It is possible for a peer to not support all behaviours, internally libp2p negotiates the
/// capabilities of each peer for us and upgrades the protocol accordingly. For example two peers
/// can handle p2panda messages with each others (using the `peers` behaviour) but do not
/// necessarily need to be able to support the `relay` behaviour.
#[derive(NetworkBehaviour)]
pub struct Behaviour {
/// Determine NAT status by requesting remote peers to dial the public address of the
Expand Down Expand Up @@ -42,9 +66,12 @@ pub struct Behaviour {
/// Register with a rendezvous server and query remote peer addresses.
pub rendezvous_client: Toggle<rendezvous::client::Behaviour>,

/// Serve as a rendezvous point for remote peers to register their external addresses
/// and query the addresses of other peers.
/// Serve as a rendezvous point for remote peers to register their external addresses and query
/// the addresses of other peers.
pub rendezvous_server: Toggle<rendezvous::server::Behaviour>,

/// Register peer connections and handle p2panda messaging with them.
pub peers: peers::Behaviour,
}

impl Behaviour {
Expand Down Expand Up @@ -86,15 +113,25 @@ impl Behaviour {
// Create an mDNS behaviour with default configuration if the mDNS flag is set
let mdns = if network_config.mdns {
debug!("mDNS network behaviour enabled");
Some(mdns::Behaviour::new(Default::default(), peer_id)?)
Some(mdns::Behaviour::new(
mdns::Config {
query_interval: MDNS_QUERY_INTERVAL,
..mdns::Config::default()
},
peer_id,
)?)
} else {
None
};

// Create a ping behaviour with default configuration if the ping flag is set
let ping = if network_config.ping {
debug!("Ping network behaviour enabled");
Some(ping::Behaviour::default())
Some(ping::Behaviour::new(
ping::Config::new()
.with_interval(PING_INTERVAL)
.with_timeout(PING_TIMEOUT),
))
} else {
None
};
Expand Down Expand Up @@ -132,6 +169,9 @@ impl Behaviour {
None
};

// Create behaviour to manage peer connections and handle p2panda messaging
let peers = peers::Behaviour::new();

Ok(Self {
autonat: autonat.into(),
identify: identify.into(),
Expand All @@ -142,6 +182,7 @@ impl Behaviour {
rendezvous_server: rendezvous_server.into(),
relay_client: relay_client.into(),
relay_server: relay_server.into(),
peers,
})
}
}
19 changes: 14 additions & 5 deletions aquadoggo/src/network/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::path::PathBuf;

use anyhow::Result;
use libp2p::connection_limits::ConnectionLimits;
use libp2p::identity::Keypair;
use libp2p::identity::{Keypair, PublicKey};
use libp2p::{Multiaddr, PeerId};
use log::info;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -72,8 +72,8 @@ pub struct NetworkConfiguration {

/// Ping behaviour enabled.
///
/// Send outbound pings to connected peers every 15 seconds and respond to inbound pings.
/// Every sent ping must yield a response within 20 seconds in order to be successful.
/// Send outbound pings to connected peers every 15 seconds and respond to inbound pings. Every
/// sent ping must yield a response within 20 seconds in order to be successful.
pub ping: bool,

/// QUIC transport port.
Expand Down Expand Up @@ -103,6 +103,9 @@ pub struct NetworkConfiguration {
///
/// Serve as a rendezvous point for peer discovery, allowing peer registration and queries.
pub rendezvous_server_enabled: bool,

/// Our local peer id.
pub peer_id: Option<PeerId>,
}

impl Default for NetworkConfiguration {
Expand All @@ -127,11 +130,17 @@ impl Default for NetworkConfiguration {
rendezvous_address: None,
rendezvous_peer_id: None,
rendezvous_server_enabled: false,
peer_id: None,
}
}
}

impl NetworkConfiguration {
/// Derive peer id from a given public key.
pub fn set_peer_id(&mut self, public_key: &PublicKey) {
self.peer_id = Some(PeerId::from_public_key(public_key));
}

/// Define the connection limits of the swarm.
pub fn connection_limits(&self) -> ConnectionLimits {
ConnectionLimits::default()
Expand All @@ -144,8 +153,8 @@ impl NetworkConfiguration {

/// Load the key pair from the file at the specified path.
///
/// If the file does not exist, a random key pair is generated and saved.
/// If no path is specified, a random key pair is generated.
/// If the file does not exist, a random key pair is generated and saved. If no path is
/// specified, a random key pair is generated.
pub fn load_or_generate_key_pair(path: Option<PathBuf>) -> Result<Keypair> {
let key_pair = match path {
Some(mut path) => {
Expand Down
Loading
Loading