Skip to content
Browse files

modules to manage the communication with clj

  • Loading branch information...
1 parent 4e3bc9e commit a4f2259b676f7212d3af52d9d3b92a348a2e8dd1 @videlalvaro committed Aug 21, 2010
Showing with 241 additions and 0 deletions.
  1. +122 −0 src/riak_kv_clj_manager.erl
  2. +20 −0 src/riak_kv_clj_sup.erl
  3. +99 −0 src/riak_kv_clj_vm.erl
View
122 src/riak_kv_clj_manager.erl
@@ -0,0 +1,122 @@
+-module(riak_kv_clj_manager).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/1, dispatch/1, blocking_dispatch/1, add_to_manager/0, reload/1, reload/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {tid}).
+
+dispatch(JSCall) ->
+ case select_random() of
+ no_vms ->
+ {error, no_vms};
+ Target ->
+ JobId = {Target, make_ref()},
+ riak_kv_clj_vm:dispatch(Target, self(), JobId, JSCall),
+ {ok, JobId}
+ end.
+
+blocking_dispatch(JSCall) ->
+ case select_random() of
+ no_vms ->
+ {error, no_vms};
+ Target ->
+ JobId = {Target, make_ref()},
+ riak_kv_clj_vm:blocking_dispatch(Target, JobId, JSCall)
+ end.
+
+%% Hack to allow riak-admin to trigger a reload
+reload([]) ->
+ reload().
+
+reload() ->
+ gen_server:call(?MODULE, reload_all_vm).
+
+add_to_manager() ->
+ gen_server:cast(?MODULE, {add_child, self()}).
+
+start_link(ChildCount) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [ChildCount], []).
+
+init([ChildCount]) ->
+ Tid = ets:new(?MODULE, [named_table]),
+ start_children(ChildCount),
+ {ok, #state{tid=Tid}}.
+
+handle_call(reload_all_vm, _From, #state{tid=Tid}=State) ->
+ ets:safe_fixtable(Tid, true),
+ reload_children(ets:first(Tid), Tid),
+ ets:safe_fixtable(Tid, false),
+ riak_kv_vnode:purge_mapcaches(),
+ {reply, ok, State};
+
+handle_call(_Request, _From, State) ->
+ {reply, ignore, State}.
+
+handle_cast({add_child, ChildPid}, #state{tid=Tid}=State) ->
+ erlang:monitor(process, ChildPid),
+ ets:insert_new(Tid, {ChildPid}),
+ {noreply, State};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _MRef, _Type, Pid, _Info}, #state{tid=Tid}=State) ->
+ case ets:lookup(Tid, Pid) of
+ [] ->
+ {noreply, State};
+ [{Pid}] ->
+ ets:delete(?MODULE, Pid),
+ riak_kv_clj_sup:start_clj(self()),
+ {noreply, State}
+ end;
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% Internal functions
+start_children(0) ->
+ ok;
+start_children(Count) ->
+ riak_kv_clj_sup:start_clj(self()),
+ start_children(Count - 1).
+
+select_random() ->
+ case ets:match(?MODULE, {'$1'}) of
+ [] ->
+ no_vms;
+ Members ->
+ {T1, T2, T3} = erlang:now(),
+ random:seed(T1, T2, T3),
+ Pos = pick_pos(erlang:get(?MODULE), length(Members)),
+ [Member] = lists:nth(Pos, Members),
+ Member
+ end.
+
+pick_pos(undefined, Size) ->
+ Pos = random:uniform(Size),
+ erlang:put(?MODULE, Pos),
+ Pos;
+pick_pos(OldPos, Size) ->
+ case random:uniform(Size) of
+ OldPos ->
+ pick_pos(OldPos, Size);
+ Pos ->
+ erlang:put(?MODULE, Pos),
+ Pos
+ end.
+
+reload_children('$end_of_table', _Tid) ->
+ ok;
+reload_children(Current, Tid) ->
+ riak_kv_clj_vm:reload(Current),
+ reload_children(ets:next(Tid, Current), Tid).
View
20 src/riak_kv_clj_sup.erl
@@ -0,0 +1,20 @@
+-module(riak_kv_clj_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1, stop/1]).
+-export([start_clj/1]).
+
+start_clj(Manager) when is_pid(Manager) ->
+ supervisor:start_child(?MODULE, [Manager]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+stop(_S) -> ok.
+
+%% @private
+init([]) ->
+ {ok,
+ {{simple_one_for_one, 10, 10},
+ [{undefined,
+ {riak_kv_clj_vm, start_link, []},
+ temporary, 2000, worker, [riak_kv_clj_vm]}]}}.
View
99 src/riak_kv_clj_vm.erl
@@ -0,0 +1,99 @@
+-module(riak_kv_clj_vm).
+
+-behaviour(gen_server).
+
+-define(MAX_ANON_FUNS, 25).
+
+%% API
+-export([start_link/1, dispatch/4, blocking_dispatch/3, reload/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {manager, ctx, next_funid=1, anon_funs=[]}).
+
+start_link(Manager) ->
+ gen_server:start_link(?MODULE, [Manager], []).
+
+dispatch(VMPid, Requestor, JobId, JSCall) ->
+ gen_server:cast(VMPid, {dispatch, Requestor, JobId, JSCall}).
+
+blocking_dispatch(VMPid, JobId, JSCall) ->
+ gen_server:call(VMPid, {dispatch, JobId, JSCall}, 10000).
+
+reload(VMPid) ->
+ gen_server:cast(VMPid, reload).
+
+init([Manager]) ->
+ error_logger:info_msg("Clojure VM starting (~p)~n",
+ [self()]),
+ riak_kv_clj_manager:add_to_manager(),
+ erlang:monitor(process, Manager),
+ {ok, #state{manager=Manager}}.
+
+% Reduce Function Call
+handle_call({dispatch, _JobId, {{cljanon, JS}, Reduced, _Arg}}, _From, State) ->
+ Reply = {ok, call_clojure(red, Reduced, JS)},
+ {reply, Reply, State};
+%% Pre-commit hook with named function
+% handle_call({dispatch, _JobId, {{jsfun, JS}, Obj}}, _From, #state{ctx=Ctx}=State) ->
+% {reply, invoke_js(Ctx, JS, [riak_object:to_json(Obj)]), State};
+handle_call(_Request, _From, State) ->
+ {reply, ignore, State}.
+
+handle_cast(reload, State) ->
+ % init_context(Ctx),
+ error_logger:info_msg("Clojure VM host reloaded (~p)~n", [self()]),
+ {noreply, State};
+
+% Map Function Call
+handle_cast({dispatch, Requestor, _JobId, {Sender, {map, {cljanon, JS}, _Arg, _Acc},
+ Value,
+ _KeyData, _BKey}}, State) ->
+ Result = {ok, call_clojure(map, riak_object:get_value(Value), JS)},
+ case Result of
+ {ok, ReturnValue} ->
+ riak_core_vnode:reply(Sender, {mapexec_reply, ReturnValue, Requestor}),
+ {noreply, State};
+ ErrorResult ->
+ riak_core_vnode:reply(Sender, {mapexec_error_noretry, Requestor, ErrorResult}),
+ {noreply, State}
+ end;
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _MRef, _Type, Manager, _Info}, #state{manager=Manager}=State) ->
+ {stop, normal, State#state{manager=undefined}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ error_logger:info_msg("Clojure VM host stopping (~p)~n", [self()]),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% Internal functions
+
+call_clojure(Command, Val, JS) ->
+ % io:format("Calling Clojure: ~p ~p ~p~n", [Command, Val, JS]),
+ {cljmbox, 'clj2@mrhyde'} ! {{pid, self()}, {command, Command}, {'r-value', Val},
+ {'clj-map-fn', JS}},
+ receive
+ {clj, Msg} ->
+ % io:format("Clojure Reply: ~p~n", [Msg]),
+ Msg
+ end.
+
+% read_config(Param, Default) ->
+% case app_helper:get_env(riak_kv, Param, 8) of
+% N when is_integer(N) ->
+% N;
+% _ ->
+% Default
+% end.

0 comments on commit a4f2259

Please sign in to comment.
Something went wrong with that request. Please try again.