
Make TokenBucketGroup a trait to allow easier customization #7

Closed

jxtps opened this issue Jul 31, 2018 · 2 comments
jxtps commented Jul 31, 2018

If you create a trait:

trait TokenBucketGroup {
  def consume(key: Any, required: Int): Long
}

and rename the current class to DefaultTokenBucketGroup, then it would be easier to customize the TokenBucketGroup implementation. There are tradeoffs between exactness and computational complexity: the current implementation is exact, but at the cost of up to `rate` bucket rebuilds per second, which is totally fine at 10/s, but less so at 10k/s.

I would also consider changing the return type from Long to Boolean, since it's only (intended to be?) used in a boolean way; that would also hide more of the TokenBucketGroup internals.

(Yes, it's of course possible to just subclass and ignore the private members of the current TokenBucketGroup)

(Happy to provide a pull request, wanted to check for interest first)
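The proposed shape might look like the sketch below. The trait signature is the one suggested above (with the Boolean return); the DefaultTokenBucketGroup body here is a hypothetical, simplified illustration, not the library's actual refill logic:

```scala
trait TokenBucketGroup {
  /** Tries to take `required` tokens for `key`; true if granted. */
  def consume(key: Any, required: Int): Boolean
}

// Toy synchronized implementation, for illustration only; the real
// DefaultTokenBucketGroup would keep the library's existing token logic.
class DefaultTokenBucketGroup(size: Int) extends TokenBucketGroup {
  private val buckets = scala.collection.mutable.Map.empty[Any, Int]

  def consume(key: Any, required: Int): Boolean = synchronized {
    val remaining = buckets.getOrElse(key, size)
    if (remaining >= required) {
      buckets(key) = remaining - required
      true
    } else false
  }
}
```

Callers would then depend only on the trait, so swapping in an approximate but cheaper implementation becomes a one-line change.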


jxtps commented Aug 1, 2018

Ended up rolling a fully custom solution. The TokenBucketGroup-equivalent part:

  import java.util.concurrent.ConcurrentHashMap
  import java.util.concurrent.atomic.AtomicInteger

  import akka.actor.ActorSystem

  object TokenBuckets {
    val NanosPerSecond: Long = 1000L * 1000 * 1000
  }

  class TokenBuckets(size: Int, rate: Double, minNanosPerUpdate: Long = TokenBuckets.NanosPerSecond)
                    (implicit actorSystem: ActorSystem) {

    require(size > 0)
    require(rate >= 0.0001)
    require(rate < 1e6)

    def now(): Long = System.nanoTime()

    //private val floor = 0
    private val floor = -size // Penalize abusers a little
    private val buckets = new ConcurrentHashMap[String, AtomicInteger]()
    private var lastUpdateAt: Long = now()
    private val tokensPerNano: Double = rate / TokenBuckets.NanosPerSecond
    private val nanosPerToken: Double = 1 / tokensPerNano
    private val nanosPerUpdate: Long = Math.max(nanosPerToken.ceil.toLong, minNanosPerUpdate)

    def consume(ip: String): Int = {
      var int = buckets.get(ip)
      if (int == null) {
        int = new AtomicInteger(size)
        buckets.put(ip, int) // May drop a token if there's a concurrent request, that's ok
      }
      if (int.get() > floor) int.decrementAndGet() else floor
    }

    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    actorSystem.scheduler.schedule(nanosPerUpdate.nanos, nanosPerUpdate.nanos) {
      val deltaNanos: Long = now() - lastUpdateAt
      val tokensToAdd = (deltaNanos * tokensPerNano).round.toInt
      if (tokensToAdd > 0) { // Should always be true
        buckets.forEach((ip: String, int: AtomicInteger) => {
          val tokensInBucket = int.addAndGet(tokensToAdd)
          if (tokensInBucket >= size) buckets.remove(ip) // May drop a token if there's a concurrent request, that's ok
        })
        lastUpdateAt += (tokensToAdd * nanosPerToken).ceil.toLong
      }
    }
  }

This way there is no synchronization and only constant-time operations on the critical-path consume call, while cleanup and the adding of new tokens happen at a fixed rate in the background.

The tradeoff is that it's no longer 100% exact, so an extra connection will occasionally be allowed through, and if the background job stalls for whatever reason, no new tokens are granted and requests stop entirely.
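The lock-free consume path can be exercised in isolation. The sketch below (hypothetical SimpleBuckets name) mirrors the map-lookup-plus-atomic-decrement pattern from the class above, with the refill step invoked manually instead of via the Akka scheduler:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Stripped-down illustration of the lock-free critical path: one map
// lookup plus one atomic decrement per consume call, no synchronization.
class SimpleBuckets(size: Int, floor: Int) {
  private val buckets = new ConcurrentHashMap[String, AtomicInteger]()

  def consume(ip: String): Int = {
    var counter = buckets.get(ip)
    if (counter == null) {
      counter = new AtomicInteger(size)
      buckets.put(ip, counter) // May drop a token under a concurrent request; that's ok
    }
    if (counter.get() > floor) counter.decrementAndGet() else floor
  }

  // Background refill step: add tokens and evict buckets that are full again,
  // so idle keys don't accumulate in the map.
  def refill(tokensToAdd: Int): Unit =
    buckets.forEach((ip: String, counter: AtomicInteger) => {
      if (counter.addAndGet(tokensToAdd) >= size) buckets.remove(ip)
    })
}
```

A consume call that returns a value above the floor means the request is allowed; evicted keys are lazily recreated at full size on their next request.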

Anyway, feel free to use or ignore, I'll close this issue in the meantime.

jxtps closed this as completed Aug 1, 2018
sief (repository owner) commented Jan 28, 2019

Sorry for the late response, I somehow didn't notice the issue. I've also been thinking about a more relaxed approach in favour of performance, but when I did some throughput measurements I was surprised that the bucket rebuild cost seems to be negligible. Did you notice any performance issues?
