Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

file 122 lines (108 sloc) 4.103 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
-module(jobqueue_gearman).
-author('Samuel Stauffer <samuel@lefora.com>').

-export([functions/0]).

-include_lib("jobqueue.hrl").

functions() ->
[{"jobqueue", fun dispatcher/3}].

%%

dispatcher(_Handle, _Function, Argument) ->
    Zlib = zlib:open(),
    {ok, Args} = decode_args(Zlib, Argument),
    Method = list_to_atom(binary_to_list(table_lookup(Args, "method"))),
    {obj, Params} = table_lookup(Args, "params"),
    Response = dispatch(Method, Params),
    EncResponse = encode_args(Zlib, Response),
    zlib:close(Zlib),
    EncResponse.

dispatch(Method, Params) ->
    try service(Method, Params) of
        {ok, Result} ->
            format_result(Result);
        {error, ErrorType, ErrorMessage} ->
            format_error(ErrorType, ErrorMessage)
    catch
        error:{badmatch, _} ->
            format_error(<<"DispatchError">>, <<"Invalid set of params">>);
        Exc1:_Exc2 ->
            format_error(list_to_binary([atom_to_list(Exc1), <<":XXX">>]), "Fail") % TODO: include Exc2
    end.

format_result(Result) ->
    {obj, [{<<"error">>, null}, {<<"result">>, Result}]}.

format_error(ErrorType, ErrorMessage) ->
    {obj, [
        {<<"result">>, null},
        {<<"error">>,
            {obj, [
                {<<"type">>, ErrorType},
                {<<"message">>, ErrorMessage}
            ]}}
    ]}.

service(stats, _Params) ->
    {ok, objectify(jobqueue:stats())};
service(insert_job, Params) ->
    Func = table_lookup(Params, "func"),
    Arg = table_lookup(Params, "arg"),
    UniqKey = table_lookup(Params, "uniqkey", ""),
    AvailableAfter = table_lookup(Params, "available_after", 0),
    Priority = table_lookup(Params, "priority", 0),
    {ok, JobID} = jobqueue:insert_job(Func, Arg, UniqKey, AvailableAfter, Priority),
    {ok, {obj, [{"handle", JobID}]}};
service(find_jobs, Params) ->
    Funcs = table_lookup(Params, "funcs"),
    Count = table_lookup(Params, "count"),
    Timeout = table_lookup(Params, "timeout", 0),
    case jobqueue:find_jobs(Funcs, Count, Timeout) of
        [] ->
            {ok, []};
        Jobs when is_list(Jobs) ->
            {ok, [{obj, [
                {"handle", Job#job.job_id},
                {"func", Job#job.func},
                {"arg", Job#job.arg},
                {"failures", Job#job.failures}]} || Job <- Jobs]}
    end;
service(job_completed, Params) ->
    Handle = table_lookup(Params, "handle"),
    case jobqueue:job_completed(Handle) of
        ok ->
            {ok, null};
        Else ->
            {error, <<"JobQueueError">>, list_to_binary(atom_to_list(Else))}
    end;
service(job_failed, Params) ->
    Handle = table_lookup(Params, "handle"),
    Reason = table_lookup(Params, "reason"),
    DelayRetry = table_lookup(Params, "delay_retry", 0),
    case jobqueue:job_failed(Handle, Reason, DelayRetry) of
        ok ->
            {ok, null};
        Else ->
            {error, <<"JobQueueError">>, list_to_binary(atom_to_list(Else))}
    end;
service(_Method, _Params) ->
    {error, <<"DispatchError">>, <<"Unknown method">>}.

%% Utility functions

objectify(Atom) when is_atom(Atom) -> atom_to_list(Atom);
objectify(List) when is_list(List) -> {obj, objectify_list(List)};
objectify(Tuple) when is_tuple(Tuple) -> list_to_tuple(objectify_list(tuple_to_list(Tuple)));
objectify(Other) -> Other.
objectify_list([]) -> [];
objectify_list([Head|Rest]) -> [objectify(Head)|objectify_list(Rest)].

table_lookup(Table, Key) -> {Key, Value} = proplists:lookup(Key, Table), Value.
table_lookup(Table, Key, Default) -> proplists:get_value(Key, Table, Default).

decode_args(Zlib, Data) ->
    ok = zlib:inflateInit(Zlib),
    Data2 = list_to_binary(zlib:inflate(Zlib, Data)),
    zlib:inflateEnd(Zlib),
    case rfc4627:decode(Data2) of
        {ok, {obj, Args}, _} ->
            {ok, Args};
        _ ->
            throw("Received invalid json object for function arguments")
    end.

encode_args(Zlib, Data) ->
    EncData = rfc4627:encode(Data),
    zlib:deflateInit(Zlib),
    CompData = list_to_binary(zlib:deflate(Zlib, EncData, finish)),
    zlib:deflateEnd(Zlib),
    CompData.
Something went wrong with that request. Please try again.