Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);

/**
* Add {@link Resource.StateListener}s to the consumer.
*
* @param listeners listeners
* @return this builder instance
* @since 1.3.0
*/
ConsumerBuilder listeners(Resource.StateListener... listeners);

/**
* Enable {@link ManualTrackingStrategy}.
*
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/com/rabbitmq/stream/InvalidStateException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.stream;

/**
* Exception thrown when a resource is not in an appropriate state.
*
* <p>An example is a connection that is initializing.
*/
public class InvalidStateException extends StreamException {
public InvalidStateException(String format, Object... args) {
super(format, args);
}
}
9 changes: 9 additions & 0 deletions src/main/java/com/rabbitmq/stream/ProducerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ public interface ProducerBuilder {
*/
ProducerBuilder filterValue(Function<Message, String> filterValueExtractor);

/**
* Add {@link Resource.StateListener}s to the producer.
*
* @param listeners listeners
* @return this builder instance
* @since 1.3.0
*/
ProducerBuilder listeners(Resource.StateListener... listeners);

/**
* Create the {@link Producer} instance.
*
Expand Down
89 changes: 89 additions & 0 deletions src/main/java/com/rabbitmq/stream/Resource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.stream;

/**
* Marker interface for {@link com.rabbitmq.stream.Resource}-like classes.
*
* <p>Instances of these classes have different states during their lifecycle: open, recovering,
* closed, etc. Application can be interested in taking some actions for a given state (e.g.
* stopping publishing when a {@link com.rabbitmq.stream.Producer} is recovering after a connection
* problem and resuming publishing when it is open again).
*
* @see com.rabbitmq.stream.Producer
* @see com.rabbitmq.stream.Consumer
*/
public interface Resource {

/**
* Application listener for a {@link com.rabbitmq.stream.Resource}.
*
* <p>They are registered at creation time.
*
* @see
* com.rabbitmq.stream.ProducerBuilder#listeners(com.rabbitmq.stream.Resource.StateListener...)
* @see
* com.rabbitmq.stream.ConsumerBuilder#listeners(com.rabbitmq.stream.Resource.StateListener...)
*/
@FunctionalInterface
interface StateListener {

/**
* Handle state change.
*
* @param context state change context
*/
void handle(Context context);
}

/** Context of a resource state change. */
interface Context {

/**
* The resource instance.
*
* @return resource instance
*/
Resource resource();

/**
* The previous state of the resource.
*
* @return previous state
*/
State previousState();

/**
* The current (new) state of the resource.
*
* @return current state
*/
State currentState();
}

/** Resource state. */
enum State {
/** The resource is currently opening. */
OPENING,
/** The resource is open and functional. */
OPEN,
/** The resource is recovering. */
RECOVERING,
/** The resource is closing. */
CLOSING,
/** The resource is closed. */
CLOSED
}
}
22 changes: 22 additions & 0 deletions src/main/java/com/rabbitmq/stream/ResourceClosedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.stream;

/** Exception thrown when a resource is not usable because it is closed. */
public class ResourceClosedException extends InvalidStateException {
public ResourceClosedException(String format, Object... args) {
super(format, args);
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/rabbitmq/stream/StreamException.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class StreamException extends RuntimeException {

private final short code;

public StreamException(String message) {
super(message);
public StreamException(String format, Object... args) {
super(String.format(format, args));
this.code = -1;
}

Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -506,15 +506,15 @@ SubscriptionState state() {
return this.state.get();
}

private void markConsuming() {
private void markOpen() {
if (this.consumer != null) {
this.consumer.consuming();
this.consumer.markOpen();
}
}

private void markNotConsuming() {
private void markRecovering() {
if (this.consumer != null) {
this.consumer.notConsuming();
this.consumer.markRecovering();
}
}

Expand Down Expand Up @@ -712,7 +712,7 @@ private ClientSubscriptionsManager(
"Subscription connection has {} consumer(s) over {} stream(s) to recover",
this.subscriptionTrackers.stream().filter(Objects::nonNull).count(),
this.streamToStreamSubscriptions.size());
iterate(this.subscriptionTrackers, SubscriptionTracker::markNotConsuming);
iterate(this.subscriptionTrackers, SubscriptionTracker::markRecovering);
environment
.scheduledExecutorService()
.execute(
Expand Down Expand Up @@ -787,7 +787,7 @@ private ClientSubscriptionsManager(
}

if (affectedSubscriptions != null && !affectedSubscriptions.isEmpty()) {
iterate(affectedSubscriptions, SubscriptionTracker::markNotConsuming);
iterate(affectedSubscriptions, SubscriptionTracker::markRecovering);
environment
.scheduledExecutorService()
.execute(
Expand Down Expand Up @@ -1146,7 +1146,7 @@ void add(
throw e;
}
subscriptionTracker.state(SubscriptionState.ACTIVE);
subscriptionTracker.markConsuming();
subscriptionTracker.markOpen();
LOGGER.debug("Subscribed to '{}'", subscriptionTracker.stream);
} finally {
this.subscriptionManagerLock.unlock();
Expand Down
60 changes: 60 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/ResourceBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.Resource.State.CLOSED;
import static com.rabbitmq.stream.Resource.State.OPEN;
import static com.rabbitmq.stream.Resource.State.OPENING;

import com.rabbitmq.stream.InvalidStateException;
import com.rabbitmq.stream.Resource;
import com.rabbitmq.stream.ResourceClosedException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

abstract class ResourceBase implements Resource {

private final AtomicReference<State> state = new AtomicReference<>();
private final StateEventSupport stateEventSupport;

ResourceBase(List<StateListener> listeners) {
this.stateEventSupport = new StateEventSupport(listeners);
this.state(OPENING);
}

protected void checkOpen() {
State state = this.state.get();
if (state == CLOSED) {
throw new ResourceClosedException("Resource is closed");
} else if (state != OPEN) {
throw new InvalidStateException("Resource is not open, current state is %s", state.name());
}
}

protected State state() {
return this.state.get();
}

protected void state(Resource.State state) {
Resource.State previousState = this.state.getAndSet(state);
if (state != previousState) {
this.dispatch(previousState, state);
}
}

private void dispatch(State previous, State current) {
this.stateEventSupport.dispatch(this, previous, current);
}
}
74 changes: 74 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/StateEventSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Resource;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class StateEventSupport {

private static final Logger LOGGER = LoggerFactory.getLogger(StateEventSupport.class);

private final List<Resource.StateListener> listeners;

StateEventSupport(List<Resource.StateListener> listeners) {
this.listeners = List.copyOf(listeners);
}

void dispatch(Resource resource, Resource.State previousState, Resource.State currentState) {
if (!this.listeners.isEmpty()) {
Resource.Context context = new DefaultContext(resource, previousState, currentState);
this.listeners.forEach(
l -> {
try {
l.handle(context);
} catch (Exception e) {
LOGGER.warn("Error in resource listener", e);
}
});
}
}

private static class DefaultContext implements Resource.Context {

private final Resource resource;
private final Resource.State previousState;
private final Resource.State currentState;

private DefaultContext(
Resource resource, Resource.State previousState, Resource.State currentState) {
this.resource = resource;
this.previousState = previousState;
this.currentState = currentState;
}

@Override
public Resource resource() {
return this.resource;
}

@Override
public Resource.State previousState() {
return this.previousState;
}

@Override
public Resource.State currentState() {
return this.currentState;
}
}
}
Loading