From 98cbf57c54cca1a08e5899fc15009739ca0f3da5 Mon Sep 17 00:00:00 2001 From: Spencer Tipping Date: Thu, 15 Apr 2010 17:10:38 -0600 Subject: [PATCH 1/3] Made syslogging non-blocking; this reduces thread contention during log spikes (and illustrates the failure of pre-emptive multithreading as an application development paradigm, IMAO). --- src/main/scala/net/lag/logging/Syslog.scala | 27 +++++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/main/scala/net/lag/logging/Syslog.scala b/src/main/scala/net/lag/logging/Syslog.scala index 88b38a5..721e493 100644 --- a/src/main/scala/net/lag/logging/Syslog.scala +++ b/src/main/scala/net/lag/logging/Syslog.scala @@ -16,6 +16,9 @@ package net.lag.logging +import scala.actors.Actor._ +import scala.actors._ + import java.util.{logging => javalog} import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, SocketAddress} import java.text.SimpleDateFormat @@ -129,14 +132,22 @@ class SyslogHandler(useIsoDateFormat: Boolean, server: String) extends Handler(n def clearServerName = formatter.clearServerName - def publish(record: javalog.LogRecord) = synchronized { - try { - val data = getFormatter.format(record).getBytes - val packet = new DatagramPacket(data, data.length, dest) - socket.send(packet) - } catch { - case e => - System.err.println(Formatter.formatStackTrace(e, 30).mkString("\n")) + def publish(record: javalog.LogRecord) = NonBlockingSyslog send record +} + +object NonBlockingSyslog { + val send = writer ! _ + + private lazy val writer = actor { + while (true) receive { + case x: javalog.LogRecord => try { + val data = getFormatter.format(record).getBytes + val packet = new DatagramPacket(data, data.length, dest) + socket.send(packet) + } catch { + case e => + System.err.println(Formatter.formatStackTrace(e, 30).mkString("\n")) + } } } } From 332faa81c23800c48540396aa6f680cf6e688ee0 Mon Sep 17 00:00:00 2001 From: Spencer Tipping Date: Thu, 15 Apr 2010 17:20:58 -0600 Subject: [PATCH 2/3] Undo eta-reduction to preserve type safety --- src/main/scala/net/lag/logging/Syslog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/net/lag/logging/Syslog.scala b/src/main/scala/net/lag/logging/Syslog.scala index 721e493..5304c7c 100644 --- a/src/main/scala/net/lag/logging/Syslog.scala +++ b/src/main/scala/net/lag/logging/Syslog.scala @@ -136,7 +136,7 @@ class SyslogHandler(useIsoDateFormat: Boolean, server: String) extends Handler(n } object NonBlockingSyslog { - val send = writer ! _ + def send (record: javalog.LogRecord) = writer ! record private lazy val writer = actor { while (true) receive { From c373c70c7624a13a7b95b52f252d07c3ae93ede0 Mon Sep 17 00:00:00 2001 From: Spencer Tipping Date: Thu, 15 Apr 2010 17:57:57 -0600 Subject: [PATCH 3/3] Cleaned up the nonblocking interface and added a blocking API for unit tests and other situations where synchronous operation is appropriate. --- src/main/scala/net/lag/logging/Syslog.scala | 22 ++++++++++--------- .../scala/net/lag/logging/LoggingSpec.scala | 3 +++ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/main/scala/net/lag/logging/Syslog.scala b/src/main/scala/net/lag/logging/Syslog.scala index 5304c7c..b9800f7 100644 --- a/src/main/scala/net/lag/logging/Syslog.scala +++ b/src/main/scala/net/lag/logging/Syslog.scala @@ -132,22 +132,24 @@ class SyslogHandler(useIsoDateFormat: Boolean, server: String) extends Handler(n def clearServerName = formatter.clearServerName - def publish(record: javalog.LogRecord) = NonBlockingSyslog send record + def publish(record: javalog.LogRecord) = { + val data = formatter.format(record).getBytes + val packet = new DatagramPacket(data, data.length, dest) + NonBlockingSyslog.send (() => socket.send(packet)) + } } object NonBlockingSyslog { - def send (record: javalog.LogRecord) = writer ! record + def send (action: () => Unit) = writer ! action + def block = writer !? Wait + + private case object Wait private lazy val writer = actor { while (true) receive { - case x: javalog.LogRecord => try { - val data = getFormatter.format(record).getBytes - val packet = new DatagramPacket(data, data.length, dest) - socket.send(packet) - } catch { - case e => - System.err.println(Formatter.formatStackTrace(e, 30).mkString("\n")) - } + case Wait => reply (()) + case action: (() => Unit) => try {action ()} + catch {case e => System.err.println(Formatter.formatStackTrace(e, 30).mkString("\n"))} } } } diff --git a/src/test/scala/net/lag/logging/LoggingSpec.scala b/src/test/scala/net/lag/logging/LoggingSpec.scala index a93781d..ca8d166 100644 --- a/src/test/scala/net/lag/logging/LoggingSpec.scala +++ b/src/test/scala/net/lag/logging/LoggingSpec.scala @@ -345,6 +345,8 @@ object LoggingSpec extends Specification with TestHelper { syslog.clearServerName log.debug("and debug!") + NonBlockingSyslog.block + val p = new DatagramPacket(new Array[Byte](1024), 1024) serverSocket.receive(p) new String(p.getData, 0, p.getLength) mustEqual "<9>2008-03-29T05:53:16 raccoon.local whiskey: fatal message!" @@ -360,6 +362,7 @@ object LoggingSpec extends Specification with TestHelper { log.addHandler(syslog) log.info("here's an info message with BSD time.") serverSocket.receive(p) + NonBlockingSyslog.block new String(p.getData, 0, p.getLength) mustEqual "<14>Mar 29 05:53:16 raccoon.local whiskey: here's an info message with BSD time." }