Skip to content

Commit

Permalink
Add NIF versions of recvfrom/sendto
Browse files Browse the repository at this point in the history
sendto is untested and disabled. Make sockets non-blocking for NIF
sendto/recvfrom. Cache atoms and clean up corresponding error functions.
  • Loading branch information
msantos committed May 29, 2010
1 parent 73d8247 commit 194feb8
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 18 deletions.
137 changes: 120 additions & 17 deletions c_src/procket.c
Expand Up @@ -35,8 +35,23 @@


#define BACKLOG 5 #define BACKLOG 5


static ERL_NIF_TERM error_tuple(ErlNifEnv *env, char *atom, char *err); static ERL_NIF_TERM error_tuple(ErlNifEnv *env, char *err);
static ERL_NIF_TERM error_message(ErlNifEnv *env, char *atom, char *err, char *msg); static ERL_NIF_TERM error_message(ErlNifEnv *env, char *err, char *msg);

static ERL_NIF_TERM atom_ok;
static ERL_NIF_TERM atom_error;
static ERL_NIF_TERM atom_nodata;


static int
load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{
atom_ok = enif_make_atom(env, "ok");
atom_error = enif_make_atom(env, "error");
atom_nodata = enif_make_atom(env, "nodata");

return (0);
}




static ERL_NIF_TERM static ERL_NIF_TERM
Expand All @@ -54,20 +69,20 @@ nif_open(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])


sock_fd = socket(PF_LOCAL, SOCK_STREAM, 0); sock_fd = socket(PF_LOCAL, SOCK_STREAM, 0);
if (sock_fd < 0) if (sock_fd < 0)
return error_message(env, "error", "socket", strerror(errno)); return error_message(env, "socket", strerror(errno));


flags = fcntl(sock_fd, F_GETFL, 0); flags = fcntl(sock_fd, F_GETFL, 0);
flags |= O_NONBLOCK; flags |= O_NONBLOCK;
(void)fcntl(sock_fd, F_SETFL, flags); (void)fcntl(sock_fd, F_SETFL, flags);


if (bind(sock_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0) if (bind(sock_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
return error_message(env, "error", "bind", strerror(errno)); return error_message(env, "bind", strerror(errno));


if (listen(sock_fd, BACKLOG) < 0) if (listen(sock_fd, BACKLOG) < 0)
return error_message(env, "error", "listen", strerror(errno)); return error_message(env, "listen", strerror(errno));


return enif_make_tuple(env, 2, return enif_make_tuple(env, 2,
enif_make_atom(env, "ok"), atom_ok,
enif_make_int(env, sock_fd)); enif_make_int(env, sock_fd));
} }


Expand All @@ -87,17 +102,17 @@ nif_poll(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])


fd = accept(sock_fd, (struct sockaddr *)&sa, &socklen); fd = accept(sock_fd, (struct sockaddr *)&sa, &socklen);
if (fd < 0) if (fd < 0)
return error_message(env, "error", "accept", strerror(errno)); return error_message(env, "accept", strerror(errno));


if (ancil_recv_fd(fd, &s) < 0) { if (ancil_recv_fd(fd, &s) < 0) {
(void)close (fd); (void)close (fd);
return error_message(env, "error", "recvmsg", strerror(errno)); return error_message(env, "recvmsg", strerror(errno));
} }


(void)close (fd); (void)close (fd);


return enif_make_tuple(env, 2, return enif_make_tuple(env, 2,
enif_make_atom(env, "ok"), atom_ok,
enif_make_int(env, s)); enif_make_int(env, s));
} }


Expand All @@ -119,28 +134,112 @@ nif_close(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
return enif_make_badarg(env); return enif_make_badarg(env);


if (unlink(sa.sun_path) < 0) if (unlink(sa.sun_path) < 0)
return error_message(env, "error", "unlink", strerror(errno)); return error_message(env, "unlink", strerror(errno));


(void)close(sockfd); (void)close(sockfd);


return enif_make_atom(env, "ok"); return atom_ok;
}


/* 0: socket, 1: length */
static ERL_NIF_TERM
nif_recvfrom(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
{
int sockfd = -1;
int len = 0;
ErlNifBinary buf;

if (!enif_get_int(env, argv[0], &sockfd))
return enif_make_badarg(env);
if (!enif_get_int(env, argv[1], &len))
return enif_make_badarg(env);

if (!enif_alloc_binary(env, len, &buf))
return error_tuple(env, "out_of_memory");

if (recvfrom(sockfd, buf.data, buf.size, 0, NULL, NULL) == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
return atom_nodata;
default:
return error_tuple(env, strerror(errno));
}
}

return enif_make_tuple(env, 2,
atom_ok,
enif_make_binary(env, &buf));
}


