Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve BoundedElasticScheduler to be less blocking #2909

Merged
merged 5 commits into from
Feb 17, 2022

Conversation

simonbasle
Copy link
Member

@simonbasle simonbasle commented Feb 2, 2022

  • Add BlockHound exception for BoundedElasticScheduler ensureQueueCapacity
  • Remove PriorityQueue in BoundedElasticScheduler, use array+iteration

Fixes #2137.

@simonbasle simonbasle added this to the 3.4.x Backlog milestone Feb 2, 2022
@simonbasle simonbasle added the type/enhancement A general enhancement label Feb 2, 2022
@simonbasle simonbasle self-assigned this Feb 2, 2022
@simonbasle simonbasle requested a review from a team February 2, 2022 09:23
@simonbasle
Copy link
Member Author

note that I wasn't really able to trigger the blocking behavior and BlockHound complaint around the PriorityQueue.
I also need to perform some perf measurement (even maybe commit a jmh bench) to see the impact of that change.

@simonbasle
Copy link
Member Author

I need to refine the benchmark, but preliminary benchmark of running 1000 tasks on a 100 thread bounded scheduler shows the following results:

Benchmark                                               Mode  Cnt   Score   Error  Units
BoundedElasticBusyStructureBenchmark.withArray          avgt   50  19.148 ± 2.504  ms/op
BoundedElasticBusyStructureBenchmark.withPriorityQueue  avgt   50  29.360 ± 0.731  ms/op

@simonbasle simonbasle marked this pull request as ready for review February 7, 2022 17:37
@simonbasle
Copy link
Member Author

I'm not entirely comfortable delivering this in next week's release without some user feedback.
@ssalamov would you be able to build a snapshot locally with this branch and validate that all is working well for you ?

@OlegDokuka
Copy link
Contributor

we may need a stresstest for the updated impl.

@OlegDokuka
Copy link
Contributor

Looks good to me in general. Curious if it is possible to see the efficiency of the algorithm in case of tasks which are heterogeneous in time

BoundedState choice = arr[0];
int leastBusy = Integer.MAX_VALUE;

for (int i = 0; i < arr.length; i++) {
Copy link
Contributor

@OlegDokuka OlegDokuka Feb 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned before, can we do random choices here instead of iterating? Alternatively - can we just iterate a constant times or less if the array is smaller?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me capture that as a follow-up issue and merge this one for now 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #2927

@ssalamov
Copy link

@simonbasle thank you for your effort. Soon I will finish testing and keep you updated. Wondering if this fix will solve the below issue as well?

java.lang.Error: jdk.internal.misc.Unsafe#park
	at com.ocado.lineitems.config.util.internal.LineitemsBlockHoundIntegration.lambda$applyTo$0(LineitemsBlockHoundIntegration.java:14)
	at reactor.blockhound.BlockHound$Builder.lambda$install$8(BlockHound.java:427)
	at reactor.blockhound.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:89)
	at java.base/jdk.internal.misc.Unsafe.park(Unsafe.java)
	at java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
	at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
	at java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
	at java.base/java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:478)
	at java.base/java.util.concurrent.PriorityBlockingQueue.add(PriorityBlockingQueue.java:460)
	at reactor.core.scheduler.BoundedElasticScheduler$BoundedServices.pick(BoundedElasticScheduler.java:394)
	at reactor.core.scheduler.BoundedElasticScheduler.createWorker(BoundedElasticScheduler.java:315)
	at reactor.core.scheduler.Schedulers$CachedScheduler.createWorker(Schedulers.java:1231)
	at reactor.core.publisher.MonoSubscribeOn.subscribeOrReturn(MonoSubscribeOn.java:48)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4385)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onComplete(MdcContextConfiguration.java:65)
	at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onComplete(MdcContextConfiguration.java:65)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:276)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onComplete(MdcContextConfiguration.java:65)
	at reactor.core.publisher.Operators.complete(Operators.java:137)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:148)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:83)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
	at reactor.core.publisher.MonoCompletionStage.subscribe(MonoCompletionStage.java:58)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onSubscribe(MdcContextConfiguration.java:48)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:62)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onSubscribe(MdcContextConfiguration.java:48)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onSubscribe(MdcContextConfiguration.java:48)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onComplete(MdcContextConfiguration.java:65)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147)
	at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:362)
	at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:227)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onSubscribe(MonoCollectList.java:79)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onSubscribe(MdcContextConfiguration.java:48)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxFilterWhen$FluxFilterWhenSubscriber.drain(FluxFilterWhen.java:301)
	at reactor.core.publisher.FluxFilterWhen$FluxFilterWhenSubscriber.onNext(FluxFilterWhen.java:140)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onNext(MdcContextConfiguration.java:54)
	at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272)
	at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230)
	at reactor.core.publisher.FluxFilterWhen$FluxFilterWhenSubscriber.onSubscribe(FluxFilterWhen.java:200)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onSubscribe(MdcContextConfiguration.java:48)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onComplete(MdcContextConfiguration.java:65)
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onComplete(MdcContextConfiguration.java:65)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onComplete(MdcContextConfiguration.java:65)
	at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:294)
	at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
	at com.ocado.lineitems.config.logging.MdcContextLifter.onSubscribe(MdcContextConfiguration.java:48)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
	at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:71)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:962)
	at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:671)
	at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:560)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:220)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)

