Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Relax mutability requirements of FluvioAdmin client #1179

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -3,6 +3,7 @@
## Platform Version 0.8.5 - UNRELEASED
* Add unstable Admin Watch API for topics, partitions, and SPUs ([#1136](https://github.com/infinyon/fluvio/pull/1136))
* Make recipes for smoke tests no longer build by default, helps caching. ([#1165](https://github.com/infinyon/fluvio/pull/1165))
* Relax requirement of `FluvioAdmin` methods from `&mut self` to `&self`. ([#1178](https://github.com/infinyon/fluvio/pull/1178))

## Platform Version 0.8.4 - 2020-05-29
* Don't hang when check for non exist topic. ([#697](https://github.com/infinyon/fluvio/pull/697))
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/cli/src/consumer/partition/list.rs
Expand Up @@ -27,7 +27,7 @@ impl ListPartitionOpt {
O: Terminal,
{
let output = self.output.format;
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;

let partitions = admin.list::<PartitionSpec, _>(vec![]).await?;

Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/topic/create.rs
Expand Up @@ -88,7 +88,7 @@ impl CreateTopicOpt {
let (name, topic_spec) = self.validate()?;

debug!("creating topic: {} spec: {:#?}", name, topic_spec);
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.create(name.clone(), dry_run, topic_spec).await?;
println!("topic \"{}\" created", name);

Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/topic/delete.rs
Expand Up @@ -21,7 +21,7 @@ pub struct DeleteTopicOpt {
impl DeleteTopicOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<()> {
debug!("deleting topic: {}", &self.topic);
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.delete::<TopicSpec, _>(&self.topic).await?;
println!("topic \"{}\" deleted", &self.topic);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/topic/describe.rs
Expand Up @@ -35,7 +35,7 @@ impl DescribeTopicsOpt {
let output_type = self.output.format;
debug!("describe topic: {}, {}", topic, output_type);

let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
let topics = admin.list::<TopicSpec, _>(vec![topic]).await?;

display::describe_topics(topics, output_type, out).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/topic/list.rs
Expand Up @@ -30,7 +30,7 @@ impl ListTopicsOpt {
pub async fn process<O: Terminal>(self, out: Arc<O>, fluvio: &Fluvio) -> Result<()> {
let output_type = self.output.format;
debug!("list topics {:#?} ", output_type);
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;

let topics = admin.list::<TopicSpec, _>(vec![]).await?;
display::format_response_output(out, topics, output_type)?;
Expand Down
2 changes: 1 addition & 1 deletion src/client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.8.5"
version = "0.8.6"
edition = "2018"
license = "Apache-2.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
Expand Down
13 changes: 4 additions & 9 deletions src/client/src/admin.rs
Expand Up @@ -125,7 +125,7 @@ impl FluvioAdmin {
}

#[instrument(skip(self, request))]
async fn send_receive<R>(&mut self, request: R) -> Result<R::Response, FlvSocketError>
async fn send_receive<R>(&self, request: R) -> Result<R::Response, FlvSocketError>
where
R: AdminRequest + Send + Sync,
{
Expand All @@ -134,12 +134,7 @@ impl FluvioAdmin {

/// create new object
#[instrument(skip(self, name, dry_run, spec))]
pub async fn create<S>(
&mut self,
name: String,
dry_run: bool,
spec: S,
) -> Result<(), FluvioError>
pub async fn create<S>(&self, name: String, dry_run: bool, spec: S) -> Result<(), FluvioError>
where
S: Into<AllCreatableSpec>,
{
Expand All @@ -157,7 +152,7 @@ impl FluvioAdmin {
/// delete object by key
/// key is depend on spec, most are string but some allow multiple types
#[instrument(skip(self, key))]
pub async fn delete<S, K>(&mut self, key: K) -> Result<(), FluvioError>
pub async fn delete<S, K>(&self, key: K) -> Result<(), FluvioError>
where
S: DeleteSpec,
K: Into<S::DeleteKey>,
Expand All @@ -168,7 +163,7 @@ impl FluvioAdmin {
}

#[instrument(skip(self, filters))]
pub async fn list<S, F>(&mut self, filters: F) -> Result<Vec<Metadata<S>>, FluvioError>
pub async fn list<S, F>(&self, filters: F) -> Result<Vec<Metadata<S>>, FluvioError>
where
S: ListSpec + Encoder + Decoder,
S::Status: Encoder + Decoder,
Expand Down
2 changes: 1 addition & 1 deletion src/client/src/producer.rs
Expand Up @@ -122,7 +122,7 @@ impl TopicProducer {
let requests = assemble_requests(&self.topic, partitions_by_spu);

for (leader, request) in requests {
let mut spu_client = self.pool.create_serial_socket_from_leader(leader).await?;
let spu_client = self.pool.create_serial_socket_from_leader(leader).await?;
spu_client.send_receive(request).await?;
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/src/sockets.rs
Expand Up @@ -209,7 +209,7 @@ impl VersionedSerialSocket {

/// send and wait for reply serially
#[instrument(level = "trace", skip(self, request))]
pub async fn send_receive<R>(&mut self, request: R) -> Result<R::Response, FlvSocketError>
pub async fn send_receive<R>(&self, request: R) -> Result<R::Response, FlvSocketError>
where
R: Request + Send + Sync,
{
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/group/create.rs
Expand Up @@ -44,7 +44,7 @@ impl CreateManagedSpuGroupOpt {
let (name, spec) = self.validate();
debug!("creating spg: {}, spec: {:#?}", name, spec);

let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.create(name, false, spec).await?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/group/delete.rs
Expand Up @@ -23,7 +23,7 @@ pub struct DeleteManagedSpuGroupOpt {

impl DeleteManagedSpuGroupOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<(), ClusterCliError> {
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.delete::<SpuGroupSpec, _>(&self.name).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/group/list.rs
Expand Up @@ -26,7 +26,7 @@ impl ListManagedSpuGroupsOpt {
out: Arc<O>,
fluvio: &Fluvio,
) -> Result<(), ClusterCliError> {
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
let lists = admin.list::<SpuGroupSpec, _>(vec![]).await?;

output::spu_group_response_to_output(out, lists, self.output.format)
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/spu/list.rs
Expand Up @@ -34,7 +34,7 @@ impl ListSpusOpt {
out: Arc<O>,
fluvio: &Fluvio,
) -> Result<(), ClusterCliError> {
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;

let spus = if self.custom {
// List custom SPUs only
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/spu/register.rs
Expand Up @@ -38,7 +38,7 @@ pub struct RegisterCustomSpuOpt {
impl RegisterCustomSpuOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<(), ClusterCliError> {
let (name, spec) = self.validate()?;
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.create(name, false, spec).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/spu/unregister.rs
Expand Up @@ -35,7 +35,7 @@ pub struct UnregisterCustomSpuOpt {
impl UnregisterCustomSpuOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<(), ClusterCliError> {
let delete_key = self.validate()?;
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.delete::<CustomSpuSpec, _>(delete_key).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/start/k8.rs
Expand Up @@ -1334,7 +1334,7 @@ impl ClusterInstaller {
debug!("trying to create managed spu: {:#?}", cluster);
let name = self.config.group_name.clone();
let fluvio = Fluvio::connect_with_config(cluster).await?;
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;

let spu_spec = SpuGroupSpec {
replicas: self.config.spu_replicas,
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/start/local.rs
Expand Up @@ -661,7 +661,7 @@ impl LocalInstaller {
sleep(Duration::from_secs(delay)).await;

let client = Fluvio::connect().await?;
let mut admin = client.admin().await;
let admin = client.admin().await;

// wait for list of spu
for _ in 0..30u16 {
Expand Down
2 changes: 1 addition & 1 deletion tests/runner/src/tests/smoke/consume.rs
Expand Up @@ -171,7 +171,7 @@ async fn validate_consume_message_api(
// wait 500m second and ensure partition list
sleep(Duration::from_millis(500)).await;

let mut admin = client.admin().await;
let admin = client.admin().await;
let partitions = admin
.list::<PartitionSpec, _>(vec![])
.await
Expand Down
2 changes: 1 addition & 1 deletion tests/runner/src/utils/test_runner.rs
Expand Up @@ -36,7 +36,7 @@ impl FluvioTest {
println!("Creating the topic: {}", &option.topic_name);
}

let mut admin = client.admin().await;
let admin = client.admin().await;
let topic_spec = TopicSpec::new_computed(1, option.replication() as i32, None);

let topic_create = admin
Expand Down