Skip to content

Commit

Permalink
clojure m/r queries
Browse files Browse the repository at this point in the history
  • Loading branch information
videlalvaro committed Aug 20, 2010
1 parent 34208dd commit 4e3bc9e
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 1 deletion.
3 changes: 3 additions & 0 deletions ebin/riak_kv.app
Expand Up @@ -13,6 +13,9 @@
riak_kv_backend,
riak_kv_bitcask_backend,
riak_kv_cache_backend,
riak_kv_clj_manager,
riak_kv_clj_sup,
riak_kv_clj_vm,
riak_kv_console,
riak_kv_delete,
riak_kv_dets_backend,
Expand Down
41 changes: 41 additions & 0 deletions src/riak_kv_mapred_json.erl
Expand Up @@ -230,6 +230,47 @@ parse_step(<<"javascript">>, StepDef) ->
_ ->
{ok, {jsanon, Source}}
end;
parse_step(<<"clojure">>, StepDef) ->
Source = proplists:get_value(<<"source">>, StepDef),
Name = proplists:get_value(<<"name">>, StepDef),
Bucket = proplists:get_value(<<"bucket">>, StepDef),
Key = proplists:get_value(<<"key">>, StepDef),
case Source of
undefined ->
case Name of
undefined ->
case Bucket of
undefined ->
{error, ["No function specified in Javascript phase:\n"
" ",mochijson2:encode({struct,StepDef}),
"\n\nFunctions may be specified by:\n"
" - a \"source\" field, with source for"
" a Javascript function\n"
" - a \"name\" field, naming a predefined"
" Javascript function\n"
" - \"bucket\" and \"key\" fields,"
" specifying a Riak object containing"
" Javascript function source\n"]};
_ ->
case Key of
undefined ->
{error, ["Javascript phase was missing a"
" \"key\" field to match the \"bucket\""
" field, pointing to the function"
" to evaluate in:"
" ",mochijson2:encode(
{struct,StepDef}),
"\n"]};
_ ->
{ok, {cljanon, {Bucket, Key}}}
end
end;
_ ->
{ok, {cljfun, Name}}
end;
_ ->
{ok, {cljanon, Source}}
end;
parse_step(<<"erlang">>, StepDef) ->
case bin_to_atom(proplists:get_value(<<"module">>, StepDef)) of
{ok, Module} ->
Expand Down
11 changes: 11 additions & 0 deletions src/riak_kv_mapred_query.erl
Expand Up @@ -100,6 +100,17 @@ check_query_syntax([QTerm={QTermType, QueryFun, Misc, Acc}|Rest], Accum) when is
end;
{jsfun, JS} when is_binary(JS) ->
{phase_mod(T), phase_behavior(T, QueryFun, Acc), [{javascript, QTerm}]};
{cljanon, JS} when is_binary(JS) ->
{phase_mod(T), phase_behavior(T, QueryFun, Acc), [{clojure, QTerm}]};
{cljanon, {Bucket, Key}} when is_binary(Bucket),
is_binary(Key) ->
case fetch_js(Bucket, Key) of
{ok, JS} ->
{phase_mod(T), phase_behavior(T, QueryFun, Acc), [{clojure,
{T, {jsanon, JS}, Misc, Acc}}]};
_ ->
{bad_qterm, QTerm}
end;
_ ->
{bad_qterm, QTerm}
end
Expand Down
9 changes: 9 additions & 0 deletions src/riak_kv_reduce_phase.erl
Expand Up @@ -96,6 +96,15 @@ perform_reduce({Lang,{reduce,FunTerm,Arg,_Acc}},
{ok, [riak_kv_mapred_json:dejsonify_not_found(Datum) || Datum <- Data]};
Data ->
Data
end;
{clojure, _} ->
case riak_kv_clj_manager:blocking_dispatch({FunTerm,
[riak_kv_mapred_json:jsonify_not_found(R) || R <- Reduced],
Arg}) of
{ok, Data} when is_list(Data) ->
{ok, [riak_kv_mapred_json:dejsonify_not_found(Datum) || Datum <- Data]};
Data ->
Data
end
end
catch _:R ->
Expand Down
11 changes: 10 additions & 1 deletion src/riak_kv_sup.erl
Expand Up @@ -62,6 +62,13 @@ init([]) ->
RiakJsSup = {riak_kv_js_sup,
{riak_kv_js_sup, start_link, []},
permanent, infinity, supervisor, [riak_kv_js_sup]},
RiakCljMgr = {riak_kv_clj_manager,
{riak_kv_clj_manager, start_link,
[app_helper:get_env(riak_kv, js_vm_count, 0)]},
permanent, 30000, worker, [riak_kv_clj_manager]},
RiakCljSup = {riak_kv_clj_sup,
{riak_kv_clj_sup, start_link, []},
permanent, infinity, supervisor, [riak_kv_clj_sup]},
% Figure out which processes we should run...
IsPbConfigured = (app_helper:get_env(riak_kv, pb_ip) /= undefined)
andalso (app_helper:get_env(riak_kv, pb_port) /= undefined),
Expand All @@ -74,7 +81,9 @@ init([]) ->
?IF(IsPbConfigured, RiakPb, []),
?IF(IsStatEnabled, RiakStat, []),
RiakJsSup,
RiakJsMgr
RiakJsMgr,
RiakCljSup,
RiakCljMgr
]),

% Run the proesses...
Expand Down
16 changes: 16 additions & 0 deletions src/riak_kv_vnode.erl
Expand Up @@ -434,6 +434,22 @@ do_map({javascript, {map, FunTerm, Arg, _}=QTerm}, BKey, Mod, ModState, KeyData,
end;
CV ->
{ok, CV}
end;
do_map({clojure, {map, FunTerm, Arg, _}=QTerm}, BKey, Mod, ModState, KeyData, Cache, _VNode, Sender) ->
CacheKey = build_key(FunTerm, Arg, KeyData),
CacheVal = cache_fetch(BKey, CacheKey, Cache),
case CacheVal of
not_cached ->
case Mod:get(ModState, BKey) of
{ok, Binary} ->
V = binary_to_term(Binary),
riak_kv_clj_manager:dispatch({Sender, QTerm, V, KeyData, BKey}),
map_executing;
{error, notfound} ->
{error, notfound}
end;
CV ->
{ok, CV}
end.

build_key({modfun, CMod, CFun}, Arg, KeyData) ->
Expand Down

0 comments on commit 4e3bc9e

Please sign in to comment.