Circuit Breaker hangs on Half Open state #743

Open
arturgalenom opened this issue Mar 2, 2021 · 0 comments

Hello there,

I am trying to use the Circuit Breaker with Akka Streams. While the errors have not yet recovered, the breaker keeps switching from the Open state to the Half Open state, but after some time it stops retrying and hangs forever in the Half Open state. I don't know if I am doing something wrong.
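To make the behavior I expect concrete, here is a minimal sketch of the state cycle in plain Java. It is a simplified, hypothetical model (the class `SimpleBreaker`, its methods, and the explicit `nowMillis` clock are my own illustration), not the library's implementation, and it ignores call timeouts and backoff. The point is only that a failed trial call in Half Open should reopen the breaker and restart the reset timer, so another trial should always follow.

```java
import java.time.Duration;

/** Hypothetical, simplified circuit breaker model; not the library's code. */
final class SimpleBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int maxFailures;
    private final long resetTimeoutMillis;
    private State state = State.CLOSED;
    private int failures = 0;
    private long openedAtMillis = 0L;

    SimpleBreaker(int maxFailures, Duration resetTimeout) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeout.toMillis();
    }

    /** State at the given time; OPEN becomes HALF_OPEN once the reset timeout has elapsed. */
    State stateAt(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAtMillis >= resetTimeoutMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    /** Records the outcome of a call attempted at the given time. */
    void record(boolean success, long nowMillis) {
        State current = stateAt(nowMillis);
        if (current == State.OPEN) {
            return; // calls are rejected while open, so there is nothing to record
        }
        if (success) {
            state = State.CLOSED;
            failures = 0;
        } else if (current == State.HALF_OPEN || ++failures >= maxFailures) {
            // A failed trial call, or too many failures while closed, (re)opens the breaker
            // and restarts the reset timer.
            state = State.OPEN;
            openedAtMillis = nowMillis;
            failures = 0;
        }
    }
}
```

With this model, a breaker that fails its trial call at time `t` is Open again until `t` plus the reset timeout and then returns to Half Open, indefinitely. That is the cycle I see at first, and the one that stops after a while.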

The relevant parts of my configuration are below:

static class CBListenerActor extends AbstractActor {

        private final LoggerWrapper log;

        CBListenerActor() {
            log = new LoggerWrapper(LoggerFactory.getLogger(CBListenerActor.class), "SQS Circuit Breaker", getSelf().path().name());
        }

        @Override public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(Open.instance(), msg -> {
                        log.info("OPENED CIRCUIT BREAK");
                    })
                    .matchEquals(Closed.instance(), msg -> {
                        log.info("CLOSED CIRCUIT BREAK");
                    })
                    .matchEquals(HalfOpen.instance(), msg -> {
                        log.info("HALF OPENED CIRCUIT BREAK");
                    })
                    .build();
        }
    }

final CircuitBreakerState state =
                AtomicCircuitBreakerState.create(
                        getClass().getSimpleName(),
                        5,
                        Duration.ofSeconds(10),
                        Duration.ofSeconds(1),
                        Duration.ofSeconds(4),
                        2,
                        system.dispatcher(),
                        system.scheduler());

        final var props = Props.create(CBListenerActor.class);
        final var actorName = getClass().getSimpleName() + CBListenerActor.class.getSimpleName() + "-" + new Random().nextInt();
        final var subscriber = this.system.actorOf(props, actorName);
        state.subscribe(subscriber, Open.instance());
        state.subscribe(subscriber, Closed.instance());
        state.subscribe(subscriber, HalfOpen.instance());

        final CircuitBreakerSettings<ProcessResult, ProcessResult, Message> settings = CircuitBreakerSettings
                .<ProcessResult, ProcessResult, Message>create(state)
                .withUniqueIdMapper(Message::messageId)
                .withFailureDecider(tryResult -> {
                    if (tryResult.isSuccess()) {
                        return !tryResult.get().isSuccess();
                    }
                    return true;
                });

        final var circuitBreaker = CircuitBreaker.create(settings);

        final var flow =
                Flow.<Pair<ProcessResult, Message>>create().map(elem -> elem);


        SqsSourceSettings sqsSourceSettings =
                SqsSourceSettings.create()
                        .withWaitTime(getSqsWaitTime())
                        .withMaxBatchSize(getSqsBatchSize())
                        .withMaxBufferSize(getSqsBufferSize())
                        .withCloseOnEmptyReceive(false);

        final var join = circuitBreaker.join(flow);

        SqsSource.create(getSqsUrl(), sqsSourceSettings, sqsClient)
                .mapAsync(getSqsParallelism(),
                        (msg) -> CompletableFuture.supplyAsync(() -> executeProcess(msg)).thenApply(result -> Pair.create(result, msg)))
                .via(join)
                .map(pair -> {
                    final var processResult = pair.first().getOrElse(() -> new ProcessResult(false, false, null));
                    if (processResult.isSuccess()) {
                        return MessageAction.delete(processResult.getMessage());
                    } else if (processResult.isRetry()) {
                        return MessageAction.ignore(processResult.getMessage());
                    }
                    return MessageAction.delete(processResult.getMessage());
                })
                .runWith(SqsAckSink.create(getSqsUrl(), SqsAckSettings.create(), sqsClient), materializer);

A small excerpt of the log generated by CBListenerActor (I've removed some parts of the log to avoid exposing too much information):

{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1740369079"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1740369079"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1252266457"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1252266457"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor-343052081"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor-343052081"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--983211955"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--983211955"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1740369079"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1740369079"}}
{"timestamp":"","level":"INFO","message":"OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1252266457"}}
{"timestamp":"","level":"INFO","message":"HALF OPENED CIRCUIT BREAK","service_name":"service-name","context":{"SQS Circuit Breaker":"CBListenerActor--1252266457"}}

(From this point on, there are no further transitions from the Open state to the Half Open state to trigger a retry. I waited for many minutes.)
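The excerpt contains several different listener names, so it may help to look at the last reported state per listener rather than at the interleaved stream. The following is only a rough diagnostic sketch: the class `LastStatePerListener` and its regular expression are hypothetical helpers written against the log format shown above, and they assume each line carries the `message` field before the `SQS Circuit Breaker` context entry.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: groups log lines by listener name and keeps the last message seen. */
final class LastStatePerListener {
    // Captures the "message" value and the listener name stored under "SQS Circuit Breaker".
    private static final Pattern LINE = Pattern.compile(
            "\"message\":\"([^\"]+)\".*\"SQS Circuit Breaker\":\"([^\"]+)\"");

    static Map<String, String> lastStates(List<String> logLines) {
        Map<String, String> last = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                // Later lines overwrite earlier ones, so only the final state per listener remains.
                last.put(m.group(2), m.group(1));
            }
        }
        return last;
    }
}
```

Calling `LastStatePerListener.lastStates(lines)` on the full log would show which listeners last reported Half Open without a following Open or Closed message.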

So, am I doing something wrong? Is this the normal behavior?
