diff --git a/.gitignore b/.gitignore
index 1fc794d03bfd8..bbabda45f67d7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,6 @@ project/sbt_project_definition.iml
*~
*#
.#*
+/kafka-request.log
+/log-cleaner.log
+/state-change.log
diff --git a/.idea_modules/.gitignore b/.idea_modules/.gitignore
new file mode 100644
index 0000000000000..2eda59fc92fa0
--- /dev/null
+++ b/.idea_modules/.gitignore
@@ -0,0 +1,8 @@
+/Kafka-build.iml
+/Kafka.iml
+/contrib.iml
+/core.iml
+/hadoop-consumer.iml
+/hadoop-producer.iml
+/java-examples.iml
+/perf.iml
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index e93f670becf2a..6b8fcdf374130 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -5,9 +5,9 @@
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,12 +22,17 @@ fi
base_dir=$(dirname $0)/..
-for file in $base_dir/project/boot/scala-2.8.0/lib/*.jar;
+for file in $base_dir/s2.10/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/project/boot/scala-2.10.0/lib/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/core/target/scala_2.8.0/*.jar;
+for file in $base_dir/core/target/scala-2.10/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
@@ -37,12 +42,12 @@ do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/perf/target/scala_2.8.0/kafka*.jar;
+for file in $base_dir/perf/target/scala-2.10/kafka*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
+for file in $base_dir/core/lib_managed/scala_2.10.0/compile/*.jar;
do
if [ ${file##*/} != "sbt-launch.jar" ]; then
CLASSPATH=$CLASSPATH:$file
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f7782df231b3f..60dd2c5b1e63c 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -505,7 +505,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
" for topic " + topic + " with consumers: " + curConsumers)
for (consumerThreadId <- consumerThreadIdSet) {
- val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+ //val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+ val myConsumerPosition = curConsumers.indexWhere(_ == consumerThreadId)
assert(myConsumerPosition >= 0)
val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
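Note on this change: `findIndexOf` was deprecated in Scala 2.9 and removed from the 2.10 collections API; `indexWhere` is the drop-in replacement with the same semantics. A minimal standalone sketch (not Kafka code) of the behaviour the `assert` above relies on:

```scala
object IndexWhereDemo extends App {
  val curConsumers = List("consumer-1", "consumer-2", "consumer-3")

  // Pre-2.10 code would have written: curConsumers.findIndexOf(_ == "consumer-2")
  val pos = curConsumers.indexWhere(_ == "consumer-2")
  assert(pos == 1)

  // indexWhere returns -1 when nothing matches, which is what
  // assert(myConsumerPosition >= 0) guards against in the rebalance code
  assert(curConsumers.indexWhere(_ == "missing") == -1)
}
```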
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index 20ca1930e7264..8fd5c85005911 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -45,8 +45,10 @@ private[javaapi] object Implicits extends Logging {
new kafka.producer.async.EventHandler[T] {
override def init(props: java.util.Properties) { eventHandler.init(props) }
override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) {
- import collection.JavaConversions._
- eventHandler.handle(asList(events), producer, encoder)
+ //import collection.JavaConversions._
+ import collection.JavaConverters._
+ eventHandler.handle(events.asJava, producer, encoder)
+ //eventHandler.handle(asList(events), producer, encoder)
}
override def close { eventHandler.close }
}
@@ -58,8 +60,10 @@ private[javaapi] object Implicits extends Logging {
override def init(props: java.util.Properties) { eventHandler.init(props) }
override def handle(events: java.util.List[QueueItem[T]], producer: kafka.javaapi.producer.SyncProducer,
encoder: Encoder[T]) {
- import collection.JavaConversions._
- eventHandler.handle(asBuffer(events), producer, encoder)
+ //import collection.JavaConversions._
+ import collection.JavaConverters._
+ //eventHandler.handle(asBuffer(events), producer, encoder)
+ eventHandler.handle(events.asScala.toBuffer, producer, encoder)
}
override def close { eventHandler.close }
}
@@ -80,10 +84,14 @@ private[javaapi] object Implicits extends Logging {
cbkHandler.afterDequeuingExistingData(data)
}
override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = {
- asList(cbkHandler.beforeSendingData(asList(data)))
+        import collection.JavaConverters._
+        //asList(cbkHandler.beforeSendingData(asList(data)))
+        cbkHandler.beforeSendingData(data.asJava).asScala
}
override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = {
- asBuffer(cbkHandler.lastBatchBeforeClose)
+        //asBuffer(cbkHandler.lastBatchBeforeClose)
+        import collection.JavaConverters._
+        cbkHandler.lastBatchBeforeClose.asScala
}
override def close { cbkHandler.close }
}
@@ -92,7 +100,8 @@ private[javaapi] object Implicits extends Logging {
implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T])
: kafka.javaapi.producer.async.CallbackHandler[T] = {
new kafka.javaapi.producer.async.CallbackHandler[T] {
- import collection.JavaConversions._
+ //import collection.JavaConversions._
+ import collection.JavaConverters._
override def init(props: java.util.Properties) { cbkHandler.init(props)}
override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = {
cbkHandler.beforeEnqueue(data)
@@ -102,14 +111,20 @@ private[javaapi] object Implicits extends Logging {
}
override def afterDequeuingExistingData(data: QueueItem[T] = null)
: java.util.List[QueueItem[T]] = {
- asList(cbkHandler.afterDequeuingExistingData(data))
+
+ //asList(cbkHandler.afterDequeuingExistingData(data))
+ cbkHandler.afterDequeuingExistingData(data).toList.asJava
}
+
override def beforeSendingData(data: java.util.List[QueueItem[T]] = null)
: java.util.List[QueueItem[T]] = {
- asBuffer(cbkHandler.beforeSendingData(asBuffer(data)))
+ //asBuffer(cbkHandler.beforeSendingData(asBuffer(data)))
+ cbkHandler.beforeSendingData(data.asScala.toBuffer).toList.asJava
}
+
override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = {
- asList(cbkHandler.lastBatchBeforeClose)
+ //asList(cbkHandler.lastBatchBeforeClose)
+ cbkHandler.lastBatchBeforeClose.toList.asJava
}
override def close { cbkHandler.close }
}
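The pattern applied throughout this file: Scala 2.10 favours the explicit `JavaConverters` decorators (`.asJava`/`.asScala`) over the implicit `JavaConversions` helpers (`asList`, `asBuffer`) that the 2.8 code leaned on. A minimal sketch of the substitution, independent of the Kafka types:

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable

object ConvertersDemo extends App {
  // Scala -> Java, replacing JavaConversions.asList(seq)
  val javaList: java.util.List[Int] = Seq(1, 2, 3).asJava

  // Java -> Scala, replacing JavaConversions.asBuffer(javaList)
  val view: mutable.Buffer[Int] = javaList.asScala

  // .asScala is a wrapper over the underlying java list; append
  // .toBuffer or .toList when an independent copy is wanted
  val copy: List[Int] = javaList.asScala.toList
  println(javaList + " / " + view + " / " + copy)
}
```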
diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
index 9ba324da45c36..7447c3d106cc2 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -50,9 +50,10 @@ class SimpleConsumer(val host: String,
* @return a sequence of fetch responses
*/
def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = {
- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._
import kafka.javaapi.Implicits._
- underlying.multifetch(asBuffer(fetches): _*)
+ //underlying.multifetch(asBuffer(fetches): _*)
+    underlying.multifetch(fetches.asScala.toBuffer: _*)
}
/**
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index f1a469b23ca17..4bff5103ea411 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -19,7 +19,8 @@ package kafka.javaapi.consumer
import kafka.message.Message
import kafka.serializer.{DefaultDecoder, Decoder}
import kafka.consumer._
-import scala.collection.JavaConversions.asList
+//import scala.collection.JavaConversions.asList
+import scala.collection.JavaConverters._
/**
@@ -71,9 +72,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicCountMap: java.util.Map[String,java.lang.Integer],
decoder: Decoder[T])
: java.util.Map[String,java.util.List[KafkaStream[T]]] = {
- import scala.collection.JavaConversions._
- val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+ import scala.collection.JavaConverters._
+
+ val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]).asScala
val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
for ((topic, streams) <- scalaReturn) {
@@ -91,7 +93,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
createMessageStreams(topicCountMap, new DefaultDecoder)
def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
- asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
+ //asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
+ underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder).toSeq.asJava
def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
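`asMap` from `JavaConversions` goes the same way: `.asScala` on a `java.util.Map` yields a mutable wrapper, and prepending it to an empty immutable `Map` (as `createMessageStreams` does above) takes a snapshot. A small self-contained sketch:

```scala
import scala.collection.JavaConverters._

object MapConversionDemo extends App {
  val topicCountMap = new java.util.HashMap[String, Int]()
  topicCountMap.put("events", 4)

  // .asScala wraps the java map; ++ into an immutable Map copies it
  val scalaTopicCountMap: Map[String, Int] =
    Map.empty[String, Int] ++ topicCountMap.asScala

  topicCountMap.put("clicks", 2)                 // a later mutation...
  assert(!scalaTopicCountMap.contains("clicks")) // ...does not affect the snapshot
}
```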
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 7ebeb9c1efd66..38d8ff9bc47df 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -19,6 +19,7 @@ package kafka.javaapi.message
import java.nio.ByteBuffer
import kafka.common.ErrorMapping
import kafka.message._
+import scala.collection.JavaConverters._
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
@@ -29,7 +30,8 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
- this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
+ //this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
+ this(MessageSet.createByteBuffer(compressionCodec, messages.asScala.toBuffer: _*),
0L, ErrorMapping.NoError)
}
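Passing a `java.util.List` to a Scala varargs parameter follows the same recipe: convert, then splat with `: _*`. A standalone sketch (the `createByteBuffer` stand-in below is hypothetical, not the real `MessageSet` API):

```scala
import scala.collection.JavaConverters._

object VarargsDemo extends App {
  // Hypothetical stand-in for MessageSet.createByteBuffer(codec, messages: _*)
  def createByteBuffer(messages: String*): Int = messages.length

  val javaMessages: java.util.List[String] =
    java.util.Arrays.asList("m1", "m2", "m3")

  // Replaces JavaConversions.asBuffer(javaMessages): _*
  assert(createByteBuffer(javaMessages.asScala.toBuffer: _*) == 3)
}
```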
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index faa420dc20ad2..344ea2018eb58 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -65,9 +65,11 @@ class Producer[K,V](config: ProducerConfig,
override def init(props: Properties) { eventHandler.init(props) }
override def handle(events: Seq[QueueItem[V]], producer: kafka.producer.SyncProducer,
encoder: Encoder[V]) {
- import collection.JavaConversions._
+ //import collection.JavaConversions._
+ import collection.JavaConverters._
import kafka.javaapi.Implicits._
- eventHandler.handle(asList(events), producer, encoder)
+ //eventHandler.handle(asList(events), producer, encoder)
+        eventHandler.handle(events.asJava, producer, encoder)
}
override def close { eventHandler.close }
},
@@ -84,10 +86,12 @@ class Producer[K,V](config: ProducerConfig,
cbkHandler.afterDequeuingExistingData(data)
}
override def beforeSendingData(data: Seq[QueueItem[V]] = null): scala.collection.mutable.Seq[QueueItem[V]] = {
- asList(cbkHandler.beforeSendingData(asList(data)))
+          //asList(cbkHandler.beforeSendingData(asList(data)))
+          import collection.JavaConverters._
+          cbkHandler.beforeSendingData(data.asJava).asScala
}
override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[V]] = {
- asBuffer(cbkHandler.lastBatchBeforeClose)
+          //asBuffer(cbkHandler.lastBatchBeforeClose)
+          import collection.JavaConverters._
+          cbkHandler.lastBatchBeforeClose.asScala
}
override def close { cbkHandler.close }
}))
@@ -101,7 +105,7 @@ class Producer[K,V](config: ProducerConfig,
def send(producerData: kafka.javaapi.producer.ProducerData[K,V]) {
import collection.JavaConversions._
underlying.send(new kafka.producer.ProducerData[K,V](producerData.getTopic, producerData.getKey,
- asBuffer(producerData.getData)))
+ producerData.getData.toBuffer))
}
/**
@@ -109,9 +113,9 @@ class Producer[K,V](config: ProducerConfig,
* @param producerData list of producer data objects that encapsulate the topic, key and message data
*/
def send(producerData: java.util.List[kafka.javaapi.producer.ProducerData[K,V]]) {
- import collection.JavaConversions._
- underlying.send(asBuffer(producerData).map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
- asBuffer(pd.getData))): _*)
+ import collection.JavaConverters._
+ underlying.send(producerData.asScala.map(pd => new kafka.producer.ProducerData[K,V](pd.getTopic, pd.getKey,
+ pd.getData.asScala)): _*)
}
/**
diff --git a/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala b/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
index 338e0a8a56d7c..a2d2b939f781a 100644
--- a/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
@@ -16,7 +16,8 @@
*/
package kafka.javaapi.producer
-import scala.collection.JavaConversions._
+//import scala.collection.JavaConversions._
+import collection.JavaConverters._
class ProducerData[K, V](private val topic: String,
private val key: K,
@@ -24,7 +25,7 @@ class ProducerData[K, V](private val topic: String,
def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d)
- def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d)))
+ def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = List(d).asJava)
def getTopic: String = topic
diff --git a/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
index 9e95b1cb555fd..87cf706695b69 100644
--- a/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
@@ -159,12 +159,20 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
private def getZKTopicPartitionInfo(): collection.mutable.Map[String, SortedSet[Partition]] = {
val brokerPartitionsPerTopic = new HashMap[String, SortedSet[Partition]]()
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
+
+    debug("Broker Topic Path => " + ZkUtils.BrokerTopicsPath)
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
+
topics.foreach { topic =>
// find the number of broker partitions registered for this topic
val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
- val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
+
+ val numPartitions = brokerList.map{bid =>
+ val x = ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid)
+ if (x == "") 0 else bid.toInt
+ }
+
val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions)
val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
@@ -219,11 +227,13 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
zkWatcherLock synchronized {
trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString)
- import scala.collection.JavaConversions._
+ //import scala.collection.JavaConversions._
+ import collection.JavaConverters._
parentPath match {
case "/brokers/topics" => // this is a watcher for /broker/topics path
- val updatedTopics = asBuffer(curChilds)
+ //val updatedTopics = asBuffer(curChilds)
+ val updatedTopics = curChilds.asScala.toBuffer
debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
curChilds.toString)
debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
@@ -240,14 +250,15 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
case "/brokers/ids" => // this is a watcher for /broker/ids path
debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
"\t Currently registered list of brokers -> " + curChilds.toString)
- processBrokerChange(parentPath, curChilds)
+ processBrokerChange(parentPath, curChilds.asScala)
case _ =>
val pathSplits = parentPath.split("/")
val topic = pathSplits.last
if(pathSplits.length == 4 && pathSplits(2).equals("topics")) {
debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
" list of brokers -> " + curChilds.toString + " for topic -> " + topic)
- processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
+ //processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
+ processNewBrokerInExistingTopic(topic, curChilds.asScala.toBuffer)
}
}
@@ -259,8 +270,9 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
def processBrokerChange(parentPath: String, curChilds: Seq[String]) {
if(parentPath.equals(ZkUtils.BrokerIdsPath)) {
- import scala.collection.JavaConversions._
- val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt)
+      // curChilds arrives here as a scala Seq (converted with .asScala at the call site),
+      // so neither JavaConversions nor JavaConverters is needed in this method
+      val updatedBrokerList = curChilds.map(bid => bid.toInt).toBuffer
val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet
debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
newBrokers.foreach { bid =>
@@ -374,7 +386,5 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
// there is no need to re-register other listeners as they are listening on the child changes of
// permanent nodes
}
-
}
-
}
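The guarded read in `getZKTopicPartitionInfo` exists because a topic's broker znode can hold empty data before any partitions are registered; when present, the payload is the partition count. A standalone sketch of that defensive parse (paths and payloads below are illustrative):

```scala
object ZnodeParseDemo extends App {
  // Illustrative stand-in for ZkUtils.readData: broker 2 has an empty payload
  def readData(path: String): String = if (path.endsWith("/2")) "" else "4"

  val brokerList = Seq("1", "2", "3")
  val numPartitions = brokerList.map { bid =>
    val x = readData("/brokers/topics/events/" + bid)
    if (x == "") 0 else x.toInt // empty znode data means no partitions yet
  }
  assert(numPartitions == Seq(4, 0, 4))
}
```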
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index dbfa1d83afd88..0faf1fc5ec096 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -23,7 +23,8 @@ import java.text.SimpleDateFormat
import javax.management._
import javax.management.remote._
import joptsimple.OptionParser
-import scala.collection.JavaConversions._
+//import scala.collection.JavaConversions._
+import collection.JavaConverters._
import scala.collection.mutable
import scala.math._
@@ -74,10 +75,11 @@ object JmxTool {
val queries: Iterable[ObjectName] =
if(options.has(objectNameOpt))
- options.valuesOf(objectNameOpt).map(new ObjectName(_))
+ options.valuesOf(objectNameOpt).asScala.map(new ObjectName(_))
else
List(null)
- val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
+ //val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null)).flatten
+ val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null).asScala).flatten
val attributes: Iterable[(ObjectName, Array[String])] =
names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
@@ -99,7 +101,8 @@ object JmxTool {
var attributes = new mutable.HashMap[String, Any]()
for(name <- names) {
val mbean = mbsc.getMBeanInfo(name)
- for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))) {
+ //for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName))) {
+ for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
val attr = attrObj.asInstanceOf[Attribute]
attributes(name + ":" + attr.getName) = attr.getValue
}
@@ -107,4 +110,4 @@ object JmxTool {
attributes
}
-}
\ No newline at end of file
+}
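`queryNames` hands back a `java.util.Set[ObjectName]`, so each query result needs an explicit `.asScala` before the `map`/`flatten` (which could equally be written as a single `flatMap`). A runnable sketch against the local platform MBean server:

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.collection.JavaConverters._

object JmxQueryDemo extends App {
  val mbsc = ManagementFactory.getPlatformMBeanServer

  val queries = List(new ObjectName("java.lang:type=Runtime"),
                     new ObjectName("java.lang:type=Memory"))

  // queryNames returns java.util.Set[ObjectName];
  // map(... .asScala).flatten is equivalent to this flatMap
  val names = queries.flatMap(q => mbsc.queryNames(q, null).asScala)
  names.foreach(println)
}
```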
diff --git a/core/src/main/scala/kafka/utils/Annotations.scala b/core/src/main/scala/kafka/utils/Annotations.scala
index 28269eb037109..11f942e1e96be 100644
--- a/core/src/main/scala/kafka/utils/Annotations.scala
+++ b/core/src/main/scala/kafka/utils/Annotations.scala
@@ -17,6 +17,8 @@
package kafka.utils
+import annotation.StaticAnnotation
+
/* Some helpful annotations */
/**
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index d62fa77164f42..de3545d1e283f 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -19,7 +19,10 @@ package kafka.utils
import java.util.ArrayList
import java.util.concurrent._
-import collection.JavaConversions
+import java.util
+
+//import collection.JavaConversions
+import collection.JavaConverters._
class Pool[K,V] extends Iterable[(K, V)] {
@@ -27,8 +30,7 @@ class Pool[K,V] extends Iterable[(K, V)] {
def this(m: collection.Map[K, V]) {
this()
- for((k,v) <- m.elements)
- pool.put(k, v)
+    m foreach { case (k, v) => pool.put(k, v) }
}
def put(k: K, v: V) = pool.put(k, v)
@@ -41,10 +43,10 @@ class Pool[K,V] extends Iterable[(K, V)] {
def remove(key: K): V = pool.remove(key)
- def keys = JavaConversions.asSet(pool.keySet())
+ def keys = pool.keySet.asScala.toSet
- def values: Iterable[V] =
- JavaConversions.asIterable(new ArrayList[V](pool.values()))
+ def values: Iterable[V] = new util.ArrayList[V](pool.values).asScala
+ //JavaConversions.asIterable(new ArrayList[V](pool.values()))
def clear: Unit = pool.clear()
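One behavioural nuance of this rewrite: the old `JavaConversions.asSet(pool.keySet())` was a live view of the `ConcurrentHashMap`, whereas `pool.keySet.asScala.toSet` takes an immutable snapshot (and copying `values` into a fresh `ArrayList` does the same for values). A small sketch of the difference:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object SnapshotDemo extends App {
  val pool = new ConcurrentHashMap[String, Int]()
  pool.put("a", 1)

  val liveView = pool.keySet.asScala       // wrapper: sees later mutations
  val snapshot = pool.keySet.asScala.toSet // copy, as in Pool.keys

  pool.put("b", 2)
  assert(liveView.contains("b"))
  assert(!snapshot.contains("b"))
}
```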
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index caddb06bea1b4..984f5c3176fd8 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -173,19 +173,14 @@ object ZkUtils extends Logging {
}
def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
- import scala.collection.JavaConversions._
+ import scala.collection.JavaConverters._
     // explicitly convert the java list returned by zkclient into a scala Seq
- var ret: java.util.List[String] = null
- try {
- ret = client.getChildren(path)
- }
+ try { client.getChildren(path).asScala.toSeq }
catch {
- case e: ZkNoNodeException =>
- return Nil
- case e2 => throw e2
+ case e: ZkNoNodeException => Nil
+ case e2 => throw e2
}
- return ret
}
/**
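Since try/catch is an expression in Scala, the rewritten `getChildrenParentMayNotExist` can yield its result directly from both branches, eliminating the `var` and the early `return`. A standalone sketch of the shape (the `children` stub stands in for zkclient's `getChildren`):

```scala
import scala.collection.JavaConverters._

object TryAsExpressionDemo extends App {
  class NoNodeException extends RuntimeException // stand-in for ZkNoNodeException

  def children(path: String): java.util.List[String] =
    if (path == "/missing") throw new NoNodeException
    else java.util.Arrays.asList("child-1", "child-2")

  def getChildrenMayNotExist(path: String): Seq[String] =
    try { children(path).asScala.toSeq }     // last expression is the result
    catch { case e: NoNodeException => Nil } // other throwables propagate

  assert(getChildrenMayNotExist("/missing") == Nil)
  assert(getChildrenMayNotExist("/present").size == 2)
}
```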
diff --git a/project/Build.scala b/project/Build.scala
new file mode 100644
index 0000000000000..6811511379bb8
--- /dev/null
+++ b/project/Build.scala
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import sbt._
+import Keys._
+import java.io.File
+
+import scala.xml.{Node, Elem}
+import scala.xml.transform.{RewriteRule, RuleTransformer}
+
+object KafkaBuild extends Build {
+ val commonSettings = Seq(
+ version := "0.7.2FF",
+ organization := "org.apache",
+ scalacOptions ++= Seq("-deprecation", "-unchecked"),
+ scalaVersion := "2.10.0",
+ javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.5"),
+ parallelExecution in Test := false, // Prevent tests from overrunning each other
+ libraryDependencies ++= Seq(
+ "org.scalatest" %% "scalatest" % "1.9" % "test",
+ "log4j" % "log4j" % "1.2.15",
+ "net.sf.jopt-simple" % "jopt-simple" % "3.2",
+ "org.slf4j" % "slf4j-simple" % "1.6.4",
+ "org.scala-lang" % "scala-actors" % "2.10.0",
+ "org.xerial.snappy" % "snappy-java" % "1.0.5-M3"
+ ),
+ // The issue is going from log4j 1.2.14 to 1.2.15, the developers added some features which required
+ // some dependencies on various sun and javax packages.
+    ivyXML := <dependencies>
+        <exclude module="javax"/>
+        <exclude module="jmxri"/>
+        <exclude module="jmxtools"/>
+        <exclude module="mail"/>
+        <exclude module="jms"/>
+        <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.3.4">
+          <exclude org="log4j" module="log4j"/>
+          <exclude org="jline" module="jline"/>
+        </dependency>
+        <dependency org="com.github.sgroschupf" name="zkclient" rev="0.1">
+          <exclude org="log4j" module="log4j"/>
+          <exclude org="jline" module="jline"/>
+        </dependency>
+      </dependencies>
+  )
+
+ val hadoopSettings = Seq(
+ javacOptions ++= Seq("-Xlint:deprecation"),
+ libraryDependencies ++= Seq(
+ "org.apache.avro" % "avro" % "1.4.0",
+ "org.apache.pig" % "pig" % "0.8.0",
+ "commons-logging" % "commons-logging" % "1.0.4",
+ "org.codehaus.jackson" % "jackson-core-asl" % "1.5.5",
+ "org.codehaus.jackson" % "jackson-mapper-asl" % "1.5.5",
+ "org.apache.hadoop" % "hadoop-core" % "0.20.2"
+ ),
+    ivyXML :=
+       <dependencies>
+         <exclude module="netty"/>
+         <exclude module="javax"/>
+         <exclude module="jmxri"/>
+         <exclude module="jmxtools"/>
+         <exclude module="mail"/>
+         <exclude module="jms"/>
+         <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2">
+           <exclude org="junit" module="junit"/>
+         </dependency>
+         <dependency org="org.apache.pig" name="pig" rev="0.8.0">
+           <exclude org="junit" module="junit"/>
+         </dependency>
+       </dependencies>
+  )
+
+ val coreSettings = Seq(
+ pomPostProcess := { (pom: Node) => MetricsDepAdder(ZkClientDepAdder(pom)) }
+ )
+
+ val runRat = TaskKey[Unit]("run-rat-task", "Runs Apache rat on Kafka")
+ val runRatTask = runRat := {
+ "bin/run-rat.sh" !
+ }
+
+ lazy val kafka = Project(id = "Kafka", base = file(".")).aggregate(core, examples, contrib, perf).settings((commonSettings ++ runRatTask): _*)
+ lazy val core = Project(id = "core", base = file("core")).settings(commonSettings: _*).settings(coreSettings: _*)
+ lazy val examples = Project(id = "java-examples", base = file("examples")).settings(commonSettings :_*) dependsOn (core)
+ lazy val perf = Project(id = "perf", base = file("perf")).settings((Seq(name := "kafka-perf") ++ commonSettings):_*) dependsOn (core)
+
+ lazy val contrib = Project(id = "contrib", base = file("contrib")).aggregate(hadoopProducer, hadoopConsumer).settings(commonSettings :_*)
+ lazy val hadoopProducer = Project(id = "hadoop-producer", base = file("contrib/hadoop-producer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
+ lazy val hadoopConsumer = Project(id = "hadoop-consumer", base = file("contrib/hadoop-consumer")).settings(hadoopSettings ++ commonSettings: _*) dependsOn (core)
+
+
+ // POM Tweaking for core:
+  def zkClientDep =
+    <dependency>
+      <groupId>zkclient</groupId>
+      <artifactId>zkclient</artifactId>
+      <version>20120522</version>
+      <scope>compile</scope>
+    </dependency>
+
+  def metricsDeps =
+    <dependencies>
+      <dependency>
+        <groupId>com.yammer.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>3.0.0-c0c8be71</version>
+        <scope>compile</scope>
+      </dependency>
+      <dependency>
+        <groupId>com.yammer.metrics</groupId>
+        <artifactId>metrics-annotations</artifactId>
+        <version>3.0.0-c0c8be71</version>
+        <scope>compile</scope>
+      </dependency>
+    </dependencies>
+
+ object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
+ override def transform(node: Node): Seq[Node] = node match {
+ case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
+ Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*)
+ }
+ case other => other
+ }
+ })
+
+ object MetricsDepAdder extends RuleTransformer(new RewriteRule() {
+ override def transform(node: Node): Seq[Node] = node match {
+ case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
+ Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDeps:_*)
+ }
+ case other => other
+ }
+ })
+}
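For readers unfamiliar with `pomPostProcess`: sbt hands the generated pom to this function as a `scala.xml.Node`, and the two `RuleTransformer`s above pattern-match on the `<dependencies>` element and append extra dependency XML before publishing. A condensed, self-contained sketch of the same mechanism (the sample pom and dependency below are made up):

```scala
import scala.xml.{Elem, Node}
import scala.xml.transform.{RewriteRule, RuleTransformer}

object PomRewriteDemo extends App {
  val extraDep: Node =
    <dependency>
      <groupId>zkclient</groupId>
      <artifactId>zkclient</artifactId>
      <version>20120522</version>
    </dependency>

  // Same shape as ZkClientDepAdder: locate <dependencies>, append a child
  val addDep = new RuleTransformer(new RewriteRule {
    override def transform(node: Node): Seq[Node] = node match {
      case Elem(prefix, "dependencies", attribs, scope, deps @ _*) =>
        Elem(prefix, "dependencies", attribs, scope, deps ++ extraDep: _*)
      case other => other
    }
  })

  val pom = <project><dependencies></dependencies></project>
  println(addDep(pom))
}
```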
diff --git a/project/build.properties b/project/build.properties
index e880cadf38dd7..a7365cf88147e 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -14,11 +14,4 @@
# limitations under the License.
#Project properties
#Mon Feb 28 11:55:49 PST 2011
-project.name=Kafka
-sbt.version=0.7.5
-project.version=0.7.2
-build.scala.versions=2.8.0
-contrib.root.dir=contrib
-lib.dir=lib
-target.dir=target/scala_2.8.0
-dist.dir=dist
+sbt.version=0.12.2
diff --git a/project/build/KafkaProject.scala b/project/build/KafkaProject.scala
index d0b52cf29be26..48d1930a75db2 100644
--- a/project/build/KafkaProject.scala
+++ b/project/build/KafkaProject.scala
@@ -16,10 +16,14 @@
*/
import sbt._
-import scala.xml.{Node, Elem, NodeSeq}
+import scala.xml.{Node, Elem}
import scala.xml.transform.{RewriteRule, RuleTransformer}
class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject {
+ override def managedStyle = ManagedStyle.Maven
+ val publishTo = "Maven Repo" at "http://maven/content/repositories/repository.snapshots"
+ Credentials(Path.userHome / ".m2" / ".credentials", log)
+
lazy val core = project("core", "core-kafka", new CoreKafkaProject(_))
lazy val examples = project("examples", "java-examples", new KafkaExamplesProject(_), core)
lazy val contrib = project("contrib", "contrib", new ContribProject(_))
@@ -56,11 +60,55 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
-
-
- override def artifactID = "kafka"
+  def zkClientDep =
+    <dependency>
+      <groupId>com.101tec</groupId>
+      <artifactId>zkclient</artifactId>
+      <version>0.2</version>
+      <scope>compile</scope>
+    </dependency>
+
+  def metricsDepsCore =
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>3.0.0-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+
+  def metricsDepsAnnotations =
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-annotation</artifactId>
+      <version>3.0.0-SNAPSHOT</version>
+      <scope>compile</scope>
+    </dependency>
+
+ object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
+ override def transform(node: Node): Seq[Node] = node match {
+ case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
+ Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*)
+ }
+ case other => other
+ }
+ })
+
+ object MetricsDepAdder extends RuleTransformer(new RewriteRule() {
+ override def transform(node: Node): Seq[Node] = node match {
+ case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
+ Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDepsCore ++ metricsDepsAnnotations:_*)
+ }
+ case other => other
+ }
+ })
+
+ override def pomPostProcess(pom: Node): Node = {
+ MetricsDepAdder(ZkClientDepAdder(pom))
+ }
+
+ override def organization = "org.apache"
override def filterScalaJars = false
// build the executable jar's classpath.
@@ -118,7 +166,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
override def javaCompileOptions = super.javaCompileOptions ++
List(JavaCompileOption("-source"), JavaCompileOption("1.5"))
- override def packageAction = super.packageAction dependsOn (testCompileAction)
+ override def packageAction = super.packageAction dependsOn (testCompileAction, packageTestAction)
}
@@ -232,6 +280,7 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
trait CoreDependencies {
val log4j = "log4j" % "log4j" % "1.2.15"
val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
+ val slf4jSimple = "org.slf4j" % "slf4j-simple" % "1.6.4"
}
trait HadoopDependencies {
@@ -245,5 +294,4 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
trait CompressionDependencies {
val snappy = "org.xerial.snappy" % "snappy-java" % "1.0.4.1"
}
-
}
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000000000..48d44c81f44d8
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1,5 @@
+resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
+
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5")
+
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
diff --git a/project/plugins/Plugins.scala b/project/plugins/Plugins.scala
deleted file mode 100644
index 0777d82bd3590..0000000000000
--- a/project/plugins/Plugins.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import sbt._
-
-class Plugins(info: ProjectInfo) extends PluginDefinition(info) {
- val repo = "GH-pages repo" at "http://mpeltonen.github.com/maven/"
- val idea = "com.github.mpeltonen" % "sbt-idea-plugin" % "0.1-SNAPSHOT"
-}
diff --git a/s2.10/akka-actors.jar b/s2.10/akka-actors.jar
new file mode 100644
index 0000000000000..cf00634996a02
Binary files /dev/null and b/s2.10/akka-actors.jar differ
diff --git a/s2.10/jline.jar b/s2.10/jline.jar
new file mode 100644
index 0000000000000..b1f60702dcbf7
Binary files /dev/null and b/s2.10/jline.jar differ
diff --git a/s2.10/scala-actors-migration.jar b/s2.10/scala-actors-migration.jar
new file mode 100644
index 0000000000000..26a5e4e39b25e
Binary files /dev/null and b/s2.10/scala-actors-migration.jar differ
diff --git a/s2.10/scala-actors.jar b/s2.10/scala-actors.jar
new file mode 100644
index 0000000000000..2af3953cc7f28
Binary files /dev/null and b/s2.10/scala-actors.jar differ
diff --git a/s2.10/scala-compiler.jar b/s2.10/scala-compiler.jar
new file mode 100644
index 0000000000000..e3aab0ddca34e
Binary files /dev/null and b/s2.10/scala-compiler.jar differ
diff --git a/s2.10/scala-library.jar b/s2.10/scala-library.jar
new file mode 100644
index 0000000000000..bfc06abb066a2
Binary files /dev/null and b/s2.10/scala-library.jar differ
diff --git a/s2.10/scala-partest.jar b/s2.10/scala-partest.jar
new file mode 100644
index 0000000000000..c9d44e06d630e
Binary files /dev/null and b/s2.10/scala-partest.jar differ
diff --git a/s2.10/scala-reflect.jar b/s2.10/scala-reflect.jar
new file mode 100644
index 0000000000000..92cfbab488685
Binary files /dev/null and b/s2.10/scala-reflect.jar differ
diff --git a/s2.10/scala-swing.jar b/s2.10/scala-swing.jar
new file mode 100644
index 0000000000000..48cb049d195f5
Binary files /dev/null and b/s2.10/scala-swing.jar differ
diff --git a/s2.10/scalap.jar b/s2.10/scalap.jar
new file mode 100644
index 0000000000000..33428aaa9a3d6
Binary files /dev/null and b/s2.10/scalap.jar differ
diff --git a/s2.10/typesafe-config.jar b/s2.10/typesafe-config.jar
new file mode 100644
index 0000000000000..49ead9baddfd5
Binary files /dev/null and b/s2.10/typesafe-config.jar differ