/* 0: socket, 1: buffer, 2: flags, 3: address
* 4: port, 5: family
*/
static ERL_NIF_TERM
nif_sendto(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
{
int sockfd = -1;
int flags = 0;
char address[1024]; /* XXX IPv4 */
struct in_addr in;
struct sockaddr_in sa = {0};
in_port_t port = 0;
int family = 0;

ErlNifBinary buf;

if (!enif_get_int(env, argv[0], &sockfd))
return enif_make_badarg(env);

if (!enif_inspect_iolist_as_binary(env, argv[1], &buf))
return enif_make_badarg(env);

if (!enif_get_int(env, argv[2], &flags))
return enif_make_badarg(env);

if (enif_get_string(env, argv[3], address, sizeof(address), ERL_NIF_LATIN1) < 1)
return enif_make_badarg(env);

if (!enif_get_int(env, argv[4], (int *)&port))
return enif_make_badarg(env);

if (!enif_get_int(env, argv[5], &family))
return enif_make_badarg(env);

if (inet_aton(address, &in) < 0)
return enif_make_badarg(env);

sa.sin_family = family;
sa.sin_port = htons(port);
sa.sin_addr.s_addr = in.s_addr;

if (sendto(sockfd, buf.data, buf.size, flags, (struct sockaddr *)&sa, sizeof(sa)) == -1)
return enif_make_tuple(env, 2,
atom_error,
enif_make_tuple(env, 2,
enif_make_int(env, errno),
enif_make_string(env, strerror(errno), ERL_NIF_LATIN1)));

return atom_ok;
} }




static ERL_NIF_TERM static ERL_NIF_TERM
error_tuple(ErlNifEnv *env, char *atom, char *err) error_tuple(ErlNifEnv *env, char *err)
{ {
return enif_make_tuple(env, 2, return enif_make_tuple(env, 2,
enif_make_atom(env, atom), atom_error,
enif_make_atom(env, err)); enif_make_atom(env, err));
} }




static ERL_NIF_TERM static ERL_NIF_TERM
error_message(ErlNifEnv *env, char *atom, char *err, char *msg) error_message(ErlNifEnv *env, char *err, char *msg)
{ {
return enif_make_tuple(env, 2, return enif_make_tuple(env, 2,
enif_make_atom(env, atom), atom_error,
enif_make_tuple(env, 2, enif_make_tuple(env, 2,
enif_make_atom(env, err), enif_make_atom(env, err),
enif_make_string(env, msg, ERL_NIF_LATIN1))); enif_make_string(env, msg, ERL_NIF_LATIN1)));
Expand All @@ -150,9 +249,13 @@ error_message(ErlNifEnv *env, char *atom, char *err, char *msg)
static ErlNifFunc nif_funcs[] = { static ErlNifFunc nif_funcs[] = {
{"open", 1, nif_open}, {"open", 1, nif_open},
{"poll", 1, nif_poll}, {"poll", 1, nif_poll},
{"close", 2, nif_close} {"close", 2, nif_close},
/*
{"sendto", 6, nif_sendto},
*/
{"recvfrom", 2, nif_recvfrom}
}; };


ERL_NIF_INIT(procket, nif_funcs, NULL, NULL, NULL, NULL) ERL_NIF_INIT(procket, nif_funcs, load, NULL, NULL, NULL)




5 changes: 5 additions & 0 deletions c_src/procket_cmd.c
Expand Up @@ -137,11 +137,16 @@ procket_parse_address(PROCKET_STATE *ps)
procket_open_socket(PROCKET_STATE *ps) procket_open_socket(PROCKET_STATE *ps)
{ {
struct sockaddr_in sa = { 0 }; struct sockaddr_in sa = { 0 };
int flags = 0;




if ( (ps->s = socket(ps->family, ps->type, ps->protocol)) < 0) if ( (ps->s = socket(ps->family, ps->type, ps->protocol)) < 0)
return (-1); return (-1);


flags = fcntl(ps->s, F_GETFL, 0);
flags |= O_NONBLOCK;
(void)fcntl(ps->s, F_SETFL, flags);

/* Erlang assumes the socket has already been bound */ /* Erlang assumes the socket has already been bound */
if ( (ps->protocol == IPPROTO_TCP) || (ps->protocol == IPPROTO_UDP)) { if ( (ps->protocol == IPPROTO_TCP) || (ps->protocol == IPPROTO_UDP)) {
sa.sin_family = ps->family; sa.sin_family = ps->family;
Expand Down
11 changes: 10 additions & 1 deletion src/procket.erl
Expand Up @@ -30,7 +30,10 @@
%% POSSIBILITY OF SUCH DAMAGE. %% POSSIBILITY OF SUCH DAMAGE.
-module(procket). -module(procket).


-export([init/0,open/1,poll/1,close/2,listen/1,listen/2]). -export([
init/0,open/1,poll/1,close/2,listen/1,listen/2,
recvfrom/2,sendto/6
]).
-export([make_args/2,progname/0]). -export([make_args/2,progname/0]).


-on_load(on_load/0). -on_load(on_load/0).
Expand All @@ -51,6 +54,12 @@ poll(_) ->
close(_,_) -> close(_,_) ->
erlang:error(not_implemented). erlang:error(not_implemented).


recvfrom(_,_) ->
erlang:error(not_implemented).

sendto(_,_,_,_,_,_) ->
erlang:error(not_implemented).

listen(Port) -> listen(Port) ->
listen(Port, []). listen(Port, []).
listen(Port, Options) when is_integer(Port), is_list(Options) -> listen(Port, Options) when is_integer(Port), is_list(Options) ->
Expand Down

0 comments on commit 194feb8

Please sign in to comment.