3 changes: 3 additions & 0 deletions .gitignore
@@ -12,3 +12,6 @@ project/sbt_project_definition.iml
*~
*#
.#*
/kafka-request.log
/log-cleaner.log
/state-change.log
8 changes: 8 additions & 0 deletions .idea_modules/.gitignore
@@ -0,0 +1,8 @@
/Kafka-build.iml
/Kafka.iml
/contrib.iml
/core.iml
/hadoop-consumer.iml
/hadoop-producer.iml
/java-examples.iml
/perf.iml
17 changes: 11 additions & 6 deletions bin/kafka-run-class.sh
@@ -5,9 +5,9 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,12 +22,17 @@ fi

base_dir=$(dirname $0)/..

for file in $base_dir/project/boot/scala-2.8.0/lib/*.jar;
for file in $base_dir/s2.10/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/project/boot/scala-2.10.0/lib/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/core/target/scala_2.8.0/*.jar;
for file in $base_dir/core/target/scala-2.10/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
@@ -37,12 +42,12 @@ do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/perf/target/scala_2.8.0/kafka*.jar;
for file in $base_dir/perf/target/scala_2.10.0/kafka*.jar;
do
CLASSPATH=$CLASSPATH:$file
done

for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
for file in $base_dir/core/lib_managed/scala_2.10.0/compile/*.jar;
do
if [ ${file##*/} != "sbt-launch.jar" ]; then
CLASSPATH=$CLASSPATH:$file
@@ -505,7 +505,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
" for topic " + topic + " with consumers: " + curConsumers)

for (consumerThreadId <- consumerThreadIdSet) {
val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
//val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
val myConsumerPosition = curConsumers.indexWhere(_ == consumerThreadId)
assert(myConsumerPosition >= 0)
val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
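Note: Scala 2.10 deprecates findIndexOf in favour of indexWhere, which is what the change above switches to. A minimal sketch of the replacement, using a hypothetical thread-id list in place of curConsumers:

    val consumerThreadIds = Seq("consumer1-0", "consumer1-1", "consumer2-0")
    // Old call: consumerThreadIds.findIndexOf(_ == "consumer1-1")
    val position = consumerThreadIds.indexWhere(_ == "consumer1-1")   // 1, or -1 when not found
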
35 changes: 25 additions & 10 deletions core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -45,8 +45,10 @@ private[javaapi] object Implicits extends Logging {
new kafka.producer.async.EventHandler[T] {
override def init(props: java.util.Properties) { eventHandler.init(props) }
override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) {
import collection.JavaConversions._
eventHandler.handle(asList(events), producer, encoder)
//import collection.JavaConversions._
import collection.JavaConverters._
eventHandler.handle(events.asJava, producer, encoder)
//eventHandler.handle(asList(events), producer, encoder)
}
override def close { eventHandler.close }
}
@@ -58,8 +60,10 @@ private[javaapi] object Implicits extends Logging {
override def init(props: java.util.Properties) { eventHandler.init(props) }
override def handle(events: java.util.List[QueueItem[T]], producer: kafka.javaapi.producer.SyncProducer,
encoder: Encoder[T]) {
import collection.JavaConversions._
eventHandler.handle(asBuffer(events), producer, encoder)
//import collection.JavaConversions._
import collection.JavaConverters._
//eventHandler.handle(asBuffer(events), producer, encoder)
eventHandler.handle(events.asScala.toBuffer, producer, encoder)
}
override def close { eventHandler.close }
}
@@ -80,10 +84,14 @@ private[javaapi] object Implicits extends Logging {
cbkHandler.afterDequeuingExistingData(data)
}
override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
asList(cbkHandler.beforeSendingData(asList(data)))
import collection.JavaConverters._
cbkHandler.beforeSendingData(data.toList)
//asList(cbkHandler.beforeSendingData(asList(data)))
}
override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = {
asBuffer(cbkHandler.lastBatchBeforeClose)
//asBuffer(cbkHandler.lastBatchBeforeClose)
import collection.JavaConverters._
cbkHandler.lastBatchBeforeClose().toBuffer
}
override def close { cbkHandler.close }
}
@@ -92,7 +100,8 @@ private[javaapi] object Implicits extends Logging {
implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T])
: kafka.javaapi.producer.async.CallbackHandler[T] = {
new kafka.javaapi.producer.async.CallbackHandler[T] {
import collection.JavaConversions._
//import collection.JavaConversions._
import collection.JavaConverters._
override def init(props: java.util.Properties) { cbkHandler.init(props)}
override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
cbkHandler.beforeEnqueue(data)
@@ -102,14 +111,20 @@ private[javaapi] object Implicits extends Logging {
}
override def afterDequeuingExistingData(data: QueueItem[T] = null)
: java.util.List[QueueItem[T]] = {
asList(cbkHandler.afterDequeuingExistingData(data))

//asList(cbkHandler.afterDequeuingExistingData(data))
cbkHandler.afterDequeuingExistingData(data).toList.asJava
}

override def beforeSendingData(data: java.util.List[QueueItem[T]] = null)
: java.util.List[QueueItem[T]] = {
asBuffer(cbkHandler.beforeSendingData(asBuffer(data)))
//asBuffer(cbkHandler.beforeSendingData(asBuffer(data)))
cbkHandler.beforeSendingData(data.asScala.toBuffer).toList.asJava
}

override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = {
asList(cbkHandler.lastBatchBeforeClose)
//asList(cbkHandler.lastBatchBeforeClose)
cbkHandler.lastBatchBeforeClose.toList.asJava
}
override def close { cbkHandler.close }
}
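Note: the changes in this file all follow the same migration from the implicit scala.collection.JavaConversions helpers (asList, asBuffer) to explicit scala.collection.JavaConverters calls. A minimal sketch of both directions, assuming a plain String payload:

    import scala.collection.JavaConverters._

    val scalaSeq: Seq[String] = Seq("a", "b", "c")
    val javaList: java.util.List[String] = scalaSeq.asJava                              // Scala -> Java wrapper
    val scalaBuffer: scala.collection.mutable.Buffer[String] = javaList.asScala.toBuffer // Java -> Scala copy

asScala on its own returns a wrapper over the underlying Java collection; the .toBuffer / .toList calls in the diff make an independent Scala collection before passing it on.
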
@@ -50,9 +50,10 @@ class SimpleConsumer(val host: String,
* @return a sequence of fetch responses
*/
def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = {
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import kafka.javaapi.Implicits._
underlying.multifetch(asBuffer(fetches): _*)
//underlying.multifetch(asBuffer(fetches): _*)
underlying.multifetch( fetches.asScala.toBuffer: _*)
}

/**
@@ -19,7 +19,8 @@ package kafka.javaapi.consumer
import kafka.message.Message
import kafka.serializer.{DefaultDecoder, Decoder}
import kafka.consumer._
import scala.collection.JavaConversions.asList
//import scala.collection.JavaConversions.asList
import scala.collection.JavaConverters._


/**
@@ -71,9 +72,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicCountMap: java.util.Map[String,java.lang.Integer],
decoder: Decoder[T])
: java.util.Map[String,java.util.List[KafkaStream[T]]] = {
import scala.collection.JavaConversions._

val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
import scala.collection.JavaConverters._

val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]).asScala
val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
for ((topic, streams) <- scalaReturn) {
@@ -91,7 +93,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
createMessageStreams(topicCountMap, new DefaultDecoder)

def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
//asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder).toSeq.asJava

def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
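Note: the topic-count conversion above applies the same JavaConverters idiom to maps; asScala wraps the java.util.Map, and the result is folded into an immutable Scala Map. A rough sketch with a hypothetical topic-count map:

    import scala.collection.JavaConverters._

    val topicCountMap = new java.util.HashMap[String, Int]()
    topicCountMap.put("test-topic", 2)
    // Mirrors the change above: build an immutable Scala Map from the Java map.
    val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ topicCountMap.asScala
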
@@ -19,6 +19,7 @@ package kafka.javaapi.message
import java.nio.ByteBuffer
import kafka.common.ErrorMapping
import kafka.message._
import scala.collection.JavaConverters._

class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
@@ -29,7 +30,8 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)

def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
//this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
this(MessageSet.createByteBuffer(compressionCodec, messages.asScala.toBuffer: _*),
0L, ErrorMapping.NoError)
}

20 changes: 12 additions & 8 deletions core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -65,9 +65,11 @@ class Producer[K,V](config: ProducerConfig,
override def init(props: Properties) { eventHandler.init(props) }
override def handle(events: Seq[QueueItem[V]], producer: kafka.producer.SyncProducer,
encoder: Encoder[V]) {
import collection.JavaConversions._
//import collection.JavaConversions._
import collection.JavaConverters._
import kafka.javaapi.Implicits._
eventHandler.handle(asList(events), producer, encoder)
//eventHandler.handle(asList(events), producer, encoder)
eventHandler.handle(events, producer, encoder)
}
override def close { eventHandler.close }
},
@@ -84,10 +86,12 @@ class Producer[K,V](config: ProducerConfig,
cbkHandler.afterDequeuingExistingData(data)
}
override def beforeSendingData(data: Seq[QueueItem[V]] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
asList(cbkHandler.beforeSendingData(asList(data)))
//asList(cbkHandler.beforeSendingData(asList(data)))
cbkHandler.beforeSendingData(data.toList)
}
override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[V]] = {
asBuffer(cbkHandler.lastBatchBeforeClose)
//asBuffer(cbkHandler.lastBatchBeforeClose)
cbkHandler.lastBatchBeforeClose.toBuffer
}
override def close { cbkHandler.close }
}))
@@ -101,17 +105,17 @@ class Producer[K,V](config: ProducerConfig,
def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
import collection.JavaConversions._
underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
asBuffer(producerData.getData)))
producerData.getData.toBuffer))
}

/**
* Use this API to send data to multiple topics
* @param producerData list of producer data objects that encapsulate the topic, key and message data
*/
def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
import collection.JavaConversions._
underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
asBuffer(pd.getData))): _*)
import collection.JavaConverters._
underlying.send(producerData.asScala.map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
pd.getData.asScala)): _*)
}

/**
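Note: where the converted collection feeds a varargs parameter, as in underlying.send above, the Scala buffer still has to be expanded with : _*. A small hypothetical sketch of that shape:

    import scala.collection.JavaConverters._

    def sendAll(messages: String*): Unit = messages.foreach(println)    // stand-in for underlying.send
    val javaMessages: java.util.List[String] = java.util.Arrays.asList("m1", "m2")
    sendAll(javaMessages.asScala.toBuffer: _*)                           // expand the converted buffer as varargs
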
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
@@ -16,15 +16,16 @@
*/
package kafka.javaapi.producer

import scala.collection.JavaConversions._
//import scala.collection.JavaConversions._
import collection.JavaConverters._

class ProducerData[K, V](private val topic: String,
private val key: K,
private val data: java.util.List[V]) {

def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)

def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))
def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = List(d).asJava)

def getTopic: String = topic

28 changes: 19 additions & 9 deletions core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
@@ -159,12 +159,20 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
private def getZKTopicPartitionInfo(): collection.mutable.Map[String, SortedSet[Partition]] = {
val brokerPartitionsPerTopic = new HashMap[String, SortedSet[Partition]]()
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)

println("Broker Topic Path => " + ZkUtils.BrokerTopicsPath)
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)

topics.foreach { topic =>
// find the number of broker partitions registered for this topic
val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)

val numPartitions = brokerList.map{bid =>
val x = ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid)
if (x == "") 0 else bid.toInt
}

val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions)
val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
@@ -219,11 +227,13 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In

zkWatcherLock synchronized {
trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString)
import scala.collection.JavaConversions._
//import scala.collection.JavaConversions._
import collection.JavaConverters._

parentPath match {
case "/brokers/topics" => // this is a watcher for /broker/topics path
val updatedTopics = asBuffer(curChilds)
//val updatedTopics = asBuffer(curChilds)
val updatedTopics = curChilds.asScala.toBuffer
debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
curChilds.toString)
debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
@@ -240,14 +250,15 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
case "/brokers/ids" => // this is a watcher for /broker/ids path
debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
"\t Currently registered list of brokers -> " + curChilds.toString)
processBrokerChange(parentPath, curChilds)
processBrokerChange(parentPath, curChilds.asScala)
case _ =>
val pathSplits = parentPath.split("/")
val topic = pathSplits.last
if(pathSplits.length == 4 && pathSplits(2).equals("topics")) {
debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
" list of brokers -> " + curChilds.toString + " for topic -> " + topic)
processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
//processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
processNewBrokerInExistingTopic(topic, curChilds.asScala.toBuffer)
}
}

Expand All @@ -259,8 +270,9 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In

def processBrokerChange(parentPath: String, curChilds: Seq[String]) {
if(parentPath.equals(ZkUtils.BrokerIdsPath)) {
import scala.collection.JavaConversions._
val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt)
//import scala.collection.JavaConversions._
import collection.JavaConversions._
val updatedBrokerList = curChilds.map(bid => bid.toInt).toBuffer
val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet
debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
newBrokers.foreach { bid =>
@@ -374,7 +386,5 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
// there is no need to re-register other listeners as they are listening on the child changes of
// permanent nodes
}

}

}
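Note: the getZKTopicPartitionInfo change above adds a guard for broker znodes whose data comes back empty before anything is parsed with toInt. A minimal sketch of that empty-data guard, with a hypothetical readData stand-in for ZkUtils.readData:

    // Hypothetical stand-in: returns the raw string stored at a znode, possibly "".
    def readData(path: String): String = ""

    val raw = readData("/brokers/topics/test-topic/0")
    val partitions = if (raw == "") 0 else raw.toInt   // empty data is treated as zero partitions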