diff --git a/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java b/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java index e75900b61f..63d2e538ee 100644 --- a/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java +++ b/src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java @@ -84,6 +84,14 @@ public StreamingSlopPusherJob(StoreRepository storeRepo, this.adminClient = null; this.consumerResults = Lists.newArrayList(); this.zoneMapping = Maps.newHashMap(); + this.consumerExecutor = Executors.newCachedThreadPool(new ThreadFactory() { + + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName("slop-pusher"); + return thread; + } + }); } public void run() { @@ -239,8 +247,8 @@ public void run() { logger.info("Slops to node " + nodeId + " - Succeeded - " + succeededByNode.get(nodeId) + " - Attempted - " + attemptedByNode.get(nodeId)); - outstanding.put(nodeId, attemptedByNode.get(nodeId) - - succeededByNode.get(nodeId)); + outstanding.put(nodeId, + attemptedByNode.get(nodeId) - succeededByNode.get(nodeId)); } slopStorageEngine.resetStats(outstanding); logger.info("Completed streaming slop pusher job which started at " + startTime); @@ -266,15 +274,6 @@ public void run() { private void loadMetadata() { this.cluster = metadataStore.getCluster(); this.slopQueues = new ConcurrentHashMap>>(cluster.getNumberOfNodes()); - this.consumerExecutor = Executors.newFixedThreadPool(cluster.getNumberOfNodes(), - new ThreadFactory() { - - public Thread newThread(Runnable r) { - Thread thread = new Thread(r); - thread.setName("slop-pusher"); - return thread; - } - }); this.attemptedByNode = new ConcurrentHashMap(cluster.getNumberOfNodes()); this.succeededByNode = new ConcurrentHashMap(cluster.getNumberOfNodes()); }