Skip to content

Commit

Permalink
chore: fmt and clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed May 19, 2024
1 parent 86224dd commit 5875d9d
Show file tree
Hide file tree
Showing 18 changed files with 174 additions and 79 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 5 additions & 9 deletions crates/fluvio-auth/src/x509/authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,12 @@ impl X509Authenticator {

info!("client_certificate {:?}", client_certificate);

let principal = Self::principal_from_raw_certificate(
&client_certificate
.to_der()
.map_err(|err| {
info!("DEBUG_3_MAP_ERR client_certificate to_der error {:?}", err);
let principal =
Self::principal_from_raw_certificate(&client_certificate.to_der().map_err(|err| {
info!("DEBUG_3_MAP_ERR client_certificate to_der error {:?}", err);

err.into_io_error()
})?,
)?;
err.into_io_error()
})?)?;

Ok(principal)
}
Expand Down Expand Up @@ -134,7 +131,6 @@ impl Authenticator for X509Authenticator {
incoming_tls_stream: &DefaultServerTlsStream,
target_tcp_stream: &TcpStream,
) -> Result<bool, IoError> {

info!("DEBUG_1");

let principal = Self::principal_from_tls_stream(incoming_tls_stream)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-auth/src/x509/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl X509Identity {
return Err(std::io::Error::new(
std::io::ErrorKind::Interrupted,
"connection closed",
))
));
}
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-controlplane-metadata/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-controlplane-metadata"
edition = "2021"
version = "0.28.1"
version = "0.28.2"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Metadata definition for Fluvio control plane"
repository = "https://github.com/infinyon/fluvio"
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-controlplane-metadata/src/mirror/status.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::time::Duration;
use fluvio_protocol::{Encoder, Decoder};

#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -63,7 +62,7 @@ pub struct ConnectionStat {

