Browse files

Merge branch 'cache'

  • Loading branch information...
2 parents 9d9fca9 + 5bf88e0 commit f6147f852c1799184ee42ca089ac5ab641ab89e6 Daniel Gómez Ferro committed Mar 21, 2013
View
9 bin/omid.sh
@@ -33,15 +33,8 @@ for j in ../lib/*.jar; do
CLASSPATH=$CLASSPATH:$j
done
-if which greadlink; then
- READLINK=greadlink
-else
- READLINK=readlink
-fi
-
tso() {
- export LD_LIBRARY_PATH=`$READLINK -f ../lib`
- exec java -Xmx1024m -cp $CLASSPATH -Domid.maxItems=100000 -Domid.maxCommits=100000 -Djava.library.path=$LD_LIBRARY_PATH -Dlog4j.configuration=log4j.properties com.yahoo.omid.tso.TSOServer -port 1234 -batch $BATCHSIZE -ensemble 4 -quorum 2 -zk localhost:2181
+ exec java -Xmx1024m -cp $CLASSPATH -Domid.maxItems=1000000 -Domid.maxCommits=1000000 -Dlog4j.configuration=log4j.properties com.yahoo.omid.tso.TSOServer -port 1234 -batch $BATCHSIZE -zk localhost:2181
}
tsobench() {
View
2 conf/log4j.properties
@@ -48,6 +48,6 @@ log4j.appender.O.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=[%d{HH:mm:ss,SSS}]%5p%6.6r[%t]%x - %C{1}.%M(%F:%L) - %m%n
log4j.appender.O.layout.ConversionPattern=[%d{HH:mm:ss,SSS}]%5p%6.6r[%t]%x - %C{1}.%M(%F:%L) - %m%n
-log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=INFO
+log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=TRACE
log4j.logger.com.yahoo.omid.notifications=TRACE
log4j.logger.com.yahoo.omid.examples.notifications=TRACE
View
5 pom.xml
@@ -167,5 +167,10 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0-rc1</version>
+ </dependency>
</dependencies>
</project>
View
9 src/main/java/com/yahoo/omid/tso/Cache.java
@@ -0,0 +1,9 @@
+package com.yahoo.omid.tso;
+
+public interface Cache {
+
+ public abstract long set(long key, long value);
+
+ public abstract long get(long key);
+
+}
View
94 src/main/java/com/yahoo/omid/tso/CacheEvaluation.java
@@ -0,0 +1,94 @@
+package com.yahoo.omid.tso;
+
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.Random;
+
+public class CacheEvaluation {
+
+ final static int ENTRIES = 150000000;
+ final static int WARMUP_ROUNDS = 2;
+ final static int ROUNDS = 4;
+ final static double HOT_PERC = 1;
+// Histogram hist = new Histogram(2 * ENTRIES);
+
+ public void testEntriesAge(Cache cache, PrintWriter writer) {
+ Random random = new Random();
+
+ long seed = random.nextLong();
+
+ writer.println("# Random seed: " + seed);
+ random.setSeed(seed);
+ int removals = 0;
+ double tempStdDev = 0;
+ double tempAvg = 0;
+
+ int i = 0;
+ int largestDeletedTimestamp = 0;
+ long hotItem = random.nextLong();
+
+ Runtime.getRuntime().gc();
+
+ for (; i < ENTRIES * WARMUP_ROUNDS; ++i) {
+ long toInsert = random.nextInt(100) < HOT_PERC ? hotItem : random.nextLong();
+ long removed = cache.set(toInsert, i);
+ if (removed > largestDeletedTimestamp) {
+ largestDeletedTimestamp = (int) removed;
+ }
+ if (removed > largestDeletedTimestamp) {
+ largestDeletedTimestamp = (int) removed;
+ }
+ if (i % ENTRIES == 0) {
+ int round = i / ENTRIES + 1;
+ System.err.format("Warmup [%d/%d]\n", round, WARMUP_ROUNDS);
+ }
+ }
+
+ long time = System.nanoTime();
+ for (; i < ENTRIES * (WARMUP_ROUNDS + ROUNDS); ++i) {
+ long toInsert = random.nextInt(100) < HOT_PERC ? hotItem : random.nextLong();
+ long removed = cache.set(toInsert, i);
+ if (removed > largestDeletedTimestamp) {
+ largestDeletedTimestamp = (int) removed;
+ }
+ int gap = i - largestDeletedTimestamp;
+ removals++;
+ double oldAvg = tempAvg;
+ tempAvg += (gap - tempAvg) / removals;
+ tempStdDev += (gap - oldAvg) * (gap - tempAvg);
+// hist.add(gap);
+ if (i % ENTRIES == 0) {
+ int round = i / ENTRIES - WARMUP_ROUNDS + 1;
+ System.err.format("Progress [%d/%d]\n", round, ROUNDS);
+ }
+ }
+ long elapsed = System.nanoTime() - time;
+ double elapsedSeconds = (elapsed / (double) 1000000000);
+ long totalOps = ENTRIES * ROUNDS;
+ writer.println("# Free mem before GC (MB) :" + (Runtime.getRuntime().freeMemory() / (double) (1024 * 1024)));
+ Runtime.getRuntime().gc();
+ writer.println("# Free mem (MB) :" + (Runtime.getRuntime().freeMemory() / (double) (1024 * 1024)));
+ writer.println("# Elapsed (s): " + elapsedSeconds);
+ writer.println("# Elapsed per 100 ops (ms): " + (elapsed / (double) totalOps / 100 / (double) 1000000));
+ writer.println("# Ops per s : " + (totalOps / elapsedSeconds));
+ writer.println("# Avg gap: " + (tempAvg));
+ writer.println("# Std dev gap: " + Math.sqrt((tempStdDev / ENTRIES)));
+// hist.print(writer);
+ }
+
+ public static void main(String[] args) throws FileNotFoundException, UnsupportedEncodingException {
+ int[] asoc = new int[] { 8, 16, 32 };
+ for (int i = 0; i < asoc.length; ++i) {
+ PrintWriter writer = new PrintWriter(asoc[i] + ".out", "UTF-8");
+ new CacheEvaluation().testEntriesAge(new LongCache(ENTRIES, asoc[i]), writer);
+ writer.close();
+ }
+ {
+ PrintWriter writer = new PrintWriter("guava.out", "UTF-8");
+ new CacheEvaluation().testEntriesAge(new GuavaCache(ENTRIES), writer);
+ writer.close();
+ }
+
+ }
+}
View
4 src/main/java/com/yahoo/omid/tso/ClientHandler.java
@@ -348,11 +348,11 @@ private void startTransaction() {
return;
if (curMessage == 0) {
- LOG.warn("No more message");
+ LOG.info("No more messages, stopping benchmark");
// wait for all outstanding msgs and then close the channel
// if (outstandingTransactions.intValue() == 0) {
if (outstandingTransactions == 0) {
- LOG.warn("Close channel");
+ LOG.info("Close channel");
channel.close().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
answer.offer(true);
View
49 src/main/java/com/yahoo/omid/tso/CommitHashMap.java
@@ -44,10 +44,9 @@
class CommitHashMap {
- private final int size;
private long largestDeletedTimestamp;
- private final long[] startCommitMapping;
- private final long[] rowsCommitMapping;
+ private final Cache startCommitMapping;
+ private final Cache rowsCommitMapping;
/**
* Constructs a new, empty hashtable with a default size of 1000
@@ -69,53 +68,29 @@ public CommitHashMap(int size) {
throw new IllegalArgumentException("Illegal size: " + size);
}
- this.size = size;
- this.startCommitMapping = new long[size * 2];
- this.rowsCommitMapping = new long[size * 2];
- }
-
- private int index(long hash) {
- return (int) (Math.abs(hash) % size);
+ this.startCommitMapping = new LongCache(size, 4);
+ this.rowsCommitMapping = new LongCache(size, 32);
}
public long getLatestWriteForRow(long hash) {
- int index = index(hash);
- return rowsCommitMapping[index];
+ return rowsCommitMapping.get(hash);
}
public void putLatestWriteForRow(long hash, long commitTimestamp) {
- int index = index(hash);
- long oldCommitTS = rowsCommitMapping[index];
-
- if (oldCommitTS == commitTimestamp)
- return;
-
- rowsCommitMapping[index] = commitTimestamp;
- largestDeletedTimestamp = Math.max(oldCommitTS, largestDeletedTimestamp);
+ long oldCommitTS = rowsCommitMapping.set(hash, commitTimestamp);
+ largestDeletedTimestamp = Math
+ .max(oldCommitTS, largestDeletedTimestamp);
}
public long getCommittedTimestamp(long startTimestamp) {
- int indexStart = 2 * index(startTimestamp);
- int indexCommit = indexStart + 1;
-
- if (startCommitMapping[indexStart] == startTimestamp) {
- return startCommitMapping[indexCommit];
- } else {
- return 0;
- }
+ return startCommitMapping.get(startTimestamp);
}
public void setCommittedTimestamp(long startTimestamp, long commitTimestamp) {
- int indexStart = 2 * index(startTimestamp);
- int indexCommit = indexStart + 1;
-
- long oldCommitTS = startCommitMapping[indexCommit];
- if (oldCommitTS == commitTimestamp)
- return;
+ long oldCommitTS = startCommitMapping.set(startTimestamp, commitTimestamp);
- startCommitMapping[indexStart] = startTimestamp;
- startCommitMapping[indexCommit] = commitTimestamp;
- largestDeletedTimestamp = Math.max(oldCommitTS, largestDeletedTimestamp);
+ largestDeletedTimestamp = Math
+ .max(oldCommitTS, largestDeletedTimestamp);
}
// set of half aborted transactions
View
45 src/main/java/com/yahoo/omid/tso/GuavaCache.java
@@ -0,0 +1,45 @@
+package com.yahoo.omid.tso;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class GuavaCache implements Cache, RemovalListener<Long, Long> {
+
+ private static final Log LOG = LogFactory.getLog(GuavaCache.class);
+ private com.google.common.cache.Cache<Long, Long> cache;
+ private long removed;
+
+ public GuavaCache(int size) {
+ cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumSize(size).initialCapacity(size)
+ .removalListener(this).build();
+ }
+
+ @Override
+ public long set(long key, long value) {
+ cache.put(key, value);
+ // cache.cleanUp();
+ return removed;
+ }
+
+ @Override
+ public long get(long key) {
+ Long result = cache.getIfPresent(key);
+ return result == null ? 0 : result;
+ }
+
+ @Override
+ public void onRemoval(RemovalNotification<Long, Long> notification) {
+ if (notification.getCause() == RemovalCause.REPLACED) {
+ return;
+ }
+// LOG.warn("Removing " + notification);
+// new Exception().printStackTrace();
+ removed = Math.max(removed, notification.getValue());
+ }
+
+}
View
59 src/main/java/com/yahoo/omid/tso/Histogram.java
@@ -0,0 +1,59 @@
+package com.yahoo.omid.tso;
+
+import java.io.PrintWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class Histogram {
+ private static final Log LOG = LogFactory.getLog(Histogram.class);
+ final private int size;
+ final private int[] counts;
+ private int max;
+ private int min = Integer.MIN_VALUE;
+
+ public Histogram(int size) {
+ this.size = size;
+ this.counts = new int[size];
+ }
+
+ public void add(int i) {
+ if (i >= size) {
+ LOG.error("Tried to add " + i + " which is bigger than size " + size);
+ return;
+ }
+ counts[i]++;
+ if (i > max) {
+ max = i;
+ }
+ if (i < min) {
+ min = i;
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(max).append('\n');
+ for (int i = 0; i <= max; ++i) {
+ sb.append("[").append(i).append("]\t");
+ }
+ sb.append('\n');
+ for (int i = 0; i <= max; ++i) {
+ sb.append(counts[i]).append("\t");
+ }
+ return sb.toString();
+ }
+
+ public void log() {
+ for (int i = min; i <= max; ++i) {
+ LOG.debug(String.format("[%5d]\t%5d", i, counts[i]));
+ }
+ }
+
+ public void print(PrintWriter writer) {
+ for (int i = 0; i <= max; ++i) {
+ writer.format("%5d\t%5d\n", i, counts[i]);
+ }
+ }
+}
View
59 src/main/java/com/yahoo/omid/tso/LongCache.java
@@ -0,0 +1,59 @@
+package com.yahoo.omid.tso;
+
+public class LongCache implements Cache {
+
+ private final long [] cache;
+ private final int size;
+ private final int associativity;
+
+ public LongCache(int size, int associativity) {
+ this.size = size;
+ this.cache = new long[2*(size + associativity)];
+ this.associativity = associativity;
+ }
+
+ /* (non-Javadoc)
+ * @see com.yahoo.omid.tso.Cache#set(long, long)
+ */
+ @Override
+ public long set(long key, long value) {
+ final int index = index(key);
+ int oldestIndex = 0;
+ long oldestValue = Long.MAX_VALUE;
+ for (int i = 0; i < associativity; ++i) {
+ int currIndex = 2 * (index + i);
+ if (cache[currIndex] == key) {
+ oldestValue = 0;
+ oldestIndex = currIndex;
+ break;
+ }
+ if (cache[currIndex + 1] <= oldestValue) {
+ oldestValue = cache[currIndex + 1];
+ oldestIndex = currIndex;
+ }
+ }
+ cache[oldestIndex] = key;
+ cache[oldestIndex + 1] = value;
+ return oldestValue;
+ }
+
+ /* (non-Javadoc)
+ * @see com.yahoo.omid.tso.Cache#get(long)
+ */
+ @Override
+ public long get(long key) {
+ final int index = index(key);
+ for (int i = 0; i < associativity; ++i) {
+ int currIndex = 2 * (index + i);
+ if (cache[currIndex] == key) {
+ return cache[currIndex + 1];
+ }
+ }
+ return 0;
+ }
+
+ private int index(long hash) {
+ return (int) (Math.abs(hash) % size);
+ }
+
+}
View
6 src/main/java/com/yahoo/omid/tso/TSOHandler.java
@@ -53,6 +53,7 @@
import com.yahoo.omid.tso.messages.CommitRequest;
import com.yahoo.omid.tso.messages.CommitResponse;
import com.yahoo.omid.tso.messages.FullAbortRequest;
+import com.yahoo.omid.tso.messages.LargestDeletedTimestampReport;
import com.yahoo.omid.tso.messages.TimestampRequest;
import com.yahoo.omid.tso.messages.TimestampResponse;
import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
@@ -226,6 +227,7 @@ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
buffer.initializeIndexes();
}
}
+ channel.write(new LargestDeletedTimestampReport(sharedState.largestDeletedTimestamp));
for (AbortedTransaction halfAborted : sharedState.hashmap.halfAborted) {
channel.write(new AbortedTransactionReport(halfAborted.getStartTimestamp()));
}
@@ -299,8 +301,8 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
if (msg.startTimestamp < timestampOracle.first()) {
reply.committed = false;
LOG.warn("Aborting transaction after restarting TSO");
- } else if (msg.startTimestamp < sharedState.largestDeletedTimestamp) {
- // Too old
+ } else if (msg.rows.length > 0 && msg.startTimestamp < sharedState.largestDeletedTimestamp) {
+ // Too old and not read only
reply.committed = false;// set as abort
LOG.warn("Too old starttimestamp: ST " + msg.startTimestamp + " MAX "
+ sharedState.largestDeletedTimestamp);
View
68 src/test/java/com/yahoo/omid/tso/TestLongCache.java
@@ -0,0 +1,68 @@
+package com.yahoo.omid.tso;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.junit.Assert.assertThat;
+
+import java.io.PrintWriter;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+public class TestLongCache {
+
+ private static final Log LOG = LogFactory.getLog(TestLongCache.class);
+ final int entries = 1000;
+ Histogram hist = new Histogram(entries * 10);
+
+ @Test
+ public void testEntriesAge() {
+
+
+ Cache cache = new LongCache(entries, 16);
+ Random random = new Random();
+
+ long seed = random.nextLong();
+
+ LOG.info("Random seed: " + seed);
+ random.setSeed(seed);
+ int removals = 0;
+ long totalAge = 0;
+ double tempStdDev = 0;
+ double tempAvg = 0;
+
+ int i = 0;
+ int largestDeletedTimestamp = 0;
+ for (; i < entries * 10; ++i) {
+ long removed = cache.set(random.nextLong(), i);
+ if (removed > largestDeletedTimestamp) {
+ largestDeletedTimestamp = (int) removed;
+ }
+ }
+
+ long time = System.nanoTime();
+ for (; i < entries * 100; ++i) {
+ long removed = cache.set(random.nextLong(), i);
+ if (removed > largestDeletedTimestamp) {
+ largestDeletedTimestamp = (int) removed;
+ }
+ int gap = i - ((int) largestDeletedTimestamp);
+ removals++;
+ totalAge += gap;
+ double oldAvg = tempAvg;
+ tempAvg += (gap - tempAvg) / removals;
+ tempStdDev += (gap - oldAvg) * (gap - tempAvg);
+ hist.add(gap);
+ }
+ long elapsed = System.nanoTime() - time;
+ LOG.info("Elapsed (ms): " + (elapsed / (double)1000));
+
+ double avgGap = totalAge / (double) removals;
+ LOG.info("Avg gap: " + (tempAvg ));
+ LOG.info("Std dev gap: " + Math.sqrt((tempStdDev / entries)));
+ assertThat(avgGap, is(greaterThan(entries * .6 )));
+ }
+
+}
View
13 src/test/resources/log4j.properties
@@ -1,16 +1,18 @@
-log4j.rootLogger=INFO,console
+log4j.rootLogger=TRACE,DRFA,console
# Logging Threshold
-log4j.threshhold=ALL
+#log4j.threshold=INFO
#
# Daily Rolling File Appender
#
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${omid.log.dir}/${omid.log.file}
+log4j.appender.DRFA.threshold=TRACE
+log4j.appender.DRFA=org.apache.log4j.FileAppender
+log4j.appender.DRFA.File=omid.log
# Rollver at midnight
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+#log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+log4j.appender.DRFA.append=false
# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
@@ -28,6 +30,7 @@ log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.threshold=INFO
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t] %p %c{2}: %m%n

0 comments on commit f6147f8

Please sign in to comment.