diff --git a/fang/images/bg.jpg b/fang/images/bg.jpg deleted file mode 100644 index 66cb5c9..0000000 Binary files a/fang/images/bg.jpg and /dev/null differ diff --git a/fang/index.html b/fang/index.html deleted file mode 100644 index 90e2f4b..0000000 --- a/fang/index.html +++ /dev/null @@ -1,52 +0,0 @@ - - - - - Congratulations! - - - -
-

Congratulations!

-

For your magical puzzle-solving abilities, you are cordially invited to a fireside chat with Reid Hoffman, founder of LinkedIn.

- » RSVP « -
- - \ No newline at end of file diff --git a/includes/project_list.php b/includes/project_list.php index 9cc4af4..dc4c36f 100644 --- a/includes/project_list.php +++ b/includes/project_list.php @@ -3,7 +3,7 @@ decomposer · norbert · voldemort · -kafka · +kafka · kamikaze · krati · sensei · diff --git a/kafka/configuration.php b/kafka/configuration.php index 9163b21..9b49ea1 100644 --- a/kafka/configuration.php +++ b/kafka/configuration.php @@ -1,25 +1,4 @@ - - - - -

Configuration

- -

Important configuration properties for Kafka broker:

- -

More details about server configuration can be found in the scala class kafka.server.KafkaConfig.

- - - -

Important configuration properties for the high-level consumer:

- -

More details about consumer configuration can be found in the scala class kafka.consumer.ConsumerConfig.

- - - -

Important configuration properties for the producer:

- -

More details about producer configuration can be found in the scala class kafka.producer.ProducerConfig.

- - - - + \ No newline at end of file diff --git a/kafka/contact.php b/kafka/contact.php new file mode 100644 index 0000000..37e0466 --- /dev/null +++ b/kafka/contact.php @@ -0,0 +1,4 @@ + \ No newline at end of file diff --git a/kafka/design.php b/kafka/design.php index d452ee2..437ebf7 100644 --- a/kafka/design.php +++ b/kafka/design.php @@ -1,570 +1,4 @@ - - - - -

Why we built this

- -

-Kafka is a messaging system that was built at LinkedIn to serve as the foundation for our activity stream processing. -

- -

-Activity stream data is a normal part of any website, used for reporting on usage of the site. Activity data includes things like page views, information about what content was shown, searches, and so on. This kind of data is usually handled by logging the activity out to a file and then periodically aggregating those files for analysis. -

- -

-In recent years, however, activity data has become a critical part of the production features of websites, and a slightly more sophisticated set of infrastructure is needed. - -

Use cases for activity stream data

- -

-

Characteristics of activity stream data

-

-This high-throughput stream of immutable activity data represents a real computational challenge as the volume may easily be 10x or 100x larger than the next largest data source on a site. -

-

Traditional log file aggregation is a respectable and scalable approach to supporting offline use cases like reporting or batch processing, but it is too high-latency for real-time processing and tends to have rather high operational complexity. On the other hand, existing messaging and queuing systems are okay for real-time and near-real-time use-cases, but handle large unconsumed queues very poorly, often treating persistence as an afterthought. This creates problems for feeding the data to offline systems like Hadoop that may only consume some sources once per hour or per day. Kafka is intended to be a single queuing platform that can support both offline and online use cases. -

-

-Kafka supports fairly general messaging semantics. Nothing ties it to activity processing, though that was our motivating use case. -

- -

Deployment

- -

-The following diagram gives a simplified view of the deployment topology at LinkedIn. -

- - - -

-Note that a single kafka cluster handles all activity data from all different sources. This provides a single pipeline of data for both online and offline consumers. This tier acts as a buffer between live activity and asynchronous processing. We also use kafka to replicate all data to a different datacenter for offline consumption. -

- -

Major Design Elements

- -

-There is a small number of major design decisions that make Kafka different from most other messaging systems: -

    -
  1. Kafka is designed for persistent messages as the common case
  2. Throughput rather than features is the primary design constraint
  3. State about what has been consumed is maintained as part of the consumer, not the server
  4. Kafka is explicitly distributed. It is assumed that producers, brokers, and consumers are all spread over multiple machines.
-

- -

-Each of these decisions will be discussed in more detail below. -

- -

Basics

-

-First some basic terminology and concepts. -

-

-Messages are the fundamental unit of communication. Messages are published to a topic by a producer which means they are physically sent to a server acting as a broker (probably another machine). Some number of consumers subscribe to a topic, and each published message is delivered to all the consumers. -

-

-Kafka is explicitly distributed—producers, consumers, and brokers can all be run on a cluster of machines that co-operate as a logical group. This happens fairly naturally for brokers and producers, but consumers require some particular support. Each consumer process belongs to a consumer group and each message is delivered to exactly one process within every consumer group. Hence a consumer group allows many processes or machines to logically act as a single consumer. The concept of consumer group is very powerful and can be used to support the semantics of either a queue or topic as found in JMS. To support queue semantics, we can put all consumers in a single consumer group, in which case each message will go to a single consumer. To support topic semantics, each consumer is put in its own consumer group, and then all consumers will receive each message. A more common case in our own usage is that we have multiple logical consumer groups, each consisting of a cluster of consuming machines that act as a logical whole. Kafka has the added benefit in the case of large data that no matter how many consumers a topic has, a message is stored only a single time. -

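To make the two extremes concrete, here is a minimal, hedged sketch of how the group assignment might be expressed in consumer configuration; the property name "groupid" is an assumption and should be checked against the consumer configuration for the release in use.

import java.util.Properties;
import java.util.UUID;

public class GroupSemanticsSketch {
  public static void main(String[] args) {
    // Queue semantics: all consumer processes share one group, so each
    // message is delivered to exactly one of them.
    Properties queueLike = new Properties();
    queueLike.put("groupid", "order-processors");        // assumed property name

    // Topic (pub/sub) semantics: each process uses its own group, so every
    // process receives every message.
    Properties topicLike = new Properties();
    topicLike.put("groupid", "auditor-" + UUID.randomUUID());
  }
}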
- -

Message Persistence and Caching

- -

Don't fear the filesystem!

-

-Kafka relies heavily on the filesystem for storing and caching messages. There is a general perception that "disks are slow" which makes people skeptical that a persistent structure can offer competitive performance. In fact disks are both much slower and much faster than people expect depending on how they are used; and a properly designed disk structure can often be as fast as the network. -

- -

-The key fact about disk performance is that the throughput of hard drives has been diverging from the latency of a disk seek for the last decade. As a result the performance of linear writes on a RAID-5 array of six 7200rpm SATA drives is about 300MB/sec but the performance of random writes is only about 50KB/sec—a difference of nearly 10000X. These linear reads and writes are the most predictable of all usage patterns, and hence the ones detected and optimized best by the operating system using read-ahead and write-behind techniques that prefetch data in large block multiples and group smaller logical writes into large physical writes. A further discussion of this issue can be found in this ACM Queue article; they actually find that sequential disk access can in some cases be faster than random memory access! -

-

-To compensate for this performance divergence modern operating systems have become increasingly aggressive in their use of main memory for disk caching. Any modern OS will happily divert all free memory to disk caching with little performance penalty when the memory is reclaimed. All disk reads and writes will go through this unified cache. This feature cannot easily be turned off without using direct I/O, so even if a process maintains an in-process cache of the data, this data will likely be duplicated in OS pagecache, effectively storing everything twice. -

-

-Furthermore we are building on top of the JVM, and anyone who has spent any time with Java memory usage knows two things: -

    -
  1. The memory overhead of objects is very high, often doubling the size of the data stored (or worse).
  2. Java garbage collection becomes increasingly sketchy and expensive as the in-heap data increases.
-

-

-As a result of these factors using the filesystem and relying on pagecache is superior to maintaining an in-memory cache or other structure—we at least double the available cache by having automatic access to all free memory, and likely double again by storing a compact byte structure rather than individual objects. Doing so will result in a cache of up to 28-30GB on a 32GB machine without GC penalties. Furthermore this cache will stay warm even if the service is restarted, whereas the in-process cache will need to be rebuilt in memory (which for a 10GB cache may take 10 minutes) or else it will need to start with a completely cold cache (which likely means terrible initial performance). It also greatly simplifies the code as all logic for maintaining coherency between the cache and filesystem is now in the OS, which tends to do so more efficiently and more correctly than one-off in-process attempts. If your disk usage favors linear reads then read-ahead is effectively pre-populating this cache with useful data on each disk read. -

-

-This suggests a design which is very simple: rather than maintain as much as possible in-memory and flush to the filesystem only when necessary, we invert that. All data is immediately written to a persistent log on the filesystem without any call to flush the data. In effect this just means that it is transferred into the kernel's pagecache where the OS can flush it later. Then we add a configuration driven flush policy to allow the user of the system to control how often data is flushed to the physical disk (every N messages or every M seconds) to put a bound on the amount of data "at risk" in the event of a hard crash. -

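As a rough sketch of such a flush policy, the broker settings might look like the following; the property names used here (log.flush.interval, log.default.flush.interval.ms) are assumptions and should be verified against the configuration page for the release in use.

import java.util.Properties;

public class FlushPolicySketch {
  public static Properties brokerFlushSettings() {
    Properties props = new Properties();
    // Assumed property names: flush after every N messages or every M
    // milliseconds, whichever comes first, bounding the data "at risk".
    props.put("log.flush.interval", "500");               // N = 500 messages
    props.put("log.default.flush.interval.ms", "1000");   // M = 1000 ms
    return props;
  }
}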
-

-This style of pagecache-centric design is described in an article on the design of Varnish here (along with a healthy helping of arrogance). -

-

Constant Time Suffices

-

-The persistent data structure used for messaging-system metadata is often a BTree. BTrees are the most versatile data structure available, and make it possible to support a wide variety of transactional and non-transactional semantics in the messaging system. They do come with a fairly high cost, though: Btree operations are O(log N). Normally O(log N) is considered essentially equivalent to constant time, but this is not true for disk operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time so parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since storage systems mix very fast cached operations with actual physical disk operations, the observed performance of tree structures is often superlinear. Furthermore BTrees require a very sophisticated page or row locking implementation to avoid locking the entire tree on each operation. The implementation must pay a fairly high price for row-locking or else effectively serialize all reads. Because of the heavy reliance on disk seeks it is not possible to effectively take advantage of the improvements in drive density, and one is forced to use small (< 100GB) high RPM SAS drives to maintain a sane ratio of data to seek capacity. -

-

-Intuitively a persistent queue could be built on simple reads and appends to files as is commonly the case with logging solutions. This structure would not support the rich semantics of a BTree implementation, but it has the advantage that all operations are O(1) and reads do not block writes or each other. This has obvious performance advantages since the performance is completely decoupled from the data size--one server can now take full advantage of a number of cheap, low-rotational speed 1+TB SATA drives. Though they have poor seek performance, these drives often have comparable performance for large reads and writes at 1/3 the price and 3x the capacity. -

-

-Having access to virtually unlimited disk space without penalty means that we can provide some features not usually found in a messaging system. For example, in Kafka, instead of deleting a message immediately after consumption, we can retain messages for a relatively long period (say a week). -

- -

Maximizing Efficiency

- -

-Our assumption is that the volume of messages is extremely high; indeed, it is some multiple of the total number of page views for the site (since a page view is one of the activities we process). Furthermore we assume each message published is read at least once (and often multiple times), hence we optimize for consumption rather than production. -

-

-There are two common causes of inefficiency: too many network requests, and excessive byte copying. -

-

-To encourage efficiency, the APIs are built around a "message set" abstraction that naturally groups messages. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. -

-

The MessageSet implementation is itself a very thin API that wraps a byte array or file. Hence there is no separate serialization or deserialization step required for message processing; message fields are lazily deserialized as needed (or not deserialized if not needed). -

-

-The message log maintained by the broker is itself just a directory of message sets that have been written to disk. This abstraction allows a single byte format to be shared by both the broker and the consumer (and to some degree the producer, though producer messages are checksummed and validated before being added to the log). -

-

-Maintaining this common format allows optimization of the most important operation: network transfer of persistent log chunks. Modern unix operating systems offer a highly optimized code path for transferring data out of pagecache to a socket; in Linux this is done with the sendfile system call. Java provides access to this system call with the FileChannel.transferTo api. -

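A minimal sketch of this zero-copy path in Java is shown below; the file path, chunk size, and loop structure are illustrative only.

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopySketch {
  // Transfer a chunk of a log file straight from pagecache to a socket,
  // avoiding the intermediate user-space copies described above.
  public static long sendChunk(String path, long offset, long count,
                               SocketChannel socket) throws IOException {
    try (FileInputStream in = new FileInputStream(path)) {
      FileChannel fileChannel = in.getChannel();
      long sent = 0;
      while (sent < count) {
        long n = fileChannel.transferTo(offset + sent, count - sent, socket);
        if (n <= 0) break;   // socket not ready or end of file reached
        sent += n;
      }
      return sent;
    }
  }
}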
-

-To understand the impact of sendfile, it is important to understand the common data path for transfer of data from file to socket: -

    -
  1. The operating system reads data from the disk into pagecache in kernel space
  2. The application reads the data from kernel space into a user-space buffer
  3. The application writes the data back into kernel space into a socket buffer
  4. The operating system copies the data from the socket buffer to the NIC buffer where it is sent over the network
-

-

-This is clearly inefficient: there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the OS to send the data from pagecache to the network directly. So in this optimized path, only the final copy to the NIC buffer is needed. -

-

-We expect a common use case to be multiple consumers on a topic. Using the zero-copy optimization above, data is copied into pagecache exactly once and reused on each consumption instead of being stored in memory and copied out to kernel space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection. -

-

-For more background on the sendfile and zero-copy support in Java, see this article on IBM developerworks. -

- -

Consumer state

- -

-Keeping track of what has been consumed is one of the key things a messaging system must provide. It is not intuitive, but recording this state is one of the key performance points for the system. State tracking requires updating a persistent entity and potentially causes random accesses. Hence it is likely to be bound by the seek time of the storage system not the write bandwidth (as described above). -

-

-Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker records that fact locally. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else it could go. Since the data structures used for storage in many messaging systems scale poorly, this is also a pragmatic choice--because the broker knows what is consumed it can immediately delete it, keeping the data size small. -

-

-What is perhaps not obvious is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) then that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature, which means that messages are only marked as sent, not consumed, when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems. First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice. The second problem is around performance: now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed). Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged. -

-

Message delivery semantics

-

-So clearly there are multiple possible message delivery guarantees that could be provided: at most once (a message may be lost but is never redelivered), at least once (a message is never lost but may be redelivered), and exactly once (each message is delivered once and only once). -

-

-

-This problem is heavily studied, and is a variation of the "transaction commit" problem. Algorithms that provide exactly once semantics exist, two- or three-phase commits and Paxos variants being examples, but they come with some drawbacks. They typically require multiple round trips and may have poor guarantees of liveness (they can halt indefinitely). The FLP result provides some of the fundamental limitations on these algorithms. -

-

-Kafka does two unusual things with respect to metadata. First the stream is partitioned on the brokers into a set of distinct partitions. The semantic meaning of these partitions is left up to the producer and the producer specifies which partition a message belongs to. Within a partition messages are stored in the order in which they arrive at the broker, and will be given out to consumers in that same order. This means that rather than store metadata for each message (marking it as consumed, say), we just need to store the "high water mark" for each combination of consumer, topic, and partition. Hence the total metadata required to summarize the state of the consumer is actually quite small. In Kafka we refer to this high-water mark as "the offset" for reasons that will become clear in the implementation section. -

-

Consumer state

-

-Kafka also leaves it to the client to maintain this state about what has been consumed. This provides an easy out for some simple cases, and has a few side benefits. In the simplest cases the consumer may simply be entering some aggregate value into a centralized, transactional OLTP database. In this case the consumer can store the state of what is consumed in the same transaction as the database modification. This solves a distributed consensus problem by removing the distributed part! A similar trick works for some non-transactional systems as well. A search system can store its consumer state with its index segments. Though it may provide no durability guarantees, this means that the index is always in sync with the consumer state: if an unflushed index segment is lost in a crash, the indexes can always resume consumption from the latest checkpointed offset. Likewise our Hadoop load job, which does parallel loads from Kafka, does a similar trick. Individual mappers write the offset of the last consumed message to HDFS at the end of the map task. If a job fails and gets restarted, each mapper simply restarts from the offsets stored in HDFS. -

-

-There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if a bug in the consumer code is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed. -

-

Push vs. pull

-

-A related question is whether consumers should pull data from brokers or brokers should push data to the subscriber. In this respect Kafka follows a more traditional design, shared by most messaging systems, where data is pushed to the broker from the producer and pulled from the broker by the consumer. Some recent systems focused on log aggregation, such as Scribe and Flume, follow a very different push-based path where each node acts as a broker and data is pushed downstream. There are pros and cons to both approaches. However a push-based system has difficulty dealing with diverse consumers as the broker controls the rate at which data is transferred. The goal is generally for the consumer to be able to consume at the maximum possible rate; unfortunately in a push system this means the consumer tends to be overwhelmed when its rate of consumption falls below the rate of production (a denial of service attack, in essence). A pull-based system has the nicer property that the consumer simply falls behind and catches up when it can. This can be mitigated with some kind of backoff protocol by which the consumer can indicate it is overwhelmed, but getting the rate of transfer to fully utilize (but never over-utilize) the consumer is trickier than it seems. Previous attempts at building systems in this fashion led us to go with a more traditional pull model. -

- -

Distribution

-

-Kafka is built to be run across a cluster of machines as the common case. Brokers and consumers co-ordinate through Zookeeper to discover topics and co-ordinate consumption. There is no central "master" node; instead brokers and consumers co-ordinate among one another as a homogeneous set of peers. The set of machines in the cluster is fully elastic: brokers and consumers can both be added and removed at any time without any manual configuration change. -

-

-Currently, there is no built-in load balancing between the producers and the brokers in Kafka; in our own usage we publish from a large number of heterogeneous machines and so it is desirable that the publisher not need any explicit knowledge of the cluster topology. We rely on a hardware load balancer to distribute the producer load across multiple brokers. We will consider adding built-in producer-side load balancing in a future release to allow semantic partitioning of messages (i.e. publishing all messages to a particular broker based on some id to ensure an ordered stream of updates within that id). -

-

-Kafka does have built-in load balancing between the consumers and the brokers. To achieve this co-ordination, each broker and each consumer registers its state and maintains its metadata in Zookeeper. When there is a broker or a consumer change, each consumer is notified about the change through the zookeeper watcher. The consumer then reads the current information about all relevant brokers and consumers, and determines which brokers it should consume data from. -

-

-This kind of cluster-aware balancing of consumption has several advantages: -

-

- -

Support for Hadoop and other batch data load

- -

-Scalable persistence allows for the possibility of supporting batch data loads that periodically snapshot data into an offline system for batch processing. We make use of this for loading data into our data warehouse and Hadoop clusters. -

- -

-Batch processing happens in stages beginning with the data load stage and proceeding in an acyclic graph of processing and output stages (e.g. as supported here). An essential feature of support for this model is the ability to re-run the data load from a point in time (in case anything goes wrong). -

- -

-In the case of Hadoop we parallelize the data load by splitting the load over individual map tasks, one for each node/topic/partition combination, allowing full parallelism in the loading. Hadoop provides the task management, and tasks which fail can restart without danger of duplicate data. -

- -

Implementation Details

- -

-The following gives a brief description of some relevant lower-level implementation details for some parts of the system described in the above section. -

-

API Design

- -

Producer APIs

- -

Low-level API

- -The low-level producer API is extremely basic: it maintains a connection to a particular broker, and allows sending an ordered set of messages to a particular topic and partition on that broker. - -
-class SyncProducer {
-	
-  /* Send a set of messages to a topic in a partition. */ 
-  public void send(String topic, int partition, ByteBufferMessageSet messages);
-  
-  /* Send a list of produce requests to a broker. */ 
-  public void multiSend(ProducerRequest produces[]);
-
-}
-
- -

High-level API

- -

-With release 0.6, we introduce a new Producer API that wraps the two low-level producer APIs - kafka.producer.SyncProducer and kafka.producer.async.AsyncProducer. The goal is to expose all the producer functionalities through a single API to the client. This feature introduces a cluster-aware producer that semantically maps messages to kafka nodes and partitions. This allows partitioning the stream of messages with some semantic partition function based on some key in the message to spread them over broker machines—e.g. to ensure that all messages for a particular user go to a particular partition and hence appear in the same stream for the same consumer thread. The new producer API is shown below. -

-

- -
-class Producer {
-	
-  /* Sends the data, partitioned by key to the topic using either the synchronous or the asynchronous producer */
-  public void send(kafka.javaapi.producer.ProducerData producerData);
-
-  /* Sends a list of data, partitioned by key to the topic using either the synchronous or the asynchronous producer */
-  public void send(java.util.List<kafka.javaapi.producer.ProducerData> producerData);
-
-  /* Closes the producer and cleans up */	
-  public void close();
-
-}
-
- -

-kafka.producer.Producer provides the ability to batch multiple produce requests (producer.type=async), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a blocking queue, until either buffer.time or batch.size is reached. A background thread (kafka.producer.async.ProducerSendThread) dequeues the batch of data and lets the kafka.producer.EventHandler serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the event.handler config parameter. -

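As a sketch, an async producer might be configured as follows; the values are illustrative and the property names are the ones mentioned in the paragraph above (producer.type, batch.size, buffer.time, event.handler).

import java.util.Properties;

public class AsyncProducerConfigSketch {
  public static Properties build() {
    Properties props = new Properties();
    props.put("producer.type", "async");   // batch requests instead of sending one message at a time
    props.put("batch.size", "200");        // dispatch once 200 events are buffered...
    props.put("buffer.time", "5000");      // ...or after 5 seconds, whichever comes first
    // Optionally plug in a custom handler (class name is hypothetical):
    // props.put("event.handler", "com.example.MyEventHandler");
    return props;
  }
}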
- -

-At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the kafka.producer.async.CallbackHandler interface and setting callback.handler config parameter to that class. -

- -

-A new producer feature added in release 0.6 is zookeeper-based automatic broker discovery. By setting the zk.connect parameter, the producer connects to zookeeper to pull the list of all available brokers in the Kafka cluster. It also registers listeners on the zookeeper paths to invoke callbacks when the set of brokers changes or when topics and their partitions are registered. -

-

- -

-Each of the above events triggers callbacks that update the in-memory data structures to reflect the right set of brokers and the correct number of available partitions on each, for a particular topic. Based on this information, the producer maintains a pool of connections to each of the available brokers. When a producer request comes in, it uses the available producer connection to send the serialized, optionally batched data, and routes it to the appropriate broker partition. -

- -

-The routing decision is influenced by the kafka.producer.Partitioner. The partition API - -

-interface Partitioner<T> {
-   int partition(T key, int numPartitions);
-}
-
-uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is hash(key)%numPartitions. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the partitioner.class config parameter. -

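For illustration, a custom partitioner implementing the interface above might look like the sketch below; the class name and the choice of a String key are hypothetical. It would be enabled through the partitioner.class config parameter.

// Implements the Partitioner interface shown above; mirrors the default
// strategy of hash(key) % numPartitions, with a random pick for null keys.
public class MemberIdPartitioner implements Partitioner<String> {
  public int partition(String key, int numPartitions) {
    if (key == null) {
      return (int) (Math.random() * numPartitions);
    }
    return (key.hashCode() & 0x7fffffff) % numPartitions;   // keep the index non-negative
  }
}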
- -

-For some applications, the dependence on zookeeper is inappropriate. In that case, the producer can take in a static list of brokers through the broker.partition.info config parameter. Each produce request gets routed to a random broker partition in this case. If that broker is down, the produce request fails. -

- -

Consumer APIs

-

-We have 2 levels of consumer APIs. The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose. -

-

-The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed. -

- -

Low-level API

-
-class SimpleConsumer {
-	
-  /* Send fetch request to a broker and get back a set of messages. */ 
-  public ByteBufferMessageSet fetch(FetchRequest request);
-
-  /* Send a list of fetch requests to a broker and get back a response set. */ 
-  public MultiFetchResponse multifetch(List<FetchRequest> fetches);
-
-  /**
-   * Get a list of valid offsets (up to maxSize) before the given time.
-   * The result is a list of offsets, in descending order.
-   * @param time: time in millisecs,
-   *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
-   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
-   */
-  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
-}
-
- -The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state. - -
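A hedged usage sketch of the simple API follows; the SimpleConsumer and FetchRequest constructor arguments are assumptions modeled on the listing above and should be verified against the release in use.

// Sketch only: constructor and request signatures are assumptions.
SimpleConsumer consumer = new SimpleConsumer("broker-host", 9092,
                                             30 * 1000 /* socket timeout ms */,
                                             64 * 1024 /* buffer size */);
long offset = 0L;   // the caller owns the offset, since the API is stateless
ByteBufferMessageSet messages =
    consumer.fetch(new FetchRequest("my_topic", 0 /* partition */, offset, 300 * 1000 /* max bytes */));
// Process the returned messages, then advance the offset by the number of
// bytes consumed and persist it wherever the application chooses.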

High-level API

-
-
-/* create a connection to the cluster */ 
-ConsumerConnector connector = Consumer.create(consumerConfig);
-
-interface ConsumerConnector {
-	
-  /**
-   * This method is used to get a list of KafkaMessageStreams, which are iterators over topic.
-   *  Input: a map of <topic, #streams>
-   *  Output: a map of <topic, list of message streams>
-   *          Each message stream supports a message iterator.
-   */
-  public Map<String,List<KafkaMessageStream>> createMessageStreams(Map<String,Int> topicCountMap); 
-
-  /* Commit the offsets of all messages consumed so far. */
-  public void commitOffsets();
-  
-  /* Shut down the connector */
-  public void shutdown();
-}
-
-

-This API is centered around iterators, implemented by the KafkaMessageStream class. Each KafkaMessageStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream. -

-

-The create call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. To minimize this rebalancing the API encourages creating many topic streams in a single call. -

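Putting the calls above together, a hedged usage sketch might look like the following; the iteration over KafkaMessageStream and the use of Integer stream counts are assumptions based on the description above (java.util imports omitted).

// Sketch only: assumes KafkaMessageStream is iterable over messages.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("my_topic", 4);   // request 4 streams, one per processing thread

ConsumerConnector connector = Consumer.create(consumerConfig);
Map<String, List<KafkaMessageStream>> streams = connector.createMessageStreams(topicCountMap);

for (KafkaMessageStream stream : streams.get("my_topic")) {
  // In a real client each stream would be handed to its own thread.
  for (Message message : stream) {
    handle(message);   // hypothetical application callback
  }
}
connector.commitOffsets();   // checkpoint what has been consumed so far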
-

Network Layer

-

-The network layer is a fairly straightforward NIO server, and will not be described in great detail. The sendfile implementation is done by giving the MessageSet interface a writeTo method. This allows the file-backed message set to use the more efficient transferTo implementation instead of an in-process buffered write. The threading model is a single acceptor thread and N processor threads which handle a fixed number of connections each. This design has been pretty thoroughly tested elsewhere and found to be simple to implement and fast. The protocol is kept quite simple to allow for the future implementation of clients in other languages. -

-

Messages

-

-Messages consist of a fixed-size header and a variable-length opaque byte array payload. The header contains a format version and a CRC32 checksum to detect corruption or truncation. Leaving the payload opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The MessageSet interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO Channel. -

-

Log

-

-A log for a topic named "my_topic" with two partitions consists of two directories (namely my_topic_0 and my_topic_1) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries"; each log entry is a 4 byte integer N storing the message length which is followed by the N message bytes. Each message is uniquely identified by a 64-bit integer offset giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given below. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly S bytes from the previous file where S is the max log file size given in the configuration. -

-

-The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transferred between producer, broker, and client without recopying or conversion when desirable. This format is as follows: -

-
-On-disk format of a message
-
-message length : 4 bytes (value: 1+4+n) 
-"magic" value  : 1 byte
-crc            : 4 bytes
-payload        : n bytes
-
-
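As an illustration of this layout, the following sketch decodes a single entry from a buffer; it is not the broker's actual implementation.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class LogEntryReader {
  /** Reads one entry laid out as above: 4-byte length (1+4+n), 1-byte magic, 4-byte crc, n-byte payload. */
  public static byte[] readEntry(ByteBuffer buf) {
    int length = buf.getInt();                    // value is 1 + 4 + n
    byte magic = buf.get();                       // format version (unused in this sketch)
    long storedCrc = buf.getInt() & 0xffffffffL;  // stored as an unsigned 32-bit value
    byte[] payload = new byte[length - 1 - 4];
    buf.get(payload);

    CRC32 crc = new CRC32();
    crc.update(payload);
    if (crc.getValue() != storedCrc) {
      throw new IllegalStateException("corrupt entry (crc mismatch)");
    }
    return payload;
  }
}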

-The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore the complexity of maintaining the mapping from a random id to an offset requires a heavyweight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural—both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach. -

- -

Writes

-

-The log allows serial appends which always go to the last file. This file is rolled over to a fresh file when it reaches a configurable size (say 1GB). The log takes two configuration parameters: M, which gives the number of messages to write before forcing the OS to flush the file to disk, and S, which gives the number of seconds after which a flush is forced. This gives a durability guarantee of losing at most M messages or S seconds of data in the event of a system crash. -

-

Reads

-

-Reads are done by giving the 64-bit logical offset of a message and an S-byte max chunk size. This will return an iterator over the messages contained in the S-byte buffer. S is intended to be larger than any single message, but in the event of an abnormally large message, the read can be retried multiple times, each time doubling the buffer size, until the message is read successfully. A maximum message and buffer size can be specified to make the server reject messages larger than some size, and to give a bound to the client on the maximum it ever needs to read to get a complete message. It is likely that the read buffer ends with a partial message; this is easily detected by the size delimiting. -

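The retry-with-doubling described above might look like the following sketch; fetchWithBufferSize and MessageTooLargeException are hypothetical stand-ins for the actual fetch call and its failure signal.

// Sketch only: helper and exception names are hypothetical.
public ByteBufferMessageSet fetchCompleteMessage(long offset) {
  int bufferSize = 64 * 1024;               // initial S-byte chunk size
  final int maxBufferSize = 1024 * 1024;    // bound on what the client ever needs to read
  while (true) {
    try {
      return fetchWithBufferSize(offset, bufferSize);
    } catch (MessageTooLargeException e) {
      bufferSize *= 2;                      // double the buffer and retry
      if (bufferSize > maxBufferSize) throw e;
    }
  }
}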
-

-The actual process of reading from an offset requires first locating the log segment file in which the data is stored, calculating the file-specific offset from the global offset value, and then reading from that file offset. The search is done as a simple binary search variation against an in-memory range maintained for each file. -

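A sketch of that lookup is shown below, assuming the in-memory range is kept as a sorted list of each segment file's base offset (its file name).

import java.util.List;

public class SegmentIndex {
  /** Returns the index of the segment holding globalOffset: the last segment
      whose base offset is <= globalOffset. The file-local position is then
      globalOffset - baseOffsets.get(result). Sketch only. */
  public static int findSegment(List<Long> baseOffsets, long globalOffset) {
    int lo = 0, hi = baseOffsets.size() - 1;
    while (lo < hi) {
      int mid = (lo + hi + 1) >>> 1;   // bias upward so we keep the last base <= offset
      if (baseOffsets.get(mid) <= globalOffset) lo = mid; else hi = mid - 1;
    }
    return lo;
  }
}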
-

-The log provides the capability of getting the most recently written message to allow clients to start subscribing as of "right now". This is also useful in the case the consumer fails to consume its data within its SLA-specified number of days. In this case when the client attempts to consume a non-existent offset it is given an OutOfRangeException and can either reset itself or fail as appropriate to the use case. -

- -

The following is the format of the results sent to the consumer. - -

-MessageSetSend (fetch result)
-
-total length     : 4 bytes
-error code       : 2 bytes
-message 1        : x bytes
-...
-message n        : x bytes
-
- -
-MultiMessageSetSend (multiFetch result)
-
-total length       : 4 bytes
-error code         : 2 bytes
-messageSetSend 1
-...
-messageSetSend n
-
- -

Deletes

-

-Data is deleted one log segment at a time. The log manager allows pluggable delete policies to choose which files are eligible for deletion. The current policy deletes any log segment with a modification time more than N days ago, though a policy which retained the last N GB could also be useful. To avoid locking reads while still allowing deletes that modify the segment list, we use a copy-on-write style segment list implementation; this provides consistent views that allow a binary search to proceed on an immutable static snapshot of the log segments while deletes are progressing. -

-

Guarantees

-

-The log provides a configuration parameter M which controls the maximum number of messages that are written before forcing a flush to disk. On startup a log recovery process is run that iterates over all messages in the newest log segment and verifies that each message entry is valid. A message entry is valid if the sum of its size and offset is less than the length of the file AND the CRC32 of the message payload matches the CRC stored with the message. In the event corruption is detected the log is truncated to the last valid offset. -

-

-Note that two kinds of corruption must be handled: truncation in which an unwritten block is lost due to a crash, and corruption in which a nonsense block is ADDED to the file. The reason for this is that in general the OS makes no guarantee of the write order between the file inode and the actual block data, so in addition to losing written data the file can gain nonsense data if the inode is updated with a new size but a crash occurs before the block containing that data is written. The CRC detects this corner case, and prevents it from corrupting the log (though the unwritten messages are, of course, lost). -

- -

Distribution

-

Zookeeper Directories

-

-The following gives the zookeeper structures and algorithms used for co-ordination between consumers and brokers. -

- -

Notation

-

-When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a zookeeper znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a sub-directory for each topic name. Numerical ranges are also given such as [0...5] to indicate the subdirectories 0, 1, 2, 3, 4, 5. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world". -

- -

Broker Node Registry

-
-/brokers/ids/[0...N] --> host:port (ephemeral node)
-
-

-This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error. -

-

-Since the broker registers itself in zookeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available). -

-

Broker Topic Registry

-
-/brokers/topics/[topic]/[0...N] --> nPartitions (ephemeral node)
-
- -

-Each broker registers itself under the topics it maintains and stores the number of partitions for that topic. -

- -

Consumers and Consumer Groups

-

-Consumers of topics also register themselves in Zookeeper, in order to balance the consumption of data and track their offsets in each partition for each broker they consume from. -

- -

-Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. -For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to. -

- -

-The consumers in a group divide up the partitions as fairly as possible; each partition is consumed by exactly one consumer in a consumer group. -

- -

Consumer Id Registry

-

-In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory. -

-/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)
-
-Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of <topic, #streams>. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies. -

- -

Consumer Offset Tracking

-

-Consumers track the maximum offset they have consumed in each partition. This value is stored in a zookeeper directory -

-
-/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value (persistent node)
-
- -

Partition Owner registry

- -

-Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming. -

- -
-/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
-
- -

Broker node registration

- -

-The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also registers the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker. -

- -

Consumer registration algorithm

- -

-When a consumer starts, it does the following: -

    -
  1. Register itself in the consumer id registry under its group.
  2. Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.
  3. Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. Each change triggers rebalancing among all consumers in all consumer groups.
  4. Force itself to rebalance within its consumer group.
-

- -

Consumer rebalancing algorithm

-

-The consumer rebalancing algorithm allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. If there are more consumers than partitions, some consumers won't get any data at all. During rebalancing, we try to assign partitions to consumers in such a way as to reduce the number of broker nodes each consumer has to connect to. -

-

-Each consumer does the following during rebalancing: -

-
-   1. For each topic T that Ci subscribes to 
-   2.   let PT be all partitions producing topic T
-   3.   let CG be all consumers in the same group as Ci that consume topic T
-   4.   sort PT (so partitions on the same broker are clustered together)
-   5.   sort CG
-   6.   let i be the index position of Ci in CG and let N = size(PT)/size(CG)
-   7.   assign partitions from i*N to (i+1)*N - 1 to consumer Ci
-   8.   remove current entries owned by Ci from the partition owner registry
-   9.   add newly assigned partitions to the partition owner registry
-        (we may need to re-try this until the original partition owner releases its ownership)
-
-
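The arithmetic in steps 6 and 7 can be sketched as follows; the names are hypothetical and, as in the pseudocode above, leftover partitions from the integer division are not handled.

import java.util.List;

public class RangeAssignmentSketch {
  /** Mirrors steps 4-7 above: consumer Ci takes the contiguous range
      [i*N, (i+1)*N - 1] of the sorted partition list PT, where N = size(PT)/size(CG). */
  public static List<String> assignedTo(String ci, List<String> pt, List<String> cg) {
    int i = cg.indexOf(ci);
    int n = pt.size() / cg.size();
    return pt.subList(i * n, Math.min((i + 1) * n, pt.size()));
  }
}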

-When rebalancing is triggered at one consumer, rebalancing should be triggered in the other consumers within the same group at about the same time. -

- - - + \ No newline at end of file diff --git a/kafka/downloads.php b/kafka/downloads.php new file mode 100644 index 0000000..8f5a09c --- /dev/null +++ b/kafka/downloads.php @@ -0,0 +1,23 @@ + + + + +

Downloads

+ + +

Release Notes

+ + +

API docs

+ diff --git a/kafka/faq.php b/kafka/faq.php index 747ccb1..c76544b 100644 --- a/kafka/faq.php +++ b/kafka/faq.php @@ -1,13 +1,4 @@ - - - - -

Frequently asked questions

-
    -
  1. Why does my consumer get InvalidMessageSizeException?

    -This typically means that the "fetch size" of the consumer is too small. Each time the consumer pulls data from the broker, it reads bytes up to a configured limit. If that limit is smaller than the largest single message stored in Kafka, the consumer can't decode the message properly and will throw an InvalidMessageSizeException. To fix this, increase the limit by setting the property "fetch.size" properly in config/consumer.properties. The default fetch.size is 300,000 bytes. - -
  2. On EC2, why can't my high-level consumers connect to the brokers?

    -When a broker starts up, it registers its host ip in ZK. The high-level consumer later uses the registered host ip to establish the socket connection to the broker. By default, the registered ip is given by InetAddress.getLocalHost.getHostAddress. Typically, this should return the real ip of the host. However, in EC2, the returned ip is an internal one and can't be connected to from outside. The solution is to explicitly set the host ip to be registered in ZK by setting the "hostname" property in server.properties. - -
+ diff --git a/kafka/includes/producer_config.php b/kafka/includes/producer_config.php index 9734277..2ebd37a 100644 --- a/kafka/includes/producer_config.php +++ b/kafka/includes/producer_config.php @@ -6,7 +6,7 @@ serializer.class - None. This is required + kafka.serializer.DefaultEncoder. This is a no-op encoder. The serialization of data to Message should be handled outside the Producer class that implements the kafka.serializer.Encoder<T> interface, used to encode data of type T into a Kafka message @@ -17,12 +17,12 @@ producer.type sync - this parameter specifies whether the messages are sent asynchronously or not. Valid values are - + this parameter specifies whether the messages are sent asynchronously or not. Valid values are - - broker.partition.info + broker.list null. Either this parameter or zk.connect needs to be specified by the user. - For bypassing zookeeper based auto partition discovery, use this config to pass in static broker and per-broker partition information. Format-brokerid1:host1:port1, brokerid2:host2:port2.. + For bypassing zookeeper based auto partition discovery, use this config to pass in static broker and per-broker partition information. Format-brokerid1:host1:port1, brokerid2:host2:port2. If you use this option, the partitioner.class will be ignored and each producer request will be routed to a random broker partition. @@ -79,6 +79,11 @@ 5000 the maximum time spent by kafka.producer.SyncProducer trying to connect to the kafka broker. Once it elapses, the producer throws an ERROR and stops. + + socket.timeout.ms + 30000 + The socket timeout in milliseconds + reconnect.interval 30000 diff --git a/kafka/includes/project_info.php b/kafka/includes/project_info.php index af935ae..c19ac61 100644 --- a/kafka/includes/project_info.php +++ b/kafka/includes/project_info.php @@ -16,17 +16,18 @@ $PROJ_FAVICON_MIME = "image/png"; /* Navigation links in the sidebar */ - $PROJ_NAV_LINKS = array("download" => "downloads/", + $PROJ_NAV_LINKS = array("download" => "downloads.php", "code" => "https://github.com/kafka-dev/kafka", - "quickstart" => "quickstart.php", - "design" => "design.php", - "configuration" => "configuration.php", - "performance" => "performance.php", - "current work" => "projects.php", + "javadoc" => "javadoc/current", + "quickstart" => "http://incubator.apache.org/kafka/quickstart.html", + "design" => "http://incubator.apache.org/kafka/design.html", + "configuration" => "http://incubator.apache.org/kafka/configuration.html", + "performance" => "http://incubator.apache.org/kafka/performance.html", + "projects" => "http://incubator.apache.org/kafka/projects.html", "faq" => "faq.php", - "wiki" => "http://linkedin.jira.com/wiki/display/KAFKA", - "bugs" => "http://linkedin.jira.com/browse/KAFKA", - "mailing list" => "http://groups.google.com/group/kafka-dev" + "bugs" => "https://issues.apache.org/jira/browse/KAFKA", + "mailing lists" => "http://incubator.apache.org/kafka/contact.html", + "unit tests" => "http://test.project-voldemort.com:8080/", ); /* Project color */ diff --git a/kafka/index.php b/kafka/index.php index f81da0e..71dad12 100644 --- a/kafka/index.php +++ b/kafka/index.php @@ -1,25 +1,4 @@ - - - - -

Kafka is a distributed publish/subscribe messaging system

-

-Kafka is a distributed publish-subscribe messaging system. It is designed to support the following -

- -Kafka is aimed at providing a publish-subscribe solution that can handle all activity stream data and processing on a consumer-scale web site. This kind of activity (page views, searches, and other user actions) is a key ingredient in many of the social features on the modern web. This data is typically handled by "logging" and ad hoc log aggregation solutions due to the throughput requirements. Such ad hoc solutions are a viable way to provide logging data to an offline analysis system like Hadoop, but are very limiting for building real-time processing. Kafka aims to unify offline and online processing by providing a mechanism for parallel load into Hadoop as well as the ability to partition real-time consumption over a cluster of machines. -

- -

-The use for activity stream processing makes Kafka comparable to Facebook's Scribe or Cloudera's Flume, though the architecture and primitives are very different for these systems and make Kafka more comparable to a traditional messaging system. See our design page for more details. -

- -

-This is a new project, and we are interested in building the community; we would welcome any thoughts or patches. You can reach us here. - - \ No newline at end of file + diff --git a/kafka/performance.php b/kafka/performance.php index e638a39..6a026b5 100644 --- a/kafka/performance.php +++ b/kafka/performance.php @@ -1,87 +1,4 @@ - - - - -

Performance Results

-

The following tests give some basic information on Kafka throughput as the number of topics, consumers, and producers and the overall data size vary. Since Kafka nodes are independent, these tests are run with a single producer, consumer, and broker machine. Results can be extrapolated for a larger cluster. -

- -

-We run producer and consumer tests separately to isolate their performance. For the consumer, these tests measure cold performance, that is, consuming a large uncached backlog of messages. Simultaneous production and consumption tends to help performance since the cache is hot. -

- -

We used the following settings for some of the parameters:

- - - -In our performance tests, we run experiments to answer below questions. -

What is the producer throughput as a function of batch size?

-

We can push about 50MB/sec to the system. However, this number changes with the batch size. The graphs below show the relation between these two quantities.

-


- -

What is the consumer throughput?

-

According to our experiments, we can consume about 100M/sec from a broker and the total does not seem to change much as we increase -the number of consumer threads.

-

- -

Does data size affect our performance?

-


- -

What is the effect of the number of producer threads on producer throughput?

-

We are able to max out production with only a few threads.

-


- -

What is the effect of number of topics on producer throughput?

-

Based on our experiments, the number of topic has a minimal effect on the total data produced. -The below graph is an experiment where we used 40 producers and varied the number of topics

- -


- -

How to Run a Performance Test

- -

The performance related code is under perf folder. To run the simulator :

- -

 ../run-simulator.sh -kafkaServer=localhost -numTopic=10  -reportFile=report-html/data -time=15 -numConsumer=20 -numProducer=40 -xaxis=numTopic

- -

It will run a simulator with 40 producer and 20 consumer threads - producing/consuming from a local kafkaserver.  The simulator is going to - run 15 minutes and the results are going to be saved under - report-html/data

- -

and they will be plotted from there. Basically it will write MB of - data consumed/produced, number of messages consumed/produced given a - number of topic and report.html will plot the charts.

- - -

Other parameters include numParts, fetchSize, messageSize.

- -

In order to test how the number of topic affects the performance the below script can be used (it is under utl-bin)

- - - -

#!/bin/bash
- - for i in 1 10 20 30 40 50;
- - do
- -   ../kafka-server.sh server.properties 2>&1 >kafka.out&
- sleep 60
-  ../run-simulator.sh -kafkaServer=localhost -numTopic=$i  -reportFile=report-html/data -time=15 -numConsumer=20 -numProducer=40 -xaxis=numTopic
-  ../stop-server.sh
-  rm -rf /tmp/kafka-logs
- -  sleep 300
- - done

- - - -

The charts similar to above graphs can be plotted with report.html automatically.

- - + diff --git a/kafka/projects.php b/kafka/projects.php index db787ef..a01f133 100644 --- a/kafka/projects.php +++ b/kafka/projects.php @@ -1,94 +1,4 @@ - - - - -

Current Work

- -

- Below is a list of major projects we are currently pursuing. If you have thoughts on these or want to help, please let us know. -

- -

Rich Producer Interface

-

Done. Released with v0.6

-

-The current producer connects to a single broker and publishes all data there. This feature adds a higher-level API that allows a cluster-aware producer to semantically map messages to Kafka nodes and partitions. This makes it possible to partition the stream of messages with some semantic partition function based on a key in the message, spreading them over broker machines, e.g. to ensure that all messages for a particular user go to a particular partition and hence appear in the same stream for the same consumer thread. -

- -

Map/Reduce Support

- -

-Streams of messages are the natural building block for higher-level processing built by stringing a set of topics together with intermediate processing between them. This is unlikely to be exactly map/reduce as it appears in Hadoop, but we are currently working on providing the ability to semantically repartition streams of data to allow the equivalent of a "reducer". This provides capabilities along the lines of stream processing systems or vaguely a sort of "online map/reduce". This is closely related to the rich producer interface described above. -

- -

-Technically Hadoop map/reduce provides a number of facilities, namely -

- - -

-In Kafka the stream of messages is the natural analogue to files in HDFS. We do intend to provide interfaces for processing and partitioning similar to map/reduce, but we are currently not working on anything equivalent to the task management facilities of Hadoop. This is less important since consumers dynamically register themselves, claim a portion of the stream, and record their consumption progress. Because of this, a naive task management solution need only copy the "task" code to each machine and start it, which can easily be done with rsync and ssh. -

- -

Replication

- -

-Messages are currently written to a single broker with no replication between brokers. We would like to provide replication between brokers and expose options to the producer to block until a configurable number of replicas have acknowledged the message to allow the client to control the fault-tolerance semantics. -

- -

Compression

- -

-We have a patch that provides end-to-end message set compression from producer to broker and broker to consumer with no need for intervening decompression. We hope to add this feature soon. -

- -

Console Consumer

-

Done!

-

-The interaction with zookeeper and the complexity of the elastic load balancing of consumers make implementing the equivalent of the rich consumer interface outside of the JVM somewhat difficult (implementing the low-level fetch api is quite easy). A simple approach to this problem could work similarly to Hadoop Streaming and simply provide a consumer which dumps to standard output in some user-controllable format. This can be piped to another program in any language which simply reads from standard input to receive the data from the stream. -
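For example, the program on the receiving end of the pipe can be as small as the following sketch (shown here in Java purely for illustration; any language that can read standard input works the same way, and the class name is made up):

import java.io.BufferedReader;
import java.io.InputStreamReader;

// Reads messages piped in from a console consumer, one per line, and "processes" them.
public class StdinConsumer {
    public static void main(String[] args) throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in, "UTF-8"));
        String line;
        while ((line = in.readLine()) != null) {
            System.out.println("got message: " + line);   // replace with real processing
        }
    }
}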

- -

Hadoop Consumer

- -

-We are currently working on refactoring the existing Hadoop consumer (which can be found under contrib/hadoop-consumer) to serve as an InputFormat, which seems to us a cleaner way to provide this functionality. -

- -

Project Ideas

- -

-Below is a list of projects which would be great to have but haven't yet been started. Ping the mailing list if you are interested in working on any of these. -

- -

Other Languages

- -

-We offer a JVM-based client for production and consumption and also a rather primitive native python client. It would be great to improve this list. The lower-level protocols are well documented here and should be relatively easy to implement in any language that supports standard socket I/O. -

- -

Long Poll

- -

-The consumer currently uses a simple polling mechanism. The fetch request always returns immediately, yielding no data if no new messages have arrived, and a simple backoff is used when there are no new messages to avoid too-frequent requests to the broker. This is efficient enough, but the lowest possible latency of the consumer is bounded by the polling frequency. It would be nice to enhance the consumer API to allow an option in the fetch request to have the server block for a given period of time waiting for data to become available, rather than returning immediately and waiting to poll again. This would provide somewhat improved latency in the low-throughput case where the consumer is often waiting for a message to arrive. -
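For illustration, the status quo on the client looks roughly like the sketch below (fetch() and process() are stand-ins, not the real consumer API); the proposed long-poll option would move this waiting into the server side of the fetch request:

import java.util.Collections;
import java.util.List;

// Sketch of poll-with-backoff: fetch returns immediately, so when it comes back
// empty we sleep, and that sleep is what bounds the consumer's latency from below.
public class PollWithBackoff {
    static List<byte[]> fetch() { return Collections.emptyList(); }   // stand-in fetch
    static void process(byte[] message) { /* handle the message */ }

    public static void main(String[] args) throws InterruptedException {
        long backoffMs = 10;
        while (true) {
            List<byte[]> messages = fetch();            // always returns immediately today
            if (messages.isEmpty()) {
                Thread.sleep(backoffMs);                // worst-case added latency
                backoffMs = Math.min(backoffMs * 2, 1000);
            } else {
                backoffMs = 10;
                for (byte[] m : messages) process(m);
            }
        }
    }
}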

- -

Syslogd Producer

- -

-We currently have a custom producer and also a log4j appender to work for "logging"-type applications. Outside the Java world, however, the standard for logging is syslogd. It would be great to have an asynchronous producer that worked with syslogd to support these kinds of applications. -

- -

Hierarchical Topics

- -

-Currently streams are divided into only two levels—topics and partitions. This is unnecessarily limited. We should add support for hierarchical topics and allow subscribing to an arbitrary subset of paths. For example one could have /events/clicks and /events/logins and one could subscribe to either of these alone or get the merged stream by subscribing to the parent directory /events. -

- -

-In this model, partitions are naturally just subtopics (for example /events/clicks/0 might be one partition). This reduces the conceptual weight of the system and adds some power. -
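As a toy illustration of the proposed subscription semantics (this is just the path-matching idea, not an existing Kafka API):

import java.util.Arrays;
import java.util.List;

// Subscribing to a parent path matches the merged set of its child topics
// (and, in this model, their partitions such as /events/clicks/0 as well).
public class TopicTreeExample {
    static boolean matches(String subscription, String topic) {
        return topic.equals(subscription) || topic.startsWith(subscription + "/");
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("/events/clicks", "/events/logins", "/metrics/cpu");
        for (String topic : topics) {
            if (matches("/events", topic)) {
                System.out.println("merged /events stream receives: " + topic);
            }
        }
    }
}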

- - \ No newline at end of file + diff --git a/kafka/quickstart.php b/kafka/quickstart.php index 61f42bb..fe02aee 100644 --- a/kafka/quickstart.php +++ b/kafka/quickstart.php @@ -1,234 +1,4 @@ - - - - -

Quick Start

- -

Step 1: Download the code

- -Download a recent stable release. - -
-> tar xzf kafka-<VERSION>.tgz
-> cd kafka-<VERSION>
-
- -

Step 2: Start the server

- -Kafka brokers and consumers use zookeeper for co-ordination. -

-First start the zookeeper server. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node zookeeper instance. - -

-> bin/zookeeper-server-start.sh config/zookeeper.properties
-[2010-11-21 23:45:02,335] INFO Reading configuration from: config/zookeeper.properties 
-...
-
- -Now start the Kafka server: -
-> bin/kafka-server-start.sh config/server.properties
-jkreps-mn-2:kafka-trunk jkreps$ bin/kafka-server-start.sh config/server.properties 
-[2010-11-21 23:51:39,608] INFO starting log cleaner every 60000 ms (kafka.log.LogManager)
-[2010-11-21 23:51:39,628] INFO connecting to ZK: localhost:2181 (kafka.server.KafkaZooKeeper)
-...
-
- -

Step 3: Send some messages

- -A toy producer script is available to send plain text messages. To use it, run the following command: - -
-> bin/kafka-producer-shell.sh --server kafka://localhost:9092 --topic test
-> hello
-sent: hello (14 bytes)
-> world
-sent: world (14 bytes)
-
- -

Step 4: Start a consumer

- -Start a toy consumer to dump out the messages you sent to the console: - -
-> bin/kafka-consumer-shell.sh --topic test --props config/consumer.properties
-Starting consumer...
-...
-consumed: hello
-consumed: world
-
- -If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal. - -

Step 5: Write some code

- -Below are some very simple examples of using Kafka for sending messages; more complete examples can be found in the Kafka source code in the examples/ directory. - -

Producer Code

- -
1. send() API
- -Using the sync producer is quite simple: - -
-Properties props = new Properties();
-props.put("host", "localhost");
-props.put("port", "9092");
-props.put("buffer.size", String.valueOf(64*1024));
-props.put("connect.timeout.ms", String.valueOf(30*1000));
-props.put("reconnect.interval", "1000");
-
-SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
-
-String topic = "test";
-int partition = 0;
-List<Message> messages = Arrays.asList(new Message("a message".getBytes()), 
-                                       new Message("another message".getBytes()),
-                                       new Message("a third message".getBytes()));
-producer.send(topic, partition, messages);
-
- -
2. Log4j appender
- -Data can also be produced to a Kafka server in the form of a log4j appender. In this way, minimal code needs to be written in order to send some data across to the Kafka server. -Here is an example of how to use the Kafka Log4j appender - - -Start by defining the Kafka appender in your log4j.properties file. -
-# define the kafka log4j appender config parameters
-log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
-# REQUIRED: set the hostname of the kafka server
-log4j.appender.KAFKA.Host=localhost
-# REQUIRED: set the port on which the Kafka server is listening for connections
-log4j.appender.KAFKA.Port=9092
-# REQUIRED: the topic under which the logger messages are to be posted
-log4j.appender.KAFKA.Topic=test-topic
-# the serializer to be used to turn an object into a Kafka message. Defaults to kafka.producer.DefaultStringEncoder
-log4j.appender.KAFKA.Serializer=kafka.test.AppenderStringSerializer
-# do not set the above KAFKA appender as the root appender
-log4j.rootLogger=INFO
-# set the logger for your package to be the KAFKA appender
-log4j.logger.test.package=INFO, KAFKA
-
- -Data can be sent using a log4j appender as follows - - -
-Logger logger = Logger.getLogger(KafkaLog4jAppender.class);
-logger.info("test");
-
- -
3. New producer API
- -With release 0.6, we introduced a new producer API - kafka.producer.Producer<T>. The producer takes in a required config parameter serializer.class that specifies an Encoder<T> to convert T to a Kafka Message. Also, one of the following config parameters need to be specified - - - - -
-Properties props = new Properties();
-props.put("serializer.class", "kafka.test.TestEncoder");
-props.put("zk.connect", "127.0.0.1:2181");
-ProducerConfig config = new ProducerConfig(props);
-Producer producer = new Producer(config);
-
-class TestEncoder extends Encoder {
-  public Message toMessage(String event) { return new Message(event.getBytes()); }
-}
-
- -

If you are using zookeeper based broker discovery, kafka.producer.Producer<T> can route your data to a particular broker partition based on a kafka.producer.Partitioner<T>, specified through the partitioner.class config parameter. It defaults to kafka.producer.DefaultPartitioner. If you are not using zookeeper based broker discovery, the producer sends each request to a random broker partition.
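As a sketch of what a custom partitioner might look like (this assumes Partitioner<T> exposes a single partition(key, numPartitions) method, as later Kafka releases do; verify the interface in your version, and note that the class name below is made up):

// Hypothetical key-hashing partitioner: the same key always maps to the same partition.
public class MemberIdPartitioner implements kafka.producer.Partitioner<String> {
    public int partition(String key, int numPartitions) {
        int hash = key.hashCode();
        // guard against Integer.MIN_VALUE, whose absolute value overflows
        return (hash == Integer.MIN_VALUE ? 0 : Math.abs(hash)) % numPartitions;
    }
}

It would then be wired in through the config parameter mentioned above, e.g. props.put("partitioner.class", "example.MemberIdPartitioner");.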

- -The send API takes in the data to be sent through a kafka.producer.ProducerData<K, T> object, where K is the type of the key used by the Partitioner<T> and T is the type of data to send to the broker. In this example, the key and value type, both are String. - -

You can batch multiple messages and pass a java.util.List<T> as the last argument to the kafka.producer.ProducerData<K, T> object.

- -
-List<String> messages = new java.util.ArrayList<String>();
-messages.add("test1");
-messages.add("test2");
-producer.send(new ProducerData<String, String>("test_topic", "test_key", messages));
-
- -

You can also route the data to a random broker partition, by not specifying the key in the kafka.producer.ProducerData<K, T> object. The key defaults to null.

- -
-producer.send(new ProducerData<String, String>("test_topic", messages));
-
- -

Finally, the producer should be closed, through

- -
producer.close();
- -

Consumer Code

- -The consumer code is slightly more complex as it enables multithreaded consumption: - -
-// specify some consumer properties
-Properties props = new Properties();
-props.put("zk.connect", "localhost:2181");
-props.put("zk.connectiontimeout.ms", "1000000");
-props.put("groupid", "test_group");
-
-// Create the connection to the cluster
-ConsumerConfig consumerConfig = new ConsumerConfig(props);
-ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
-
-// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
-Map<String, List<KafkaMessageStream>> topicMessageStreams = 
-    consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
-List<KafkaMessageStream> streams = topicMessageStreams.get("test");
-
-// create list of 4 threads to consume from each of the partitions 
-ExecutorService executor = Executors.newFixedThreadPool(4);
-
-// consume the messages in the threads
-for(final KafkaMessageStream stream: streams) {
-  executor.submit(new Runnable() {
-    public void run() {
-      for(Message message: stream) {
-        // process message
-      }	
-    }
-  });
-}
-
- -

Hadoop Consumer

- -

-Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers). -

- -

-Usage information on the hadoop consumer can be found here. -

- -

Simple Consumer

- -Kafka has a lower-level consumer api for reading message chunks directly from servers. Under most circumstances this should not be needed. But just in case, its usage is as follows: - -
-// create a consumer to connect to the server host, port, socket timeout of 10 secs, socket receive buffer of ~1MB
-SimpleConsumer consumer = new SimpleConsumer(host, port, 10000, 1024000);
-
-long offset = 0;
-while (true) {
-  // create a fetch request for topic “test”, partition 0, current offset, and fetch size of 1MB
-  FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);
-
-  // get the message set from the consumer and print them out
-  ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
-  for(Message message : messages) {
-    System.out.println("consumed: " + Utils.toString(message.payload, "UTF-8"));
-    // advance the offset after consuming each message
-    offset += MessageSet.entrySize(message);
-  }
-}
-
- - - + diff --git a/kamikaze/includes/project_info.php b/kamikaze/includes/project_info.php index a565293..d327e5e 100644 --- a/kamikaze/includes/project_info.php +++ b/kamikaze/includes/project_info.php @@ -24,6 +24,7 @@ "quick start" => "quickstart.php", "suggestion" => "suggestion.php", "releases" => "releases.php", + "license" => "license.php", "source" => "http://github.com/hyan/kamikaze", "wiki" => "http://linkedin.jira.com/wiki/display/KAMI/Home", "team" => "team.php", diff --git a/push-javadoc.sh b/push-javadoc.sh index 2e9da13..cfe3147 100755 --- a/push-javadoc.sh +++ b/push-javadoc.sh @@ -1,3 +1,10 @@ #!/bin/bash -rsync -r --progress --cvs-exclude --delete --archive /Users/jkreps/workspace/voldemort/docs/javadoc empathybox@empathybox.com:/home/empathybox/project-voldemort.com/ \ No newline at end of file +if [ $# -lt 1 ]; then + echo $0 location-of-javadoc + exit 1 +fi + +echo Pushing $1... + +rsync -r --progress --cvs-exclude --delete --archive $1 snateam@sna-projects.com:/home/empathybox/sna-projects.com/voldemort/javadoc diff --git a/sna/index.php b/sna/index.php index a0ab2b6..38e1874 100644 --- a/sna/index.php +++ b/sna/index.php @@ -75,6 +75,10 @@
  • Sam Shah – Building data products with Hadoop (O'Reilly Strata)
  • Pete Skomoroch – Distilling Data Exhaust: How to Surface Insights and Build Products (O'Reilly Strata)
  • Christian Posse – Operating Smartly in a Networked World (Keynote address, IEEE International Workshop on Business Applications of Social Network Analysis 2010)
  • +
  • Mitul Tiwari – Building Data + Products using Hadoop at LinkedIn (In Big Data and Cloud + Computing meetup, Aug 11, 2011.) @@ -110,85 +114,65 @@ function echo_person($name, $title, $profile, $img_path, $blurb) { ?> - - - +
    + + - - - - - - + + + + - - - + - - - + + - - - - - + + + + + - - - - + + - - - - + - + - - - + - - - - - - - + + + + - + - - - + + - + - - - + + - - - - + diff --git a/sna/media/data_products.pdf b/sna/media/data_products.pdf new file mode 100644 index 0000000..10b9329 Binary files /dev/null and b/sna/media/data_products.pdf differ diff --git a/voldemort/configuration.php b/voldemort/configuration.php index a90a1f5..a28a394 100644 --- a/voldemort/configuration.php +++ b/voldemort/configuration.php @@ -21,12 +21,20 @@

    Cluster configuration

    -

    Here is an example cluster.xml for a 2-node cluster with 8 data partitions:

    +

    Here is an example cluster.xml for a 2-node cluster with 8 data partitions. We also have optional 'zone' fields which allow you to map nodes to certain logical clusters ( datacenter, rack, etc ) called zones:

    	
       <cluster>
         <!-- The name is just to help users identify this cluster from the gui -->
         <name>mycluster</name>
+    <zone>
+      <zone-id>0</zone-id>
+      <proximity-list>1</proximity-list>
+    </zone>
+    <zone>
+      <zone-id>1</zone-id>
+      <proximity-list>0</proximity-list>
+    </zone>
         <server>
           <!-- The node id is a unique, sequential id beginning with 0 that identifies each server in the cluster-->
           <id>0</id>
    @@ -36,6 +44,7 @@
           <admin-port>6667</admin-port>
           <!-- A list of data partitions assigned to this server -->
           <partitions>0,1,2,3</partitions>
    +      <zone-id>0</zone-id>
         </server>
         <server>
           <id>1</id>
    @@ -44,14 +53,22 @@
           <socket-port>6666</socket-port>
           <admin-port>6667</admin-port>
           <partitions>4,5,6,7</partitions>
    +      <zone-id>1</zone-id>
         </server>
       </cluster>
     
    +

    One thing that is important to understand is that partitions are not static partitions of servers, but rather they are a mechanism for partitioning the key space in such a way that each key is statically mapped to a particular data -partition. What this means is that a particular cluster may support many stores each with different replication factors—the replication factor is not hardcoded in the cluster design. This is important, since some data is more important than other data, and the correct trade-off between performance and consistency for one store may be different from another store. The number of data partitions cannot be changed. Online redistribution of data is not yet supported, but this provides the mechanism by which it will work when it is. Partitions will be moved to the new servers (or rebalanced between servers), but the total number of partitions will always remain the same, as will the mapping of key to partition. This means it is important to give a good number of partitions to start with. The script test/integration/generate_partitions.py will generate this part of the config for you. +partition. What this means is that a particular cluster may support many stores each with different replication factors—the replication factor is not hardcoded in the cluster design. This is important, since some data is more important than other data, and the correct trade-off between performance and consistency for one store may be different from another store. +

    +

    +Another important point to remember is that the number of data partitions cannot be changed. We do support an online redistribution ( rebalancing ) of partitions. In other words inclusion of new nodes results in moving ownership of partitions, but the total number of partitions will always remain the same, as will the mapping of key to partition. This means it is important to give a good number of partitions to start with. The script here will generate this part of the config for you. +

    +

Note that the configuration currently lives in simple files, so it is important that the data in cluster.xml and stores.xml be exactly the same on each server, and that the node ids and partitions not be changed, since that can mean that clients will think their data should be on node X when really it was stored on node Y. This limitation will be removed as the configuration is moved into voldemort itself. +

    Store configuration

    @@ -68,6 +85,7 @@ <required-writes>1</required-writes> <persistence>bdb</persistence> <routing>client</routing> + <routing-strategy>consistent-routing</routing-strategy> <key-serializer> <type>string</type> <schema-info>utf8</schema-info> @@ -92,7 +110,8 @@
  • preferred-writes(optional)—The number of successful writes the client attempts to block for before returning success. Defaults to required-writes
  • required-writes— The least number of writes that can succeed without the client getting back an exception.
• persistence— The persistence backend used by the store. Currently this could be one of bdb, mysql, memory, readonly, and cache. The difference between cache and memory is that memory will throw an OutOfMemory exception if it grows larger than the JVM heap, whereas cache will discard data.
  • -
  • routing— Determines the routing policy. Currently only client-side routing is fully supported. Server side routing will be coming soon, as will a few more interesting policies.
  • +
  • routing— Determines the routing policy. We support both client ( Client side routing ) and server ( Server side routing ).
  • +
  • routing-strategy— Determines how we store the replicas. Currently we support three routing-strategies - consistent-routing (default), zone-routing and all-routing.
  • key-serializer— The serialization type used for reading and writing keys. The type can be json, java-serialization, string, protobuff, thrift, or identity (meaning raw bytes). The schema-info gives information to the serializer about how to perform the mapping (e.g. the JSON schema described in here).
• value-serializer— The serialization type used for reading and writing values. The supported types are the same as for keys. In the above example we also highlight the subelement 'compression' which currently supports 'gzip' and 'lzf' compression. The subelements are the same as for the key-serializer, except that the value serializer can have multiple schema-infos with different versions. The highest version is the one used for writing data, but data is always read with the version it was written with. This allows for gradual schema evolution. Versioning is only supported by the JSON serializer as other serialization formats have their own versioning systems. @@ -133,12 +152,42 @@ </value-serializer>
  • + +
• retention-days (optional)— This optional parameter allows you to set a retention policy on your data. Then every day, at a specified time on the servers, a scheduled job runs to delete all data older than retention-days. This is useful for keeping your data trimmed.
  • +
  • retention-scan-throttle-rate (optional)— If retention-days is specified this is the rate at which we'll scan the tuples to delete data.
  • + +

    +

    +If you intend to use the zone-routing strategy we need to extend the store definition to tell it how to replicate w.r.t. zones. Here is an example of a store definition with 'zone-routing' enabled. +

    +	<stores>
    +	    <store>
    +	        <name>test</name>
    +	        ...
    +                <routing-strategy>zone-routing</routing-strategy>
    +                <!-- This number should be total of individual zone-replication-factor's -->
    +                <replication-factor>2</replication-factor>
    +	        <zone-replication-factor>
    +	            <replication-factor zone-id="0">1</replication-factor>
    +	            <replication-factor zone-id="1">1</replication-factor>
    +	        </zone-replication-factor>
    +	        <zone-count-reads>0</zone-count-reads>
    +	        <zone-count-writes>0</zone-count-writes>
    +	        <hinted-handoff-strategy>proximity-handoff</hinted-handoff-strategy>
    +	        ... 
    +           </store>
    +	</stores>
    +
    + The important change here is the introduction of zone-replication-factor which should contain a replication factor that you would want in every zone. Other parameters : +

    Per-node configuration

    -

    Here is an example server.properties for node 0 in the cluster. Most properties have (hopefully) sane defaults and can be skipped. Here is a minimal server.properties:

    +

    We store per-node based configuration in the server.properties file. Most of the properties have sane defaults ( hopefully ). The bare minimal file should have the following property.

     	# The ID of *this* particular cluster node (different for each node in cluster)
    @@ -153,7 +202,7 @@
     
     

    The underlying key-value store is also important for configuration and operation management. If BDB is used then all configuration is done through the server.properties file. If MySQL is used then usual mysql administration must be done.

    -

    Oracle has a writeup that gives a good overview of the operational side of BDB.

    +

    Oracle has a writeup that gives a good overview of the operational side of BDB.

    Client configuration

    @@ -165,25 +214,33 @@

    JVM Settings

    -Since the Voldemort servers will likely have fairly large heap sizes, getting good JVM garbage collector settings is important. Here is what we use at LinkedIn, with some success: +At LinkedIn we maintain two sets of clusters, read-only and read-write. The read-write clusters are clusters using BDB stores and have totally different JVM characteristics from those using read-only stores. Here is what we use at LinkedIn for our read-write stores:
    -	# Min, max, total JVM size (-Xms -Xmx)
    -	JVM_SIZE="-server -Xms12g -Xmx12g"
    +	# Min, max, total JVM size 
    +	JVM_SIZE="-server -Xms22g -Xmx22g"
     
    -	# New Generation Sizes (-XX:NewSize -XX:MaxNewSize)
    +	# New Generation Sizes 
     	JVM_SIZE_NEW="-XX:NewSize=2048m -XX:MaxNewSize=2048m"
     
     	# Type of Garbage Collector to use
     	JVM_GC_TYPE="-XX:+UseConcMarkSweepGC -XX:+UseParNewGC"
     
     	# Tuning options for the above garbage collector
    -	JVM_GC_OPTS="-XX:CMSInitiatingOccupancyFraction=70"
    +	JVM_GC_OPTS="-XX:CMSInitiatingOccupancyFraction=70 -XX:SurvivorRatio=2"
     
    -	# JVM GC activity logging settings ($LOG_DIR set in the ctl script)
    -	JVM_GC_LOG="-XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:$LOG_DIR/gc.log"
    +	# JVM GC activity logging settings
    +	JVM_GC_LOG="-XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:$LOG_DIR/gc.log"
     
    -This setup was used with an 8GB BDB cache. There are two key things here: (1) BDB cache must fit in heap or else it won't work (obviously), (2) you must use the concurrent mark and sweep gc or else the GC pauses from collecting such a large heap will cause unresponsive periods (it doesn't happen at first either, it creeps up and then eventually goes into a spiral of gc pause death). +This is the setup on a 32GB RAM box with a BDB cache size of 10GB and 3 cleaner threads. There are two key things here: (1) BDB cache must fit in heap or else it won't work (obviously), (2) you must use the concurrent mark and sweep gc or else the GC pauses from collecting such a large heap will cause unresponsive periods (it doesn't happen at first either, it creeps up and then eventually goes into a spiral of gc pause death). +

    +For the read-only clusters we use the same JVM GC settings, except the heap size is set to a smaller value. +

    +
    +	# Min, max, total JVM size 
    +	JVM_SIZE="-server -Xms4096m -Xmx4096m"
    +
    +This is done because in the case of read-only stores we rely on the OS page cache and don't really want our JVM heap to take up space. diff --git a/voldemort/design.php b/voldemort/design.php index c33affc..98d7c2b 100644 --- a/voldemort/design.php +++ b/voldemort/design.php @@ -95,7 +95,7 @@ To visualize the consistent hashing method we can see the possible integer hash values as a ring beginning with 0 and circling around to 2^31-1. This ring is divided into Q equally-sized partitions with Q >> S, and each of the S servers is assigned Q/S of these. A key is mapped onto the ring using an arbitrary hash function, and then we compute a list of R servers responsible for this key by taking the first R unique nodes when moving over the partitions in a clockwise direction. The diagram below pictures a hash ring for servers A,B,C,D. The arrows indicate keys mapped onto the hash ring and the resulting list of servers that will store the value for that key if R=3.

    - +
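To make the procedure concrete, here is a toy version of that ring walk (illustrative only, not Voldemort's actual routing code):

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Q partitions are assigned to servers; a key hashes to a starting partition and we
// walk clockwise collecting the first R distinct servers as its preference list.
public class RingWalk {
    public static List<String> preferenceList(String key, String[] partitionOwners, int r) {
        int q = partitionOwners.length;
        int hash = key.hashCode();
        int start = (hash == Integer.MIN_VALUE ? 0 : Math.abs(hash)) % q;
        Set<String> servers = new LinkedHashSet<String>();
        for (int i = 0; i < q && servers.size() < r; i++) {
            servers.add(partitionOwners[(start + i) % q]);
        }
        return new ArrayList<String>(servers);
    }

    public static void main(String[] args) {
        // 8 partitions spread over servers A, B, C, D; replication factor R = 3
        String[] owners = {"A", "B", "C", "D", "A", "B", "C", "D"};
        System.out.println(preferenceList("some-key", owners, 3));
    }
}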

    Data Format & Queries

    @@ -305,7 +305,7 @@

We use versioning and read-repair. This has the best availability guarantees and the highest efficiency (only W network roundtrips are needed for writes to N replicas, where W can be configured to be less than N). 2PC typically requires 2N blocking roundtrips. Paxos variations vary quite a bit but are comparable to 2PC.

    -

    Many of the specifics are borrowed from the Amazon paper below

    +

Another approach to reaching consistency is Hinted Handoff. In this method, if we find during a write that a destination node is down, we store a "hint" of the updated value on one of the live nodes. When the down nodes come back up, the "hints" are pushed to them, bringing the data back into a consistent state. Many of the specifics are borrowed from the Amazon paper below
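A toy sketch of that mechanism, with made-up types rather than Voldemort's actual classes, looks like this:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy illustration of hinted handoff. Node names are plain strings and the actual
// puts are left as comments; this is not Voldemort's real implementation.
public class HintedHandoffSketch {
    static class Hint {
        final String downNode, key, value;
        Hint(String downNode, String key, String value) {
            this.downNode = downNode;
            this.key = key;
            this.value = value;
        }
    }

    private final List<Hint> hints = new ArrayList<Hint>();

    // During a write: if the destination node is down, park a hint on this (live) node.
    public void write(String destinationNode, String key, String value, boolean destinationUp) {
        if (destinationUp) {
            // normal put to destinationNode goes here
        } else {
            hints.add(new Hint(destinationNode, key, value));
        }
    }

    // When a node comes back up: replay its pending hints so it catches up.
    public void pushHintsTo(String recoveredNode) {
        for (Iterator<Hint> it = hints.iterator(); it.hasNext(); ) {
            Hint h = it.next();
            if (h.downNode.equals(recoveredNode)) {
                // re-issue the put of (h.key, h.value) to recoveredNode, then drop the hint
                it.remove();
            }
        }
    }
}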

    Here are some good write-ups on this subject:

    @@ -375,15 +375,17 @@

    Persistence Layer

    -

    We support a simple api for persistence and use BDB java edition as the default. MySQL and in memory storage are also supported. To add a new persistence implementation you need to implements put, get, and delete, plus provide an iterator over the values in the local store.

    +

We support a simple api for persistence and use BDB Java edition as the default. Other storage engines supported are MySQL, in-memory storage ( used for unit testing ) and our own custom read-only storage engine ( generated offline as a batch process in Hadoop ). To add a new persistence implementation you need to implement put, get, and delete, plus provide an iterator over the values in the local store.
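A minimal sketch of what such an engine has to supply is shown below (a simplified, hypothetical interface; the real Voldemort storage API also deals with byte-array keys/values and versioned values):

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified view of what a pluggable engine must provide.
interface SimpleStorageEngine {
    void put(String key, String value);
    String get(String key);
    void delete(String key);
    Iterator<Map.Entry<String, String>> entries();   // iterate over the local store
}

// An in-memory implementation, similar in spirit to the engine used for unit tests.
class InMemoryEngine implements SimpleStorageEngine {
    private final Map<String, String> map = new ConcurrentHashMap<String, String>();
    public void put(String key, String value) { map.put(key, value); }
    public String get(String key) { return map.get(key); }
    public void delete(String key) { map.remove(key); }
    public Iterator<Map.Entry<String, String>> entries() { return map.entrySet().iterator(); }
}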

    -

    Support for batch computed data

    +

    Support for batch computed data - Read-only stores

One of the most data-intensive storage needs is storing batch computed data about members and content in our system. These jobs often deal with the relationships between entities (e.g. related users, or related news articles) and so for N entities can produce up to N^2 relationships. An example at LinkedIn is member networks, which are in the 12TB range if stored explicitly for all members. Batch processing of data is generally much more efficient than random access, which means one can easily produce more batch computed data than can be easily accessed by the live system. Hadoop greatly expands this ability. We are in the process of open-sourcing a voldemort persistence-backend that supports very efficient read-only access that helps take a lot of the pain out of building, deploying, and managing large, read-only batch computed data sets.

    -

    Much of the pain of dealing with batch computing comes from the "push" process that transfers data from a data warehouse or hadoop instance to the live system. In a traditional db this will often mean rebuilding the index on the live system with the new data. Doing millions of sql insert or update statements is generally not at all efficient, and typically in a SQL db the data will be deployed as a new table and then swapped to replace the current data when the new table is completely built. This is better than doing millions of individual updates, but this still means the live system is now building a many GB index for the new data set (or performa) while simultaneously serving live traffic. This alone can take hours or days, and may destroy the performance on live queries. Some people have fixed this by swapping out at the database level (e.g. having an online and offline db, and then swapping), but this requires effort and means only half your hardware is being utilized. Voldemort fixes this process by making it possible to prebuild the index itself offline (on hadoop or wherever), and simply push it out to the live servers and transparently swap. +

    Much of the pain of dealing with batch computing comes from the "push" process that transfers data from a data warehouse or hadoop instance to the live system. In a traditional db this will often mean rebuilding the index on the live system with the new data. Doing millions of sql insert or update statements is generally not at all efficient, and typically in a SQL db the data will be deployed as a new table and then swapped to replace the current data when the new table is completely built. This is better than doing millions of individual updates, but this still means the live system is now building a many GB index for the new data set (or performa) while simultaneously serving live traffic. This alone can take hours or days, and may destroy the performance on live queries. Some people have fixed this by swapping out at the database level (e.g. having an online and offline db, and then swapping), but this requires effort and means only half your hardware is being utilized. Voldemort fixes this process by making it possible to prebuild the index itself offline (on Hadoop or wherever), and simply push it out to the live servers and transparently swap.

    +For more details about these batch computed stores ( called read-only stores ) read this. +

    References

    - - - - - - - + + - + - + - + @@ -36,7 +31,7 @@ - + @@ -44,6 +39,11 @@ + + + + + @@ -56,7 +56,7 @@ - + @@ -69,4 +69,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    max_connections6Maximum number of connection allowed to each voldemort node
    max_total_connections500Maximum number of connection allowed to all voldemort node50Maximum number of connection allowed to each voldemort node
    max_threads 5The maximum number of client threads The maximum number of client threads ( Used by the client thread pool )
    max_queued_requests 50The maximum number of queued node operations before client actions will be blockedThe maximum number of queued node operations before client actions will be blocked ( Used by the client thread pool )
    thread_idle_ms 100000The amount of time to keep an idle client thread aliveThe amount of time to keep an idle client thread alive ( Used by the client thread pool )
    connection_timeout_ms
    socket_timeout_ms5005000 Maximum amount of time the socket will block waiting for network activity
    15000 Set the timeout for all blocking operations to complete on all nodes. The number of blocking operations can be configured using the preferred-reads and preferred-writes configuration for the store.
    selectors8Number of selectors used for multiplexing requests in our NIO client
    socket_buffer_size 64 * 1024
    enable_pipeline_routed_storefalsetrue Use the new pipeline routed store for client side routing
    Compulsory parameter Comma separated list of URLs to use as bootstrap servers
    serializer_factory_classDefault serializer factory with support for avro, pb, java, etc.Custom serializer factory class name
    client_zone_id0Zone id where the client resides. Used to make smarter routing decision in case of 'zone-routing'
    Failure detector configs
    failuredetector_implementationBannagePeriodFailureDetectorClass name of the failure detector that the client will use. We support BannagePeriodFailureDetector and ThresholdFailureDetector
    failuredetector_bannage_period30000BannagePeriodFailureDetector : The number of milliseconds this node is considered as 'banned'
    failuredetector_threshold_countminimum10ThresholdFailureDetector : Minimum number of failures that must occur before the success ratio is checked against the threshold
    failuredetector_threshold_interval10000ThresholdFailureDetector : Millisecond interval for which the threshold is valid; it is 'reset' after this period is exceeded
failuredetector_threshold80ThresholdFailureDetector : The integer percentage representation of the threshold that must be met or exceeded
    diff --git a/voldemort/includes/project_info.php b/voldemort/includes/project_info.php index 3544a23..9dabc54 100644 --- a/voldemort/includes/project_info.php +++ b/voldemort/includes/project_info.php @@ -12,9 +12,12 @@ $PROJ_KEYWORDS = "Project Voldemort, Voldemort, key-value storage, distributed storage, Amazon Dynamo, persistence, scalability"; /* Favicon image file location and type */ - $PROJ_FAVICON_PATH = "http://project-voldemort.com/images/vold-logo-small.png"; + $PROJ_FAVICON_PATH = "images/voldemort_logo.png"; $PROJ_FAVICON_MIME = "image/png"; + /* Project icon in header */ + $PROJ_ICON_PATH = "images/voldemort_logo.png"; + /* Navigation links in the sidebar */ $PROJ_NAV_LINKS = array( "quickstart" => "quickstart.php", @@ -24,12 +27,10 @@ "download" => "http://github.com/voldemort/voldemort/downloads", "snapshot build" => "http://test.project-voldemort.com:8080/job/voldemort-master", "configuration" => "configuration.php", - "client javadoc" => "/javadoc/client", - "all javadoc" => "/javadoc/all", + "javadoc" => "/javadoc/all", "developer info" => "developer.php", "fun projects" => "http://wiki.github.com/voldemort/voldemort/fun-projects", "performance" => "performance.php", - "unit tests" => "http://test.project-voldemort.com:8080", "bugs" => "http://code.google.com/p/project-voldemort/issues", "wiki" => "http://wiki.github.com/voldemort/voldemort"); diff --git a/voldemort/includes/props.php b/voldemort/includes/props.php index 5941e82..948dae5 100644 --- a/voldemort/includes/props.php +++ b/voldemort/includes/props.php @@ -5,9 +5,9 @@ description - node.id - none - The unique, sequential identifier for this server in the cluster (starts with 0) + node.id + none + The unique, sequential identifier for this server in the cluster (starts with 0) voldemort.home @@ -24,6 +24,9 @@ ${voldemort.home}/config The directory where voldemort configuration is stored + + BDB stores configuration + enable.bdb.engine true @@ -31,7 +34,7 @@ bdb.cache.size - 200MB (make it bigger!!!) + 200MB The BDB cache that is shared by all BDB tables. Bigger is better. @@ -69,6 +72,19 @@ 30000 How often in ms should we checkpoint the transaction log + + bdb.one.env.per.store + false + Use one BDB environment for every store + + + bdb.cleaner.threads + 1 + Number of BDB cleaner threads + + + MySQL stores configuration + enable.mysql.engine false @@ -100,109 +116,127 @@ The name of the mysql database - enable.memory.engine - true - Should we enable the memory storage engine? Might as well this takes no resources and is just here for consistency. - - - enable.cache.engine - true - Should we enable the cache storage engine? Might as well this takes no resources and is just here for consistency. + Read-only stores configuration enable.readonly.engine false Should we enable the readonly storage engine? - - readonly.file.wait.timeout.ms - 4000 - The maximum time to wait to acquire a filehandle to perform reads. - readonly.backups 1 The number of backup copies of the data to keep around for rollback. - readonly.file.handles - 5 - The number of file descriptors to pool per store. + readonly.search.strategy + BinarySearchStrategy + Class name of search strategy to use while finding key. We support BinarySearchStrategy and InterpolationSearchStrategy readonly.data.directory ${data.directory}/read-only The directory in which to store readonly data files. + + readonly.delete.backup.ms + 0 + Millisecond we wait for before deleting old data. Useful to decreasing IO during swap. 
+ + + Slop store configuration + + + slop.enable + true + Do we want to initialize a storage engine for slops + have the job enabled? + slop.store.engine bdb What storage engine should we use for storing misdelivered messages that need to be rerouted? - max.threads - 100 - The maximum number of threads the server can use. + slop.pusher.enable + true + Enable the slop pusher job which pushes every 'slop.frequency.ms' ms ( Prerequisite - slop.enable=true ) - core.threads - max(1, ${max.threads} / 2) - The number of threads to keep alive even when idle. + slop.read.byte.per.sec + 10 * 1000 * 1000 + Slop max read throughput - socket.timeout.ms - 4000 - The socket SO_TIMEOUT. Essentially the amount of time to block on a low-level network operation before throwing an error. + slop.write.byte.per.sec + 10 * 1000 * 1000 + Slop max write throughput - routing.timeout.ms - 5000 - The total amount of time to wait for adequate responses from all nodes before throwing an error. + pusher.type + StreamingSlopPusherJob + Job type to use for pushing out the slops - http.enable - true - Enable the HTTP data server? + slop.frequency.ms + 5 * 60 * 1000 + Frequency at which we'll try to push out the slops - socket.enable - true - Enable the socket data server? + Rebalancing configuration - jmx.enable + enable.rebalancing true - Enable JMX monitoring? + Enable rebalance service? - slop.detection.enable - false - Enable detection of misdelivered messages for persistence and redelivery. + max.rebalancing.attempts + 3 + Number of attempts the server side rebalancer makes to fetch data - enable.verbose.logging - true - Log every operation on all stores. + rebalancing.timeout.seconds + 10 * 24 * 60 * 60 + Time we give for the server side rebalancing to finish copying data - enable.stat.tracking + max.parallel.stores.rebalancing + 3 + Stores to rebalancing in parallel + + + rebalancing.optimization true - Track load statistics on the stores. + Should we run our rebalancing optimization for non-partition aware stores? + + + Retention configuration + + + retention.cleanup.first.start.hour + 0 + Hour when we want to start the first retention cleanup job + + + retention.cleanup.period.hours + 24 + Run the retention clean up job every n hours + + Gossip configuration + enable.gossip false Enable gossip to synchronize state - pusher.poll.ms - 2 * 60 * 1000 - How often should misdelivered "slop" data be pushed out to nodes? + gossip.interval.ms + 30*1000 + Enable gossup every n ms - scheduler.threads - 3 - The number of threads to use for scheduling periodic jobs + Admin service admin.enable @@ -212,12 +246,50 @@ admin.max.threads 20 - Max Number of threads used by Admin services + Max Number of threads used by Admin services. Used by BIO ( i.e. if enable.nio.connector = false ) admin.core.threads max(1, ${admin.max.threads} / 2) - The number of threads to keep alive by Admin service even when idle + The number of threads to keep alive by Admin service even when idle. Used by BIO ( i.e. if enable.nio.connector = false ) + + + nio.admin.connector.selectors + max ( 8, number of processors ) + Number of selector threads for admin operations. Used by NIO ( i.e. if enable.nio.connector = true ) + + + Core Voldemort server configuration + + + enable.nio.connector + false + Enable NIO on server side + + + nio.connector.selectors + max ( 8, number of processors ) + Number of selector threads for normal operations. Used by NIO ( i.e. 
if enable.nio.connector = true ) + + + max.threads + 100 + The maximum number of threads the server can use ( Used by HTTP and BIO - enable.nio.connector = false - service only ) + + + core.threads + max(1, ${max.threads} / 2) + The number of threads to keep alive even when idle ( Used by HTTP and BIO - enable.nio.connector = false - service only ) + + + socket.timeout.ms + 4000 + The socket SO_TIMEOUT. Essentially the amount of time to block on a low-level network operation before throwing an error. + + + routing.timeout.ms + 5000 + The total amount of time to wait for adequate responses from all nodes before throwing an error. stream.read.byte.per.sec @@ -230,13 +302,33 @@ Max write throughput allowed when Admin service streams data - enable.rebalancing + http.enable true - Enable rebalance service? + Enable the HTTP data server? - max.rebalancing.attempts - 3 - Number of attempts made during rebalancing + socket.enable + true + Enable the socket data server? + + + jmx.enable + true + Enable JMX monitoring? + + + enable.verbose.logging + true + Log every operation on all stores. + + + enable.stat.tracking + true + Track load statistics on the stores. + + + scheduler.threads + 6 + Number of threads to use for scheduled jobs diff --git a/voldemort/performance.php b/voldemort/performance.php index 3f5d72b..3eba05e 100644 --- a/voldemort/performance.php +++ b/voldemort/performance.php @@ -33,28 +33,11 @@

Note that this is against a single-node cluster, so the replication factor is 1. Obviously doubling the replication factor will halve the client req/sec since it is doing 2x the operations. So these numbers represent the maximum throughput from one client; by increasing the replication factor, decreasing the cache size, or increasing the data size on the node, we can make the performance arbitrarily slow. Note that in this test the server is actually fairly lightly loaded since it has only one client, so this does not measure the maximum throughput of a server, just the maximum throughput from a single client.

    -

    -Here is the request-latency we see at LinkedIn on our production servers measured on the server side (e.g. from the time we get the request off the wire to the time we start writing back to the wire): -

    - -
    -Median GET: 0.015 ms
    -Median PUT: 0.040 ms
    -99.99 percentile GET: 0.227 ms
    -99.99 percentile PUT: 2.551 ms
    -
    - -

    -Note that this is very fast, and tells us that the bdb cache hit and/or pagecache hit ratio is extremely high for this particular dataset. -

    - -

    -This does not include network or routing overhead. From the client-side we do not measure the individual voldemort requests in production, but instead time a full GET, processing with simple business logic, and PUT. For this we see avg. latencies of around 1-2ms for the complete modification including both read, update, and store operations. -

Your Mileage May Vary

    If the numbers you see in your own tests do not look like what you expect please chime in on the mailing list. We have tuned and tested certain configurations and would like to gather data on other configurations and may be able to help with settings. - -We also have a performance tool which can help potential users to run their own benchmarks and judge if Voldemort fits their requirements. +

+We also have a performance tool, heavily inspired by YCSB, which can help potential users to run their own benchmarks and judge if Voldemort fits their requirements. +

    diff --git a/voldemort/quickstart.php b/voldemort/quickstart.php index 4ecf2bc..6d0fc99 100644 --- a/voldemort/quickstart.php +++ b/voldemort/quickstart.php @@ -62,26 +62,35 @@ $ VOLDEMORT_HOME='/path/to/voldemort' $ cd $VOLDEMORT_HOME $ ./bin/voldemort-server.sh -[2008-08-11 17:00:32,884] INFO Starting voldemort-server (voldemort.server.VoldemortService) -[2008-08-11 17:00:32,886] INFO Starting all services: (voldemort.server.VoldemortServer) -[2008-08-11 17:00:32,886] INFO Starting storage-service (voldemort.server.VoldemortService) -[2008-08-11 17:00:32,891] INFO Initializing stores: (voldemort.server.storage.StorageService) -[2008-08-11 17:00:32,891] INFO Opening test. (voldemort.server.storage.StorageService) -[2008-08-11 17:00:32,903] INFO All stores initialized. (voldemort.server.storage.StorageService) -[2008-08-11 17:00:32,903] INFO Starting scheduler (voldemort.server.VoldemortService) -[2008-08-11 17:00:32,906] INFO Scheduling pusher to run every 60000 milliseconds. (voldemort.server.scheduler.SchedulerService) -[2008-08-11 17:00:32,909] INFO Starting http-service (voldemort.server.VoldemortService) -[2008-08-11 17:00:33,044] INFO Starting socket-service (voldemort.server.VoldemortService) -[2008-08-11 17:00:33,044] INFO Starting voldemort socket server on port 6666. (voldemort.server.socket.SocketServer) -[2008-08-11 17:00:33,045] INFO Starting JMX Service (voldemort.server.VoldemortService) -[2008-08-11 17:00:33,133] INFO All services started. (voldemort.server.VoldemortServer) +[2011-07-14 18:06:24,921 voldemort.store.metadata.MetadataStore] INFO metadata init(). +[2011-07-14 18:06:25,309 voldemort.server.VoldemortServer] INFO Using NIO Connector. +[2011-07-14 18:06:25,331 voldemort.server.VoldemortServer] INFO Using NIO Connector for Admin Service. +[2011-07-14 18:06:25,332 voldemort.server.VoldemortService] INFO Starting voldemort-server +[2011-07-14 18:06:25,333 voldemort.server.VoldemortServer] INFO Starting 8 services. +[2011-07-14 18:06:25,333 voldemort.server.VoldemortService] INFO Starting storage-service +[2011-07-14 18:06:25,399 voldemort.server.storage.StorageService] INFO Initializing bdb storage engine. +[2011-07-14 18:06:25,404 voldemort.server.storage.StorageService] INFO Initializing read-only storage engine. +[2011-07-14 18:06:25,406 voldemort.server.storage.StorageService] INFO Initializing the slop store using bdb +[2011-07-14 18:06:25,767 voldemort.server.storage.StorageService] INFO Initializing stores: +[2011-07-14 18:06:25,767 voldemort.server.storage.StorageService] INFO Opening store 'test' (bdb). +[2011-07-14 18:06:25,834 voldemort.server.storage.StorageService] INFO All stores initialized. +[2011-07-14 18:06:25,834 voldemort.server.VoldemortService] INFO Starting scheduler-service +[2011-07-14 18:06:25,834 voldemort.server.VoldemortService] INFO Starting async-scheduler +[2011-07-14 18:06:25,834 voldemort.server.VoldemortService] INFO Starting http-service +[2011-07-14 18:06:26,092 voldemort.server.VoldemortService] INFO Starting socket-service +[2011-07-14 18:06:26,101 voldemort.server.VoldemortService] INFO Starting rebalance-service +[2011-07-14 18:06:26,109 voldemort.server.VoldemortService] INFO Starting jmx-service +[2011-07-14 18:06:26,142 voldemort.server.VoldemortServer] INFO Startup completed in 809 ms.

Alternatively, we can give VOLDEMORT_HOME on the command line and avoid having to set an environment variable

     $ ./bin/voldemort-server.sh /path/to/voldemort
    -[2008-08-11 17:00:32,884] INFO Starting voldemort-server (voldemort.server.VoldemortService)
    +[2011-07-14 18:06:24,921 voldemort.store.metadata.MetadataStore] INFO metadata init(). 
    +[2011-07-14 18:06:25,309 voldemort.server.VoldemortServer] INFO Using NIO Connector. 
    +[2011-07-14 18:06:25,331 voldemort.server.VoldemortServer] INFO Using NIO Connector for Admin Service. 
    +[2011-07-14 18:06:25,332 voldemort.server.VoldemortService] INFO Starting voldemort-server 
     ...