Skip to content

Commit

Permalink
avoid indexing errors
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegYch committed Nov 9, 2018
1 parent e4b8662 commit 7e07dfe
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 44 deletions.
Expand Up @@ -2,11 +2,11 @@ package com.olegych.scastie.balancer

import com.olegych.scastie.api._
import com.olegych.scastie.balancer.utils.Histogram
import org.slf4j.LoggerFactory

import scala.util.Random
import scala.collection.immutable.Queue
import scala.concurrent.duration.{DurationLong, FiniteDuration}
import org.slf4j.LoggerFactory
import scala.util.Random

case class Ip(v: String)
case class Record(config: Inputs, ip: Ip)
Expand All @@ -17,11 +17,8 @@ case class Task(config: Inputs, ip: Ip, taskId: TaskId) {

case class History(data: Queue[Record], size: Int) {
def add(record: Record): History = {
// the user has changed configuration, we assume he will not go back to the
// previous configuration

// the user has changed configuration, we assume he will not go back to the previous configuration
val data0 = data.filterNot(_.ip == record.ip).enqueue(record)

val data1 =
if (data0.size > size) {
val (_, q) = data0.dequeue
Expand All @@ -39,8 +36,6 @@ case class LoadBalancer[R, S <: ServerState](
) {
private val log = LoggerFactory.getLogger(getClass)

private lazy val configs = servers.map(_.currentConfig)

def done(taskId: TaskId): Option[LoadBalancer[R, S]] = {
Some(copy(servers = servers.map(_.done(taskId))))
}
Expand All @@ -63,40 +58,28 @@ case class LoadBalancer[R, S <: ServerState](

if (availableServers.nonEmpty) {
val updatedHistory = history.add(task.toRecord)
lazy val historyHistogram = updatedHistory.data.map(_.config).to[Histogram]

val hits = availableServers.indices
.to[Vector]
.filterNot(i => availableServers(i).currentConfig.needsReload(task.config))

val overBooked = hits.forall { i =>
availableServers(i).cost(taskCost, reloadCost) > reloadCost
val hits = availableServers.filterNot(s => s.currentConfig.needsReload(task.config))
val overBooked = hits.forall { s =>
s.cost(taskCost, reloadCost) > reloadCost
}

val cacheMiss = hits.isEmpty

val selectedServerIndice =
if (cacheMiss || overBooked) {
// we try to find a new configuration to minimize the distance with
// the historical data
randomMin(configs.indices) { i =>
val config = task.config
val distance = distanceFromHistory(i, config, historyHistogram)
val load = availableServers(i).cost(taskCost, reloadCost)
(distance, load)
}
} else {
random(hits)
val selectedServer = if (cacheMiss || overBooked) {
// we try to find a new configuration to minimize the distance with the historical data
val historyHistogram = updatedHistory.data.map(_.config).to[Histogram]
randomMin(availableServers) { s =>
val config = task.config
val newConfigsHistogram = availableServers.map(olds => if (olds.id == s.id) config else olds.currentConfig).to[Histogram]
val distance = historyHistogram.distance(newConfigsHistogram)
val load = s.cost(taskCost, reloadCost)
(distance, load)
}

val updatedServers = {
val i = selectedServerIndice
availableServers.updated(i, availableServers(i).add(task))
} else {
random(hits)
}

val updatedServers = availableServers.map(olds => if (olds.id == selectedServer.id) olds.add(task) else olds)
Some(
(
availableServers(selectedServerIndice),
selectedServer,
copy(
servers = updatedServers ++ unavailableServers,
history = updatedHistory
Expand All @@ -113,11 +96,6 @@ case class LoadBalancer[R, S <: ServerState](
}
}

private def distanceFromHistory(targetServerIndex: Int, config: Inputs, historyHistogram: Histogram[Inputs]): Double = {
val newConfigsHistogram = configs.updated(targetServerIndex, config).to[Histogram]
historyHistogram.distance(newConfigsHistogram)
}

// find min by f, select one min at random
private def randomMin[A, B: Ordering](xs: Seq[A])(f: A => B): A = {
val evals = xs.map(x => (x, f(x)))
Expand Down
Expand Up @@ -2,10 +2,11 @@ package com.olegych.scastie.balancer

import com.olegych.scastie.api._

import scala.concurrent.duration.{FiniteDuration, DurationInt}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.collection.immutable.Queue
import scala.util.Random

case class Server[R, S](ref: R, lastConfig: Inputs, mailbox: Queue[Task], state: S) {
case class Server[R, S](ref: R, lastConfig: Inputs, mailbox: Queue[Task], state: S, id: Int = Random.nextInt()) {

def currentTaskId: Option[TaskId] = mailbox.headOption.map(_.taskId)
def currentConfig: Inputs = mailbox.headOption.map(_.config).getOrElse(lastConfig)
Expand Down
Expand Up @@ -97,7 +97,7 @@ class LoadBalancerTest extends LoadBalancerTestUtils {
assertConfigs(add(balancer, sbtConfig("c6")))(
1 * "c1",
1 * "c2",
1 * "c3",
1 * "c4",
1 * "c6",
1 * "c5"
)
Expand Down

0 comments on commit 7e07dfe

Please sign in to comment.