Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
gearman_worker:start* now accepts a list of worker modules rather tha…
Browse files Browse the repository at this point in the history
…n a list of functions. The worker modules should export functions/0 which returns a list of functions.
  • Loading branch information
samuel committed Jul 10, 2009
1 parent 4203669 commit 4106db8
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 23 deletions.
1 change: 0 additions & 1 deletion gearman.hrl

This file was deleted.

2 changes: 1 addition & 1 deletion gearman_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ start_link() ->
stop(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, stop).

connect(Pid, Host) when is_list(Host) ->
connect(Pid, Host) when is_pid(Pid), is_list(Host) ->
connect(Pid, {Host, ?DEFAULT_PORT});
connect(Pid, {Host, Port}) when is_pid(Pid) ->
gen_server:call(Pid, {connect, Host, Port}).
Expand Down
4 changes: 2 additions & 2 deletions gearman_protocol.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ parse_command(Data) when size(Data) >= 12 ->
$Q -> request;
$S -> response
end,
{ArgData, NewPacket} = split_binary(Rest, DataLength),
{ArgData, LeftOverData} = split_binary(Rest, DataLength),
Command = parse_command(CommandID, binary_to_list(ArgData)),
{ok, NewPacket, Type, Command};
{ok, LeftOverData, Type, Command};
true ->
{error, not_enough_data}
end;
Expand Down
6 changes: 2 additions & 4 deletions gearman_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@

-export([test/0, test_worker/0]).

-include_lib("gearman.hrl").

test_worker() ->
gearman_worker:start({"127.0.0.1"}, [{"echo", fun(Task) -> Task#task.arg end}, {"fail", fun(Task) -> throw(Task#task.arg) end}]).
gearman_worker:start({"127.0.0.1"}, [{"echo", fun(_Handle, _Func, Arg) -> Arg end}, {"fail", fun(_Handle, _Func, Arg) -> throw(Arg) end}]).

test() ->
gearman_worker:start({"127.0.0.1"}, [{"echo", fun(Task) -> Task#task.arg end}, {"fail", fun(Task) -> throw(Task#task.arg) end}]),
gearman_worker:start({"127.0.0.1"}, [{"echo", fun(_Handle, _Func, Arg) -> Arg end}, {"fail", fun(_Handle, _Func, Arg) -> throw(Arg) end}]),
{ok, P} = gearman_connection:connect({"127.0.0.1"}),
receive
{P, connected} -> void;
Expand Down
35 changes: 20 additions & 15 deletions gearman_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,28 @@
%% fsm events
-export([working/2, sleeping/2, dead/2]).

-include_lib("gearman.hrl").
-record(state, {connection, modules, functions}).

-record(state, {connection, functions}).
start_link(Server, WorkerModules) ->
gen_fsm:start_link(?MODULE, {self(), Server, WorkerModules}, []).

start_link(Server, Functions) ->
gen_fsm:start_link(?MODULE, {self(), Server, Functions}, []).

start(Server, Functions) ->
gen_fsm:start(?MODULE, {self(), Server, Functions}, []).
start(Server, WorkerModules) ->
gen_fsm:start(?MODULE, {self(), Server, WorkerModules}, []).

%% gen_server callbacks

init({_PidMaster, Server, Functions}) ->
% process_flag(trap_exit, true),
init({_PidMaster, Server, WorkerModules}) ->
Functions = get_functions(WorkerModules),
{ok, Connection} = gearman_connection:start_link(),
gearman_connection:connect(Connection, Server),
{ok, dead, #state{connection=Connection, functions=Functions}}.
{ok, dead, #state{connection=Connection, modules=WorkerModules, functions=Functions}}.

get_functions(Modules) ->
get_functions(Modules, []).
get_functions([], Functions) ->
lists:flatten(Functions);
get_functions([Module|Modules], Functions) ->
get_functions(Modules, [Functions, Module:functions()]).

%% Private Callbacks

Expand Down Expand Up @@ -63,7 +68,7 @@ working({Connection, command, no_job}, #state{connection=Connection} = State) ->
gearman_connection:send_request(Connection, pre_sleep, {}),
{next_state, sleeping, State, 15*1000};
working({Connection, command, {job_assign, Handle, Func, Arg}}, #state{connection=Connection, functions=Functions} = State) ->
try dispatch_function(Functions, Func, #task{handle=Handle, func=Func, arg=Arg}) of
try dispatch_function(Functions, Func, Arg, Handle) of
{ok, Result} ->
gearman_connection:send_request(Connection, work_complete, {Handle, Result});
{error, _Reason} ->
Expand All @@ -90,15 +95,15 @@ dead(Event, State) ->

%%%

dispatch_function([], _Func, _Task) ->
dispatch_function([], _Func, _Arg, _Handle) ->
{error, invalid_function};
dispatch_function([{Name, Function}|Functions], Func, Task) ->
dispatch_function([{Name, Function}|Functions], Func, Arg, Handle) ->
if
Name == Func ->
Res = Function(Task),
Res = Function(Handle, Func, Arg),
{ok, Res};
true ->
dispatch_function(Functions, Func, Task)
dispatch_function(Functions, Func, Arg, Handle)
end.

register_functions(_Connection, []) ->
Expand Down

0 comments on commit 4106db8

Please sign in to comment.