Skip to content

Commit

Permalink
Relax mutability requirements of FluvioAdmin client (#1179)
Browse files Browse the repository at this point in the history
Closes #1178. Will help to prevent bugs like infinyon/fluvio-client-wasm#42

- Changes `&mut self` to `&self` for `create`, `delete`, and `list` methods on `FluvioAdmin`.
  • Loading branch information
nicholastmosher committed Jun 18, 2021
1 parent 9607a3b commit f61aaae
Show file tree
Hide file tree
Showing 21 changed files with 24 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -3,6 +3,7 @@
## Platform Version 0.8.5 - UNRELEASED
* Add unstable Admin Watch API for topics, partitions, and SPUs ([#1136](https://github.com/infinyon/fluvio/pull/1136))
* Make recipes for smoke tests no longer build by default, helps caching. ([#1165](https://github.com/infinyon/fluvio/pull/1165))
* Relax requirement of `FluvioAdmin` methods from `&mut self` to `&self`. ([#1178](https://github.com/infinyon/fluvio/pull/1178))

## Platform Version 0.8.4 - 2020-05-29
* Don't hang when check for non exist topic. ([#697](https://github.com/infinyon/fluvio/pull/697))
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/cli/src/consumer/partition/list.rs
Expand Up @@ -27,7 +27,7 @@ impl ListPartitionOpt {
O: Terminal,
{
let output = self.output.format;
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;

let partitions = admin.list::<PartitionSpec, _>(vec![]).await?;

Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/topic/create.rs
Expand Up @@ -88,7 +88,7 @@ impl CreateTopicOpt {
let (name, topic_spec) = self.validate()?;

debug!("creating topic: {} spec: {:#?}", name, topic_spec);
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.create(name.clone(), dry_run, topic_spec).await?;
println!("topic \"{}\" created", name);

Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/topic/delete.rs
Expand Up @@ -21,7 +21,7 @@ pub struct DeleteTopicOpt {
impl DeleteTopicOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<()> {
debug!("deleting topic: {}", &self.topic);
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.delete::<TopicSpec, _>(&self.topic).await?;
println!("topic \"{}\" deleted", &self.topic);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/topic/describe.rs
Expand Up @@ -35,7 +35,7 @@ impl DescribeTopicsOpt {
let output_type = self.output.format;
debug!("describe topic: {}, {}", topic, output_type);

let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
let topics = admin.list::<TopicSpec, _>(vec![topic]).await?;

display::describe_topics(topics, output_type, out).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/src/consumer/topic/list.rs
Expand Up @@ -30,7 +30,7 @@ impl ListTopicsOpt {
pub async fn process<O: Terminal>(self, out: Arc<O>, fluvio: &Fluvio) -> Result<()> {
let output_type = self.output.format;
debug!("list topics {:#?} ", output_type);
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;

let topics = admin.list::<TopicSpec, _>(vec![]).await?;
display::format_response_output(out, topics, output_type)?;
Expand Down
2 changes: 1 addition & 1 deletion src/client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.8.5"
version = "0.8.6"
edition = "2018"
license = "Apache-2.0"
authors = ["Fluvio Contributors <team@fluvio.io>"]
Expand Down
13 changes: 4 additions & 9 deletions src/client/src/admin.rs
Expand Up @@ -125,7 +125,7 @@ impl FluvioAdmin {
}

#[instrument(skip(self, request))]
async fn send_receive<R>(&mut self, request: R) -> Result<R::Response, FlvSocketError>
async fn send_receive<R>(&self, request: R) -> Result<R::Response, FlvSocketError>
where
R: AdminRequest + Send + Sync,
{
Expand All @@ -134,12 +134,7 @@ impl FluvioAdmin {

/// create new object
#[instrument(skip(self, name, dry_run, spec))]
pub async fn create<S>(
&mut self,
name: String,
dry_run: bool,
spec: S,
) -> Result<(), FluvioError>
pub async fn create<S>(&self, name: String, dry_run: bool, spec: S) -> Result<(), FluvioError>
where
S: Into<AllCreatableSpec>,
{
Expand All @@ -157,7 +152,7 @@ impl FluvioAdmin {
/// delete object by key
/// key is depend on spec, most are string but some allow multiple types
#[instrument(skip(self, key))]
pub async fn delete<S, K>(&mut self, key: K) -> Result<(), FluvioError>
pub async fn delete<S, K>(&self, key: K) -> Result<(), FluvioError>
where
S: DeleteSpec,
K: Into<S::DeleteKey>,
Expand All @@ -168,7 +163,7 @@ impl FluvioAdmin {
}

#[instrument(skip(self, filters))]
pub async fn list<S, F>(&mut self, filters: F) -> Result<Vec<Metadata<S>>, FluvioError>
pub async fn list<S, F>(&self, filters: F) -> Result<Vec<Metadata<S>>, FluvioError>
where
S: ListSpec + Encoder + Decoder,
S::Status: Encoder + Decoder,
Expand Down
2 changes: 1 addition & 1 deletion src/client/src/producer.rs
Expand Up @@ -122,7 +122,7 @@ impl TopicProducer {
let requests = assemble_requests(&self.topic, partitions_by_spu);

for (leader, request) in requests {
let mut spu_client = self.pool.create_serial_socket_from_leader(leader).await?;
let spu_client = self.pool.create_serial_socket_from_leader(leader).await?;
spu_client.send_receive(request).await?;
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/src/sockets.rs
Expand Up @@ -209,7 +209,7 @@ impl VersionedSerialSocket {

/// send and wait for reply serially
#[instrument(level = "trace", skip(self, request))]
pub async fn send_receive<R>(&mut self, request: R) -> Result<R::Response, FlvSocketError>
pub async fn send_receive<R>(&self, request: R) -> Result<R::Response, FlvSocketError>
where
R: Request + Send + Sync,
{
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/group/create.rs
Expand Up @@ -44,7 +44,7 @@ impl CreateManagedSpuGroupOpt {
let (name, spec) = self.validate();
debug!("creating spg: {}, spec: {:#?}", name, spec);

let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.create(name, false, spec).await?;

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/group/delete.rs
Expand Up @@ -23,7 +23,7 @@ pub struct DeleteManagedSpuGroupOpt {

impl DeleteManagedSpuGroupOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<(), ClusterCliError> {
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.delete::<SpuGroupSpec, _>(&self.name).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/group/list.rs
Expand Up @@ -26,7 +26,7 @@ impl ListManagedSpuGroupsOpt {
out: Arc<O>,
fluvio: &Fluvio,
) -> Result<(), ClusterCliError> {
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
let lists = admin.list::<SpuGroupSpec, _>(vec![]).await?;

output::spu_group_response_to_output(out, lists, self.output.format)
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/spu/list.rs
Expand Up @@ -34,7 +34,7 @@ impl ListSpusOpt {
out: Arc<O>,
fluvio: &Fluvio,
) -> Result<(), ClusterCliError> {
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;

let spus = if self.custom {
// List custom SPUs only
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/spu/register.rs
Expand Up @@ -38,7 +38,7 @@ pub struct RegisterCustomSpuOpt {
impl RegisterCustomSpuOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<(), ClusterCliError> {
let (name, spec) = self.validate()?;
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.create(name, false, spec).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/cli/spu/unregister.rs
Expand Up @@ -35,7 +35,7 @@ pub struct UnregisterCustomSpuOpt {
impl UnregisterCustomSpuOpt {
pub async fn process(self, fluvio: &Fluvio) -> Result<(), ClusterCliError> {
let delete_key = self.validate()?;
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;
admin.delete::<CustomSpuSpec, _>(delete_key).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/start/k8.rs
Expand Up @@ -1334,7 +1334,7 @@ impl ClusterInstaller {
debug!("trying to create managed spu: {:#?}", cluster);
let name = self.config.group_name.clone();
let fluvio = Fluvio::connect_with_config(cluster).await?;
let mut admin = fluvio.admin().await;
let admin = fluvio.admin().await;

let spu_spec = SpuGroupSpec {
replicas: self.config.spu_replicas,
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/src/start/local.rs
Expand Up @@ -661,7 +661,7 @@ impl LocalInstaller {
sleep(Duration::from_secs(delay)).await;

let client = Fluvio::connect().await?;
let mut admin = client.admin().await;
let admin = client.admin().await;

// wait for list of spu
for _ in 0..30u16 {
Expand Down
2 changes: 1 addition & 1 deletion tests/runner/src/tests/smoke/consume.rs
Expand Up @@ -171,7 +171,7 @@ async fn validate_consume_message_api(
// wait 500m second and ensure partition list
sleep(Duration::from_millis(500)).await;

let mut admin = client.admin().await;
let admin = client.admin().await;
let partitions = admin
.list::<PartitionSpec, _>(vec![])
.await
Expand Down
2 changes: 1 addition & 1 deletion tests/runner/src/utils/test_runner.rs
Expand Up @@ -36,7 +36,7 @@ impl FluvioTest {
println!("Creating the topic: {}", &option.topic_name);
}

let mut admin = client.admin().await;
let admin = client.admin().await;
let topic_spec = TopicSpec::new_computed(1, option.replication() as i32, None);

let topic_create = admin
Expand Down

0 comments on commit f61aaae

Please sign in to comment.