Skip to content

Commit

Permalink
enabled timeout when fetching read-side offset (#1278)
Browse files Browse the repository at this point in the history
* enabled timeout when fetching read-side offset

* fixing test compilation
  • Loading branch information
octonato authored and ignasi35 committed Mar 21, 2018
1 parent 4941c08 commit 48a8a3c
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 30 deletions.
3 changes: 3 additions & 0 deletions persistence-jdbc/core/src/main/resources/reference.conf
Expand Up @@ -38,6 +38,9 @@ db.default {

# 5 * number of threads
maxConnections = 100

# If true, an MBean for AsyncExecutor will be registered
registerMbeans = false
}

# Hikari is the default connection pool and it's fine-tuned to use the same
Expand Down
Expand Up @@ -59,7 +59,8 @@ private[lagom] object SlickDbProvider {
minThreads = asyncExecConfig.minConnections,
maxThreads = asyncExecConfig.numThreads,
queueSize = asyncExecConfig.queueSize,
maxConnections = asyncExecConfig.maxConnections
maxConnections = asyncExecConfig.maxConnections,
registerMbeans = asyncExecConfig.registerMbeans
)
)
}
Expand All @@ -83,6 +84,7 @@ private[lagom] trait AsyncExecutorConfig {
def minConnections: Int
def maxConnections: Int
def queueSize: Int
def registerMbeans: Boolean
}

private[lagom] object AsyncExecutorConfig {
Expand All @@ -95,7 +97,8 @@ private[lagom] object AsyncExecutorConfig {
val minConnections: Int = config.getInt("minConnections")
val maxConnections: Int = config.getInt("maxConnections")
val queueSize: Int = config.getInt("queueSize")
val registerMbeans: Boolean = config.getBoolean("registerMbeans")

override def toString: String = s"AsyncExecutorConfig($numThreads, $minConnections, $maxConnections, $queueSize)"
override def toString: String = s"AsyncExecutorConfig($numThreads, $minConnections, $maxConnections, $queueSize, $registerMbeans)"
}
}
Expand Up @@ -19,6 +19,7 @@ object SlickDbTestProvider {
override val minConnections: Int = 20
override val maxConnections: Int = 100
override val queueSize: Int = 100
override def registerMbeans: Boolean = false
}

/** Builds a Slick Database (with AsyncExecutor) and binds it as a JNDI resource for test purposes */
Expand Down
33 changes: 18 additions & 15 deletions persistence/core/src/main/resources/reference.conf
@@ -1,22 +1,22 @@
#//#persistence
lagom.persistence {

# As a rule of thumb, the number of shards should be a factor ten greater
# As a rule of thumb, the number of shards should be a factor ten greater
# than the planned maximum number of cluster nodes. Fewer shards than the
# number of nodes will result in some nodes not hosting any shards. Too many
# shards will result in less efficient management of the shards, e.g.
# rebalancing overhead, and increased latency because the coordinator is
# involved in the routing of the first message for each shard. The value
# must be the same on all nodes in a running cluster. It can be changed
# of nodes will result in some nodes not hosting any shards. Too many
# shards will result in less efficient management of the shards, e.g.
# rebalancing overhead, and increased latency because the coordinator is
# involved in the routing of the first message for each shard. The value
# must be the same on all nodes in a running cluster. It can be changed
# after stopping all nodes in the cluster.
max-number-of-shards = 100

# Persistent entities save snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# It may be configured to "off" to disable snapshots.
snapshot-after = 100
# A persistent entity is passivated automatically if it does not receive

# A persistent entity is passivated automatically if it does not receive
# any messages during this timeout. Passivation is performed to reduce
# memory consumption. Objects referenced by the entity can be garbage
# collected after passivation. Next message will activate the entity
Expand All @@ -25,15 +25,15 @@ lagom.persistence {
# entities is bounded and their state, sharded across the cluster, will
# fit in memory.
passivate-after-idle-timeout = 120s

# Specifies that entities run on cluster nodes with a specific role.
# If the role is not specified (or empty) all nodes in the cluster are used.
# The entities can still be accessed from other nodes.
run-entities-on-role = ""

# Default timeout for PersistentEntityRef.ask replies.
ask-timeout = 5s

dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
Expand All @@ -43,20 +43,23 @@ lagom.persistence {
throughput = 1
}
}
#//#persistence
#//#persistence

#//#persistence-read-side
lagom.persistence.read-side {

# How long to wait when retrieving the last known offset
offset-timeout = 5s

# Exponential backoff for failures in ReadSideProcessor
failure-exponential-backoff {
# minimum (initial) duration until processor is started again
# after failure
min = 3s

# the exponential back-off is capped to this duration
max = 30s

# additional random delay is based on this factor
random-factor = 0.2
}
Expand Down
Expand Up @@ -11,6 +11,7 @@ import play.api.Configuration
import scala.concurrent.duration._

case class ReadSideConfig(
offsetTimeout: FiniteDuration = 5.seconds,
minBackoff: FiniteDuration = 3.seconds,
maxBackoff: FiniteDuration = 30.seconds,
randomBackoffFactor: Double = 0.2,
Expand All @@ -22,6 +23,7 @@ object ReadSideConfig {

def apply(conf: Config): ReadSideConfig = {
ReadSideConfig(
conf.getDuration("offset-timeout", TimeUnit.MILLISECONDS).millis,
conf.getDuration("failure-exponential-backoff.min", TimeUnit.MILLISECONDS).millis,
conf.getDuration("failure-exponential-backoff.max", TimeUnit.MILLISECONDS).millis,
conf.getDouble("failure-exponential-backoff.random-factor"),
Expand Down
Expand Up @@ -81,12 +81,15 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
) { () =>
val handler: ReadSideProcessor.ReadSideHandler[Event] = processorFactory().buildHandler()
val futureOffset = handler.prepare(tag).toScala
scaladsl.Source.fromFuture(futureOffset).flatMapConcat {
offset =>
val eventStreamSource = eventStreamFactory(tag, offset).asScala
val userlandFlow = handler.handle()
eventStreamSource.via(userlandFlow)
}
scaladsl.Source
.fromFuture(futureOffset)
.initialTimeout(config.offsetTimeout)
.flatMapConcat {
offset =>
val eventStreamSource = eventStreamFactory(tag, offset).asScala
val userlandFlow = handler.handle()
eventStreamSource.via(userlandFlow)
}
}

val (killSwitch, streamDone) = backoffSource
Expand Down
Expand Up @@ -76,14 +76,17 @@ private[lagom] class ReadSideActor[Event <: AggregateEvent[Event]](
config.maxBackoff,
config.randomBackoffFactor
) { () =>
val handler = processor().buildHandler
val handler = processor().buildHandler()
val futureOffset = handler.prepare(tag)
Source.fromFuture(futureOffset).flatMapConcat {
offset =>
val eventStreamSource = eventStreamFactory(tag, offset)
val userlandFlow = handler.handle()
eventStreamSource.via(userlandFlow)
}
Source
.fromFuture(futureOffset)
.initialTimeout(config.offsetTimeout)
.flatMapConcat {
offset =>
val eventStreamSource = eventStreamFactory(tag, offset)
val userlandFlow = handler.handle()
eventStreamSource.via(userlandFlow)
}
}

val (killSwitch, streamDone) = backoffSource
Expand Down

0 comments on commit 48a8a3c

Please sign in to comment.