Riak Core, The vnode

If you're new to Riak Core you should read my previous post about multinode first to see how to create a project skeleton.

In this post I will implement an application I'm calling Real Time Statistics (RTS for short) which will accept log entries over HTTP and generate real-time statistics from those entries. I will focus on the vnode which is the workhorse of a Riak Core application. You should follow along by cloning this repo and looking at the RTS code as you read this.

An Interview Question

I was recently asked an interview question that went something like this:

You have N machines writing syslog events to a server somewhere. Each incoming entry should be compared to a list of regular expressions on the server. Each regexp has a corresponding event that should be triggered if a match occurs. Said event will write/update a value somewhere that can be queried by interested parties. How do you implement it?

My RTS system is designed to solve this problem, except I'm swapping syslog for anything that can write over HTTP. You could even use a named pipe and netcat to send your syslog events. There are two problems to solve. First, parsing the incoming entries and triggering functions when a match occurs. This will be handled by the entry vnode. Second, accepting and persisting statistics generated by the triggered functions. The stats will be handled by the stat vnode. I'll be using entries from access logs in combined log format as my sample data.

What's a vnode, Anyways?

  • A vnode is a virtual node, as opposed to a physical node

  • Each vnode is responsible for one partition on the ring

  • A vnode is an Erlang process

  • A vnode is a behavior written atop the gen_fsm behavior

  • A vnode handles incoming requests

  • A vnode potentially stores data to be retrieved later

  • A vnode is the unit of concurrency, replication, and fault tolerance

  • Typically many vnodes will run on each physical node

  • Each machine has a vnode master whose purpose is to keep track of all active vnodes on its node

As you can see, a vnode takes on a lot of responsibility. Luckily, the good folks at Basho have already done most of the heavy lifting by providing a vnode behavior which you code to. If Erlang is like a foreign language to you, then think of a behavior like an interface in that it declares callback functions that you must implement. Furthermore, it provides a container which implements the semantics of the behavior and makes calls into your callback module. Kind of like how J2EE provides a web container and you simply code a Servlet, except way cooler. By coding to a behavior you are free to focus on the problem you need to solve rather than how to implement a vnode. That said, you can't do much if you don't understand the inputs and outputs of these callbacks. That's why you're reading this.

Life-cycle Callbacks

The init/1 and terminate/2 callbacks are called at the edges of the vnode life-cycle. When a pid linked to the vnode crashes the handle_exit/3 callback will be invoked.

init([Index]) -> Result

Index       :: int() >= 0
Result      :: {ok, State}
State       :: term()

This callback initializes the state of the vnode. My entry vnode needs to store the regexp to trigger fun mapping so that the command callback can access it later.

init([Partition]) ->
    Reg = [
           {?COMBINED_LF, fun ?MODULE:combined_lf/2}
          ],
    {ok, #state { partition=Partition, reg=Reg }}.

My stat vnode needs to track the stat updates as they are sent in.

init([Partition]) ->
    {ok, #state { partition=Partition, stats=dict:new() }}.

terminate(Reason, State) -> Result

Reason      :: normal | shutdown | {shutdown, term()} | term()
State       :: term()
Result      :: term()

Used to clean up any resources held by the vnode. The Reason will depend on how the vnode was stopped. Since the vnode container is simply a gen_fsm underneath, you can read more about the Reason in the gen_fsm documentation. The State is the final state of the vnode and Result can be anything but will be ignored by the container.

Since both the entry and stat vnodes keep everything in memory Erlang will handle cleanup implicitly and there is nothing explicit to be done in terminate.

terminate(_Reason, _State) ->
    ok.

handle_exit(Pid, Reason, State) -> Result

Pid         :: pid()
Reason      :: term()
State       :: term()
Result      :: {noreply, NewState}
             | {stop, NewState}

When a process linked to the vnode dies this callback will be invoked with the Pid of the crashed process along with the Reason for the crash and the vnode's current State. At this point you have two choices.

  1. The linked Pid is vital to the functioning of the vnode so you return {stop, NewState} to bring the vnode down as well.

  2. The linked Pid isn't vital so you log a warning and return {noreply, NewState} to allow the vnode to continue execution.
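
The two choices can be sketched as two function clauses. This is a minimal sketch, not code from RTS: the module name and the `worker` field (the pid of one helper process the vnode is assumed to depend on) are hypothetical.

```erlang
-module(exit_sketch).
-export([handle_exit/3]).

%% Hypothetical state: `worker` holds the pid of one helper
%% the vnode cannot run without.
-record(state, {partition, worker}).

%% Choice 1: the vital helper died, so stop the vnode too.
handle_exit(Pid, _Reason, #state{worker=Pid}=State) ->
    {stop, State#state{worker=undefined}};
%% Choice 2: some other linked process died; log it and carry on.
handle_exit(Pid, Reason, State) ->
    error_logger:warning_msg("linked process ~p exited: ~p~n", [Pid, Reason]),
    {noreply, State}.
```

Using the same variable name `Pid` twice in the first clause head is what makes it match only when the crashed process is the tracked worker.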


Commands

All incoming requests become commands on your vnode. For example, a GET on Riak KV will end up in the handle_command defined in its vnode. For this reason, it might be easiest to start with the question:

What commands will I need to implement?

To implement a command you add a new handle_command/3 function clause that matches against the incoming request. For example, to get a stat the request will be {get, StatName} which would require a function head with the form handle_command({get, StatName}, ...).

handle_command(Request, Sender, State) -> Result

Request     :: term()
Sender      :: sender()
State       :: term()
NewState    :: term()
Result      :: {reply, Reply, NewState}
             | {noreply, NewState}
             | {stop, Reason, NewState}

The Request can be anything (i.e. any term) but is typically a tagged tuple. The Sender is a representation of the client process but is typically used as an opaque value that you would use with a utility function such as riak_core_vnode:reply/2. The State is much like state in a gen_server and is there to track data across callback invocations.

There are three choices of reply. In all cases the 3rd element of the tuple is the potentially modified state.

  1. reply: Send Reply back to the client.

  2. noreply: Don't send a reply. This doesn't necessarily indicate that no reply was made, it just means you don't want the vnode container to send a reply. For example, this command could cause a long-running action to occur and you might spawn it on another process passing the Sender.

  3. stop: For whatever Reason you want this vnode to terminate.
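
To make the noreply case concrete, here is a small sketch of a command that hands its work to another process, which replies on its own. The `slow_sum` command and the module name are made up for illustration. The local `reply/2` is only a stand-in for riak_core_vnode:reply/2 so the sketch runs without Riak Core, and it assumes the Sender is a plain pid, which is not how a real sender() looks.

```erlang
-module(noreply_sketch).
-export([handle_command/3]).

%% Stand-in for riak_core_vnode:reply/2 (assumption: Sender is a pid).
reply(Sender, Reply) ->
    Sender ! {reply, Reply},
    ok.

%% Hypothetical long-running command: do the work off the vnode
%% process and let the worker answer the client directly.
handle_command({slow_sum, Numbers}, Sender, State) ->
    spawn(fun() -> reply(Sender, lists:sum(Numbers)) end),
    {noreply, State}.
```

The vnode returns immediately and is free to handle the next request while the spawned process finishes the work.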

The entry vnode needs to compare each incoming log entry with all registered regular expressions and possibly execute a corresponding trigger fun.

handle_command({entry, Client, Entry}, _Sender, #state{reg=Reg}=State) ->
    io:format("~p~n", [{entry, State#state.partition}]),
    lists:foreach(match(Client, Entry), Reg),
    {noreply, State}.

With the match/2 HOF defined like so.

match(Client, Entry) ->
    fun({Regexp, Fun}) ->
            case re:run(Entry, Regexp, [{capture, all, list}]) of
                nomatch -> ignore;
                {match, Match} -> Fun({Client, Entry, Regexp}, Match)
            end
    end.
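
If you haven't used re:run/3 with the capture option before, this sketch shows the shape of the Match list a trigger fun receives. The pattern here is a hypothetical, much simplified stand-in for ?COMBINED_LF that only captures the HTTP method; the module and function names are made up as well.

```erlang
-module(match_sketch).
-export([method/1]).

%% Hypothetical, simplified pattern: captures only the HTTP method
%% that follows the opening quote of the request line.
-define(METHOD_RE, "\"(GET|HEAD|PUT|POST|DELETE) ").

method(Entry) ->
    case re:run(Entry, ?METHOD_RE, [{capture, all, list}]) of
        nomatch ->
            nomatch;
        %% With {capture, all, list} the first element is the whole
        %% match and the rest are the subpatterns, all as strings.
        {match, [_Whole, Method]} ->
            {ok, Method}
    end.
```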

The stat vnode is like a mini redis in that it offers in-place updates. Notice that all mutations cause a new state to be created and returned.

handle_command({get, StatName}, _Sender, #state{stats=Stats}=State) ->
    Reply =
        case dict:find(StatName, Stats) of
            error ->
                not_found;
            Found ->
                Found
        end,
    {reply, Reply, State};

handle_command({set, StatName, Val}, _Sender, #state{stats=Stats0}=State) ->
    Stats = dict:store(StatName, Val, Stats0),
    {reply, ok, State#state{stats=Stats}};

handle_command({incr, StatName}, _Sender, #state{stats=Stats0}=State) ->
    Stats = dict:update_counter(StatName, 1, Stats0),
    {reply, ok, State#state{stats=Stats}};

handle_command({incrby, StatName, Val}, _Sender, #state{stats=Stats0}=State) ->
    Stats = dict:update_counter(StatName, Val, Stats0),
    {reply, ok, State#state{stats=Stats}};

handle_command({append, StatName, Val}, _Sender, #state{stats=Stats0}=State) ->
    Stats = try dict:append(StatName, Val, Stats0)
            catch _:_ -> dict:store(StatName, [Val], Stats0)
            end,
    {reply, ok, State#state{stats=Stats}};

handle_command({sadd, StatName, Val}, _Sender, #state{stats=Stats0}=State) ->
    F = fun(S) ->
                sets:add_element(Val, S)
        end,
    Stats = dict:update(StatName, F, sets:from_list([Val]), Stats0),
    {reply, ok, State#state{stats=Stats}}.
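
The commands lean on how the dict functions treat a missing key, so here is a standalone sketch that exercises the same calls outside of a vnode. The module name, stat names, and values are made up for illustration.

```erlang
-module(stat_dict_sketch).
-export([demo/0]).

demo() ->
    D0 = dict:new(),
    %% incr / incrby: a missing key starts at the increment.
    D1 = dict:update_counter("GET", 1, D0),
    D2 = dict:update_counter("GET", 4, D1),
    %% append: a missing key becomes a one-element list.
    D3 = dict:append("agents", "curl", D2),
    %% sadd: the first call stores the initial set, later calls run the fun.
    Add = fun(Val, D) ->
                  dict:update("hosts",
                              fun(S) -> sets:add_element(Val, S) end,
                              sets:from_list([Val]), D)
          end,
    D4 = Add("b", Add("a", D3)),
    %% append onto a non-list value raises, hence the try/catch fallback.
    D5 = try dict:append("GET", "x", D4)
         catch _:_ -> dict:store("GET", ["x"], D4)
         end,
    {dict:fetch("GET", D4),
     dict:fetch("agents", D4),
     lists:sort(sets:to_list(dict:fetch("hosts", D4))),
     dict:fetch("GET", D5)}.
```

Calling `stat_dict_sketch:demo()` returns `{5, ["curl"], ["a", "b"], ["x"]}`. The last element shows that appending to a stat that currently holds a counter replaces it with a list.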

"Covering" Commands

In 1.0 the idea of a "covering" command was added to the core vnode abstraction. The handle_coverage/4 callback along with a corresponding coverage behavior implementation allows you to implement a command that "covers" the entire key space. For example, this is used to implement Riak's list keys operation as well as range queries for secondary indices. Note, this is NOT pulling the objects out first. It passes the operation all the way down to the vnode which means it can stay very efficient. This opens up the door for other cool things like a native range operation that would allow you to perform a range query on the primary key and get back the objects that match that range; and it would be done in an efficient manner when using an ordered backend like leveldb.

In the future I'll be writing a dedicated post on implementing a coverage command but for now you'll have to look at the code.


Handoff

A handoff occurs when a vnode realizes it's not on the proper node. This will happen when a) the ring has changed because of addition/removal of a node or b) a node comes alive after it has been down. Riak Core implements hinted handoff. The "hint" is a piece of data that tells the partition where its proper home is. Periodically a "home check" is done which uses the hint to determine if the vnode is on the correct physical host. If it isn't, then it will go into handoff mode, transferring its data to the correct vnode.

Implementing handoff seems hard on the surface but it's nothing to be afraid of. The key thing to remember about handoff is its purpose is to transfer data from one vnode to another. Data transfer, that's it. This means that you don't have to implement handoff if your vnode is purely computational. Well, you should write the callbacks but they won't have to do anything.

The players in handoff are is_empty/1, delete/1, handoff_starting/2, handoff_cancelled/1, encode_handoff_item/2, handle_handoff_data/2, handle_handoff_command/3, and handoff_finished/2.

is_empty(State) -> Result

State       :: term()
NewState    :: term()
Result      :: {true, NewState}
             | {false, NewState}

Once the container has determined a vnode is out of place, its first action is to determine if there is any data to be transferred. If there is, return false; otherwise return true. When a vnode is deemed empty the delete/1 callback will be invoked.

The stat vnode checks the size of the stats dict to determine if it's empty.

is_empty(State) ->
    case dict:size(State#state.stats) of
        0 -> {true, State};
        _ -> {false, State}
    end.

delete(State) -> Result

State       :: term()
NewState    :: term()
Result      :: {ok, NewState}

The container will invoke this callback when it's determined there is no more data to be transferred. That is, when is_empty/1 returns true. Use this time to perform any preemptive cleanup of vnode resources. On return the vnode will be terminated with a Reason of normal and the terminate/2 callback will have a chance to make any final cleanup.
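
For a vnode that keeps everything in memory there is little to clean up. One possible sketch, assuming a state record shaped like the stat vnode's, simply swaps in an empty dict. The module name is hypothetical, and returning the state untouched would be just as valid.

```erlang
-module(delete_sketch).
-export([delete/1]).

%% Assumed to mirror the stat vnode's state record.
-record(state, {partition, stats}).

%% Drop the in-memory stats; terminate/2 runs right after this.
delete(State) ->
    {ok, State#state{stats=dict:new()}}.
```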

handoff_starting(TargetNode, State) -> Result

TargetNode  :: node()
Result      :: {true, NewState} | {false, NewState}
State       :: term()
NewState    :: term()

Invoked by the container when it's determined a handoff must occur. The vnode has the final say in whether or not the handoff will occur. Return true to continue and false to cancel. A vnode might have some heuristic that determines its load and choose not to participate in handoff if overloaded at the moment. The TargetNode is the node to transfer the data to.
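
As an example of such a heuristic, this sketch declines handoff whenever the vnode's own mailbox is backed up. The threshold, the `overloaded/1` helper, and the module name are all hypothetical; RTS does nothing like this.

```erlang
-module(handoff_starting_sketch).
-export([handoff_starting/2, overloaded/1]).

%% Hypothetical threshold: more queued messages than this counts as overloaded.
-define(MAX_QUEUE_LEN, 1000).

overloaded(QueueLen) ->
    QueueLen > ?MAX_QUEUE_LEN.

%% Agree to handoff only when the calling process is not swamped.
handoff_starting(_TargetNode, State) ->
    {message_queue_len, Len} = erlang:process_info(self(), message_queue_len),
    {not overloaded(Len), State}.
```

Because the callback runs inside the vnode process, `self()` is the vnode and the queue length is its backlog of pending requests.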

handoff_cancelled(State) -> Result

State       :: term()
NewState    :: term()
Result      :: {ok, NewState}

The handoff manager allows a set number of concurrent handoff operations. By default it's 4 but this can be adjusted. If it's determined that the maximum concurrency has been reached then the container will invoke this callback. You could use this to undo anything you might have done in handoff_starting/2.

This callback is also invoked if an error occurred during handoff.

encode_handoff_item(K, V) -> Result

K           :: {Bucket, Key}
Bucket      :: riak_object:bucket()
Key         :: riak_object:key()
V           :: term()
Result      :: binary()

It seems there are still remnants of Riak KV leftover in Riak Core. Notice the notion of bucket/key and their types. I think the more general contract is that K should be a two-tuple and V can be anything. That said, this callback is used by the container to encode data before crossing the wire. I.e., it serializes the data. To get this right you have to know three things:

  1. Encode both K and V together, i.e. they need to go across together.

  2. This function must return a binary.

  3. This works in concert with handle_handoff_data/2.

To encode a stat I simply make a two-tuple with the name and value and convert to external format via term_to_binary/1.

encode_handoff_item(StatName, Val) ->
    term_to_binary({StatName, Val}).

handle_handoff_data(BinObj, State) -> Result

BinObj      :: binary()
State       :: term()
NewState    :: term()
Result      :: {reply, ok, NewState}
             | {reply, {error, Error}, NewState}

This callback deserializes handoff data as it comes across. Its job is to reconstruct the vnode state from the BinObj binaries. If there is a problem decoding the data then reply with {error, Error} that describes the failure.

To decode a stat I unwrap it with binary_to_term/1 and insert it into the local dict.

handle_handoff_data(Data, #state{stats=Stats0}=State) ->
    {StatName, Val} = binary_to_term(Data),
    Stats = dict:store(StatName, Val, Stats0),
    {reply, ok, State#state{stats=Stats}}.
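
The encode and decode callbacks are two halves of one round trip, so it can help to see them side by side. The sketch below also adds the {error, Error} reply for data that fails to decode. That part is an assumption on top of the stat vnode code, as are the module name and the error terms `not_external_format` and `unexpected_term`.

```erlang
-module(handoff_roundtrip_sketch).
-export([encode_handoff_item/2, handle_handoff_data/2]).

%% Assumed to mirror the stat vnode's state record.
-record(state, {partition, stats}).

%% Sending side: pack name and value together into one binary.
encode_handoff_item(StatName, Val) ->
    term_to_binary({StatName, Val}).

%% Receiving side: unpack, store, and report anything that does not decode.
handle_handoff_data(Data, #state{stats=Stats0}=State) ->
    try binary_to_term(Data) of
        {StatName, Val} ->
            Stats = dict:store(StatName, Val, Stats0),
            {reply, ok, State#state{stats=Stats}};
        Other ->
            {reply, {error, {unexpected_term, Other}}, State}
    catch
        error:badarg ->
            {reply, {error, not_external_format}, State}
    end.
```

Feeding the output of encode_handoff_item/2 straight into handle_handoff_data/2 should leave the stat in the receiving dict, which is a cheap property to check in a unit test.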

handle_handoff_command(Request, Sender, State) -> Result

Request     :: term()
Sender      :: sender()
State       :: term()
NewState    :: term()
Result      :: {reply, Reply, NewState}
             | {noreply, NewState}
             | {forward, NewState}
             | {drop, NewState}
             | {stop, Reason, NewState}

This callback is very similar to handle_command/3 but is instead invoked when a request is received during handoff. It has two additional possible return types as well: forward and drop. The forward reply will send the request to the target node. The drop reply will exhibit the same behavior as noreply but is used to signify that you are "dropping" this request on the floor. That is, you won't even attempt to fulfill it.
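
What to do with ordinary requests during handoff is a policy decision. The sketch below shows one possible policy, which is an assumption and not what RTS does: answer reads from the data still held locally and forward everything else to the target node. The module name and the `not_found` reply are made up for illustration. Note that a read answered locally will not see writes that were already forwarded.

```erlang
-module(handoff_policy_sketch).
-export([handle_handoff_command/3]).

%% Assumed to mirror the stat vnode's state record.
-record(state, {partition, stats}).

%% Reads are answered from the data this vnode still holds.
handle_handoff_command({get, StatName}, _Sender, #state{stats=Stats}=State) ->
    Reply = case dict:find(StatName, Stats) of
                error -> not_found;
                Found -> Found
            end,
    {reply, Reply, State};
%% Everything else is passed along to the target node.
handle_handoff_command(_Request, _Sender, State) ->
    {forward, State}.
```

In a real vnode any clause that handles the fold request has to come before a catch-all clause like the last one here, otherwise the fold would be forwarded too.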

If you want your vnode to handoff data properly then you have to implement the ?FOLD_REQ in handle_handoff_command/3. You need to do this because this is the method by which the handoff manager iterates over your data. ?FOLD_REQ is a macro for the #riak_core_fold_req_v1 record which contains a foldfun and acc0 (accumulator) element. The foldfun expects to be passed three arguments: the key, value, and accumulator. This means that if your data structure doesn't already support this form of fold function you'll have to wrap it. For example, if your data was in a proplist you would write a wrapper function to convert each two-tuple into two separate arguments.

F = fun({Key,Val}, Acc) -> FoldFun(Key, Val, Acc) end,
%% Proplist is a placeholder for wherever your vnode keeps its list of {Key, Val} pairs.
Acc = lists:foldl(F, Acc0, Proplist),
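
Written out as a complete function, the proplist wrapper might look like the sketch below. The module and function names are hypothetical, and the fold fun in the usage note is a toy one rather than the real one supplied by the handoff sender.

```erlang
-module(proplist_fold_sketch).
-export([fold_proplist/3]).

%% Adapt a 3-arg fold fun (Key, Val, Acc) to lists:foldl/3,
%% which hands each {Key, Val} pair over as a single argument.
fold_proplist(FoldFun, Acc0, Proplist) ->
    F = fun({Key, Val}, Acc) -> FoldFun(Key, Val, Acc) end,
    lists:foldl(F, Acc0, Proplist).
```

For example, `proplist_fold_sketch:fold_proplist(fun(_K, V, Sum) -> V + Sum end, 0, [{a, 1}, {b, 2}])` returns `3`.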

In the case of the stat vnode I'm using a dict and it naturally supports a 3-arg fold function. The accumulator is an opaque value and is used to keep track of the socket and trigger sync commands periodically. See the handoff sender if you're really curious. Finally, make sure to return the final value of the accumulator. You can see just how easy this is by looking at what I did in the stat vnode.

handle_handoff_command(?FOLD_REQ{foldfun=Fun, acc0=Acc0}, _Sender, State) ->
    Acc = dict:fold(Fun, Acc0, State#state.stats),
    {reply, Acc, State}.

I do wonder if this fold request should have been its own callback in the vnode behavior, but that's a discussion for another day.

handoff_finished(TargetNode, State) -> Result

TargetNode  :: node()
Result      :: any()
State       :: term()

This callback is invoked when all data has been successfully handed off to the TargetNode. The Result can be anything and is ignored by the container.

Using RTS

Now it's time to see it in action. You'll need to download the RTS project, compile it, and make a release.

git clone git://
cd try-try-try/2011/riak-core-the-vnode/rts
make rel

Start the console.

./rel/rts/bin/rts console

Open another terminal window and run the replay script with either my provided access log or your own. Remember, the log must be in combined log format.

gunzip -c progski.access.log.gz | ./replay progski

You should see a bunch of two-tuples fly by in the console. This is showing the vnode's index (partition) that the incoming entry is being handled on. Notice how the number is constantly changing; this shows that work is being distributed. Keep in mind this won't be the fastest because it's a new HTTP connection for each entry. I'm thinking of implementing persistent connections for a future post.

After the script has finished (or kill it early if you want, it won't hurt anything) go back to the console and ask for some stats.

(rts@> rts:get("progski", "total_reqs").

(rts@> rts:get("progski", "GET").

(rts@> rts:get("progski", "HEAD").

(rts@> rts:get("progski", "PUT").

{ok, Agents} = rts:get("progski", "agents").

Now let's turn it up a notch and test this system's supposed "real time" moniker. First, delete the io:format call in the entry vnode, recompile, and then build a devrel.

make && make devrel

Now start your nodes and join them.

for d in dev/dev*; do $d/bin/rts start; done
for d in dev/dev*; do $d/bin/rts ping; done
for d in dev/dev{2,3}; do $d/bin/rts-admin join rts1@; done
./dev/dev1/bin/rts-admin ringready

Now run replay with the --devrel option.

gunzip -c progski.access.log.gz | ./replay --devrel progski

If you open up top or some other similar process monitoring tool you should see all three beam processes doing some work. It won't be any faster than one node because the overhead is probably in setup/teardown of the HTTP connection, but it demonstrates that work is being distributed among multiple nodes. Disable the tty handler, for example by calling error_logger:tty(false) in the console, so that error messages don't mess with your console.


Now ask for stats while the entries are flowing in. There ya go, real time statistics.

Where's The Redundancy?

The keen readers out there might have noticed that the stat vnode has no redundancy. That's right, there's none there. Follow the code path from entry request to writing of the stat and you'll find no code that copies the data. If you start taking down nodes you'll find that stats start to disappear.

When using Riak Core you must remember that it's a tool for writing distributed systems and it doesn't do everything for you. In my next post I will talk about the idea of coordination and I'll write a stat coordinator for the RTS application.

Other Examples

Current examples of Riak Core in action include Riak KV (also just called Riak), Riak Search and BashoBanjo. The first two are actual products created and supported by Basho and the third is just something really cool that Rusty Klophaus did.