Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
Fix buffer size counting bug and add logging to UDP sender (#151)
Browse files Browse the repository at this point in the history
* Fix buffer size counting bug and add logging to UDP sender
  • Loading branch information
yurishkuro committed Aug 28, 2017
1 parent 5bd0763 commit 51e40ec
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 34 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -46,6 +46,7 @@
"mocha": "^3.0.1",
"request": "^2.74.0",
"rsvp": "^3.3.1",
"semver": "^5",
"sinon": "^1.17.5",
"source-map-support": "^0.4.5",
"tchannel": "^3.9.0",
Expand Down
33 changes: 33 additions & 0 deletions src/_flow/socket.js
@@ -0,0 +1,33 @@
// @flow
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

declare class dgram$Socket {
on(event: string, listener: Function): this;
close(): void;
send(
msg: Buffer,
offset: number,
length: number,
port: number,
address: string,
callback?: (err: ?any, bytes: any) => mixed,
): void;
};
4 changes: 3 additions & 1 deletion src/configuration.js
Expand Up @@ -101,7 +101,9 @@ export default class Configuration {
static _getReporter(config, options) {
let reporterConfig = {};
let reporters = [];
let senderConfig = {};
let senderConfig = {
'logger': config.logger
};
if (config.reporter) {
if (config.reporter.logSpans) {
reporters.push(new LoggingReporter(options.logger));
Expand Down
46 changes: 27 additions & 19 deletions src/reporters/udp_sender.js
Expand Up @@ -23,6 +23,7 @@ import dgram from 'dgram';
import fs from 'fs';
import path from 'path';
import {Thrift} from 'thriftrw';
import NullLogger from '../logger.js';

const HOST = 'localhost';
const PORT = 6832;
Expand All @@ -34,20 +35,24 @@ export default class UDPSender {
_maxPacketSize: number;
_process: Process;
_emitSpanBatchOverhead: number;
_maxSpanBytes: number;
_client: any;
_byteBufferSize: number;
_logger: Logger;
_client: dgram$Socket;
_agentThrift: Thrift;
_jaegerThrift: Thrift;
_batch: Batch;
_thriftProcessMessage: any;
_maxSpanBytes: number; // maxPacketSize - (batch + tags overhead)
_totalSpanBytes: number; // size of currently batched spans as Thrift bytes

constructor(options: any = {}) {
this._host = options.host || HOST;
this._port = options.port || PORT;
this._maxPacketSize = options.maxPacketSize || UDP_PACKET_MAX_LENGTH;
this._byteBufferSize = 0;
this._logger = options.logger || new NullLogger();
this._client = dgram.createSocket('udp4');
this._client.on('error', err => {
this._logger.error(`error sending spans over UDP: ${err}`)
})
this._agentThrift = new Thrift({
source: fs.readFileSync(path.join(__dirname, '../thriftrw-idl/agent.thrift'), 'ascii'),
allowOptionalArguments: true,
Expand All @@ -57,6 +62,7 @@ export default class UDPSender {
source: fs.readFileSync(path.join(__dirname, '../jaeger-idl/thrift/jaeger.thrift'), 'ascii'),
allowOptionalArguments: true
});
this._totalSpanBytes = 0;
}

_calcBatchSize(batch: Batch) {
Expand Down Expand Up @@ -99,18 +105,19 @@ export default class UDPSender {
return { err: true, numSpans: 1 };
}

this._byteBufferSize += spanSize;
if (this._byteBufferSize <= this._maxSpanBytes) {
if (this._totalSpanBytes + spanSize <= this._maxSpanBytes) {
this._batch.spans.push(span);
if (this._byteBufferSize < this._maxSpanBytes) {
this._totalSpanBytes += spanSize;
if (this._totalSpanBytes < this._maxSpanBytes) {
// still have space in the buffer, don't flush it yet
return {err: false, numSpans: 0};
}
return this.flush();
}

let flushResponse: SenderResponse = this.flush();
this._batch.spans.push(span);
this._byteBufferSize = spanSize;
this._totalSpanBytes = spanSize;
return flushResponse;
}

Expand All @@ -120,23 +127,24 @@ export default class UDPSender {
return {err: false, numSpans: 0}
}

let bufferLen = this._byteBufferSize + this._emitSpanBatchOverhead;
let bufferLen = this._totalSpanBytes + this._emitSpanBatchOverhead;
let thriftBuffer = new Buffer(bufferLen);
let bufferResult = this._agentThrift.Agent.emitBatch.argumentsMessageRW.writeInto(
let writeResult = this._agentThrift.Agent.emitBatch.argumentsMessageRW.writeInto(
this._convertBatchToThriftMessage(this._batch), thriftBuffer, 0
);

if (bufferResult.err) {
console.log('err', bufferResult.err);
if (writeResult.err) {
this._logger.error(`error writing Thrift object: ${writeResult.err}`);
return {err: true, numSpans: numSpans};
}

// https://nodejs.org/api/dgram.html#dgram_socket_send_msg_offset_length_port_address_callback
this._client.on('error', err => {
console.log(`error sending span: ${err}`)
})

this._client.send(thriftBuffer, 0, thriftBuffer.length, this._port, this._host);
// Having the error callback here does not prevent uncaught exception from being thrown,
// that's why in the constructor we also add a general on('error') handler.
this._client.send(thriftBuffer, 0, thriftBuffer.length, this._port, this._host, (err, sent) => {
if (err) {
this._logger.error(`error sending spans over UDP: ${err}, packet size: ${writeResult.offset}, bytes sent: ${sent}`);
}
});
this._reset();

return {err: false, numSpans: numSpans};
Expand All @@ -161,7 +169,7 @@ export default class UDPSender {

_reset() {
this._batch.spans = [];
this._byteBufferSize = 0;
this._totalSpanBytes = 0;
}

close(): void {
Expand Down
47 changes: 33 additions & 14 deletions test/udp_sender.js
Expand Up @@ -24,13 +24,15 @@ import ConstSampler from '../src/samplers/const_sampler.js';
import dgram from 'dgram';
import fs from 'fs';
import path from 'path';
import semver from 'semver';
import InMemoryReporter from '../src/reporters/in_memory_reporter.js';
import RemoteReporter from '../src/reporters/remote_reporter.js';
import opentracing from 'opentracing';
import Tracer from '../src/tracer.js';
import {Thrift} from 'thriftrw';
import ThriftUtils from '../src/thrift.js';
import UDPSender from '../src/reporters/udp_sender.js';
import NullLogger from '../src/logger.js';

const PORT = 6832;
const HOST = '127.0.0.1';
Expand Down Expand Up @@ -179,7 +181,6 @@ describe('udp sender should', () => {
let spanOne = tracer.startSpan('operation-one');
spanOne.finish(); // finish to set span duration
spanOne = ThriftUtils.spanToThrift(spanOne);
sender._maxSpanBytes = 1;
let spanSize = sender._calcSpanSize(spanOne);
sender._maxSpanBytes = spanSize * 2;

Expand All @@ -192,7 +193,7 @@ describe('udp sender should', () => {
assert.equal(responseTwo.numSpans, 2);

assert.equal(sender._batch.spans.length, 0);
assert.equal(sender._byteBufferSize, 0);
assert.equal(sender._totalSpanBytes, 0);
});

it ('flush spans when just over capacity', () => {
Expand All @@ -212,14 +213,23 @@ describe('udp sender should', () => {
let expectedBufferSize = sender._calcSpanSize(spanThatExceedsCapacity);

assert.equal(sender._batch.spans.length, 1);
assert.equal(sender._byteBufferSize, expectedBufferSize);
assert.equal(sender._totalSpanBytes, expectedBufferSize);
assert.equal(responseOne.err, false);
assert.equal(responseOne.numSpans, 0);
assert.equal(responseTwo.err, false);
assert.equal(responseTwo.numSpans, 1);
});

it('flush returns error, on failed buffer conversion', () => {
it('flush returns error, on failed buffer conversion', (done) => {
sender._logger = {
info: (msg) => {
console.log('sender info: ' + msg);
},
error: (msg) => {
expect(msg).to.have.string('error writing Thrift object:');
done();
}
};
let span = tracer.startSpan('leela');
span.finish(); // finish to set span duration
span = ThriftUtils.spanToThrift(span);
Expand Down Expand Up @@ -252,19 +262,28 @@ describe('udp sender should', () => {
});

it ('flush gracefully handles errors emitted by socket.send', done => {
sender._host = 'foo.bar.com';
sender._port = 1234;
new Tracer(
sender._host = 'foo.bar.xyz';
// In Node 0.10 and 0.12 the error is logged twice: (1) from inline callback, (2) from on('error') handler.
let node0_10_12 = semver.satisfies(process.version, '0.10.x || 0.12.x');
let expectedLogs = node0_10_12 ? 2 : 1;
sender._logger = {
info: (msg) => {
console.log('sender info: ' + msg);
},
error: (msg) => {
expect(msg).to.have.string('error sending spans over UDP: Error: getaddrinfo ENOTFOUND');
expectedLogs--;
if (expectedLogs == 0) {
done();
}
}
};
let tracer = new Tracer(
'test-service-name',
new RemoteReporter(sender),
new ConstSampler(true)
).startSpan('testSpan').finish();
let oldLog = console.log;
console.log = message => {
expect(message).to.have.string('error sending span: Error: getaddrinfo ENOTFOUND');
console.log = oldLog;
done();
};
);
tracer.startSpan('testSpan').finish();
sender.flush();
});
});

0 comments on commit 51e40ec

Please sign in to comment.