Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'put_bytes_22' into put_bytes_30

Conflicts:
	src/main/scala/net/lag/kestrel/Kestrel.scala
	src/main/scala/net/lag/kestrel/PersistentQueue.scala
	src/main/scala/net/lag/kestrel/QueueCollection.scala
	src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
	src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
	src/test/scala/net/lag/kestrel/ServerSpec.scala
  • Loading branch information...
commit a1e519369e7145c35a928888ba62e10ea99aba78 2 parents 9c35a16 + 9fb75d7
Robey Pointer authored
View
11 src/main/scala/net/lag/kestrel/Kestrel.scala
@@ -45,7 +45,8 @@ class Kestrel(defaultQueueBuilder: QueueBuilder, queueBuilders: Seq[QueueBuilder
listenAddress: String, memcacheListenPort: Option[Int], textListenPort: Option[Int],
thriftListenPort: Option[Int], queuePath: String,
expirationTimerFrequency: Option[Duration], clientTimeout: Option[Duration],
- maxOpenTransactions: Int, debugLogQueues: List[String] = Nil)
+ maxOpenTransactions: Int, debugLogQueues: List[String] = Nil,
+ connectionBacklog: Option[Int])
extends Service {
private val log = Logger.get(getClass.getName)
@@ -76,6 +77,7 @@ class Kestrel(defaultQueueBuilder: QueueBuilder, queueBuilders: Seq[QueueBuilder
.name(name)
.reportTo(new OstrichStatsReceiver)
.bindTo(address)
+ connectionBacklog.foreach { backlog => builder = builder.backlog(backlog) }
clientTimeout.foreach { timeout => builder = builder.readTimeout(timeout) }
// calling build() is equivalent to calling start() in finagle.
builder.build(factory)
@@ -91,6 +93,7 @@ class Kestrel(defaultQueueBuilder: QueueBuilder, queueBuilders: Seq[QueueBuilder
.name(name)
.reportTo(new OstrichStatsReceiver)
.bindTo(address)
+ connectionBacklog.foreach { backlog => builder = builder.backlog(backlog) }
clientTimeout.foreach { timeout => builder = builder.readTimeout(timeout) }
// calling build() is equivalent to calling start() in finagle.
builder.build(connection => {
@@ -110,9 +113,9 @@ class Kestrel(defaultQueueBuilder: QueueBuilder, queueBuilders: Seq[QueueBuilder
def start() {
log.info("Kestrel config: listenAddress=%s memcachePort=%s textPort=%s queuePath=%s " +
- "expirationTimerFrequency=%s clientTimeout=%s maxOpenTransactions=%d",
+ "expirationTimerFrequency=%s clientTimeout=%s maxOpenTransactions=%d connectionBacklog=%s",
listenAddress, memcacheListenPort, textListenPort, queuePath,
- expirationTimerFrequency, clientTimeout, maxOpenTransactions)
+ expirationTimerFrequency, clientTimeout, maxOpenTransactions, connectionBacklog)
// this means no timeout will be at better granularity than 100 ms.
timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS)
@@ -172,6 +175,8 @@ class Kestrel(defaultQueueBuilder: QueueBuilder, queueBuilders: Seq[QueueBuilder
new PeriodicBackgroundProcess("background-expiration", expirationTimerFrequency.get) {
def periodic() {
Kestrel.this.queueCollection.flushAllExpired()
+ // Now that we've cleaned out the queue, lets see if any of them are
+ // ready to be expired.
Kestrel.this.queueCollection.deleteExpiredQueues()
}
}.start()
View
6 src/main/scala/net/lag/kestrel/QueueCollection.scala
@@ -234,10 +234,8 @@ class QueueCollection(
}
}
- def flushExpired(name: String) {
- if (!shuttingDown) {
- writer(name) foreach { _.discardExpired() }
- }
+ def flushExpired(name: String, limit: Boolean = false) {
+ writer(name) foreach { _.discardExpired() }
}
def flushAllExpired() {
View
8 src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
@@ -299,10 +299,16 @@ trait KestrelConfig extends ServerConfig[Kestrel] {
*/
var debugLogQueues: List[String] = Nil
+ /**
+ * An optional size for the backlog of connecting clients. This setting is applied to each listening port.
+ */
+ var connectionBacklog: Option[Int] = None
+
def apply(runtime: RuntimeEnvironment) = {
new Kestrel(
default, queues, listenAddress, memcacheListenPort, textListenPort, thriftListenPort,
- queuePath, expirationTimerFrequency, clientTimeout, maxOpenTransactions, debugLogQueues
+ queuePath, expirationTimerFrequency, clientTimeout, maxOpenTransactions, debugLogQueues,
+ connectionBacklog
)
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.