diff --git a/src/libcore/io.rs b/src/libcore/io.rs index e76eb9f2f99d8..1811f5b92d096 100644 --- a/src/libcore/io.rs +++ b/src/libcore/io.rs @@ -175,13 +175,13 @@ impl T : ReaderUtil { } fn read_line(&self) -> ~str { - let mut bytes = ~[]; + let mut line = ~""; loop { let ch = self.read_byte(); if ch == -1 || ch == 10 { break; } - bytes.push(ch as u8); + str::push_char(&mut line, ch as char); } - str::from_bytes(bytes) + line } fn read_chars(&self, n: uint) -> ~[char] { diff --git a/src/libcore/oldcomm.rs b/src/libcore/oldcomm.rs index a5b0336ab60d9..6cc2f232bba7b 100644 --- a/src/libcore/oldcomm.rs +++ b/src/libcore/oldcomm.rs @@ -24,14 +24,14 @@ across channels. # Example ~~~ -let po = comm::Port(); -let ch = comm::Chan(po); +let po = oldcomm::Port(); +let ch = oldcomm::Chan(&po); do task::spawn { - comm::send(ch, "Hello, World"); + oldcomm::send(ch, ~"Hello, World"); } -io::println(comm::recv(p)); +io::println(oldcomm::recv(po)); ~~~ # Note diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index c888b457356b6..7a23474909939 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -19,6 +19,7 @@ use future_spawn = future::spawn; use result::{Result}; use libc::size_t; use io::{Reader, ReaderUtil, Writer}; +use core::ptr::*; #[nolink] extern mod rustrt { @@ -60,7 +61,7 @@ pub fn TcpSocket(socket_data: @TcpSocketData) -> TcpSocket { */ pub struct TcpSocketBuf { data: @TcpBufferedSocketData, - mut end_of_stream: bool, + mut end_of_stream: bool } pub fn TcpSocketBuf(data: @TcpBufferedSocketData) -> TcpSocketBuf { @@ -155,19 +156,19 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, iotask: iotask }; let socket_data_ptr = ptr::addr_of(&(*socket_data)); - log(debug, fmt!("tcp_connect result_ch %?", conn_data.result_ch)); + debug!("tcp_connect result_ch %?", conn_data.result_ch); // get an unsafe representation of our stream_handle_ptr that // we can send into the interact cb to be handled in libuv.. - log(debug, fmt!("stream_handle_ptr outside interact %?", - stream_handle_ptr)); + debug!("stream_handle_ptr outside interact %?", + stream_handle_ptr); do iotask::interact(iotask) |move input_ip, loop_ptr| unsafe { - log(debug, ~"in interact cb for tcp client connect.."); - log(debug, fmt!("stream_handle_ptr in interact %?", - stream_handle_ptr)); + debug!("in interact cb for tcp client connect.."); + debug!("stream_handle_ptr in interact %?", + stream_handle_ptr); match uv::ll::tcp_init( loop_ptr, stream_handle_ptr) { 0i32 => { - log(debug, ~"tcp_init successful"); - log(debug, ~"dealing w/ ipv4 connection.."); + debug!("tcp_init successful"); + debug!("dealing w/ ipv4 connection.."); let connect_req_ptr = ptr::addr_of(&((*socket_data_ptr).connect_req)); let addr_str = ip::format_addr(&input_ip); @@ -178,7 +179,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, // info.. should probably add an additional // rust type that actually is closer to // what the libuv API expects (ip str + port num) - log(debug, fmt!("addr: %?", addr)); + debug!("addr: %?", addr); let in_addr = uv::ll::ip4_addr(addr_str, port as int); uv::ll::tcp_connect( connect_req_ptr, @@ -187,7 +188,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, tcp_connect_on_connect_cb) } ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); + debug!("addr: %?", addr); let in_addr = uv::ll::ip6_addr(addr_str, port as int); uv::ll::tcp_connect6( connect_req_ptr, @@ -198,7 +199,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, }; match connect_result { 0i32 => { - log(debug, ~"tcp_connect successful"); + debug!("tcp_connect successful"); // reusable data that we'll have for the // duration.. uv::ll::set_data_for_uv_handle(stream_handle_ptr, @@ -208,7 +209,7 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, // outcome.. uv::ll::set_data_for_req(connect_req_ptr, conn_data_ptr); - log(debug, ~"leaving tcp_connect interact cb..."); + debug!("leaving tcp_connect interact cb..."); // let tcp_connect_on_connect_cb send on // the result_ch, now.. } @@ -234,12 +235,12 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, }; match oldcomm::recv(result_po) { ConnSuccess => { - log(debug, ~"tcp::connect - received success on result_po"); + debug!("tcp::connect - received success on result_po"); result::Ok(TcpSocket(socket_data)) } ConnFailure(ref err_data) => { oldcomm::recv(closed_signal_po); - log(debug, ~"tcp::connect - received failure on result_po"); + debug!("tcp::connect - received failure on result_po"); // still have to free the malloc'd stream handle.. rustrt::rust_uv_current_kernel_free(stream_handle_ptr as *libc::c_void); @@ -344,7 +345,7 @@ pub fn read_start(sock: &TcpSocket) pub fn read_stop(sock: &TcpSocket, read_port: oldcomm::Port>) -> result::Result<(), TcpErrData> unsafe { - log(debug, fmt!("taking the read_port out of commission %?", read_port)); + debug!("taking the read_port out of commission %?", read_port); let socket_data = ptr::addr_of(&(*sock.socket_data)); read_stop_common_impl(socket_data) } @@ -509,31 +510,31 @@ pub fn accept(new_conn: TcpNewConnection) // the rules here because this always has to be // called within the context of a listen() new_connect_cb // callback (or it will likely fail and drown your cat) - log(debug, ~"in interact cb for tcp::accept"); + debug!("in interact cb for tcp::accept"); let loop_ptr = uv::ll::get_loop_for_uv_handle( server_handle_ptr); match uv::ll::tcp_init(loop_ptr, client_stream_handle_ptr) { 0i32 => { - log(debug, ~"uv_tcp_init successful for client stream"); + debug!("uv_tcp_init successful for client stream"); match uv::ll::accept( server_handle_ptr as *libc::c_void, client_stream_handle_ptr as *libc::c_void) { 0i32 => { - log(debug, ~"successfully accepted client connection"); + debug!("successfully accepted client connection"); uv::ll::set_data_for_uv_handle(client_stream_handle_ptr, client_socket_data_ptr as *libc::c_void); oldcomm::send(result_ch, None); } _ => { - log(debug, ~"failed to accept client conn"); + debug!("failed to accept client conn"); oldcomm::send(result_ch, Some( uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); } } } _ => { - log(debug, ~"failed to init client stream"); + debug!("failed to init client stream"); oldcomm::send(result_ch, Some( uv::ll::get_last_err_data(loop_ptr).to_tcp_err())); } @@ -634,13 +635,13 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, let addr_str = ip::format_addr(&loc_ip); let bind_result = match loc_ip { ip::Ipv4(ref addr) => { - log(debug, fmt!("addr: %?", addr)); + debug!("addr: %?", addr); let in_addr = uv::ll::ip4_addr(addr_str, port as int); uv::ll::tcp_bind(server_stream_ptr, ptr::addr_of(&in_addr)) } ip::Ipv6(ref addr) => { - log(debug, fmt!("addr: %?", addr)); + debug!("addr: %?", addr); let in_addr = uv::ll::ip6_addr(addr_str, port as int); uv::ll::tcp_bind6(server_stream_ptr, ptr::addr_of(&in_addr)) @@ -653,21 +654,21 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, tcp_lfc_on_connection_cb) { 0i32 => oldcomm::send(setup_ch, None), _ => { - log(debug, ~"failure to uv_listen()"); + debug!("failure to uv_listen()"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(setup_ch, Some(err_data)); } } } _ => { - log(debug, ~"failure to uv_tcp_bind"); + debug!("failure to uv_tcp_bind"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(setup_ch, Some(err_data)); } } } _ => { - log(debug, ~"failure to uv_tcp_init"); + debug!("failure to uv_tcp_init"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(setup_ch, Some(err_data)); } @@ -678,24 +679,24 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, match setup_result { Some(ref err_data) => { do iotask::interact(iotask) |loop_ptr| unsafe { - log(debug, fmt!("tcp::listen post-kill recv hl interact %?", - loop_ptr)); + debug!("tcp::listen post-kill recv hl interact %?", + loop_ptr); (*server_data_ptr).active = false; uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); }; stream_closed_po.recv(); match err_data.err_name { ~"EACCES" => { - log(debug, ~"Got EACCES error"); + debug!("Got EACCES error"); result::Err(AccessDenied) } ~"EADDRINUSE" => { - log(debug, ~"Got EADDRINUSE error"); + debug!("Got EADDRINUSE error"); result::Err(AddressInUse) } _ => { - log(debug, fmt!("Got '%s' '%s' libuv error", - err_data.err_name, err_data.err_msg)); + debug!("Got '%s' '%s' libuv error", + err_data.err_name, err_data.err_msg); result::Err( GenericListenErr(err_data.err_name, err_data.err_msg)) } @@ -705,8 +706,8 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, on_establish_cb(kill_ch); let kill_result = oldcomm::recv(kill_po); do iotask::interact(iotask) |loop_ptr| unsafe { - log(debug, fmt!("tcp::listen post-kill recv hl interact %?", - loop_ptr)); + debug!("tcp::listen post-kill recv hl interact %?", + loop_ptr); (*server_data_ptr).active = false; uv::ll::close(server_stream_ptr, tcp_lfc_close_cb); }; @@ -739,7 +740,7 @@ fn listen_common(host_ip: ip::IpAddr, port: uint, backlog: uint, * A buffered wrapper that you can cast as an `io::reader` or `io::writer` */ pub fn socket_buf(sock: TcpSocket) -> TcpSocketBuf { - TcpSocketBuf(@{ sock: move sock, mut buf: ~[] }) + TcpSocketBuf(@TcpBufferedSocketData { sock: move sock, mut buf: ~[], buf_off: 0 }) } /// Convenience methods extending `net::tcp::tcp_socket` @@ -789,55 +790,92 @@ impl TcpSocket { /// Implementation of `io::reader` trait for a buffered `net::tcp::tcp_socket` impl TcpSocketBuf: io::Reader { fn read(&self, buf: &[mut u8], len: uint) -> uint { - // Loop until our buffer has enough data in it for us to read from. - while self.data.buf.len() < len { - let read_result = read(&self.data.sock, 0u); - if read_result.is_err() { - let err_data = read_result.get_err(); + if len == 0 { return 0 } + let mut count: uint = 0; + + loop { + assert count < len; + + // If possible, copy up to `len` bytes from the internal + // `data.buf` into `buf` + let nbuffered = self.data.buf.len() - self.data.buf_off; + let needed = len - count; + if nbuffered > 0 unsafe { + let ncopy = uint::min(nbuffered, needed); + let dst = ptr::mut_offset(vec::raw::to_mut_ptr(buf), count); + let src = ptr::const_offset(vec::raw::to_const_ptr(self.data.buf), + self.data.buf_off); + ptr::memcpy(dst, src, ncopy); + self.data.buf_off += ncopy; + count += ncopy; + } - if err_data.err_name == ~"EOF" { - self.end_of_stream = true; - break; - } else { - debug!("ERROR sock_buf as io::reader.read err %? %?", - err_data.err_name, err_data.err_msg); + assert count <= len; + if count == len { + break; + } - return 0; - } - } - else { - self.data.buf.push_all(result::unwrap(read_result)); - } + // We copied all the bytes we had in the internal buffer into + // the result buffer, but the caller wants more bytes, so we + // need to read in data from the socket. Note that the internal + // buffer is of no use anymore as we read all bytes from it, + // so we can throw it away. + let read_result = read(&self.data.sock, 0u); + if read_result.is_err() { + let err_data = read_result.get_err(); + + if err_data.err_name == ~"EOF" { + self.end_of_stream = true; + break; + } else { + debug!("ERROR sock_buf as io::reader.read err %? %?", + err_data.err_name, err_data.err_msg); + // As we have already copied data into result buffer, + // we cannot simply return 0 here. Instead the error + // should show up in a later call to read(). + break; + } + } + else { + self.data.buf = result::unwrap(read_result); + self.data.buf_off = 0; + } } - let count = uint::min(len, self.data.buf.len()); - - let mut data = ~[]; - self.data.buf <-> data; - - vec::bytes::memcpy(buf, vec::view(data, 0, data.len()), count); - - self.data.buf.push_all(vec::view(data, count, data.len())); - count } fn read_byte(&self) -> int { - let mut bytes = ~[0]; - if self.read(bytes, 1u) == 0 { - if self.end_of_stream { - -1 - } else { - fail - } - } else { - bytes[0] as int + loop { + if self.data.buf.len() > self.data.buf_off { + let c = self.data.buf[self.data.buf_off]; + self.data.buf_off += 1; + return c as int + } + + let read_result = read(&self.data.sock, 0u); + if read_result.is_err() { + let err_data = read_result.get_err(); + + if err_data.err_name == ~"EOF" { + self.end_of_stream = true; + return -1 + } else { + debug!("ERROR sock_buf as io::reader.read err %? %?", + err_data.err_name, err_data.err_msg); + fail + } + } + else { + self.data.buf = result::unwrap(read_result); + self.data.buf_off = 0; + } } } fn eof(&self) -> bool { self.end_of_stream } fn seek(&self, dist: int, seek: io::SeekStyle) { - log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek)); + debug!("tcp_socket_buf seek stub %? %?", dist, seek); // noop } fn tell(&self) -> uint { @@ -854,12 +892,12 @@ impl TcpSocketBuf: io::Writer { vec::slice(data, 0, vec::len(data))); if w_result.is_err() { let err_data = w_result.get_err(); - log(debug, fmt!("ERROR sock_buf as io::writer.writer err: %? %?", - err_data.err_name, err_data.err_msg)); + debug!("ERROR sock_buf as io::writer.writer err: %? %?", + err_data.err_name, err_data.err_msg); } } fn seek(&self, dist: int, seek: io::SeekStyle) { - log(debug, fmt!("tcp_socket_buf seek stub %? %?", dist, seek)); + debug!("tcp_socket_buf seek stub %? %?", dist, seek); // noop } fn tell(&self) -> uint { @@ -884,24 +922,24 @@ fn tear_down_socket_data(socket_data: @TcpSocketData) unsafe { let close_data_ptr = ptr::addr_of(&close_data); let stream_handle_ptr = (*socket_data).stream_handle_ptr; do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { - log(debug, fmt!("interact dtor for tcp_socket stream %? loop %?", - stream_handle_ptr, loop_ptr)); + debug!("interact dtor for tcp_socket stream %? loop %?", + stream_handle_ptr, loop_ptr); uv::ll::set_data_for_uv_handle(stream_handle_ptr, close_data_ptr); uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb); }; oldcomm::recv(closed_po); //the line below will most likely crash - //log(debug, fmt!("about to free socket_data at %?", socket_data)); + //debug!("about to free socket_data at %?", socket_data); rustrt::rust_uv_current_kernel_free(stream_handle_ptr as *libc::c_void); - log(debug, ~"exiting dtor for tcp_socket"); + debug!("exiting dtor for tcp_socket"); } // shared implementation for tcp::read fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) -> result::Result<~[u8],TcpErrData> unsafe { - log(debug, ~"starting tcp::read"); + debug!("starting tcp::read"); let iotask = (*socket_data).iotask; let rs_result = read_start_common_impl(socket_data); if result::is_err(&rs_result) { @@ -909,17 +947,17 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) result::Err(err_data) } else { - log(debug, ~"tcp::read before recv_timeout"); + debug!("tcp::read before recv_timeout"); let read_result = if timeout_msecs > 0u { timer::recv_timeout( iotask, timeout_msecs, result::get(&rs_result)) } else { Some(oldcomm::recv(result::get(&rs_result))) }; - log(debug, ~"tcp::read after recv_timeout"); + debug!("tcp::read after recv_timeout"); match move read_result { None => { - log(debug, ~"tcp::read: timed out.."); + debug!("tcp::read: timed out.."); let err_data = { err_name: ~"TIMEOUT", err_msg: ~"req timed out" @@ -928,7 +966,7 @@ fn read_common_impl(socket_data: *TcpSocketData, timeout_msecs: uint) result::Err(err_data) } Some(move data_result) => { - log(debug, ~"tcp::read got data"); + debug!("tcp::read got data"); read_stop_common_impl(socket_data); data_result } @@ -943,14 +981,14 @@ fn read_stop_common_impl(socket_data: *TcpSocketData) -> let stop_po = oldcomm::Port::>(); let stop_ch = oldcomm::Chan(&stop_po); do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { - log(debug, ~"in interact cb for tcp::read_stop"); + debug!("in interact cb for tcp::read_stop"); match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) { 0i32 => { - log(debug, ~"successfully called uv_read_stop"); + debug!("successfully called uv_read_stop"); oldcomm::send(stop_ch, None); } _ => { - log(debug, ~"failure in calling uv_read_stop"); + debug!("failure in calling uv_read_stop"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(stop_ch, Some(err_data.to_tcp_err())); } @@ -969,18 +1007,18 @@ fn read_start_common_impl(socket_data: *TcpSocketData) let stream_handle_ptr = (*socket_data).stream_handle_ptr; let start_po = oldcomm::Port::>(); let start_ch = oldcomm::Chan(&start_po); - log(debug, ~"in tcp::read_start before interact loop"); + debug!("in tcp::read_start before interact loop"); do iotask::interact((*socket_data).iotask) |loop_ptr| unsafe { - log(debug, fmt!("in tcp::read_start interact cb %?", loop_ptr)); + debug!("in tcp::read_start interact cb %?", loop_ptr); match uv::ll::read_start(stream_handle_ptr as *uv::ll::uv_stream_t, on_alloc_cb, on_tcp_read_cb) { 0i32 => { - log(debug, ~"success doing uv_read_start"); + debug!("success doing uv_read_start"); oldcomm::send(start_ch, None); } _ => { - log(debug, ~"error attempting uv_read_start"); + debug!("error attempting uv_read_start"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send(start_ch, Some(err_data)); } @@ -1011,17 +1049,17 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, }; let write_data_ptr = ptr::addr_of(&write_data); do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| unsafe { - log(debug, fmt!("in interact cb for tcp::write %?", loop_ptr)); + debug!("in interact cb for tcp::write %?", loop_ptr); match uv::ll::write(write_req_ptr, stream_handle_ptr, write_buf_vec_ptr, tcp_write_complete_cb) { 0i32 => { - log(debug, ~"uv_write() invoked successfully"); + debug!("uv_write() invoked successfully"); uv::ll::set_data_for_req(write_req_ptr, write_data_ptr); } _ => { - log(debug, ~"error invoking uv_write()"); + debug!("error invoking uv_write()"); let err_data = uv::ll::get_last_err_data(loop_ptr); oldcomm::send((*write_data_ptr).result_ch, TcpWriteError(err_data.to_tcp_err())); @@ -1116,8 +1154,8 @@ impl uv::ll::uv_err_data: ToTcpErr { extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, nread: libc::ssize_t, ++buf: uv::ll::uv_buf_t) unsafe { - log(debug, fmt!("entering on_tcp_read_cb stream: %? nread: %?", - stream, nread)); + debug!("entering on_tcp_read_cb stream: %? nread: %?", + stream, nread); let loop_ptr = uv::ll::get_loop_for_uv_handle(stream); let socket_data_ptr = uv::ll::get_data_for_uv_handle(stream) as *TcpSocketData; @@ -1125,8 +1163,8 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, // incoming err.. probably eof -1 => { let err_data = uv::ll::get_last_err_data(loop_ptr).to_tcp_err(); - log(debug, fmt!("on_tcp_read_cb: incoming err.. name %? msg %?", - err_data.err_name, err_data.err_msg)); + debug!("on_tcp_read_cb: incoming err.. name %? msg %?", + err_data.err_name, err_data.err_msg); let reader_ch = (*socket_data_ptr).reader_ch; oldcomm::send(reader_ch, result::Err(err_data)); } @@ -1135,7 +1173,7 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, // have data _ => { // we have data - log(debug, fmt!("tcp on_read_cb nread: %d", nread as int)); + debug!("tcp on_read_cb nread: %d", nread as int); let reader_ch = (*socket_data_ptr).reader_ch; let buf_base = uv::ll::get_base_from_buf(buf); let new_bytes = vec::from_buf(buf_base, nread as uint); @@ -1143,18 +1181,18 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t, } } uv::ll::free_base_of_buf(buf); - log(debug, ~"exiting on_tcp_read_cb"); + debug!("exiting on_tcp_read_cb"); } extern fn on_alloc_cb(handle: *libc::c_void, suggested_size: size_t) -> uv::ll::uv_buf_t unsafe { - log(debug, ~"tcp read on_alloc_cb!"); + debug!("tcp read on_alloc_cb!"); let char_ptr = uv::ll::malloc_buf_base_of(suggested_size); - log(debug, fmt!("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u", + debug!("tcp read on_alloc_cb h: %? char_ptr: %u sugsize: %u", handle, char_ptr as uint, - suggested_size as uint)); + suggested_size as uint); uv::ll::buf_init(char_ptr, suggested_size as uint) } @@ -1167,7 +1205,7 @@ extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { as *TcpSocketCloseData; let closed_ch = (*data).closed_ch; oldcomm::send(closed_ch, ()); - log(debug, ~"tcp_socket_dtor_close_cb exiting.."); + debug!("tcp_socket_dtor_close_cb exiting.."); } extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t, @@ -1175,14 +1213,14 @@ extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t, let write_data_ptr = uv::ll::get_data_for_req(write_req) as *WriteReqData; if status == 0i32 { - log(debug, ~"successful write complete"); + debug!("successful write complete"); oldcomm::send((*write_data_ptr).result_ch, TcpWriteSuccess); } else { let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req( write_req); let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr); - log(debug, ~"failure to write"); + debug!("failure to write"); oldcomm::send((*write_data_ptr).result_ch, TcpWriteError(err_data)); } @@ -1201,11 +1239,11 @@ extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { let data = uv::ll::get_data_for_uv_handle(handle) as *ConnectReqData; oldcomm::send((*data).closed_signal_ch, ()); - log(debug, fmt!("exiting steam_error_close_cb for %?", handle)); + debug!("exiting steam_error_close_cb for %?", handle); } extern fn tcp_connect_close_cb(handle: *uv::ll::uv_tcp_t) unsafe { - log(debug, fmt!("closed client tcp handle %?", handle)); + debug!("closed client tcp handle %?", handle); } extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t, @@ -1213,27 +1251,27 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t, let conn_data_ptr = (uv::ll::get_data_for_req(connect_req_ptr) as *ConnectReqData); let result_ch = (*conn_data_ptr).result_ch; - log(debug, fmt!("tcp_connect result_ch %?", result_ch)); + debug!("tcp_connect result_ch %?", result_ch); let tcp_stream_ptr = uv::ll::get_stream_handle_from_connect_req(connect_req_ptr); match status { 0i32 => { - log(debug, ~"successful tcp connection!"); + debug!("successful tcp connection!"); oldcomm::send(result_ch, ConnSuccess); } _ => { - log(debug, ~"error in tcp_connect_on_connect_cb"); + debug!("error in tcp_connect_on_connect_cb"); let loop_ptr = uv::ll::get_loop_for_uv_handle(tcp_stream_ptr); let err_data = uv::ll::get_last_err_data(loop_ptr); - log(debug, fmt!("err_data %? %?", err_data.err_name, - err_data.err_msg)); + debug!("err_data %? %?", err_data.err_name, + err_data.err_msg); oldcomm::send(result_ch, ConnFailure(err_data)); uv::ll::set_data_for_uv_handle(tcp_stream_ptr, conn_data_ptr); uv::ll::close(tcp_stream_ptr, stream_error_close_cb); } } - log(debug, ~"leaving tcp_connect_on_connect_cb"); + debug!("leaving tcp_connect_on_connect_cb"); } enum ConnAttempt { @@ -1251,10 +1289,11 @@ type TcpSocketData = { iotask: IoTask }; -type TcpBufferedSocketData = { +struct TcpBufferedSocketData { sock: TcpSocket, - mut buf: ~[u8] -}; + mut buf: ~[u8], + mut buf_off: uint +} //#[cfg(test)] mod test { @@ -1356,7 +1395,7 @@ mod test { }; oldcomm::recv(cont_po); // client - log(debug, ~"server started, firing up client.."); + debug!("server started, firing up client.."); let actual_resp_result = do oldcomm::listen |client_ch| { run_tcp_test_client( server_ip, @@ -1368,10 +1407,10 @@ mod test { assert actual_resp_result.is_ok(); let actual_resp = actual_resp_result.get(); let actual_req = oldcomm::recv(server_result_po); - log(debug, fmt!("REQ: expected: '%s' actual: '%s'", - expected_req, actual_req)); - log(debug, fmt!("RESP: expected: '%s' actual: '%s'", - expected_resp, actual_resp)); + debug!("REQ: expected: '%s' actual: '%s'", + expected_req, actual_req); + debug!("RESP: expected: '%s' actual: '%s'", + expected_resp, actual_resp); assert str::contains(actual_req, expected_req); assert str::contains(actual_resp, expected_resp); } @@ -1401,7 +1440,7 @@ mod test { }; oldcomm::recv(cont_po); // client - log(debug, ~"server started, firing up client.."); + debug!("server started, firing up client.."); do oldcomm::listen |client_ch| { let server_ip_addr = ip::v4::parse_addr(server_ip); let iotask = uv::global_loop::get(); @@ -1428,7 +1467,7 @@ mod test { let server_port = 8889u; let expected_req = ~"ping"; // client - log(debug, ~"firing up client.."); + debug!("firing up client.."); let actual_resp_result = do oldcomm::listen |client_ch| { run_tcp_test_client( server_ip, @@ -1474,7 +1513,7 @@ mod test { server_port, hl_loop); // client.. just doing this so that the first server tears down - log(debug, ~"server started, firing up client.."); + debug!("server started, firing up client.."); do oldcomm::listen |client_ch| { run_tcp_test_client( server_ip, @@ -1556,10 +1595,10 @@ mod test { }; let actual_req = core::comm::recv(server_result_po); - log(debug, fmt!("REQ: expected: '%s' actual: '%s'", - expected_req, actual_req)); - log(debug, fmt!("RESP: expected: '%s' actual: '%s'", - expected_resp, actual_resp)); + debug!("REQ: expected: '%s' actual: '%s'", + expected_req, actual_req); + debug!("RESP: expected: '%s' actual: '%s'", + expected_resp, actual_resp); assert str::contains(actual_req, expected_req); assert str::contains(actual_resp, expected_resp); */ @@ -1593,7 +1632,7 @@ mod test { }; oldcomm::recv(cont_po); // client - log(debug, ~"server started, firing up client.."); + debug!("server started, firing up client.."); let server_addr = ip::v4::parse_addr(server_ip); let conn_result = connect(move server_addr, server_port, hl_loop); if result::is_err(&conn_result) { @@ -1604,23 +1643,23 @@ mod test { let buf_reader = sock_buf as Reader; let actual_response = str::from_bytes(buf_reader.read_whole_stream()); - log(debug, fmt!("Actual response: %s", actual_response)); + debug!("Actual response: %s", actual_response); assert expected_resp == actual_response; } fn buf_write(w: &W, val: &str) { - log(debug, fmt!("BUF_WRITE: val len %?", str::len(val))); + debug!("BUF_WRITE: val len %?", str::len(val)); do str::byte_slice(val) |b_slice| { - log(debug, fmt!("BUF_WRITE: b_slice len %?", - vec::len(b_slice))); + debug!("BUF_WRITE: b_slice len %?", + vec::len(b_slice)); w.write(b_slice) } } fn buf_read(r: &R, len: uint) -> ~str { let new_bytes = (*r).read_bytes(len); - log(debug, fmt!("in buf_read.. new_bytes len: %?", - vec::len(new_bytes))); + debug!("in buf_read.. new_bytes len: %?", + vec::len(new_bytes)); str::from_bytes(new_bytes) } @@ -1633,65 +1672,61 @@ mod test { iotask, // on_establish_cb -- called when listener is set up |kill_ch| { - log(debug, fmt!("establish_cb %?", - kill_ch)); + debug!("establish_cb %?", kill_ch); oldcomm::send(cont_ch, ()); }, // risky to run this on the loop, but some users // will want the POWER |new_conn, kill_ch| { - log(debug, ~"SERVER: new connection!"); + debug!("SERVER: new connection!"); do oldcomm::listen |cont_ch| { do task::spawn_sched(task::ManualThreads(1u)) { - log(debug, ~"SERVER: starting worker for new req"); + debug!("SERVER: starting worker for new req"); let accept_result = accept(new_conn); - log(debug, ~"SERVER: after accept()"); + debug!("SERVER: after accept()"); if result::is_err(&accept_result) { - log(debug, ~"SERVER: error accept connection"); + debug!("SERVER: error accept connection"); let err_data = result::get_err(&accept_result); oldcomm::send(kill_ch, Some(err_data)); - log(debug, - ~"SERVER/WORKER: send on err cont ch"); + debug!("SERVER/WORKER: send on err cont ch"); cont_ch.send(()); } else { - log(debug, - ~"SERVER/WORKER: send on cont ch"); + debug!("SERVER/WORKER: send on cont ch"); cont_ch.send(()); let sock = result::unwrap(move accept_result); let peer_addr = sock.get_peer_addr(); - log(debug, ~"SERVER: successfully accepted"+ - fmt!(" connection from %s:%u", + debug!("SERVER: successfully accepted connection from %s:%u", ip::format_addr(&peer_addr), - ip::get_port(&peer_addr))); + ip::get_port(&peer_addr)); let received_req_bytes = read(&sock, 0u); match move received_req_bytes { result::Ok(move data) => { - log(debug, ~"SERVER: got REQ str::from_bytes.."); - log(debug, fmt!("SERVER: REQ data len: %?", - vec::len(data))); + debug!("SERVER: got REQ str::from_bytes.."); + debug!("SERVER: REQ data len: %?", + vec::len(data)); server_ch.send( str::from_bytes(data)); - log(debug, ~"SERVER: before write"); + debug!("SERVER: before write"); tcp_write_single(&sock, str::to_bytes(resp)); - log(debug, ~"SERVER: after write.. die"); + debug!("SERVER: after write.. die"); oldcomm::send(kill_ch, None); } result::Err(move err_data) => { - log(debug, fmt!("SERVER: error recvd: %s %s", - err_data.err_name, err_data.err_msg)); + debug!("SERVER: error recvd: %s %s", + err_data.err_name, err_data.err_msg); oldcomm::send(kill_ch, Some(err_data)); server_ch.send(~""); } } - log(debug, ~"SERVER: worker spinning down"); + debug!("SERVER: worker spinning down"); } } - log(debug, ~"SERVER: waiting to recv on cont_ch"); + debug!("SERVER: waiting to recv on cont_ch"); cont_ch.recv() }; - log(debug, ~"SERVER: recv'd on cont_ch..leaving listen cb"); + debug!("SERVER: recv'd on cont_ch..leaving listen cb"); }); // err check on listen_result if result::is_err(&listen_result) { @@ -1709,7 +1744,7 @@ mod test { } } let ret_val = server_ch.recv(); - log(debug, fmt!("SERVER: exited and got return val: '%s'", ret_val)); + debug!("SERVER: exited and got return val: '%s'", ret_val); ret_val } @@ -1720,8 +1755,7 @@ mod test { iotask, // on_establish_cb -- called when listener is set up |kill_ch| { - log(debug, fmt!("establish_cb %?", - kill_ch)); + debug!("establish_cb %?", kill_ch); }, |new_conn, kill_ch| { fail fmt!("SERVER: shouldn't be called.. %? %?", @@ -1742,11 +1776,11 @@ mod test { TcpConnectErrData> { let server_ip_addr = ip::v4::parse_addr(server_ip); - log(debug, ~"CLIENT: starting.."); + debug!("CLIENT: starting.."); let connect_result = connect(move server_ip_addr, server_port, iotask); if result::is_err(&connect_result) { - log(debug, ~"CLIENT: failed to connect"); + debug!("CLIENT: failed to connect"); let err_data = result::get_err(&connect_result); Err(err_data) } @@ -1756,14 +1790,13 @@ mod test { tcp_write_single(&sock, resp_bytes); let read_result = sock.read(0u); if read_result.is_err() { - log(debug, ~"CLIENT: failure to read"); + debug!("CLIENT: failure to read"); Ok(~"") } else { client_ch.send(str::from_bytes(read_result.get())); let ret_val = client_ch.recv(); - log(debug, fmt!("CLIENT: after client_ch recv ret: '%s'", - ret_val)); + debug!("CLIENT: after client_ch recv ret: '%s'", ret_val); Ok(ret_val) } } @@ -1773,10 +1806,10 @@ mod test { let write_result_future = sock.write_future(val); let write_result = write_result_future.get(); if result::is_err(&write_result) { - log(debug, ~"tcp_write_single: write failed!"); + debug!("tcp_write_single: write failed!"); let err_data = result::get_err(&write_result); - log(debug, fmt!("tcp_write_single err name: %s msg: %s", - err_data.err_name, err_data.err_msg)); + debug!("tcp_write_single err name: %s msg: %s", + err_data.err_name, err_data.err_msg); // meh. torn on what to do here. fail ~"tcp_write_single failed"; }