From 9f98f7a3d8880745b172efc2fbba8a080cf04fc1 Mon Sep 17 00:00:00 2001 From: Michael Neumann Date: Thu, 24 Jan 2013 07:25:57 -0600 Subject: [PATCH 1/5] Fix example code --- src/libcore/oldcomm.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libcore/oldcomm.rs b/src/libcore/oldcomm.rs index a5b0336ab60d9..6cc2f232bba7b 100644 --- a/src/libcore/oldcomm.rs +++ b/src/libcore/oldcomm.rs @@ -24,14 +24,14 @@ across channels. # Example ~~~ -let po = comm::Port(); -let ch = comm::Chan(po); +let po = oldcomm::Port(); +let ch = oldcomm::Chan(&po); do task::spawn { - comm::send(ch, "Hello, World"); + oldcomm::send(ch, ~"Hello, World"); } -io::println(comm::recv(p)); +io::println(oldcomm::recv(po)); ~~~ # Note From a8cc13ee633cba3e56d6dbc17e0a9b30315dd068 Mon Sep 17 00:00:00 2001 From: Michael Neumann Date: Thu, 24 Jan 2013 11:57:09 -0600 Subject: [PATCH 2/5] Convert log(debug, ...) to debug!(...) --- src/libstd/net_tcp.rs | 252 +++++++++++++++++++++--------------------- 1 file changed, 123 insertions(+), 129 deletions(-) diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index c888b457356b6..d4a9a33e57d2b 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -155,19 +155,19 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, iotask: iotask }; let socket_data_ptr = ptr::addr_of(&(*socket_data)); - log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch)); + debug!("tcp_connect result_ch %?", conn_data.result_ch); // get an unsafe representation of our stream_handle_ptr that // we can send into the interact cb to be handled in libuv.. - log(debug, fmt!("stream_handle_ptr outside interact %?", - stream_handle_ptr)); + debug!("stream_handle_ptr outside interact %?", + stream_handle_ptr); do iotask::interact(iotask) |move input_ip, loop_ptr| unsafe { - log(debug, ~"in interact cb for tcp client connect.."); - log(debug, fmt!("stream_handle_ptr in interact %?", - stream_handle_ptr)); + debug!("in interact cb for tcp client connect.."); + debug!("stream_handle_ptr in interact %?", + stream_handle_ptr); match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) { 0i32 => { - log(debug, ~"tcp_init successful"); - log(debug, ~"dealing w/ ipv4 connection.."); + debug!("tcp_init successful"); + debug!("dealing w/ ipv4 connection.."); let connect_req_ptr = ptr::addr_of(&((*socket_data_ptr).connect_req)); let addr_str = ip::format_addr(&input_ip); @@ -178,7 +178,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, // info.. should probably add an additional // rust type that actually is closer to // what the libuv API expects (ip str + port num) - log(debug, fmt!("addr: %?", addr)); + debug!("addr: %?", addr); let in_addr = uv::ll::ip4_addr(addr_str, port as int); uv::ll::tcp_connect( connect_req_ptr, @@ -187,7 +187,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, tcp_connect_on_connect_cb) } ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); + debug!("addr: %?", addr); let in_addr = uv::ll::ip6_addr(addr_str, port as int); uv::ll::tcp_connect6( connect_req_ptr, @@ -198,7 +198,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, }; match connect_result { 0i32 => { - log(debug, ~"tcp_connect successful"); + debug!("tcp_connect successful"); // reusable data that we'll have for the // duration.. uv::ll::set_data_for_uv_handle(stream_handle_ptr, @@ -208,7 +208,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, // outcome.. uv::ll::set_data_for_req(connect_req_ptr, conn_data_ptr); - log(debug, ~"leaving tcp_connect interact cb..."); + debug!("leaving tcp_connect interact cb..."); // let tcp_connect_on_connect_cb send on // the result_ch, now.. } @@ -234,12 +234,12 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, }; match oldcomm::recv(result_po) { ConnSuccess => { - log(debug, ~"tcp::connect - received success on result_po"); + debug!("tcp::connect - received success on result_po"); result::Ok(TcpSocket(socket_data)) } ConnFailure(ref err_data) => { oldcomm::recv(closed_signal_po); - log(debug, ~"tcp::connect - received failure on result_po"); + debug!("tcp::connect - received failure on result_po"); // still have to free the malloc'd stream handle.. rustrt::rust_uv_current_kernel_free(stream_handle_ptr as *libc::c_void); @@ -344,7 +344,7 @@ pub fn read_start(sock: &TcpSocket) pub fn read_stop(sock: &TcpSocket, read_port: oldcomm::Port>) -> result::Result<(), TcpErrData> unsafe { - log(debug, fmt!("taking the read_port out of commission %?", read_port)); + debug!("taking the read_port out of commission %?", read_port); let socket_data = ptr::addr_of(&(*sock.socket_data)); read_stop_common_impl(socket_data) } @@ -509,31 +509,31 @@ pub fn accept(new_conn: TcpNewConnection) // the rules here because this always has to be // called within the context of a listen() new_connect_cb // callback (or it will likely fail and drown your cat) - log(debug, ~"in interact cb for tcp::accept"); + debug!("in interact cb for tcp::accept"); let loop_ptr = uv::ll::get_loop_for_uv_handle( server_handle_ptr); match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) { 0i32 => { - log(debug, ~"uv_tcp_init successful for client stream"); + debug!("uv_tcp_init successful for client stream"); match uv::ll::accept( server_handle_ptr as *libc::c_void, client_stream_handle_ptr as *libc::c_void) { 0i32 => { - log(debug, ~"successfully accepted client connection"); + debug!("successfully accepted client connection"); uv::ll::set_data_for_uv_handle(client_stream_handle_ptr, client_socket_data_ptr as *libc::c_void); oldcomm::send(result_ch, None); } _ => { - log(debug, ~"failed to accept client conn"); + debug!("failed to accept client conn"); oldcomm::send(result_ch, Some( uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); } } } _ => { - log(debug, ~"failed to init client stream"); + debug!("failed to init client stream"); oldcomm::send(result_ch, Some( uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); } @@ -634,13 +634,13 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, let addr_str = ip::format_addr(&loc_ip); let bind_result = match loc_ip { ip::Ipv4(ref addr) => { - log(debug, fmt!("addr: %?", addr)); + debug!("addr: %?", addr); let in_addr = uv::ll::ip4_addr(addr_str, port as int); uv::ll::tcp_bind(server_stream_ptr, ptr::addr_of(&in_addr)) } ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); + debug!("addr: %?", addr); let in_addr = uv::ll::ip6_addr(addr_str, port as int); uv::ll::tcp_bind6(server_stream_ptr, ptr::addr_of(&in_addr)) @@ -653,21 +653,21 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, tcp_lfc_on_connection_cb) { 0i32 => oldcomm::send(setup_ch, None), _ => { - log(debug, ~"failure to uv_listen()"); + debug!("failure to uv_listen()"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(setup_ch, Some(err_data)); } } } _ => { - log(debug, ~"failure to uv_tcp_bind"); + debug!("failure to uv_tcp_bind"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(setup_ch, Some(err_data)); } } } _ => { - log(debug, ~"failure to uv_tcp_init"); + debug!("failure to uv_tcp_init"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(setup_ch, Some(err_data)); } @@ -678,24 +678,24 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, match setup_result { Some(ref err_data) => { do iotask::interact(iotask) |loop_ptr| unsafe { - log(debug, fmt!("tcp::listen post-kill recv hl interact %?", - loop_ptr)); + debug!("tcp::listen post-kill recv hl interact %?", + loop_ptr); (*server_data_ptr).active = false; uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); }; stream_closed_po.recv(); match err_data.err_name { ~"EACCES" => { - log(debug, ~"Got EACCES error"); + debug!("Got EACCES error"); result::Err(AccessDenied) } ~"EADDRINUSE" => { - log(debug, ~"Got EADDRINUSE error"); + debug!("Got EADDRINUSE error"); result::Err(AddressInUse) } _ => { - log(debug, fmt!("Got '%s' '%s' libuv error", - err_data.err_name, err_data.err_msg)); + debug!("Got '%s' '%s' libuv error", + err_data.err_name, err_data.err_msg); result::Err( GenericListenErr(err_data.err_name, err_data.err_msg)) } @@ -705,8 +705,8 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, on_establish_cb(kill_ch); let kill_result = oldcomm::recv(kill_po); do iotask::interact(iotask) |loop_ptr| unsafe { - log(debug, fmt!("tcp::listen post-kill recv hl interact %?", - loop_ptr)); + debug!("tcp::listen post-kill recv hl interact %?", + loop_ptr); (*server_data_ptr).active = false; uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); }; @@ -837,7 +837,7 @@ impl TcpSocketBuf: io::Reader { self.end_of_stream } fn seek(&self, dist: int, seek: io::SeekStyle) { - log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek)); + debug!("tcp_socket_buf seek stub %? %?", dist, seek); // noop } fn tell(&self) -> uint { @@ -854,12 +854,12 @@ impl TcpSocketBuf: io::Writer { vec::slice(data, 0, vec::len(data))); if w_result.is_err() { let err_data = w_result.get_err(); - log(debug, fmt!("ERROR sock_buf as io::writer.writer err: %? %?", - err_data.err_name, err_data.err_msg)); + debug!("ERROR sock_buf as io::writer.writer err: %? %?", + err_data.err_name, err_data.err_msg); } } fn seek(&self, dist: int, seek: io::SeekStyle) { - log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek)); + debug!("tcp_socket_buf seek stub %? %?", dist, seek); // noop } fn tell(&self) -> uint { @@ -884,24 +884,24 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe { let close_data_ptr = ptr::addr_of(&close_data); let stream_handle_ptr = (*socket_data).stream_handle_ptr; do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { - log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?", - stream_handle_ptr, loop_ptr)); + debug!("interact dtor for tcp_socket stream %? loop %?", + stream_handle_ptr, loop_ptr); uv::ll::set_data_for_uv_handle(stream_handle_ptr, close_data_ptr); uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb); }; oldcomm::recv(closed_po); //the line below will most likely crash - //log(debug, fmt!("about to free socket_data at %?", socket_data)); + //debug!("about to free socket_data at %?", socket_data); rustrt::rust_uv_current_kernel_free(stream_handle_ptr as *libc::c_void); - log(debug, ~"exiting dtor for tcp_socket"); + debug!("exiting dtor for tcp_socket"); } // shared implementation for tcp::read fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) -> result::Result<~[u8],TcpErrData> unsafe { - log(debug, ~"starting tcp::read"); + debug!("starting tcp::read"); let iotask = (*socket_data).iotask; let rs_result = read_start_common_impl(socket_data); if result::is_err(&rs_result) { @@ -909,17 +909,17 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) result::Err(err_data) } else { - log(debug, ~"tcp::read before recv_timeout"); + debug!("tcp::read before recv_timeout"); let read_result = if timeout_msecs > 0u { timer::recv_timeout( iotask, timeout_msecs, result::get(&rs_result)) } else { Some(oldcomm::recv(result::get(&rs_result))) }; - log(debug, ~"tcp::read after recv_timeout"); + debug!("tcp::read after recv_timeout"); match move read_result { None => { - log(debug, ~"tcp::read: timed out.."); + debug!("tcp::read: timed out.."); let err_data = { err_name: ~"TIMEOUT", err_msg: ~"req timed out" @@ -928,7 +928,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) result::Err(err_data) } Some(move data_result) => { - log(debug, ~"tcp::read got data"); + debug!("tcp::read got data"); read_stop_common_impl(socket_data); data_result } @@ -943,14 +943,14 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> let stop_po = oldcomm::Port::>(); let stop_ch = oldcomm::Chan(&stop_po); do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { - log(debug, ~"in interact cb for tcp::read_stop"); + debug!("in interact cb for tcp::read_stop"); match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { 0i32 => { - log(debug, ~"successfully called uv_read_stop"); + debug!("successfully called uv_read_stop"); oldcomm::send(stop_ch, None); } _ => { - log(debug, ~"failure in calling uv_read_stop"); + debug!("failure in calling uv_read_stop"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(stop_ch, Some(err_data.to_tcp_err())); } @@ -969,18 +969,18 @@ fn read_start_common_impl(socket_data: *TcpSocketData) let stream_handle_ptr = (*socket_data).stream_handle_ptr; let start_po = oldcomm::Port::>(); let start_ch = oldcomm::Chan(&start_po); - log(debug, ~"in tcp::read_start before interact loop"); + debug!("in tcp::read_start before interact loop"); do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { - log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr)); + debug!("in tcp::read_start interact cb %?", loop_ptr); match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, on_alloc_cb, on_tcp_read_cb) { 0i32 => { - log(debug, ~"success doing uv_read_start"); + debug!("success doing uv_read_start"); oldcomm::send(start_ch, None); } _ => { - log(debug, ~"error attempting uv_read_start"); + debug!("error attempting uv_read_start"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(start_ch, Some(err_data)); } @@ -1011,17 +1011,17 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, }; let write_data_ptr = ptr::addr_of(&write_data); do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe { - log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr)); + debug!("in interact cb for tcp::write %?", loop_ptr); match uv::ll::write(write_req_ptr, stream_handle_ptr, write_buf_vec_ptr, tcp_write_complete_cb) { 0i32 => { - log(debug, ~"uv_write() invoked successfully"); + debug!("uv_write() invoked successfully"); uv::ll::set_data_for_req(write_req_ptr, write_data_ptr); } _ => { - log(debug, ~"error invoking uv_write()"); + debug!("error invoking uv_write()"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send((*write_data_ptr).result_ch, TcpWriteError(err_data.to_tcp_err())); @@ -1116,8 +1116,8 @@ impl uv::ll::uv_err_data: ToTcpErr { extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, nread: libc::ssize_t, ++buf: uv::ll::uv_buf_t) unsafe { - log(debug, fmt!("entering on_tcp_read_cb stream: %? nread: %?", - stream, nread)); + debug!("entering on_tcp_read_cb stream: %? nread: %?", + stream, nread); let loop_ptr = uv::ll::get_loop_for_uv_handle(stream); let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream) as *TcpSocketData; @@ -1125,8 +1125,8 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, // incoming err.. probably eof -1 => { let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err(); - log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?", - err_data.err_name, err_data.err_msg)); + debug!("on_tcp_read_cb: incoming err.. name %? msg %?", + err_data.err_name, err_data.err_msg); let reader_ch = (*socket_data_ptr).reader_ch; oldcomm::send(reader_ch, result::Err(err_data)); } @@ -1135,7 +1135,7 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, // have data _ => { // we have data - log(debug, fmt!("tcp on_read_cb nread: %d", nread as int)); + debug!("tcp on_read_cb nread: %d", nread as int); let reader_ch = (*socket_data_ptr).reader_ch; let buf_base = uv::ll::get_base_from_buf(buf); let new_bytes = vec::from_buf(buf_base, nread as uint); @@ -1143,18 +1143,18 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, } } uv::ll::free_base_of_buf(buf); - log(debug, ~"exiting on_tcp_read_cb"); + debug!("exiting on_tcp_read_cb"); } extern fn on_alloc_cb(handle: *libc::c_void, suggested_size: size_t) -> uv::ll::uv_buf_t unsafe { - log(debug, ~"tcp read on_alloc_cb!"); + debug!("tcp read on_alloc_cb!"); let char_ptr = uv::ll::malloc_buf_base_of(suggested_size); - log(debug, fmt!("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u", + debug!("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u", handle, char_ptr as uint, - suggested_size as uint)); + suggested_size as uint); uv::ll::buf_init(char_ptr, suggested_size as uint) } @@ -1167,7 +1167,7 @@ extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { as *TcpSocketCloseData; let closed_ch = (*data).closed_ch; oldcomm::send(closed_ch, ()); - log(debug, ~"tcp_socket_dtor_close_cb exiting.."); + debug!("tcp_socket_dtor_close_cb exiting.."); } extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t, @@ -1175,14 +1175,14 @@ extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t, let write_data_ptr = uv::ll::get_data_for_req(write_req) as *WriteReqData; if status == 0i32 { - log(debug, ~"successful write complete"); + debug!("successful write complete"); oldcomm::send((*write_data_ptr).result_ch, TcpWriteSuccess); } else { let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req( write_req); let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr); - log(debug, ~"failure to write"); + debug!("failure to write"); oldcomm::send((*write_data_ptr).result_ch, TcpWriteError(err_data)); } @@ -1201,11 +1201,11 @@ extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { let data = uv::ll::get_data_for_uv_handle(handle) as *ConnectReqData; oldcomm::send((*data).closed_signal_ch, ()); - log(debug, fmt!("exiting steam_error_close_cb for %?", handle)); + debug!("exiting steam_error_close_cb for %?", handle); } extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { - log(debug, fmt!("closed client tcp handle %?", handle)); + debug!("closed client tcp handle %?", handle); } extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t, @@ -1213,27 +1213,27 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t, let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr) as *ConnectReqData); let result_ch = (*conn_data_ptr).result_ch; - log(debug, fmt!("tcp_connect result_ch %?", result_ch)); + debug!("tcp_connect result_ch %?", result_ch); let tcp_stream_ptr = uv::ll::get_stream_handle_from_connect_req(connect_req_ptr); match status { 0i32 => { - log(debug, ~"successful tcp connection!"); + debug!("successful tcp connection!"); oldcomm::send(result_ch, ConnSuccess); } _ => { - log(debug, ~"error in tcp_connect_on_connect_cb"); + debug!("error in tcp_connect_on_connect_cb"); let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr); - log(debug, fmt!("err_data %? %?", err_data.err_name, - err_data.err_msg)); + debug!("err_data %? %?", err_data.err_name, + err_data.err_msg); oldcomm::send(result_ch, ConnFailure(err_data)); uv::ll::set_data_for_uv_handle(tcp_stream_ptr, conn_data_ptr); uv::ll::close(tcp_stream_ptr, stream_error_close_cb); } } - log(debug, ~"leaving tcp_connect_on_connect_cb"); + debug!("leaving tcp_connect_on_connect_cb"); } enum ConnAttempt { @@ -1356,7 +1356,7 @@ mod test { }; oldcomm::recv(cont_po); // client - log(debug, ~"server started, firing up client.."); + debug!("server started, firing up client.."); let actual_resp_result = do oldcomm::listen |client_ch| { run_tcp_test_client( server_ip, @@ -1368,10 +1368,10 @@ mod test { assert actual_resp_result.is_ok(); let actual_resp = actual_resp_result.get(); let actual_req = oldcomm::recv(server_result_po); - log(debug, fmt!("REQ: expected: '%s' actual: '%s'", - expected_req, actual_req)); - log(debug, fmt!("RESP: expected: '%s' actual: '%s'", - expected_resp, actual_resp)); + debug!("REQ: expected: '%s' actual: '%s'", + expected_req, actual_req); + debug!("RESP: expected: '%s' actual: '%s'", + expected_resp, actual_resp); assert str::contains(actual_req, expected_req); assert str::contains(actual_resp, expected_resp); } @@ -1401,7 +1401,7 @@ mod test { }; oldcomm::recv(cont_po); // client - log(debug, ~"server started, firing up client.."); + debug!("server started, firing up client.."); do oldcomm::listen |client_ch| { let server_ip_addr = ip::v4::parse_addr(server_ip); let iotask = uv::global_loop::get(); @@ -1428,7 +1428,7 @@ mod test { let server_port = 8889u; let expected_req = ~"ping"; // client - log(debug, ~"firing up client.."); + debug!("firing up client.."); let actual_resp_result = do oldcomm::listen |client_ch| { run_tcp_test_client( server_ip, @@ -1474,7 +1474,7 @@ mod test { server_port, hl_loop); // client.. just doing this so that the first server tears down - log(debug, ~"server started, firing up client.."); + debug!("server started, firing up client.."); do oldcomm::listen |client_ch| { run_tcp_test_client( server_ip, @@ -1556,10 +1556,10 @@ mod test { }; let actual_req = core::comm::recv(server_result_po); - log(debug, fmt!("REQ: expected: '%s' actual: '%s'", - expected_req, actual_req)); - log(debug, fmt!("RESP: expected: '%s' actual: '%s'", - expected_resp, actual_resp)); + debug!("REQ: expected: '%s' actual: '%s'", + expected_req, actual_req); + debug!("RESP: expected: '%s' actual: '%s'", + expected_resp, actual_resp); assert str::contains(actual_req, expected_req); assert str::contains(actual_resp, expected_resp); */ @@ -1593,7 +1593,7 @@ mod test { }; oldcomm::recv(cont_po); // client - log(debug, ~"server started, firing up client.."); + debug!("server started, firing up client.."); let server_addr = ip::v4::parse_addr(server_ip); let conn_result = connect(move server_addr, server_port, hl_loop); if result::is_err(&conn_result) { @@ -1604,23 +1604,23 @@ mod test { let buf_reader = sock_buf as Reader; let actual_response = str::from_bytes(buf_reader.read_whole_stream()); - log(debug, fmt!("Actual response: %s", actual_response)); + debug!("Actual response: %s", actual_response); assert expected_resp == actual_response; } fn buf_write(w: &W, val: &str) { - log(debug, fmt!("BUF_WRITE: val len %?", str::len(val))); + debug!("BUF_WRITE: val len %?", str::len(val)); do str::byte_slice(val) |b_slice| { - log(debug, fmt!("BUF_WRITE: b_slice len %?", - vec::len(b_slice))); + debug!("BUF_WRITE: b_slice len %?", + vec::len(b_slice)); w.write(b_slice) } } fn buf_read(r: &R, len: uint) -> ~str { let new_bytes = (*r).read_bytes(len); - log(debug, fmt!("in buf_read.. new_bytes len: %?", - vec::len(new_bytes))); + debug!("in buf_read.. new_bytes len: %?", + vec::len(new_bytes)); str::from_bytes(new_bytes) } @@ -1633,65 +1633,61 @@ mod test { iotask, // on_establish_cb -- called when listener is set up |kill_ch| { - log(debug, fmt!("establish_cb %?", - kill_ch)); + debug!("establish_cb %?", kill_ch); oldcomm::send(cont_ch, ()); }, // risky to run this on the loop, but some users // will want the POWER |new_conn, kill_ch| { - log(debug, ~"SERVER: new connection!"); + debug!("SERVER: new connection!"); do oldcomm::listen |cont_ch| { do task::spawn_sched(task::ManualThreads(1u)) { - log(debug, ~"SERVER: starting worker for new req"); + debug!("SERVER: starting worker for new req"); let accept_result = accept(new_conn); - log(debug, ~"SERVER: after accept()"); + debug!("SERVER: after accept()"); if result::is_err(&accept_result) { - log(debug, ~"SERVER: error accept connection"); + debug!("SERVER: error accept connection"); let err_data = result::get_err(&accept_result); oldcomm::send(kill_ch, Some(err_data)); - log(debug, - ~"SERVER/WORKER: send on err cont ch"); + debug!("SERVER/WORKER: send on err cont ch"); cont_ch.send(()); } else { - log(debug, - ~"SERVER/WORKER: send on cont ch"); + debug!("SERVER/WORKER: send on cont ch"); cont_ch.send(()); let sock = result::unwrap(move accept_result); let peer_addr = sock.get_peer_addr(); - log(debug, ~"SERVER: successfully accepted"+ - fmt!(" connection from %s:%u", + debug!("SERVER: successfully accepted connection from %s:%u", ip::format_addr(&peer_addr), - ip::get_port(&peer_addr))); + ip::get_port(&peer_addr)); let received_req_bytes = read(&sock, 0u); match move received_req_bytes { result::Ok(move data) => { - log(debug, ~"SERVER: got REQ str::from_bytes.."); - log(debug, fmt!("SERVER: REQ data len: %?", - vec::len(data))); + debug!("SERVER: got REQ str::from_bytes.."); + debug!("SERVER: REQ data len: %?", + vec::len(data)); server_ch.send( str::from_bytes(data)); - log(debug, ~"SERVER: before write"); + debug!("SERVER: before write"); tcp_write_single(&sock, str::to_bytes(resp)); - log(debug, ~"SERVER: after write.. die"); + debug!("SERVER: after write.. die"); oldcomm::send(kill_ch, None); } result::Err(move err_data) => { - log(debug, fmt!("SERVER: error recvd: %s %s", - err_data.err_name, err_data.err_msg)); + debug!("SERVER: error recvd: %s %s", + err_data.err_name, err_data.err_msg); oldcomm::send(kill_ch, Some(err_data)); server_ch.send(~""); } } - log(debug, ~"SERVER: worker spinning down"); + debug!("SERVER: worker spinning down"); } } - log(debug, ~"SERVER: waiting to recv on cont_ch"); + debug!("SERVER: waiting to recv on cont_ch"); cont_ch.recv() }; - log(debug, ~"SERVER: recv'd on cont_ch..leaving listen cb"); + debug!("SERVER: recv'd on cont_ch..leaving listen cb"); }); // err check on listen_result if result::is_err(&listen_result) { @@ -1709,7 +1705,7 @@ mod test { } } let ret_val = server_ch.recv(); - log(debug, fmt!("SERVER: exited and got return val: '%s'", ret_val)); + debug!("SERVER: exited and got return val: '%s'", ret_val); ret_val } @@ -1720,8 +1716,7 @@ mod test { iotask, // on_establish_cb -- called when listener is set up |kill_ch| { - log(debug, fmt!("establish_cb %?", - kill_ch)); + debug!("establish_cb %?", kill_ch); }, |new_conn, kill_ch| { fail fmt!("SERVER: shouldn't be called.. %? %?", @@ -1742,11 +1737,11 @@ mod test { TcpConnectErrData> { let server_ip_addr = ip::v4::parse_addr(server_ip); - log(debug, ~"CLIENT: starting.."); + debug!("CLIENT: starting.."); let connect_result = connect(move server_ip_addr, server_port, iotask); if result::is_err(&connect_result) { - log(debug, ~"CLIENT: failed to connect"); + debug!("CLIENT: failed to connect"); let err_data = result::get_err(&connect_result); Err(err_data) } @@ -1756,14 +1751,13 @@ mod test { tcp_write_single(&sock, resp_bytes); let read_result = sock.read(0u); if read_result.is_err() { - log(debug, ~"CLIENT: failure to read"); + debug!("CLIENT: failure to read"); Ok(~"") } else { client_ch.send(str::from_bytes(read_result.get())); let ret_val = client_ch.recv(); - log(debug, fmt!("CLIENT: after client_ch recv ret: '%s'", - ret_val)); + debug!("CLIENT: after client_ch recv ret: '%s'", ret_val); Ok(ret_val) } } @@ -1773,10 +1767,10 @@ mod test { let write_result_future = sock.write_future(val); let write_result = write_result_future.get(); if result::is_err(&write_result) { - log(debug, ~"tcp_write_single: write failed!"); + debug!("tcp_write_single: write failed!"); let err_data = result::get_err(&write_result); - log(debug, fmt!("tcp_write_single err name: %s msg: %s", - err_data.err_name, err_data.err_msg)); + debug!("tcp_write_single err name: %s msg: %s", + err_data.err_name, err_data.err_msg); // meh. torn on what to do here. fail ~"tcp_write_single failed"; } From 7b4459664b94512d39e196c676a49928fa545cc9 Mon Sep 17 00:00:00 2001 From: Michael Neumann Date: Fri, 25 Jan 2013 05:17:25 -0600 Subject: [PATCH 3/5] Greatly improve performance for TcpSocketBuf.read For every call to the read() function the internal buffer was copied into a new buffer (minus the bytes copied into the result buffer). When the internal buffer is large enough, this severely affects performance, especially when read_line() is used which calls read_byte() (which calls read()) for each read byte. For line oriented I/O this wasn't all that bad, because the internal buffers usually never were very big. The effect is much more visible once the buffer grows larger. Now we always first look into the internal buffer and copy as many bytes as possible (and desired) into the result buffer. If we need more, we call the socket read function and use the result as the new internal buffer, then continue to copy from the (new) internal buffer, and so on. --- src/libstd/net_tcp.rs | 116 ++++++++++++++++++++++++++++-------------- 1 file changed, 77 insertions(+), 39 deletions(-) diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index d4a9a33e57d2b..02277619a5782 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -60,7 +60,7 @@ pub fn TcpSocket(socket_data: @TcpSocketData) -> TcpSocket { */ pub struct TcpSocketBuf { data: @TcpBufferedSocketData, - mut end_of_stream: bool, + mut end_of_stream: bool } pub fn TcpSocketBuf(data: @TcpBufferedSocketData) -> TcpSocketBuf { @@ -739,7 +739,7 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, * A buffered wrapper that you can cast as an `io::reader` or `io::writer` */ pub fn socket_buf(sock: TcpSocket) -> TcpSocketBuf { - TcpSocketBuf(@{ sock: move sock, mut buf: ~[] }) + TcpSocketBuf(@TcpBufferedSocketData { sock: move sock, mut buf: ~[], buf_off: 0 }) } /// Convenience methods extending `net::tcp::tcp_socket` @@ -789,48 +789,85 @@ impl TcpSocket { /// Implementation of `io::reader` trait for a buffered `net::tcp::tcp_socket` impl TcpSocketBuf: io::Reader { fn read(&self, buf: &[mut u8], len: uint) -> uint { - // Loop until our buffer has enough data in it for us to read from. - while self.data.buf.len() < len { - let read_result = read(&self.data.sock, 0u); - if read_result.is_err() { - let err_data = read_result.get_err(); + if len == 0 { return 0 } + let mut count: uint = 0; + + loop { + assert count < len; + + // If possible, copy up to `len` bytes from the internal + // `data.buf` into `buf` + let nbuffered = self.data.buf.len() - self.data.buf_off; + let needed = len - count; + if nbuffered > 0 unsafe { + let ncopy = uint::min(nbuffered, needed); + let dst = ptr::mut_offset(vec::raw::to_mut_ptr(buf), count); + let src = ptr::const_offset(vec::raw::to_const_ptr(self.data.buf), + self.data_buf_off); + ptr::copy_memory(dst, src, ncopy); + self.data.buf_off += ncopy; + count += ncopy; + } - if err_data.err_name == ~"EOF" { - self.end_of_stream = true; - break; - } else { - debug!("ERROR sock_buf as io::reader.read err %? %?", - err_data.err_name, err_data.err_msg); + assert count <= len; + if count == len { + break; + } - return 0; - } - } - else { - self.data.buf.push_all(result::unwrap(read_result)); - } + // We copied all the bytes we had in the internal buffer into + // the result buffer, but the caller wants more bytes, so we + // need to read in data from the socket. Note that the internal + // buffer is of no use anymore as we read all bytes from it, + // so we can throw it away. + let read_result = read(&self.data.sock, 0u); + if read_result.is_err() { + let err_data = read_result.get_err(); + + if err_data.err_name == ~"EOF" { + self.end_of_stream = true; + break; + } else { + debug!("ERROR sock_buf as io::reader.read err %? %?", + err_data.err_name, err_data.err_msg); + // As we have already copied data into result buffer, + // we cannot simply return 0 here. Instead the error + // should show up in a later call to read(). + break; + } + } + else { + self.data.buf = result::unwrap(read_result); + self.data.buf_off = 0; + } } - let count = uint::min(len, self.data.buf.len()); - - let mut data = ~[]; - self.data.buf <-> data; - - vec::bytes::memcpy(buf, vec::view(data, 0, data.len()), count); - - self.data.buf.push_all(vec::view(data, count, data.len())); - count } fn read_byte(&self) -> int { - let mut bytes = ~[0]; - if self.read(bytes, 1u) == 0 { - if self.end_of_stream { - -1 - } else { - fail - } - } else { - bytes[0] as int + loop { + if self.data.buf.len() > self.data.buf_off { + let c = self.data.buf[self.data.buf_off]; + self.data.buf_off += 1; + return c as int + } + + let read_result = read(&self.data.sock, 0u); + if read_result.is_err() { + let err_data = read_result.get_err(); + + if err_data.err_name == ~"EOF" { + self.end_of_stream = true; + return -1 + } else { + debug!("ERROR sock_buf as io::reader.read err %? %?", + err_data.err_name, err_data.err_msg); + fail + } + } + else { + self.data.buf = result::unwrap(read_result); + self.data.buf_off = 0; + } } } fn eof(&self) -> bool { @@ -1251,9 +1288,10 @@ type TcpSocketData = { iotask: IoTask }; -type TcpBufferedSocketData = { +struct TcpBufferedSocketData { sock: TcpSocket, - mut buf: ~[u8] + mut buf: ~[u8], + mut buf_off: uint }; //#[cfg(test)] From 43fcb2a35a68fc9bb506dbd16963b0a25377f70b Mon Sep 17 00:00:00 2001 From: Michael Neumann Date: Fri, 25 Jan 2013 05:26:51 -0600 Subject: [PATCH 4/5] Slightly optimize read_line() No need to allocate an additional vector. Instead directly push into the string. --- src/libcore/io.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/libcore/io.rs b/src/libcore/io.rs index e76eb9f2f99d8..1811f5b92d096 100644 --- a/src/libcore/io.rs +++ b/src/libcore/io.rs @@ -175,13 +175,13 @@ impl T : ReaderUtil { } fn read_line(&self) -> ~str { - let mut bytes = ~[]; + let mut line = ~""; loop { let ch = self.read_byte(); if ch == -1 || ch == 10 { break; } - bytes.push(ch as u8); + str::push_char(&mut line, ch as char); } - str::from_bytes(bytes) + line } fn read_chars(&self, n: uint) -> ~[char] { From 26d62869e9e86513b0550cf2da3a404bf55115a1 Mon Sep 17 00:00:00 2001 From: Michael Neumann Date: Sat, 26 Jan 2013 04:45:10 -0600 Subject: [PATCH 5/5] Make compilable --- src/libstd/net_tcp.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index 02277619a5782..7a23474909939 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -19,6 +19,7 @@ use future_spawn = future::spawn; use result::{Result}; use libc::size_t; use io::{Reader, ReaderUtil, Writer}; +use core::ptr::*; #[nolink] extern mod rustrt { @@ -803,8 +804,8 @@ impl TcpSocketBuf: io::Reader { let ncopy = uint::min(nbuffered, needed); let dst = ptr::mut_offset(vec::raw::to_mut_ptr(buf), count); let src = ptr::const_offset(vec::raw::to_const_ptr(self.data.buf), - self.data_buf_off); - ptr::copy_memory(dst, src, ncopy); + self.data.buf_off); + ptr::memcpy(dst, src, ncopy); self.data.buf_off += ncopy; count += ncopy; } @@ -1292,7 +1293,7 @@ struct TcpBufferedSocketData { sock: TcpSocket, mut buf: ~[u8], mut buf_off: uint -}; +} //#[cfg(test)] mod test {