Skip to content

Commit

Permalink
Added stats to both pusher jobs + Fixed big bug in the way slop key was being generated
Browse files Browse the repository at this point in the history
  • Loading branch information
rsumbaly committed Nov 1, 2010
1 parent cde9758 commit 10ab61f
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 59 deletions.
Expand Up @@ -17,7 +17,9 @@
package voldemort.server.scheduler.slop;

import java.util.Date;
import java.util.Map;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import voldemort.cluster.Cluster;
Expand All @@ -40,6 +42,8 @@
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

import com.google.common.collect.Maps;

/**
* A task which goes through the slop table and attempts to push out all the
* slop to its rightful owner node
Expand Down Expand Up @@ -89,8 +93,17 @@ public void run() {
Cluster cluster = metadataStore.getCluster();
ClosableIterator<Pair<ByteArray, Versioned<Slop>>> iterator = null;

SlopStorageEngine slopStorageEngine = storeRepo.getSlopStore();
Map<Integer, Long> attemptedByNode = Maps.newHashMapWithExpectedSize(cluster.getNumberOfNodes());
Map<Integer, Long> succeededByNode = Maps.newHashMapWithExpectedSize(cluster.getNumberOfNodes());
long slopsPushed = 0L;
long attemptedPushes = 0L;
for(Node node: cluster.getNodes()) {
attemptedByNode.put(node.getId(), 0L);
succeededByNode.put(node.getId(), 0L);
}

try {
SlopStorageEngine slopStorageEngine = storeRepo.getSlopStore();
StorageEngine<ByteArray, Slop, byte[]> slopStore = slopStorageEngine.asSlopStore();
EventThrottler throttler = new EventThrottler(maxWriteBytesPerSec);

Expand All @@ -99,6 +112,7 @@ public void run() {
while(iterator.hasNext()) {
if(Thread.interrupted())
throw new InterruptedException("Slop pusher job cancelled");

try {
Pair<ByteArray, Versioned<Slop>> keyAndVal;
try {
Expand All @@ -113,6 +127,12 @@ public void run() {
int nodeId = slop.getNodeId();
Node node = cluster.getNodeById(nodeId);

attemptedPushes++;
if(attemptedPushes % 10000 == 0)
logger.info("Attempted pushing " + attemptedPushes + " slops");
Long attempted = attemptedByNode.get(nodeId);
attemptedByNode.put(nodeId, attempted + 1L);

if(failureDetector.isAvailable(node)) {
Store<ByteArray, byte[], byte[]> store = storeRepo.getNodeStore(slop.getStoreName(),
node.getId());
Expand All @@ -138,10 +158,25 @@ public void run() {
}
failureDetector.recordSuccess(node, deltaMs(startNs));
slopStore.delete(slop.makeKey(), versioned.getVersion());

slopsPushed++;
// Increment succeeded
Long succeeded = succeededByNode.get(nodeId);
succeededByNode.put(nodeId, succeeded + 1L);

// Throttle the bytes...
throttler.maybeThrottle(nBytes);

} catch(ObsoleteVersionException e) {

// okay it is old, just delete it
slopStore.delete(slop.makeKey(), versioned.getVersion());
slopsPushed++;

// Increment succeeded
Long succeeded = succeededByNode.get(nodeId);
succeededByNode.put(nodeId, succeeded + 1L);

// Throttle the bytes...
throttler.maybeThrottle(nBytes);

Expand All @@ -153,6 +188,20 @@ public void run() {
logger.error(e, e);
}
}

// Only if we reached here do we update stats
logger.log(attemptedPushes > 0 ? Level.INFO : Level.DEBUG,
"Attempted " + attemptedPushes + " hinted handoff pushes of which "
+ slopsPushed + " succeeded.");

Map<Integer, Long> outstanding = Maps.newHashMapWithExpectedSize(cluster.getNumberOfNodes());
for(int nodeId: succeededByNode.keySet()) {
outstanding.put(nodeId, attemptedByNode.get(nodeId)
- succeededByNode.get(nodeId));
}

slopStorageEngine.resetStats(outstanding);

} catch(Exception e) {
logger.error(e, e);
} finally {
Expand Down
Expand Up @@ -3,14 +3,17 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
Expand Down Expand Up @@ -102,6 +105,16 @@ public void run() {

SlopStorageEngine slopStorageEngine = storeRepo.getSlopStore();
ClosableIterator<Pair<ByteArray, Versioned<Slop>>> iterator = null;

ConcurrentHashMap<Integer, Long> attemptedByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
ConcurrentHashMap<Integer, Long> succeededByNode = new ConcurrentHashMap<Integer, Long>(cluster.getNumberOfNodes());
AtomicLong slopsPushed = new AtomicLong(0);
AtomicLong attemptedPushes = new AtomicLong(0);
for(Node node: cluster.getNodes()) {
attemptedByNode.put(node.getId(), 0L);
succeededByNode.put(node.getId(), 0L);
}

try {
StorageEngine<ByteArray, Slop, byte[]> slopStore = slopStorageEngine.asSlopStore();
iterator = slopStore.entries();
Expand All @@ -116,7 +129,13 @@ public void run() {
int nodeId = versioned.getValue().getNodeId();
Node node = cluster.getNodeById(nodeId);

logger.trace("On slop = " + versioned.getValue().getNodeId() + " => "
attemptedPushes.incrementAndGet();
Long attempted = attemptedByNode.get(nodeId);
attemptedByNode.put(nodeId, attempted + 1L);
if(attemptedPushes.get() % 10000 == 0)
logger.info("Attempted pushing " + attemptedPushes + " slops");

logger.info("On slop = " + versioned.getValue().getNodeId() + " => "
+ new String(versioned.getValue().getKey().get()));

if(failureDetector.isAvailable(node)) {
Expand All @@ -127,17 +146,32 @@ public void run() {
slopQueues.put(nodeId, slopQueue);
consumerResults.add(consumerExecutor.submit(new SlopConsumer(nodeId,
slopQueue,
slopStorageEngine)));
slopStorageEngine,
succeededByNode.get(nodeId),
slopsPushed)));
}
slopQueue.offer(versioned, 1, TimeUnit.SECONDS);
readThrottler.maybeThrottle(nBytesRead(keyAndVal));
} else {
logger.trace(node + " declared down, won't push slop");
logger.info(node + " declared down, won't push slop");
}
} catch(RejectedExecutionException e) {
throw new VoldemortException("Ran out of threads in executor", e);
}
}

// Only if we reached here do we update stats
logger.log(attemptedPushes.get() > 0 ? Level.INFO : Level.DEBUG,
"Attempted " + attemptedPushes + " hinted handoff pushes of which "
+ slopsPushed + " succeeded.");

Map<Integer, Long> outstanding = Maps.newHashMapWithExpectedSize(cluster.getNumberOfNodes());
for(int nodeId: succeededByNode.keySet()) {
outstanding.put(nodeId, attemptedByNode.get(nodeId)
- succeededByNode.get(nodeId));
}

slopStorageEngine.resetStats(outstanding);
} catch(InterruptedException e) {
// Swallow interrupted exceptions
} catch(Exception e) {
Expand Down Expand Up @@ -269,18 +303,24 @@ private class SlopConsumer implements Runnable {
private SynchronousQueue<Versioned<Slop>> slopQueue;
private long startTime;
private SlopStorageEngine slopStorageEngine;
private Long succeededNode;
private AtomicLong succeededGlobal;

// Keep two lists to track deleted items
private List<Pair<ByteArray, Version>> previous, current;

/**
 * Creates a consumer bound to one destination node id and the hand-off
 * queue the producer loop offers that node's slops to.
 *
 * @param nodeId id of the node this consumer is associated with
 * @param slopQueue queue from which this consumer receives slops
 * @param slopStorageEngine slop store used to delete entries once they
 *        have been handled
 * @param succeededNode initial per-node success count. NOTE(review): this
 *        is a boxed, immutable Long copied by value; the
 *        {@code succeededNode += ...} updates in run() only rebind this
 *        consumer's own field, so the caller's succeededByNode map never
 *        sees them. Confirm whether a shared mutable counter (e.g. an
 *        AtomicLong per node) was intended.
 * @param succeededGlobal shared counter incremented with the number of
 *        slops deleted by this consumer
 */
