Skip to content

Commit

Permalink
net: track bytesWritten in C++ land
Browse files Browse the repository at this point in the history
Move tracking of `socket.bytesWritten` to C++ land.

This makes it easier to provide this functionality for all
`StreamBase` instances, and in particular should keep working
when they have been 'consumed' in C++ in some way (e.g. for
the network sockets that are underlying to TLS or HTTP2 streams).

Also, this parallels `socket.bytesRead` a lot more now.

PR-URL: #19551
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax committed Mar 30, 2018
1 parent abc8786 commit 923fb5c
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 10 deletions.
2 changes: 1 addition & 1 deletion lib/internal/net.js
Expand Up @@ -32,7 +32,7 @@ function makeSyncWrite(fd) {
if (enc !== 'buffer')
chunk = Buffer.from(chunk, enc);

this._bytesDispatched += chunk.length;
this._handle.bytesWritten += chunk.length;

const ctx = {};
writeBuffer(fd, chunk, 0, chunk.length, null, undefined, ctx);
Expand Down
28 changes: 20 additions & 8 deletions lib/net.js
Expand Up @@ -206,7 +206,6 @@ function normalizeArgs(args) {
// called when creating new Socket, or when re-using a closed Socket
function initSocketHandle(self) {
self._undestroy();
self._bytesDispatched = 0;
self._sockname = null;

// Handle creation may be deferred to bind() or connect() time.
Expand All @@ -222,7 +221,8 @@ function initSocketHandle(self) {
}


const BYTES_READ = Symbol('bytesRead');
const kBytesRead = Symbol('kBytesRead');
const kBytesWritten = Symbol('kBytesWritten');


function Socket(options) {
Expand Down Expand Up @@ -278,6 +278,11 @@ function Socket(options) {

this._writev = null;
this._write = makeSyncWrite(fd);
// makeSyncWrite adjusts this value like the original handle would, so
// we need to let it do that by turning it into a writable, own property.
Object.defineProperty(this._handle, 'bytesWritten', {
value: 0, writable: true
});
}
} else {
// these will be set once there is a connection
Expand Down Expand Up @@ -316,7 +321,8 @@ function Socket(options) {
this._server = null;

// Used after `.destroy()`
this[BYTES_READ] = 0;
this[kBytesRead] = 0;
this[kBytesWritten] = 0;
}
util.inherits(Socket, stream.Duplex);

Expand Down Expand Up @@ -588,8 +594,9 @@ Socket.prototype._destroy = function(exception, cb) {
if (this !== process.stderr)
debug('close handle');
var isException = exception ? true : false;
// `bytesRead` should be accessible after `.destroy()`
this[BYTES_READ] = this._handle.bytesRead;
// `bytesRead` and `kBytesWritten` should be accessible after `.destroy()`
this[kBytesRead] = this._handle.bytesRead;
this[kBytesWritten] = this._handle.bytesWritten;

this._handle.close(() => {
debug('emit close');
Expand Down Expand Up @@ -689,7 +696,7 @@ function protoGetter(name, callback) {
}

protoGetter('bytesRead', function bytesRead() {
return this._handle ? this._handle.bytesRead : this[BYTES_READ];
return this._handle ? this._handle.bytesRead : this[kBytesRead];
});

protoGetter('remoteAddress', function remoteAddress() {
Expand Down Expand Up @@ -761,8 +768,6 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// Bail out if handle.write* returned an error
if (ret) return ret;

this._bytesDispatched += req.bytes;

if (!req.async) {
cb();
return;
Expand All @@ -782,6 +787,13 @@ Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};


// Legacy alias. Having this is probably being overly cautious, but it doesn't
// really hurt anyone either. This can probably be removed safely if desired.
protoGetter('_bytesDispatched', function _bytesDispatched() {
return this._handle ? this._handle.bytesWritten : this[kBytesWritten];
});

protoGetter('bytesWritten', function bytesWritten() {
var bytes = this._bytesDispatched;
const state = this._writableState;
Expand Down
1 change: 1 addition & 0 deletions src/env.h
Expand Up @@ -117,6 +117,7 @@ struct PackageConfig {
V(bytes_string, "bytes") \
V(bytes_parsed_string, "bytesParsed") \
V(bytes_read_string, "bytesRead") \
V(bytes_written_string, "bytesWritten") \
V(cached_data_string, "cachedData") \
V(cached_data_produced_string, "cachedDataProduced") \
V(cached_data_rejected_string, "cachedDataRejected") \
Expand Down
28 changes: 27 additions & 1 deletion src/stream_base-inl.h
Expand Up @@ -193,6 +193,10 @@ inline StreamWriteResult StreamBase::Write(
v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();
int err;

for (size_t i = 0; i < count; ++i)
bytes_written_ += bufs[i].len;

if (send_handle == nullptr) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
Expand Down Expand Up @@ -301,6 +305,12 @@ void StreamBase::AddMethods(Environment* env,
env->as_external(),
signature);

Local<FunctionTemplate> get_bytes_written_templ =
FunctionTemplate::New(env->isolate(),
GetBytesWritten<Base>,
env->as_external(),
signature);

t->PrototypeTemplate()->SetAccessorProperty(env->fd_string(),
get_fd_templ,
Local<FunctionTemplate>(),
Expand All @@ -316,6 +326,11 @@ void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate>(),
attributes);

t->PrototypeTemplate()->SetAccessorProperty(env->bytes_written_string(),
get_bytes_written_templ,
Local<FunctionTemplate>(),
attributes);

env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
if ((flags & kFlagNoShutdown) == 0)
Expand Down Expand Up @@ -357,7 +372,6 @@ void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {

template <class Base>
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
// The handle instance hasn't been set. So no bytes could have been read.
Base* handle;
ASSIGN_OR_RETURN_UNWRAP(&handle,
args.This(),
Expand All @@ -368,6 +382,18 @@ void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
}

template <class Base>
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
Base* handle;
ASSIGN_OR_RETURN_UNWRAP(&handle,
args.This(),
args.GetReturnValue().Set(0));

StreamBase* wrap = static_cast<StreamBase*>(handle);
// uint64_t -> double. 53bits is enough for all real cases.
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
}

template <class Base>
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
Base* handle;
Expand Down
1 change: 1 addition & 0 deletions src/stream_base.cc
Expand Up @@ -243,6 +243,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
uv_buf_t* bufs = &buf;
size_t count = 1;
err = DoTryWrite(&bufs, &count);
bytes_written_ += data_size;

// Immediate failure or success
if (err != 0 || count == 0) {
Expand Down
4 changes: 4 additions & 0 deletions src/stream_base.h
Expand Up @@ -247,6 +247,7 @@ class StreamResource {

StreamListener* listener_ = nullptr;
uint64_t bytes_read_ = 0;
uint64_t bytes_written_ = 0;

friend class StreamListener;
};
Expand Down Expand Up @@ -324,6 +325,9 @@ class StreamBase : public StreamResource {
template <class Base>
static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);

template <class Base>
static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);

template <class Base,
int (StreamBase::*Method)(
const v8::FunctionCallbackInfo<v8::Value>& args)>
Expand Down

0 comments on commit 923fb5c

Please sign in to comment.