From 949f73f69aa359e6ff03602706c0d3bc6602d853 Mon Sep 17 00:00:00 2001 From: "raunak.agrawal" Date: Wed, 11 Feb 2015 10:55:33 +0530 Subject: [PATCH] Fixed the multithreading issue which is happening when nrOfThread is more than 1. Refer https://github.com/kristofa/brave/issues/60 --- .../brave/zipkin/SpanProcessingThread.java | 3 +- .../brave/zipkin/ZipkinSpanCollector.java | 28 ++++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/brave-zipkin-spancollector/src/main/java/com/github/kristofa/brave/zipkin/SpanProcessingThread.java b/brave-zipkin-spancollector/src/main/java/com/github/kristofa/brave/zipkin/SpanProcessingThread.java index 7a559b5121..f445a45dcb 100644 --- a/brave-zipkin-spancollector/src/main/java/com/github/kristofa/brave/zipkin/SpanProcessingThread.java +++ b/brave-zipkin-spancollector/src/main/java/com/github/kristofa/brave/zipkin/SpanProcessingThread.java @@ -66,10 +66,11 @@ public SpanProcessingThread(final BlockingQueue queue, final ZipkinCollect } /** - * Requests the thread to stop. + * Requests the thread to stop as well as closes the client connection for this thread. */ public void stop() { stop = true; + clientProvider.close(); } /** diff --git a/brave-zipkin-spancollector/src/main/java/com/github/kristofa/brave/zipkin/ZipkinSpanCollector.java b/brave-zipkin-spancollector/src/main/java/com/github/kristofa/brave/zipkin/ZipkinSpanCollector.java index 1416ac7874..79171ddb57 100644 --- a/brave-zipkin-spancollector/src/main/java/com/github/kristofa/brave/zipkin/ZipkinSpanCollector.java +++ b/brave-zipkin-spancollector/src/main/java/com/github/kristofa/brave/zipkin/ZipkinSpanCollector.java @@ -44,7 +44,6 @@ public class ZipkinSpanCollector implements SpanCollector { private static final String UTF_8 = "UTF-8"; private static final Logger LOGGER = LoggerFactory.getLogger(ZipkinSpanCollector.class); - private final ZipkinCollectorClientProvider clientProvider; private final BlockingQueue spanQueue; private final ExecutorService executorService; private final List spanProcessingThreads = new ArrayList(); @@ -73,20 +72,24 @@ public ZipkinSpanCollector(final String zipkinCollectorHost, final int zipkinCol final ZipkinSpanCollectorParams params) { Validate.notEmpty(zipkinCollectorHost); Validate.notNull(params); - clientProvider = - new ZipkinCollectorClientProvider(zipkinCollectorHost, zipkinCollectorPort, params.getSocketTimeout()); - try { - clientProvider.setup(); - } catch (final TException e) { - if (params.failOnSetup()) { - throw new IllegalStateException(e); - } else { - LOGGER.warn("Connection could not be established during setup.", e); - } - } + spanQueue = new ArrayBlockingQueue(params.getQueueSize()); executorService = Executors.newFixedThreadPool(params.getNrOfThreads()); + for (int i = 1; i <= params.getNrOfThreads(); i++) { + + //Creating a client provider for every spanProcessingThread. + ZipkinCollectorClientProvider clientProvider = + new ZipkinCollectorClientProvider(zipkinCollectorHost, zipkinCollectorPort, params.getSocketTimeout()); + try { + clientProvider.setup(); + } catch (final TException e) { + if (params.failOnSetup()) { + throw new IllegalStateException(e); + } else { + LOGGER.warn("Connection could not be established during setup.", e); + } + } final SpanProcessingThread spanProcessingThread = new SpanProcessingThread(spanQueue, clientProvider, params.getBatchSize()); spanProcessingThreads.add(spanProcessingThread); @@ -161,7 +164,6 @@ public void close() { } } executorService.shutdown(); - clientProvider.close(); LOGGER.info("ZipkinSpanCollector closed."); }