Skip to content

Commit

Permalink
make sure all destroyHandler are handled asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephane Maldini committed Apr 25, 2019
1 parent 5000e53 commit f58a528
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 18 deletions.
12 changes: 4 additions & 8 deletions src/main/java/reactor/pool/AffinityPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ void doAcquire(Borrower<POOLABLE> borrower) {

//TODO test this scenario
if (poolConfig.evictionPredicate.test(element.poolable, element)) {
destroyPoolable(element).subscribe(); //this returns a permit
allocateOrPend(subPool, borrower);
destroyPoolable(element).subscribe(null, t -> allocateOrPend(subPool, borrower), () -> allocateOrPend(subPool, borrower)); //this returns a permit
}
else {
borrower.stopPendingCountdown();
Expand Down Expand Up @@ -275,7 +274,7 @@ public void dispose() {
toClose.clear();

while(!availableElements.isEmpty()) {
destroyPoolable(availableElements.poll()).block();
destroyPoolable(availableElements.poll()).subscribe();
}
}
}
Expand Down Expand Up @@ -522,7 +521,7 @@ public void onError(Throwable throwable) {
return;
}

pool.destroyPoolable(slot).subscribe(); //TODO manage further errors?
pool.destroyPoolable(slot).subscribe(null, t -> pool.bestEffortAllocateOrPend(), pool::bestEffortAllocateOrPend); //TODO manage further errors?
actual.onError(throwable);
}

Expand All @@ -539,10 +538,7 @@ public void onComplete() {
pool.recycle(slot);
}
else {
pool.destroyPoolable(slot).subscribe(); //TODO manage errors?

//simplified version of what we do in doAcquire, with the caveat that we don't try to create a SubPool
pool.bestEffortAllocateOrPend();
pool.destroyPoolable(slot).subscribe(null, t -> pool.bestEffortAllocateOrPend(), pool::bestEffortAllocateOrPend); //TODO manage errors?
}

actual.onComplete();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/reactor/pool/SimpleFifoPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void dispose() {
}

while (!elements.isEmpty()) {
destroyPoolable(elements.poll()).block();
destroyPoolable(elements.poll()).subscribe();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/reactor/pool/SimpleLifoPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void dispose() {
}

while (!elements.isEmpty()) {
destroyPoolable(elements.poll()).block();
destroyPoolable(elements.poll()).subscribe();
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/reactor/pool/SimplePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ final void maybeRecycleAndDrain(QueuePooledRef<POOLABLE> poolSlot) {
if (!poolConfig.evictionPredicate.test(poolSlot.poolable, poolSlot)) {
metricsRecorder.recordRecycled();
elements.offer(poolSlot);
drain();
}
else {
destroyPoolable(poolSlot).subscribe(); //TODO manage errors?
destroyPoolable(poolSlot).subscribe(null, e -> drain(), this::drain); //TODO manage errors?
}
drain();
}
else {
destroyPoolable(poolSlot).subscribe(); //TODO manage errors?
destroyPoolable(poolSlot).subscribe(null, e -> drain(), this::drain); //TODO manage errors?
}
}

Expand Down Expand Up @@ -196,7 +196,7 @@ else if (pendingCount > 0) {

//TODO test the idle eviction scenario
if (poolConfig.evictionPredicate.test(slot.poolable, slot)) {
destroyPoolable(slot).subscribe();
destroyPoolable(slot).subscribe(null, e -> drain(), this::drain);
continue;
}

Expand Down Expand Up @@ -328,8 +328,7 @@ public void onError(Throwable throwable) {
//TODO should we separate reset errors?
pool.metricsRecorder.recordResetLatency(pool.metricsRecorder.measureTime(start));

pool.destroyPoolable(slot).subscribe(); //TODO manage errors?
pool.drain();
pool.destroyPoolable(slot).subscribe(null, null, pool::drain); //TODO manage errors?

actual.onError(throwable);
}
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ void smokeTestAsyncLifo(Function<PoolBuilder<PoolableTest>, AbstractPool<Poolabl
pool.acquire().subscribe(acquired3::add);
pool.acquire().subscribe(acquired3::add);

if (!latch1.await(1, TimeUnit.SECONDS)) { //wait for creation of max elements
if (!latch1.await(15, TimeUnit.SECONDS)) { //wait for creation of max elements
fail("not enough elements created initially, missing " + latch1.getCount());
}
assertThat(acquired1).hasSize(3);
Expand All @@ -537,7 +537,7 @@ void smokeTestAsyncLifo(Function<PoolBuilder<PoolableTest>, AbstractPool<Poolabl
slot.release().block();
}

if (latch2.await(2, TimeUnit.SECONDS)) { //wait for the re-creation of max elements
if (latch2.await(15, TimeUnit.SECONDS)) { //wait for the re-creation of max elements

assertThat(acquired2).hasSize(3);

Expand Down

0 comments on commit f58a528

Please sign in to comment.