Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

merged with master

  • Loading branch information...
commit 39a11284107fda4e19a00eaff4ab668047c5a567 2 parents e1d3012 + 1d7c6e8
Maysam Yabandeh authored
Showing with 1,951 additions and 368 deletions.
  1. +1 −1  bin/omid.sh
  2. +45 −0 conf/log4j.properties
  3. +33 −0 conf/omid-site.xml
  4. +19 −14 pom.xml
  5. +7 −0 src/main/java/com/yahoo/omid/client/TransactionManager.java
  6. +1 −10 src/main/java/com/yahoo/omid/client/TransactionalTable.java
  7. +66 −35 src/main/java/com/yahoo/omid/tso/TSOHandler.java
  8. +34 −81 src/main/java/com/yahoo/omid/tso/TSOServer.java
  9. +118 −0 src/main/java/com/yahoo/omid/tso/TSOServerConfig.java
  10. +109 −18 src/main/java/com/yahoo/omid/tso/TSOState.java
  11. +44 −12 src/main/java/com/yahoo/omid/tso/TimestampOracle.java
  12. +369 −0 src/main/java/com/yahoo/omid/tso/persistence/BookKeeperStateBuilder.java
  13. +268 −0 src/main/java/com/yahoo/omid/tso/persistence/BookKeeperStateLogger.java
  14. +33 −0 src/main/java/com/yahoo/omid/tso/persistence/LoggerAsyncCallback.java
  15. +24 −0 src/main/java/com/yahoo/omid/tso/persistence/LoggerConstants.java
  16. +127 −0 src/main/java/com/yahoo/omid/tso/persistence/LoggerException.java
  17. +99 −0 src/main/java/com/yahoo/omid/tso/persistence/LoggerProtocol.java
  18. +54 −0 src/main/java/com/yahoo/omid/tso/persistence/StateBuilder.java
  19. +49 −0 src/main/java/com/yahoo/omid/tso/persistence/StateLogger.java
  20. +7 −6 src/test/java/com/yahoo/omid/OmidTestBase.java
  21. +0 −5 src/test/java/com/yahoo/omid/TestAbortTransaction.java
  22. +0 −5 src/test/java/com/yahoo/omid/TestBasicTransaction.java
  23. +95 −5 src/test/java/com/yahoo/omid/TestCompaction.java
  24. +0 −5 src/test/java/com/yahoo/omid/TestSingleColumnFamily.java
  25. +227 −162 src/test/java/com/yahoo/omid/TestTransactionConflict.java
  26. +10 −6 src/test/java/com/yahoo/omid/tso/TSOTestBase.java
  27. +0 −2  src/test/java/com/yahoo/omid/tso/TestCommitReport.java
  28. +112 −0 src/test/java/com/yahoo/omid/tso/TestPersistence.java
  29. +0 −1  src/test/java/com/yahoo/omid/tso/TestTimestamps.java
