Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/Std/Internal/Async/TCP.lean
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,19 @@ Connects the client socket to the given address.
def connect (s : Client) (addr : SocketAddress) : Async Unit :=
Async.ofPromise <| s.native.connect addr

/--
Sends multiple data buffers through the client socket.
-/
@[inline]
def sendAll (s : Client) (data : Array ByteArray) : Async Unit :=
Async.ofPromise <| s.native.send data

/--
Sends data through the client socket.
-/
@[inline]
def send (s : Client) (data : ByteArray) : Async Unit :=
Async.ofPromise <| s.native.send data
Async.ofPromise <| s.native.send #[data]

/--
Receives data from the client socket. If data is received, it’s wrapped in .some. If EOF is reached,
Expand Down
10 changes: 9 additions & 1 deletion src/Std/Internal/Async/UDP.lean
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,21 @@ automatically sent to that destination.
def connect (s : Socket) (addr : SocketAddress) : IO Unit :=
s.native.connect addr

/--
Sends multiple data buffers through an UDP socket. The `addr` parameter specifies the destination
address. If `addr` is `none`, the data is sent to the default peer address set by `connect`.
-/
@[inline]
def sendAll (s : Socket) (data : Array ByteArray) (addr : Option SocketAddress := none) : Async Unit :=
Async.ofPromise <| s.native.send data addr

