Skip to content

Commit

Permalink
ISPN-13229 LocalPublisherManagerImpl doesn't clean up changeListener …
Browse files Browse the repository at this point in the history
…for publish methods
  • Loading branch information
Madhumitha-G-ML committed Dec 12, 2023
1 parent 49fe7f8 commit 7dbc6df
Showing 1 changed file with 11 additions and 13 deletions.
Expand Up @@ -50,7 +50,6 @@
import io.reactivex.Maybe;
import io.reactivex.Scheduler;
import io.reactivex.functions.Predicate;
import io.reactivex.internal.functions.Functions;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.UnicastProcessor;
Expand Down Expand Up @@ -242,19 +241,18 @@ private SegmentAwarePublisherImpl(IntSet segments, CacheSet<I> set,

@Override
public void subscribe(Subscriber<? super R> s, IntConsumer completedSegmentConsumer, IntConsumer lostSegmentConsumer) {
Flowable<Publisher<R>> segmentPublishers;
Flowable<R> resultPublisher;
switch (deliveryGuarantee) {
case AT_MOST_ONCE:
segmentPublishers = new FlowableFromIntSetFunction<>(segments, segment -> {
Publisher<I> publisher = set.localPublisher(segment);
if (predicate != null) {
publisher = Flowable.fromPublisher(publisher)
.filter(predicate);
}
resultPublisher = Flowable.fromIterable(segments).concatMap(segment -> {
Publisher<I> publisher = set.localPublisher(segment);
if (predicate != null) {
publisher = Flowable.fromPublisher(publisher)
.filter(predicate);
}
return Flowable.fromPublisher(transformer.apply(publisher))
.doOnComplete(() -> completedSegmentConsumer.accept(segment));
});

});
break;
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
Expand All @@ -266,7 +264,7 @@ public void subscribe(Subscriber<? super R> s, IntConsumer completedSegmentConsu
// Check topology before submitting
listener.verifyTopology(distributionManager.getCacheTopology());

segmentPublishers = new FlowableFromIntSetFunction<>(segments, segment -> {
resultPublisher = Flowable.fromIterable(segments).concatMap(segment -> {
if (!concurrentSet.contains(segment)) {
return Flowable.empty();
}
Expand All @@ -283,13 +281,13 @@ public void subscribe(Subscriber<? super R> s, IntConsumer completedSegmentConsu
lostSegmentConsumer.accept(segment);
}
});
});
}).doFinally(() -> changeListener.remove(listener));
break;
default:
throw new UnsupportedOperationException("Unsupported delivery guarantee: " + deliveryGuarantee);
}

segmentPublishers.concatMap(Functions.identity()).subscribe(s);
resultPublisher.subscribe(s);
}
}

Expand Down

0 comments on commit 7dbc6df

Please sign in to comment.