From 73fc0e986e48e40f97d9b77091fd4e7dd04e8bb7 Mon Sep 17 00:00:00 2001 From: eirslett Date: Fri, 29 Apr 2016 00:23:48 +0200 Subject: [PATCH] first commit --- .editorconfig | 6 + .eslintignore | 2 + .eslintrc | 19 + .gitignore | 2 + .travis.yml | 7 + README.md | 65 +++ index.js | 1 + lerna.json | 4 + package.json | 24 + .../index.js | 1 + .../package.json | 17 + .../src/restInterceptor.js | 69 +++ .../test/.eslintrc | 8 + .../test/integrationTest.js | 73 +++ .../zipkin-instrumentation-express/README.md | 18 + .../zipkin-instrumentation-express/index.js | 1 + .../package.json | 18 + .../src/expressMiddleware.js | 85 +++ .../test/.eslintrc | 8 + .../test/integrationTest.js | 72 +++ packages/zipkin-transport-kafka/index.js | 1 + packages/zipkin-transport-kafka/package.json | 21 + .../zipkin-transport-kafka/src/kafkaLogger.js | 39 ++ .../zipkin-transport-kafka/test/.eslintrc | 8 + .../test/integrationTest.js | 112 ++++ packages/zipkin-transport-scribe/index.js | 1 + packages/zipkin-transport-scribe/package.json | 20 + .../src/ScribeLogger.js | 36 ++ .../zipkin-transport-scribe/test/.eslintrc | 8 + .../test/gen-nodejs/scribe.js | 251 +++++++++ .../test/gen-nodejs/scribeServer_types.js | 81 +++ .../test/integrationTest.js | 48 ++ .../test/scribeServer.thrift | 15 + packages/zipkin/index.js | 19 + packages/zipkin/package.json | 19 + packages/zipkin/src/InetAddress.js | 27 + packages/zipkin/src/ZipkinTracer.js | 96 ++++ packages/zipkin/src/annotation.js | 84 +++ packages/zipkin/src/consoleTracer.js | 12 + .../zipkin/src/gen-nodejs/zipkinCore_types.js | 496 ++++++++++++++++++ packages/zipkin/src/httpHeaders.js | 7 + .../zipkin/src/internalRepresentations.js | 136 +++++ packages/zipkin/src/option.js | 76 +++ packages/zipkin/src/randomTraceId.js | 12 + packages/zipkin/src/serializeSpan.js | 19 + packages/zipkin/src/time.js | 4 + packages/zipkin/src/trace.js | 229 ++++++++ packages/zipkin/src/traceId.js | 49 ++ packages/zipkin/src/zipkin.js | 21 + packages/zipkin/src/zipkinCore.thrift | 46 ++ packages/zipkin/test/.eslintrc | 8 + packages/zipkin/test/InetAddress.test.js | 16 + packages/zipkin/test/annotation.test.js | 10 + packages/zipkin/test/serializeSpan.test.js | 57 ++ packages/zipkin/test/zipkinTracer.test.js | 59 +++ test/helper.js | 3 + test/mocha.opts | 5 + 57 files changed, 2651 insertions(+) create mode 100644 .editorconfig create mode 100644 .eslintignore create mode 100644 .eslintrc create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 README.md create mode 100644 index.js create mode 100644 lerna.json create mode 100644 package.json create mode 100644 packages/zipkin-instrumentation-cujojs-rest/index.js create mode 100644 packages/zipkin-instrumentation-cujojs-rest/package.json create mode 100644 packages/zipkin-instrumentation-cujojs-rest/src/restInterceptor.js create mode 100644 packages/zipkin-instrumentation-cujojs-rest/test/.eslintrc create mode 100644 packages/zipkin-instrumentation-cujojs-rest/test/integrationTest.js create mode 100644 packages/zipkin-instrumentation-express/README.md create mode 100644 packages/zipkin-instrumentation-express/index.js create mode 100644 packages/zipkin-instrumentation-express/package.json create mode 100644 packages/zipkin-instrumentation-express/src/expressMiddleware.js create mode 100644 packages/zipkin-instrumentation-express/test/.eslintrc create mode 100644 packages/zipkin-instrumentation-express/test/integrationTest.js create mode 100644 packages/zipkin-transport-kafka/index.js create mode 100644 packages/zipkin-transport-kafka/package.json create mode 100644 packages/zipkin-transport-kafka/src/kafkaLogger.js create mode 100644 packages/zipkin-transport-kafka/test/.eslintrc create mode 100644 packages/zipkin-transport-kafka/test/integrationTest.js create mode 100644 packages/zipkin-transport-scribe/index.js create mode 100644 packages/zipkin-transport-scribe/package.json create mode 100644 packages/zipkin-transport-scribe/src/ScribeLogger.js create mode 100644 packages/zipkin-transport-scribe/test/.eslintrc create mode 100644 packages/zipkin-transport-scribe/test/gen-nodejs/scribe.js create mode 100644 packages/zipkin-transport-scribe/test/gen-nodejs/scribeServer_types.js create mode 100644 packages/zipkin-transport-scribe/test/integrationTest.js create mode 100644 packages/zipkin-transport-scribe/test/scribeServer.thrift create mode 100644 packages/zipkin/index.js create mode 100644 packages/zipkin/package.json create mode 100644 packages/zipkin/src/InetAddress.js create mode 100644 packages/zipkin/src/ZipkinTracer.js create mode 100644 packages/zipkin/src/annotation.js create mode 100644 packages/zipkin/src/consoleTracer.js create mode 100644 packages/zipkin/src/gen-nodejs/zipkinCore_types.js create mode 100644 packages/zipkin/src/httpHeaders.js create mode 100644 packages/zipkin/src/internalRepresentations.js create mode 100644 packages/zipkin/src/option.js create mode 100644 packages/zipkin/src/randomTraceId.js create mode 100644 packages/zipkin/src/serializeSpan.js create mode 100644 packages/zipkin/src/time.js create mode 100644 packages/zipkin/src/trace.js create mode 100644 packages/zipkin/src/traceId.js create mode 100644 packages/zipkin/src/zipkin.js create mode 100644 packages/zipkin/src/zipkinCore.thrift create mode 100644 packages/zipkin/test/.eslintrc create mode 100644 packages/zipkin/test/InetAddress.test.js create mode 100644 packages/zipkin/test/annotation.test.js create mode 100644 packages/zipkin/test/serializeSpan.test.js create mode 100644 packages/zipkin/test/zipkinTracer.test.js create mode 100644 test/helper.js create mode 100644 test/mocha.opts diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..2d8401c3 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,6 @@ +[*.js] +end_of_line = lf +insert_final_newline = true +indent_style = space +indent_size = 2 +trim_trailing_whitespace = true diff --git a/.eslintignore b/.eslintignore new file mode 100644 index 00000000..da60c834 --- /dev/null +++ b/.eslintignore @@ -0,0 +1,2 @@ +node_modules +gen-nodejs diff --git a/.eslintrc b/.eslintrc new file mode 100644 index 00000000..288f6b29 --- /dev/null +++ b/.eslintrc @@ -0,0 +1,19 @@ +{ + "extends": "airbnb", + "rules": { + "comma-dangle": "off", + "func-names": "off", + "object-curly-spacing": [2, "never"], + "space-before-function-paren": [2, "never"], + "eqeqeq": [2, "allow-null"], + "no-else-return": "off", + "prefer-arrow-callback": ["error", { "allowNamedFunctions": true }], + "no-underscore-dangle": "off", + "import/no-unresolved": [2, { + "ignore": ["url"] } + ] + }, + "env": { + "node": true + } +} diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..5c593d0f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +node_modules +**/node_modules diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..c529da88 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,7 @@ +language: node_js +install: + - npm install + - npm run lerna-bootstrap +script: + - npm run lint + - npm test diff --git a/README.md b/README.md new file mode 100644 index 00000000..6babd6dc --- /dev/null +++ b/README.md @@ -0,0 +1,65 @@ +# Zipkin.js + +This is a library for instrumenting Node.js applications. It uses a lot of +new JavaScript features and syntax, so Node.js version 6 or newer is required. + + +## Installation: + +npm install zipkin --save + +## Usage: + +```javascript +const {tracer, consoleTracer} = require('zipkin'); + +tracer.pushTracer(consoleTracer); +``` + +## Instrumentations + +Various Node.js libraries have been instrumented with Zipkin support. +In this project: + +- express +- cujojs/rest + +## Transports + +You can choose between multiple transports; for now, +scribe and kafka transports are implemented. They can +be used like this: + +Scribe: + +```javascript +const ScribeTracer = require('zipkin-transport-scribe'); +tracer.pushTracer(new ScribeTracer({ + host: 'localhost', + port: 9410 +}); +``` + +Kafka + +```javascript +const KafkaTracer = require('zipkin-transport-kafka'); +tracer.pushTracer(new KafkaTracer({ + clientOpts: {connectionString: 'localhost:2181'} +}); +``` + +## Developing + +The code base is a monorepo. We use [Lerna](https://github.com/kittens/lerna) for managing inter-module +dependencies, which makes it easier to develop coordinated changes. + +To setup, run: + +npm install +npm run lerna-bootstrap + +Running tests: npm test + +Running code style linting: npm run lint + diff --git a/index.js b/index.js new file mode 100644 index 00000000..122c38c7 --- /dev/null +++ b/index.js @@ -0,0 +1 @@ +module.exports.Zipkin = require('src/zipkin/zipkin'); diff --git a/lerna.json b/lerna.json new file mode 100644 index 00000000..b1f4c6fb --- /dev/null +++ b/lerna.json @@ -0,0 +1,4 @@ +{ + "lerna": "2.0.0-beta.6", + "version": "0.1.0" +} diff --git a/package.json b/package.json new file mode 100644 index 00000000..0c8f3209 --- /dev/null +++ b/package.json @@ -0,0 +1,24 @@ +{ + "name": "zipkin-js", + "private": true, + "license": "Apache-2.0", + "dependencies": { + "lerna": "2.0.0-beta.6" + }, + "scripts": { + "lint": "node node_modules/eslint/bin/eslint.js packages", + "test": "node node_modules/mocha/bin/mocha", + "lerna-bootstrap": "node node_modules/lerna/bin/lerna.js bootstrap" + }, + "devDependencies": { + "chai": "^3.5.0", + "eslint": "^2.8.0", + "eslint-config-airbnb": "^8.0.0", + "eslint-plugin-import": "^1.6.0", + "eslint-plugin-jsx-a11y": "^1.0.2", + "eslint-plugin-react": "^5.0.1", + "lerna": "^2.0.0-beta.6", + "mocha": "^2.4.5", + "sinon": "^1.17.4" + } +} diff --git a/packages/zipkin-instrumentation-cujojs-rest/index.js b/packages/zipkin-instrumentation-cujojs-rest/index.js new file mode 100644 index 00000000..603324e0 --- /dev/null +++ b/packages/zipkin-instrumentation-cujojs-rest/index.js @@ -0,0 +1 @@ +module.exports.restInterceptor = require('./src/restInterceptor'); diff --git a/packages/zipkin-instrumentation-cujojs-rest/package.json b/packages/zipkin-instrumentation-cujojs-rest/package.json new file mode 100644 index 00000000..f1e9c91d --- /dev/null +++ b/packages/zipkin-instrumentation-cujojs-rest/package.json @@ -0,0 +1,17 @@ +{ + "name": "zipkin-instrumentation-cujojs-rest", + "version": "0.1.0", + "description": "Interceptor for instrumenting HTTP calls from the cujoJS rest library", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "OpenZipkin", + "license": "Apache-2.0", + "devDependencies": { + "rest": "^1.3.2" + }, + "peerDependencies": { + "zipkin": "~0.1.0" + } +} diff --git a/packages/zipkin-instrumentation-cujojs-rest/src/restInterceptor.js b/packages/zipkin-instrumentation-cujojs-rest/src/restInterceptor.js new file mode 100644 index 00000000..21ec88a0 --- /dev/null +++ b/packages/zipkin-instrumentation-cujojs-rest/src/restInterceptor.js @@ -0,0 +1,69 @@ +/* eslint-disable no-param-reassign */ +const interceptor = require('rest/interceptor'); +const { + trace, + HttpHeaders: Header, + Annotation +} = require('zipkin'); + +function getRequestMethod(req) { + let method = 'get'; + if (req.entity) { + method = 'post'; + } + if (req.method) { + method = req.method; + } + return method; +} + +function request(req, {serviceName}) { + function recordTraceData(name) { + const method = getRequestMethod(req); + trace.recordServiceName(name); + trace.recordRpc(method.toUpperCase()); + trace.recordBinary('http.url', req.path); + trace.recordAnnotation(new Annotation.ClientSend()); + } + + trace.withContext(() => { + trace.setId(trace.nextId()); + const traceId = trace.id(); + this.traceId = traceId; + + req.headers = req.headers || {}; + req.headers[Header.TraceId] = traceId.traceId; + req.headers[Header.SpanId] = traceId.spanId; + traceId._parentId.ifPresent(psid => { + req.headers[Header.ParentSpanId] = psid; + }); + traceId.sampled.ifPresent(sampled => { + req.headers[Header.Sampled] = sampled ? 'true' : 'false'; + }); + + if (trace.isActivelyTracing()) { + if (serviceName instanceof Function) { + serviceName(req, recordTraceData); + } else { + recordTraceData(serviceName); + } + } + }); + + return req; +} + +function response(res) { + trace.withContext(() => { + trace.setId(this.traceId); + trace.recordBinary('http.status_code', res.status.code.toString()); + trace.recordAnnotation(new Annotation.ClientRecv()); + }); + return res; +} + +module.exports = interceptor({ + + request, + response +}); diff --git a/packages/zipkin-instrumentation-cujojs-rest/test/.eslintrc b/packages/zipkin-instrumentation-cujojs-rest/test/.eslintrc new file mode 100644 index 00000000..3c8fcd73 --- /dev/null +++ b/packages/zipkin-instrumentation-cujojs-rest/test/.eslintrc @@ -0,0 +1,8 @@ +{ + "env": { + "mocha": true + }, + "globals": { + "expect": true + } +} diff --git a/packages/zipkin-instrumentation-cujojs-rest/test/integrationTest.js b/packages/zipkin-instrumentation-cujojs-rest/test/integrationTest.js new file mode 100644 index 00000000..38c2f6b8 --- /dev/null +++ b/packages/zipkin-instrumentation-cujojs-rest/test/integrationTest.js @@ -0,0 +1,73 @@ +const {trace} = require('zipkin'); +const express = require('express'); +const sinon = require('sinon'); +const rest = require('rest'); +const restInterceptor = require('../src/restInterceptor'); + +describe('cujojs rest interceptor - integration test', () => { + it('should add headers to requests', done => { + const app = express(); + app.get('/abc', (req, res) => { + res.status(202).json({ + traceId: req.header('X-B3-TraceId'), + spanId: req.header('X-B3-SpanId') + }); + }); + + const server = app.listen(0, () => { + const record = sinon.spy(); + const tracer = {record}; + + trace.letTracer(tracer, () => { + const client = rest.wrap(restInterceptor, {serviceName: 'service-a'}); + const port = server.address().port; + const path = `http://127.0.0.1:${port}/abc`; + client(path).then(successResponse => { + const responseData = JSON.parse(successResponse.entity); + server.close(); + + const annotations = record.args.map(args => args[0]); + + // All annotations should have the same trace id and span id + const traceId = annotations[0].traceId.traceId; + const spanId = annotations[0].traceId.spanId; + annotations.forEach(ann => expect(ann.traceId.traceId).to.equal(traceId)); + annotations.forEach(ann => expect(ann.traceId.spanId).to.equal(spanId)); + + expect(annotations[0].annotation.annotationType).to.equal('ServiceName'); + expect(annotations[0].annotation.serviceName).to.equal('service-a'); + + expect(annotations[1].annotation.annotationType).to.equal('Rpc'); + expect(annotations[1].annotation.name).to.equal('GET'); + + expect(annotations[2].annotation.annotationType).to.equal('BinaryAnnotation'); + expect(annotations[2].annotation.key).to.equal('http.url'); + expect(annotations[2].annotation.value).to.equal(path); + + expect(annotations[3].annotation.annotationType).to.equal('ClientSend'); + + expect(annotations[4].annotation.annotationType).to.equal('BinaryAnnotation'); + expect(annotations[4].annotation.key).to.equal('http.status_code'); + expect(annotations[4].annotation.value).to.equal('202'); + + expect(annotations[5].annotation.annotationType).to.equal('ClientRecv'); + + const traceIdOnServer = responseData.traceId; + expect(traceIdOnServer).to.equal(traceId); + + const spanIdOnServer = responseData.spanId; + expect(spanIdOnServer).to.equal(spanId); + + done(); + }, errorResponse => { + if (errorResponse instanceof Error) { + done(errorResponse); + } else { + server.close(); + done(new Error(`The request failed: ${errorResponse.error.toString()}`)); + } + }); + }); + }); + }); +}); diff --git a/packages/zipkin-instrumentation-express/README.md b/packages/zipkin-instrumentation-express/README.md new file mode 100644 index 00000000..fe3fb508 --- /dev/null +++ b/packages/zipkin-instrumentation-express/README.md @@ -0,0 +1,18 @@ +# zipkin-instrumentation-express + +An express middleware that adds Zipkin tracing to the application. + +## Usage + +```javascript +const express = require('express'); +const {trace} = require('zipkin'); +const zipkinMiddleware = require('zipkin-instrumentation-express'); + +trace.pushTracer(ConsoleTracer); +const app = express(); +app.use(zipkinMiddleware({ + serviceName: 'service-a', // name of this application + port: 8080 // port this application is listening on +})); +``` diff --git a/packages/zipkin-instrumentation-express/index.js b/packages/zipkin-instrumentation-express/index.js new file mode 100644 index 00000000..440b59e3 --- /dev/null +++ b/packages/zipkin-instrumentation-express/index.js @@ -0,0 +1 @@ +module.exports.expressMiddleware = require('./src/expressMiddleware'); diff --git a/packages/zipkin-instrumentation-express/package.json b/packages/zipkin-instrumentation-express/package.json new file mode 100644 index 00000000..d0d8d2c4 --- /dev/null +++ b/packages/zipkin-instrumentation-express/package.json @@ -0,0 +1,18 @@ +{ + "name": "zipkin-instrumentation-express", + "version": "0.1.0", + "description": "Express middleware for instrumentation with Zipkin.js", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "OpenZipkin", + "license": "Apache-2.0", + "peerDependencies": { + "zipkin": "~0.1.0" + }, + "devDependencies": { + "express": "^4.13.4", + "node-fetch": "^1.5.1" + } +} diff --git a/packages/zipkin-instrumentation-express/src/expressMiddleware.js b/packages/zipkin-instrumentation-express/src/expressMiddleware.js new file mode 100644 index 00000000..ef3c0bd6 --- /dev/null +++ b/packages/zipkin-instrumentation-express/src/expressMiddleware.js @@ -0,0 +1,85 @@ +const { + trace, + Annotation, + HttpHeaders: Header, + option: {Some, None}, + TraceId +} = require('zipkin'); +const url = require('url'); + +function containsRequiredHeaders(req) { + return req.header(Header.TraceId) !== undefined && + req.header(Header.SpanId) !== undefined; +} + +function stringToBoolean(str) { + return str === '1'; +} + +module.exports = function expressMiddleware(options) { + const serviceName = options.serviceName || 'unknown'; + const port = options.port || 0; + return trace.bindContext(function zipkinExpressMiddleware(req, res, next) { + trace.withContext(() => { + function readHeader(header) { + const val = req.header(header); + if (val != null) { + return new Some(val); + } else { + return None; + } + } + + if (containsRequiredHeaders(req)) { + const spanId = readHeader(Header.SpanId); + spanId.ifPresent(sid => { + const traceId = readHeader(Header.TraceId); + const parentSpanId = readHeader(Header.ParentSpanId); + const sampled = readHeader(Header.Sampled); + const flags = readHeader(Header.Flags).getOrElse(0); + const id = new TraceId({ + traceId, + parentId: parentSpanId, + spanId: sid, + sampled: sampled.map(stringToBoolean), + flags + }); + trace.setId(id); + }); + } else { + trace.setId(trace.nextId()); + + if (req.header(Header.Flags)) { + const currentId = trace.id(); + const idWithFlags = new TraceId({ + traceId: currentId.traceId, + parentId: currentId.parentId, + spanId: currentId.spanId, + sampled: currentId.sampled, + flags: readHeader(Header.Flags) + }); + trace.setId(idWithFlags); + } + } + + trace.recordServiceName(serviceName); + trace.recordRpc(req.method); + trace.recordBinary('http.url', url.format({ + protocol: req.protocol, + host: req.get('host'), + pathname: req.originalUrl + })); + trace.recordAnnotation(new Annotation.ServerRecv()); + trace.recordAnnotation(new Annotation.LocalAddr({port})); + + const id = trace.id(); + res.on('finish', () => { + trace.setId(id); + trace.recordBinary('http.status_code', res.statusCode.toString()); + trace.recordAnnotation(new Annotation.ServerSend()); + }); + + next(); + }); + }); +}; diff --git a/packages/zipkin-instrumentation-express/test/.eslintrc b/packages/zipkin-instrumentation-express/test/.eslintrc new file mode 100644 index 00000000..3c8fcd73 --- /dev/null +++ b/packages/zipkin-instrumentation-express/test/.eslintrc @@ -0,0 +1,8 @@ +{ + "env": { + "mocha": true + }, + "globals": { + "expect": true + } +} diff --git a/packages/zipkin-instrumentation-express/test/integrationTest.js b/packages/zipkin-instrumentation-express/test/integrationTest.js new file mode 100644 index 00000000..b1b740e7 --- /dev/null +++ b/packages/zipkin-instrumentation-express/test/integrationTest.js @@ -0,0 +1,72 @@ +const sinon = require('sinon'); +const {trace} = require('zipkin'); +const fetch = require('node-fetch'); +const express = require('express'); +const middleware = require('../src/expressMiddleware'); + +describe('express middleware - integration test', () => { + it('should receive trace info from the client', done => { + const record = sinon.spy(); + const tracer = {record}; + + trace.letTracer(tracer, () => { + const app = express(); + app.use(middleware({ + serviceName: 'service-a' + })); + app.post('/foo', (req, res) => { + // Use setTimeout to test that the trace context is propagated into the callback + setTimeout(() => { + trace.recordBinary('message', 'hello from within app'); + res.status(202).json({status: 'OK'}); + }, 10); + }); + const server = app.listen(0, () => { + const port = server.address().port; + const url = `http://127.0.0.1:${port}/foo`; + fetch(url, { + method: 'post', + headers: { + 'X-B3-TraceId': 'aaa', + 'X-B3-SpanId': 'bbb' + } + }).then(res => res.json()).then(() => { + server.close(); + + const annotations = record.args.map(args => args[0]); + + annotations.forEach(ann => expect(ann.traceId.traceId).to.equal('aaa')); + annotations.forEach(ann => expect(ann.traceId.spanId).to.equal('bbb')); + + expect(annotations[0].annotation.annotationType).to.equal('ServiceName'); + expect(annotations[0].annotation.serviceName).to.equal('service-a'); + + expect(annotations[1].annotation.annotationType).to.equal('Rpc'); + expect(annotations[1].annotation.name).to.equal('POST'); + + expect(annotations[2].annotation.annotationType).to.equal('BinaryAnnotation'); + expect(annotations[2].annotation.key).to.equal('http.url'); + expect(annotations[2].annotation.value).to.equal(url); + + expect(annotations[3].annotation.annotationType).to.equal('ServerRecv'); + + expect(annotations[4].annotation.annotationType).to.equal('LocalAddr'); + + expect(annotations[5].annotation.annotationType).to.equal('BinaryAnnotation'); + expect(annotations[5].annotation.key).to.equal('message'); + expect(annotations[5].annotation.value).to.equal('hello from within app'); + + expect(annotations[6].annotation.annotationType).to.equal('BinaryAnnotation'); + expect(annotations[6].annotation.key).to.equal('http.status_code'); + expect(annotations[6].annotation.value).to.equal('202'); + + expect(annotations[7].annotation.annotationType).to.equal('ServerSend'); + done(); + }).catch(err => { + server.close(); + done(err); + }); + }); + }); + }); +}); diff --git a/packages/zipkin-transport-kafka/index.js b/packages/zipkin-transport-kafka/index.js new file mode 100644 index 00000000..9a2a0fc9 --- /dev/null +++ b/packages/zipkin-transport-kafka/index.js @@ -0,0 +1 @@ +module.exports.KafkaLogger = require('./src/KafkaLogger'); diff --git a/packages/zipkin-transport-kafka/package.json b/packages/zipkin-transport-kafka/package.json new file mode 100644 index 00000000..0cfa6260 --- /dev/null +++ b/packages/zipkin-transport-kafka/package.json @@ -0,0 +1,21 @@ +{ + "name": "zipkin-transport-kafka", + "version": "0.1.0", + "description": "Transports Zipkin trace data via Kafka to the collector", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "OpenZipkin", + "license": "Apache-2.0", + "dependencies": { + "kafka-node": "^0.3.2", + "scribe": "0.0.9" + }, + "peerDependencies": { + "zipkin": "~0.1.0" + }, + "devDependencies": { + "kafka-please": "^0.1.4" + } +} diff --git a/packages/zipkin-transport-kafka/src/kafkaLogger.js b/packages/zipkin-transport-kafka/src/kafkaLogger.js new file mode 100644 index 00000000..3ca866ca --- /dev/null +++ b/packages/zipkin-transport-kafka/src/kafkaLogger.js @@ -0,0 +1,39 @@ +const kafka = require('kafka-node'); +const {serializeSpan} = require('zipkin'); + +module.exports = class KafkaLogger { + constructor(options) { + const clientDefaults = { + clientId: 'zipkin-transport-kafka', + zkOpts: {} + }; + const clientOpts = Object.assign({}, clientDefaults, options.clientOpts || {}); + const producerDefaults = { + requireAcks: 0 + }; + const producerOpts = Object.assign({}, producerDefaults, options.producerOpts || {}); + this.producerPromise = new Promise((resolve, reject) => { + this.topic = options.topic || 'zipkin'; + this.client = new kafka.Client( + clientOpts.connectionString, clientOpts.clientId, clientOpts.zkOpts + ); + const producer = new kafka.HighLevelProducer(this.client, producerOpts); + producer.on('ready', () => resolve(producer)); + producer.on('error', reject); + }); + } + + logSpan(span) { + this.producerPromise.then(producer => { + const data = serializeSpan(span, 'binary'); + producer.send([{ + topic: this.topic, + messages: data + }], () => {}); + }); + } + + close() { + return new Promise(resolve => this.client.close(resolve)); + } +}; diff --git a/packages/zipkin-transport-kafka/test/.eslintrc b/packages/zipkin-transport-kafka/test/.eslintrc new file mode 100644 index 00000000..3c8fcd73 --- /dev/null +++ b/packages/zipkin-transport-kafka/test/.eslintrc @@ -0,0 +1,8 @@ +{ + "env": { + "mocha": true + }, + "globals": { + "expect": true + } +} diff --git a/packages/zipkin-transport-kafka/test/integrationTest.js b/packages/zipkin-transport-kafka/test/integrationTest.js new file mode 100644 index 00000000..17ca04e7 --- /dev/null +++ b/packages/zipkin-transport-kafka/test/integrationTest.js @@ -0,0 +1,112 @@ +/* eslint-disable no-console */ +const kafka = require('kafka-node'); +const {trace, ZipkinTracer, Annotation} = require('zipkin'); +const KafkaLogger = require('../src/KafkaLogger'); +const makeKafkaServer = require('kafka-please'); + +function waitPromise(length) { + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, length); + }); +} + +describe('Kafka transport - integration test', () => { + it('should send trace data to Kafka', function(done) { + this.timeout(3000000); + return makeKafkaServer().then(kafkaServer => { + const producerClient = new kafka.Client( + `localhost:${kafkaServer.zookeeperPort}`, + 'zipkin-integration-test-producer' + ); + const producer = new kafka.Producer(producerClient); + let client; + let kafkaLogger; + function finish(arg) { + /* eslint-disable arrow-body-style */ + + const closeProducerClient = () => new Promise(resolve => producerClient.close(resolve)); + const closeClient = () => { + return client ? + new Promise(resolve => client.close(resolve)) + : Promise.resolve(); + }; + + const closeKafkaLogger = () => { + return kafkaLogger ? + kafkaLogger.close() : + Promise.resolve(); + }; + + closeKafkaLogger() + .then(closeProducerClient()) + .then(closeClient()) + .then(() => kafkaServer.close() + .then(() => done(arg))); + } + + return new Promise(resolve => { + console.log('creating topic...'); + producer.on('ready', () => { + producer.createTopics(['zipkin'], true, err => { + if (err) { + finish(err); + } else { + console.log('topic was created'); + resolve(); + } + }); + }); + }).then(() => waitPromise(200)).then(() => { + client = new kafka.Client( + `localhost:${kafkaServer.zookeeperPort}`, + 'zipkin-integration-test-consumer' + ); + const consumer = new kafka.HighLevelConsumer( + client, + [{topic: 'zipkin'}], + { + groupId: 'zipkin' + } + ); + consumer.on('message', message => { + console.log('Received Zipkin data from Kafka'); + expect(message.topic).to.equal('zipkin'); + expect(message.value).to.contain('http://example.com'); + consumer.close(true, finish); + }); + + client.on('error', err => { + console.log('client error', err); + finish(err); + }); + consumer.on('error', err => { + console.log('consumer error', err); + consumer.close(true, () => finish(err)); + }); + + kafkaLogger = new KafkaLogger({ + clientOpts: { + connectionString: `localhost:${kafkaServer.zookeeperPort}` + } + }); + const tracer = new ZipkinTracer({logger: kafkaLogger}); + + trace.letTracer(tracer, () => { + trace.withContext(() => { + trace.recordAnnotation(new Annotation.ServerRecv()); + trace.recordServiceName('my-service'); + trace.recordRpc('GET'); + trace.recordBinary('http.url', 'http://example.com'); + trace.recordBinary('http.response_code', '200'); + trace.recordAnnotation(new Annotation.ServerSend()); + }); + }); + }); + }).catch(err => { + console.error('Big err', err); + done(err); + }); + }); +}); diff --git a/packages/zipkin-transport-scribe/index.js b/packages/zipkin-transport-scribe/index.js new file mode 100644 index 00000000..7dbadaaf --- /dev/null +++ b/packages/zipkin-transport-scribe/index.js @@ -0,0 +1 @@ +module.exports.ScribeLogger = require('./src/ScribeLogger'); diff --git a/packages/zipkin-transport-scribe/package.json b/packages/zipkin-transport-scribe/package.json new file mode 100644 index 00000000..12fad83c --- /dev/null +++ b/packages/zipkin-transport-scribe/package.json @@ -0,0 +1,20 @@ +{ + "name": "zipkin-transport-scribe", + "version": "0.1.0", + "description": "Transports Zipkin trace data via Scribe to the collector", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "OpenZipkin", + "license": "Apache-2.0", + "dependencies": { + "scribe": "0.0.9" + }, + "peerDependencies": { + "zipkin": "~0.1.0" + }, + "devDependencies": { + "thrift": "^0.9.3" + } +} diff --git a/packages/zipkin-transport-scribe/src/ScribeLogger.js b/packages/zipkin-transport-scribe/src/ScribeLogger.js new file mode 100644 index 00000000..2dab0173 --- /dev/null +++ b/packages/zipkin-transport-scribe/src/ScribeLogger.js @@ -0,0 +1,36 @@ +/* eslint-disable no-console */ +const {Scribe} = require('scribe'); +const {serializeSpan} = require('zipkin'); + +function ScribeLogger({scribeHost, scribePort = 9410, scribeInterval = 1000}) { + const scribeClient = new Scribe(scribeHost, scribePort, {autoReconnect: true}); + scribeClient.on('error', () => {}); + + this.queue = []; + + setInterval(() => { + if (this.queue.length > 0) { + try { + console.log('flush'); + scribeClient.open(err => { + if (err) { + console.error('Error writing Zipkin data to Scribe', err); + } else { + this.queue.forEach(span => { + scribeClient.send('zipkin', serializeSpan(span)); + }); + scribeClient.flush(); + this.queue.length = 0; + } + }); + } catch (err) { + console.error('Error writing Zipkin data to Scribe', err); + } + } + }, scribeInterval); +} +ScribeLogger.prototype.logSpan = function logSpan(span) { + this.queue.push(span); +}; + +module.exports = ScribeLogger; diff --git a/packages/zipkin-transport-scribe/test/.eslintrc b/packages/zipkin-transport-scribe/test/.eslintrc new file mode 100644 index 00000000..3c8fcd73 --- /dev/null +++ b/packages/zipkin-transport-scribe/test/.eslintrc @@ -0,0 +1,8 @@ +{ + "env": { + "mocha": true + }, + "globals": { + "expect": true + } +} diff --git a/packages/zipkin-transport-scribe/test/gen-nodejs/scribe.js b/packages/zipkin-transport-scribe/test/gen-nodejs/scribe.js new file mode 100644 index 00000000..b8b653ee --- /dev/null +++ b/packages/zipkin-transport-scribe/test/gen-nodejs/scribe.js @@ -0,0 +1,251 @@ +// +// Autogenerated by Thrift Compiler (0.9.3) +// +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +// +var thrift = require('thrift'); +var Thrift = thrift.Thrift; +var Q = thrift.Q; + + +var ttypes = require('./scribeServer_types'); +//HELPER FUNCTIONS AND STRUCTURES + +scribe_Log_args = function(args) { + this.messages = null; + if (args) { + if (args.messages !== undefined && args.messages !== null) { + this.messages = Thrift.copyList(args.messages, [ttypes.LogEntry]); + } + } +}; +scribe_Log_args.prototype = {}; +scribe_Log_args.prototype.read = function(input) { + input.readStructBegin(); + while (true) + { + var ret = input.readFieldBegin(); + var fname = ret.fname; + var ftype = ret.ftype; + var fid = ret.fid; + if (ftype == Thrift.Type.STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == Thrift.Type.LIST) { + var _size0 = 0; + var _rtmp34; + this.messages = []; + var _etype3 = 0; + _rtmp34 = input.readListBegin(); + _etype3 = _rtmp34.etype; + _size0 = _rtmp34.size; + for (var _i5 = 0; _i5 < _size0; ++_i5) + { + var elem6 = null; + elem6 = new ttypes.LogEntry(); + elem6.read(input); + this.messages.push(elem6); + } + input.readListEnd(); + } else { + input.skip(ftype); + } + break; + case 0: + input.skip(ftype); + break; + default: + input.skip(ftype); + } + input.readFieldEnd(); + } + input.readStructEnd(); + return; +}; + +scribe_Log_args.prototype.write = function(output) { + output.writeStructBegin('scribe_Log_args'); + if (this.messages !== null && this.messages !== undefined) { + output.writeFieldBegin('messages', Thrift.Type.LIST, 1); + output.writeListBegin(Thrift.Type.STRUCT, this.messages.length); + for (var iter7 in this.messages) + { + if (this.messages.hasOwnProperty(iter7)) + { + iter7 = this.messages[iter7]; + iter7.write(output); + } + } + output.writeListEnd(); + output.writeFieldEnd(); + } + output.writeFieldStop(); + output.writeStructEnd(); + return; +}; + +scribe_Log_result = function(args) { + this.success = null; + if (args) { + if (args.success !== undefined && args.success !== null) { + this.success = args.success; + } + } +}; +scribe_Log_result.prototype = {}; +scribe_Log_result.prototype.read = function(input) { + input.readStructBegin(); + while (true) + { + var ret = input.readFieldBegin(); + var fname = ret.fname; + var ftype = ret.ftype; + var fid = ret.fid; + if (ftype == Thrift.Type.STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == Thrift.Type.I32) { + this.success = input.readI32(); + } else { + input.skip(ftype); + } + break; + case 0: + input.skip(ftype); + break; + default: + input.skip(ftype); + } + input.readFieldEnd(); + } + input.readStructEnd(); + return; +}; + +scribe_Log_result.prototype.write = function(output) { + output.writeStructBegin('scribe_Log_result'); + if (this.success !== null && this.success !== undefined) { + output.writeFieldBegin('success', Thrift.Type.I32, 0); + output.writeI32(this.success); + output.writeFieldEnd(); + } + output.writeFieldStop(); + output.writeStructEnd(); + return; +}; + +scribeClient = exports.Client = function(output, pClass) { + this.output = output; + this.pClass = pClass; + this._seqid = 0; + this._reqs = {}; +}; +scribeClient.prototype = {}; +scribeClient.prototype.seqid = function() { return this._seqid; } +scribeClient.prototype.new_seqid = function() { return this._seqid += 1; } +scribeClient.prototype.Log = function(messages, callback) { + this._seqid = this.new_seqid(); + if (callback === undefined) { + var _defer = Q.defer(); + this._reqs[this.seqid()] = function(error, result) { + if (error) { + _defer.reject(error); + } else { + _defer.resolve(result); + } + }; + this.send_Log(messages); + return _defer.promise; + } else { + this._reqs[this.seqid()] = callback; + this.send_Log(messages); + } +}; + +scribeClient.prototype.send_Log = function(messages) { + var output = new this.pClass(this.output); + output.writeMessageBegin('Log', Thrift.MessageType.CALL, this.seqid()); + var args = new scribe_Log_args(); + args.messages = messages; + args.write(output); + output.writeMessageEnd(); + return this.output.flush(); +}; + +scribeClient.prototype.recv_Log = function(input,mtype,rseqid) { + var callback = this._reqs[rseqid] || function() {}; + delete this._reqs[rseqid]; + if (mtype == Thrift.MessageType.EXCEPTION) { + var x = new Thrift.TApplicationException(); + x.read(input); + input.readMessageEnd(); + return callback(x); + } + var result = new scribe_Log_result(); + result.read(input); + input.readMessageEnd(); + + if (null !== result.success) { + return callback(null, result.success); + } + return callback('Log failed: unknown result'); +}; +scribeProcessor = exports.Processor = function(handler) { + this._handler = handler +} +scribeProcessor.prototype.process = function(input, output) { + var r = input.readMessageBegin(); + if (this['process_' + r.fname]) { + return this['process_' + r.fname].call(this, r.rseqid, input, output); + } else { + input.skip(Thrift.Type.STRUCT); + input.readMessageEnd(); + var x = new Thrift.TApplicationException(Thrift.TApplicationExceptionType.UNKNOWN_METHOD, 'Unknown function ' + r.fname); + output.writeMessageBegin(r.fname, Thrift.MessageType.EXCEPTION, r.rseqid); + x.write(output); + output.writeMessageEnd(); + output.flush(); + } +} + +scribeProcessor.prototype.process_Log = function(seqid, input, output) { + var args = new scribe_Log_args(); + args.read(input); + input.readMessageEnd(); + if (this._handler.Log.length === 1) { + Q.fcall(this._handler.Log, args.messages) + .then(function(result) { + var result = new scribe_Log_result({success: result}); + output.writeMessageBegin("Log", Thrift.MessageType.REPLY, seqid); + result.write(output); + output.writeMessageEnd(); + output.flush(); + }, function (err) { + var result = new Thrift.TApplicationException(Thrift.TApplicationExceptionType.UNKNOWN, err.message); + output.writeMessageBegin("Log", Thrift.MessageType.EXCEPTION, seqid); + result.write(output); + output.writeMessageEnd(); + output.flush(); + }); + } else { + this._handler.Log(args.messages, function (err, result) { + if (err == null) { + var result = new scribe_Log_result((err != null ? err : {success: result})); + output.writeMessageBegin("Log", Thrift.MessageType.REPLY, seqid); + } else { + var result = new Thrift.TApplicationException(Thrift.TApplicationExceptionType.UNKNOWN, err.message); + output.writeMessageBegin("Log", Thrift.MessageType.EXCEPTION, seqid); + } + result.write(output); + output.writeMessageEnd(); + output.flush(); + }); + } +} + diff --git a/packages/zipkin-transport-scribe/test/gen-nodejs/scribeServer_types.js b/packages/zipkin-transport-scribe/test/gen-nodejs/scribeServer_types.js new file mode 100644 index 00000000..12ff45b0 --- /dev/null +++ b/packages/zipkin-transport-scribe/test/gen-nodejs/scribeServer_types.js @@ -0,0 +1,81 @@ +// +// Autogenerated by Thrift Compiler (0.9.3) +// +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +// +var thrift = require('thrift'); +var Thrift = thrift.Thrift; +var Q = thrift.Q; + + +var ttypes = module.exports = {}; +ttypes.ResultCode = { + 'OK' : 0, + 'TRY_LATER' : 1 +}; +LogEntry = module.exports.LogEntry = function(args) { + this.category = null; + this.message = null; + if (args) { + if (args.category !== undefined && args.category !== null) { + this.category = args.category; + } + if (args.message !== undefined && args.message !== null) { + this.message = args.message; + } + } +}; +LogEntry.prototype = {}; +LogEntry.prototype.read = function(input) { + input.readStructBegin(); + while (true) + { + var ret = input.readFieldBegin(); + var fname = ret.fname; + var ftype = ret.ftype; + var fid = ret.fid; + if (ftype == Thrift.Type.STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == Thrift.Type.STRING) { + this.category = input.readString(); + } else { + input.skip(ftype); + } + break; + case 2: + if (ftype == Thrift.Type.STRING) { + this.message = input.readString(); + } else { + input.skip(ftype); + } + break; + default: + input.skip(ftype); + } + input.readFieldEnd(); + } + input.readStructEnd(); + return; +}; + +LogEntry.prototype.write = function(output) { + output.writeStructBegin('LogEntry'); + if (this.category !== null && this.category !== undefined) { + output.writeFieldBegin('category', Thrift.Type.STRING, 1); + output.writeString(this.category); + output.writeFieldEnd(); + } + if (this.message !== null && this.message !== undefined) { + output.writeFieldBegin('message', Thrift.Type.STRING, 2); + output.writeString(this.message); + output.writeFieldEnd(); + } + output.writeFieldStop(); + output.writeStructEnd(); + return; +}; + diff --git a/packages/zipkin-transport-scribe/test/integrationTest.js b/packages/zipkin-transport-scribe/test/integrationTest.js new file mode 100644 index 00000000..f5bfd935 --- /dev/null +++ b/packages/zipkin-transport-scribe/test/integrationTest.js @@ -0,0 +1,48 @@ +const {trace, TraceId, ZipkinTracer, Annotation, option: {Some}} = require('zipkin'); +const thrift = require('thrift'); +const sinon = require('sinon'); +const ScribeLogger = require('../src/ScribeLogger'); +const Scribe = require('./gen-nodejs/scribe'); +const {ResultCode} = require('./gen-nodejs/scribeServer_types'); + +describe('Scribe transport - integration test', () => { + it('should send trace data to Scribe', done => { + const logSpy = sinon.spy(); + const scribeHandler = { + Log: (messages, result) => { + logSpy(messages, result); + result(ResultCode.OK); + } + }; + + const server = thrift.createServer(Scribe, scribeHandler, { + transport: thrift.TFramedTransport, + protocol: thrift.TBinaryProtocol + }); + const scribeServer = server.listen(0, () => { + const port = scribeServer.address().port; + trace.letTracer(new ZipkinTracer({logger: new ScribeLogger({ + scribeHost: '127.0.0.1', + scribePort: port, + scribeInterval: 1 + })}), () => { + const id = new TraceId({ + traceId: new Some('abc'), + parentId: new Some('def'), + spanId: '123', + sampled: new Some(true), + flags: 0 + }); + trace.setId(id); + trace.recordAnnotation(new Annotation.ClientSend()); + trace.recordAnnotation(new Annotation.ClientRecv()); + setTimeout(() => { + scribeServer.close(); + expect(logSpy.getCall(0).args[0][0].message).to.include( + 'CgABAAAAAAAACrwLAAMAAAAHVW5rbm93bgoABAAAAAAAAAEj'); + done(); + }, 50); + }); + }); + }); +}); diff --git a/packages/zipkin-transport-scribe/test/scribeServer.thrift b/packages/zipkin-transport-scribe/test/scribeServer.thrift new file mode 100644 index 00000000..e8af8e20 --- /dev/null +++ b/packages/zipkin-transport-scribe/test/scribeServer.thrift @@ -0,0 +1,15 @@ +# # Generate with the command "thrift -gen js:node scribeServer.thrift" +# Used to create a mock Scribe server in node.js, for the integration test. +enum ResultCode { + OK, + TRY_LATER +} + +struct LogEntry { + 1: string category, + 2: string message +} + +service scribe { + ResultCode Log(1: list messages); +} diff --git a/packages/zipkin/index.js b/packages/zipkin/index.js new file mode 100644 index 00000000..03256b54 --- /dev/null +++ b/packages/zipkin/index.js @@ -0,0 +1,19 @@ +const Annotation = require('./src/annotation'); +const TraceId = require('./src/traceId'); +const HttpHeaders = require('./src/httpHeaders'); +const InetAddress = require('./src/InetAddress'); +const option = require('./src/option'); +const ZipkinTracer = require('./src/ZipkinTracer'); +const consoleTracer = require('./src/consoleTracer'); +const trace = require('./src/trace'); +const serializeSpan = require('./src/serializeSpan'); + +module.exports.trace = trace; +module.exports.TraceId = TraceId; +module.exports.option = option; +module.exports.Annotation = Annotation; +module.exports.InetAddress = InetAddress; +module.exports.HttpHeaders = HttpHeaders; +module.exports.ZipkinTracer = ZipkinTracer; +module.exports.consoleTracer = consoleTracer; +module.exports.serializeSpan = serializeSpan; diff --git a/packages/zipkin/package.json b/packages/zipkin/package.json new file mode 100644 index 00000000..a0b76510 --- /dev/null +++ b/packages/zipkin/package.json @@ -0,0 +1,19 @@ +{ + "name": "zipkin", + "version": "0.1.0", + "description": "The core tracer for zipkin.js", + "main": "index.js", + "scripts": { + "gen-thrift": "thrift -gen js:node -o src/ src/zipkinCore.thrift" + }, + "author": "OpenZipkin", + "license": "Apache-2.0", + "dependencies": { + "base64-js": "^1.1.2", + "continuation-local-storage": "^3.1.7", + "network-address": "^1.1.0", + "thrift": "^0.9.3" + }, + "devDependencies": { + } +} diff --git a/packages/zipkin/src/InetAddress.js b/packages/zipkin/src/InetAddress.js new file mode 100644 index 00000000..b4de4ea1 --- /dev/null +++ b/packages/zipkin/src/InetAddress.js @@ -0,0 +1,27 @@ +const networkAddress = require('network-address'); + +class InetAddress { + constructor(addr) { + this.addr = addr; + } + toInt() { + // e.g. 10.57.50.83 + // should become + // 171520595 + const parts = this.addr.split('.'); + + // The jshint tool always complains about using bitwise operators, + // but in this case it's actually intentional, so we disable the warning: + // jshint bitwise: false + return parts[0] << 24 | parts[1] << 16 | parts[2] << 8 | parts[3]; + } + toString() { + return `InetAddress(${this.addr})`; + } +} + +InetAddress.getLocalAddress = function getLocalAddress() { + return new InetAddress(networkAddress.ipv4()); +}; + +module.exports = InetAddress; diff --git a/packages/zipkin/src/ZipkinTracer.js b/packages/zipkin/src/ZipkinTracer.js new file mode 100644 index 00000000..d49247d6 --- /dev/null +++ b/packages/zipkin/src/ZipkinTracer.js @@ -0,0 +1,96 @@ +const {now} = require('./time'); +const thriftTypes = require('./gen-nodejs/zipkinCore_types'); +const { + MutableSpan, + Endpoint, + ZipkinAnnotation, + BinaryAnnotation +} = require('./internalRepresentations'); + +function ZipkinTracer({ + logger, + timeout = 60 * 1000000 // default timeout = 60 seconds + }) { + const partialSpans = new Map(); + + function writeSpan(id) { + logger.logSpan(partialSpans.get(id)); + // ready for garbage collection + partialSpans.delete(id); + } + + function updateSpanMap(id, updater) { + let span; + if (partialSpans.has(id)) { + span = partialSpans.get(id); + } else { + span = new MutableSpan(id); + } + updater(span); + if (span.complete) { + writeSpan(id); + } else { + partialSpans.set(id, span); + } + } + + function timedOut(span) { + return span.started + timeout < now(); + } + + // read through the partials spans regularly + // and collect any timed-out ones + setInterval(() => { + partialSpans.forEach((span, id) => { + if (timedOut(span)) { + writeSpan(id); + } + }); + }, 1000); + + this.record = function record(rec) { + const id = rec.traceId; + + updateSpanMap(id, span => { + function annotate({timestamp}, value) { + span.addAnnotation(new ZipkinAnnotation({ + timestamp, + value + })); + } + + function binaryAnnotate(key, value) { + span.addBinaryAnnotation(new BinaryAnnotation({ + key, + value, + annotationType: thriftTypes.AnnotationType.STRING + })); + } + + if (rec.annotation.annotationType === 'ClientSend') { + annotate(rec, thriftTypes.CLIENT_SEND); + } else if (rec.annotation.annotationType === 'ClientRecv') { + annotate(rec, thriftTypes.CLIENT_RECV); + } else if (rec.annotation.annotationType === 'ServerSend') { + annotate(rec, thriftTypes.SERVER_SEND); + } else if (rec.annotation.annotationType === 'ServerRecv') { + annotate(rec, thriftTypes.SERVER_RECV); + } else if (rec.annotation.annotationType === 'Message') { + annotate(rec, rec.annotation.message); + } else if (rec.annotation.annotationType === 'Rpc') { + span.setName(rec.annotation.name); + } else if (rec.annotation.annotationType === 'ServiceName') { + span.setServiceName(rec.annotation.serviceName); + } else if (rec.annotation.annotationType === 'BinaryAnnotation') { + binaryAnnotate(rec.annotation.key, rec.annotation.value); + } else if (rec.annotation.annotationType === 'LocalAddr') { + span.setEndpoint(new Endpoint({ + host: rec.annotation.host.toInt(), + port: rec.annotation.port + })); + } + }); + }; +} + +module.exports = ZipkinTracer; diff --git a/packages/zipkin/src/annotation.js b/packages/zipkin/src/annotation.js new file mode 100644 index 00000000..253ba87a --- /dev/null +++ b/packages/zipkin/src/annotation.js @@ -0,0 +1,84 @@ +const InetAddress = require('./InetAddress'); + +class SimpleAnnotation { + toString() { + return `${this.annotationType}()`; + } +} +class ClientSend extends SimpleAnnotation {} +class ClientRecv extends SimpleAnnotation {} +class ServerSend extends SimpleAnnotation {} +class ServerRecv extends SimpleAnnotation {} + +function Message(message) { + this.message = message; +} +Message.prototype.toString = function() { + return `Message("${this.message}")`; +}; + +function ServiceName(serviceName) { + this.serviceName = serviceName; +} +ServiceName.prototype.toString = function() { + return `ServiceName("${this.serviceName}")`; +}; + +function Rpc(name) { + this.name = name; +} +Rpc.prototype.toString = function() { + return `Rpc("${this.name}")`; +}; + +function ClientAddr({host, port}) { + this.host = host; + this.port = port; +} +ClientAddr.prototype.toString = function() { + return `ClientAddr(host="${this.host}", port=${this.port})`; +}; + +function ServerAddr({host, port}) { + this.host = host; + this.port = port; +} +ServerAddr.prototype.toString = function() { + return `ServerAddr(host="${this.host}", port=${this.port})`; +}; + +function LocalAddr({host, port}) { + this.host = host || InetAddress.getLocalAddress(); + this.port = port || 0; +} +LocalAddr.prototype.toString = function() { + return `LocalAddr(host="${this.host.toString()}", port=${this.port})`; +}; + +function BinaryAnnotation(key, value) { + this.key = key; + this.value = value; +} +BinaryAnnotation.prototype.toString = function() { + return `BinaryAnnotation(${this.key}="${this.value}")`; +}; + +const annotation = { + ClientSend, + ClientRecv, + ServerSend, + ServerRecv, + Message, + ServiceName, + Rpc, + ClientAddr, + ServerAddr, + LocalAddr, + BinaryAnnotation +}; + +Object.keys(annotation).forEach(key => { + annotation[key].prototype.annotationType = key; +}); + +module.exports = annotation; diff --git a/packages/zipkin/src/consoleTracer.js b/packages/zipkin/src/consoleTracer.js new file mode 100644 index 00000000..217eab95 --- /dev/null +++ b/packages/zipkin/src/consoleTracer.js @@ -0,0 +1,12 @@ +module.exports = { + record(rec) { + /* eslint-disable no-console */ + const id = rec.traceId; + console.log( + `Record at (spanId=${id.spanId}, traceId=${id.traceId}): ${rec.annotation.toString()}` + ); + }, + toString() { + return 'consoleTracer'; + } +}; diff --git a/packages/zipkin/src/gen-nodejs/zipkinCore_types.js b/packages/zipkin/src/gen-nodejs/zipkinCore_types.js new file mode 100644 index 00000000..b93847f6 --- /dev/null +++ b/packages/zipkin/src/gen-nodejs/zipkinCore_types.js @@ -0,0 +1,496 @@ +// +// Autogenerated by Thrift Compiler (0.9.3) +// +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +// +var thrift = require('thrift'); +var Thrift = thrift.Thrift; +var Q = thrift.Q; + + +var ttypes = module.exports = {}; +ttypes.AnnotationType = { + 'BOOL' : 0, + 'BYTES' : 1, + 'I16' : 2, + 'I32' : 3, + 'I64' : 4, + 'DOUBLE' : 5, + 'STRING' : 6 +}; +Endpoint = module.exports.Endpoint = function(args) { + this.ipv4 = null; + this.port = null; + this.service_name = null; + if (args) { + if (args.ipv4 !== undefined && args.ipv4 !== null) { + this.ipv4 = args.ipv4; + } + if (args.port !== undefined && args.port !== null) { + this.port = args.port; + } + if (args.service_name !== undefined && args.service_name !== null) { + this.service_name = args.service_name; + } + } +}; +Endpoint.prototype = {}; +Endpoint.prototype.read = function(input) { + input.readStructBegin(); + while (true) + { + var ret = input.readFieldBegin(); + var fname = ret.fname; + var ftype = ret.ftype; + var fid = ret.fid; + if (ftype == Thrift.Type.STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == Thrift.Type.I32) { + this.ipv4 = input.readI32(); + } else { + input.skip(ftype); + } + break; + case 2: + if (ftype == Thrift.Type.I16) { + this.port = input.readI16(); + } else { + input.skip(ftype); + } + break; + case 3: + if (ftype == Thrift.Type.STRING) { + this.service_name = input.readString(); + } else { + input.skip(ftype); + } + break; + default: + input.skip(ftype); + } + input.readFieldEnd(); + } + input.readStructEnd(); + return; +}; + +Endpoint.prototype.write = function(output) { + output.writeStructBegin('Endpoint'); + if (this.ipv4 !== null && this.ipv4 !== undefined) { + output.writeFieldBegin('ipv4', Thrift.Type.I32, 1); + output.writeI32(this.ipv4); + output.writeFieldEnd(); + } + if (this.port !== null && this.port !== undefined) { + output.writeFieldBegin('port', Thrift.Type.I16, 2); + output.writeI16(this.port); + output.writeFieldEnd(); + } + if (this.service_name !== null && this.service_name !== undefined) { + output.writeFieldBegin('service_name', Thrift.Type.STRING, 3); + output.writeString(this.service_name); + output.writeFieldEnd(); + } + output.writeFieldStop(); + output.writeStructEnd(); + return; +}; + +Annotation = module.exports.Annotation = function(args) { + this.timestamp = null; + this.value = null; + this.host = null; + this.duration = null; + if (args) { + if (args.timestamp !== undefined && args.timestamp !== null) { + this.timestamp = args.timestamp; + } + if (args.value !== undefined && args.value !== null) { + this.value = args.value; + } + if (args.host !== undefined && args.host !== null) { + this.host = new ttypes.Endpoint(args.host); + } + if (args.duration !== undefined && args.duration !== null) { + this.duration = args.duration; + } + } +}; +Annotation.prototype = {}; +Annotation.prototype.read = function(input) { + input.readStructBegin(); + while (true) + { + var ret = input.readFieldBegin(); + var fname = ret.fname; + var ftype = ret.ftype; + var fid = ret.fid; + if (ftype == Thrift.Type.STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == Thrift.Type.I64) { + this.timestamp = input.readI64(); + } else { + input.skip(ftype); + } + break; + case 2: + if (ftype == Thrift.Type.STRING) { + this.value = input.readString(); + } else { + input.skip(ftype); + } + break; + case 3: + if (ftype == Thrift.Type.STRUCT) { + this.host = new ttypes.Endpoint(); + this.host.read(input); + } else { + input.skip(ftype); + } + break; + case 4: + if (ftype == Thrift.Type.I32) { + this.duration = input.readI32(); + } else { + input.skip(ftype); + } + break; + default: + input.skip(ftype); + } + input.readFieldEnd(); + } + input.readStructEnd(); + return; +}; + +Annotation.prototype.write = function(output) { + output.writeStructBegin('Annotation'); + if (this.timestamp !== null && this.timestamp !== undefined) { + output.writeFieldBegin('timestamp', Thrift.Type.I64, 1); + output.writeI64(this.timestamp); + output.writeFieldEnd(); + } + if (this.value !== null && this.value !== undefined) { + output.writeFieldBegin('value', Thrift.Type.STRING, 2); + output.writeString(this.value); + output.writeFieldEnd(); + } + if (this.host !== null && this.host !== undefined) { + output.writeFieldBegin('host', Thrift.Type.STRUCT, 3); + this.host.write(output); + output.writeFieldEnd(); + } + if (this.duration !== null && this.duration !== undefined) { + output.writeFieldBegin('duration', Thrift.Type.I32, 4); + output.writeI32(this.duration); + output.writeFieldEnd(); + } + output.writeFieldStop(); + output.writeStructEnd(); + return; +}; + +BinaryAnnotation = module.exports.BinaryAnnotation = function(args) { + this.key = null; + this.value = null; + this.annotation_type = null; + this.host = null; + if (args) { + if (args.key !== undefined && args.key !== null) { + this.key = args.key; + } + if (args.value !== undefined && args.value !== null) { + this.value = args.value; + } + if (args.annotation_type !== undefined && args.annotation_type !== null) { + this.annotation_type = args.annotation_type; + } + if (args.host !== undefined && args.host !== null) { + this.host = new ttypes.Endpoint(args.host); + } + } +}; +BinaryAnnotation.prototype = {}; +BinaryAnnotation.prototype.read = function(input) { + input.readStructBegin(); + while (true) + { + var ret = input.readFieldBegin(); + var fname = ret.fname; + var ftype = ret.ftype; + var fid = ret.fid; + if (ftype == Thrift.Type.STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == Thrift.Type.STRING) { + this.key = input.readString(); + } else { + input.skip(ftype); + } + break; + case 2: + if (ftype == Thrift.Type.STRING) { + this.value = input.readBinary(); + } else { + input.skip(ftype); + } + break; + case 3: + if (ftype == Thrift.Type.I32) { + this.annotation_type = input.readI32(); + } else { + input.skip(ftype); + } + break; + case 4: + if (ftype == Thrift.Type.STRUCT) { + this.host = new ttypes.Endpoint(); + this.host.read(input); + } else { + input.skip(ftype); + } + break; + default: + input.skip(ftype); + } + input.readFieldEnd(); + } + input.readStructEnd(); + return; +}; + +BinaryAnnotation.prototype.write = function(output) { + output.writeStructBegin('BinaryAnnotation'); + if (this.key !== null && this.key !== undefined) { + output.writeFieldBegin('key', Thrift.Type.STRING, 1); + output.writeString(this.key); + output.writeFieldEnd(); + } + if (this.value !== null && this.value !== undefined) { + output.writeFieldBegin('value', Thrift.Type.STRING, 2); + output.writeBinary(this.value); + output.writeFieldEnd(); + } + if (this.annotation_type !== null && this.annotation_type !== undefined) { + output.writeFieldBegin('annotation_type', Thrift.Type.I32, 3); + output.writeI32(this.annotation_type); + output.writeFieldEnd(); + } + if (this.host !== null && this.host !== undefined) { + output.writeFieldBegin('host', Thrift.Type.STRUCT, 4); + this.host.write(output); + output.writeFieldEnd(); + } + output.writeFieldStop(); + output.writeStructEnd(); + return; +}; + +Span = module.exports.Span = function(args) { + this.trace_id = null; + this.name = null; + this.id = null; + this.parent_id = null; + this.annotations = null; + this.binary_annotations = null; + this.debug = false; + if (args) { + if (args.trace_id !== undefined && args.trace_id !== null) { + this.trace_id = args.trace_id; + } + if (args.name !== undefined && args.name !== null) { + this.name = args.name; + } + if (args.id !== undefined && args.id !== null) { + this.id = args.id; + } + if (args.parent_id !== undefined && args.parent_id !== null) { + this.parent_id = args.parent_id; + } + if (args.annotations !== undefined && args.annotations !== null) { + this.annotations = Thrift.copyList(args.annotations, [ttypes.Annotation]); + } + if (args.binary_annotations !== undefined && args.binary_annotations !== null) { + this.binary_annotations = Thrift.copyList(args.binary_annotations, [ttypes.BinaryAnnotation]); + } + if (args.debug !== undefined && args.debug !== null) { + this.debug = args.debug; + } + } +}; +Span.prototype = {}; +Span.prototype.read = function(input) { + input.readStructBegin(); + while (true) + { + var ret = input.readFieldBegin(); + var fname = ret.fname; + var ftype = ret.ftype; + var fid = ret.fid; + if (ftype == Thrift.Type.STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == Thrift.Type.I64) { + this.trace_id = input.readI64(); + } else { + input.skip(ftype); + } + break; + case 3: + if (ftype == Thrift.Type.STRING) { + this.name = input.readString(); + } else { + input.skip(ftype); + } + break; + case 4: + if (ftype == Thrift.Type.I64) { + this.id = input.readI64(); + } else { + input.skip(ftype); + } + break; + case 5: + if (ftype == Thrift.Type.I64) { + this.parent_id = input.readI64(); + } else { + input.skip(ftype); + } + break; + case 6: + if (ftype == Thrift.Type.LIST) { + var _size0 = 0; + var _rtmp34; + this.annotations = []; + var _etype3 = 0; + _rtmp34 = input.readListBegin(); + _etype3 = _rtmp34.etype; + _size0 = _rtmp34.size; + for (var _i5 = 0; _i5 < _size0; ++_i5) + { + var elem6 = null; + elem6 = new ttypes.Annotation(); + elem6.read(input); + this.annotations.push(elem6); + } + input.readListEnd(); + } else { + input.skip(ftype); + } + break; + case 8: + if (ftype == Thrift.Type.LIST) { + var _size7 = 0; + var _rtmp311; + this.binary_annotations = []; + var _etype10 = 0; + _rtmp311 = input.readListBegin(); + _etype10 = _rtmp311.etype; + _size7 = _rtmp311.size; + for (var _i12 = 0; _i12 < _size7; ++_i12) + { + var elem13 = null; + elem13 = new ttypes.BinaryAnnotation(); + elem13.read(input); + this.binary_annotations.push(elem13); + } + input.readListEnd(); + } else { + input.skip(ftype); + } + break; + case 9: + if (ftype == Thrift.Type.BOOL) { + this.debug = input.readBool(); + } else { + input.skip(ftype); + } + break; + default: + input.skip(ftype); + } + input.readFieldEnd(); + } + input.readStructEnd(); + return; +}; + +Span.prototype.write = function(output) { + output.writeStructBegin('Span'); + if (this.trace_id !== null && this.trace_id !== undefined) { + output.writeFieldBegin('trace_id', Thrift.Type.I64, 1); + output.writeI64(this.trace_id); + output.writeFieldEnd(); + } + if (this.name !== null && this.name !== undefined) { + output.writeFieldBegin('name', Thrift.Type.STRING, 3); + output.writeString(this.name); + output.writeFieldEnd(); + } + if (this.id !== null && this.id !== undefined) { + output.writeFieldBegin('id', Thrift.Type.I64, 4); + output.writeI64(this.id); + output.writeFieldEnd(); + } + if (this.parent_id !== null && this.parent_id !== undefined) { + output.writeFieldBegin('parent_id', Thrift.Type.I64, 5); + output.writeI64(this.parent_id); + output.writeFieldEnd(); + } + if (this.annotations !== null && this.annotations !== undefined) { + output.writeFieldBegin('annotations', Thrift.Type.LIST, 6); + output.writeListBegin(Thrift.Type.STRUCT, this.annotations.length); + for (var iter14 in this.annotations) + { + if (this.annotations.hasOwnProperty(iter14)) + { + iter14 = this.annotations[iter14]; + iter14.write(output); + } + } + output.writeListEnd(); + output.writeFieldEnd(); + } + if (this.binary_annotations !== null && this.binary_annotations !== undefined) { + output.writeFieldBegin('binary_annotations', Thrift.Type.LIST, 8); + output.writeListBegin(Thrift.Type.STRUCT, this.binary_annotations.length); + for (var iter15 in this.binary_annotations) + { + if (this.binary_annotations.hasOwnProperty(iter15)) + { + iter15 = this.binary_annotations[iter15]; + iter15.write(output); + } + } + output.writeListEnd(); + output.writeFieldEnd(); + } + if (this.debug !== null && this.debug !== undefined) { + output.writeFieldBegin('debug', Thrift.Type.BOOL, 9); + output.writeBool(this.debug); + output.writeFieldEnd(); + } + output.writeFieldStop(); + output.writeStructEnd(); + return; +}; + +ttypes.CLIENT_SEND = 'cs'; +ttypes.CLIENT_RECV = 'cr'; +ttypes.SERVER_SEND = 'ss'; +ttypes.SERVER_RECV = 'sr'; diff --git a/packages/zipkin/src/httpHeaders.js b/packages/zipkin/src/httpHeaders.js new file mode 100644 index 00000000..5f84f2c6 --- /dev/null +++ b/packages/zipkin/src/httpHeaders.js @@ -0,0 +1,7 @@ +module.exports = { + TraceId: 'X-B3-TraceId', + SpanId: 'X-B3-SpanId', + ParentSpanId: 'X-B3-ParentSpanId', + Sampled: 'X-B3-Sampled', + Flags: 'X-B3-Flags' +}; diff --git a/packages/zipkin/src/internalRepresentations.js b/packages/zipkin/src/internalRepresentations.js new file mode 100644 index 00000000..e1c57a42 --- /dev/null +++ b/packages/zipkin/src/internalRepresentations.js @@ -0,0 +1,136 @@ +const thriftTypes = require('./gen-nodejs/zipkinCore_types'); +const {now} = require('./time'); +const {Some, None} = require('./option'); + +function Endpoint({host = 0, port = 0}) { + this.host = host; + this.port = port; +} +Endpoint.prototype.isUnknown = function isUnknown() { + return this.host === 0 && this.port === 0; +}; +Endpoint.prototype.toThrift = function toThrift() { + return new thriftTypes.Endpoint({ + ipv4: this.host, + port: this.port + }); +}; + +function ZipkinAnnotation({timestamp, value, endpoint, duration}) { + this.timestamp = timestamp; + this.value = value; + this.endpoint = endpoint; + this.duration = duration; +} + +ZipkinAnnotation.prototype.toThrift = function toThrift() { + const res = new thriftTypes.Annotation({ + timestamp: this.timestamp, // must be in micros + value: this.value + }); + if (this.endpoint) { + res.host = this.endpoint.toThrift(); + } + if (this.duration) { + res.duration = this.duration; // must be in micros + } + return res; +}; +ZipkinAnnotation.prototype.toString = function toString() { + return `Annotation(value="${this.value}")`; +}; + +function BinaryAnnotation({key, value, endpoint}) { + this.key = key; + this.value = value; + this.endpoint = endpoint; +} +BinaryAnnotation.prototype.toThrift = function toThrift() { + const res = new thriftTypes.BinaryAnnotation({ + key: this.key, + value: this.value, + annotation_type: thriftTypes.AnnotationType.STRING + }); + if (this.endpoint) { + res.host = this.endpoint.toThrift(); + } + return res; +}; + +function MutableSpan(traceId) { + this.traceId = traceId; + this.complete = false; + this.started = now(); + this.name = None; + this.service = None; + this.endpoint = new Endpoint({}); + this.annotations = []; + this.binaryAnnotations = []; +} +MutableSpan.prototype.setName = function setName(name) { + this.name = new Some(name); +}; +MutableSpan.prototype.setServiceName = function setServiceName(name) { + this.service = new Some(name); +}; +MutableSpan.prototype.addAnnotation = function addAnnotation(ann) { + if (!this.complete && ( + ann.value === thriftTypes.CLIENT_RECV || + ann.value === thriftTypes.SERVER_SEND + )) { + this.complete = true; + } + this.annotations.push(ann); +}; +MutableSpan.prototype.addBinaryAnnotation = function addBinaryAnnotation(ann) { + this.binaryAnnotations.push(ann); +}; +MutableSpan.prototype.setEndpoint = function setEndpoint(ep) { + this.endpoint = ep; + /* eslint-disable no-param-reassign */ + this.annotations.forEach(ann => { + if (ann.endpoint === undefined || ann.endpoint === null || ann.endpoint.isUnknown()) { + ann.endpoint = ep; + } + }); +}; +MutableSpan.prototype.overrideEndpoint = function overrideEndpoint(ann) { + const ep = ann.host != null ? ann.host : this.endpoint.toThrift(); + ep.service_name = this.service.getOrElse('Unknown'); + ann.host = ep; +}; +MutableSpan.prototype.toThrift = function toThrift() { + const span = new thriftTypes.Span(); + + span.id = this.traceId.spanId; + this.traceId._parentId.ifPresent(id => { + span.parent_id = id; + }); + + span.trace_id = this.traceId.traceId; + span.name = this.name.getOrElse('Unknown'); + span.debug = this.traceId.isDebug(); + + span.annotations = this.annotations.map(ann => { + const a = ann.toThrift(); + this.overrideEndpoint(a); + return a; + }); + + span.binary_annotations = this.binaryAnnotations.map(ann => { + const a = ann.toThrift(); + this.overrideEndpoint(a); + return a; + }); + + return span; +}; +MutableSpan.prototype.toString = function toString() { + const annotations = this.annotations.map(a => a.toString()).join(', '); + return `MutableSpan(id=${this.traceId.toString()}, annotations=[${annotations}])`; +}; + +module.exports.MutableSpan = MutableSpan; +module.exports.Endpoint = Endpoint; +module.exports.ZipkinAnnotation = ZipkinAnnotation; +module.exports.BinaryAnnotation = BinaryAnnotation; diff --git a/packages/zipkin/src/option.js b/packages/zipkin/src/option.js new file mode 100644 index 00000000..4b67b145 --- /dev/null +++ b/packages/zipkin/src/option.js @@ -0,0 +1,76 @@ +const None = { + type: 'None', + map: function map() { + return None; + }, + ifPresent: function ifPresent() {}, + flatMap: function flatMap() { + return None; + }, + getOrElse: function getOrElse(f) { + if (f instanceof Function) { + return f(); + } else { + return f; + } + }, + equals: function equals(other) { + return other.type === 'None'; + }, + toString: function toString() { + return 'None'; + } +}; + +class Some { + constructor(value) { + this.value = value; + } + map(f) { + return new Some(f(this.value)); + } + ifPresent(f) { + return this.map(f); + } + flatMap(f) { + return this.map(f).getOrElse(None); + } + getOrElse() { + return this.value; + } + equals(other) { + return other instanceof Some && other.value === this.value; + } + toString() { + return `Some(${this.value.toString()})`; + } +} +Some.prototype.type = 'Some'; + +// Used to validate input arguments +function isOptional(data) { + return data instanceof Some || data === None; +} + +function verifyIsOptional(data) { + if (isOptional(data)) { + if (isOptional(data.value)) { + throw new Error(`Error: data (${data.value.toString()}) is wrapped in Option twice`); + } + } else { + throw new Error(`Error: data (${data}) is not an Option!`); + } +} + +function fromNullable(nullable) { + if (nullable != null) { + return new Some(nullable); + } else { + return None; + } +} + +module.exports.Some = Some; +module.exports.None = None; +module.exports.verifyIsOptional = verifyIsOptional; +module.exports.fromNullable = fromNullable; diff --git a/packages/zipkin/src/randomTraceId.js b/packages/zipkin/src/randomTraceId.js new file mode 100644 index 00000000..67cb08f9 --- /dev/null +++ b/packages/zipkin/src/randomTraceId.js @@ -0,0 +1,12 @@ +// === Generate random 64-bit number in hex format +function randomTraceId() { + const digits = '0123456789abcdef'; + let n = ''; + for (let i = 0; i < 16; i++) { + const rand = Math.floor(Math.random() * 16); + n += digits[rand]; + } + return n; +} + +module.exports = randomTraceId; diff --git a/packages/zipkin/src/serializeSpan.js b/packages/zipkin/src/serializeSpan.js new file mode 100644 index 00000000..2dd80912 --- /dev/null +++ b/packages/zipkin/src/serializeSpan.js @@ -0,0 +1,19 @@ +const {TBufferedTransport, TBinaryProtocol} = require('thrift'); +const {fromByteArray: base64encode} = require('base64-js'); + +let serialized; +const transport = new TBufferedTransport(null, res => { + serialized = res; +}); + +const protocol = new TBinaryProtocol(transport); + +module.exports = function serializeSpan(span, format = 'base64') { + span.toThrift().write(protocol); + protocol.flush(); + if (format === 'base64') { + return base64encode(serialized); + } else { + return serialized; + } +}; diff --git a/packages/zipkin/src/time.js b/packages/zipkin/src/time.js new file mode 100644 index 00000000..88da86d4 --- /dev/null +++ b/packages/zipkin/src/time.js @@ -0,0 +1,4 @@ +module.exports.now = function now() { + const d = new Date(); + return d.getTime() * 1000 + d.getMilliseconds(); +}; diff --git a/packages/zipkin/src/trace.js b/packages/zipkin/src/trace.js new file mode 100644 index 00000000..0693ec87 --- /dev/null +++ b/packages/zipkin/src/trace.js @@ -0,0 +1,229 @@ +const {createNamespace, getNamespace} = require('continuation-local-storage'); +const randomTraceId = require('./randomTraceId'); +const Annotation = require('./annotation'); +const TraceId = require('./traceId'); +const {now} = require('./time'); +const {Some, None, fromNullable} = require('./option'); + +const trace = {}; + +const session = getNamespace('zipkin') || createNamespace('zipkin'); +const defaultContext = session.createContext(); +session.enter(defaultContext); + +function State({id, terminal, tracers}) { + this.id = id; + this.terminal = terminal; + this.tracers = tracers; +} +State.prototype.toString = function() { + return `State(id=${this.id.toString()}, terminal=${this.terminal}, tracers=[${ + this.tracers.map(t => t.toString()).join(', ') + }])`; +}; + +const defaultId = new TraceId({ + traceId: None, + parentId: None, + spanId: randomTraceId(), + sampled: None, + flags: 0 +}); + +let tracingEnabled = true; + +let sampleRate = 1; + +function local() { + return session.get('trace') || None; +} + +function setLocal(value) { + session.set('trace', value); +} + +function setState(state) { + setLocal(new Some(state)); +} + +function shouldSample() { + let sample; + if (sampleRate === 1) { + sample = new Some(true); + } else { + sample = new Some(Math.random() <= sampleRate); + } + return sample; +} + +trace.setSampleRate = function setSampleRate(rate) { + sampleRate = rate; +}; + +trace.withContext = function withContext(callback) { + return session.run(callback); +}; + +trace.bindContext = function bindContext(callback) { + return session.bind(callback); +}; + +trace.dumpLocal = function dumpLocal(message) { + /* eslint-disable no-console */ + console.log(`${message}: ${local().toString()}`); +}; + +trace.bindEmitter = function bindEmitter(obj) { + session.bindEmitter(obj); +}; + +trace.id = function id() { + return trace.idOption().getOrElse(defaultId); +}; + +trace.idOption = function idOption() { + return local().flatMap(state => state.id); +}; + +trace.isTerminal = function isTerminal() { + return local().map(state => state.terminal).getOrElse(false); +}; + +trace.tracers = function tracers() { + return local().map(state => state.tracers).getOrElse([]); +}; + +trace.clear = function clear() { + session.set('trace', None); +}; + +trace.enable = function enable() { + tracingEnabled = true; +}; + +trace.disable = function disable() { + tracingEnabled = false; +}; + +trace.nextId = function nextId() { + const currentId = trace.idOption(); + const _nextId = new TraceId({ + traceId: currentId.map(id => id.traceId), + parentId: currentId.map(id => id.spanId), + spanId: randomTraceId(), + sampled: currentId.flatMap(id => id.sampled), + flags: currentId.map(id => id.flags).getOrElse(0) + }); + if (_nextId.sampled === None) { + _nextId._sampled = shouldSample(_nextId); + } + return _nextId; +}; + +trace.setId = function setId(traceId, terminal = false) { + if (!trace.isTerminal()) { + setState(new State({ + id: new Some(traceId), + terminal, + tracers: trace.tracers() + })); + } +}; + +trace.setTerminalId = function setTerminalId(traceId) { + trace.setId(traceId, true); +}; + +trace.pushTracer = function pushTracer(additionalTracer) { + local().map(state => { + setState(new State({ + id: state.id, + terminal: state.terminal, + tracers: [...state.tracers, additionalTracer] + })); + return true; + }).getOrElse(() => { + setState(new State({ + id: None, + terminal: false, + tracers: [additionalTracer] + })); + }); +}; + +trace.setState = setState; + +trace.letTracer = function letTracer(tracer, f) { + return trace.withContext(() => { + trace.pushTracer(tracer); + return f(); + }); +}; + +trace.isActivelyTracing = function isActivelyTracing() { + let res; + if (!tracingEnabled) { + res = false; + } else if (local() === None) { + res = false; + } else { + const sampled = trace.id().sampled; + res = !sampled.equals(new Some(false)); + } + return res; +}; + +function arrayUnique(array) { + return array.filter((val, i, arr) => i <= arr.indexOf(val)); +} + +function uncheckedRecord(rec) { + arrayUnique(trace.tracers()).forEach(tracer => { + tracer.record(rec); + }); +} + +trace.record = function record(rec) { + if (trace.isActivelyTracing()) { + uncheckedRecord(rec); + } +}; + +trace.recordAnnotation = function recordAnnotation(ann, duration) { + trace.record({ + traceId: trace.id(), + timestamp: now(), + annotation: ann, + duration: fromNullable(duration) + }); +}; + +trace.recordMessage = function recordMessage(message, duration) { + trace.recordAnnotation(new Annotation.Message(message), duration); +}; + +trace.recordServiceName = function recordServiceName(serviceName) { + trace.recordAnnotation(new Annotation.ServiceName(serviceName)); +}; + +trace.recordRpc = function recordRpc(name) { + trace.recordAnnotation(new Annotation.Rpc(name)); +}; + +trace.recordClientAddr = function recordClientAddr(ia) { + trace.recordAnnotation(new Annotation.ClientAddr(ia)); +}; + +trace.recordServerAddr = function recordServerAddr(ia) { + trace.recordAnnotation(new Annotation.ServerAddr(ia)); +}; + +trace.recordLocalAddr = function recordLocalAddr(ia) { + trace.recordAnnotation(new Annotation.LocalAddr(ia)); +}; + +trace.recordBinary = function recordBinary(key, value) { + trace.recordAnnotation(new Annotation.BinaryAnnotation(key, value)); +}; + +module.exports = trace; diff --git a/packages/zipkin/src/traceId.js b/packages/zipkin/src/traceId.js new file mode 100644 index 00000000..2c665697 --- /dev/null +++ b/packages/zipkin/src/traceId.js @@ -0,0 +1,49 @@ +const {Some, verifyIsOptional} = require('./option'); + +class TraceId { + constructor({traceId, parentId, spanId, sampled}) { + verifyIsOptional(traceId); + verifyIsOptional(parentId); + verifyIsOptional(sampled); + this._traceId = traceId; + this._parentId = parentId; + this._spanId = spanId; + this._sampled = sampled; + this._flags = 0; + } + + get spanId() { + return this._spanId; + } + + get parentId() { + return this._parentId.getOrElse(this.spanId); + } + + get traceId() { + return this._traceId.getOrElse(this.parentId); + } + + get sampled() { + return this.isDebug() ? new Some(true) : this._sampled; + } + + get flags() { + return this._flags; + } + + isDebug() { + // The jshint tool always complains about using bitwise operators, + // but in this case it's actually intentional, so we disable the warning: + // jshint bitwise: false + return (this._flags & 1) === 1; + } + + toString() { + return `TraceId(spanId=${this.spanId.toString()}` + + `, parentId=${this.parentId.toString()}` + + `, traceId=${this.traceId.toString()})`; + } +} + +module.exports = TraceId; diff --git a/packages/zipkin/src/zipkin.js b/packages/zipkin/src/zipkin.js new file mode 100644 index 00000000..c9dd2205 --- /dev/null +++ b/packages/zipkin/src/zipkin.js @@ -0,0 +1,21 @@ +/* +const Annotation = require('./zipkin/annotation'); +const TraceId = require('./zipkin/traceId'); +const HttpHeaders = require('./zipkin/httpHeaders'); +const address = require('./zipkin/address'); +const option = require('./zipkin/option'); +const {ZipkinTracer} = require('./zipkin/ZipkinTracer'); +const consoleTracer = require('./zipkin/consoleTracer'); +const trace = require('./zipkin/trace'); +const time = require('./zipkin/time'); + +module.exports.trace = trace; +module.exports.TraceId = TraceId; +module.exports.option = option; +module.exports.Annotation = Annotation; +module.exports.address = address; +module.exports.HttpHeaders = HttpHeaders; +module.exports.ZipkinTracer = ZipkinTracer; +module.exports.consoleTracer = consoleTracer; +module.exports.time = time; +*/ diff --git a/packages/zipkin/src/zipkinCore.thrift b/packages/zipkin/src/zipkinCore.thrift new file mode 100644 index 00000000..1ca21e06 --- /dev/null +++ b/packages/zipkin/src/zipkinCore.thrift @@ -0,0 +1,46 @@ +# Generate with the command "thrift -gen js:node zipkinCore.thrift" +namespace java com.twitter.zipkin.gen +namespace rb Zipkin + +#************** Collection related structs ************** + +# these are the annotations we always expect to find in a span +const string CLIENT_SEND = "cs" +const string CLIENT_RECV = "cr" +const string SERVER_SEND = "ss" +const string SERVER_RECV = "sr" + +# this represents a host and port in a network +struct Endpoint { + 1: i32 ipv4, + 2: i16 port # beware that this will give us negative ports. some conversion needed + 3: string service_name # which service did this operation happen on? +} + +# some event took place, either one by the framework or by the user +struct Annotation { + 1: i64 timestamp # microseconds from epoch + 2: string value # what happened at the timestamp? + 3: optional Endpoint host # host this happened on + 4: optional i32 duration # how long did the operation take? microseconds +} + +enum AnnotationType { BOOL, BYTES, I16, I32, I64, DOUBLE, STRING } + +struct BinaryAnnotation { + 1: string key, + 2: binary value, + 3: AnnotationType annotation_type, + 4: optional Endpoint host +} + +struct Span { + 1: i64 trace_id # unique trace id, use for all spans in trace + 3: string name, # span name, rpc method for example + 4: i64 id, # unique span id, only used for this span + 5: optional i64 parent_id, # parent span id + 6: list annotations, # list of all annotations/events that occured + 8: list binary_annotations # any binary annotations + 9: optional bool debug = 0 # if true, we DEMAND that this span passes all samplers +} + diff --git a/packages/zipkin/test/.eslintrc b/packages/zipkin/test/.eslintrc new file mode 100644 index 00000000..3c8fcd73 --- /dev/null +++ b/packages/zipkin/test/.eslintrc @@ -0,0 +1,8 @@ +{ + "env": { + "mocha": true + }, + "globals": { + "expect": true + } +} diff --git a/packages/zipkin/test/InetAddress.test.js b/packages/zipkin/test/InetAddress.test.js new file mode 100644 index 00000000..6e82853d --- /dev/null +++ b/packages/zipkin/test/InetAddress.test.js @@ -0,0 +1,16 @@ +const InetAddress = require('../src/InetAddress'); +describe('InetAddress', () => { + it('should get the local address', () => { + InetAddress.getLocalAddress(); + }); + + it('should convert an IP address to integer representation', () => { + const addr = new InetAddress('80.91.37.133'); + expect(addr.toInt()).to.equal(1348150661); + }); + + it('should make a string representation', () => { + const addr = new InetAddress('80.91.37.133'); + expect(addr.toString()).to.equal('InetAddress(80.91.37.133)'); + }); +}); diff --git a/packages/zipkin/test/annotation.test.js b/packages/zipkin/test/annotation.test.js new file mode 100644 index 00000000..f8670702 --- /dev/null +++ b/packages/zipkin/test/annotation.test.js @@ -0,0 +1,10 @@ +const annotation = require('../src/annotation'); + +describe('Annotation types', () => { + Object.keys(annotation).forEach(key => { + it(`should have annotationType ${key}`, () => { + const ann = new annotation[key]({}); + expect(ann.annotationType).to.equal(key); + }); + }); +}); diff --git a/packages/zipkin/test/serializeSpan.test.js b/packages/zipkin/test/serializeSpan.test.js new file mode 100644 index 00000000..9756392e --- /dev/null +++ b/packages/zipkin/test/serializeSpan.test.js @@ -0,0 +1,57 @@ +const TraceId = require('../src/traceId'); +const serializeSpan = require('../src/serializeSpan'); +const { + MutableSpan, + Endpoint, + ZipkinAnnotation, + BinaryAnnotation +} = require('../src/internalRepresentations'); +const {Some, None} = require('../src/option'); + +describe('Serialising a span', () => { + // The Thrift IDL has camel_cased variable names, so we need camel-casing + // jshint camelcase: false + + const ms = new MutableSpan(new TraceId({ + traceId: new Some('a'), + parentId: new Some('b'), + spanId: 'c', + sampled: None + })); + ms.setName('GET'); + ms.setServiceName('PortalService'); + + const here = new Endpoint({host: 171520595, port: 8080}); + + ms.setEndpoint(here); + ms.addBinaryAnnotation(new BinaryAnnotation({ + key: 'warning', + value: 'The cake is a lie', + endpoint: here + })); + ms.addAnnotation(new ZipkinAnnotation({ + timestamp: 1, + endpoint: here, + value: 'sr' + })); + ms.addAnnotation(new ZipkinAnnotation({ + timestamp: 2, + endpoint: here, + value: 'ss' + })); + + const expected = + 'CgABAAAAAAAAAAoLAAMAAAADR0VUCgAEAAAAAAAAAAwKAAUAA' + + 'AAAAAAACw8ABgwAAAACCgABAAAAAAAAAAELAAIAAAACc3IMAA' + + 'MIAAEKOTJTBgACH5ALAAMAAAANUG9ydGFsU2VydmljZQAACgA' + + 'BAAAAAAAAAAILAAIAAAACc3MMAAMIAAEKOTJTBgACH5ALAAMA' + + 'AAANUG9ydGFsU2VydmljZQAADwAIDAAAAAELAAEAAAAHd2Fyb' + + 'mluZwsAAgAAABFUaGUgY2FrZSBpcyBhIGxpZQgAAwAAAAYMAA' + + 'QIAAEKOTJTBgACH5ALAAMAAAANUG9ydGFsU2VydmljZQAAAgA' + + 'JAAA='; + + it('should serialize correctly from MutableSpan to base64 encoded representation', () => { + const serialized = serializeSpan(ms); + expect(serialized).to.equal(expected); + }); +}); diff --git a/packages/zipkin/test/zipkinTracer.test.js b/packages/zipkin/test/zipkinTracer.test.js new file mode 100644 index 00000000..0a08ec66 --- /dev/null +++ b/packages/zipkin/test/zipkinTracer.test.js @@ -0,0 +1,59 @@ +const sinon = require('sinon'); +const trace = require('../src/trace'); +const ZipkinTracer = require('../src/ZipkinTracer'); +const TraceId = require('../src/TraceId'); +const Annotation = require('../src/Annotation'); +const InetAddress = require('../src/InetAddress'); +const {Some, None} = require('../src/option'); + +describe('The raw tracer', () => { + it('should accumulate annotations into MutableSpans', () => { + trace.withContext(() => { + // let spanHasBeenLogged = false; + // let loggedSpan; + + const logSpan = sinon.spy(); + + const tracer = new ZipkinTracer({ + logger: {logSpan} + }); + + trace.pushTracer(tracer); + trace.setId(new TraceId({ + traceId: None, + parentId: new Some('a'), + spanId: 'c', + sampled: new Some(true) + })); + + trace.recordServiceName('SmoothieStore'); + trace.recordRpc('buySmoothie'); + trace.recordBinary('taste', 'banana'); + trace.recordAnnotation(new Annotation.ServerRecv()); + trace.recordAnnotation(new Annotation.LocalAddr({ + host: new InetAddress('127.0.0.1'), + port: 7070 + })); + + // Should only log after the span is complete + expect(logSpan.calledOnce).to.equal(false); + trace.recordAnnotation(new Annotation.ServerSend()); + expect(logSpan.calledOnce).to.equal(true); + + const loggedSpan = logSpan.getCall(0).args[0]; + + expect(loggedSpan.traceId.traceId).to.equal('a'); + expect(loggedSpan.traceId.parentId).to.equal('a'); + expect(loggedSpan.traceId.spanId).to.equal('c'); + expect(loggedSpan.complete).to.equal(true); + expect(loggedSpan.name).to.eql(new Some('buySmoothie')); + expect(loggedSpan.service).to.eql(new Some('SmoothieStore')); + expect(loggedSpan.endpoint.host).to.equal(2130706433); + expect(loggedSpan.endpoint.port).to.equal(7070); + expect(loggedSpan.binaryAnnotations[0].key).to.equal('taste'); + expect(loggedSpan.binaryAnnotations[0].value).to.equal('banana'); + expect(loggedSpan.annotations[0].value).to.equal('sr'); + expect(loggedSpan.annotations[1].value).to.equal('ss'); + }); + }); +}); diff --git a/test/helper.js b/test/helper.js new file mode 100644 index 00000000..5407685d --- /dev/null +++ b/test/helper.js @@ -0,0 +1,3 @@ +const chai = require('chai'); +chai.config.includeStack = true; +global.expect = chai.expect; diff --git a/test/mocha.opts b/test/mocha.opts new file mode 100644 index 00000000..a6ed38a4 --- /dev/null +++ b/test/mocha.opts @@ -0,0 +1,5 @@ +--reporter spec +--recursive +--growl +--require test/helper.js +packages/*/test/*.js