Commit

Merge branch 'KAFKA-69-patch' of https://github.com/jghoman/kafka
Jun Rao committed Jul 20, 2011
2 parents 09ae372 + 58a54ec commit 0575ecf
Showing 8 changed files with 102 additions and 37 deletions.
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/common/NoBrokersForPartitionException.scala
@@ -0,0 +1,9 @@
+package kafka.common
+
+/**
+ * Thrown when a request is made for a broker but no brokers with that topic
+ * exist.
+ */
+class NoBrokersForPartitionException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
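
Since the new exception extends RuntimeException, it is unchecked and callers decide whether to handle it. A minimal caller-side sketch, assuming only that kafka.common is on the classpath; the object and helper names are illustrative, not part of this commit:

import kafka.common.NoBrokersForPartitionException

// Illustrative helper, not part of this commit: runs `send` and converts the
// new unchecked exception into a boolean so the caller can back off and retry
// once broker metadata has refreshed.
object NoBrokersHandlingSketch {
  def sendOrDrop(send: () => Unit): Boolean =
    try {
      send()
      true
    } catch {
      case _: NoBrokersForPartitionException => false
    }
}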
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -29,7 +29,7 @@ class Producer[K,V](config: ProducerConfig,
                     /* use the other constructor*/
 {
 
-  private val underlying = new kafka.producer.Producer[K,V](config, partitioner, producerPool, populateProducerPool)
+  private val underlying = new kafka.producer.Producer[K,V](config, partitioner, producerPool, populateProducerPool, null)
 
   /**
    * This constructor can be used when all config parameters will be specified through the
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -19,11 +19,12 @@ import collection.mutable.Map
 import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
 
-private[producer] trait BrokerPartitionInfo {
+trait BrokerPartitionInfo {
   /**
-   * Return a sequence of (brokerId, numPartitions)
+   * Return a sequence of (brokerId, numPartitions).
    * @param topic the topic for which this information is to be returned
-   * @return a sequence of (brokerId, numPartitions)
+   * @return a sequence of (brokerId, numPartitions). Returns a zero-length
+   *         sequence if no brokers are available.
    */
   def getBrokerPartitionInfo(topic: String = null): SortedSet[Partition]
 
45 changes: 23 additions & 22 deletions core/src/main/scala/kafka/producer/Producer.scala
@@ -16,43 +16,43 @@
 
 package kafka.producer
 
-import async.{QueueItem, CallbackHandler, EventHandler}
+import async.{CallbackHandler, EventHandler}
 import org.apache.log4j.Logger
 import kafka.serializer.Encoder
 import kafka.utils._
-import kafka.common.InvalidConfigException
 import java.util.Properties
 import kafka.cluster.{Partition, Broker}
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.InvalidPartitionException
 import kafka.api.ProducerRequest
+import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
 
 class Producer[K,V](config: ProducerConfig,
                     partitioner: Partitioner[K],
                     producerPool: ProducerPool[V],
-                    populateProducerPool: Boolean = true) /* for testing purpose only. Applications should ideally */
+                    populateProducerPool: Boolean,
+                    private var brokerPartitionInfo: BrokerPartitionInfo) /* for testing purpose only. Applications should ideally */
                                                           /* use the other constructor*/
 {
   private val logger = Logger.getLogger(classOf[Producer[K, V]])
   private val hasShutdown = new AtomicBoolean(false)
   if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
   private val random = new java.util.Random
-  private var brokerPartitionInfo: BrokerPartitionInfo = null
   // check if zookeeper based auto partition discovery is enabled
-  private val zkEnabled = if(!Utils.propertyExists(config.zkConnect)) false else true
-  zkEnabled match {
-    case true =>
-      val zkProps = new Properties()
-      zkProps.put("zk.connect", config.zkConnect)
-      zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
-      zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
-      zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
-      brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
-    case false =>
-      brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
+  private val zkEnabled = Utils.propertyExists(config.zkConnect)
+  if(brokerPartitionInfo == null) {
+    zkEnabled match {
+      case true =>
+        val zkProps = new Properties()
+        zkProps.put("zk.connect", config.zkConnect)
+        zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString)
+        zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString)
+        zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString)
+        brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
+      case false =>
+        brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
+    }
   }
 
   // pool of producers, one per broker
   if(populateProducerPool) {
     val allBrokers = brokerPartitionInfo.getAllBrokerInfo
@@ -65,7 +65,7 @@ class Producer[K,V](config: ProducerConfig,
    * @param config Producer Configuration object
    */
   def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass),
-                                          new ProducerPool[V](config, Utils.getObject(config.serializerClass)))
+                                          new ProducerPool[V](config, Utils.getObject(config.serializerClass)), true, null)
 
   /**
    * This constructor can be used to provide pre-instantiated objects for all config parameters
@@ -90,9 +90,8 @@ class Producer[K,V](config: ProducerConfig,
            eventHandler: EventHandler[V],
            cbkHandler: CallbackHandler[V],
            partitioner: Partitioner[K]) =
-    this(config, if(partitioner == null) new DefaultPartitioner else partitioner,
-         new ProducerPool[V](config, encoder, eventHandler, cbkHandler))
-
+    this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
+         new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
   /**
    * Sends the data, partitioned by key to the topic using either the
    * synchronous or the asynchronous producer
@@ -105,6 +104,8 @@ class Producer[K,V](config: ProducerConfig,
     val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
     logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
     val totalNumPartitions = numBrokerPartitions.length
+    if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+
     var brokerIdPartition: Partition = null
     var partition: Int = 0
     if(zkEnabled) {
@@ -174,4 +175,4 @@ class Producer[K,V](config: ProducerConfig,
       brokerPartitionInfo.close
     }
   }
-}
+}
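
To make the intent of the reworked constructor concrete, here is a minimal sketch, assuming the kafka core classes above are on the classpath and a config that sets broker.list or zk.connect; ProducerInjectionSketch and makeTestProducer are hypothetical names (the new ProducerMethodsTest below does the same thing with an EasyMock mock):

import kafka.producer._

// Hypothetical helper, not part of this commit. Production code passes null for
// the fifth argument, letting Producer build its own ZK- or config-based
// BrokerPartitionInfo; tests inject a canned implementation instead.
object ProducerInjectionSketch {
  def makeTestProducer(config: ProducerConfig,
                       partitioner: Partitioner[String],
                       pool: ProducerPool[String],
                       info: BrokerPartitionInfo): Producer[String, String] =
    new Producer[String, String](config, partitioner, pool,
      false, // populateProducerPool = false: skip opening real broker connections
      info)  // injected metadata source; null selects the default lookup
}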
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
@@ -85,7 +85,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
   /**
    * Return a sequence of (brokerId, numPartitions)
    * @param topic the topic for which this information is to be returned
-   * @return a sequence of (brokerId, numPartitions)
+   * @return a sequence of (brokerId, numPartitions). Returns a zero-length
+   *         sequence if no brokers are available.
    */
   def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition] = {
     val brokerPartitions = topicBrokerPartitions.get(topic)
7 changes: 3 additions & 4 deletions core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -148,8 +148,7 @@ object ProducerPerformance {
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)
 
-    val producer = new Producer[String, String](new ProducerConfig(props), new StringEncoder, new DefaultEventHandler, null,
-      new DefaultPartitioner)
+    val producer = new Producer[String, String](new ProducerConfig(props), new StringEncoder, new DefaultEventHandler[String], null, new DefaultPartitioner[String])
 
     override def run {
       var bytesSent = 0L
@@ -210,8 +209,8 @@ object ProducerPerformance {
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)
 
-    val producer = new Producer[String, String](new ProducerConfig(props), new StringEncoder, new DefaultEventHandler, null,
-      new DefaultPartitioner)
+    val producer = new Producer[String, String](new ProducerConfig(props), new StringEncoder, new DefaultEventHandler[String], null,
+      new DefaultPartitioner[String])
 
     override def run {
       var bytesSent = 0L
54 changes: 54 additions & 0 deletions core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.producer
+
+import collection.immutable.SortedSet
+import java.util._
+import junit.framework.Assert._
+import kafka.cluster.Partition
+import kafka.common.NoBrokersForPartitionException
+import kafka.producer._
+import org.easymock.EasyMock
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+import scala.collection.immutable.List
+
+class ProducerMethodsTest extends JUnitSuite {
+
+  @Test
+  def producerThrowsNoBrokersException() = {
+    val props = new Properties
+    props.put("broker.list", "placeholder") // Need to fake out having specified one
+    val config = new ProducerConfig(props)
+    val mockPartitioner = EasyMock.createMock(classOf[Partitioner[String]])
+    val mockProducerPool = EasyMock.createMock(classOf[ProducerPool[String]])
+    val mockBrokerPartitionInfo = EasyMock.createMock(classOf[kafka.producer.BrokerPartitionInfo])
+
+    EasyMock.expect(mockBrokerPartitionInfo.getBrokerPartitionInfo("the_topic")).andReturn(SortedSet[Partition]())
+    EasyMock.replay(mockBrokerPartitionInfo)
+
+    val producer = new Producer[String, String](config, mockPartitioner, mockProducerPool, false, mockBrokerPartitionInfo)
+
+    try {
+      val producerData = new ProducerData[String, String]("the_topic", "the_key", List("the_datum"))
+      producer.send(producerData)
+      fail("Should have thrown a NoBrokersForPartitionException.")
+    } catch {
+      case nb: NoBrokersForPartitionException => assertTrue(nb.getMessage.contains("the_key"))
+    }
+
+  }
+}
12 changes: 6 additions & 6 deletions core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -129,7 +129,7 @@ class ProducerTest extends JUnitSuite {
     syncProducers.put(brokerId2, syncProducer2)
 
     val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
 
     producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
     producer.close
@@ -163,7 +163,7 @@ class ProducerTest extends JUnitSuite {
 
     val producerPool = new ProducerPool[String](config, serializer, syncProducers,
       new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
 
     producer.send(new ProducerData[String, String](topic, "t"))
     producer.close
@@ -380,7 +380,7 @@ class ProducerTest extends JUnitSuite {
     asyncProducers.put(brokerId1, asyncProducer1)
 
     val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
-    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
 
     producer.send(new ProducerData[String, String](topic, "test1", Array("test1")))
     producer.close
@@ -544,7 +544,7 @@ class ProducerTest extends JUnitSuite {
     syncProducers.put(brokerId2, syncProducer2)
 
     val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
 
     producer.send(new ProducerData[String, String]("test-topic1", "test", Array("test1")))
     Thread.sleep(100)
@@ -596,7 +596,7 @@ class ProducerTest extends JUnitSuite {
     syncProducers.put(2, syncProducer3)
 
     val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
-    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
 
     val serverProps = TestUtils.createBrokerConfig(2, 9094)
     val serverConfig = new KafkaConfig(serverProps) {
@@ -648,7 +648,7 @@ class ProducerTest extends JUnitSuite {
     asyncProducers.put(brokerId1, asyncProducer1)
 
     val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
-    val producer = new Producer[String, String](config, partitioner, producerPool, false)
+    val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
 
     producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
     producer.close