@simonbasle
Copy link
Member Author

@ssalamov it should, yes, as it will entirely remove the PriorityBlockingQueue from the implementation

@ssalamov
Copy link

@simonbasle, I tested and couldn't find the blocking operation with BoundedElasticScheduler. I would like to confirm this fix works ;) Thank you for handling it.

gcf-merge-on-green bot referenced this pull request in GoogleCloudPlatform/cloud-sql-jdbc-socket-factory Mar 25, 2022
….16 (#778)

[![WhiteSource Renovate](https://app.renovatebot.com/images/banner.svg)](https://renovatebot.com)

This PR contains the following updates:

| Package | Change | Age | Adoption | Passing | Confidence |
|---|---|---|---|---|---|
| [io.projectreactor:reactor-core](https://togithub.com/reactor/reactor-core) | `3.4.15` -> `3.4.16` | [![age](https://badges.renovateapi.com/packages/maven/io.projectreactor:reactor-core/3.4.16/age-slim)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://badges.renovateapi.com/packages/maven/io.projectreactor:reactor-core/3.4.16/adoption-slim)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://badges.renovateapi.com/packages/maven/io.projectreactor:reactor-core/3.4.16/compatibility-slim/3.4.15)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://badges.renovateapi.com/packages/maven/io.projectreactor:reactor-core/3.4.16/confidence-slim/3.4.15)](https://docs.renovatebot.com/merge-confidence/) |

---

### Release Notes

<details>
<summary>reactor/reactor-core</summary>

### [`v3.4.16`](https://togithub.com/reactor/reactor-core/releases/v3.4.16)

[Compare Source](https://togithub.com/reactor/reactor-core/compare/v3.4.15...v3.4.16)

Reactor-Core `3.4.16` is part of **`2020.0.17` Release Train (`Europium` SR17)**.

This service release contains a few bugfixes and improvements.

#### What's Changed

##### ✨ New features and improvements

-   Improve BoundedElasticScheduler to be less blocking by [@&#8203;simonbasle](https://togithub.com/simonbasle) in [https://github.com/reactor/reactor-core/pull/2909](https://togithub.com/reactor/reactor-core/pull/2909)
-   Add EmitFailureHandler.busyLoop flavor by [@&#8203;Animesh27](https://togithub.com/Animesh27) in [https://github.com/reactor/reactor-core/pull/2943](https://togithub.com/reactor/reactor-core/pull/2943)

##### 🐞 Bug fixes

-   Fix Mono.then not cancelling between Callable sources by [@&#8203;simonbasle](https://togithub.com/simonbasle) in [https://github.com/reactor/reactor-core/pull/2934](https://togithub.com/reactor/reactor-core/pull/2934)

##### 📖 Documentation, Tests and Build

-   Update Gradle to v7.4 in [https://github.com/reactor/reactor-core/pull/2922](https://togithub.com/reactor/reactor-core/pull/2922)
-   \[doc] Correct flux subscribe example in faq by [@&#8203;liukun2634](https://togithub.com/liukun2634) in [https://github.com/reactor/reactor-core/pull/2924](https://togithub.com/reactor/reactor-core/pull/2924)
-   Show how-to-fix hints in CI when preliminary steps fail (check of license headers, api compatibility) by [@&#8203;simonbasle](https://togithub.com/simonbasle) in [https://github.com/reactor/reactor-core/pull/2932](https://togithub.com/reactor/reactor-core/pull/2932)
-   \[guide] Remove ref to Swing/SwtScheduler in addons appendix by [@&#8203;simonbasle](https://togithub.com/simonbasle) in [https://github.com/reactor/reactor-core/pull/2959](https://togithub.com/reactor/reactor-core/pull/2959)
-   \[build] Have jcstress part of slowerChecks by [@&#8203;simonbasle](https://togithub.com/simonbasle) in [https://github.com/reactor/reactor-core/pull/2958](https://togithub.com/reactor/reactor-core/pull/2958)

##### 🆙 Dependency Upgrades

-   Update plugin spotless to v6.3.0 in [https://github.com/reactor/reactor-core/pull/2925](https://togithub.com/reactor/reactor-core/pull/2925)
-   Update plugin bnd to v6.2.0 in [https://github.com/reactor/reactor-core/pull/2941](https://togithub.com/reactor/reactor-core/pull/2941)
-   Update dependency org.awaitility:awaitility to v4.2.0 in [https://github.com/reactor/reactor-core/pull/2945](https://togithub.com/reactor/reactor-core/pull/2945)
-   Update dependency ch.qos.logback:logback-classic to v1.2.11 in [https://github.com/reactor/reactor-core/pull/2946](https://togithub.com/reactor/reactor-core/pull/2946)
-   Update dependency com.tngtech.archunit:archunit to v0.23.1 in [https://github.com/reactor/reactor-core/pull/2940](https://togithub.com/reactor/reactor-core/pull/2940)
-   Update plugin japicmp to v0.4.0 in [https://github.com/reactor/reactor-core/pull/2948](https://togithub.com/reactor/reactor-core/pull/2948)
-   Update dependency org.mockito:mockito-core to v4.4.0 in [https://github.com/reactor/reactor-core/pull/2951](https://togithub.com/reactor/reactor-core/pull/2951)
-   Update plugin download to v5.0.2 in [https://github.com/reactor/reactor-core/pull/2950](https://togithub.com/reactor/reactor-core/pull/2950)

#### New Contributors

-   [@&#8203;Animesh27](https://togithub.com/Animesh27) made their first contribution in [https://github.com/reactor/reactor-core/pull/2943](https://togithub.com/reactor/reactor-core/pull/2943) 👍

**Full Changelog**: reactor/reactor-core@v3.4.15...v3.4.16

</details>

---

### Configuration

📅 **Schedule**: "after 8am on Friday,before 12pm on Friday" (UTC).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

♻ **Rebasing**: Whenever PR is behind base branch, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update again.

---

 - [ ] <!-- rebase-check -->If you want to rebase/retry this PR, click this checkbox.

---

This PR has been generated by [WhiteSource Renovate](https://renovate.whitesourcesoftware.com). View repository job log [here](https://app.renovatebot.com/dashboard#github/GoogleCloudPlatform/cloud-sql-jdbc-socket-factory).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement A general enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

BoundedElasticScheduler should not block on submit/dispose
3 participants