Skip to content

Loading…

Issue #48 fix #49

Closed
wants to merge 5 commits into from

2 participants

@okeuday

Adds option {active_pid, pid()}

@okeuday

Not sure why those old commits are there. I only care about the last (recent) commit.

@yrashk yrashk closed this
@yrashk yrashk referenced this pull request
Merged

Issue #48 fix #50

@yrashk yrashk reopened this
@yrashk
The ZeroMQ project member

Can you rewrite/rebase this pullreq so it only contains the last commit, please?

@okeuday

I don't see a good way to do this, but would try if it was possible. I don't seem to have access to the pull request, to make modifications to it.

@yrashk
The ZeroMQ project member

You can reopen it as a new one based off another branch, I guess?

@okeuday

Yeah, I will redo the pull request for the master branch.

@okeuday okeuday closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 8, 2011
  1. @okeuday

    Use proper size_t.

    okeuday committed
Commits on Apr 9, 2011
  1. @okeuday
Commits on Mar 26, 2012
  1. @okeuday
Commits on Feb 9, 2013
  1. @okeuday
  2. @okeuday

    Add {active_pid, pid()} option to socket/2, which implies {active, tr…

    okeuday committed
    …ue}. The change provides a controlling_process concept for active mode sockets (so that the code doesn't assume self() is always the controlling process).
Showing with 22 additions and 11 deletions.
  1. +10 −3 c_src/erlzmq_nif.c
  2. +9 −4 src/erlzmq.erl
  3. +2 −2 src/erlzmq_nif.erl
  4. +1 −2 test/erlzmq_test.erl
View
13 c_src/erlzmq_nif.c
@@ -51,6 +51,7 @@ typedef struct erlzmq_socket {
int64_t socket_index;
void * socket_zmq;
int active;
+ ErlNifPid active_pid;
ErlNifMutex * mutex;
} erlzmq_socket_t;
@@ -114,7 +115,7 @@ static ERL_NIF_TERM return_zmq_errno(ErlNifEnv* env, int const value);
static ErlNifFunc nif_funcs[] =
{
{"context", 1, erlzmq_nif_context},
- {"socket", 3, erlzmq_nif_socket},
+ {"socket", 4, erlzmq_nif_socket},
{"bind", 2, erlzmq_nif_bind},
{"connect", 2, erlzmq_nif_connect},
{"setsockopt", 3, erlzmq_nif_setsockopt},
@@ -179,6 +180,7 @@ NIF(erlzmq_nif_socket)
erlzmq_context_t * context;
int socket_type;
int active;
+ ErlNifPid active_pid;
if (! enif_get_resource(env, argv[0], erlzmq_nif_resource_context,
(void **) &context)) {
@@ -193,6 +195,10 @@ NIF(erlzmq_nif_socket)
return enif_make_badarg(env);
}
+ if (! enif_get_local_pid(env, argv[3], &active_pid)) {
+ return enif_make_badarg(env);
+ }
+
erlzmq_socket_t * socket = enif_alloc_resource(erlzmq_nif_resource_socket,
sizeof(erlzmq_socket_t));
assert(socket);
@@ -203,6 +209,7 @@ NIF(erlzmq_nif_socket)
return return_zmq_errno(env, zmq_errno());
}
socket->active = active;
+ socket->active_pid = active_pid;
socket->mutex = enif_mutex_create("erlzmq_socket_t_mutex");
assert(socket->mutex);
@@ -785,7 +792,7 @@ static void * polling_thread(void * handle)
{
enif_mutex_unlock(r->data.recv.socket->mutex);
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
- enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
+ enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
enif_make_tuple3(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
enif_make_tuple2(r->data.recv.env,
@@ -820,7 +827,7 @@ static void * polling_thread(void * handle)
flags_list = enif_make_list(r->data.recv.env, 0);
}
- enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
+ enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
enif_make_tuple4(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
enif_make_tuple2(r->data.recv.env,
View
13 src/erlzmq.erl
@@ -89,7 +89,8 @@ context(Threads) when is_integer(Threads) ->
-spec socket(Context :: erlzmq_context(),
Type :: erlzmq_socket_type() |
list(erlzmq_socket_type() |
- {active, boolean()})) ->
+ {active, boolean()} |
+ {active_pid, pid()})) ->
{ok, erlzmq_socket()} |
erlzmq_error().
socket(Context, Type) when is_atom(Type) ->
@@ -108,14 +109,18 @@ socket(Context, [H | [Type]]) when is_tuple(H) ->
-spec socket(Context :: erlzmq_context(),
Type :: erlzmq_socket_type(),
- {active, boolean()}) ->
+ {active, boolean()} | {active_pid, pid()}) ->
{ok, erlzmq_socket()} |
erlzmq_error().
socket(Context, Type, {active, true}) ->
true = (Type =/= pub) and (Type =/= push) and (Type =/= xpub),
- erlzmq_nif:socket(Context, socket_type(Type), 1);
+ erlzmq_nif:socket(Context, socket_type(Type), 1, self());
+socket(Context, Type, {active_pid, Pid})
+ when is_pid(Pid), node(Pid) =:= node() ->
+ true = (Type =/= pub) and (Type =/= push) and (Type =/= xpub),
+ erlzmq_nif:socket(Context, socket_type(Type), 1, Pid);
socket(Context, Type, {active, false}) ->
- erlzmq_nif:socket(Context, socket_type(Type), 0).
+ erlzmq_nif:socket(Context, socket_type(Type), 0, self()).
%% @doc Accept connections on a socket.
View
4 src/erlzmq_nif.erl
@@ -2,7 +2,7 @@
-module(erlzmq_nif).
-export([context/1,
- socket/3,
+ socket/4,
bind/2,
connect/2,
send/3,
@@ -37,7 +37,7 @@ init() ->
context(_Threads) ->
erlang:nif_error(not_loaded).
-socket(_Context, _Type, _Active) ->
+socket(_Context, _Type, _Active, _ActivePid) ->
erlang:nif_error(not_loaded).
bind(_Socket, _Endpoint) ->
View
3 test/erlzmq_test.erl
@@ -30,7 +30,6 @@ hwm_loop(N, S) ->
?assertMatch({error, _} ,erlzmq:send(S, <<"test">>, [noblock])),
hwm_loop(N-1, S).
-
pair_inproc_test() ->
basic_tests("inproc://tester", pair, pair, active),
basic_tests("inproc://tester", pair, pair, passive).
@@ -76,7 +75,7 @@ shutdown_stress_loop(N) ->
?assertMatch(ok, erlzmq:term(C)),
shutdown_stress_loop(N-1).
-shutdown_no_blocking_test() ->
+shutdown_test() ->
{ok, C} = erlzmq:context(),
{ok, S} = erlzmq:socket(C, [pub, {active, false}]),
erlzmq:close(S),
Something went wrong with that request. Please try again.