
Topic suffers under heavy load, Set comparison CPU intensive #1406

Open · SystemFw opened this issue Jan 29, 2019 · 30 comments

@SystemFw (Collaborator) commented Jan 29, 2019

There is a problem with Topic (and I suspect with Signal as well), probably due to how CPU intensive Set comparison is. One of my teams had problems where reasonably heavy traffic was causing thread starvation due to the CPU work needed for that comparison. I was able to fix it by porting in the old Topic (without PubSub), but we need to address this, because it's a pretty serious regression right now. I'm trying to come up with a reproducible example, which isn't super easy.
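As a rough illustration of the cost being described here (this snippet is ours, not fs2 code; the sizes are arbitrary): equality between two immutable Sets hashes and looks up every element, so an Eq instance built on top of it does O(n) work on every single state comparison.

// Illustration only: Set equality is linear in the number of elements, so a
// state comparison that holds one entry per subscriber re-hashes the whole
// subscriber set every time (compare the GenSetLike.equals / subsetOf frames
// in the stack trace in the next comment).
val a: Set[(Long, Int)] = (1L to 100000L).map(i => (i, 0)).toSet
val b: Set[(Long, Int)] = (1L to 100000L).map(i => (i, 0)).toSet
val same = a == b // true, but costs ~100k hashCode calls and lookups each time it is evaluated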

@SystemFw (Collaborator, Author) commented Jan 29, 2019

In the meantime, I can share the stack trace commonly seen in the problematic scenario:

"ForkJoinPool-2-worker-6" #83 daemon prio=5 os_prio=0 tid=0x00007f56fb6d3000 nid=0x13bc runnable [0x00007f56db7fd000]
   java.lang.Thread.State: RUNNABLE
        at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:149)
        at scala.Tuple2.hashCode(Tuple2.scala:24)
        at scala.runtime.Statics.anyHash(Statics.java:122)
        at scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87)
        at scala.collection.immutable.HashMap.computeHash(HashMap.scala:96)
        at scala.collection.immutable.HashMap.contains(HashMap.scala:59)
        at scala.collection.MapLike$DefaultKeySet.contains(MapLike.scala:176)
        at scala.collection.GenSetLike.apply(GenSetLike.scala:48)
        at scala.collection.GenSetLike.apply$(GenSetLike.scala:48)
        at scala.collection.AbstractSet.apply(Set.scala:51)
        at scala.collection.immutable.MapLike$ImmutableDefaultKeySet.apply(MapLike.scala:112)
        at scala.collection.Iterator.forall(Iterator.scala:953)
        at scala.collection.Iterator.forall$(Iterator.scala:951)
        at scala.collection.AbstractIterator.forall(Iterator.scala:1429)
        at scala.collection.IterableLike.forall(IterableLike.scala:77)
        at scala.collection.IterableLike.forall$(IterableLike.scala:76)
        at scala.collection.AbstractIterable.forall(Iterable.scala:56)
        at scala.collection.GenSetLike.subsetOf(GenSetLike.scala:107)
        at scala.collection.GenSetLike.subsetOf$(GenSetLike.scala:107)
        at scala.collection.AbstractSet.subsetOf(Set.scala:51)
        at scala.collection.GenSetLike.liftedTree1$1(GenSetLike.scala:124)
        at scala.collection.GenSetLike.equals(GenSetLike.scala:124)
        at scala.collection.GenSetLike.equals$(GenSetLike.scala:119)
        at scala.collection.AbstractSet.equals(Set.scala:51)
        at fs2.concurrent.Topic$.$anonfun$apply$1(Topic.scala:89)
        at fs2.concurrent.Topic$.$anonfun$apply$1$adapted(Topic.scala:89)
        at fs2.concurrent.Topic$$$Lambda$4862/705260399.apply(Unknown Source)
        at cats.kernel.Eq$$anon$106.eqv(Eq.scala:85)
        at fs2.concurrent.PubSub$Strategy$Inspectable$$anon$7.get(PubSub.scala:662)
        at fs2.concurrent.PubSub$Strategy$Inspectable$$anon$7.get(PubSub.scala:637)
        at fs2.concurrent.PubSub$.go$1(PubSub.scala:123)
        at fs2.concurrent.PubSub$.consumeSubscribers$1(PubSub.scala:132)
        at fs2.concurrent.PubSub$.loop$1(PubSub.scala:171)
        at fs2.concurrent.PubSub$.$anonfun$apply$13(PubSub.scala:196)
        at fs2.concurrent.PubSub$$$Lambda$5107/462313517.apply(Unknown Source)
        at cats.effect.concurrent.Ref$SyncRef.spin$1(Ref.scala:249)
        at cats.effect.concurrent.Ref$SyncRef.$anonfun$modify$1(Ref.scala:253)

As the number of subscribers grows, more and more threads end up spending most of their time doing this, until complete starvation (in our case, Kubernetes kept restarting the service because the healthcheck endpoint became unresponsive).

@vladimir-popov commented Jan 29, 2019

We have the same issue in our team. In our case we have an incoming stream with ~1k elements per second and ~100 subscribers which just put every element into a cache. A simple example can look like this:

import scala.concurrent.duration._
import scala.util.Random

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream

object Example extends IOApp {
  def doSomething(x: Any) = IO(println(x))
  def arbitraryData = IO(Random.nextInt(100))

  override def run(args: List[String]): IO[ExitCode] =
    for {
      // topic <- CustomTopic[IO, Int] // swap in to compare with the default implementation
      topic <- fs2.concurrent.Topic[IO, Int](Random.nextInt(100))
      subscriptions = (1 to 100).map(
        i => topic.subscribe(1).collect { case d if d == i => d }.evalMap(doSomething)
      )
      _ <- (Stream.fixedDelay[IO](10.millis).evalMap(_ => arbitraryData).to(topic.publish) merge Stream
        .emits(subscriptions)
        .parJoinUnbounded).compile.drain
    } yield ExitCode.Success
}

We tried to solve it with an implementation of Topic based on fs2.concurrent.Queue (not a full example):

class CustomTopic[F[_], A](cache: Ref[F, List[Queue[F, A]]])(implicit C: Concurrent[F])
    extends Topic[F, A] {
  override def publish: Sink[F, A] =
    _.evalMap(publish1)

  // Publishing enqueues the element onto every registered subscriber queue.
  override def publish1(a: A): F[Unit] =
    cache.get.flatMap { subscribers =>
      subscribers.traverse(_.enqueue1(a)).void
    }

  override def subscribe(maxQueued: Int): Stream[F, A] =
    emptyQueue(maxQueued).evalTap(q => cache.update(_ :+ q)).flatMap(_.dequeue)

  // Acquire a bounded queue per subscriber; on release, drop it from the cache.
  private def emptyQueue(maxQueued: Int): Stream[F, fs2.concurrent.Queue[F, A]] =
    Stream.bracket(fs2.concurrent.Queue.bounded[F, A](maxQueued))(
      queue => cache.update(_.filter(_ ne queue))
    )
}

object CustomTopic {
  def apply[F[_], A](implicit C: Concurrent[F]): F[CustomTopic[F, A]] =
    Ref.of[F, List[fs2.concurrent.Queue[F, A]]](List.empty).map(new CustomTopic(_))
}
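For context, here is a minimal usage sketch of the CustomTopic above (the producer/consumer values and the take(5) cut-off are ours, purely illustrative), assuming it runs inside the same IOApp context as the Example so that Concurrent[IO] and Timer[IO] are available:

// Hypothetical usage of CustomTopic, relying on the imports of the Example above.
val useCustomTopic: IO[Unit] =
  CustomTopic[IO, Int].flatMap { topic =>
    val producer =
      Stream.fixedDelay[IO](10.millis).evalMap(_ => IO(Random.nextInt(100))).to(topic.publish)
    val consumer =
      topic.subscribe(maxQueued = 10).take(5).evalMap(i => IO(println(i)))
    // Run the consumer while the producer publishes in the background.
    consumer.concurrently(producer).compile.drain
  }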

When we use our custom topic in the example above, we see a very different picture in the profiler:
Results with the default Topic:
[profiler screenshot: default Topic]
Results with our implementation:
[profiler screenshot: custom Topic]

Update: I created a PR to illustrate our full solution: #1407

@vladimir-popov commented:

Besides that, we noticed that the current implementation of Topic in our case generates a lot of live instances of immutable.Queue and PubSub.Strategy.Inspectable.State:
[heap profiler screenshot, 2019-01-29 at 20:05:28]

@pchlupacek (Contributor) commented:

@SystemFw the current implementation of Topic won't perform super well under heavy contention between a busy publisher and a lot of subscribers. The older implementation was slightly better, specifically because each subscriber had its own queue that elements were fed into; that queue was registered with the state and written to during publish.

The observation about the large number of queues and states is correct: what you see is a lot of spins and attempts to fight over setting that single ref, which holds its own single state. Simply too many subscribers :-)

I think there is a workaround: reimplement Topic with a slightly modified State and PubSub, so that the subscriber's queue again becomes part of the registration.

Another alternative with the current implementation:

Change the topic to a deeper hierarchy, i.e. one topic handling 16 subscribers, each of which is itself a topic handling, let's say, 16 subscribers; two such levels give you 16 × 16 = 256 subscribers with really low contention (see the sketch below).

I wouldn't suggest giving up and going back to the old implementation yet, as that had different problems, e.g. flow control.
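For concreteness, a rough sketch of that hierarchy idea (StripedTopic is a made-up name, not an existing fs2 API; this assumes the fs2 1.0 Topic and cats-effect Concurrent): a root topic fans out to a fixed number of intermediate topics, and subscribers are spread over the intermediates so that no single Ref is contended by all of them.

import cats.effect.Concurrent
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Topic

// Hypothetical two-level fan-out: the root Ref only ever sees `stripes`
// subscribers, and each leaf Ref only sees its own share of the real ones.
object StripedTopic {
  def apply[F[_]: Concurrent, A](
      initial: A,
      stripes: Int
  ): F[(Topic[F, A], Int => Topic[F, A], Stream[F, Unit])] =
    for {
      root   <- Topic[F, A](initial)
      leaves <- List.fill(stripes)(Topic[F, A](initial)).sequence
    } yield {
      // Each leaf republishes whatever it receives from the root; running this
      // stream in the background wires the two levels together.
      val pump: Stream[F, Unit] =
        Stream
          .emits(leaves)
          .map(leaf => root.subscribe(maxQueued = 16).to(leaf.publish))
          .parJoinUnbounded
      (root, i => leaves(i % stripes), pump)
    }
}

The publisher keeps using root.publish, subscriber k consumes from the leaf returned by the Int => Topic[F, A] function, and the pump stream is run concurrently with the rest of the program; whether the extra hop pays off depends on how bad the single-Ref contention actually is.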

@pchlupacek (Contributor) commented:

@vladimir-popov I am not surprised; the current implementation of Topic will perform poorly under your load. This is pretty much the worst case for the current implementation. However, I am quite confident Topic can be improved on top of PubSub to perform similarly to your solution.

@pchlupacek (Contributor) commented:

Also @SystemFw, @vladimir-popov: perhaps if we just change the internals of how PubSub works, we get better performance without changing anything in the state logic.
Essentially, instead of using a Ref to store the state, we could try a lock and a guarded var for the state. That would waste no state calculations and could perhaps be almost on par with the good paths of PubSub.

@SystemFw (Collaborator, Author) commented:

Actually, I'm not sure the problem is contention on the Ref, and I don't even know whether it's necessarily a problem with PubSub in general.
I can't be sure because I don't have an exact reproduction yet, but the initial hypothesis is simply that relying on Set comparison, the way the Strategy for Topic does, wastes a lot of CPU cycles.

The older implementation was slightly better,

Well, the thing is that in my scenario the old implementation just works with no issues, whereas the new one chokes and causes the service to restart continuously, so that's more than slightly better :P

I wouldn't suggest giving up and going back to the old implementation yet, as that had different problems, e.g. flow control.

Well, that's a hot fix that I need to do right now, otherwise everything just chokes; but ideally I'd want something based on PubSub that is more scalable, so I agree with you on that.
Whether that is striping with 16 subs or something else, I guess we'll have to see, but for now I just wanted to flag this, given that it's effectively a regression compared to the old implementation.

@pchlupacek (Contributor) commented Jan 30, 2019

@SystemFw I think what this demonstrates is that there are so many concurrent attempts to CAS the State that the overhead of it is killing the whole thing.

I hope to squeeze in a look at this over the weekend, but I feel there are two approaches that may fix it:

  • instead of using the Ref to hold the state, we may just use it to sort of lock the subscribers, so that the computation that modifies the state is always performed only once, and only by the winner (see the sketch below);
  • or we could sort of revert back to a queue per subscriber, but still use PubSub. That will be a little more voodoo in the Topic implementation, but I am quite sure performance would be almost identical to the old Topic.
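A minimal sketch of the first bullet (LockedState is our name, not fs2 code or the actual proposal), assuming cats-effect's Semaphore: only the fiber holding the permit computes the state transition, so nothing is computed and then thrown away as happens with a CAS spin loop on a contended Ref.

import cats.effect.{Concurrent, Sync}
import cats.effect.concurrent.Semaphore
import cats.implicits._

// Hypothetical "lock + guarded var" state cell: the permit holder is the only
// fiber that runs `f`, so each state transition is computed exactly once
// (unlike Ref#modify, where losers of the CAS race recompute and retry).
final class LockedState[F[_]: Sync, S] private (lock: Semaphore[F], private var state: S) {
  def modify[A](f: S => (S, A)): F[A] =
    lock.withPermit(Sync[F].delay {
      val (next, out) = f(state)
      state = next
      out
    })
}

object LockedState {
  def apply[F[_]: Concurrent, S](initial: S): F[LockedState[F, S]] =
    Semaphore[F](1).map(new LockedState(_, initial))
}

As discussed further down in this thread, the trade-off is that this is only semantically blocking rather than lock-free, and cancellation around the permit needs care.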

@pchlupacek (Contributor) commented:

@SystemFw, @vladimir-popov, I have dug into it a bit and wrote a simple benchmark, which was run with 8 threads, 500 messages in the topic, and 1 to 128 subscribers.

The code is here (https://gist.github.com/pchlupacek/f033993302ee2a741a6473286306c9b3), and the results are below:

[info] Benchmark                          Mode  Cnt      Score       Error  Units
[info] TopicBenchmark.topicBroadcast_1    avgt    3      1.124 ±     0.059  ms/op
[info] TopicBenchmark.topicBroadcast_4    avgt    3      5.128 ±     0.905  ms/op
[info] TopicBenchmark.topicBroadcast_8    avgt    3     20.289 ±     2.331  ms/op
[info] TopicBenchmark.topicBroadcast_16   avgt    3     27.693 ±     7.103  ms/op
[info] TopicBenchmark.topicBroadcast_32   avgt    3    200.506 ±   106.002  ms/op
[info] TopicBenchmark.topicBroadcast_64   avgt    3   1965.638 ±   316.895  ms/op
[info] TopicBenchmark.topicBroadcast_128  avgt    3  12560.988 ± 17843.409  ms/op

From the code you can see that there are no operations on the messages, so this should measure overhead only.

Since this is run on a 6-core / 12-hyperthread machine but with 8 JVM threads, I would expect the sweet spot to be somewhere around 8-16 subscribers. For a larger number of subscribers, in the ideal case the execution time should be roughly (N/8) * T8, where N is the number of subscribers and T8 is the time for 8 subscribers.
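As a worked example of that formula (our arithmetic, using the table above): for 128 subscribers the ideal would be roughly (128 / 8) * 20.3 ms ≈ 325 ms per op, whereas the measured value is about 12,561 ms, i.e. close to 40x worse than ideal scaling.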

I don't quite understand the error ranges; they seem a bit off to me. When I compared the individual results they were usually within a +/- 5% range, not more than 100% as the 128-subscriber row shows. So I suspect either something is wrong with JMH, or I'm misreading it.

As you can see, the results are roughly as expected for 1-16 subscribers, where the differences come mostly from setting up the program and the subscribers. Interesting is the difference between 8 and 16 subscribers, which gets close to the ideal.

Anything after that is a complete disaster and clearly demonstrates the concurrency issue. I think it confirms my original suspicion of a single point of contention.

I will follow up with more experiments to see how we can avoid the problem.

@SystemFw (Collaborator, Author) commented Feb 3, 2019

Thanks for looking into this @pchlupacek :)

FYI, the old implementation in the service I mentioned above is handling 1400 subs at peak load without breaking a sweat, on a 2 CPU machine which also has some blocking code on it

@pchlupacek (Contributor) commented:

@SystemFw I am not surprised that the old implementation performs much better. I would say the only limit is RAM, and obviously more subscribers -> more time to publish one element. The problem with that implementation is that it is highly specialised, and your only control over OOM in case of a slow subscriber is to use bounded queues.

@pchlupacek (Contributor) commented:

OK, so I have changed PubSub to use Lock + Var and this is the result:

[info] Benchmark                          Mode  Cnt     Score    Error  Units
[info] TopicBenchmark.topicBroadcast_1    avgt    3     7.103 ±  0.588  ms/op
[info] TopicBenchmark.topicBroadcast_4    avgt    3    22.579 ±  2.968  ms/op
[info] TopicBenchmark.topicBroadcast_8    avgt    3    42.171 ±  1.685  ms/op
[info] TopicBenchmark.topicBroadcast_16   avgt    3    83.709 ± 13.964  ms/op
[info] TopicBenchmark.topicBroadcast_32   avgt    3   187.778 ± 11.095  ms/op
[info] TopicBenchmark.topicBroadcast_64   avgt    3   438.190 ± 36.365  ms/op
[info] TopicBenchmark.topicBroadcast_128  avgt    3  1192.197 ± 94.964  ms/op

Now this is much better; however, you still see that we are limited by the single point of contention, which is why the time grows almost linearly instead of benefiting from parallelism. You will also see that this uses only about 1.5 cores, clearly showing that the bottleneck is the single ref. However, now there are no wasted CPU cycles, resulting in much better overall performance.

@SystemFw (Collaborator, Author) commented Feb 3, 2019

The non-locking implementation has the advantage of being... well, lock-free, so I'd prefer it if possible.
Are there any other strategies to improve this case, for example striping the topic as you suggested above to reduce contention?

I really like the abstraction given by PubSub; however, the old Topic was both lock-free and had lower contention, so I feel that anything we do should be proven to be better than the old implementation.

@pchlupacek (Contributor) commented:

@SystemFw this is still lock-free. It uses a semaphore, so it is semantically locking.

I am not sure that the old Topic had lower contention per se. It just distributed tasks a bit better, so in situations where you have one publisher and many subscribers it performed better.

@SystemFw (Collaborator, Author) commented Feb 3, 2019

If it uses a semaphore it's not blocking, but it's not lock-free either: if the thread holding the lock cannot proceed, you get stuck all the same.
More seriously, in the current cancelation model, writing safe code with a semaphore is hard or even impossible in some cases (that is a problem per se, but for another day).

I am not sure that the old Topic had lower contention per se. It just distributed tasks a bit better, so in situations where you have one publisher and many subscribers it performed better.

You're probably right: it still has a single Ref, but it spends less time on it because you then move on to the queues. The thing is that one publisher and many subscribers is basically the perfect Topic use case, and we're talking an order of magnitude better, so that's great (and after all, you wrote both :P )

@pchlupacek (Contributor) commented:

@SystemFw yes, and I think there is a solution for this, even with the Ref implementation. I'll update you. Essentially we use PubSub to implement Topic the old way :-)

@SystemFw (Collaborator, Author) commented Feb 3, 2019

Essentially we use PubSub to implement Topic the old way

that would be ideal :)

@pchlupacek (Contributor) commented:

@SystemFw I think the current PubSub signatures don't allow for more concurrent subscriber behaviour. I have some ideas on how to improve it.

I think the improved implementation with locking significantly reduces contention. However, as you have pointed out, publishing one element through pub/sub still takes about 2*N+1 exclusive accesses on a single contended ref, whereas the former implementation had only one exclusive access plus N exclusive accesses distributed over the individual subscribers, resulting in much better contention control.
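To put concrete numbers on that (our arithmetic): with N = 128 subscribers, 2*N+1 means about 257 exclusive accesses to the same contended ref per published element, whereas the old design needed 1 access to the shared ref plus 128 accesses spread across 128 independent per-subscriber queues.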

I need some time to think about how to improve PubSub so that it has the same performance characteristics without losing the features it has today.

@pchlupacek (Contributor) commented Feb 10, 2019

@SystemFw I went ahead a bit and created a benchmark of the queue-based topic implementation. I took the liberty of using the code submitted by @vladimir-popov as a reference point.
I have also added 1k and 2k subscriber benchmarks.

These are the results:

[info] Benchmark                           Mode  Cnt     Score      Error  Units
[info] TopicBenchmark.topicBroadcast_1     avgt    3     5.468 ±    1.231  ms/op
[info] TopicBenchmark.topicBroadcast_4     avgt    3    13.200 ±    1.766  ms/op
[info] TopicBenchmark.topicBroadcast_8     avgt    3    23.985 ±    2.859  ms/op
[info] TopicBenchmark.topicBroadcast_16    avgt    3    41.069 ±    7.160  ms/op
[info] TopicBenchmark.topicBroadcast_32    avgt    3    78.257 ±   16.757  ms/op
[info] TopicBenchmark.topicBroadcast_64    avgt    3   137.253 ±   80.961  ms/op
[info] TopicBenchmark.topicBroadcast_128   avgt    3   276.561 ±   10.986  ms/op
[info] TopicBenchmark.topicBroadcast_512   avgt    3  1198.688 ±  450.535  ms/op
[info] TopicBenchmark.topicBroadcast_1024  avgt    3  2518.192 ± 1873.308  ms/op
[info] TopicBenchmark.topicBroadcast_2048  avgt    3  5719.329 ± 1568.653  ms/op

One interesting observation I made is that this implementation was not able to fully utilize the cores either. Also, from a performance standpoint, it is interesting that this is not that much better compared to the semantically locking solution. That means there is a good chance we can get on-par performance with PubSub.

@pchlupacek (Contributor) commented:

Update: somehow the current implementation, when the semantic lock is put in place, always uses a single thread. This should not be the case, as F.start is used on completion of a subscriber. I am investigating this further.

@pchlupacek (Contributor) commented:

Update:
These are the top contributors when running a lot of subscribers under pub/sub:

[profiler screenshot, 2019-02-10 at 09:32:30]

None of them is related to concurrency. My theory is that the cost of looking up the subscriber status in the map is extraordinarily high.

pchlupacek self-assigned this Feb 28, 2019
@pchlupacek (Contributor) commented:

Just a quick update on this one. I am still experimenting with ways to tackle it. It seems that I may need to touch the PubSub strategy signature a bit, but I'm not yet 100% sure how.

Not sure about the urgency of this one, but I am targeting it to be resolved by the next major release.

In the meantime there is a workaround with a manual implementation.

Let me know if that works for you.

@SystemFw (Collaborator, Author) commented:

It does work for me, but perhaps we should make it a bit easier for other people to grab the old implementation if they stumble upon this

@mpilquist (Member) commented:

@pchlupacek Do you think this is solvable in the next week or so? We'll have a minor API-breaking release in 1.1 where you could change the PubSub signature if needed.

@epifab commented Jan 28, 2021

It does work for me, but perhaps we should make it a bit easier for other people to grab the old implementation if they stumble upon this

Yes, this would be ideal. @SystemFw any pointers? I'd love to find a workaround while this gets fixed properly.

@SystemFw (Collaborator, Author) commented:

I think the latest commit with the old implementation is this one: https://github.com/typelevel/fs2/blob/4ddd75a2dc032b7604dc1205c86d7d6adc993859/core/shared/src/main/scala/fs2/concurrent/Topic.scala

It should be mostly source compatible. Are you hitting a problem because of Topic under load?

@epifab commented Jan 28, 2021

@SystemFw awesome, thanks a lot!
I wasn't 100% sure that the poor performance was indeed caused by Topic under heavy load, but I am now.
Blindly reverting to the old implementation made a huge impact; we're talking about 2x or 3x better...

@epifab commented Jan 28, 2021

Actually, the performance boost is even more impressive than that: I think I can handle approximately 5x more messages. What I haven't discovered yet, though, are the implications of this workaround.

@SystemFw (Collaborator, Author) commented Mar 2, 2021

FYI, in fs2 3.0 we have reinstated the implementation strategy of the old version of Topic.

@vasilmkd (Member) commented May 15, 2021

As far as I understand, this issue is now present only on series/2.5.x?
