Upgrading from 0.81
Voldemort has changed a lot since its last release about a year ago. I've tried to document every new feature that we've added over the past year, along with the upgrade steps required to get it running. Some of the changes are massive and some are small, so it is advisable to first test all of them in a staging environment before pushing to your production Voldemort instances.
A lot has changed for read-only stores over the past year. In general our aim has been to (a) improve the experience of pushing / swapping stores and (b) make rebalancing of read-only stores better.
[NB - The format change does not affect read-write stores; it applies only to read-only ones]
The first and most important change we made to the read-only stores is the directory structure used to store the static files. The old format was as follows:
    my_store/
        version-0/
            0.data
            0.index
        version-1/
            0.data
            0.index
The process of updating to a newer version of data from Hadoop was initiated by a tool that executed the following steps:
- The tool sends a message to all nodes asking them to fetch data from HDFS
- All Voldemort nodes fetch the data from HDFS in parallel into some temporary directory
- Once all fetches are complete, the tool tells all nodes to swap
- Swap on a single node is as follows
  - Close all files in version-0 ( which is the one serving the traffic )
  - Rename version-[n] to version-[n+1] and so on, until version-0 becomes version-1
  - Move the temporary directory to version-0
  - Open the files in version-0
This approach has a couple of problems: (a) Maintenance of the temporary folders can get out of control once we have too many stores, and debugging failures was tough since we didn't know exactly which temporary folder the data would be stored in. (b) The versioning format is unintuitive.
So we decided to change this to a slightly different format. The new format is as follows
    my_store/
        version-134/
            0.data
            0.index
        version-135/
            0.data
            0.index
        latest -> version-135
We completely reversed the numbering scheme. This way the user can specify a version number while pushing ( as long as it's a number greater than the current latest ). So now the tool executes the following steps -
- If the user has specified a version number, do the necessary checks and create the version-[no] folder; otherwise use version-[current_greatest + 1]. Start fetching to it...
- Once all fetches are done, the tool tells all nodes to swap
- Swap on a single node is as follows
  - Close the files in the version directory pointed to by the latest symbolic link
  - Change the symbolic link to point to the new folder
  - Open the files in the version directory pointed to by the symbolic link
So now, by fetching straight into the store's own folder, we centralize the location of the data and make debugging easier. Swapping is also easier: a change in the symbolic link compared to multiple file renames. Finally, having chronologically increasing numbers allows users to understand which version they are pushing. Users can push with version = timestamp and thereby get their push history.
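The version-folder and symbolic-link steps above can be sketched in a few lines of Python. This is only an illustration of the directory scheme, not the code Voldemort runs: the function names `next_version_dir` and `swap_latest` are hypothetical, and closing / reopening the data files around the swap is left out.

```python
import os
import tempfile


def next_version_dir(store_dir, version=None):
    """Create version-[n] under store_dir and return its path.

    With version=None we pick current_greatest + 1 (0 for an empty store).
    An explicit version must be greater than the current greatest one.
    """
    existing = [
        int(name.split("-", 1)[1])
        for name in os.listdir(store_dir)
        if name.startswith("version-") and name.split("-", 1)[1].isdigit()
    ]
    greatest = max(existing, default=-1)
    if version is None:
        version = greatest + 1
    elif version <= greatest:
        raise ValueError("version must be greater than the current latest")
    path = os.path.join(store_dir, "version-%d" % version)
    os.makedirs(path)
    return path


def swap_latest(store_dir, new_version_dir):
    """Point store_dir/latest at new_version_dir; return the old target."""
    link = os.path.join(store_dir, "latest")
    previous = os.readlink(link) if os.path.islink(link) else None
    tmp_link = link + ".tmp"
    if os.path.lexists(tmp_link):
        os.remove(tmp_link)
    # Build the new link next to the old one, then rename it into place.
    # On POSIX the rename replaces the old link in a single step.
    os.symlink(os.path.basename(new_version_dir), tmp_link)
    os.replace(tmp_link, link)
    return previous
```

Returning the previous target makes a rollback a matter of calling `swap_latest` again with the old folder.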
To upgrade from the current version you'll need to do a rolling restart while running
./contrib/hadoop-store-builder/bin/grandfather-readonly.sh [current read-only data folder] on every node. This script grandfathers the folder names and inverts the numbers.
The old fetch / swap tool would talk over HTTP and send commands to a servlet running on Voldemort. We had two issues with this
- In the case of fetch, a long running process, we'd sometimes time out and not even realize what had happened ( while the server would still continue fetching )
- There was no way of monitoring the progress of a fetch.
Since we had already planned to consolidate all our "administrative" operations into the admin service, we decided to port this functionality over. Thus we have introduced the new "admin based fetch / swap tool", which is more reliable and reports progress.
You can run this using
./contrib/hadoop-store-builder/bin/swap-store.sh --admin [ commands ].
Also, if you are a power user of read-only stores you will have come across two failure scenarios which can be troublesome.
- If a swap fails on one of the nodes, the rollback takes place on that node, but the other nodes will still be using the new version and need to be rolled back manually.
- If a fetch fails on one of the nodes ( thereby stopping further swaps ), the other nodes still have a copy of the data ( using the new directory format shown above ).
Both of these problems have been solved in the new admin based tool, wherein we automatically (a) roll back on the other nodes as well and (b) delete the redundant data pushed. To get access to this, look at the AdminStoreSwapper class.
We've tried to maintain backwards compatibility by supporting reading data in the old format ( as shown here ). But over the past year we've iterated a lot on the storage format to aid with rebalancing. In the process we went through two iterations and have finally come up with a storage format which has a better memory footprint and supports iteration.
Let's start by recapping what the old storage format looked like. The old storage format would hash the key to a particular node, and we would then store all keys in chunk files ( a chunk = pair of .index and .data files ), totally agnostic of the partition information. This obviously isn't good from a rebalancing perspective, because if we need to move only a subset of the partitions belonging to the node we need to go over the complete set and strip the relevant data out. Besides stripping it out, we also need to merge the data in on the receiving ( stealer ) end while keeping it in the correct sorted order.
To solve this problem we decided to store chunk files on a per [ primary partition + replica ] basis ( let's call it a bucket ). So for example, if we have a key which maps to [ Partition 19 ( Node 2 ), Partition 20 ( Node 1 ), Partition 22 ( Node 3 ) ], it will be stored on Node 2 in the bucket 19_0, on Node 1 in the bucket 19_1 and on Node 3 in the bucket 19_2. So compared to the older format, wherein we had chunk files on a per node basis [ like ( 0.data, 1.data, ... ) ], we now have chunk files on a per bucket basis [ like ( 19_0_0.data, 19_0_1.data ) ]. Rebalancing is now as simple as moving all the chunk files for a particular bucket over to the new node.
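To make the bucket naming concrete, here is a small Python sketch that derives the bucket for each replica from a preference list and builds the chunk file names for a bucket. The helper names are hypothetical and the code only mirrors the naming convention described above; it is not taken from the store builder.

```python
def bucket_assignments(preference_list):
    """Map each node in a preference list to its bucket name.

    preference_list is a list of (partition_id, node_id) pairs with the
    primary first. The bucket is "<primary partition>_<replica index>".
    """
    primary_partition = preference_list[0][0]
    return {
        node_id: "%d_%d" % (primary_partition, replica)
        for replica, (_, node_id) in enumerate(preference_list)
    }


def chunk_file_names(bucket, num_chunks):
    """Return the (.data, .index) file name pairs for one bucket."""
    return [
        ("%s_%d.data" % (bucket, chunk), "%s_%d.index" % (bucket, chunk))
        for chunk in range(num_chunks)
    ]


# The example from the text: partition 19 on node 2 is the primary.
buckets = bucket_assignments([(19, 2), (20, 1), (22, 3)])
```

With this layout, moving a bucket during rebalancing means moving exactly the files returned by `chunk_file_names` for that bucket.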
We also tweaked the data layout in the .data and .index files. The following diagram shows the exact change.
The simple change we made was to cut down the size of the md5(key) that we store in the index files and to save colliding values in the data file as a list. Optimizing the size of the index files is very important since these files are mmap-ed and take up space in the OS page cache. The next important question was: how many bytes do we trim the key down to? This is where the famous birthday paradox helped us. It says that if we draw n random integers from a uniform distribution over the range [1, x], the probability that at least 2 numbers are the same is approximately 1 - e^(-n(n-1)/(2x)). Mapping this to our read-only stores scenario at LinkedIn, we are currently storing the recommendations for our 100 million member user base. So n = 100 million and x = 2^128 [ 16 bytes = 128 bits ] gives us a probability of close to 0. Decreasing this to, say, 4 bytes ( i.e. 32 bits ) gives a high probability of 1 - exp(-(100 x 10^6)(100 x 10^6 - 1) / 2^33) ~ 1. But if we instead cut it in half to 8 bytes we get a very low probability of 1 - exp(-(100 x 10^6)(100 x 10^6 - 1) / 2^65) ~ 2.7e-04. This is the probability of at least one collision, and the probability of more than one collision is lower still. In conclusion, in order to reduce the mmap-ed file size we added a small overhead of a key comparison plus the space to save the key in the data file.
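The birthday-paradox estimate is easy to check numerically. The sketch below evaluates 1 - exp(-n(n-1)/(2x)) for a given number of keys and a given hash width; the function name is ours, and `math.expm1` is used only so that very small probabilities are not rounded to zero.

```python
import math


def collision_probability(num_keys, hash_bits):
    """Approximate probability of at least one collision among num_keys
    uniformly random hashes of hash_bits bits (birthday approximation)."""
    space = 2 ** hash_bits
    exponent = num_keys * (num_keys - 1) / (2 * space)
    # 1 - exp(-t) written as -expm1(-t) to stay accurate for tiny t
    return -math.expm1(-exponent)


members = 100 * 10 ** 6
p_16_bytes = collision_probability(members, 128)
p_8_bytes = collision_probability(members, 64)
p_4_bytes = collision_probability(members, 32)
```

For 100 million keys this gives a value indistinguishable from 1 for 4 bytes, roughly 2.7e-04 for 8 bytes and a negligible value for the full 16 bytes.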
Voldemort already had a very old version of rebalancing which had some problems:
- Incorrect plan generation : The first phase of rebalancing takes as its input the existing cluster metadata and the desired future cluster metadata. It then generates a plan which consists of the steps that need to be executed in order to move partitions from one node to another. One of the major issues ( as highlighted in Issue 288 ) was that this plan assumed that data stored on a node was not partition aware ( i.e. when storing data on a particular node we don't differentiate between partitions ). This was a valid assumption when it was written, because initially we only wanted to support rebalancing of BDB stores. But over time the need to add rebalancing for MySQL / read-only stores required that we fix this.
- No checkpoints : Since the data transfer during rebalancing can be a long running process, it becomes necessary to checkpoint periodically.
- No tooling : We didn't have tooling to restore a node to a good state. So if one of the rebalancing steps failed, the only way to recover was to bring down the box, remove the .temp folder containing the metadata and restart.
The new version of Voldemort solves all of these problems by doing the following
- The plan generation is done independent of how the data is stored. The full plan is then passed down all the way to the stealer nodes where we make the decision whether to run any optimization or not. This approach is perfect from the standpoint of extensibility since future addition of storage engines won't require any more changes to plan generation.
- The problem of checkpointing can be solved by increasing the granularity of the transfer. So for example, if we are moving 10 primary partitions, instead of moving all of them at once we should move them in smaller batches and checkpoint after each batch. The new version allows the user to specify the 'batch size'.
- Tons of tooling has been added to the Voldemort admin tool. This tool now has the ability to (a) list / stop any asynchronous rebalancing jobs (b) list / update any rebalancing metadata on the node (c) switch off redirections through JMX if we realize they are causing a problem. All of these required a restart of nodes in the previous version.
We still have tons of work to do in this area to make rebalancing more robust. Our current approach is to run a "stealer driven rebalancing" ( i.e. the stealer co-ordinates the rebalancing ). This has the disadvantage that we might end up doing multiple complete disk sweeps on the same donor node. This approach can be improved by making rebalancing "donor driven". Another area where we need help from external contributors is our GUI for rebalancing. Being able to press a "rebalance" button and get a progress bar with an approximate percent completed would make it more intuitive.
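The batching idea can be illustrated with a short Python sketch. It is not the rebalancing controller itself: `plan_batches`, `rebalance` and the `move_batch` callback are hypothetical names, and the "checkpoint" here is just an in-memory set of partitions that have already been moved.

```python
def plan_batches(partition_ids, batch_size):
    """Split the partitions to move into batches of at most batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        partition_ids[i:i + batch_size]
        for i in range(0, len(partition_ids), batch_size)
    ]


def rebalance(partition_ids, batch_size, move_batch, completed=None):
    """Move partitions batch by batch, recording progress after each batch.

    completed is the checkpoint: partitions already moved are skipped,
    so a failed run can be resumed without repeating finished batches.
    """
    completed = set() if completed is None else completed
    for batch in plan_batches(partition_ids, batch_size):
        pending = [p for p in batch if p not in completed]
        if not pending:
            continue
        move_batch(pending)        # may raise if the transfer fails
        completed.update(pending)  # checkpoint only after success
    return completed
```

If `move_batch` fails part way through, everything recorded in `completed` survives, and calling `rebalance` again with the same set picks up from the failed batch.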
For more details about rebalancing read this.
Over time we started facing a lot of problems with the legacy thread-per-socket blocking I/O based client + server. We have now changed our defaults to the new non-blocking implementation of both client + server which multiplexes using just a fixed number of threads. This is good from an operations perspective on the server because now we don't have to manually keep bumping up the maximum number of threads when new clients are added. From the client's perspective we now won't need to worry about thread pool exhaustion due to slow responses from slow servers.
In terms of configuration, we no longer need the max.threads parameter on the server side. The only parameter to worry about on the client side is selectors, and on the server side (
With the new release we've added a new consistency mechanism called hinted handoff. This mechanism is triggered during puts and deletes when we have successfully satisfied our quorum but have still received failures from some of the remaining N-W replicas ( replication factor - required writes ). Without hinted handoff these failed replicas would fall out of sync and would only lazily sync up during read-repair. To reduce this "lazy sync-up", hinted handoff stores a "hint" about the failure on one of the live nodes. Then every N ms ( slop.frequency.ms on the server side ) these live nodes try to push their hints out to the nodes which were down.
To enable hinted handoff you need to do three things
- Use the pipeline routed store - This is on by default in the latest version of our code
- Setup your store definition to support hinted handoff - We support hinted handoff on a per store basis. You need to add proximity-handoff to your store definition. This tells the client how it should choose the live nodes
- Setup the slop engine and pusher job - Finally, we need to store these "hints" ( or as we call them, "slops" ) in another store on every node. We call this store the "slop store". We also need to set up a scheduled job which iterates over all the keys in this "slop store" and tries to push them out to the nodes when they come back up. For more information about the various configurations required to get this set up on the server read this
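The flow of a hinted write and the later push of the hint can be simulated in memory. The Python sketch below is a toy model under several assumptions: the class and function names are hypothetical, hints are parked on replicas that accepted the write (the real choice of live node depends on the configured handoff strategy), and versioning of values is ignored.

```python
class Node:
    def __init__(self, node_id):
        self.node_id = node_id
        self.up = True
        self.data = {}
        self.slops = []  # hints: (destination node id, key, value)

    def put(self, key, value):
        if not self.up:
            raise ConnectionError("node %d is down" % self.node_id)
        self.data[key] = value


def put_with_hints(replicas, key, value, required_writes):
    """Write to all replicas; store a hint for each one that failed."""
    succeeded, failed = [], []
    for node in replicas:
        try:
            node.put(key, value)
            succeeded.append(node)
        except ConnectionError:
            failed.append(node)
    if not succeeded or len(succeeded) < required_writes:
        raise RuntimeError("required writes not satisfied")
    for i, dead in enumerate(failed):
        holder = succeeded[i % len(succeeded)]
        holder.slops.append((dead.node_id, key, value))
    return len(succeeded)


def push_slops(holder, nodes_by_id):
    """One run of the pusher job: deliver hints whose target is back up."""
    remaining, delivered = [], 0
    for dest_id, key, value in holder.slops:
        try:
            nodes_by_id[dest_id].put(key, value)
            delivered += 1
        except ConnectionError:
            remaining.append((dest_id, key, value))
    holder.slops = remaining
    return delivered
```

In this model `push_slops` plays the role of the scheduled job: hints that cannot be delivered stay in the slop list and are retried on the next run.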
Voldemort already had support for views, which allow users to specify a transform function on their store and save it on the server side. But we had an internal use case at LinkedIn where we wanted to apply a different transform for every type of request that we made. For example, say our value is a list of integers. Then instead of having 2 different views, one which calculates the max of the integers and the other which calculates the average, server side transforms allow you to do something like get ( key, add_transform ) and get ( key, average_transform ).
For more details about server side transforms check out the StoreClient interface
The current default routing algorithm is based on the original routing algorithm described in the Amazon Dynamo paper. To generate the preference list ( i.e. the list of nodes a key belongs on ), we hash to a partition on the ring and then keep jumping until we find the next N-1 ( where N = replication-factor ) partitions ( each belonging to a different node ).
We wanted to extend this strategy to support routing in a multi-datacenter scenario. For this we came up with a generic routing scheme called 'zone-routing'. In this strategy we cluster nodes together into 'zones'. Then, while generating the preference list, we add one more constraint while jumping around the ring: we try to get partitions belonging to a different node and zone ( depending on the replication factor specified for each zone ).
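As a rough picture of the difference between a view and a per-request transform, here is a tiny in-memory Python model. It is not the StoreClient API: the class, the `get` signature and the transform names (`max_transform` is our own addition) are assumptions made for the sake of the example.

```python
class TransformingStore:
    """Toy store where the caller names a transform on each get."""

    def __init__(self, transforms):
        self._data = {}
        self._transforms = dict(transforms)

    def put(self, key, value):
        self._data[key] = value

    def get(self, key, transform_name=None):
        value = self._data[key]
        if transform_name is None:
            return value
        # The transform runs where the data lives; only the result
        # would have to travel back to the caller.
        return self._transforms[transform_name](value)


store = TransformingStore({
    "max_transform": max,
    "average_transform": lambda numbers: sum(numbers) / len(numbers),
})
store.put("scores", [1, 2, 3])
highest = store.get("scores", "max_transform")
average = store.get("scores", "average_transform")
```

The point of the model is that one stored value serves both requests, whereas with views each transform would need its own view definition.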
For more information read this. In summary, you only need to add the zone-replication-factor tags to your store definitions to get started. It is also important to set the zone id of the client so that it reorders the preference list to do operations on the local zone first ( this can be done by setting the client_zone_id parameter ).
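The two preference-list strategies can be sketched in Python as a walk around the ring. This is a simplified model, not the routing code shipped with Voldemort: the ring is a plain list mapping partition index to node id, the starting partition is assumed to be already computed from the key's hash, and all function names are hypothetical.

```python
def preference_list(ring, start, replication_factor):
    """Dynamo-style: walk the ring, taking partitions on distinct nodes."""
    chosen, used_nodes = [], set()
    for step in range(len(ring)):
        if len(chosen) == replication_factor:
            break
        partition = (start + step) % len(ring)
        if ring[partition] not in used_nodes:
            chosen.append(partition)
            used_nodes.add(ring[partition])
    if len(chosen) < replication_factor:
        raise ValueError("not enough distinct nodes on the ring")
    return chosen


def zone_preference_list(ring, node_zone, start, zone_replication_factor):
    """Same walk, but each zone only accepts its own number of replicas."""
    remaining = dict(zone_replication_factor)
    needed = sum(remaining.values())
    chosen, used_nodes = [], set()
    for step in range(len(ring)):
        if len(chosen) == needed:
            break
        partition = (start + step) % len(ring)
        node = ring[partition]
        zone = node_zone[node]
        if node in used_nodes or remaining.get(zone, 0) == 0:
            continue
        chosen.append(partition)
        used_nodes.add(node)
        remaining[zone] -= 1
    if len(chosen) < needed:
        raise ValueError("zone replication factors cannot be satisfied")
    return chosen


def local_zone_first(partitions, ring, node_zone, client_zone_id):
    """Reorder so replicas in the client's zone are tried first."""
    # sorted() is stable, so ring order is kept within each group.
    return sorted(partitions, key=lambda p: node_zone[ring[p]] != client_zone_id)
```

For example, with `ring = [0, 1, 2, 3, 0, 1, 2, 3]`, nodes 0 and 1 in zone 0 and nodes 2 and 3 in zone 1, a key hashing to partition 1 with two replicas in zone 0 and one in zone 1 gets the partitions 1, 2 and 4; a client in zone 1 would then try partition 2 first.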
Every month at LinkedIn we have a day dedicated to working on any project that we like. It was during one of these days that one of our engineers came up with the idea of building a simple GUI in JRuby. The GUI has some basic functionality such as retrieving values, checking store definitions, adding stores, etc. But it has been heavily tailored to suit stores which use simple string serialization. To play with it check out this page
Even though we've put a lot of emphasis on our Java client ( which does client side routing ), we've seen a lot of traction from Ruby and Python developers. In the latest release we've upgraded both our Python and Ruby clients. Both of these clients depend on server side routing. The Python client supports both raw and binary JSON serialized stores. The Ruby client currently only supports "string" serialized stores. Adding support for clients in other languages is very easy, and we have documentation here describing the protocol implementation.
One of the most frequent questions we get from users is what the best way to create the cluster metadata is. We need to answer two questions while generating the metadata
- How many partitions per node?
- How to map the partitions to a node?
For both questions it's very tough to say that we have the correct metadata unless we have a metric to measure success. The basic rule of thumb for the first question is that we should keep more partitions than the number of nodes. This gives us some buffer space to move partition ownership to new nodes in the future. For the mapping part we tend to rely on randomness to give us the best answer. To measure the success of your metadata we've created a tool which calculates the skewness of your data. The metric calculated is the standard deviation from the fair distribution ( 1 / [ number of nodes ] ). This is calculated by running through a set of random keys and counting the hits for every node. A value close to zero means we have a fair distribution.
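The metric itself is simple to reproduce. The Python sketch below is a minimal stand-in for the idea, not the tool: the function name is ours, only the primary partition of each key is counted, and md5 is used as an arbitrary hash, which is an assumption rather than the hash the cluster uses.

```python
import hashlib
import math
import random


def skew(partition_to_node, num_keys, seed=0):
    """Standard deviation of each node's share of random keys from the
    fair share 1 / [number of nodes]. Values near 0 mean a fair layout.

    partition_to_node[i] is the id of the node that owns partition i.
    """
    rng = random.Random(seed)
    nodes = sorted(set(partition_to_node))
    hits = dict.fromkeys(nodes, 0)
    for _ in range(num_keys):
        key = rng.getrandbits(64).to_bytes(8, "big")
        digest = hashlib.md5(key).digest()
        partition = int.from_bytes(digest[:4], "big") % len(partition_to_node)
        hits[partition_to_node[partition]] += 1
    fair = 1.0 / len(nodes)
    squared = [(hits[node] / num_keys - fair) ** 2 for node in nodes]
    return math.sqrt(sum(squared) / len(nodes))


balanced = skew([0, 1, 2, 0, 1, 2], 20000)
lopsided = skew([0, 0, 0, 0, 0, 1], 20000)
```

In the second layout node 0 owns five of the six partitions, so its share of the keys is far from one half and the metric comes out well above zero, while the first layout stays close to zero.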
To run this tool use