Skip to content

Commit 0ee4e89

Browse files
authored
feature: raw request downstream socket support for stream servers. #a290cac
1 parent a9e8565 commit 0ee4e89

8 files changed

+447
-25
lines changed

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,13 @@ is ignored and the raw request socket is always returned. Unlike `ngx_http_lua_m
301301
you can still call output API functions like `ngx.say`, `ngx.print`, and `ngx.flush`
302302
after acquiring the raw request socket via this function.
303303

304-
Raw request socket returned by this module will contain the following extra method:
304+
When stream server is in UDP mode, reading from the downstream socket returned by the
305+
`ngx.req.socket` call will only return the content of a single packet. Therefore
306+
the reading call will never block and will return `nil, "no more data"` when all the
307+
data from the datagram has been consumed. However, you may choose to send multiple UDP
308+
packets back to the client using the downstream socket.
309+
310+
Raw TCP request socket returned by this module will contain the following extra method:
305311

306312
tcpsock:shutdown
307313
----------------

src/ngx_stream_lua_output.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ ngx_stream_lua_ngx_echo(lua_State *L, unsigned newline)
6363
return luaL_error(L, "no request ctx found");
6464
}
6565

66+
if (r->connection->type == SOCK_DGRAM) {
67+
return luaL_error(L, "API disabled in the current context");
68+
}
69+
6670
ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT
6771
| NGX_STREAM_LUA_CONTEXT_PREREAD);
6872

@@ -468,6 +472,10 @@ ngx_stream_lua_ngx_flush(lua_State *L)
468472
return luaL_error(L, "no request ctx found");
469473
}
470474

475+
if (r->connection->type == SOCK_DGRAM) {
476+
return luaL_error(L, "API disabled in the current context");
477+
}
478+
471479
ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT
472480
| NGX_STREAM_LUA_CONTEXT_PREREAD);
473481

@@ -587,6 +595,10 @@ ngx_stream_lua_ngx_eof(lua_State *L)
587595
return 2;
588596
}
589597

598+
if (r->connection->type == SOCK_DGRAM) {
599+
return luaL_error(L, "API disabled in the current context");
600+
}
601+
590602
ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT
591603
| NGX_STREAM_LUA_CONTEXT_PREREAD);
592604

src/ngx_stream_lua_socket_tcp.c

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ static int ngx_stream_lua_socket_receiveuntil_iterator(lua_State *L);
101101
static ngx_int_t ngx_stream_lua_socket_compile_pattern(u_char *data, size_t len,
102102
ngx_stream_lua_socket_compiled_pattern_t *cp, ngx_log_t *log);
103103
static int ngx_stream_lua_socket_cleanup_compiled_pattern(lua_State *L);
104-
static int ngx_stream_lua_req_socket(lua_State *L);
105104
static void ngx_stream_lua_req_socket_rev_handler(ngx_stream_lua_request_t *r);
106105
static int ngx_stream_lua_socket_tcp_getreusedtimes(lua_State *L);
107106
static int ngx_stream_lua_socket_tcp_setkeepalive(lua_State *L);
@@ -368,14 +367,6 @@ ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
368367
}
369368

370369

371-
void
372-
ngx_stream_lua_inject_req_socket_api(lua_State *L)
373-
{
374-
lua_pushcfunction(L, ngx_stream_lua_req_socket);
375-
lua_setfield(L, -2, "socket");
376-
}
377-
378-
379370
static int
380371
ngx_stream_lua_socket_tcp(lua_State *L)
381372
{
@@ -4255,8 +4246,8 @@ ngx_stream_lua_socket_cleanup_compiled_pattern(lua_State *L)
42554246
}
42564247

42574248

4258-
static int
4259-
ngx_stream_lua_req_socket(lua_State *L)
4249+
int
4250+
ngx_stream_lua_req_socket_tcp(lua_State *L)
42604251
{
42614252
int n, raw;
42624253
ngx_peer_connection_t *pc;

src/ngx_stream_lua_socket_tcp.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ typedef struct {
148148

149149

150150
void ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L);
151-
void ngx_stream_lua_inject_req_socket_api(lua_State *L);
152151
void ngx_stream_lua_cleanup_conn_pools(lua_State *L);
152+
int ngx_stream_lua_req_socket_tcp(lua_State *L);
153153

154154

155155
#endif /* _NGX_STREAM_LUA_SOCKET_TCP_H_INCLUDED_ */

src/ngx_stream_lua_socket_udp.c

Lines changed: 168 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ enum {
7373

7474
static char ngx_stream_lua_socket_udp_metatable_key;
7575
static char ngx_stream_lua_udp_udata_metatable_key;
76+
static char ngx_stream_lua_socket_udp_raw_req_socket_metatable_key;
77+
static char ngx_stream_lua_socket_udp_downstream_udata_metatable_key;
7678
static u_char ngx_stream_lua_socket_udp_buffer[UDP_MAX_DATAGRAM_SIZE];
7779

7880

@@ -84,7 +86,7 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L)
8486
lua_pushcfunction(L, ngx_stream_lua_socket_udp);
8587
lua_setfield(L, -2, "udp"); /* ngx socket */
8688

87-
/* udp socket object metatable */
89+
/* udp upstream socket object metatable */
8890
lua_pushlightuserdata(L, &ngx_stream_lua_socket_udp_metatable_key);
8991
lua_createtable(L, 0 /* narr */, 6 /* nrec */);
9092

@@ -108,14 +110,41 @@ ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L)
108110
lua_rawset(L, LUA_REGISTRYINDEX);
109111
/* }}} */
110112

111-
/* udp socket object metatable */
113+
/* udp downstream socket object metatable */
114+
lua_pushlightuserdata(L, &ngx_stream_lua_socket_udp_raw_req_socket_metatable_key);
115+
lua_createtable(L, 0 /* narr */, 4 /* nrec */);
116+
117+
lua_pushcfunction(L, ngx_stream_lua_socket_udp_send);
118+
lua_setfield(L, -2, "send");
119+
120+
lua_pushcfunction(L, ngx_stream_lua_socket_udp_receive);
121+
lua_setfield(L, -2, "receive");
122+
123+
lua_pushcfunction(L, ngx_stream_lua_socket_udp_settimeout);
124+
lua_setfield(L, -2, "settimeout"); /* ngx socket mt */
125+
126+
lua_pushvalue(L, -1);
127+
lua_setfield(L, -2, "__index");
128+
lua_rawset(L, LUA_REGISTRYINDEX);
129+
/* }}} */
130+
131+
/* udp upstream socket object metatable */
112132
lua_pushlightuserdata(L, &ngx_stream_lua_udp_udata_metatable_key);
113133
lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
114134
lua_pushcfunction(L, ngx_stream_lua_socket_udp_upstream_destroy);
115135
lua_setfield(L, -2, "__gc");
116136
lua_rawset(L, LUA_REGISTRYINDEX);
117137
/* }}} */
118138

139+
/* udp downstream socket object metatable */
140+
lua_pushlightuserdata(L, &ngx_stream_lua_socket_udp_downstream_udata_metatable_key);
141+
lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
142+
/* share the same destructor as upstream */
143+
lua_pushcfunction(L, ngx_stream_lua_socket_udp_upstream_destroy);
144+
lua_setfield(L, -2, "__gc");
145+
lua_rawset(L, LUA_REGISTRYINDEX);
146+
/* }}} */
147+
119148
lua_pop(L, 1);
120149
}
121150

@@ -890,7 +919,7 @@ ngx_stream_lua_socket_udp_send(lua_State *L)
890919

891920
dd("sending query %.*s", (int) query.len, query.data);
892921

893-
n = ngx_send(u->udp_connection.connection, query.data, query.len);
922+
n = ngx_udp_send(u->udp_connection.connection, query.data, query.len);
894923

895924
dd("ngx_send returns %d (query len %d)", (int) n, (int) query.len);
896925

@@ -988,7 +1017,28 @@ ngx_stream_lua_socket_udp_receive(lua_State *L)
9881017
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
9891018
"lua udp socket receive buffer size: %uz", u->recv_buf_size);
9901019

991-
rc = ngx_stream_lua_socket_udp_read(r, u);
1020+
if (u->raw_downstream) {
1021+
if (ngx_buf_size(r->connection->buffer) > 0) {
1022+
/* we still have unread data */
1023+
u->received = ngx_min((size_t) ngx_buf_size(r->connection->buffer),
1024+
u->recv_buf_size);
1025+
ngx_memcpy(ngx_stream_lua_socket_udp_buffer,
1026+
r->connection->buffer->pos, u->received);
1027+
r->connection->buffer->pos += u->received;
1028+
1029+
ngx_stream_lua_socket_udp_handle_success(r, u);
1030+
1031+
rc = NGX_OK;
1032+
1033+
} else {
1034+
lua_pushnil(L);
1035+
lua_pushliteral(L, "no more data");
1036+
return 2;
1037+
}
1038+
1039+
} else {
1040+
rc = ngx_stream_lua_socket_udp_read(r, u);
1041+
}
9921042

9931043
if (rc == NGX_ERROR) {
9941044
dd("read failed: %d", (int) u->ft_type);
@@ -1104,7 +1154,11 @@ ngx_stream_lua_socket_udp_finalize(ngx_stream_lua_request_t *r,
11041154
u->resolved->ctx = NULL;
11051155
}
11061156

1107-
if (u->udp_connection.connection) {
1157+
/*
1158+
* do not close if it is a downstream connection as that will
1159+
* be handled by stream subsystem itself
1160+
*/
1161+
if (u->udp_connection.connection && !u->raw_downstream) {
11081162
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
11091163
"lua close socket connection");
11101164

@@ -1617,4 +1671,113 @@ ngx_stream_lua_udp_socket_cleanup(void *data)
16171671
ngx_stream_lua_socket_udp_finalize(u->request, u);
16181672
}
16191673

1674+
1675+
int
1676+
ngx_stream_lua_req_socket_udp(lua_State *L)
1677+
{
1678+
ngx_stream_lua_udp_connection_t *pc;
1679+
ngx_stream_lua_srv_conf_t *lscf;
1680+
ngx_connection_t *c;
1681+
ngx_stream_lua_request_t *r;
1682+
ngx_stream_lua_ctx_t *ctx;
1683+
1684+
ngx_stream_lua_cleanup_t *cln;
1685+
ngx_stream_lua_co_ctx_t *coctx;
1686+
1687+
ngx_stream_lua_socket_udp_upstream_t *u;
1688+
1689+
r = ngx_stream_lua_get_req(L);
1690+
1691+
ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
1692+
if (ctx == NULL) {
1693+
return luaL_error(L, "no ctx found");
1694+
}
1695+
1696+
ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT
1697+
|NGX_STREAM_LUA_CONTEXT_PREREAD);
1698+
1699+
c = r->connection;
1700+
1701+
if (c->buffered) {
1702+
lua_pushnil(L);
1703+
lua_pushliteral(L, "pending data to write");
1704+
return 2;
1705+
}
1706+
1707+
dd("ctx acquired raw req socket: %d", ctx->acquired_raw_req_socket);
1708+
1709+
if (ctx->acquired_raw_req_socket) {
1710+
lua_pushnil(L);
1711+
lua_pushliteral(L, "duplicate call");
1712+
return 2;
1713+
}
1714+
1715+
ctx->acquired_raw_req_socket = 1;
1716+
1717+
lua_createtable(L, 3 /* narr */, 1 /* nrec */); /* the object */
1718+
lua_pushlightuserdata(L, &ngx_stream_lua_socket_udp_raw_req_socket_metatable_key);
1719+
lua_rawget(L, LUA_REGISTRYINDEX);
1720+
lua_setmetatable(L, -2);
1721+
1722+
u = lua_newuserdata(L, sizeof(ngx_stream_lua_socket_udp_upstream_t));
1723+
if (u == NULL) {
1724+
return luaL_error(L, "no memory");
1725+
}
1726+
1727+
#if 1
1728+
lua_pushlightuserdata(L, &ngx_stream_lua_socket_udp_downstream_udata_metatable_key);
1729+
lua_rawget(L, LUA_REGISTRYINDEX);
1730+
lua_setmetatable(L, -2);
1731+
#endif
1732+
1733+
lua_rawseti(L, 1, SOCKET_CTX_INDEX);
1734+
1735+
ngx_memzero(u, sizeof(ngx_stream_lua_socket_udp_upstream_t));
1736+
1737+
u->raw_downstream = 1;
1738+
1739+
coctx = ctx->cur_co_ctx;
1740+
1741+
u->request = r;
1742+
1743+
lscf = ngx_stream_lua_get_module_srv_conf(r, ngx_stream_lua_module);
1744+
1745+
u->conf = lscf;
1746+
1747+
u->read_timeout = u->conf->read_timeout;
1748+
1749+
cln = ngx_stream_lua_cleanup_add(r, 0);
1750+
if (cln == NULL) {
1751+
u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
1752+
lua_pushnil(L);
1753+
lua_pushliteral(L, "no memory");
1754+
return 2;
1755+
}
1756+
1757+
cln->handler = ngx_stream_lua_socket_udp_cleanup;
1758+
cln->data = u;
1759+
u->cleanup = &cln->handler;
1760+
1761+
pc = &u->udp_connection;
1762+
pc->log = *c->log;
1763+
pc->connection = c;
1764+
1765+
dd("setting data to %p", u);
1766+
1767+
coctx->data = u;
1768+
ctx->downstream = u;
1769+
1770+
if (c->read->timer_set) {
1771+
ngx_del_timer(c->read);
1772+
}
1773+
1774+
if (c->write->timer_set) {
1775+
ngx_del_timer(c->write);
1776+
}
1777+
1778+
lua_settop(L, 1);
1779+
return 1;
1780+
}
1781+
1782+
16201783
/* vi:set ft=c ts=4 sw=4 et fdm=marker: */

src/ngx_stream_lua_socket_udp.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,22 +42,24 @@ struct ngx_stream_lua_socket_udp_upstream_s {
4242
ngx_stream_lua_request_t *request;
4343
ngx_stream_lua_udp_connection_t udp_connection;
4444

45-
ngx_msec_t read_timeout;
45+
ngx_msec_t read_timeout;
4646

4747
ngx_stream_upstream_resolved_t *resolved;
4848

49-
ngx_uint_t ft_type;
50-
ngx_err_t socket_errno;
51-
size_t received; /* for receive */
52-
size_t recv_buf_size;
49+
ngx_uint_t ft_type;
50+
ngx_err_t socket_errno;
51+
size_t received; /* for receive */
52+
size_t recv_buf_size;
5353

5454
ngx_stream_lua_co_ctx_t *co_ctx;
5555

56-
unsigned waiting; /* :1 */
56+
unsigned waiting:1;
57+
unsigned raw_downstream:1;
5758
};
5859

5960

6061
void ngx_stream_lua_inject_socket_udp_api(ngx_log_t *log, lua_State *L);
62+
int ngx_stream_lua_req_socket_udp(lua_State *L);
6163

6264

6365
#endif /* _NGX_STREAM_LUA_SOCKET_UDP_H_INCLUDED_ */

0 commit comments

Comments
 (0)