public SlopConsumer(int nodeId,
                    SynchronousQueue<Versioned<Slop>> slopQueue,
                    SlopStorageEngine slopStorageEngine,
                    Long succeededNode,
                    AtomicLong succeededGlobal) {
    this.nodeId = nodeId;
    this.slopQueue = slopQueue;
    this.slopStorageEngine = slopStorageEngine;
    // Two lists are kept so deletions can lag one batch behind
    this.previous = Lists.newArrayList();
    this.current = Lists.newArrayList();
    this.succeededGlobal = succeededGlobal;
    this.succeededNode = succeededNode;
}

public void run() {
Expand All @@ -289,8 +329,11 @@ public void run() {
do {
if(!current.isEmpty()) {
if(!previous.isEmpty()) {
for(Pair<ByteArray, Version> entry: previous)
for(Pair<ByteArray, Version> entry: previous) {
slopStorageEngine.delete(entry.getFirst(), entry.getSecond());
}
succeededGlobal.addAndGet(previous.size());
succeededNode += previous.size();
previous.clear();
}
previous = null;
Expand All @@ -303,12 +346,20 @@ public void run() {
} while(!iterator.isComplete());

// Clear up both previous and current
if(!previous.isEmpty())
if(!previous.isEmpty()) {
for(Pair<ByteArray, Version> entry: previous)
slopStorageEngine.delete(entry.getFirst(), entry.getSecond());
if(!current.isEmpty())
succeededGlobal.addAndGet(previous.size());
succeededNode += previous.size();
previous.clear();
}
if(!current.isEmpty()) {
for(Pair<ByteArray, Version> entry: current)
slopStorageEngine.delete(entry.getFirst(), entry.getSecond());
succeededGlobal.addAndGet(current.size());
succeededNode += current.size();
current.clear();
}

} catch(UnreachableStoreException e) {
failureDetector.recordException(metadataStore.getCluster().getNodeById(nodeId),
Expand Down
10 changes: 9 additions & 1 deletion src/java/voldemort/store/slop/Slop.java
Expand Up @@ -113,7 +113,15 @@ public String getStoreName() {
public ByteArray makeKey() {
    // Key layout: opcode | spacer | UTF-8 store name | spacer | node id
    // (SIZE_OF_INT bytes) | spacer | original key bytes.
    // The destination node id is part of the key so that slops for the same
    // store and key but addressed to different nodes do not map to the same
    // slop-store entry.
    byte[] storeName = ByteUtils.getBytes(getStoreName(), "UTF-8");
    byte[] opCode = new byte[] { operation.getOpCode() };
    byte[] nodeIdBytes = new byte[ByteUtils.SIZE_OF_INT];
    ByteUtils.writeInt(nodeIdBytes, nodeId, 0);
    return new ByteArray(ByteUtils.cat(opCode,
                                       spacer,
                                       storeName,
                                       spacer,
                                       nodeIdBytes,
                                       spacer,
                                       key.get()));
}

@Override
Expand Down
18 changes: 0 additions & 18 deletions src/java/voldemort/store/slop/SlopStorageEngine.java
Expand Up @@ -53,21 +53,6 @@ public SlopStorageEngine(StorageEngine<ByteArray, byte[], byte[]> slopEngine, Cl
this.slopStats = new SlopStats(cluster);
}

/**
 * JMX getter exposing the total ADDED count reported by the slop stats.
 *
 * @return total count for the ADDED metric
 */
@JmxGetter(name = "addedSinceResetTotal", description = "slops added since reset")
public Long getAddedSinceResetTotal() {
    final Long addedTotal = slopStats.getTotalCount(SlopStats.Tracked.ADDED);
    return addedTotal;
}

/**
 * JMX getter exposing the ADDED metric as a map, as produced by
 * SlopStats.asMap.
 *
 * @return map view of the ADDED metric (presumably keyed by node id, as
 *         the JMX description suggests)
 */
@JmxGetter(name = "addedSinceResetByNode", description = "slops added since reset by node")
public Map<Integer, Long> getAddedSinceResetByNode() {
    final Map<Integer, Long> addedPerNode = slopStats.asMap(SlopStats.Tracked.ADDED);
    return addedPerNode;
}

/**
 * JMX getter exposing the ADDED metric aggregated by SlopStats.byZone.
 *
 * @return map view of the ADDED metric (presumably keyed by zone id, as
 *         the JMX description suggests)
 */
@JmxGetter(name = "addedSinceResetByZone", description = "slops added since reset by zone")
public Map<Integer, Long> getAddedSinceResetByZone() {
    final Map<Integer, Long> addedPerZone = slopStats.byZone(SlopStats.Tracked.ADDED);
    return addedPerZone;
}

@JmxGetter(name = "outstandingTotal", description = "slops outstanding since last push")
public long getOutstandingTotal() {
return slopStats.getTotalCount(SlopStats.Tracked.OUTSTANDING);
Expand All @@ -84,7 +69,6 @@ public Map<Integer, Long> getOutstandingByZone() {
}

/**
 * Resets the slop statistics: clears the ADDED counter and overwrites the
 * OUTSTANDING counter with the supplied values.
 *
 * @param newValues values passed to SlopStats.setAll for the OUTSTANDING
 *        metric; the pusher jobs pass a map of node id to
 *        (attempted - succeeded) pushes
 */
public void resetStats(Map<Integer, Long> newValues) {
    slopStats.clearCount(SlopStats.Tracked.ADDED);
    slopStats.setAll(SlopStats.Tracked.OUTSTANDING, newValues);
}

Expand Down Expand Up @@ -119,9 +103,7 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,

/**
 * Writes a serialized slop to the underlying engine and increments the
 * ADDED counter for the slop's node id.
 *
 * @param key key to store the value under
 * @param value versioned serialized slop
 * @param transforms passed through unchanged to the underlying engine
 * @throws VoldemortException declared by this method; presumably
 *         propagated from the underlying engine
 */
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
        throws VoldemortException {
    // Deserialize before writing: the node id is needed for the stats
    // update, and this ordering means the engine write is not attempted if
    // deserialization throws.
    Slop slop = slopSerializer.toObject(value.getValue());
    slopEngine.put(key, value, transforms);
    // Counted only after the engine write returned normally
    slopStats.incrementCount(SlopStats.Tracked.ADDED, slop.getNodeId());
}

public boolean delete(ByteArray key, Version version) throws VoldemortException {
Expand Down
15 changes: 7 additions & 8 deletions src/java/voldemort/store/stats/SlopStats.java
@@ -1,12 +1,12 @@
/*
* Copyright 2008-2010 LinkedIn, Inc
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Expand All @@ -16,11 +16,11 @@

package voldemort.store.stats;

import voldemort.cluster.Cluster;

import java.util.EnumMap;
import java.util.Map;

import voldemort.cluster.Cluster;

/**
* Statistics for hinted handoff
*/
Expand Down Expand Up @@ -53,7 +53,7 @@ public Long getTotalCount(Tracked metric) {
public void clearCount(Tracked metric) {
    // Delegates to the counter registered for this metric. The no-argument
    // clearCount() presumably resets the metric for all nodes; confirm
    // against the counter implementation.
    counters.get(metric).clearCount();
}

/**
 * Clears the given metric's count for a single node by delegating to the
 * counter registered for that metric.
 *
 * @param metric metric whose counter is looked up
 * @param nodeId node id forwarded to the counter's clearCount
 */
public void clearCount(Tracked metric, int nodeId) {
    counters.get(metric).clearCount(nodeId);
}
Expand All @@ -71,8 +71,7 @@ public void setAll(Tracked metric, Map<Integer, Long> newValues) {
}

public static enum Tracked {
OUTSTANDING("outstanding"),
ADDED("added");
OUTSTANDING("outstanding");

private final String name;

Expand Down

0 comments on commit 10ab61f

Please sign in to comment.