Skip to content

Commit

Permalink
Merge branch 'slop-thread-fix'
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Feb 6, 2012
2 parents fe1b415 + 55a5b74 commit de2f94d
Showing 1 changed file with 10 additions and 11 deletions.
Expand Up @@ -84,6 +84,14 @@ public StreamingSlopPusherJob(StoreRepository storeRepo,
this.adminClient = null; this.adminClient = null;
this.consumerResults = Lists.newArrayList(); this.consumerResults = Lists.newArrayList();
this.zoneMapping = Maps.newHashMap(); this.zoneMapping = Maps.newHashMap();
this.consumerExecutor = Executors.newCachedThreadPool(new ThreadFactory() {

public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("slop-pusher");
return thread;
}
});
} }


public void run() { public void run() {
Expand Down Expand Up @@ -239,8 +247,8 @@ public void run() {
logger.info("Slops to node " + nodeId + " - Succeeded - " logger.info("Slops to node " + nodeId + " - Succeeded - "
+ succeededByNode.get(nodeId) + " - Attempted - " + succeededByNode.get(nodeId) + " - Attempted - "
+ attemptedByNode.get(nodeId)); + attemptedByNode.get(nodeId));
outstanding.put(nodeId, attemptedByNode.get(nodeId) outstanding.put(nodeId,
- succeededByNode.get(nodeId)); attemptedByNode.get(nodeId) - succeededByNode.get(nodeId));
} }
slopStorageEngine.resetStats(outstanding); slopStorageEngine.resetStats(outstanding);
logger.info("Completed streaming slop pusher job which started at " + startTime); logger.info("Completed streaming slop pusher job which started at " + startTime);
Expand All @@ -266,15 +274,6 @@ public void run() {
private void loadMetadata() { private void loadMetadata() {
this.cluster = metadataStore.getCluster(); this.cluster = metadataStore.getCluster();
this.slopQueues = new ConcurrentHashMap<Integer, SynchronousQueue<Versioned<Slop>>>(cluster.getNumberOfNodes()); this.slopQueues = new ConcurrentHashMap<Integer, SynchronousQueue<Versioned<Slop>>>(cluster.getNumberOfNodes());
this.consumerExecutor = Executors.newFixedThreadPool(cluster.getNumberOfNodes(),
new ThreadFactory() {

public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("slop-pusher");
return thread;
}
});
this.attemptedByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes()); this.attemptedByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
this.succeededByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes()); this.succeededByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
} }
Expand Down

0 comments on commit de2f94d

Please sign in to comment.