diff --git a/package.json b/package.json index b755a9737..b2ca314f1 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "mocha": "^3.0.1", "request": "^2.74.0", "rsvp": "^3.3.1", + "semver": "^5", "sinon": "^1.17.5", "source-map-support": "^0.4.5", "tchannel": "^3.9.0", diff --git a/src/_flow/socket.js b/src/_flow/socket.js new file mode 100644 index 000000000..4e0d740f0 --- /dev/null +++ b/src/_flow/socket.js @@ -0,0 +1,33 @@ +// @flow +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +declare class dgram$Socket { + on(event: string, listener: Function): this; + close(): void; + send( + msg: Buffer, + offset: number, + length: number, + port: number, + address: string, + callback?: (err: ?any, bytes: any) => mixed, + ): void; + }; diff --git a/src/configuration.js b/src/configuration.js index f23a52114..341a74bbc 100644 --- a/src/configuration.js +++ b/src/configuration.js @@ -101,7 +101,9 @@ export default class Configuration { static _getReporter(config, options) { let reporterConfig = {}; let reporters = []; - let senderConfig = {}; + let senderConfig = { + 'logger': config.logger + }; if (config.reporter) { if (config.reporter.logSpans) { reporters.push(new LoggingReporter(options.logger)); diff --git a/src/reporters/udp_sender.js b/src/reporters/udp_sender.js index 3c096419d..0c05afcce 100644 --- a/src/reporters/udp_sender.js +++ b/src/reporters/udp_sender.js @@ -23,6 +23,7 @@ import dgram from 'dgram'; import fs from 'fs'; import path from 'path'; import {Thrift} from 'thriftrw'; +import NullLogger from '../logger.js'; const HOST = 'localhost'; const PORT = 6832; @@ -34,20 +35,24 @@ export default class UDPSender { _maxPacketSize: number; _process: Process; _emitSpanBatchOverhead: number; - _maxSpanBytes: number; - _client: any; - _byteBufferSize: number; + _logger: Logger; + _client: dgram$Socket; _agentThrift: Thrift; _jaegerThrift: Thrift; _batch: Batch; _thriftProcessMessage: any; + _maxSpanBytes: number; // maxPacketSize - (batch + tags overhead) + _totalSpanBytes: number; // size of currently batched spans as Thrift bytes constructor(options: any = {}) { this._host = options.host || HOST; this._port = options.port || PORT; this._maxPacketSize = options.maxPacketSize || UDP_PACKET_MAX_LENGTH; - this._byteBufferSize = 0; + this._logger = options.logger || new NullLogger(); this._client = dgram.createSocket('udp4'); + this._client.on('error', err => { + this._logger.error(`error sending spans over UDP: ${err}`) + }) this._agentThrift = new Thrift({ source: fs.readFileSync(path.join(__dirname, '../thriftrw-idl/agent.thrift'), 'ascii'), allowOptionalArguments: true, @@ -57,6 +62,7 @@ export default class UDPSender { source: fs.readFileSync(path.join(__dirname, '../jaeger-idl/thrift/jaeger.thrift'), 'ascii'), allowOptionalArguments: true }); + this._totalSpanBytes = 0; } _calcBatchSize(batch: Batch) { @@ -99,10 +105,11 @@ export default class UDPSender { return { err: true, numSpans: 1 }; } - this._byteBufferSize += spanSize; - if (this._byteBufferSize <= this._maxSpanBytes) { + if (this._totalSpanBytes + spanSize <= this._maxSpanBytes) { this._batch.spans.push(span); - if (this._byteBufferSize < this._maxSpanBytes) { + this._totalSpanBytes += spanSize; + if (this._totalSpanBytes < this._maxSpanBytes) { + // still have space in the buffer, don't flush it yet return {err: false, numSpans: 0}; } return this.flush(); @@ -110,7 +117,7 @@ export default class UDPSender { let flushResponse: SenderResponse = this.flush(); this._batch.spans.push(span); - this._byteBufferSize = spanSize; + this._totalSpanBytes = spanSize; return flushResponse; } @@ -120,23 +127,24 @@ export default class UDPSender { return {err: false, numSpans: 0} } - let bufferLen = this._byteBufferSize + this._emitSpanBatchOverhead; + let bufferLen = this._totalSpanBytes + this._emitSpanBatchOverhead; let thriftBuffer = new Buffer(bufferLen); - let bufferResult = this._agentThrift.Agent.emitBatch.argumentsMessageRW.writeInto( + let writeResult = this._agentThrift.Agent.emitBatch.argumentsMessageRW.writeInto( this._convertBatchToThriftMessage(this._batch), thriftBuffer, 0 ); - if (bufferResult.err) { - console.log('err', bufferResult.err); + if (writeResult.err) { + this._logger.error(`error writing Thrift object: ${writeResult.err}`); return {err: true, numSpans: numSpans}; } - // https://nodejs.org/api/dgram.html#dgram_socket_send_msg_offset_length_port_address_callback - this._client.on('error', err => { - console.log(`error sending span: ${err}`) - }) - - this._client.send(thriftBuffer, 0, thriftBuffer.length, this._port, this._host); + // Having the error callback here does not prevent uncaught exception from being thrown, + // that's why in the constructor we also add a general on('error') handler. + this._client.send(thriftBuffer, 0, thriftBuffer.length, this._port, this._host, (err, sent) => { + if (err) { + this._logger.error(`error sending spans over UDP: ${err}, packet size: ${writeResult.offset}, bytes sent: ${sent}`); + } + }); this._reset(); return {err: false, numSpans: numSpans}; @@ -161,7 +169,7 @@ export default class UDPSender { _reset() { this._batch.spans = []; - this._byteBufferSize = 0; + this._totalSpanBytes = 0; } close(): void { diff --git a/test/udp_sender.js b/test/udp_sender.js index 20ceb5cfa..1b5d5aca0 100644 --- a/test/udp_sender.js +++ b/test/udp_sender.js @@ -24,6 +24,7 @@ import ConstSampler from '../src/samplers/const_sampler.js'; import dgram from 'dgram'; import fs from 'fs'; import path from 'path'; +import semver from 'semver'; import InMemoryReporter from '../src/reporters/in_memory_reporter.js'; import RemoteReporter from '../src/reporters/remote_reporter.js'; import opentracing from 'opentracing'; @@ -31,6 +32,7 @@ import Tracer from '../src/tracer.js'; import {Thrift} from 'thriftrw'; import ThriftUtils from '../src/thrift.js'; import UDPSender from '../src/reporters/udp_sender.js'; +import NullLogger from '../src/logger.js'; const PORT = 6832; const HOST = '127.0.0.1'; @@ -179,7 +181,6 @@ describe('udp sender should', () => { let spanOne = tracer.startSpan('operation-one'); spanOne.finish(); // finish to set span duration spanOne = ThriftUtils.spanToThrift(spanOne); - sender._maxSpanBytes = 1; let spanSize = sender._calcSpanSize(spanOne); sender._maxSpanBytes = spanSize * 2; @@ -192,7 +193,7 @@ describe('udp sender should', () => { assert.equal(responseTwo.numSpans, 2); assert.equal(sender._batch.spans.length, 0); - assert.equal(sender._byteBufferSize, 0); + assert.equal(sender._totalSpanBytes, 0); }); it ('flush spans when just over capacity', () => { @@ -212,14 +213,23 @@ describe('udp sender should', () => { let expectedBufferSize = sender._calcSpanSize(spanThatExceedsCapacity); assert.equal(sender._batch.spans.length, 1); - assert.equal(sender._byteBufferSize, expectedBufferSize); + assert.equal(sender._totalSpanBytes, expectedBufferSize); assert.equal(responseOne.err, false); assert.equal(responseOne.numSpans, 0); assert.equal(responseTwo.err, false); assert.equal(responseTwo.numSpans, 1); }); - it('flush returns error, on failed buffer conversion', () => { + it('flush returns error, on failed buffer conversion', (done) => { + sender._logger = { + info: (msg) => { + console.log('sender info: ' + msg); + }, + error: (msg) => { + expect(msg).to.have.string('error writing Thrift object:'); + done(); + } + }; let span = tracer.startSpan('leela'); span.finish(); // finish to set span duration span = ThriftUtils.spanToThrift(span); @@ -252,19 +262,28 @@ describe('udp sender should', () => { }); it ('flush gracefully handles errors emitted by socket.send', done => { - sender._host = 'foo.bar.com'; - sender._port = 1234; - new Tracer( + sender._host = 'foo.bar.xyz'; + // In Node 0.10 and 0.12 the error is logged twice: (1) from inline callback, (2) from on('error') handler. + let node0_10_12 = semver.satisfies(process.version, '0.10.x || 0.12.x'); + let expectedLogs = node0_10_12 ? 2 : 1; + sender._logger = { + info: (msg) => { + console.log('sender info: ' + msg); + }, + error: (msg) => { + expect(msg).to.have.string('error sending spans over UDP: Error: getaddrinfo ENOTFOUND'); + expectedLogs--; + if (expectedLogs == 0) { + done(); + } + } + }; + let tracer = new Tracer( 'test-service-name', new RemoteReporter(sender), new ConstSampler(true) - ).startSpan('testSpan').finish(); - let oldLog = console.log; - console.log = message => { - expect(message).to.have.string('error sending span: Error: getaddrinfo ENOTFOUND'); - console.log = oldLog; - done(); - }; + ); + tracer.startSpan('testSpan').finish(); sender.flush(); }); });