View
2  bin/omid.sh
@@ -40,7 +40,7 @@ fi
tso() {
export LD_LIBRARY_PATH=`$READLINK -f ../src/main/native`
- exec java -Xmx1024m -cp $CLASSPATH -Domid.maxItems=1000000 -Domid.maxCommits=30000000 -Djava.library.path=$LD_LIBRARY_PATH -Dlog4j.configuration=log4j.properties com.yahoo.omid.tso.TSOServer 1234 $BATCHSIZE 4 2 localhost:2181
+ exec java -Xmx1024m -cp $CLASSPATH -Domid.maxItems=1000000 -Domid.maxCommits=30000000 -Djava.library.path=$LD_LIBRARY_PATH -Dlog4j.configuration=log4j.properties com.yahoo.omid.tso.TSOServer -port 1234 -batch $BATCHSIZE -ensemble 4 -quorum 2 -zk localhost:2181
}
tsobench() {
View
45 conf/log4j.properties
@@ -0,0 +1,45 @@
+########################################################################
+#
+# Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+########################################################################
+
+#log4j.rootCategory=TRACE, R, O
+log4j.rootCategory=WARN, R, O
+
+# Stdout
+log4j.appender.O=org.apache.log4j.ConsoleAppender
+#log4j.appender.O.Threshold=WARN
+
+# File
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.File=logs/log4j.log
+
+# Control the maximum log file size
+log4j.appender.R.MaxFileSize=100MB
+
+# Clear log file each time
+log4j.appender.R.Append=false
+
+# Archive log files (one backup file here)
+log4j.appender.R.MaxBackupIndex=5
+
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.O.layout=org.apache.log4j.PatternLayout
+
+log4j.appender.R.layout.ConversionPattern=[%d{ISO8601}]%5p%6.6r[%t]%x - %C.%M(%F:%L) - %m%n
+log4j.appender.O.layout.ConversionPattern=[%d{ISO8601}]%5p%6.6r[%t]%x - %C.%M(%F:%L) - %m%n
+
+log4j.logger.com.yahoo.omid.tso.ThroughputMonitor=TRACE
View
33 conf/omid-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>tso.rwcheck</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>tso.wwcheck</name>
+ <value>true</value>
+ </property>
+</configuration>
View
33 pom.xml
@@ -25,19 +25,17 @@
<name>omid</name>
<url>http://maven.apache.org</url>
- <repositories>
- <repository>
- <id>apache.snapshots</id>
- <name>Temporary repo waiting for HBase 0.92.0 release</name>
- <url>https://repository.apache.org/content/groups/snapshots-group/</url>
- <snapshots>
- <enabled>true</enabled>
- </snapshots>
- </repository>
- </repositories>
-
<build>
- <plugins>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.9</version>
+ <configuration>
+ <argLine>-Xmx1G</argLine>
+ <forkMode>pertest</forkMode>
+ </configuration>
+ </plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
@@ -113,14 +111,14 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
- <version>0.92.0-SNAPSHOT</version>
+ <version>0.92.0</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
- <version>3.4.0</version>
+ <version>3.4.3</version>
<scope>compile</scope>
</dependency>
<dependency>
@@ -149,5 +147,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ <version>1.23</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
View
7 src/main/java/com/yahoo/omid/client/TransactionManager.java
@@ -228,5 +228,12 @@ private void cleanup(final TransactionState transactionState)
throw new TransactionException("Could not clean up for table " + entry.getKey(), ioe);
}
}
+ AbortCompleteCallback cb = new SyncAbortCompleteCallback();
+ try {
+ tsoclient.completeAbort(transactionState.getStartTimestamp(), cb );
+ } catch (IOException ioe) {
+ throw new TransactionException("Could not notify TSO about cleanup completion for transaction " +
+ transactionState.getStartTimestamp(), ioe);
+ }
}
}
View
11 src/main/java/com/yahoo/omid/client/TransactionalTable.java
@@ -219,29 +219,20 @@ public void delete(TransactionState transactionState, Delete delete) throws IOEx
*/
public void put(TransactionState transactionState, Put put) throws IOException, IllegalArgumentException {
final long startTimestamp = transactionState.getStartTimestamp();
-// byte[] startTSBytes = Bytes.toBytes(startTimestamp);
// create put with correct ts
final Put tsput = new Put(put.getRow(), startTimestamp);
Map<byte[], List<KeyValue>> kvs = put.getFamilyMap();
for (List<KeyValue> kvl : kvs.values()) {
for (KeyValue kv : kvl) {
-// int tsOffset = kv.getTimestampOffset();
-// System.arraycopy(startTSBytes, 0, kv.getBuffer(), tsOffset, Bytes.SIZEOF_LONG);
tsput.add(new KeyValue(kv.getRow(), kv.getFamily(), kv.getQualifier(), startTimestamp, kv.getValue()));
}
}
// should add the table as well
transactionState.addWrittenRow(new RowKeyFamily(put.getRow(), getTableName(), put.getFamilyMap()));
+ transactionState.addRow(new RowKeyFamily(tsput.getRow(), getTableName(), tsput.getFamilyMap()));
put(tsput);
-// super.getConnection().getRegionServerWithRetries(
-// new ServerCallable<Boolean>(super.getConnection(), super.getTableName(), put.getRow()) {
-// public Boolean call() throws IOException {
-// server.put(location.getRegionInfo().getRegionName(), tsput);
-// return true;
-// }
-// });
}
/**
* Transactional version of {@link HTable#getScanner(Scan)}
View
101 src/main/java/com/yahoo/omid/tso/TSOHandler.java
@@ -61,6 +61,10 @@
import com.yahoo.omid.tso.messages.FailedElderReport;
import com.yahoo.omid.tso.messages.LargestDeletedTimestampReport;
import com.yahoo.omid.tso.messages.TimestampRequest;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
+import com.yahoo.omid.tso.persistence.LoggerException;
+import com.yahoo.omid.tso.persistence.LoggerException.Code;
+import com.yahoo.omid.tso.persistence.LoggerProtocol;
import com.yahoo.omid.IsolationLevel;
import java.util.Arrays;
import java.util.HashSet;
@@ -71,7 +75,7 @@
* @author maysam
*
*/
-public class TSOHandler extends SimpleChannelHandler implements AddCallback {
+public class TSOHandler extends SimpleChannelHandler {
private static final Log LOG = LogFactory.getLog(TSOHandler.class);
@@ -110,11 +114,11 @@
* Constructor
* @param channelGroup
*/
- public TSOHandler(ChannelGroup channelGroup, TimestampOracle to, TSOState state) {
+ public TSOHandler(ChannelGroup channelGroup, TSOState state) {
//System.out.println("This is rwcimbo with elders - no filter is installed");
//System.out.println("This is buggy rwcimbo");
this.channelGroup = channelGroup;
- this.timestampOracle = to;
+ this.timestampOracle = state.getSO();
this.sharedState = state;
this.flushThread = new FlushThread();
this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@@ -177,14 +181,13 @@ public void handle(AbortRequest msg, ChannelHandlerContext ctx) {
synchronized (sharedState) {
DataOutputStream toWAL = sharedState.toWAL;
try {
- toWAL.writeByte((byte)-3);
+ toWAL.writeByte(LoggerProtocol.ABORT);
toWAL.writeLong(msg.startTimestamp);
} catch (IOException e) {
e.printStackTrace();
}
abortCount++;
- sharedState.hashmap.setHalfAborted(msg.startTimestamp);
- sharedState.uncommited.abort(msg.startTimestamp);
+ sharedState.processAbort(msg.startTimestamp);
synchronized (sharedMsgBufLock) {
queueHalfAbort(msg.startTimestamp);
}
@@ -318,7 +321,8 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
sharedState.uncommited.commit(reply.commitTimestamp);
sharedState.uncommited.commit(msg.startTimestamp);
//c) commit the transaction
- newmax = sharedState.hashmap.setCommitted(msg.startTimestamp, reply.commitTimestamp, newmax);
+ //newmax = sharedState.hashmap.setCommitted(msg.startTimestamp, reply.commitTimestamp, newmax);
+ newmax = sharedState.processCommit(msg.startTimestamp, reply.commitTimestamp, newmax);
if (reply.wwRowsLength > 0) {//if it is supposed to be reincarnated, also map Tc to Tc just in case of a future query.
newmax = sharedState.hashmap.setCommitted(reply.commitTimestamp, reply.commitTimestamp, newmax);
}
@@ -342,10 +346,15 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
}
}
//now do the rest out of sync block to allow more concurrency
- toWAL.writeLong(reply.commitTimestamp);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Adding commit to WAL");
+ }
+ toWAL.writeByte(LoggerProtocol.COMMIT);
+ toWAL.writeLong(msg.startTimestamp);
+ toWAL.writeLong(reply.commitTimestamp);//Tc is not necessary in theory, since we abort the in-progress txn after recovery, but it makes it easier for the recovery algorithm to bypass snapshotting
//TODO: should we be able to recover the writeset of failed elders?
if (newmax > oldmax) {//I caused a raise in Tmax
- toWAL.writeByte((byte)-2);
+ toWAL.writeByte(LoggerProtocol.LARGESTDELETEDTIMESTAMP);
toWAL.writeLong(newmax);
synchronized (sharedState.hashmap) {
for (Long id : toAbort)
@@ -377,14 +386,13 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
} else { //add it to the aborted list
abortCount++;
try {
- toWAL.writeByte((byte)-3);
+ toWAL.writeByte(LoggerProtocol.ABORT);
toWAL.writeLong(msg.startTimestamp);
} catch (IOException e) {
e.printStackTrace();
}
synchronized (sharedState.hashmap) {
- sharedState.hashmap.setHalfAborted(msg.startTimestamp);
- sharedState.uncommited.abort(msg.startTimestamp);
+ sharedState.processAbort(msg.startTimestamp);
}
synchronized (sharedMsgBufLock) {
queueHalfAbort(msg.startTimestamp);
@@ -407,14 +415,32 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
TSOHandler.transferredBytes.incrementAndGet();
- //async write into WAL, callback function is addComplete
ChannelandMessage cam = new ChannelandMessage(ctx, reply);
synchronized (sharedState) {
sharedState.nextBatch.add(cam);
if (sharedState.baos.size() >= TSOState.BATCH_SIZE) {
- sharedState.lh.asyncAddEntry(baos.toByteArray(), this, sharedState.nextBatch);
- sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Going to add record of size " + sharedState.baos.size());
+ }
+ //sharedState.lh.asyncAddEntry(baos.toByteArray(), this, sharedState.nextBatch);
+ sharedState.addRecord(baos.toByteArray(), new AddRecordCallback() {
+ @Override
+ public void addRecordComplete(int rc, Object ctx) {
+ if (rc != Code.OK) {
+ LOG.warn("Write failed: " + LoggerException.getMessage(rc));
+ } else {
+ synchronized (callbackLock) {
+ @SuppressWarnings("unchecked")
+ ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
+ for (ChannelandMessage cam : theBatch) {
+ Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
+ }
+ }
+ }
+ }
+ }, sharedState.nextBatch);
+ sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
sharedState.baos.reset();
}
}
@@ -523,7 +549,25 @@ else if (sharedState.hashmap.isHalfAborted(msg.queryTimestamp))
public void flush() {
synchronized (sharedState) {
- sharedState.lh.asyncAddEntry(sharedState.baos.toByteArray(), this, sharedState.nextBatch);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Adding record, size " + sharedState.baos.size());
+ }
+ sharedState.addRecord(baos.toByteArray(), new AddRecordCallback() {
+ @Override
+ public void addRecordComplete(int rc, Object ctx) {
+ if (rc != Code.OK) {
+ LOG.warn("Write failed: " + LoggerException.getMessage(rc));
+ } else {
+ synchronized (callbackLock) {
+ @SuppressWarnings("unchecked")
+ ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
+ for (ChannelandMessage cam : theBatch) {
+ Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
+ }
+ }
+ }
+ }
+ }, sharedState.nextBatch);
sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
sharedState.baos.reset();
if (flushFuture.cancel(false)) {
@@ -541,6 +585,9 @@ public void run() {
if (sharedState.nextBatch.size() > 0) {
synchronized (sharedState) {
if (sharedState.nextBatch.size() > 0) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Flushing log batch.");
+ }
flush();
}
}
@@ -610,12 +657,12 @@ public void handle(FullAbortReport msg, ChannelHandlerContext ctx) {
synchronized (sharedState) {
DataOutputStream toWAL = sharedState.toWAL;
try {
- toWAL.writeByte((byte)-4);
+ toWAL.writeByte(LoggerProtocol.FULLABORT);
toWAL.writeLong(msg.startTimestamp);
} catch (IOException e) {
e.printStackTrace();
}
- sharedState.hashmap.setFullAborted(msg.startTimestamp);
+ sharedState.processFullAbort(msg.startTimestamp);
}
synchronized (sharedMsgBufLock) {
queueFullAbort(msg.startTimestamp);
@@ -637,23 +684,6 @@ public void handle(FullAbortReport msg, ChannelHandlerContext ctx) {
private Object sharedMsgBufLock = new Object();
private Object callbackLock = new Object();
- /*
- * Callback of asyncAddEntry from WAL
- */
- @Override
- public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
- // Guarantee that messages sent to the WAL are delivered in order
- if (lh != sharedState.lh)
- return;
- synchronized (callbackLock) {
- @SuppressWarnings("unchecked")
- ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
- for (ChannelandMessage cam : theBatch) {
- Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
- }
- }
- }
-
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
LOG.warn("TSOHandler: Unexpected exception from downstream.", e.getCause());
@@ -667,3 +697,4 @@ public void stop() {
}
+
View
115 src/main/java/com/yahoo/omid/tso/TSOServer.java
@@ -16,23 +16,13 @@
package com.yahoo.omid.tso;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
@@ -46,34 +36,36 @@
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
-import com.yahoo.omid.tso.serialization.TSODecoder;
-import com.yahoo.omid.tso.serialization.TSOEncoder;
+import com.yahoo.omid.tso.persistence.BookKeeperStateBuilder;
/**
* TSO Server with serialization
*/
public class TSOServer implements Runnable {
+
+ private static final Log LOG = LogFactory.getLog(BookKeeperStateBuilder.class);
private TSOState state;
- private int port;
- private int batch;
- private int ensemble;
- private int quorum;
- private String[] zkservers;
+ private TSOServerConfig config;
private boolean finish;
private Object lock;
- public TSOServer(int port, int batch, int ensemble, int quorum, String[] zkservers) {
+ public TSOServer() {
super();
- this.port = port;
- this.batch = batch;
- this.ensemble = ensemble;
- this.quorum = quorum;
- this.zkservers = zkservers;
+ this.config = TSOServerConfig.configFactory();
+
this.finish = false;
this.lock = new Object();
}
-
+
+ public TSOServer(TSOServerConfig config) {
+ super();
+ this.config = config;
+
+ this.finish = false;
+ this.lock = new Object();
+ }
+
public TSOState getState() {
return state;
}
@@ -87,19 +79,9 @@ public TSOState getState() {
* @throws Exception
*/
public static void main(String[] args) throws Exception {
- // Print usage if no argument is specified.
- if (args.length < 1) {
- System.err.println("Usage: " + TSOServer.class.getSimpleName() + " <port>");
- return;
- }
-
- // Parse options.
- int port = Integer.parseInt(args[0]);
- int batch = Integer.parseInt(args[1]);
- int ensSize = Integer.parseInt(args[2]), qSize = Integer.parseInt(args[3]);
- String[] bookies = Arrays.copyOfRange(args, 4, args.length);
+ TSOServerConfig config = TSOServerConfig.parseConfig(args);
- new TSOServer(port, batch, ensSize, qSize, bookies).run();
+ new TSOServer(config).run();
}
@Override
@@ -113,7 +95,7 @@ public void run() {
// Create the global ChannelGroup
ChannelGroup channelGroup = new DefaultChannelGroup(TSOServer.class.getName());
// threads max
-// int maxThreads = Runtime.getRuntime().availableProcessors() *2 + 1;
+ // int maxThreads = Runtime.getRuntime().availableProcessors() *2 + 1;
int maxThreads = 5;
// Memory limitation: 1MB by channel, 1GB global, 100 ms of timeout
ThreadPoolExecutor pipelineExecutor = new OrderedMemoryAwareThreadPoolExecutor(maxThreads, 1048576, 1073741824,
@@ -121,29 +103,21 @@ public void run() {
// This is the only object of timestamp oracle
// TODO: make it singleton
- TimestampOracle timestampOracle = new TimestampOracle();
+ //TimestampOracle timestampOracle = new TimestampOracle();
// The wrapper for the shared state of TSO
- state = new TSOState(timestampOracle.get());
- TSOState.BATCH_SIZE = batch;
+ state = BookKeeperStateBuilder.getState(this.config);
+
+ if(state == null){
+ LOG.error("Couldn't build state");
+ return;
+ }
+ TSOState.BATCH_SIZE = config.getBatchSize();
System.out.println("PARAM MAX_ITEMS: " + TSOState.MAX_ITEMS);
System.out.println("PARAM BATCH_SIZE: " + TSOState.BATCH_SIZE);
System.out.println("PARAM LOAD_FACTOR: " + TSOState.LOAD_FACTOR);
System.out.println("PARAM MAX_THREADS: " + maxThreads);
- // BookKeeper stuff
- String servers = StringUtils.join(zkservers, ',');
- try {
- state.bookkeeper = new BookKeeper(servers);
- state.lh = state.bookkeeper.createLedger(ensemble, quorum, BookKeeper.DigestType.CRC32, new byte[] { 'a',
- 'b' });
- System.out.println("Ledger handle: " + state.lh.getId());
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- }
-
- final TSOHandler handler = new TSOHandler(channelGroup, timestampOracle, state);
+ final TSOHandler handler = new TSOHandler(channelGroup, state);
bootstrap.setPipelineFactory(new TSOPipelineFactory(pipelineExecutor, handler));
bootstrap.setOption("tcpNoDelay", false);
@@ -158,7 +132,7 @@ public void run() {
// Create the monitor
ThroughputMonitor monitor = new ThroughputMonitor(state);
// Add the parent channel to the group
- Channel channel = bootstrap.bind(new InetSocketAddress(port));
+ Channel channel = bootstrap.bind(new InetSocketAddress(config.getPort()));
channelGroup.add(channel);
// Compacter handler
@@ -184,7 +158,7 @@ public ChannelPipeline getPipeline() throws Exception {
comBootstrap.setOption("child.reuseAddress", true);
comBootstrap.setOption("child.connectTimeoutMillis", 100);
comBootstrap.setOption("readWriteFair", true);
- channel = comBootstrap.bind(new InetSocketAddress(port + 1));
+ channel = comBootstrap.bind(new InetSocketAddress(config.getPort() + 1));
// Starts the monitor
monitor.start();
@@ -198,9 +172,10 @@ public ChannelPipeline getPipeline() throws Exception {
}
}
- timestampOracle.stop();
+ //timestampOracle.stop();
handler.stop();
comHandler.stop();
+ state.stop();
// *** Start the Netty shutdown ***
@@ -219,28 +194,6 @@ public ChannelPipeline getPipeline() throws Exception {
factory.releaseExternalResources();
comFactory.releaseExternalResources();
}
-
- private void recoverState() throws BKException, InterruptedException, KeeperException, IOException {
- String servers = StringUtils.join(zkservers, ',');
- ZooKeeper zooKeeper = new ZooKeeper(servers, 1000, null);
- ClientConfiguration conf = new ClientConfiguration();
- BookKeeper bookKeeper = new BookKeeper(conf, zooKeeper);
-
- List<String> children = zooKeeper.getChildren("/ledgers", false);
- children.remove("available");
- if (!children.isEmpty()) {
- Collections.sort(children);
- String ledgerName = children.get(children.size());
-
- long ledgerId = Long.parseLong(ledgerName.substring(1));
-
- LedgerHandle handle = bookKeeper.openLedger(ledgerId, BookKeeper.DigestType.CRC32, new byte[] { 'a', 'b' });
- long lastEntryId = handle.getLastAddConfirmed();
-
-
- }
- state.lh = bookKeeper.createLedger(BookKeeper.DigestType.CRC32, new byte[] { 'a', 'b' });
- }
public void stop() {
finish = true;
View
118 src/main/java/com/yahoo/omid/tso/TSOServerConfig.java
@@ -0,0 +1,118 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.tso;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+
+/**
+ * Holds the configuration parameters of a TSO server instance.
+ *
+ */
+public class TSOServerConfig {
+
+ private static TSOServerConfig config;
+
+ static public TSOServerConfig configFactory(){
+ if(config == null){
+ config = new TSOServerConfig();
+ }
+
+ return config;
+ }
+
+ static public TSOServerConfig configFactory(int port, int batch, boolean recoveryEnabled, int ensSize, int qSize, String zkservers){
+ if(config == null){
+ config = new TSOServerConfig(port, batch, recoveryEnabled, ensSize, qSize, zkservers);
+ }
+
+ return config;
+ }
+
+ static public TSOServerConfig parseConfig(String args[]){
+ config = new TSOServerConfig();
+
+ if (args.length == 0) {
+ new JCommander(config).usage();
+ System.exit(0);
+ }
+
+ new JCommander(config, args);
+
+ return config;
+ }
+
+ @Parameter(names = "-port", description = "Port reserved by the Status Oracle", required = true)
+ private int port;
+
+ @Parameter(names = "-batch", description = "Threshold for the batch sent to the WAL")
+ private int batch;
+
+ @Parameter(names = "-ha", description = "Highly Available status oracle: logs operations to the WAL and recovers from a crash")
+ private boolean recoveryEnabled;
+
+ @Parameter(names = "-zk", description = "ZooKeeper ensemble: host1:port1,host2:port2...")
+ private String zkServers;
+
+ @Parameter(names = "-ensemble", description = "WAL ensemble size")
+ private int ensemble;
+
+ @Parameter(names = "-quorum", description = "WAL quorum size")
+ private int quorum;
+
+ TSOServerConfig(){
+ this.port = Integer.parseInt(System.getProperty("PORT", "1234"));
+ this.batch = Integer.parseInt(System.getProperty("BATCH", "0"));
+ this.recoveryEnabled = Boolean.parseBoolean(System.getProperty("RECOVERABLE", "false"));
+ this.zkServers = System.getProperty("ZKSERVERS");
+ this.ensemble = Integer.parseInt(System.getProperty("ENSEMBLE", "3"));
+ this.quorum = Integer.parseInt(System.getProperty("QUORUM", "2"));
+ }
+
+ TSOServerConfig(int port, int batch, boolean recoveryEnabled, int ensemble, int quorum, String zkServers){
+ this.port = port;
+ this.batch = batch;
+ this.recoveryEnabled = recoveryEnabled;
+ this.zkServers = zkServers;
+ this.ensemble = ensemble;
+ this.quorum = quorum;
+ }
+
+ public int getPort(){
+ return port;
+ }
+
+ public int getBatchSize(){
+ return batch;
+ }
+
+ public boolean isRecoveryEnabled(){
+ return recoveryEnabled;
+ }
+
+ public String getZkServers(){
+ return zkServers;
+ }
+
+ public int getEnsembleSize(){
+ return ensemble;
+ }
+
+ public int getQuorumSize(){
+ return quorum;
+ }
+}
View
127 src/main/java/com/yahoo/omid/tso/TSOState.java
@@ -19,15 +19,16 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.TreeSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
+import com.yahoo.omid.tso.persistence.LoggerException.Code;
+import com.yahoo.omid.tso.persistence.BookKeeperStateBuilder;
+import com.yahoo.omid.tso.persistence.StateLogger;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/**
* The wrapper for different states of TSO
@@ -35,6 +36,9 @@
* @author maysam
*/
public class TSOState {
+ private static final Log LOG = LogFactory.getLog(TSOState.class);
+
+
/**
* The maximum entries kept in TSO
*/
@@ -70,12 +74,34 @@
/**
* Hash map load factor
*/
-// static final public float LOAD_FACTOR = 0.2f;
static final public float LOAD_FACTOR = 0.5f;
+
+ /**
+ * Object that implements the logic to log records
+ * for recoverability
+ */
+
+ StateLogger logger;
+ public StateLogger getLogger(){
+ return logger;
+ }
+
+ public void setLogger(StateLogger logger){
+ this.logger = logger;
+ }
+
/**
- * Largest Deleted Timestamp from the hashmap.committed list
- * this timestmap must be reported to the client
+ * Only timestamp oracle instance in the system.
+ */
+ private TimestampOracle timestampOracle;
+
+ protected TimestampOracle getSO(){
+ return timestampOracle;
+ }
+
+ /**
+ * Largest Deleted Timestamp
*/
public long largestDeletedTimestamp = 0;
@@ -87,7 +113,7 @@
public TSOSharedMessageBuffer sharedMessageBuffer = new TSOSharedMessageBuffer(this);
/**
- * The hash map to to keep track of recetly committed rows
+ * The hash map to to keep track of recently committed rows
* each bucket is about 20 byte, so the initial capacity is 20MB
*/
public CommitHashMap hashmap = new CommitHashMap(MAX_ITEMS, LOAD_FACTOR);
@@ -98,11 +124,67 @@
public Elders elders;
/**
- * Reference to BookKeeper
+ * Process commit request.
+ *
+ * @param startTimestamp
+ */
+ protected long processCommit(long startTimestamp, long commitTimestamp, long newmax){
+ newmax = hashmap.setCommitted(startTimestamp, commitTimestamp, newmax);
+ return newmax;
+ }
+
+ /**
+ * Process largest deleted timestamp.
+ *
+ * @param largestDeletedTimestamp
*/
- public BookKeeper bookkeeper;
- LedgerHandle lh;
+ protected synchronized void processLargestDeletedTimestamp(long largestDeletedTimestamp){
+ this.largestDeletedTimestamp = Math.max(largestDeletedTimestamp, this.largestDeletedTimestamp);
+ }
+
+ /**
+ * Process abort request.
+ *
+ * @param startTimestamp
+ */
+ protected void processAbort(long startTimestamp){
+ hashmap.setHalfAborted(startTimestamp);
+ uncommited.abort(startTimestamp);
+ }
+
+ /**
+ * Process full abort report.
+ *
+ * @param startTimestamp
+ */
+ protected void processFullAbort(long startTimestamp){
+ hashmap.setFullAborted(startTimestamp);
+ }
+ /**
+ * If logger is disabled, then this call is a noop.
+ *
+ * @param record
+ * @param cb
+ * @param ctx
+ */
+ public void addRecord(byte[] record, final AddRecordCallback cb, Object ctx) {
+ if(logger != null){
+ logger.addRecord(record, cb, ctx);
+ } else{
+ cb.addRecordComplete(Code.OK, ctx);
+ }
+ }
+
+ /**
+ * Closes this state object.
+ */
+ void stop(){
+ if(logger != null){
+ logger.shutdown();
+ }
+ }
+
/*
* WAL related pointers
*/
@@ -111,10 +193,19 @@
public DataOutputStream toWAL = new DataOutputStream(baos);
public List<TSOHandler.ChannelandMessage> nextBatch = new ArrayList<TSOHandler.ChannelandMessage>();
- public TSOState(long largestDeletedTimestamp) {
- this.largestDeletedTimestamp = largestDeletedTimestamp;
- this.uncommited = new Uncommited(largestDeletedTimestamp);
- this.elders = new Elders();
+ public TSOState(StateLogger logger, TimestampOracle timestampOracle) {
+ this.timestampOracle = timestampOracle;
+ this.largestDeletedTimestamp = this.previousLargestDeletedTimestamp = this.timestampOracle.get();
+ this.uncommited = new Uncommited(largestDeletedTimestamp);
+ this.logger = logger;
}
+
+ public TSOState(TimestampOracle timestampOracle) {
+ this.timestampOracle = timestampOracle;
+ this.largestDeletedTimestamp = timestampOracle.get();
+ this.uncommited = new Uncommited(largestDeletedTimestamp);
+ this.elders = new Elders();
+ this.logger = null;
+ }
}
View
56 src/main/java/com/yahoo/omid/tso/TimestampOracle.java
@@ -28,12 +28,15 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.yahoo.omid.tso.persistence.LoggerProtocol;
+
/**
* The Timestamp Oracle that gives monotonically increasing timestamps
*
* @author maysam
*
*/
+
public class TimestampOracle {
private static final Log LOG = LogFactory.getLog(TimestampOracle.class);
@@ -48,12 +51,7 @@
private long last;
private long first;
- private boolean finish;
-
- // public long next() {
- // last ++;
- // return last;
- // }
+ private boolean enabled;
/**
* Must be called holding an exclusive lock
@@ -63,10 +61,14 @@
public long next(DataOutputStream toWal) throws IOException {
last++;
if (last >= maxTimestamp) {
+ toWal.writeByte(LoggerProtocol.TIMESTAMPORACLE);
toWal.writeLong(last);
- toWal.writeByte((byte) -1);
maxTimestamp += TIMESTAMP_BATCH;
}
+ if(LOG.isTraceEnabled()){
+ LOG.trace("Next timestamp: " + last);
+ }
+
return last;
}
@@ -78,14 +80,44 @@ public long first() {
return first;
}
- private static final String BACKUP = "/tmp/tso-persist.backup";
- private static final String PERSIST = "/tmp/tso-persist.txt";
+ //private static final String BACKUP = "/tmp/tso-persist.backup";
+ //private static final String PERSIST = "/tmp/tso-persist.txt";
+
+ /**
+ * Constructor
+ */
+ public TimestampOracle(){
+ this.enabled = false;
+ //make sure you do not use timestamp 0. It triggers a bug in HBase. Since the first time we use last++, initializing it to 0 is fine.
+ this.last = 0;
+ }
+
+ /**
+ * Starts from scratch.
+ */
+ public void initialize(){
+ this.enabled = true;
+ }
+
+ /**
+ * Starts with a given timestamp.
+ *
+ * @param timestamp
+ */
+ public void initialize(long timestamp){
+ LOG.info("Initializing timestamp oracle");
+ this.last = this.first = Math.max(this.last, TIMESTAMP_BATCH);
+ maxTimestamp = this.first;
+ LOG.info("First: " + this.first + ", Last: " + this.last);
+ initialize();
+ }
+
/**
* Constructor initialize the last timestamp TODO: initialize from the last
* persisted timestamp + wallclock time
*/
- public TimestampOracle() {
+ /* public TimestampOracle() {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(PERSIST));
@@ -150,9 +182,9 @@ public void run() {
});
flusher.start();
}
-
+*/
public void stop() {
- this.finish = true;
+ this.enabled = false;
}
@Override
View
369 src/main/java/com/yahoo/omid/tso/persistence/BookKeeperStateBuilder.java
@@ -0,0 +1,369 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+
+package com.yahoo.omid.tso.persistence;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.yahoo.omid.tso.TSOServerConfig;
+import com.yahoo.omid.tso.TSOState;
+import com.yahoo.omid.tso.TimestampOracle;
+import com.yahoo.omid.tso.persistence.BookKeeperStateLogger.LedgerIdCreateCallback;
+import com.yahoo.omid.tso.persistence.BookKeeperStateLogger.LoggerWatcher;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.BuilderInitCallback;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.LoggerInitCallback;
+import com.yahoo.omid.tso.persistence.LoggerException.Code;
+
+
+/**
+ * Builds the TSO state from a BookKeeper ledger if there has been a previous
+ * incarnation of TSO. Note that we need to make sure that the zookeeper session
+ * is the same across builder and logger, so we create in builder and pass it
+ * to logger. This is the case to prevent two TSO instances from creating a lock
+ * and updating the ledger id after having lost the lock. This case constitutes
+ * leads to an invalid system state.
+ *
+ */
+
+public class BookKeeperStateBuilder extends StateBuilder {
+ private static final Log LOG = LogFactory.getLog(BookKeeperStateBuilder.class);
+
+ /*
+ * Assuming that each entry is 1k bytes, we read 50k bytes at each call.
+ * It provides a good degree of parallelism.
+ */
+ private static final long BKREADBATCHSIZE = 50;
+
+ public static TSOState getState(TSOServerConfig config){
+ TSOState returnValue;
+ if(config.getZkServers() == null){
+ LOG.warn("Logger is disabled");
+ returnValue = new TSOState(new TimestampOracle());
+ } else {
+ BookKeeperStateBuilder builder = new BookKeeperStateBuilder(config);
+
+ try{
+ returnValue = builder.buildState();
+ LOG.info("State built");
+ } catch (Throwable e) {
+ LOG.error("Error while building the state.", e);
+ returnValue = null;
+ } finally {
+ builder.shutdown();
+ }
+ }
+ return returnValue;
+ }
+
+ TimestampOracle timestampOracle;
+ ZooKeeper zk;
+ LoggerProtocol lp;
+ boolean enabled;
+ TSOServerConfig config;
+
+ BookKeeperStateBuilder(TSOServerConfig config) {
+ this.timestampOracle = new TimestampOracle();
+ this.config = config;
+ }
+
+ /**
+ * Context objects for callbacks.
+ *
+ */
+ class Context {
+ TSOState state = null;
+ boolean ready = false;
+ boolean hasState = false;
+ boolean hasLogger = false;
+ StateLogger logger;
+
+ synchronized void setState(TSOState state){
+ this.state = state;
+ hasState = true;
+ validate();
+ }
+
+ synchronized void setLogger(StateLogger logger){
+ this.logger = logger;
+ hasLogger = true;
+ validate();
+ }
+
+ synchronized private void validate(){
+ if(logger != null && state != null){
+ state.setLogger(logger);
+ }
+
+ if(hasLogger && hasState){
+ this.ready = true;
+ notify();
+ }
+ }
+
+ synchronized boolean isReady(){
+ return ready;
+ }
+ }
+
+ class LoggerWatcher implements Watcher{
+ CountDownLatch latch;
+
+ LoggerWatcher(CountDownLatch latch){
+ this.latch = latch;
+ }
+ public void process(WatchedEvent event){
+ if(event.getState() != Watcher.Event.KeeperState.SyncConnected)
+ shutdown();
+ else
+ latch.countDown();
+ }
+ }
+
+
+ class LockCreateCallback implements StringCallback {
+
+ public void processResult(int rc, String path, Object ctx, String name){
+ if(rc != Code.OK){
+ LOG.warn("Failed to create lock znode: " + path);
+ ((BookKeeperStateBuilder.Context) ctx).setState(null);
+ } else {
+ zk.getData(LoggerConstants.OMID_LEDGER_ID_PATH,
+ false,
+ new LedgerIdReadCallback(),
+ ctx);
+ }
+ }
+
+ }
+
+ class LedgerIdReadCallback implements DataCallback {
+
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat){
+ if(rc == Code.OK){
+ buildStateFromLedger(data, ctx);
+ } else if (rc == KeeperException.Code.NONODE.intValue()) {
+ LOG.warn("No node exists. " + KeeperException.Code.get(rc).toString());
+ TSOState tempState;
+ try{
+ tempState = new TSOState(timestampOracle);
+ } catch (Exception e) {
+ LOG.error("Error while creating state logger.", e);
+ tempState = null;
+ }
+ ((BookKeeperStateBuilder.Context) ctx).setState(tempState);
+ } else {
+ LOG.warn("Failed to read data. " + KeeperException.Code.get(rc).toString());
+ ((BookKeeperStateBuilder.Context) ctx).setState(null);
+ }
+ }
+ }
+
+ /**
+ * Invoked after the execution of a ledger read. Instances are
+ * created in the open callback.
+ *
+ */
+ class LoggerExecutor implements ReadCallback {
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx){
+ if(rc != BKException.Code.OK){
+ LOG.error("Error while reading ledger entries." + BKException.getMessage(rc));
+ ((BookKeeperStateBuilder.Context) ctx).setState(null);
+ } else {
+ while(entries.hasMoreElements()){
+ LedgerEntry le = entries.nextElement();
+ lp.execute(ByteBuffer.wrap(le.getEntry()));
+
+ if(le.getEntryId() == 0){
+ ((BookKeeperStateBuilder.Context) ctx).setState(lp.getState());
+ }
+ }
+ }
+ }
+ }
+
+ BookKeeper bk;
+
+ @Override
+ public TSOState buildState()
+ throws LoggerException {
+ try{
+ CountDownLatch latch = new CountDownLatch(1);
+
+ this.zk = new ZooKeeper(config.getZkServers(),
+ Integer.parseInt(System.getProperty("SESSIONTIMEOUT", Integer.toString(10000))),
+ new LoggerWatcher(latch));
+
+ latch.await();
+ } catch (Exception e) {
+ LOG.error("Exception while starting zookeeper client", e);
+ this.zk = null;
+ throw LoggerException.create(Code.ZKOPFAILED);
+ }
+
+ LOG.info("Creating bookkeeper client");
+
+ try{
+ bk = new BookKeeper(new ClientConfiguration(), this.zk);
+ } catch (Exception e) {
+ LOG.error("Error while creating bookkeeper object", e);
+ return null;
+ }
+
+
+ /*
+ * Create ZooKeeper lock
+ */
+
+ Context ctx = new Context();
+
+ zk.create(LoggerConstants.OMID_LOCK_PATH,
+ new byte[0],
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL,
+ new LockCreateCallback(),
+ ctx);
+
+ new BookKeeperStateLogger(zk).initialize(new LoggerInitCallback(){
+ public void loggerInitComplete(int rc, StateLogger sl, Object ctx){
+ if(rc == Code.OK){
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Logger is ok.");
+ }
+ ((Context) ctx).setLogger(sl);
+ } else {
+ LOG.error("Error when initializing logger: " + LoggerException.getMessage(rc));
+ }
+ }
+
+ }, ctx);
+
+ try{
+ synchronized(ctx){
+ int counter = 0;
+ while(!ctx.isReady() || (counter > 10)){
+ ctx.wait(1000);
+ counter++;
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for state to build up.", e);
+ ctx.setState(null);
+ }
+
+ return ctx.state;
+ }
+
+
+ /**
+ * Builds state from a ledger.
+ *
+ *
+ * @param data
+ * @param ctx
+ * @return
+ */
+ private void buildStateFromLedger(byte[] data, Object ctx){
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Building state from ledger");
+ }
+
+ if(data == null){
+ LOG.error("No data on znode, can't determine ledger id");
+ ((BookKeeperStateBuilder.Context) ctx).setState(null);
+ }
+
+ /*
+ * Instantiates LoggerProtocol
+ */
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Creating logger protocol object");
+ }
+ try{
+ this.lp = new LoggerProtocol(timestampOracle);
+ } catch (Exception e) {
+ LOG.error("Error while creating state logger for logger protocol.", e);
+ ((BookKeeperStateBuilder.Context) ctx).setState(null);
+ }
+
+ /*
+ * Open ledger for reading.
+ */
+
+ ByteBuffer bb = ByteBuffer.wrap(data);
+ long ledgerId = bb.getLong();
+
+ bk.asyncOpenLedger(ledgerId, BookKeeper.DigestType.CRC32,
+ "flavio was here".getBytes(),
+ new OpenCallback(){
+ public void openComplete(int rc, LedgerHandle lh, Object ctx){
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Open complete, ledger id: " + lh.getId());
+ }
+ if(rc != BKException.Code.OK){
+ LOG.error("Could not open ledger for reading." + BKException.getMessage(rc));
+ ((BookKeeperStateBuilder.Context) ctx).setState(null);
+ } else {
+ long counter = lh.getLastAddConfirmed();
+ while(counter >= 0){
+ long nextBatch = Math.max(counter - BKREADBATCHSIZE + 1, 0);
+ lh.asyncReadEntries(nextBatch, counter, new LoggerExecutor(), ctx);
+ counter -= BKREADBATCHSIZE;
+ }
+ }
+ }
+ }, ctx);
+
+ }
+
+ /**
+ * Disables this builder.
+ */
+ public void shutdown(){
+ this.enabled = false;
+ try {
+ this.zk.close();
+ } catch (InterruptedException e) {
+ LOG.error("Error while shutting down", e);
+ }
+ }
+}
View
268 src/main/java/com/yahoo/omid/tso/persistence/BookKeeperStateLogger.java
@@ -0,0 +1,268 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+
+package com.yahoo.omid.tso.persistence;
+
+
+/**
+ * BookKeeper implementation of StateLogger.
+ */
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import com.yahoo.omid.tso.TSOHandler;
+import com.yahoo.omid.tso.TSOServerConfig;
+import com.yahoo.omid.tso.persistence.StateLogger;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.LoggerInitCallback;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.BuilderInitCallback;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
+import com.yahoo.omid.tso.persistence.LoggerConstants;
+import com.yahoo.omid.tso.persistence.LoggerException.Code;
+
+class BookKeeperStateLogger implements StateLogger {
+ private static final Log LOG = LogFactory.getLog(BookKeeperStateLogger.class);
+
+ private ZooKeeper zk;
+ private BookKeeper bk;
+ private LedgerHandle lh;
+
+
+
+ /**
+ * We try to acquire a lock for this primary first. If we succeed, then we check
+ * if there is a ledger to recover from.
+ *
+ * The next two classes implement asynchronously the sequence of
+ * operations to write the ledger id.
+ */
+
+
+ class LedgerIdCreateCallback implements StringCallback {
+ LoggerInitCallback cb;
+ byte[] ledgerId;
+
+ LedgerIdCreateCallback (LoggerInitCallback cb, byte[] ledgerId){
+ this.cb = cb;
+ this.ledgerId = ledgerId;
+ }
+
+ public void processResult(int rc, String path, Object ctx, String name){
+ if(rc == KeeperException.Code.OK.intValue()){
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Created znode succesfully: " + name);
+ }
+
+ BookKeeperStateLogger.this.enabled = true;
+ cb.loggerInitComplete(Code.OK, BookKeeperStateLogger.this, ctx);
+ } else if(rc != KeeperException.Code.NODEEXISTS.intValue()){
+ LOG.warn("Node exists: " + name);
+ cb.loggerInitComplete(Code.INITLOCKFAILED, BookKeeperStateLogger.this, ctx);
+ } else {
+ zk.setData(LoggerConstants.OMID_LEDGER_ID_PATH,
+ ledgerId,
+ -1,
+ new LedgerIdSetCallback(cb),
+ ctx);
+ }
+ }
+ }
+
+ class LedgerIdSetCallback implements StatCallback {
+ LoggerInitCallback cb;
+
+ LedgerIdSetCallback (LoggerInitCallback cb){
+ this.cb = cb;
+ }
+
+ public void processResult(int rc, String path, Object ctx, Stat stat){
+ if(rc == KeeperException.Code.OK.intValue()){
+ LOG.debug("Set ledger id");
+ BookKeeperStateLogger.this.enabled = true;
+ cb.loggerInitComplete(Code.OK, BookKeeperStateLogger.this, ctx);
+ } else {
+ cb.loggerInitComplete(Code.ZKOPFAILED, BookKeeperStateLogger.this, ctx);
+ }
+ }
+
+ }
+
+ /**
+ * Flag to determine whether this logger is operating or not.
+ */
+ boolean enabled = false;
+
+ /**
+ * Constructor creates a zookeeper and a bookkeeper objects.
+ */
+ BookKeeperStateLogger(ZooKeeper zk) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Constructing Logger");
+ }
+
+ this.zk = zk;
+ }
+
+ /**
+ * Watcher for the zookeeper object.
+ */
+
+ class LoggerWatcher implements Watcher{
+ public void process(WatchedEvent event){
+ if(event.getState() != Watcher.Event.KeeperState.SyncConnected)
+ shutdown();
+ }
+ }
+
+
+ /**
+ * Initializes this logger object to add records. Implements the initialize
+ * method of the StateLogger interface.
+ *
+ * @param cb
+ * @param ctx
+ */
+
+ @Override
+ public void initialize(final LoggerInitCallback cb, Object ctx)
+ throws LoggerException {
+ TSOServerConfig config = TSOServerConfig.configFactory();
+
+ /*
+ * Create new ledger for adding records
+ */
+ try{
+ bk = new BookKeeper(new ClientConfiguration(), zk);
+ } catch (Exception e) {
+ LOG.error("Exception while initializing bookkeeper", e);
+ throw new LoggerException.BKOpFailedException();
+ }
+
+ bk.asyncCreateLedger(config.getEnsembleSize(),
+ config.getQuorumSize(),
+ BookKeeper.DigestType.CRC32,
+ "flavio was here".getBytes(),
+ new CreateCallback(){
+ @Override
+ public void createComplete(int rc, LedgerHandle lh, Object ctx){
+ if(rc == BKException.Code.OK){
+ try{
+ BookKeeperStateLogger.this.lh = lh;
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ dos.writeLong(lh.getId());
+
+ zk.create(LoggerConstants.OMID_LEDGER_ID_PATH,
+ bos.toByteArray(),
+ Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ new LedgerIdCreateCallback(cb, bos.toByteArray()),
+ ctx);
+ } catch (IOException e) {
+ LOG.error("Failed to write to zookeeper. ", e );
+ cb.loggerInitComplete(Code.BKOPFAILED, BookKeeperStateLogger.this, ctx);
+ }
+ } else {
+ LOG.error("Failed to create ledger. " + BKException.getMessage(rc));
+ cb.loggerInitComplete(Code.BKOPFAILED, BookKeeperStateLogger.this, ctx);
+ }
+ }
+ }, ctx);
+ }
+
+
+ /**
+ * Adds a record to the log of operations. The record is a byte array.
+ *
+ * @param record
+ * @param cb
+ * @param ctx
+ */
+ @Override
+ public void addRecord(byte[] record, final AddRecordCallback cb, Object ctx) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Adding record.");
+ }
+
+ if(!enabled){
+ cb.addRecordComplete(Code.LOGGERDISABLED, ctx);
+ return;
+ }
+
+ this.lh.asyncAddEntry(record,
+ new AddCallback() {
+ @Override
+ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+ if(LOG.isDebugEnabled()){
+ LOG.info("Add to ledger complete: " + lh.getId() + ", " + entryId);
+ }
+ if (rc != BKException.Code.OK) {
+ LOG.error("Asynchronous add entry failed: " + BKException.getMessage(rc));
+ cb.addRecordComplete(Code.ADDFAILED, ctx);
+ } else {
+ cb.addRecordComplete(Code.OK, ctx);
+ }
+ }
+ }, ctx);
+ }
+
+
+ /**
+ * Shuts down this logger.
+ *
+ */
+ public void shutdown(){
+ enabled = false;
+ try{
+ try{
+ if(zk.getState() == ZooKeeper.States.CONNECTED){
+ zk.delete(LoggerConstants.OMID_LEDGER_ID_PATH, -1);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while deleting lock znode", e);
+ }
+ if(this.bk != null) bk.close();
+ if(this.zk != null) zk.close();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while closing logger.", e);
+ } catch (BKException e) {
+ LOG.warn("Exception while closing BookKeeper object.", e);
+ }
+ }
+
+}
View
33 src/main/java/com/yahoo/omid/tso/persistence/LoggerAsyncCallback.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.tso.persistence;
+
+import com.yahoo.omid.tso.TSOState;
+
+public interface LoggerAsyncCallback {
+ public interface LoggerInitCallback{
+ void loggerInitComplete(int rc, StateLogger sl, Object ctx);
+ }
+
+ public interface BuilderInitCallback{
+ void builderInitComplete(int rc, TSOState state, Object ctx);
+ }
+
+ public interface AddRecordCallback {
+ void addRecordComplete(int rc, Object ctx);
+ }
+}
View
24 src/main/java/com/yahoo/omid/tso/persistence/LoggerConstants.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.tso.persistence;
+
+public interface LoggerConstants {
+
+ final static String OMID_LOCK_PATH = "/omid_lock";
+ final static String OMID_LEDGER_ID_PATH = "/omid_ledger_id";
+
+}
View
127 src/main/java/com/yahoo/omid/tso/persistence/LoggerException.java
@@ -0,0 +1,127 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+
+package com.yahoo.omid.tso.persistence;
+
+
+@SuppressWarnings("serial")
+public abstract class LoggerException extends Exception {
+
+ private int code;
+
+ LoggerException(int code) {
+ this.code = code;
+ }
+
+ public interface Code {
+ int OK = 0;
+ int ADDFAILED = -1;
+ int INITLOCKFAILED = -2;
+ int BKOPFAILED = -3;
+ int ZKOPFAILED = -4;
+ int LOGGERDISABLED = -5;
+
+ int ILLEGALOP = -101;
+ }
+
+ /**
+ * Create an exception from an error code
+ * @param code return error code
+ * @return corresponding exception
+ */
+ public static LoggerException create(int code) {
+ switch(code) {
+ case Code.ADDFAILED:
+ return new AddFailedException();
+ case Code.INITLOCKFAILED:
+ return new InitLockFailedException();
+ case Code.BKOPFAILED:
+ return new BKOpFailedException();
+ case Code.ZKOPFAILED:
+ return new ZKOpFailedException();
+ case Code.LOGGERDISABLED:
+ return new LoggerDisabledException();
+ default:
+ return new IllegalOpException();
+ }
+
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return this.code;
+ }
+
+ public static String getMessage(int code) {
+ switch (code) {
+ case Code.OK:
+ return "No problem";
+ case Code.ADDFAILED:
+ return "Error while reading ledger";
+ case Code.INITLOCKFAILED:
+ return "Failed to obtain zookeeper lock";
+ case Code.BKOPFAILED:
+ return "BookKeeper operation failed";
+ case Code.ZKOPFAILED:
+ return "ZooKeeper operation failed";
+ case Code.LOGGERDISABLED:
+ return "Logger disabled";
+ default:
+ return "Invalid operation";
+ }
+ }
+
+ public static class AddFailedException extends LoggerException {
+ public AddFailedException() {
+ super(Code.ADDFAILED);
+ }
+ }
+
+ public static class InitLockFailedException extends LoggerException {
+ public InitLockFailedException() {
+ super(Code.INITLOCKFAILED);
+ }
+ }
+
+ public static class LoggerDisabledException extends LoggerException {
+ public LoggerDisabledException() {
+ super(Code.LOGGERDISABLED);
+ }
+ }
+
+ public static class BKOpFailedException extends LoggerException {
+ public BKOpFailedException() {
+ super(Code.BKOPFAILED);
+ }
+ }
+
+ public static class ZKOpFailedException extends LoggerException {
+ public ZKOpFailedException() {
+ super(Code.ZKOPFAILED);
+ }
+ }
+
+ public static class IllegalOpException extends LoggerException {
+ public IllegalOpException() {
+ super(Code.ILLEGALOP);
+ }
+ }
+
+}
View
99 src/main/java/com/yahoo/omid/tso/persistence/LoggerProtocol.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.tso.persistence;
+
+import java.nio.ByteBuffer;
+
+import com.yahoo.omid.tso.TSOState;
+import com.yahoo.omid.tso.TimestampOracle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class LoggerProtocol extends TSOState{
+ private static final Log LOG = LogFactory.getLog(LoggerProtocol.class);
+
+ /*
+ * Protocol flags. Used to identify fields of the logger records.
+ */
+ public final static byte TIMESTAMPORACLE = (byte) -1;
+ public final static byte COMMIT = (byte) -2;
+ public final static byte LARGESTDELETEDTIMESTAMP = (byte) -3;
+ public final static byte ABORT = (byte) -4;
+ public final static byte FULLABORT = (byte) -5;
+
+
+ /**
+ * Logger protocol constructor. Currently it only constructs the
+ * super class, TSOState.
+ *
+ * @param logger
+ * @param largestDeletedTimestamp
+ */
+ LoggerProtocol(TimestampOracle timestampOracle){
+ super(timestampOracle);
+ }
+
+ void execute(ByteBuffer bb){
+ boolean done = false;
+ while(!done){
+ byte op = bb.get();
+ long timestamp, startTimestamp, commitTimestamp;
+ if(LOG.isTraceEnabled()){
+ LOG.trace("Operation: " + op);
+ }
+ switch(op){
+ case TIMESTAMPORACLE:
+ timestamp = bb.getLong();
+ this.getSO().initialize(timestamp);
+ break;
+ case COMMIT:
+ startTimestamp = bb.getLong();
+ commitTimestamp = bb.getLong();
+ processCommit(startTimestamp, commitTimestamp);
+
+ break;
+ case LARGESTDELETEDTIMESTAMP:
+ timestamp = bb.getLong();
+ processLargestDeletedTimestamp(timestamp);
+
+ break;
+ case ABORT:
+ timestamp = bb.getLong();
+ processAbort(timestamp);
+
+ break;
+ case FULLABORT:
+ timestamp = bb.getLong();
+ processFullAbort(timestamp);
+
+ break;
+ }
+ if(bb.remaining() == 0) done = true;
+ }
+ }
+
+ /**
+ * Returns a TSOState object based on this object.
+ *
+ * @return
+ */
+ TSOState getState(){
+ return ((TSOState) this);
+ }
+
+}
View
54 src/main/java/com/yahoo/omid/tso/persistence/StateBuilder.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+
+package com.yahoo.omid.tso.persistence;
+
+
+import com.yahoo.omid.tso.TSOState;
+
+/**
+ * This is an interface for mechanisms that implement
+ * the recovery of the TSO state.
+ *
+ */
+
+public abstract class StateBuilder {
+
+ /**
+ * Logger protocol object. Implements the logic to execute
+ * state taken out of log records.
+ */
+ LoggerProtocol protocol;
+
+ /**
+ * This call should create a new TSOState object and populate
+ * it accordingly. If there was an incarnation of TSO in the past,
+ * then this call recovers the state to populate the TSOState
+ * object.
+ *
+ *
+ * @return a new TSOState
+ */
+ abstract TSOState buildState()
+ throws LoggerException;
+
+ /**
+ * Shuts down state builder.
+ */
+ abstract void shutdown();
+
+}
View
49 src/main/java/com/yahoo/omid/tso/persistence/StateLogger.java
@@ -0,0 +1,49 @@
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.tso.persistence;
+
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.LoggerInitCallback;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
+
+public interface StateLogger {
+
+ /**
+ * Initializes the storage subsystem to add
+ * new records.
+ *
+ * @param cb
+ * @param ctx
+ */
+ void initialize(LoggerInitCallback cb, Object ctx) throws LoggerException;
+
+
+ /**
+ * Add a new record.
+ *
+ * @param record
+ * @param cb
+ * @param ctx
+ */
+ void addRecord(byte[] record, AddRecordCallback cb, Object ctx);
+
+
+ /**
+ * Shut down logger.
+ */
+ void shutdown();
+
+}
View
13 src/test/java/com/yahoo/omid/OmidTestBase.java
@@ -76,12 +76,13 @@ public void run() {
tsothread = new Thread("TSO Thread") {
public void run() {
try {
- String[] args = new String[5];
- args[0] = "1234";
- args[1] = "100";
- args[2] = "4";
- args[3] = "2";
- args[4] = "localhost:2181";
+ String[] args = new String[] {
+ "-port", "1234",
+ "-batch", "100",
+ "-ensemble", "4",
+ "-quorum", "2",
+ "-zk", "localhost:2181"
+ };
TSOServer.main(args);
} catch (InterruptedException e) {
// go away quietly
View
5 src/test/java/com/yahoo/omid/TestAbortTransaction.java
@@ -4,16 +4,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import com.yahoo.omid.client.TransactionManager;
View
5 src/test/java/com/yahoo/omid/TestBasicTransaction.java
@@ -20,18 +20,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import com.yahoo.omid.client.TransactionManager;
View
100 src/test/java/com/yahoo/omid/TestCompaction.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
@@ -37,14 +39,14 @@
public class TestCompaction extends OmidTestBase {
private static final Log LOG = LogFactory.getLog(TestCompaction.class);
- @Test public void runDeleteOld() throws Exception {
+ @Test public void testDeleteOld() throws Exception {
try {
TransactionManager tm = new TransactionManager(conf);
TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
-
+
TransactionState t1 = tm.beginTransaction();
LOG.info("Transaction created " + t1);
-
+
byte[] row = Bytes.toBytes("test-simple");
byte[] fam = Bytes.toBytes(TEST_FAMILY);
byte[] col = Bytes.toBytes("testdata");
@@ -87,7 +89,7 @@
g.addColumn(fam, col2);
Result r = tt.get(g);
int size = r.getColumn(fam, col2).size();
- System.out.println("Size before compaction : " + size);
+ LOG.info("Size before compaction : " + size);
admin.compact(TEST_TABLE);
@@ -103,11 +105,99 @@
g.setMaxVersions();
g.addColumn(fam, col2);
r = tt.get(g);
- System.out.println("Size after compaction : " + r.getColumn(fam, col2).size());
+ LOG.info("Size after compaction " + r.getColumn(fam, col2).size());
assertThat(r.getColumn(fam, col2).size(), is(lessThan(size)));
} catch (Exception e) {
LOG.error("Exception occurred", e);
throw e;
}
}
+
+ @Test public void testLimitEqualToColumns() throws Exception {
+ try {
+ TransactionManager tm = new TransactionManager(conf);
+ TransactionalTable tt = new TransactionalTable(conf, TEST_TABLE);
+
+ TransactionState t1 = tm.beginTransaction();
+
+ byte[] row = Bytes.toBytes("test-simple");
+ byte[] row2 = Bytes.toBytes("test-simple2");
+ byte[] row3 = Bytes.toBytes("test-simple3");
+ byte[] row4 = Bytes.toBytes("test-simple4");
+ byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ byte[] col = Bytes