Skip to content

Commit

Permalink
Fixed the multithreading issue which is happening when nrOfThread is …
Browse files Browse the repository at this point in the history
…more than 1. Refer openzipkin#60
  • Loading branch information
raunak.agrawal committed Feb 11, 2015
1 parent 9e1b512 commit 949f73f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
Expand Up @@ -66,10 +66,11 @@ public SpanProcessingThread(final BlockingQueue<Span> queue, final ZipkinCollect
}

/**
* Requests the thread to stop.
* Requests the thread to stop as well as closes the client connection for this thread.
*/
public void stop() {
stop = true;
clientProvider.close();
}

/**
Expand Down
Expand Up @@ -44,7 +44,6 @@ public class ZipkinSpanCollector implements SpanCollector {
private static final String UTF_8 = "UTF-8";
private static final Logger LOGGER = LoggerFactory.getLogger(ZipkinSpanCollector.class);

private final ZipkinCollectorClientProvider clientProvider;
private final BlockingQueue<Span> spanQueue;
private final ExecutorService executorService;
private final List<SpanProcessingThread> spanProcessingThreads = new ArrayList<SpanProcessingThread>();
Expand Down Expand Up @@ -73,20 +72,24 @@ public ZipkinSpanCollector(final String zipkinCollectorHost, final int zipkinCol
final ZipkinSpanCollectorParams params) {
Validate.notEmpty(zipkinCollectorHost);
Validate.notNull(params);
clientProvider =
new ZipkinCollectorClientProvider(zipkinCollectorHost, zipkinCollectorPort, params.getSocketTimeout());
try {
clientProvider.setup();
} catch (final TException e) {
if (params.failOnSetup()) {
throw new IllegalStateException(e);
} else {
LOGGER.warn("Connection could not be established during setup.", e);
}
}

spanQueue = new ArrayBlockingQueue<Span>(params.getQueueSize());
executorService = Executors.newFixedThreadPool(params.getNrOfThreads());

for (int i = 1; i <= params.getNrOfThreads(); i++) {

//Creating a client provider for every spanProcessingThread.
ZipkinCollectorClientProvider clientProvider =
new ZipkinCollectorClientProvider(zipkinCollectorHost, zipkinCollectorPort, params.getSocketTimeout());
try {
clientProvider.setup();
} catch (final TException e) {
if (params.failOnSetup()) {
throw new IllegalStateException(e);
} else {
LOGGER.warn("Connection could not be established during setup.", e);
}
}
final SpanProcessingThread spanProcessingThread =
new SpanProcessingThread(spanQueue, clientProvider, params.getBatchSize());
spanProcessingThreads.add(spanProcessingThread);
Expand Down Expand Up @@ -161,7 +164,6 @@ public void close() {
}
}
executorService.shutdown();
clientProvider.close();
LOGGER.info("ZipkinSpanCollector closed.");
}

Expand Down

0 comments on commit 949f73f

Please sign in to comment.