diff --git a/src/Std/Internal/Async/TCP.lean b/src/Std/Internal/Async/TCP.lean index bf08cfd5dc15..70d8524647cd 100644 --- a/src/Std/Internal/Async/TCP.lean +++ b/src/Std/Internal/Async/TCP.lean @@ -118,12 +118,19 @@ Connects the client socket to the given address. def connect (s : Client) (addr : SocketAddress) : Async Unit := Async.ofPromise <| s.native.connect addr +/-- +Sends multiple data buffers through the client socket. +-/ +@[inline] +def sendAll (s : Client) (data : Array ByteArray) : Async Unit := + Async.ofPromise <| s.native.send data + /-- Sends data through the client socket. -/ @[inline] def send (s : Client) (data : ByteArray) : Async Unit := - Async.ofPromise <| s.native.send data + Async.ofPromise <| s.native.send #[data] /-- Receives data from the client socket. If data is received, it’s wrapped in .some. If EOF is reached, diff --git a/src/Std/Internal/Async/UDP.lean b/src/Std/Internal/Async/UDP.lean index f1d9c9c6ce1c..dde60e4bc83a 100644 --- a/src/Std/Internal/Async/UDP.lean +++ b/src/Std/Internal/Async/UDP.lean @@ -61,13 +61,21 @@ automatically sent to that destination. def connect (s : Socket) (addr : SocketAddress) : IO Unit := s.native.connect addr +/-- +Sends multiple data buffers through an UDP socket. The `addr` parameter specifies the destination +address. If `addr` is `none`, the data is sent to the default peer address set by `connect`. +-/ +@[inline] +def sendAll (s : Socket) (data : Array ByteArray) (addr : Option SocketAddress := none) : Async Unit := + Async.ofPromise <| s.native.send data addr + /-- Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr` is `none`, the data is sent to the default peer address set by `connect`. -/ @[inline] def send (s : Socket) (data : ByteArray) (addr : Option SocketAddress := none) : Async Unit := - Async.ofPromise <| s.native.send data addr + Async.ofPromise <| s.native.send #[data] addr /-- Receives data from an UDP socket. `size` is for the maximum bytes to receive. diff --git a/src/Std/Internal/UV/TCP.lean b/src/Std/Internal/UV/TCP.lean index 7e7dae5e07ca..9f84acc01fff 100644 --- a/src/Std/Internal/UV/TCP.lean +++ b/src/Std/Internal/UV/TCP.lean @@ -47,7 +47,7 @@ opaque connect (socket : @& Socket) (addr : @& SocketAddress) : IO (IO.Promise ( Sends data through a TCP socket. -/ @[extern "lean_uv_tcp_send"] -opaque send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit)) +opaque send (socket : @& Socket) (data : Array ByteArray) : IO (IO.Promise (Except IO.Error Unit)) /-- Receives data from a TCP socket with a maximum size of size bytes. The promise resolves when data is diff --git a/src/Std/Internal/UV/UDP.lean b/src/Std/Internal/UV/UDP.lean index 163a6798996d..66b54269dc67 100644 --- a/src/Std/Internal/UV/UDP.lean +++ b/src/Std/Internal/UV/UDP.lean @@ -55,7 +55,7 @@ Sends data through an UDP socket. The `addr` parameter specifies the destination is `none`, the data is sent to the default peer address set by `connect`. -/ @[extern "lean_uv_udp_send"] -opaque send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) +opaque send (socket : @& Socket) (data : Array ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) /-- Receives data from an UDP socket. `size` is for the maximum bytes to receive. The promise diff --git a/src/runtime/uv/tcp.cpp b/src/runtime/uv/tcp.cpp index 30f76609cbab..ca728b37910c 100644 --- a/src/runtime/uv/tcp.cpp +++ b/src/runtime/uv/tcp.cpp @@ -22,6 +22,7 @@ typedef struct { lean_object* promise; lean_object* data; lean_object* socket; + uv_buf_t* bufs; } tcp_send_data; // ======================================= @@ -164,14 +165,31 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, b_obj_ return lean_io_result_mk_ok(promise); } -/* Std.Internal.UV.TCP.Socket.send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit)) */ -extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data, obj_arg /* w */) { +/* Std.Internal.UV.TCP.Socket.send (socket : @& Socket) (data : Array ByteArray) : IO (IO.Promise (Except IO.Error Unit)) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data_array, obj_arg /* w */) { lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket); - size_t data_len = lean_sarray_size(data); - char* data_str = (char*)lean_sarray_cptr(data); + size_t array_len = lean_array_size(data_array); + + if (array_len == 0) { + lean_dec(data_array); + + lean_object* promise = lean_promise_new(); + mark_mt(promise); + lean_promise_resolve_with_code(0, promise); + + return lean_io_result_mk_ok(promise); + } - uv_buf_t buf = uv_buf_init(data_str, data_len); + // Allocate buffer array for uv_write + uv_buf_t* bufs = (uv_buf_t*)malloc(array_len * sizeof(uv_buf_t)); + + for (size_t i = 0; i < array_len; i++) { + lean_object* byte_array = lean_array_get_core(data_array, i); + size_t data_len = lean_sarray_size(byte_array); + char* data_str = (char*)lean_sarray_cptr(byte_array); + bufs[i] = uv_buf_init(data_str, data_len); + } lean_object* promise = lean_promise_new(); mark_mt(promise); @@ -181,8 +199,9 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d tcp_send_data* send_data = (tcp_send_data*)write_uv->data; send_data->promise = promise; - send_data->data = data; + send_data->data = data_array; send_data->socket = socket; + send_data->bufs = bufs; // These objects are going to enter the loop and be owned by it lean_inc(promise); @@ -190,7 +209,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d event_loop_lock(&global_ev); - int result = uv_write(write_uv, (uv_stream_t*)tcp_socket->m_uv_tcp, &buf, 1, [](uv_write_t* req, int status) { + int result = uv_write(write_uv, (uv_stream_t*)tcp_socket->m_uv_tcp, bufs, array_len, [](uv_write_t* req, int status) { tcp_send_data* tup = (tcp_send_data*) req->data; lean_promise_resolve_with_code(status, tup->promise); @@ -199,6 +218,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d lean_dec(tup->data); lean_dec(tup->socket); + free(req->bufs); free(req->data); free(req); }); @@ -209,8 +229,9 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d lean_dec(promise); // The structure does not own it. lean_dec(promise); // We are not going to return it. lean_dec(socket); - lean_dec(data); - + lean_dec(data_array); + free(bufs); + free(write_uv->data); free(write_uv); @@ -385,7 +406,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_recv(b_obj_arg socket, ob tcp_socket->m_byte_array = nullptr; } - lean_dec((lean_object*)tcp_socket); + lean_dec(socket); event_loop_unlock(&global_ev); return lean_io_result_mk_ok(lean_box(0)); @@ -709,4 +730,4 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_keepalive(b_obj_arg socket, int3 } #endif -} +} \ No newline at end of file diff --git a/src/runtime/uv/tcp.h b/src/runtime/uv/tcp.h index 385156ffc908..4b82e08f5277 100644 --- a/src/runtime/uv/tcp.h +++ b/src/runtime/uv/tcp.h @@ -43,7 +43,7 @@ static inline lean_uv_tcp_socket_object* lean_to_uv_tcp_socket(lean_object* o) { extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_new(obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */); -extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data_array, obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_wait_readable(b_obj_arg socket, obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_recv(b_obj_arg socket, obj_arg /* w */); diff --git a/src/runtime/uv/udp.cpp b/src/runtime/uv/udp.cpp index d0d63a131c94..890055a33d3d 100644 --- a/src/runtime/uv/udp.cpp +++ b/src/runtime/uv/udp.cpp @@ -16,6 +16,7 @@ typedef struct { lean_object *promise; lean_object *data; lean_object *socket; + uv_buf_t* bufs; } udp_send_data; void lean_uv_udp_socket_finalizer(void* ptr) { @@ -123,14 +124,30 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_ return lean_io_result_mk_ok(lean_box(0)); } -/* Std.Internal.UV.UDP.Socket.send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */ -extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */) { +/* Std.Internal.UV.UDP.Socket.send (socket : @& Socket) (data : Array ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data_array, b_obj_arg opt_addr, obj_arg /* w */) { lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket); - size_t data_len = lean_sarray_size(data); - char* data_str = (char*)lean_sarray_cptr(data); + size_t array_len = lean_array_size(data_array); + + if (array_len == 0) { + lean_dec(data_array); + + lean_object* promise = lean_promise_new(); + mark_mt(promise); + lean_promise_resolve_with_code(0, promise); + + return lean_io_result_mk_ok(promise); + } + + uv_buf_t* bufs = (uv_buf_t*)malloc(array_len * sizeof(uv_buf_t)); - uv_buf_t buf = uv_buf_init(data_str, data_len); + for (size_t i = 0; i < array_len; i++) { + lean_object* byte_array = lean_array_get_core(data_array, i); + size_t data_len = lean_sarray_size(byte_array); + char* data_str = (char*)lean_sarray_cptr(byte_array); + bufs[i] = uv_buf_init(data_str, data_len); + } lean_object* promise = lean_promise_new(); mark_mt(promise); @@ -140,8 +157,9 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d udp_send_data* send_data = (udp_send_data*)send_uv->data; send_data->promise = promise; - send_data->data = data; + send_data->data = data_array; send_data->socket = socket; + send_data->bufs = bufs; // These objects are going to enter the loop and be owned by it lean_inc(promise); @@ -157,7 +175,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d event_loop_lock(&global_ev); - int result = uv_udp_send(send_uv, udp_socket->m_uv_udp, &buf, 1, (sockaddr*)addr_ptr, [](uv_udp_send_t* req, int status) { + int result = uv_udp_send(send_uv, udp_socket->m_uv_udp, bufs, array_len, (sockaddr*)addr_ptr, [](uv_udp_send_t* req, int status) { udp_send_data* tup = (udp_send_data*) req->data; lean_promise_resolve_with_code(status, tup->promise); @@ -165,6 +183,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d lean_dec(tup->socket); lean_dec(tup->data); + free(req->bufs); free(req->data); free(req); }); @@ -179,7 +198,8 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d lean_dec(promise); // The structure does not own it. lean_dec(promise); // We are not going to return it. lean_dec(socket); // The loop does not own the object. - lean_dec(data); // The data is owned. + lean_dec(data_array); // The data is owned. + free(bufs); free(send_uv->data); free(send_uv); @@ -344,10 +364,12 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_wait_readable(b_obj_arg socket, extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, obj_arg /* w */) { lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket); + lean_inc(socket); event_loop_lock(&global_ev); if (udp_socket->m_promise_read == nullptr) { event_loop_unlock(&global_ev); + lean_dec(socket); return lean_io_result_mk_ok(lean_box(0)); } @@ -358,14 +380,14 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, ob udp_socket->m_promise_read = nullptr; lean_object* byte_array = udp_socket->m_byte_array; + if (byte_array != nullptr) { lean_dec(byte_array); udp_socket->m_byte_array = nullptr; } - lean_dec((lean_object*)udp_socket); - event_loop_unlock(&global_ev); + lean_dec(socket); return lean_io_result_mk_ok(lean_box(0)); } @@ -601,4 +623,4 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32 } #endif -} +} \ No newline at end of file diff --git a/src/runtime/uv/udp.h b/src/runtime/uv/udp.h index 7cff0fbcf463..91744272b6d9 100644 --- a/src/runtime/uv/udp.h +++ b/src/runtime/uv/udp.h @@ -41,7 +41,7 @@ static inline lean_uv_udp_socket_object* lean_to_uv_udp_socket(lean_object * o) extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */); -extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data_array, b_obj_arg opt_addr, obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_wait_readable(b_obj_arg socket, obj_arg /* w */); extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, obj_arg /* w */);