Commit

Merge branch 'master' of github.com:kafka-dev/kafka
nehanarkhede committed Jun 17, 2011
2 parents 4592215 + 6ca16ef, commit cfe06af
Showing 7 changed files with 66 additions and 23 deletions.
14 changes: 12 additions & 2 deletions in config/log4j.properties
@@ -1,10 +1,20 @@
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=OFF, fileAppender, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=kafka-request.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n


# Turn on all our debugging info
-#log4j.logger.kafka=INFO
+log4j.logger.kafka=INFO,stdout
+log4j.logger.kafka.request.logger=TRACE,fileAppender
+log4j.additivity.kafka.request.logger=false
+#log4j.logger.kafka.network.Processor=TRACE,fileAppender
+#log4j.additivity.kafka.network.Processor=false
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
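
Note: the pair log4j.logger.kafka.request.logger=TRACE,fileAppender and log4j.additivity.kafka.request.logger=false is what routes request traces to kafka-request.log only; with additivity off, events sent to the "kafka.request.logger" logger do not also propagate up to the root logger's stdout appender. A minimal sketch of writing to that logger (assumes log4j 1.x on the classpath; the object name is hypothetical):

import org.apache.log4j.Logger

object RequestLogSketch {
  // Named logger matched by the log4j.logger.kafka.request.logger entry above
  private val requestLogger = Logger.getLogger("kafka.request.logger")

  def main(args: Array[String]): Unit = {
    // With the config above, this line lands only in kafka-request.log,
    // never on the console, because additivity is disabled for this logger.
    if (requestLogger.isTraceEnabled)
      requestLogger.trace("FetchRequest(topic:test, part:0 offset:0 maxSize:1048576)")
  }
}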

3 changes: 2 additions & 1 deletion in core/src/main/scala/kafka/api/FetchRequest.scala
@@ -45,5 +45,6 @@ class FetchRequest(val topic: String,

def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4

-override def toString(): String= "topic:" + topic + ", part:" + partition +" offset:" + offset + " maxSize:" + maxSize
+override def toString(): String= "FetchRequest(topic:" + topic + ", part:" + partition +" offset:" + offset +
+  " maxSize:" + maxSize + ")"
}
4 changes: 2 additions & 2 deletions in core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -69,8 +69,8 @@ class OffsetRequest(val topic: String,

def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4

-override def toString(): String= "topic:" + topic + ", part:" + partition + ", time:" + time +
-  ", maxNumOffsets:" + maxNumOffsets
+override def toString(): String= "OffsetRequest(topic:" + topic + ", part:" + partition + ", time:" + time +
+  ", maxNumOffsets:" + maxNumOffsets + ")"
}

@nonthreadsafe
21 changes: 18 additions & 3 deletions in core/src/main/scala/kafka/network/SocketServer.scala
@@ -26,6 +26,7 @@ import java.nio.channels._
import kafka.utils._

import org.apache.log4j.Logger
+import kafka.api.RequestKeys

/**
* An NIO socket server. The thread model is
@@ -181,7 +182,8 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
val stats: SocketServerStats) extends AbstractServerThread {

private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();

+private val requestLogger = Logger.getLogger("kafka.request.logger")

override def run() {
startupComplete()
while(isRunning) {
@@ -258,9 +260,22 @@ private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
* Handle a completed request producing an optional response
*/
private def handle(key: SelectionKey, request: Receive): Option[Send] = {
-if(logger.isTraceEnabled)
-  logger.trace("Handling request from " + channelFor(key).socket.getRemoteSocketAddress())
val requestTypeId = request.buffer.getShort()
+if(requestLogger.isTraceEnabled) {
+  requestTypeId match {
+    case RequestKeys.Produce =>
+      requestLogger.trace("Handling produce request from " + channelFor(key).socket.getRemoteSocketAddress())
+    case RequestKeys.Fetch =>
+      requestLogger.trace("Handling fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
+    case RequestKeys.MultiFetch =>
+      requestLogger.trace("Handling multi-fetch request from " + channelFor(key).socket.getRemoteSocketAddress())
+    case RequestKeys.MultiProduce =>
+      requestLogger.trace("Handling multi-produce request from " + channelFor(key).socket.getRemoteSocketAddress())
+    case RequestKeys.Offsets =>
+      requestLogger.trace("Handling offset request from " + channelFor(key).socket.getRemoteSocketAddress())
+    case _ => throw new InvalidRequestException("No mapping found for handler id " + requestTypeId)
+  }
+}
val handler = handlerMapping(requestTypeId, request)
if(handler == null)
throw new InvalidRequestException("No handler found for request")
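The match above keys on the short request-type id read from the front of the request buffer. For reference, a sketch of the RequestKeys constants it dispatches on, reconstructed as an assumption about this era of the codebase (the authoritative definitions live in kafka.api.RequestKeys, imported above):

object RequestKeys {
  // Hypothetical reconstruction; the real short ids are defined in kafka.api.RequestKeys
  val Produce: Short = 0
  val Fetch: Short = 1
  val MultiFetch: Short = 2
  val MultiProduce: Short = 3
  val Offsets: Short = 4
}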
30 changes: 17 additions & 13 deletions in core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
@@ -35,7 +35,8 @@ import java.io.IOException
private[kafka] class KafkaRequestHandlers(val logManager: LogManager) {

private val logger = Logger.getLogger(classOf[KafkaRequestHandlers])

+private val requestLogger = Logger.getLogger("kafka.request.logger")

def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = {
requestTypeId match {
case RequestKeys.Produce => handleProducerRequest _
@@ -49,9 +50,9 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) {

def handleProducerRequest(receive: Receive): Option[Send] = {
val sTime = SystemTime.milliseconds
-if(logger.isTraceEnabled)
-  logger.trace("Handling producer request")
val request = ProducerRequest.readFrom(receive.buffer)
+if(requestLogger.isTraceEnabled)
+  requestLogger.trace("Producer request " + request.toString)
val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
try {
logManager.getOrCreateLog(request.topic, partition).append(request.messages)
@@ -60,7 +61,7 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) {
}
catch {
case e =>
-logger.error("error processing ProduceRequst on " + request.topic + ":" + partition, e)
+logger.error("error processing ProduceRequest on " + request.topic + ":" + partition, e)
e match {
case _: IOException =>
logger.error("force shutdown due to " + e)
@@ -75,19 +76,21 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) {
}

def handleMultiProducerRequest(receive: Receive): Option[Send] = {
-if(logger.isTraceEnabled)
-  logger.trace("Handling multiproducer request")
val request = MultiProducerRequest.readFrom(receive.buffer)
+if(requestLogger.isTraceEnabled)
+  requestLogger.trace("Multiproducer request ")
for (produce <- request.produces) {
val partition = produce.getTranslatedPartition(logManager.chooseRandomPartition)
try {
logManager.getOrCreateLog(produce.topic, partition).append(produce.messages)
+if(requestLogger.isTraceEnabled)
+  requestLogger.trace(produce.toString)
if(logger.isTraceEnabled)
logger.trace(produce.messages.sizeInBytes + " bytes written to logs.")
}
catch {
case e =>
-logger.error("erorr processing MultiProduceRequst on " + produce.topic + ":" + partition, e)
+logger.error("error processing MultiProduceRequest on " + produce.topic + ":" + partition, e)
e match {
case _: IOException =>
logger.error("force shutdown due to ", e)
@@ -101,16 +104,17 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) {
}

def handleFetchRequest(request: Receive): Option[Send] = {
-if(logger.isTraceEnabled)
-  logger.trace("Handling fetch request")
val fetchRequest = FetchRequest.readFrom(request.buffer)
+if(requestLogger.isTraceEnabled)
+  requestLogger.trace("Fetch request " + fetchRequest.toString)
Some(readMessageSet(fetchRequest))
}

def handleMultiFetchRequest(request: Receive): Option[Send] = {
-if(logger.isTraceEnabled)
-  logger.trace("Handling multifetch request")
val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer)
+if(requestLogger.isTraceEnabled)
+  requestLogger.trace("Multifetch request")
+multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString))
var responses = multiFetchRequest.fetches.map(fetch =>
readMessageSet(fetch)).toList

@@ -132,9 +136,9 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) {
}

def handleOffsetRequest(request: Receive): Option[Send] = {
-if(logger.isTraceEnabled)
-  logger.trace("Handling offset request")
val offsetRequest = OffsetRequest.readFrom(request.buffer)
+if(requestLogger.isTraceEnabled)
+  requestLogger.trace("Offset request " + offsetRequest.toString)
val log = logManager.getOrCreateLog(offsetRequest.topic, offsetRequest.partition)
val offsets = log.getOffsetsBefore(offsetRequest)
val response = new OffsetArraySend(offsets)
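Every new trace call above is wrapped in if(requestLogger.isTraceEnabled); with log4j 1.x this guard is the standard idiom for skipping the request.toString concatenation entirely when TRACE is disabled, so request logging costs almost nothing in normal operation. A self-contained illustration (object and method names hypothetical):

import org.apache.log4j.Logger

object GuardedTraceSketch {
  private val requestLogger = Logger.getLogger("kafka.request.logger")

  def logRequest(request: AnyRef): Unit = {
    // Without the guard, the string below would be built on every call,
    // even when TRACE output is ultimately discarded.
    if (requestLogger.isTraceEnabled)
      requestLogger.trace("Offset request " + request.toString)
  }
}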
8 changes: 7 additions & 1 deletion in core/src/test/resources/log4j.properties
@@ -1,13 +1,19 @@
-log4j.rootLogger=WARN, stdout
+log4j.rootLogger=TRACE, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

+log4j.appender.fileAppender=org.apache.log4j.FileAppender
+log4j.appender.fileAppender.File=kafka.log
+log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n

# Turn on all our debugging info
#log4j.logger.kafka=DEBUG
#log4j.logger.kafka.producer=WARN
+log4j.logger.kafka=WARN
+log4j.logger.kafka.network.Processor=TRACE,fileAppender
log4j.logger.org.I0Itec.zkclient=OFF
log4j.logger.org.apache.zookeeper=OFF

9 changes: 8 additions & 1 deletion in examples/README
@@ -3,10 +3,17 @@ This directory contains examples of client code that uses kafka.
The default target for ant is kafka.examples.KafkaConsumerProducerDemo which sends and receives
messages from Kafka server.

-In order to run demo:
+In order to run demo from SBT:
1. Start Zookeeper and the Kafka server
2. ./sbt from top-level kafka directory
3. Switch to the kafka java examples project -> project Kafka Java Examples
4. execute run -> run
5. For unlimited producer-consumer run, select option 1
For simple consumer demo, select option 2

+To run the demo using scripts:
+
+1. Start Zookeeper and the Kafka server
+2. For simple consumer demo, run bin/java-simple-consumer-demo.sh
+3. For unlimited producer-consumer run, run bin/java-producer-consumer-demo.sh
