Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

UDP support

  • Loading branch information...
commit 24d2aef0619ceb04f57bab9bb294be0ee17b5fd7 1 parent 3e88201
@rphillips rphillips authored
View
221 lib/luvit/dgram.lua
@@ -0,0 +1,221 @@
+--[[
+
+Copyright 2012 The Luvit Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS-IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+--]]
+
+-- Ported from node's dgram.js.
+
+local dns = require('dns')
+local net = require('net')
+local Udp = require('uv').Udp
+local Emitter = require('core').Emitter
+
+local dgram = {}
+
+local function lookup(address, family, callback)
+ local matchedFamily = net.isIP(address)
+ if matchedFamily then
+ return callback(nil, address, matchedFamily)
+ end
+ return dns.lookup(address, family, callback)
+end
+
+local function lookup4(address, callback)
+ return lookup(address or '0.0.0.0', 4, callback)
+end
+
+local function lookup6(address, callback)
+ return lookup(address or '::0', 6, callback)
+end
+
+local function newHandle(family)
+ if family == 'udp4' then
+ local handle = Udp:new()
+ handle.lookup = lookup4
+ handle.bind = handle.bind
+ handle.send = handle.send
+ return handle
+ end
+
+ if family == 'udp6' then
+ local handle = Udp:new()
+ handle.lookup = lookup6
+ handle.bind = handle.bind6
+ handle.send = handle.send6
+ return handle
+ end
+
+ error('Bad socket type specified. Valid types are: udp4, udp6')
+end
+
+--[[ Socket ]]--
+
+local Socket = Emitter:extend()
+
+dgram.Socket = Socket
+
+function Socket:initialize(family, listener)
+ self._handle = newHandle(family)
+ self._receiving = false
+ self._bound = false
+ self._family = family
+
+ self:_initEmitters()
+
+ if type(listener) == 'function' then
+ self:on('message', listener)
+ end
+end
+
+function Socket:_initEmitters()
+ self._handle:on('close', function(msg, rinfo)
+ self._handle = nil
+ self:emit('close')
+ end)
+
+ self._handle:on('message', function(msg, rinfo)
+ self:emit('message', msg, rinfo)
+ end)
+
+ self._handle:on('error', function(err)
+ self:emit('error', err)
+ end)
+end
+
+function dgram.createSocket(family, listener)
+ return Socket:new(family, listener)
+end
+
+function Socket:bind(port, address)
+ self:_healthCheck()
+
+ self._handle.lookup(address, function(err, ip)
+ if err then
+ process.nextTick(function()
+ self:emit('error', err)
+ end)
+ return
+ end
+
+ self._handle:bind(ip, port or 0)
+ self._bound = true
+ self:_startReceiving()
+ self:emit('listening')
+ end)
+end
+
+function Socket:send(msg, port, address, callback)
+ self:_healthCheck()
+ self:_startReceiving()
+
+ self._handle.lookup(address, function(err, ip)
+ if err then
+ if callback then callback(err) end
+ self:emit('error', err)
+ return
+ end
+
+ self._handle:send(msg, port, address, callback)
+ end)
+end
+
+function Socket:close()
+ self:_healthCheck()
+ self:_stopReceiving()
+ self._handle:close()
+end
+
+function Socket:address()
+ self:_healthCheck()
+
+ return self._handle:getsockname()
+end
+
+function Socket:setBroadcast(opt)
+ self._handle:setBroadcast(opt and 1 or 0)
+end
+
+function Socket:setTTL(opt)
+ self._handle:setTTL(opt)
+end
+
+function Socket:setMulticastTTL(opt)
+ self._handle:setMulticastTTL(opt)
+end
+
+function Socket:setMulticastLoopback(opt)
+ self._handle:setMulticastLoopback(opt and 1 or 0)
+end
+
+function Socket:setMembership(multicastAddress, interfaceAddress, op)
+ self:_healthCheck()
+
+ if not multicastAddress then
+ error("multicast address must be specified")
+ end
+
+ if not multicastInterface then
+ if self._family == 'udp4' then
+ multicastInterface = '0.0.0.0'
+ elseif self._family == 'udp6' then
+ multicastInterface = '::0'
+ end
+ end
+
+ self._handle:setMembership(multicastAddress, multicastInterface, op)
+end
+
+function Socket:addMembership(multicastAddress, interfaceAddress)
+ self:setMembership(multicastAddress, interfaceAddress, 'join')
+end
+
+function Socket:dropMembership(multicastAddress, interfaceAddress)
+ self:setMembership(multicastAddress, interfaceAddress, 'leave')
+end
+
+function Socket:_healthCheck()
+ if not self._handle then
+ error('self._handle uninitialized')
+ end
+end
+
+function Socket:_startReceiving()
+ if self._receiving then
+ return
+ end
+
+ if not self._bound then
+ self:bind()
+
+ if not self._bound then
+ error('implicit bind failed')
+ end
+ end
+
+ self._handle:recvStart()
+ self._receiving = true
+end
+
+function Socket:_stopReceiving()
+ if not self._receiving then
+ return
+ end
+
+ self._handle:recvStop()
+ self._receiving = false
+end
+
+return dgram
View
12 lib/luvit/uv.lua
@@ -153,6 +153,18 @@ Udp.recvStart = native.udpRecvStart
-- Udp:recvStop()
Udp.recvStop = native.udpRecvStop
+-- Udp:setBroadcast(opt)
+Udp.setBroadcast = native.udpSetBroadcast
+
+-- Udp:setTTL(opt)
+Udp.setTTL = native.udpSetTTL
+
+-- Udp:setMulticastTTL(opt)
+Udp.setMulticastTTL = native.udpSetMulticastTTL
+
+-- Udp:setMulticastLoopback(opt)
+Udp.setMulticastLoopback = native.udpSetMulticastLoopback
+
--------------------------------------------------------------------------------
local Pipe = Stream:extend()
View
1  luvit.gyp
@@ -66,6 +66,7 @@
'lib/luvit/buffer.lua',
'lib/luvit/childprocess.lua',
'lib/luvit/core.lua',
+ 'lib/luvit/dgram.lua',
'lib/luvit/dns.lua',
'lib/luvit/fiber.lua',
'lib/luvit/fs.lua',
View
4 src/luv.c
@@ -50,6 +50,10 @@ static const luaL_reg luv_f[] = {
{"udpSend6", luv_udp_send6},
{"udpRecvStart", luv_udp_recv_start},
{"udpRecvStop", luv_udp_recv_stop},
+ {"udpSetBroadcast", luv_udp_set_broadcast},
+ {"udpSetTTL", luv_udp_set_ttl},
+ {"udpSetMulticastTTL", luv_udp_set_multicast_ttl},
+ {"udpSetMulticastLoopback", luv_udp_set_multicast_loopback},
/* FS Watcher functions */
{"newFsWatcher", luv_new_fs_watcher},
View
196 src/luv_udp.c
@@ -16,30 +16,130 @@
*/
#include <stdlib.h>
+#include <assert.h>
#include "luv_portability.h"
#include "luv_udp.h"
#include "utils.h"
-void luv_on_udp_recv(uv_udp_t* handle, ssize_t nread, uv_buf_t buf, struct sockaddr* addr, unsigned flags) {
- printf("TODO: implement luv_on_udp_recv\n");
+#define X(name, fn) \
+ int luv_udp_##name(lua_State *L) { \
+ uv_udp_t* handle = (uv_udp_t*)luv_checkudata(L, 1, "udp"); \
+ int flag = luaL_checkint(L, 2); \
+ int rc = fn(handle, flag); \
+ if (rc) { \
+ uv_err_t err = uv_last_error(luv_get_loop(L)); \
+ return luaL_error(L, #name": %s", uv_strerror(err)); \
+ } \
+ return 0; \
+ }
+
+X(set_ttl, uv_udp_set_ttl);
+X(set_broadcast, uv_udp_set_broadcast);
+X(set_multicast_ttl, uv_udp_set_multicast_ttl);
+X(set_multicast_loopback, uv_udp_set_multicast_loop);
+
+#undef X
+
+static void luv_on_udp_recv(uv_udp_t* handle,
+ ssize_t nread,
+ uv_buf_t buf,
+ struct sockaddr* addr,
+ unsigned flags) {
+ int port;
+ char ip[INET6_ADDRSTRLEN];
+
+ /* load the lua state and the userdata */
+ lua_State *L = luv_handle_get_lua(handle->data);
+
+ if (nread == 0) {
+ return;
+ }
+
+ if (nread < 0) {
+ uv_close((uv_handle_t *)handle, luv_on_close);
+ luv_push_async_error(L, uv_last_error(luv_get_loop(L)), "on_recv", NULL);
+ luv_emit_event(L, "error", 1);
+ return;
+ }
+
+ lua_pushlstring(L, buf.base, nread);
+ lua_newtable(L);
+
+ if (addr->sa_family == AF_INET) {
+ uv_inet_ntop(AF_INET, &(((struct sockaddr_in*)addr)->sin_addr), ip, INET6_ADDRSTRLEN);
+ port = ntohs(((struct sockaddr_in*)addr)->sin_port);
+ } else if (addr->sa_family == AF_INET6){
+ uv_inet_ntop(AF_INET6, &(((struct sockaddr_in6*)addr)->sin6_addr), ip, INET6_ADDRSTRLEN);
+ port = ntohs(((struct sockaddr_in6*)addr)->sin6_port);
+ }
+
+ lua_pushstring(L, ip);
+ lua_setfield(L, -2, "address");
+ lua_pushnumber(L, port);
+ lua_setfield(L, -2, "port");
+ lua_pushboolean(L, flags == UV_UDP_PARTIAL);
+ lua_setfield(L, -2, "partial");
+ lua_pushnumber(L, nread);
+ lua_setfield(L, -2, "size");
+ luv_emit_event(L, "message", 2);
+
+ free(buf.base);
+ buf.base = NULL;
+}
+
+static void luv_on_udp_send(uv_udp_send_t* req, int status) {
+ /* load the lua state and the userdata */
+ lua_State *L = luv_handle_get_lua(req->handle->data);
+ lua_pop(L, 1); /* We don't need the userdata */
+ /* load the callback */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, (int)(req->data));
+ luaL_unref(L, LUA_REGISTRYINDEX, (int)(req->data));
+
+ if (lua_isfunction(L, -1)) {
+ if (status != 0) {
+ luv_push_async_error(L, uv_last_error(luv_get_loop(L)), "on_udp_send", NULL);
+ luv_acall(L, 1, 0, "on_udp_send");
+ } else {
+ luv_acall(L, 0, 0, "on_udp_send");
+ }
+ } else {
+ lua_pop(L, 1);
+ }
+
+ luv_handle_unref(L, req->handle->data);
+ free(req);
}
int luv_new_udp (lua_State* L) {
uv_udp_t* handle = luv_create_udp(L);
+ /* uv_udp_init memset's the handle so we need to reset the data baton */
+ void *data = handle->data;
uv_udp_init(luv_get_loop(L), handle);
+ handle->data = data;
return 1;
}
-int luv_udp_bind(lua_State* L) {
+static int luv__udp_bind(lua_State *L, int family) {
uv_udp_t* handle = (uv_udp_t*)luv_checkudata(L, 1, "udp");
const char* host = luaL_checkstring(L, 2);
int port = luaL_checkint(L, 3);
int flags = 0;
+ int rc = 0;
- struct sockaddr_in address = uv_ip4_addr(host, port);
+ switch (family) {
+ case AF_INET:
+ rc = uv_udp_bind(handle, uv_ip4_addr(host, port), flags);
+ break;
+ case AF_INET6:
+ rc = uv_udp_bind6(handle, uv_ip6_addr(host, port), flags);
+ break;
+ default:
+ assert(0 && "unexpected family type");
+ abort();
+ }
- if (uv_udp_bind(handle, address, flags)) {
+ if (rc) {
uv_err_t err = uv_last_error(luv_get_loop(L));
return luaL_error(L, "udp_bind: %s", uv_strerror(err));
}
@@ -47,20 +147,12 @@ int luv_udp_bind(lua_State* L) {
return 0;
}
-int luv_udp_bind6(lua_State* L) {
- uv_udp_t* handle = (uv_udp_t*)luv_checkudata(L, 1, "udp");
- const char* host = luaL_checkstring(L, 2);
- int port = luaL_checkint(L, 3);
- int flags = 0;
-
- struct sockaddr_in6 address = uv_ip6_addr(host, port);
-
- if (uv_udp_bind6(handle, address, flags)) {
- uv_err_t err = uv_last_error(luv_get_loop(L));
- return luaL_error(L, "udp_bind: %s", uv_strerror(err));
- }
+int luv_udp_bind(lua_State* L) {
+ return luv__udp_bind(L, AF_INET);
+}
- return 0;
+int luv_udp_bind6(lua_State* L) {
+ return luv__udp_bind(L, AF_INET6);
}
static const char *const luv_membership_options[] = {"join", "leave", NULL};
@@ -69,13 +161,12 @@ static const char *const luv_membership_options[] = {"join", "leave", NULL};
/* const char* interface_addr, uv_membership membership);*/
int luv_udp_set_membership(lua_State* L) {
uv_udp_t* handle = (uv_udp_t*)luv_checkudata(L, 1, "udp");
-/* const char* multicast_addr = luaL_checkstring(L, 2);*/
+ const char* multicast_addr = luaL_checkstring(L, 2);
const char* interface_addr = luaL_checkstring(L, 3);
int option = luaL_checkoption (L, 4, "membership", luv_membership_options);
uv_membership membership = option ? UV_LEAVE_GROUP : UV_JOIN_GROUP;
- /* TODO: don't use null, let user pass in */
- if (uv_udp_set_membership(handle, NULL, interface_addr, membership)) {
+ if (uv_udp_set_membership(handle, multicast_addr, interface_addr, membership)) {
uv_err_t err = uv_last_error(luv_get_loop(L));
return luaL_error(L, "udp_set_membership: %s", uv_strerror(err));
}
@@ -119,23 +210,78 @@ int luv_udp_getsockname(lua_State* L) {
return 1;
}
+static int luv_udp__send(lua_State* L, int family) {
+ uv_buf_t buf;
+ uv_udp_t* handle = (uv_udp_t*)luv_checkudata(L, 1, "udp");
+ size_t len;
+ const char* chunk = luaL_checklstring(L, 2, &len);
+
+ uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
+ int port = luaL_checkint(L, 3);
+ const char* host = luaL_checkstring(L, 4);
+ struct sockaddr_in dest;
+ struct sockaddr_in6 dest6;
+ int rc;
+
+ /* Store a reference to the callback */
+ lua_pushvalue(L, 5);
+ req->data = (void *)luaL_ref(L, LUA_REGISTRYINDEX);
+
+ luv_handle_ref(L, handle->data, 1);
+
+ /* Store the chunk
+ * TODO: this is probably unsafe, should investigate
+ */
+ buf = uv_buf_init((char*)chunk, len);
+
+ switch(family) {
+ case AF_INET:
+ dest = uv_ip4_addr(host, port);
+ rc = uv_udp_send(req, handle, &buf, 1, dest, luv_on_udp_send);
+ break;
+ case AF_INET6:
+ dest6 = uv_ip6_addr(host, port);
+ rc = uv_udp_send6(req, handle, &buf, 1, dest6, luv_on_udp_send);
+ break;
+ default:
+ assert(0 && "unexpected family type");
+ abort();
+ }
+
+ if (rc) {
+ uv_err_t err = uv_last_error(luv_get_loop(L));
+ return luaL_error(L, "udp_send: %s", uv_strerror(err));
+ }
+
+ return 0;
+}
+
int luv_udp_send(lua_State* L) {
- return luaL_error(L, "TODO: Implement luv_udp_send");
+ return luv_udp__send(L, AF_INET);
}
int luv_udp_send6(lua_State* L) {
- return luaL_error(L, "TODO: Implement luv_udp_send6");
+ return luv_udp__send(L, AF_INET6);
}
int luv_udp_recv_start(lua_State* L) {
uv_udp_t* handle = (uv_udp_t*)luv_checkudata(L, 1, "udp");
- uv_udp_recv_start(handle, luv_on_alloc, luv_on_udp_recv);
+ int rc = uv_udp_recv_start(handle, luv_on_alloc, luv_on_udp_recv);
+ if (rc && uv_last_error(luv_get_loop(L)).code != UV_EALREADY) {
+ uv_err_t err = uv_last_error(luv_get_loop(L));
+ return luaL_error(L, "udp_recv_start: %s", uv_strerror(err));
+ }
+ luv_handle_ref(L, handle->data, 1);
return 0;
}
int luv_udp_recv_stop(lua_State* L) {
uv_udp_t* handle = (uv_udp_t*)luv_checkudata(L, 1, "udp");
- uv_udp_recv_stop(handle);
+ if (uv_udp_recv_stop(handle)) {
+ uv_err_t err = uv_last_error(luv_get_loop(L));
+ return luaL_error(L, "udp_recv_stop: %s", uv_strerror(err));
+ }
+ luv_handle_unref(L, handle->data);
return 0;
}
View
4 src/luv_udp.h
@@ -33,5 +33,9 @@ int luv_udp_send(lua_State* L);
int luv_udp_send6(lua_State* L);
int luv_udp_recv_start(lua_State* L);
int luv_udp_recv_stop(lua_State* L);
+int luv_udp_set_broadcast(lua_State* L);
+int luv_udp_set_ttl(lua_State* L);
+int luv_udp_set_multicast_ttl(lua_State* L);
+int luv_udp_set_multicast_loopback(lua_State* L);
#endif
View
2  src/luvit_exports.c
@@ -7,6 +7,7 @@ const void *luvit_ugly_hack = NULL;
extern const char *luaJIT_BC_buffer[];
extern const char *luaJIT_BC_childprocess[];
extern const char *luaJIT_BC_core[];
+extern const char *luaJIT_BC_dgram[];
extern const char *luaJIT_BC_dns[];
extern const char *luaJIT_BC_fiber[];
extern const char *luaJIT_BC_fs[];
@@ -38,6 +39,7 @@ const void *luvit__suck_in_symbols(void)
(size_t)(const char *)luaJIT_BC_buffer +
(size_t)(const char *)luaJIT_BC_childprocess +
(size_t)(const char *)luaJIT_BC_core +
+ (size_t)(const char *)luaJIT_BC_dgram +
(size_t)(const char *)luaJIT_BC_dns +
(size_t)(const char *)luaJIT_BC_fiber +
(size_t)(const char *)luaJIT_BC_fs +
View
40 tests/test-dgram-multicast.lua
@@ -0,0 +1,40 @@
+--[[
+
+Copyright 2012 The Luvit Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS-IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+--]]
+
+require("helper")
+local dgram = require('dgram')
+
+local PORT = process.env.PORT or 10081
+local HOST = '127.0.0.1'
+
+local s1 = dgram.createSocket('udp4')
+local s2 = dgram.createSocket('udp4')
+
+s2:on('message', function(msg, rinfo)
+ assert(#msg == 5)
+ assert(msg == 'HELLO')
+ s2:close()
+ s1:close()
+end)
+
+s2:bind(PORT+1)
+s1:bind(PORT)
+
+s2:addMembership('224.0.0.1')
+
+s1:send('HELLO', PORT+1, '224.0.0.1')
View
62 tests/test-dgram.lua
@@ -0,0 +1,62 @@
+--[[
+
+Copyright 2012 The Luvit Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS-IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+--]]
+
+require("helper")
+local dgram = require('dgram')
+
+local PORT = process.env.PORT or 10081
+local HOST = '127.0.0.1'
+
+local s1 = dgram.createSocket('udp4')
+local s2 = dgram.createSocket('udp4')
+
+s1:on('message', function(msg, rinfo)
+ assert(#msg == 4)
+ assert(msg == 'PING')
+ s1:send('PONG', PORT+1, HOST, function(err)
+ if err then
+ assert(err)
+ end
+ end)
+ s1:close()
+end)
+
+s2:on('message', function(msg, rinfo)
+ assert(#msg == 4)
+ assert(msg == 'PONG')
+ s2:close()
+end)
+
+s1:on('error', function(err)
+ assert(err)
+end)
+
+s2:on('error', function(err)
+ assert(err)
+end)
+
+s1:bind(PORT)
+s2:bind(PORT+1)
+
+s2:send('PING', PORT, HOST, function(err)
+ if err then
+ assert(err)
+ end
+ s1:close()
+ s2:close()
+end)
Please sign in to comment.
Something went wrong with that request. Please try again.