Skip to content

Commit

Permalink
ISPN-13229 LocalPublisherManagerImpl doesn't clean up changeListener …
Browse files Browse the repository at this point in the history
…for publish methods
  • Loading branch information
wburns authored and tristantarrant committed Jan 25, 2023
1 parent 8de5d30 commit 6afc609
Showing 1 changed file with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult;
import org.infinispan.reactive.publisher.impl.commands.reduction.SegmentPublisherResult;
import org.infinispan.stream.StreamMarshalling;
Expand Down Expand Up @@ -241,19 +240,18 @@ private SegmentAwarePublisherImpl(IntSet segments, CacheSet<I> set,

@Override
public void subscribe(Subscriber<? super R> s, IntConsumer completedSegmentConsumer, IntConsumer lostSegmentConsumer) {
Flowable<Publisher<R>> segmentPublishers;
Flowable<R> resultPublisher;
switch (deliveryGuarantee) {
case AT_MOST_ONCE:
segmentPublishers = Flowable.fromStream(segments.intStream().mapToObj(segment -> {
Publisher<I> publisher = set.localPublisher(segment);
if (predicate != null) {
publisher = Flowable.fromPublisher(publisher)
.filter(predicate);
}
resultPublisher = Flowable.fromIterable(segments).concatMap(segment -> {
Publisher<I> publisher = set.localPublisher(segment);
if (predicate != null) {
publisher = Flowable.fromPublisher(publisher)
.filter(predicate);
}
return Flowable.fromPublisher(transformer.apply(publisher))
.doOnComplete(() -> completedSegmentConsumer.accept(segment));
}));

});
break;
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
Expand All @@ -265,7 +263,7 @@ public void subscribe(Subscriber<? super R> s, IntConsumer completedSegmentConsu
// Check topology before submitting
listener.verifyTopology(distributionManager.getCacheTopology());

segmentPublishers = Flowable.fromStream(segments.intStream().mapToObj(segment -> {
resultPublisher = Flowable.fromIterable(segments).concatMap(segment -> {
if (!concurrentSet.contains(segment)) {
return Flowable.empty();
}
Expand All @@ -282,13 +280,13 @@ public void subscribe(Subscriber<? super R> s, IntConsumer completedSegmentConsu
lostSegmentConsumer.accept(segment);
}
});
}));
}).doFinally(() -> changeListener.remove(listener));
break;
default:
throw new UnsupportedOperationException("Unsupported delivery guarantee: " + deliveryGuarantee);
}

segmentPublishers.concatMap(RxJavaInterop.identityFunction()).subscribe(s);
resultPublisher.subscribe(s);
}
}

Expand Down

0 comments on commit 6afc609

Please sign in to comment.