impl MirrorStatus {
#[cfg(feature = "use_serde")]
pub fn last_seen(&self, since: Duration) -> String {
pub fn last_seen(&self, since: std::time::Duration) -> String {
use humantime_serde::re::humantime;

let since_sec = since.as_secs();
Expand Down Expand Up @@ -120,6 +119,7 @@ impl std::fmt::Display for ConnectionStatus {
#[cfg(test)]
mod test {
use super::*;
use std::time::Duration;

#[test]
fn test_last_seen() {
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-sc-schema"
version = "0.24.0"
version = "0.24.1"
edition = "2021"
authors = ["Fluvio Contributors <team@fluvio.io>"]
description = "Fluvio API for SC"
Expand Down
19 changes: 17 additions & 2 deletions crates/fluvio-sc/src/services/public_api/mirroring/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use fluvio_future::{task::spawn, timer::sleep};
use fluvio_protocol::api::{RequestHeader, ResponseMessage};
use fluvio_sc_schema::{
core::MetadataItem,
mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus},
mirror::{ConnectionStatus, MirrorPairStatus, MirrorStatus, MirrorType},
spu::SpuSpec,
store::ChangeListener,
topic::{MirrorConfig, ReplicaSpec, TopicSpec},
};
use fluvio_socket::ExclusiveFlvSink;
use fluvio_types::event::StickyEvent;
use tracing::{debug, error, info, instrument, trace};
use tracing::{debug, error, info, instrument, trace, warn};
use anyhow::{Result, anyhow};

use crate::core::Context;
Expand Down Expand Up @@ -55,6 +55,21 @@ impl<C: MetadataItem> RemoteFetchingFromHomeController<C> {
#[instrument(skip(self), name = "RemoteFetchingFromHomeControllerLoop")]
async fn dispatch_loop(mut self) {
use tokio::select;

// check if remote cluster exists
let mirrors = self.ctx.mirrors().store().value(&self.req.remote_id).await;
let remote = mirrors
.iter()
.find(|mirror| match &mirror.spec.mirror_type {
MirrorType::Remote(r) => r.id == self.req.remote_id,
_ => false,
});

if remote.is_none() {
warn!("remote cluster not found: {}", self.req.remote_id);
return;
}

info!(
name = self.req.remote_id,
"received mirroring connect request"
Expand Down
5 changes: 3 additions & 2 deletions crates/fluvio-sc/src/services/public_api/mirroring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub enum MirrorRequests {
#[instrument(skip(request, auth_ctx, sink, end_event))]
pub fn handle_mirroring_request<AC: AuthContext, C: MetadataItem>(
request: RequestMessage<ObjectMirroringRequest>,
auth_ctx: &AuthServiceContext<AC, C>,
auth_ctx: Arc<AuthServiceContext<AC, C>>,
sink: ExclusiveFlvSink,
end_event: Arc<StickyEvent>,
) -> Result<()> {
Expand All @@ -39,10 +39,11 @@ pub fn handle_mirroring_request<AC: AuthContext, C: MetadataItem>(
// authorization check
if !auth_ctx.auth.allow_remote_id(&req.remote_id) {
warn!("identity mismatch for remote_id: {}", req.remote_id);
return Err(anyhow!("identity mismatch for remote_id {}", req.remote_id));
return Err(anyhow!("identity mismatch"));
}

let ctx = auth_ctx.global_ctx.clone();

RemoteFetchingFromHomeController::start(req, sink, end_event, ctx, header);
}
};
Expand Down
3 changes: 1 addition & 2 deletions crates/fluvio-sc/src/services/public_api/public_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ where
mut socket: FluvioSocket,
_connection: ConnectInfo,
) -> Result<()> {

info!("public_server DEBUG_1");

let auth_context = ctx
Expand Down Expand Up @@ -116,7 +115,7 @@ where
"list handler"
),
AdminPublicDecodedRequest::MirroringRequest(request) =>
super::mirroring::handle_mirroring_request(request, &service_context, shared_sink.clone(), end_event.clone())?,
super::mirroring::handle_mirroring_request(request, service_context.clone(), shared_sink.clone(), end_event.clone())?,
AdminPublicDecodedRequest::WatchRequest(request) =>
super::watch::handle_watch_request(
request,
Expand Down
7 changes: 4 additions & 3 deletions crates/fluvio-socket/src/versioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,10 @@ impl ClientConfig {
/// create new config with prefix add to domain, this is useful for SNI
#[instrument(skip(self))]
pub fn with_prefix_sni_domain(&self, prefix: &str) -> Self {
//let new_domain = format!("{}.{}", prefix, self.connector.domain());
//debug!(sni_domain = %new_domain);
let connector = self.connector.new_domain(self.connector.domain().to_string());
let new_domain = format!("{}.{}", prefix, self.connector.domain());
debug!(sni_domain = %new_domain);
let connector = self.connector.new_domain(new_domain);

Self {
addr: self.addr.clone(),
client_id: self.client_id.clone(),
Expand Down
20 changes: 19 additions & 1 deletion crates/fluvio-spu/src/mirroring/home/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fmt, sync::Arc};
use std::sync::atomic::AtomicU64;

use fluvio_auth::AuthContext;
use fluvio_controlplane_metadata::mirror::MirrorType;
use tokio::select;
use tracing::{debug, error, instrument, warn};
use anyhow::Result;
Expand All @@ -18,7 +19,7 @@ use crate::mirroring::remote::api_key::MirrorRemoteApiEnum;
use crate::mirroring::remote::remote_api::RemoteMirrorRequest;
use crate::mirroring::remote::sync::DefaultPartitionSyncRequest;
use crate::replication::leader::SharedFileLeaderState;
use crate::services::auth::{SpuAuthGlobalContext, SpuAuthServiceContext};
use crate::services::auth::SpuAuthServiceContext;

use super::update_offsets::UpdateHomeOffsetRequest;

Expand Down Expand Up @@ -79,6 +80,23 @@ impl MirrorHomeHandler {
return;
}

// check if remote cluster exists
let mirrors = auth_ctx.global_ctx.mirrors_localstore().all_values();
let remote = mirrors
.iter()
.find(|mirror| match &mirror.spec.mirror_type {
MirrorType::Remote(r) => r.id == req_msg.request.remote_cluster_id,
_ => false,
});

if remote.is_none() {
warn!(
"remote cluster not found: {}",
req_msg.request.remote_cluster_id
);
return;
}

debug!("handling mirror request: {:#?}", req_msg);
let remote_replica = req_msg.request.remote_replica;
let remote_cluster_id = req_msg.request.remote_cluster_id;
Expand Down
1 change: 0 additions & 1 deletion crates/fluvio-spu/src/mirroring/remote/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ where

let (mut home_sink, mut home_stream) = home_socket.split();


if tls {
debug!("tls enabled, disabling zero copy sink");
home_sink.disable_zerocopy();
Expand Down
12 changes: 9 additions & 3 deletions crates/fluvio-spu/src/mirroring/test/integration.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use tracing::debug;

use fluvio_controlplane_metadata::partition::{RemotePartitionConfig, HomePartitionConfig};
use fluvio_future::timer::sleep;
use fluvio_protocol::{fixture::create_raw_recordset, record::ReplicaKey};

use crate::services::public::create_public_server;
use crate::services::{
auth::{root::SpuRootAuthorization, SpuAuthGlobalContext},
public::create_public_server,
};

use super::fixture::{ReplicaConfig, local_port};

Expand Down Expand Up @@ -64,7 +67,10 @@ async fn test_mirroring_new_records() {

// start home server
debug!("starting home server");
let _remote_end = create_public_server(home_port.clone(), home_gctx.clone()).run();

let auth_global_ctx =
SpuAuthGlobalContext::new(home_gctx.clone(), Arc::new(SpuRootAuthorization::new()));
let _remote_end = create_public_server(home_port.to_owned(), auth_global_ctx.clone()).run();

// sleep 1 seconds
debug!("waiting for home public server to up");
Expand Down
1 change: 0 additions & 1 deletion crates/fluvio-spu/src/services/auth/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ impl Authorization for SpuRemoteAuthorization {
&self,
socket: &mut FluvioSocket,
) -> Result<Self::Context, AuthError> {

info!("remote_cert DEBUG_1");

let identity = X509Identity::create_from_connection(socket)
Expand Down
9 changes: 6 additions & 3 deletions crates/fluvio-spu/src/services/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,22 @@ use std::fmt::Debug;
pub(crate) type SpuPublicServer<A> =
FluvioApiServer<SpuServerRequest, SpuServerApiKey, SpuAuthGlobalContext<A>, PublicService<A>>;

pub fn create_public_server<A>(addr: String, ctx: SpuAuthGlobalContext<A>) -> SpuPublicServer<A>
pub fn create_public_server<A>(
addr: String,
auth_ctx: SpuAuthGlobalContext<A>,
) -> SpuPublicServer<A>
where
A: Authorization + Sync + Send + Debug + 'static,
SpuAuthGlobalContext<A>: Clone + Debug,
<A as Authorization>::Context: Send + Sync,
{
info!(
spu_id = ctx.global_ctx.local_spu_id(),
spu_id = auth_ctx.global_ctx.local_spu_id(),
%addr,
"Starting SPU public service:",
);

FluvioApiServer::new(addr, ctx, PublicService::<A>::new())
FluvioApiServer::new(addr, auth_ctx, PublicService::<A>::new())
}

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 5875d9d

Please sign in to comment.