/--
Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr`
is `none`, the data is sent to the default peer address set by `connect`.
-/
@[inline]
def send (s : Socket) (data : ByteArray) (addr : Option SocketAddress := none) : Async Unit :=
Async.ofPromise <| s.native.send data addr
Async.ofPromise <| s.native.send #[data] addr

/--
Receives data from an UDP socket. `size` is for the maximum bytes to receive.
Expand Down
2 changes: 1 addition & 1 deletion src/Std/Internal/UV/TCP.lean
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ opaque connect (socket : @& Socket) (addr : @& SocketAddress) : IO (IO.Promise (
Sends data through a TCP socket.
-/
@[extern "lean_uv_tcp_send"]
opaque send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit))
opaque send (socket : @& Socket) (data : Array ByteArray) : IO (IO.Promise (Except IO.Error Unit))

/--
Receives data from a TCP socket with a maximum size of size bytes. The promise resolves when data is
Expand Down
2 changes: 1 addition & 1 deletion src/Std/Internal/UV/UDP.lean
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Sends data through an UDP socket. The `addr` parameter specifies the destination
is `none`, the data is sent to the default peer address set by `connect`.
-/
@[extern "lean_uv_udp_send"]
opaque send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit))
opaque send (socket : @& Socket) (data : Array ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit))

/--
Receives data from an UDP socket. `size` is for the maximum bytes to receive. The promise
Expand Down
43 changes: 32 additions & 11 deletions src/runtime/uv/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ typedef struct {
lean_object* promise;
lean_object* data;
lean_object* socket;
uv_buf_t* bufs;
} tcp_send_data;

// =======================================
Expand Down Expand Up @@ -164,14 +165,31 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, b_obj_
return lean_io_result_mk_ok(promise);
}

/* Std.Internal.UV.TCP.Socket.send (socket : @& Socket) (data : ByteArray) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data, obj_arg /* w */) {
/* Std.Internal.UV.TCP.Socket.send (socket : @& Socket) (data : Array ByteArray) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data_array, obj_arg /* w */) {
lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket);

size_t data_len = lean_sarray_size(data);
char* data_str = (char*)lean_sarray_cptr(data);
size_t array_len = lean_array_size(data_array);

if (array_len == 0) {
lean_dec(data_array);

lean_object* promise = lean_promise_new();
mark_mt(promise);
lean_promise_resolve_with_code(0, promise);

return lean_io_result_mk_ok(promise);
}

uv_buf_t buf = uv_buf_init(data_str, data_len);
// Allocate buffer array for uv_write
uv_buf_t* bufs = (uv_buf_t*)malloc(array_len * sizeof(uv_buf_t));

for (size_t i = 0; i < array_len; i++) {
lean_object* byte_array = lean_array_get_core(data_array, i);
size_t data_len = lean_sarray_size(byte_array);
char* data_str = (char*)lean_sarray_cptr(byte_array);
bufs[i] = uv_buf_init(data_str, data_len);
}

lean_object* promise = lean_promise_new();
mark_mt(promise);
Expand All @@ -181,16 +199,17 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d

tcp_send_data* send_data = (tcp_send_data*)write_uv->data;
send_data->promise = promise;
send_data->data = data;
send_data->data = data_array;
send_data->socket = socket;
send_data->bufs = bufs;

// These objects are going to enter the loop and be owned by it
lean_inc(promise);
lean_inc(socket);

event_loop_lock(&global_ev);

int result = uv_write(write_uv, (uv_stream_t*)tcp_socket->m_uv_tcp, &buf, 1, [](uv_write_t* req, int status) {
int result = uv_write(write_uv, (uv_stream_t*)tcp_socket->m_uv_tcp, bufs, array_len, [](uv_write_t* req, int status) {
tcp_send_data* tup = (tcp_send_data*) req->data;

lean_promise_resolve_with_code(status, tup->promise);
Expand All @@ -199,6 +218,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d
lean_dec(tup->data);
lean_dec(tup->socket);

free(req->bufs);
free(req->data);
free(req);
});
Expand All @@ -209,8 +229,9 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d
lean_dec(promise); // The structure does not own it.
lean_dec(promise); // We are not going to return it.
lean_dec(socket);
lean_dec(data);

lean_dec(data_array);
free(bufs);

free(write_uv->data);
free(write_uv);

Expand Down Expand Up @@ -385,7 +406,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_recv(b_obj_arg socket, ob
tcp_socket->m_byte_array = nullptr;
}

lean_dec((lean_object*)tcp_socket);
lean_dec(socket);

event_loop_unlock(&global_ev);
return lean_io_result_mk_ok(lean_box(0));
Expand Down Expand Up @@ -709,4 +730,4 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_keepalive(b_obj_arg socket, int3
}

#endif
}
}
2 changes: 1 addition & 1 deletion src/runtime/uv/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static inline lean_uv_tcp_socket_object* lean_to_uv_tcp_socket(lean_object* o) {

extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_new(obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg data_array, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_wait_readable(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_recv(b_obj_arg socket, obj_arg /* w */);
Expand Down
44 changes: 33 additions & 11 deletions src/runtime/uv/udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ typedef struct {
lean_object *promise;
lean_object *data;
lean_object *socket;
uv_buf_t* bufs;
} udp_send_data;

void lean_uv_udp_socket_finalizer(void* ptr) {
Expand Down Expand Up @@ -123,14 +124,30 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_
return lean_io_result_mk_ok(lean_box(0));
}

/* Std.Internal.UV.UDP.Socket.send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */) {
/* Std.Internal.UV.UDP.Socket.send (socket : @& Socket) (data : Array ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data_array, b_obj_arg opt_addr, obj_arg /* w */) {
lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket);

size_t data_len = lean_sarray_size(data);
char* data_str = (char*)lean_sarray_cptr(data);
size_t array_len = lean_array_size(data_array);

if (array_len == 0) {
lean_dec(data_array);

lean_object* promise = lean_promise_new();
mark_mt(promise);
lean_promise_resolve_with_code(0, promise);

return lean_io_result_mk_ok(promise);
}

uv_buf_t* bufs = (uv_buf_t*)malloc(array_len * sizeof(uv_buf_t));

uv_buf_t buf = uv_buf_init(data_str, data_len);
for (size_t i = 0; i < array_len; i++) {
lean_object* byte_array = lean_array_get_core(data_array, i);
size_t data_len = lean_sarray_size(byte_array);
char* data_str = (char*)lean_sarray_cptr(byte_array);
bufs[i] = uv_buf_init(data_str, data_len);
}

lean_object* promise = lean_promise_new();
mark_mt(promise);
Expand All @@ -140,8 +157,9 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d

udp_send_data* send_data = (udp_send_data*)send_uv->data;
send_data->promise = promise;
send_data->data = data;
send_data->data = data_array;
send_data->socket = socket;
send_data->bufs = bufs;

// These objects are going to enter the loop and be owned by it
lean_inc(promise);
Expand All @@ -157,14 +175,15 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d

event_loop_lock(&global_ev);

int result = uv_udp_send(send_uv, udp_socket->m_uv_udp, &buf, 1, (sockaddr*)addr_ptr, [](uv_udp_send_t* req, int status) {
int result = uv_udp_send(send_uv, udp_socket->m_uv_udp, bufs, array_len, (sockaddr*)addr_ptr, [](uv_udp_send_t* req, int status) {
udp_send_data* tup = (udp_send_data*) req->data;
lean_promise_resolve_with_code(status, tup->promise);

lean_dec(tup->promise);
lean_dec(tup->socket);
lean_dec(tup->data);

free(req->bufs);
free(req->data);
free(req);
});
Expand All @@ -179,7 +198,8 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg d
lean_dec(promise); // The structure does not own it.
lean_dec(promise); // We are not going to return it.
lean_dec(socket); // The loop does not own the object.
lean_dec(data); // The data is owned.
lean_dec(data_array); // The data is owned.
free(bufs);

free(send_uv->data);
free(send_uv);
Expand Down Expand Up @@ -344,10 +364,12 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_wait_readable(b_obj_arg socket,
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, obj_arg /* w */) {
lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket);

lean_inc(socket);
event_loop_lock(&global_ev);

if (udp_socket->m_promise_read == nullptr) {
event_loop_unlock(&global_ev);
lean_dec(socket);
return lean_io_result_mk_ok(lean_box(0));
}

Expand All @@ -358,14 +380,14 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, ob
udp_socket->m_promise_read = nullptr;

lean_object* byte_array = udp_socket->m_byte_array;

if (byte_array != nullptr) {
lean_dec(byte_array);
udp_socket->m_byte_array = nullptr;
}

lean_dec((lean_object*)udp_socket);

event_loop_unlock(&global_ev);
lean_dec(socket);

return lean_io_result_mk_ok(lean_box(0));
}
Expand Down Expand Up @@ -601,4 +623,4 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32
}

#endif
}
}
2 changes: 1 addition & 1 deletion src/runtime/uv/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static inline lean_uv_udp_socket_object* lean_to_uv_udp_socket(lean_object * o)
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data_array, b_obj_arg opt_addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_wait_readable(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_cancel_recv(b_obj_arg socket, obj_arg /* w */);
Expand Down
Loading