Remove native library

commit 0cd28921b4d2426dfd15f171f4447f79e9605c21 1 parent 163d773
Daniel Gómez Ferro authored
72 pom.xml
@@ -66,62 +66,7 @@
</execution>
</executions>
</plugin>
- <plugin>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <!-- Build native code after .java compilation -->
- <id>build-native</id>
- <phase>process-classes</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <target name="jni"
- description="Generate headers and compile the native code">
- <echo>Generating JNI headers</echo>
- <exec executable="javah">
- <arg value="-d" />
- <arg
- value="${project.basedir}/target/main/native" />
- <arg value="-classpath" />
- <arg
- value="${project.build.outputDirectory}" />
- <arg value="-jni" />
- <arg
- value="com.yahoo.omid.tso.CommitHashMap" />
- </exec>
- <exec
- dir="${project.basedir}/src/main/native"
- executable="make" failonerror="true">
- <arg value="all" />
- </exec>
- </target>
- </configuration>
- </execution>
- <execution>
- <!-- Make clean in the Maven clean phase -->
- <id>clean-lib</id>
- <phase>clean</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <target name="clean-lib" description="Delete native lib">
- <echo>Deleting native lib</echo>
- <exec
- dir="${project.basedir}/src/main/native"
- executable="make">
- <arg value="clean" />
- </exec>
- </target>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
-
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse
@@ -139,23 +84,6 @@
org.apache.maven.plugins
</groupId>
<artifactId>
- maven-antrun-plugin
- </artifactId>
- <versionRange>[1.3,)</versionRange>
- <goals>
- <goal>run</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore></ignore>
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>
- org.apache.maven.plugins
- </groupId>
- <artifactId>
maven-dependency-plugin
</artifactId>
<versionRange>[2.1,)</versionRange>
327 src/main/java/com/yahoo/omid/tso/CommitHashMap.java
@@ -33,223 +33,112 @@
class CommitHashMap {
- native void init(int initialCapacity, int maxCommits, float loadFactor);
-
- native static long gettotalput();
-
- native static long gettotalget();
-
- native static long gettotalwalkforput();
-
- native static long gettotalwalkforget();
-
- // Load the library
- static {
- System.loadLibrary("tso-commithashmap");
- }
-
- /**
- * Constructs a new, empty hashtable with a default capacity and load factor,
- * which is <code>1000</code> and <code>0.75</code> respectively.
- */
- public CommitHashMap() {
- this(1000, 0.75f);
- }
-
- /**
- * Constructs a new, empty hashtable with the specified initial capacity and
- * default load factor, which is <code>0.75</code>.
- *
- * @param initialCapacity
- * the initial capacity of the hashtable.
- * @throws IllegalArgumentException
- * if the initial capacity is less than zero.
- */
- public CommitHashMap(int initialCapacity) {
- this(initialCapacity, 0.75f);
- }
-
- /**
- * Constructs a new, empty hashtable with the specified initial capacity and
- * the specified load factor.
- *
- * @param initialCapacity
- * the initial capacity of the hashtable.
- * @param loadFactor
- * the load factor of the hashtable.
- * @throws IllegalArgumentException
- * if the initial capacity is less than zero, or if the load
- * factor is nonpositive.
- */
- public CommitHashMap(int initialCapacity, float loadFactor) {
- if (initialCapacity < 0) {
- throw new IllegalArgumentException("Illegal Capacity: " + initialCapacity);
- }
- if (loadFactor <= 0) {
- throw new IllegalArgumentException("Illegal Load: " + loadFactor);
- }
- if (initialCapacity == 0) {
- initialCapacity = 1;
- }
-
- //assuming the worst case that each transaction modifies a value, this is the right size because it is proportional to the hashmap size
- int txnCommitArraySize = (int) (initialCapacity * loadFactor);
- this.init(initialCapacity, txnCommitArraySize, loadFactor);
- }
-
- /**
- * Returns the value to which the specified key is mapped in this map. If
- * there are multiple values with the same key, return the first The first is
- * the one with the largest key, because (i) put always put the recent ones
- * ahead, (ii) a new put on the same key has always larger value (because
- * value is commit timestamp and the map is atmoic)
- *
- * @param key
- * a key in the hashtable.
- * @return the value to which the key is mapped in this hashtable;
- * <code>null</code> if the key is not mapped to any value in this
- * hashtable.
- * @see #put(int, Object)
- */
- native long get(byte[] rowId, byte[] tableId, int hash);
-
- /**
- * Maps the specified <code>key</code> to the specified <code>value</code> in
- * this hashtable. The key cannot be <code>null</code>.
- *
- * The value can be retrieved by calling the <code>get</code> method with a
- * key that is equal to the original key.
- *
- * It guarantees that if multiple entries with the same keys exist then the
- * first one is the most fresh one, i.e., with the largest value
- *
- * @param key
- * the hashtable key.
- * @param value
- * the value.
- * @throws NullPointerException
- * if the key is <code>null</code>. return true if the vlaue is
- * replaced
- */
- native long put(byte[] rowId, byte[] tableId, long value, int hash, long largestDeletedTimestamp);
-
- /**
- * Returns the commit timestamp
- *
- * @param startTimestamp the transaction start timestamp
- * @return commit timestamp if such mapping exist, 0 otherwise
- */
- native long getCommittedTimestamp(long startTimestamp);
- native long setCommitted(long startTimestamp, long commitTimestamp, long largestDeletedTimestamp);
-
- // set of half aborted transactions
- // TODO: set the initial capacity in a smarter way
- Set<AbortedTransaction> halfAborted = Collections.newSetFromMap(new ConcurrentHashMap<AbortedTransaction, Boolean>(10000));
-
- private AtomicLong abortedSnapshot = new AtomicLong();
-
- long getAndIncrementAbortedSnapshot() {
- return abortedSnapshot.getAndIncrement();
- }
-
- // add a new half aborted transaction
- void setHalfAborted(long startTimestamp) {
- halfAborted.add(new AbortedTransaction(startTimestamp, abortedSnapshot.get()));
- }
-
- // call when a half aborted transaction is fully aborted
- void setFullAborted(long startTimestamp) {
- halfAborted.remove(new AbortedTransaction(startTimestamp, 0));
- }
-
- // query to see if a transaction is half aborted
- boolean isHalfAborted(long startTimestamp) {
- return halfAborted.contains(new AbortedTransaction(startTimestamp, 0));
- }
+ private final int size;
+ private long largestDeletedTimestamp;
+ private final long[] startCommitMapping;
+ private final long[] rowsCommitMapping;
+
+ /**
+ * Constructs a new, empty hashtable with a default size of 1000.
+ */
+ public CommitHashMap() {
+ this(1000);
+ }
+
+ /**
+ * Constructs a new, empty hashtable with the specified size.
+ *
+ * @param size
+ * the size of the hashtable.
+ * @throws IllegalArgumentException
+ * if the size is less than zero.
+ */
+ public CommitHashMap(int size) {
+ if (size < 0) {
+ throw new IllegalArgumentException("Illegal size: " + size);
+ }
+
+ this.size = size;
+ this.startCommitMapping = new long[size * 2];
+ this.rowsCommitMapping = new long[size * 2];
+ }
+
+ private int index(long hash) {
+ return (int) (Math.abs(hash) % size);
+ }
+
+ public long getLatestWriteForRow(long hash) {
+ int index = index(hash);
+ return rowsCommitMapping[index];
+ }
+
+ public void putLatestWriteForRow(long hash, long commitTimestamp) {
+ int index = index(hash);
+ long oldCommitTS = rowsCommitMapping[index];
+
+ if (oldCommitTS == commitTimestamp)
+ return;
+
+ rowsCommitMapping[index] = commitTimestamp;
+ largestDeletedTimestamp = Math
+ .max(oldCommitTS, largestDeletedTimestamp);
+ }
+
+ public long getCommittedTimestamp(long startTimestamp) {
+ int indexStart = 2 * index(startTimestamp);
+ int indexCommit = indexStart + 1;
+
+ if (startCommitMapping[indexStart] == startTimestamp) {
+ return startCommitMapping[indexCommit];
+ } else {
+ return 0;
+ }
+ }
+
+ public void setCommittedTimestamp(long startTimestamp, long commitTimestamp) {
+ int indexStart = 2 * index(startTimestamp);
+ int indexCommit = indexStart + 1;
+
+ long oldCommitTS = startCommitMapping[indexCommit];
+ if (oldCommitTS == commitTimestamp)
+ return;
+
+ startCommitMapping[indexStart] = startTimestamp;
+ startCommitMapping[indexCommit] = commitTimestamp;
+ largestDeletedTimestamp = Math
+ .max(oldCommitTS, largestDeletedTimestamp);
+ }
+
+ // set of half aborted transactions
+ // TODO: set the initial capacity in a smarter way
+ Set<AbortedTransaction> halfAborted = Collections
+ .newSetFromMap(new ConcurrentHashMap<AbortedTransaction, Boolean>(
+ 10000));
+
+ private AtomicLong abortedSnapshot = new AtomicLong();
+
+ long getAndIncrementAbortedSnapshot() {
+ return abortedSnapshot.getAndIncrement();
+ }
+
+ // add a new half aborted transaction
+ void setHalfAborted(long startTimestamp) {
+ halfAborted.add(new AbortedTransaction(startTimestamp, abortedSnapshot
+ .get()));
+ }
+
+ // call when a half aborted transaction is fully aborted
+ void setFullAborted(long startTimestamp) {
+ halfAborted.remove(new AbortedTransaction(startTimestamp, 0));
+ }
+
+ // query to see if a transaction is half aborted
+ boolean isHalfAborted(long startTimestamp) {
+ return halfAborted.contains(new AbortedTransaction(startTimestamp, 0));
+ }
+
+ public long getLargestDeletedTimestamp() {
+ return largestDeletedTimestamp;
+ }
}
-
-/*
- * class CommitHashMap {
- *
- * private Entry table[];
- *
- * private int count;
- *
- * private int threshold;
- *
- * private static long largestOrder = 1;
- *
- * private TSOState sharedState = null;
- *
- * public static int totalget = 0; public static int totalwalkforget = 0; public
- * static int totalput = 0; public static int totalwalkforput = 0;
- *
- * private static class Entry { long order;//the assigned order after insert int
- * hash;//keep the computed hash for efficient comparison of keys byte[]
- * key;//which is row id; long tag;//which is the start timestamp; long
- * value;//which is commit timestamp Entry next;
- *
- * protected Entry(byte[] key, int hash, long tag, long value, Entry next) {
- * this.order = ++largestOrder; this.hash = hash; this.key = key; this.tag =
- * tag; this.value = value; this.next = next; } protected Entry(Entry next) {
- * this.order = 0; this.hash = 0; this.key = new byte[8]; this.tag = 0;
- * this.value = 0; this.next = next; } }
- *
- * public CommitHashMap(TSOState state) { this(1000, 0.75f, state); }
- *
- * public CommitHashMap(int initialCapacity, TSOState state) {
- * this(initialCapacity, 0.75f, state); }
- *
- * public CommitHashMap(int initialCapacity, float loadFactor, TSOState state) {
- * super(); if (initialCapacity < 0) { throw new
- * IllegalArgumentException("Illegal Capacity: " + initialCapacity); } if
- * (loadFactor <= 0) { throw new IllegalArgumentException("Illegal Load: " +
- * loadFactor); } if (initialCapacity == 0) { initialCapacity = 1; }
- *
- * table = new Entry[initialCapacity]; this.sharedState = state; threshold =
- * (int) (initialCapacity * loadFactor); //Initialze the table not to interfere
- * with garbage collection
- * System.out.println("MEMORY initialization start ..."); for (int i = 0; i <
- * initialCapacity; i++) { if (i%1000000==0) System.out.println("MEMORY i="+ i);
- * Entry e1 = new Entry(null); Entry e2 = new Entry(null); Entry e3 = new
- * Entry(null); e1.next = e2; e2.next = e3; table[i] = e1; }
- * System.out.println("MEMORY initialization end"); }
- *
- * public int size() { return count; }
- *
- * public boolean isEmpty() { return count == 0; }
- *
- * public long get(byte[] key, int hash) { totalget++; totalwalkforget++;//at
- * least one for array access Entry tab[] = table; //int hash = (int)key; int
- * index = (hash & 0x7FFFFFFF) % tab.length; for (Entry e = tab[index]; e !=
- * null; e = e.next) { totalwalkforget++; if (e.order == 0)//empty break; if
- * (e.hash == hash) //if (java.util.Arrays.equals(e.key, key)) //if (e.key ==
- * key) return e.value; } return 0; }
- *
- * public boolean put(byte[] key, long tag, long value, int hash) { totalput++;
- * totalwalkforput++;//at least one for array access // Makes sure the key is
- * not already in the hashtable. Entry tab[] = table; //int hash = (int)key; int
- * index = (hash & 0x7FFFFFFF) % tab.length; Entry firstReference = null;
- * //boolean thereIsOld = false; for (Entry e = tab[index]; e != null; e =
- * e.next) { totalwalkforput++; boolean isOld = e.order == 0 ? true :
- * largestOrder - e.order > threshold; if (isOld) { if (e.value >
- * sharedState.largestDeletedTimestamp) sharedState.largestDeletedTimestamp =
- * e.value; if (firstReference != null) {//swap it with e //e.key =
- * firstReference.key; for (byte b = 0; b < 8; b++) e.key[b] =
- * firstReference.key[b]; e.hash = firstReference.hash; e.tag =
- * firstReference.tag; e.value = firstReference.value; e.order =
- * firstReference.order; e = firstReference; } for (byte b = 0; b < 8; b++)
- * e.key[b] = key[b]; //e.key = key; e.hash = hash; e.tag = tag; e.value =
- * value; e.order = ++largestOrder; totalgced++; totalgcrun++; return true; }
- * //update the first reference to the key if (e.hash == hash && firstReference
- * == null) //if (java.util.Arrays.equals(e.key, key) && firstReference == null)
- * //if (e.key == key && firstReference == null) firstReference = e; //if
- * (!thereIsOld && isOld)//there are some old items here //thereIsOld = true; }
- *
- * // Creates the new entry. Entry e = new Entry(key, hash, tag, value,
- * tab[index]); if (count % 100000 == 0)
- * System.out.println("NNNNNNNNNNNNNNNNNNNew Entry " + count); tab[index] = e;
- * count++; return false; }
- *
- * public static int totalgcrun = 0; public static int totalgced = 0; }
- */
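
The replacement above is pure Java: two fixed-size long[] arrays stand in for the JNI hash table. rowsCommitMapping keeps the latest commit timestamp per row-hash bucket, startCommitMapping stores (start, commit) pairs at positions 2*index and 2*index+1, and overwriting a bucket raises largestDeletedTimestamp so that evicted entries can never hide a conflict. A minimal usage sketch with hypothetical values (the class is package-private, so this assumes code living in com.yahoo.omid.tso):

    // Bucket index is Math.abs(hash) % size; the values below are made up.
    CommitHashMap map = new CommitHashMap(1000);

    map.putLatestWriteForRow(42L, 7L);                     // bucket 42 holds commit ts 7
    long last = map.getLatestWriteForRow(42L);             // 7

    map.putLatestWriteForRow(1042L, 9L);                   // 1042 % 1000 == 42: same bucket,
                                                           // ts 7 is evicted
    long lowWaterMark = map.getLargestDeletedTimestamp();  // 7, raised by the eviction

    map.setCommittedTimestamp(100L, 105L);                 // start ts 100 committed at ts 105
    long commit = map.getCommittedTimestamp(100L);         // 105
    long miss = map.getCommittedTimestamp(101L);           // 0: no mapping for this start ts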
126 src/main/java/com/yahoo/omid/tso/RowKey.java
@@ -25,90 +25,60 @@
import org.jboss.netty.buffer.ChannelBuffer;
public class RowKey {
- private byte[] rowId;
- private byte[] tableId;
- private int hash = 0;
+ private byte[] rowId;
+ private byte[] tableId;
+ private int hash = 0;
- public RowKey() {
- rowId = new byte[0];
- tableId = new byte[0];
- }
+ public RowKey() {
+ rowId = new byte[0];
+ tableId = new byte[0];
+ }
- public RowKey(byte[] r, byte[] t) {
- rowId = r;
- tableId = t;
- }
- public byte[] getTable() {
- return tableId;
- }
+ public RowKey(byte[] r, byte[] t) {
+ rowId = r;
+ tableId = t;
+ }
- public byte[] getRow() {
- return rowId;
- }
+ public byte[] getTable() {
+ return tableId;
+ }
- public String toString() {
- return new String(tableId) + ":" + new String(rowId);
- }
+ public byte[] getRow() {
+ return rowId;
+ }
- public static RowKey readObject(ChannelBuffer aInputStream) {
- int hash = aInputStream.readInt();
- int len = aInputStream.readInt();
-// byte[] rowId = RowKeyBuffer.nextRowKey(len);
- byte[] rowId = new byte[len];
- aInputStream.readBytes(rowId, 0, len);
- len = aInputStream.readInt();
-// byte[] tableId = RowKeyBuffer.nextRowKey(len);
- byte[] tableId = new byte[len];
- aInputStream.readBytes(tableId, 0, len);
- RowKey rk = new RowKey(rowId, tableId);
- rk.hash = hash;
- return rk;
- }
+ public String toString() {
+ return new String(tableId) + ":" + new String(rowId);
+ }
- public void writeObject(DataOutputStream aOutputStream)
- throws IOException {
- hashCode();
- aOutputStream.writeInt(hash);
- aOutputStream.writeInt(rowId.length);
- aOutputStream.write(rowId,0,rowId.length);
- aOutputStream.writeInt(tableId.length);
- aOutputStream.write(tableId,0,tableId.length);
- }
+ public static RowKey readObject(ChannelBuffer aInputStream) {
+ RowKey rk = new RowKey(null, null);
+ rk.hash = aInputStream.readInt();
+ return rk;
+ }
- public boolean equals(Object obj) {
- if (obj instanceof RowKey) {
- RowKey other = (RowKey)obj;
-
- return Bytes.equals(other.rowId, rowId)
- && Bytes.equals(other.tableId, tableId);
- }
- return false;
- }
+ public void writeObject(DataOutputStream aOutputStream) throws IOException {
+ hashCode();
+ aOutputStream.writeInt(hash);
+ }
- public int hashCode() {
- if (hash != 0) {
- return hash;
- }
- //hash is the xor or row and table id
- /*int h = 0;
- for(int i =0; i < Math.min(8, rowId.length); i++){
- h <<= 8;
- h ^= (int)rowId[i] & 0xFF;
- }
- hash = h;
- h = 0;
- for(int i =0; i < Math.min(8,tableId.length); i++){
- h <<= 8;
- h ^= (int)tableId[i] & 0xFF;
- }
- hash ^= h;
- return hash;*/
- byte[] key = Arrays.copyOf(tableId, tableId.length + rowId.length);
- System.arraycopy(rowId, 0, key, tableId.length, rowId.length);
- hash = MurmurHash.getInstance().hash(key, 0, key.length, 0xdeadbeef);
- //return MurmurHash3.MurmurHash3_x64_32(rowId, 0xDEADBEEF);
- // return (31*Arrays.hashCode(tableId)) + Arrays.hashCode(rowId);
- return hash;
- }
-}
+ public boolean equals(Object obj) {
+ if (obj instanceof RowKey) {
+ RowKey other = (RowKey) obj;
+
+ return Bytes.equals(other.rowId, rowId)
+ && Bytes.equals(other.tableId, tableId);
+ }
+ return false;
+ }
+ public int hashCode() {
+ if (hash != 0) {
+ return hash;
+ }
+ byte[] key = Arrays.copyOf(tableId, tableId.length + rowId.length);
+ System.arraycopy(rowId, 0, key, tableId.length, rowId.length);
+ hash = MurmurHash.getInstance().hash(key, 0, key.length, 0xdeadbeef);
+ return hash;
+ }
+}
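
Note the semantic change in RowKey: readObject and writeObject now carry only the precomputed MurmurHash, not the row and table bytes, so a RowKey rebuilt on the server has null rowId/tableId and is only meaningful through hashCode(), which is exactly what the new CommitHashMap consumes. A round-trip sketch of the assumed wire flow (java.io and org.jboss.netty.buffer imports assumed):

    static void roundTrip() throws IOException {
        RowKey key = new RowKey("row1".getBytes(), "table1".getBytes());

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        key.writeObject(new DataOutputStream(baos));  // writes hashCode(), nothing else

        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(baos.toByteArray());
        RowKey decoded = RowKey.readObject(buf);      // rowId and tableId stay null

        assert decoded.hashCode() == key.hashCode();  // only the hash survives the trip
    }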
1,143 src/main/java/com/yahoo/omid/tso/TSOHandler.java
@@ -1,571 +1,572 @@
-/**
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-package com.yahoo.omid.tso;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-
-import com.yahoo.omid.replication.SharedMessageBuffer.ReadingBuffer;
-import com.yahoo.omid.tso.messages.AbortRequest;
-import com.yahoo.omid.tso.messages.AbortedTransactionReport;
-import com.yahoo.omid.tso.messages.CommitQueryRequest;
-import com.yahoo.omid.tso.messages.CommitQueryResponse;
-import com.yahoo.omid.tso.messages.CommitRequest;
-import com.yahoo.omid.tso.messages.CommitResponse;
-import com.yahoo.omid.tso.messages.FullAbortRequest;
-import com.yahoo.omid.tso.messages.TimestampRequest;
-import com.yahoo.omid.tso.messages.TimestampResponse;
-import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
-import com.yahoo.omid.tso.persistence.LoggerException;
-import com.yahoo.omid.tso.persistence.LoggerException.Code;
-import com.yahoo.omid.tso.persistence.LoggerProtocol;
-
-/**
- * ChannelHandler for the TSO Server
- * @author maysam
- *
- */
-public class TSOHandler extends SimpleChannelHandler {
-
- private static final Log LOG = LogFactory.getLog(TSOHandler.class);
-
- /**
- * Bytes monitor
- */
- public static final AtomicInteger transferredBytes = new AtomicInteger();
-// public static int transferredBytes = 0;
- public static int abortCount = 0;
- public static int hitCount = 0;
- public static long queries = 0;
-
- /**
- * Channel Group
- */
- private ChannelGroup channelGroup = null;
-
- private Map<Channel, ReadingBuffer> messageBuffersMap = new HashMap<Channel, ReadingBuffer>();
-
- /**
- * Timestamp Oracle
- */
- private TimestampOracle timestampOracle = null;
-
- /**
- * The wrapper for the shared state of TSO
- */
- private TSOState sharedState;
-
- private FlushThread flushThread;
- private ScheduledExecutorService scheduledExecutor;
- private ScheduledFuture<?> flushFuture;
-
- private ExecutorService executor;
-
- /**
- * Constructor
- * @param channelGroup
- */
- public TSOHandler(ChannelGroup channelGroup, TSOState state) {
- this.channelGroup = channelGroup;
- this.timestampOracle = state.getSO();
- this.sharedState = state;
- }
-
- public void start() {
- this.flushThread = new FlushThread();
- this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(Thread.currentThread().getThreadGroup(), r);
- t.setDaemon(true);
- t.setName("Flush Thread");
- return t;
- }
- });
- this.flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
- this.executor = Executors.newSingleThreadExecutor();
- }
-
- /**
- * Returns the number of transferred bytes
- * @return the number of transferred bytes
- */
- public static long getTransferredBytes() {
- return transferredBytes.longValue();
- }
-
- /**
- * If write of a message was not possible before, we can do it here
- */
- @Override
- public void channelInterestChanged(ChannelHandlerContext ctx,
- ChannelStateEvent e) {
- }
-
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- channelGroup.add(ctx.getChannel());
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- synchronized (sharedMsgBufLock) {
- sharedState.sharedMessageBuffer.removeReadingBuffer(ctx);
- }
- }
-
- /**
- * Handle receieved messages
- */
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- Object msg = e.getMessage();
- if (msg instanceof TimestampRequest) {
- handle((TimestampRequest) msg, ctx);
- return;
- } else if (msg instanceof CommitRequest) {
- handle((CommitRequest) msg, ctx);
- return;
- } else if (msg instanceof FullAbortRequest) {
- handle((FullAbortRequest) msg, ctx);
- return;
- } else if (msg instanceof CommitQueryRequest) {
- handle((CommitQueryRequest) msg, ctx);
- return;
- }
- }
-
- public void handle(AbortRequest msg, ChannelHandlerContext ctx) {
- synchronized (sharedState) {
- DataOutputStream toWAL = sharedState.toWAL;
- try {
- toWAL.writeByte(LoggerProtocol.ABORT);
- toWAL.writeLong(msg.startTimestamp);
- } catch (IOException e) {
- e.printStackTrace();
- }
- abortCount++;
- sharedState.processAbort(msg.startTimestamp);
- synchronized (sharedMsgBufLock) {
- queueHalfAbort(msg.startTimestamp);
- }
- }
- }
-
- /**
- * Handle the TimestampRequest message
- */
- public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
- long timestamp;
- synchronized (sharedState) {
- try {
- timestamp = timestampOracle.next(sharedState.toWAL);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- }
-
- ReadingBuffer buffer;
- Channel channel = ctx.getChannel();
- boolean bootstrap = false;
- synchronized (messageBuffersMap) {
- buffer = messageBuffersMap.get(ctx.getChannel());
- if (buffer == null) {
- synchronized (sharedMsgBufLock) {
- bootstrap = true;
- buffer = sharedState.sharedMessageBuffer.getReadingBuffer(ctx);
- messageBuffersMap.put(channel, buffer);
- channelGroup.add(channel);
- LOG.warn("Channel connected: " + messageBuffersMap.size());
- }
- }
- }
- if (bootstrap) {
- synchronized (sharedState) {
- synchronized (sharedMsgBufLock) {
- channel.write(buffer.getZipperState());
- buffer.initializeIndexes();
- }
- }
- for (AbortedTransaction halfAborted : sharedState.hashmap.halfAborted) {
- channel.write(new AbortedTransactionReport(halfAborted.getStartTimestamp()));
- }
- }
- ChannelBuffer cb;
- ChannelFuture future = Channels.future(channel);
- synchronized (sharedMsgBufLock) {
- cb = buffer.flush(future);
- }
- Channels.write(ctx, future, cb);
- Channels.write(channel, new TimestampResponse(timestamp));
- }
-
- ChannelBuffer cb = ChannelBuffers.buffer(10);
-
- private boolean finish;
-
- public static long waitTime = 0;
- public static long commitTime = 0;
- public static long checkTime = 0;
-
- private Object sharedMsgBufLock = new Object();
- private Object callbackLock = new Object();
- private AddRecordCallback noCallback = new AddRecordCallback() {
- @Override
- public void addRecordComplete(int rc, Object ctx) {
- }
- };
-
- private Runnable createAbortedSnaphostTask = new Runnable() {
- @Override
- public void run() {
- createAbortedSnapshot();
- }
- };
-
- public void createAbortedSnapshot() {
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream toWAL = new DataOutputStream(baos);
-
- long snapshot = sharedState.hashmap.getAndIncrementAbortedSnapshot();
-
- try {
- toWAL.writeByte(LoggerProtocol.SNAPSHOT);
- toWAL.writeLong(snapshot);
- for (AbortedTransaction aborted : sharedState.hashmap.halfAborted) {
- // ignore aborted transactions from last snapshot
- if (aborted.getSnapshot() < snapshot) {
- toWAL.writeByte(LoggerProtocol.ABORT);
- toWAL.writeLong(aborted.getStartTimestamp());
- }
- }
- } catch (IOException e) {
- // can't happen
- throw new RuntimeException(e);
- }
-
- sharedState.addRecord(baos.toByteArray(), noCallback, null);
- }
-
- /**
- * Handle the CommitRequest message
- */
- public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
- CommitResponse reply = new CommitResponse(msg.startTimestamp);
- ByteArrayOutputStream baos = sharedState.baos;
- DataOutputStream toWAL = sharedState.toWAL;
- synchronized (sharedState) {
- //0. check if it should abort
- if (msg.startTimestamp < timestampOracle.first()) {
- reply.committed = false;
- LOG.warn("Aborting transaction after restarting TSO");
- } else if (msg.startTimestamp < sharedState.largestDeletedTimestamp) {
- // Too old
- reply.committed = false;//set as abort
- LOG.warn("Too old starttimestamp: ST "+ msg.startTimestamp +" MAX " + sharedState.largestDeletedTimestamp);
- } else {
- //1. check the write-write conflicts
- for (RowKey r: msg.rows) {
- long value;
- value = sharedState.hashmap.get(r.getRow(), r.getTable(), r.hashCode());
- if (value != 0 && value > msg.startTimestamp) {
- reply.committed = false;//set as abort
- break;
- } else if (value == 0 && sharedState.largestDeletedTimestamp > msg.startTimestamp) {
- //then it could have been committed after start timestamp but deleted by recycling
- LOG.warn("Old transaction {Start timestamp " + msg.startTimestamp + "} {Largest deleted timestamp " + sharedState.largestDeletedTimestamp + "}");
- reply.committed = false;//set as abort
- break;
- }
- }
- }
-
- if (reply.committed) {
- //2. commit
- try {
- long commitTimestamp = timestampOracle.next(toWAL);
- sharedState.uncommited.commit(commitTimestamp);
- sharedState.uncommited.commit(msg.startTimestamp);
- reply.commitTimestamp = commitTimestamp;
- if (msg.rows.length > 0) {
- if(LOG.isTraceEnabled()){
- LOG.trace("Adding commit to WAL");
- }
- toWAL.writeByte(LoggerProtocol.COMMIT);
- toWAL.writeLong(msg.startTimestamp);
- toWAL.writeLong(commitTimestamp);
-
- long oldLargestDeletedTimestamp = sharedState.largestDeletedTimestamp;
-
- for (RowKey r: msg.rows) {
- sharedState.largestDeletedTimestamp = sharedState.hashmap.put(r.getRow(),
- r.getTable(),
- commitTimestamp,
- r.hashCode(),
- oldLargestDeletedTimestamp);
- }
-
- sharedState.processCommit(msg.startTimestamp, commitTimestamp);
- if (sharedState.largestDeletedTimestamp > oldLargestDeletedTimestamp) {
- toWAL.writeByte(LoggerProtocol.LARGESTDELETEDTIMESTAMP);
- toWAL.writeLong(sharedState.largestDeletedTimestamp);
- Set<Long> toAbort = sharedState.uncommited.raiseLargestDeletedTransaction(sharedState.largestDeletedTimestamp);
- if (LOG.isWarnEnabled() && !toAbort.isEmpty()) {
- LOG.warn("Slow transactions after raising max: " + toAbort.size());
- }
- synchronized (sharedMsgBufLock) {
- for (Long id : toAbort) {
- sharedState.hashmap.setHalfAborted(id);
- queueHalfAbort(id);
- }
- queueLargestIncrease(sharedState.largestDeletedTimestamp);
- }
- }
- if (sharedState.largestDeletedTimestamp > sharedState.previousLargestDeletedTimestamp + TSOState.MAX_ITEMS) {
- // schedule snapshot
- executor.submit(createAbortedSnaphostTask);
- sharedState.previousLargestDeletedTimestamp = sharedState.largestDeletedTimestamp;
- }
- synchronized (sharedMsgBufLock) {
- queueCommit(msg.startTimestamp, commitTimestamp);
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- } else { //add it to the aborted list
- abortCount++;
- try {
- toWAL.writeByte(LoggerProtocol.ABORT);
- toWAL.writeLong(msg.startTimestamp);
- } catch (IOException e) {
- e.printStackTrace();
- }
- sharedState.processAbort(msg.startTimestamp);
-
- synchronized (sharedMsgBufLock) {
- queueHalfAbort(msg.startTimestamp);
- }
- }
-
- TSOHandler.transferredBytes.incrementAndGet();
-
- ChannelandMessage cam = new ChannelandMessage(ctx, reply);
-
- sharedState.nextBatch.add(cam);
- if (sharedState.baos.size() >= TSOState.BATCH_SIZE) {
- if(LOG.isTraceEnabled()){
- LOG.trace("Going to add record of size " + sharedState.baos.size());
- }
- //sharedState.lh.asyncAddEntry(baos.toByteArray(), this, sharedState.nextBatch);
- sharedState.addRecord(baos.toByteArray(),
- new AddRecordCallback() {
- @Override
- public void addRecordComplete(int rc, Object ctx) {
- if (rc != Code.OK) {
- LOG.warn("Write failed: " + LoggerException.getMessage(rc));
-
- } else {
- synchronized (callbackLock) {
- @SuppressWarnings("unchecked")
- ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
- for (ChannelandMessage cam : theBatch) {
- Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
- }
- }
-
- }
- }
- }, sharedState.nextBatch);
- sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
- sharedState.baos.reset();
- }
-
- }
-
- }
-
- /**
- * Handle the CommitQueryRequest message
- */
- public void handle(CommitQueryRequest msg, ChannelHandlerContext ctx) {
- CommitQueryResponse reply = new CommitQueryResponse(msg.startTimestamp);
- reply.queryTimestamp = msg.queryTimestamp;
- synchronized (sharedState) {
- queries++;
- //1. check the write-write conflicts
- long value;
- value = sharedState.hashmap.getCommittedTimestamp(msg.queryTimestamp);
- if (value != 0) { //it exists
- reply.commitTimestamp = value;
- reply.committed = value < msg.startTimestamp;//set as abort
- }
- else if (sharedState.hashmap.isHalfAborted(msg.queryTimestamp))
- reply.committed = false;
- else if (sharedState.uncommited.isUncommited(msg.queryTimestamp))
- reply.committed = false;
- else
- reply.retry = true;
-// else if (sharedState.largestDeletedTimestamp >= msg.queryTimestamp)
-// reply.committed = true;
- // TODO retry needed? isnt it just fully aborted?
-
- ctx.getChannel().write(reply);
-
- // We send the message directly. If after a failure the state is inconsistent we'll detect it
-
- }
- }
-
- public void flush() {
- synchronized (sharedState) {
- if(LOG.isTraceEnabled()){
- LOG.trace("Adding record, size: " + sharedState.baos.size());
- }
- sharedState.addRecord(sharedState.baos.toByteArray(), new AddRecordCallback() {
- @Override
- public void addRecordComplete(int rc, Object ctx) {
- if (rc != Code.OK) {
- LOG.warn("Write failed: " + LoggerException.getMessage(rc));
-
- } else {
- synchronized (callbackLock) {
- @SuppressWarnings("unchecked")
- ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
- for (ChannelandMessage cam : theBatch) {
- Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
- }
- }
-
- }
- }
- }, sharedState.nextBatch);
- sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
- sharedState.baos.reset();
- if (flushFuture.cancel(false)) {
- flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
- }
- }
- }
-
- public class FlushThread implements Runnable {
- @Override
- public void run() {
- if (finish) {
- return;
- }
- if (sharedState.nextBatch.size() > 0) {
- synchronized (sharedState) {
- if (sharedState.nextBatch.size() > 0) {
- if(LOG.isTraceEnabled()){
- LOG.trace("Flushing log batch.");
- }
- flush();
- }
- }
- }
- flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
- }
- }
-
- private void queueCommit(long startTimestamp, long commitTimestamp) {
- sharedState.sharedMessageBuffer.writeCommit(startTimestamp, commitTimestamp);
- }
-
- private void queueHalfAbort(long startTimestamp) {
- sharedState.sharedMessageBuffer.writeHalfAbort(startTimestamp);
- }
-
- private void queueFullAbort(long startTimestamp) {
- sharedState.sharedMessageBuffer.writeFullAbort(startTimestamp);
- }
-
- private void queueLargestIncrease(long largestTimestamp) {
- sharedState.sharedMessageBuffer.writeLargestIncrease(largestTimestamp);
- }
-
- /**
- * Handle the FullAbortReport message
- */
- public void handle(FullAbortRequest msg, ChannelHandlerContext ctx) {
- synchronized (sharedState) {
- DataOutputStream toWAL = sharedState.toWAL;
- try {
- toWAL.writeByte(LoggerProtocol.FULLABORT);
- toWAL.writeLong(msg.startTimestamp);
- } catch (IOException e) {
- e.printStackTrace();
- }
- sharedState.processFullAbort(msg.startTimestamp);
- }
- synchronized (sharedMsgBufLock) {
- queueFullAbort(msg.startTimestamp);
- }
- }
-
- /*
- * Wrapper for Channel and Message
- */
- public static class ChannelandMessage {
- ChannelHandlerContext ctx;
- TSOMessage msg;
- ChannelandMessage(ChannelHandlerContext c, TSOMessage m) {
- ctx = c;
- msg = m;
- }
- }
-
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- LOG.warn("TSOHandler: Unexpected exception from downstream.", e.getCause());
- Channels.close(e.getChannel());
- }
-
- public void stop() {
- finish = true;
- }
-
-
-}
+/**
+ * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package com.yahoo.omid.tso;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+
+import com.yahoo.omid.replication.SharedMessageBuffer.ReadingBuffer;
+import com.yahoo.omid.tso.messages.AbortRequest;
+import com.yahoo.omid.tso.messages.AbortedTransactionReport;
+import com.yahoo.omid.tso.messages.CommitQueryRequest;
+import com.yahoo.omid.tso.messages.CommitQueryResponse;
+import com.yahoo.omid.tso.messages.CommitRequest;
+import com.yahoo.omid.tso.messages.CommitResponse;
+import com.yahoo.omid.tso.messages.FullAbortRequest;
+import com.yahoo.omid.tso.messages.TimestampRequest;
+import com.yahoo.omid.tso.messages.TimestampResponse;
+import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
+import com.yahoo.omid.tso.persistence.LoggerException;
+import com.yahoo.omid.tso.persistence.LoggerException.Code;
+import com.yahoo.omid.tso.persistence.LoggerProtocol;
+
+/**
+ * ChannelHandler for the TSO Server
+ */
+public class TSOHandler extends SimpleChannelHandler {
+
+ private static final Log LOG = LogFactory.getLog(TSOHandler.class);
+
+ /**
+ * Bytes monitor
+ */
+ public static final AtomicInteger transferredBytes = new AtomicInteger();
+ // public static int transferredBytes = 0;
+ public static int abortCount = 0;
+ public static int hitCount = 0;
+ public static long queries = 0;
+
+ /**
+ * Channel Group
+ */
+ private ChannelGroup channelGroup = null;
+
+ private Map<Channel, ReadingBuffer> messageBuffersMap = new HashMap<Channel, ReadingBuffer>();
+
+ /**
+ * Timestamp Oracle
+ */
+ private TimestampOracle timestampOracle = null;
+
+ /**
+ * The wrapper for the shared state of TSO
+ */
+ private TSOState sharedState;
+
+ private FlushThread flushThread;
+ private ScheduledExecutorService scheduledExecutor;
+ private ScheduledFuture<?> flushFuture;
+
+ private ExecutorService executor;
+
+ /**
+ * Constructor
+ *
+ * @param channelGroup
+ */
+ public TSOHandler(ChannelGroup channelGroup, TSOState state) {
+ this.channelGroup = channelGroup;
+ this.timestampOracle = state.getSO();
+ this.sharedState = state;
+ }
+
+ public void start() {
+ this.flushThread = new FlushThread();
+ this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(Thread.currentThread().getThreadGroup(), r);
+ t.setDaemon(true);
+ t.setName("Flush Thread");
+ return t;
+ }
+ });
+ this.flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+ this.executor = Executors.newSingleThreadExecutor();
+ }
+
+ /**
+ * Returns the number of transferred bytes
+ *
+ * @return the number of transferred bytes
+ */
+ public static long getTransferredBytes() {
+ return transferredBytes.longValue();
+ }
+
+ /**
+ * If writing a message was not possible before, we can do it here
+ */
+ @Override
+ public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ }
+
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ channelGroup.add(ctx.getChannel());
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ synchronized (sharedMsgBufLock) {
+ sharedState.sharedMessageBuffer.removeReadingBuffer(ctx);
+ }
+ }
+
+ /**
+ * Handle received messages
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ Object msg = e.getMessage();
+ if (msg instanceof TimestampRequest) {
+ handle((TimestampRequest) msg, ctx);
+ return;
+ } else if (msg instanceof CommitRequest) {
+ handle((CommitRequest) msg, ctx);
+ return;
+ } else if (msg instanceof FullAbortRequest) {
+ handle((FullAbortRequest) msg, ctx);
+ return;
+ } else if (msg instanceof CommitQueryRequest) {
+ handle((CommitQueryRequest) msg, ctx);
+ return;
+ }
+ }
+
+ public void handle(AbortRequest msg, ChannelHandlerContext ctx) {
+ synchronized (sharedState) {
+ DataOutputStream toWAL = sharedState.toWAL;
+ try {
+ toWAL.writeByte(LoggerProtocol.ABORT);
+ toWAL.writeLong(msg.startTimestamp);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ abortCount++;
+ sharedState.processAbort(msg.startTimestamp);
+ synchronized (sharedMsgBufLock) {
+ queueHalfAbort(msg.startTimestamp);
+ }
+ }
+ }
+
+ /**
+ * Handle the TimestampRequest message
+ */
+ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
+ long timestamp;
+ synchronized (sharedState) {
+ try {
+ timestamp = timestampOracle.next(sharedState.toWAL);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+
+ ReadingBuffer buffer;
+ Channel channel = ctx.getChannel();
+ boolean bootstrap = false;
+ synchronized (messageBuffersMap) {
+ buffer = messageBuffersMap.get(ctx.getChannel());
+ if (buffer == null) {
+ synchronized (sharedMsgBufLock) {
+ bootstrap = true;
+ buffer = sharedState.sharedMessageBuffer.getReadingBuffer(ctx);
+ messageBuffersMap.put(channel, buffer);
+ channelGroup.add(channel);
+ LOG.warn("Channel connected: " + messageBuffersMap.size());
+ }
+ }
+ }
+ if (bootstrap) {
+ synchronized (sharedState) {
+ synchronized (sharedMsgBufLock) {
+ channel.write(buffer.getZipperState());
+ buffer.initializeIndexes();
+ }
+ }
+ for (AbortedTransaction halfAborted : sharedState.hashmap.halfAborted) {
+ channel.write(new AbortedTransactionReport(halfAborted.getStartTimestamp()));
+ }
+ }
+ ChannelBuffer cb;
+ ChannelFuture future = Channels.future(channel);
+ synchronized (sharedMsgBufLock) {
+ cb = buffer.flush(future);
+ }
+ Channels.write(ctx, future, cb);
+ Channels.write(channel, new TimestampResponse(timestamp));
+ }
+
+ ChannelBuffer cb = ChannelBuffers.buffer(10);
+
+ private boolean finish;
+
+ public static long waitTime = 0;
+ public static long commitTime = 0;
+ public static long checkTime = 0;
+
+ private Object sharedMsgBufLock = new Object();
+ private Object callbackLock = new Object();
+ private AddRecordCallback noCallback = new AddRecordCallback() {
+ @Override
+ public void addRecordComplete(int rc, Object ctx) {
+ }
+ };
+
+ private Runnable createAbortedSnaphostTask = new Runnable() {
+ @Override
+ public void run() {
+ createAbortedSnapshot();
+ }
+ };
+
+ public void createAbortedSnapshot() {
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream toWAL = new DataOutputStream(baos);
+
+ long snapshot = sharedState.hashmap.getAndIncrementAbortedSnapshot();
+
+ try {
+ toWAL.writeByte(LoggerProtocol.SNAPSHOT);
+ toWAL.writeLong(snapshot);
+ for (AbortedTransaction aborted : sharedState.hashmap.halfAborted) {
+ // ignore aborted transactions from last snapshot
+ if (aborted.getSnapshot() < snapshot) {
+ toWAL.writeByte(LoggerProtocol.ABORT);
+ toWAL.writeLong(aborted.getStartTimestamp());
+ }
+ }
+ } catch (IOException e) {
+ // can't happen
+ throw new RuntimeException(e);
+ }
+
+ sharedState.addRecord(baos.toByteArray(), noCallback, null);
+ }
+
+ /**
+ * Handle the CommitRequest message
+ */
+ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
+ CommitResponse reply = new CommitResponse(msg.startTimestamp);
+ ByteArrayOutputStream baos = sharedState.baos;
+ DataOutputStream toWAL = sharedState.toWAL;
+ synchronized (sharedState) {
+ // 0. check if it should abort
+ if (msg.startTimestamp < timestampOracle.first()) {
+ reply.committed = false;
+ LOG.warn("Aborting transaction after restarting TSO");
+ } else if (msg.startTimestamp < sharedState.largestDeletedTimestamp) {
+ // Too old
+ reply.committed = false;// set as abort
+ LOG.warn("Too old starttimestamp: ST " + msg.startTimestamp + " MAX "
+ + sharedState.largestDeletedTimestamp);
+ } else {
+ // 1. check the write-write conflicts
+ for (RowKey r : msg.rows) {
+ long value;
+ value = sharedState.hashmap.getLatestWriteForRow(r.hashCode());
+ if (value != 0 && value > msg.startTimestamp) {
+ reply.committed = false;// set as abort
+ break;
+ } else if (value == 0 && sharedState.largestDeletedTimestamp > msg.startTimestamp) {
+ // then it could have been committed after start
+ // timestamp but deleted by recycling
+ LOG.warn("Old transaction {Start timestamp " + msg.startTimestamp
+ + "} {Largest deleted timestamp " + sharedState.largestDeletedTimestamp + "}");
+ reply.committed = false;// set as abort
+ break;
+ }
+ }
+ }
+
+ if (reply.committed) {
+ // 2. commit
+ try {
+ long commitTimestamp = timestampOracle.next(toWAL);
+ sharedState.uncommited.commit(commitTimestamp);
+ sharedState.uncommited.commit(msg.startTimestamp);
+ reply.commitTimestamp = commitTimestamp;
+ if (msg.rows.length > 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Adding commit to WAL");
+ }
+ toWAL.writeByte(LoggerProtocol.COMMIT);
+ toWAL.writeLong(msg.startTimestamp);
+ toWAL.writeLong(commitTimestamp);
+
+ long oldLargestDeletedTimestamp = sharedState.largestDeletedTimestamp;
+
+ for (RowKey r : msg.rows) {
+ sharedState.hashmap.putLatestWriteForRow(r.hashCode(), commitTimestamp);
+ }
+
+ sharedState.largestDeletedTimestamp = sharedState.hashmap.getLargestDeletedTimestamp();
+ sharedState.processCommit(msg.startTimestamp, commitTimestamp);
+ if (sharedState.largestDeletedTimestamp > oldLargestDeletedTimestamp) {
+ toWAL.writeByte(LoggerProtocol.LARGESTDELETEDTIMESTAMP);
+ toWAL.writeLong(sharedState.largestDeletedTimestamp);
+ Set<Long> toAbort = sharedState.uncommited
+ .raiseLargestDeletedTransaction(sharedState.largestDeletedTimestamp);
+ if (LOG.isWarnEnabled() && !toAbort.isEmpty()) {
+ LOG.warn("Slow transactions after raising max: " + toAbort.size());
+ }
+ synchronized (sharedMsgBufLock) {
+ for (Long id : toAbort) {
+ sharedState.hashmap.setHalfAborted(id);
+ queueHalfAbort(id);
+ }
+ queueLargestIncrease(sharedState.largestDeletedTimestamp);
+ }
+ }
+ if (sharedState.largestDeletedTimestamp > sharedState.previousLargestDeletedTimestamp
+ + TSOState.MAX_ITEMS) {
+ // schedule snapshot
+ executor.submit(createAbortedSnaphostTask);
+ sharedState.previousLargestDeletedTimestamp = sharedState.largestDeletedTimestamp;
+ }
+ synchronized (sharedMsgBufLock) {
+ queueCommit(msg.startTimestamp, commitTimestamp);
+ }
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ } else { // add it to the aborted list
+ abortCount++;
+ try {
+ toWAL.writeByte(LoggerProtocol.ABORT);
+ toWAL.writeLong(msg.startTimestamp);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ sharedState.processAbort(msg.startTimestamp);
+
+ synchronized (sharedMsgBufLock) {
+ queueHalfAbort(msg.startTimestamp);
+ }
+ }
+
+ TSOHandler.transferredBytes.incrementAndGet();
+
+ ChannelandMessage cam = new ChannelandMessage(ctx, reply);
+
+ sharedState.nextBatch.add(cam);
+ if (sharedState.baos.size() >= TSOState.BATCH_SIZE) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Going to add record of size " + sharedState.baos.size());
+ }
+ // sharedState.lh.asyncAddEntry(baos.toByteArray(), this,
+ // sharedState.nextBatch);
+ sharedState.addRecord(baos.toByteArray(), new AddRecordCallback() {
+ @Override
+ public void addRecordComplete(int rc, Object ctx) {
+ if (rc != Code.OK) {
+ LOG.warn("Write failed: " + LoggerException.getMessage(rc));
+
+ } else {
+ synchronized (callbackLock) {
+ @SuppressWarnings("unchecked")
+ ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
+ for (ChannelandMessage cam : theBatch) {
+ Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
+ }
+ }
+
+ }
+ }
+ }, sharedState.nextBatch);
+ sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
+ sharedState.baos.reset();
+ }
+
+ }
+
+ }
+
+ /**
+ * Handle the CommitQueryRequest message
+ */
+ public void handle(CommitQueryRequest msg, ChannelHandlerContext ctx) {
+ CommitQueryResponse reply = new CommitQueryResponse(msg.startTimestamp);
+ reply.queryTimestamp = msg.queryTimestamp;
+ synchronized (sharedState) {
+ queries++;
+ // 1. check the write-write conflicts
+ long value;
+ value = sharedState.hashmap.getCommittedTimestamp(msg.queryTimestamp);
+ if (value != 0) { // it exists
+ reply.commitTimestamp = value;
+ reply.committed = value < msg.startTimestamp;// set as abort
+ } else if (sharedState.hashmap.isHalfAborted(msg.queryTimestamp))
+ reply.committed = false;
+ else if (sharedState.uncommited.isUncommited(msg.queryTimestamp))
+ reply.committed = false;
+ else
+ reply.retry = true;
+ // else if (sharedState.largestDeletedTimestamp >=
+ // msg.queryTimestamp)
+ // reply.committed = true;
+ // TODO retry needed? Isn't it just fully aborted?
+
+ ctx.getChannel().write(reply);
+
+ // We send the message directly. If after a failure the state is
+ // inconsistent we'll detect it
+
+ }
+ }
+
+ public void flush() {
+ synchronized (sharedState) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Adding record, size: " + sharedState.baos.size());
+ }
+ sharedState.addRecord(sharedState.baos.toByteArray(), new AddRecordCallback() {
+ @Override
+ public void addRecordComplete(int rc, Object ctx) {
+ if (rc != Code.OK) {
+ LOG.warn("Write failed: " + LoggerException.getMessage(rc));
+
+ } else {
+ synchronized (callbackLock) {
+ @SuppressWarnings("unchecked")
+ ArrayList<ChannelandMessage> theBatch = (ArrayList<ChannelandMessage>) ctx;
+ for (ChannelandMessage cam : theBatch) {
+ Channels.write(cam.ctx, Channels.succeededFuture(cam.ctx.getChannel()), cam.msg);
+ }
+ }
+
+ }
+ }
+ }, sharedState.nextBatch);
+ sharedState.nextBatch = new ArrayList<ChannelandMessage>(sharedState.nextBatch.size() + 5);
+ sharedState.baos.reset();
+ if (flushFuture.cancel(false)) {
+ flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ public class FlushThread implements Runnable {
+ @Override
+ public void run() {
+ if (finish) {
+ return;
+ }
+ if (sharedState.nextBatch.size() > 0) {
+ synchronized (sharedState) {
+ if (sharedState.nextBatch.size() > 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Flushing log batch.");
+ }
+ flush();
+ }
+ }
+ }
+ flushFuture = scheduledExecutor.schedule(flushThread, TSOState.FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void queueCommit(long startTimestamp, long commitTimestamp) {
+ sharedState.sharedMessageBuffer.writeCommit(startTimestamp, commitTimestamp);
+ }
+
+ private void queueHalfAbort(long startTimestamp) {
+ sharedState.sharedMessageBuffer.writeHalfAbort(startTimestamp);
+ }
+
+ private void queueFullAbort(long startTimestamp) {
+ sharedState.sharedMessageBuffer.writeFullAbort(startTimestamp);
+ }
+
+ private void queueLargestIncrease(long largestTimestamp) {
+ sharedState.sharedMessageBuffer.writeLargestIncrease(largestTimestamp);
+ }
+
+ /**
+ * Handle the FullAbortReport message
+ */
+ public void handle(FullAbortRequest msg, ChannelHandlerContext ctx) {
+ synchronized (sharedState) {
+ DataOutputStream toWAL = sharedState.toWAL;
+ try {
+ toWAL.writeByte(LoggerProtocol.FULLABORT);
+ toWAL.writeLong(msg.startTimestamp);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ sharedState.processFullAbort(msg.startTimestamp);
+ }
+ synchronized (sharedMsgBufLock) {
+ queueFullAbort(msg.startTimestamp);
+ }
+ }
+
+ /*
+ * Wrapper for Channel and Message
+ */
+ public static class ChannelandMessage {
+ ChannelHandlerContext ctx;
+ TSOMessage msg;
+
+ ChannelandMessage(ChannelHandlerContext c, TSOMessage m) {
+ ctx = c;
+ msg = m;
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ LOG.warn("TSOHandler: Unexpected exception from downstream.", e.getCause());
+ Channels.close(e.getChannel());
+ }
+
+ public void stop() {
+ finish = true;
+ }
+
+}
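
With the native map gone, the write-write conflict check in handle(CommitRequest, ...) works on row hashes alone. Condensed from the diff above (a sketch of the same logic, not new behavior):

    boolean committed = true;
    for (RowKey r : msg.rows) {
        long lastCommit = sharedState.hashmap.getLatestWriteForRow(r.hashCode());
        if (lastCommit != 0 && lastCommit > msg.startTimestamp) {
            committed = false;  // row (or a colliding hash) written after this txn started
            break;
        } else if (lastCommit == 0 && sharedState.largestDeletedTimestamp > msg.startTimestamp) {
            committed = false;  // the entry may have been evicted (recycled) since then
            break;
        }
    }

Since evictions always raise largestDeletedTimestamp, a hash collision can only cause a spurious abort, never a missed conflict.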
5 src/main/java/com/yahoo/omid/tso/TSOState.java
@@ -113,7 +113,7 @@ protected TimestampOracle getSO(){
* The hash map to keep track of recently committed rows;
* each bucket is about 20 bytes, so the initial capacity is 20MB
*/
- public CommitHashMap hashmap = new CommitHashMap(MAX_ITEMS, LOAD_FACTOR);
+ public CommitHashMap hashmap = new CommitHashMap(MAX_ITEMS);
public Uncommited uncommited;
@@ -123,7 +123,8 @@ protected TimestampOracle getSO(){
* @param startTimestamp
*/
protected void processCommit(long startTimestamp, long commitTimestamp){
- largestDeletedTimestamp = hashmap.setCommitted(startTimestamp, commitTimestamp, largestDeletedTimestamp);
+ hashmap.setCommittedTimestamp(startTimestamp, commitTimestamp);
+ largestDeletedTimestamp = hashmap.getLargestDeletedTimestamp();
}
/**
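
processCommit mirrors the split API: the old native setCommitted(startTimestamp, commitTimestamp, largestDeletedTimestamp) recorded the pair and returned the new low-water mark in one call, while the Java map tracks it internally, so the state records the commit first and then reads the possibly-raised watermark back:

    hashmap.setCommittedTimestamp(startTimestamp, commitTimestamp);  // may evict an older pair
    largestDeletedTimestamp = hashmap.getLargestDeletedTimestamp();  // pick up the raised mark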
64 src/main/native/Makefile
@@ -1,64 +0,0 @@
-
-########################################################################
-#
-# Copyright (c) 2011 Yahoo! Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License. See accompanying LICENSE file.
-#
-########################################################################
-
-OSTYPE=$(shell uname -s)
-
-SOURCESDIR=.
-OBJECTSDIR=../../../target/main/native
-LIBDIR=../../../lib
-
-CC=g++
-
-# This sets the name of the generated lib depending on the OS
-TSOCOMMITHASHMAP=libtso-commithashmap.so
-ifeq ($(OSTYPE),Darwin)
-TSOCOMMITHASHMAP=libtso-commithashmap.jnilib
-endif
-
-INCLUDES=
-ifeq ($(OSTYPE),Darwin)
-INCLUDES=-I/System/Library/Frameworks/JavaVM.framework/Headers
-else
-INCLUDES=-I$(JAVA_HOME)/include -I$(JAVA_HOME)/include/linux
-endif
-# The following includes the target dir where maven antrun places the .h file
-INCLUDES+=-I$(OBJECTSDIR)
-
-CFLAGS=-fPIC -O3 $(INCLUDES) -c
-LDFLAGS=-fPIC -lstdc++ -shared
-ifneq ($(OSTYPE),Darwin)
-LDFLAGS+=-Wl,-soname=$(TSOCOMMITHASHMAP)
-endif
-
-SOURCES=$(shell find '$(SOURCESDIR)' -type f -name '*.cc')
-OBJECTS=$(SOURCES:$(SOURCESDIR)/%.cc=$(OBJECTSDIR)/%.o)
-LIB=$(LIBDIR)/$(TSOCOMMITHASHMAP)
-
-all: $(LIB)
-
-$(LIB): $(OBJECTS)
- mkdir -p $(LIBDIR)
- $(CC) $(LDFLAGS) $(OBJECTS) -o $@
-
-$(OBJECTS): $(SOURCES)
- mkdir -p $(OBJECTSDIR)
- $(CC) $(CFLAGS) $< -o $@
-
-clean:
- rm -f $(LIB)
339 src/main/native/nativelib.cc
@@ -1,339 +0,0 @@
-/**
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-#include <jni.h>
-#include <string.h>
-#include <unistd.h>
-#include <stdlib.h>
-
-#include "com_yahoo_omid_tso_CommitHashMap.h"
-
-#define MAX_KEY_SIZE 256
-/**
- * The load factor for the hashtable.
- */
-long largestOrder = 1;
-/*
- * keep statistics of the # of average memory access
- */
-long totalwalkforget = 0;
-long totalwalkforput = 0;
-long totalget = 0;
-long totalput = 0;
-long gmaxCommits = 0;
-/**
- * The total number of entries in the hash table.
- */
-int count = 0;
-struct Entry;
-struct LargeEntry;
-struct StartCommit;
-//the hash map
-LargeEntry (*table);
-StartCommit (*commitTable);
-
-
-
-int tableLength;
-/**
- * An entry could be garbage collected if its older than this threshold
- * (The value of this field is (int)(capacity * loadFactor).)
- */
-
-int threshold;
-
-/*
- * Class: com_yahoo_omid_CommitHashMap
- * Method: gettotalput
- * Signature: ()J
- */
-JNIEXPORT jlong JNICALL Java_com_yahoo_omid_tso_CommitHashMap_gettotalput
-(JNIEnv * env, jclass jcls) {
- return totalput;
-}
-
-/*
- * Class: com_yahoo_omid_CommitHashMap
- * Method: gettotalget
- * Signature: ()J
- */
-JNIEXPORT jlong JNICALL Java_com_yahoo_omid_tso_CommitHashMap_gettotalget
-(JNIEnv * env, jclass jcls) {
- return totalget;
-}
-
-/*
- * Class: com_yahoo_omid_CommitHashMap
- * Method: gettotalwalkforput
- * Signature: ()J
- */
-JNIEXPORT jlong JNICALL Java_com_yahoo_omid_tso_CommitHashMap_gettotalwalkforput
-(JNIEnv * env, jclass jcls) {
- return totalwalkforput;
-}
-
-/*
- * Class: com_yahoo_omid_CommitHashMap
- * Method: gettotalwalkforget
- * Signature: ()J
- */
-JNIEXPORT jlong JNICALL Java_com_yahoo_omid_tso_CommitHashMap_gettotalwalkforget
-(JNIEnv * env, jclass jcls) {
- return totalwalkforget;
-}
-
-/*
- * Each item stores a start timestamp and its commit timestamp
- */
-struct StartCommit {
- jlong start;//the start timestamp
- jlong commit;//the commit timestamp
-};
-
-/**
- * Struct that acts as the data structure for a new entry in the
- * table.
- * Avoids per-entry object allocation to reduce memory walks
- */
-struct Entry {
- long order;//the assigned order after insert
- int hash;//keep the computed hash for efficient comparison of keys
- //jbyte key[8];//which is row id;
- jbyte* key;//the row id concatenated with the table id
- jbyte rowidsize;//the key size = rowidsize + tableidsize
- jbyte tableidsize;//
- //jlong tag;//which is the start timestamp;
- jlong value;//the commit timestamp
- //important: must be the last field because of memcpy
- Entry* next;
-
- Entry() {
- order = 0;
- //hash = 0;
- key = NULL;
- //tag = 0;
- value = 0;
- next = NULL;
- }
-
- Entry(Entry* e) {
- *this = *e;
- }
-};
-
-struct LargeEntry {
- Entry e1;
- Entry e2;
- Entry e3;
-
- LargeEntry() {
- e1.next = &e2;
- e2.next = &e3;
- }
-};
-
-/*
- * Class: CommitHashMap
- * Method: init
- * Signature: ()V
- */
-JNIEXPORT void JNICALL Java_com_yahoo_omid_tso_CommitHashMap_init
-(JNIEnv * env, jobject jobj, jint initialCapacity, jint maxCommits,jfloat loadFactor) {
- tableLength = initialCapacity;
- threshold = (int) (initialCapacity * loadFactor);
- printf("initialCapacity %d\n", initialCapacity);
- printf("Entry %lu Entry* %lu long %lu int %lu char %lu jbyte %lu jbyte* %lu jlong %lu\n",
- sizeof(Entry), sizeof(Entry*), sizeof(long), sizeof(int), sizeof(char), sizeof(jbyte), sizeof(jbyte*), sizeof(jlong));
- table = new LargeEntry[initialCapacity];
- commitTable = new StartCommit[maxCommits];
- memset(commitTable, 0, sizeof(StartCommit)*maxCommits);
- gmaxCommits = maxCommits;
-
- //Initialize the table so as not to interfere with garbage collection
- printf("Entry* %p %%8 %lu ...\n", table, ((size_t)table)%8);
- printf("MEMORY initialization start ...\n");
- for (int i = 0; i < initialCapacity; i++) {
- if (i%1000000==0) {
- printf("MEMORY i=%d\n", i);
- fflush(stdout);
- }
- //Entry* e2 = new Entry();
- //Entry* e3 = new Entry();
- //table[i].next = e2;
- //e2->next = e3;
- }
- printf("MEMORY initialization end\n");
- fflush(stdout);
-}
-
-
-// Returns the value to which the specified key is mapped in this map.
-// If there are multiple values with the same key, the first one is returned.
-// The first is also the one with the largest value, because (i) put always
-// places the most recent entries ahead and (ii) a new put on the same key
-// always carries a larger value (the value is a commit timestamp and the
-// map is atomic).
-//
-// @param key a key in the hashtable.
-// @return the value to which the key is mapped in this hashtable;
-// <code>NULL</code> if the key is not mapped to any value in this hashtable.
-
-/*
- * Class: CommitHashMap
- * Method: get
- * Signature: (JI)J
- */
-
-jbyte keyarray[MAX_KEY_SIZE];
-JNIEXPORT jlong JNICALL Java_com_yahoo_omid_tso_CommitHashMap_get
-(JNIEnv * env , jobject jobj, jbyteArray rowId, jbyteArray tableId, jint hash) {
- totalget++;
- jsize rowidsize = env->GetArrayLength(rowId);
- jsize tableidsize = env->GetArrayLength(tableId);
- env->GetByteArrayRegion(rowId,0,rowidsize,keyarray);
- env->GetByteArrayRegion(tableId,0,tableidsize,keyarray + rowidsize * sizeof(jbyte));
- char keyarraysize = (rowidsize + tableidsize) * sizeof(jbyte);
- int index = (hash & 0x7FFFFFFF) % tableLength;
- for (Entry* e = &(table[index].e1); e != NULL; e = e->next) {
- totalwalkforget++;
- if (e->order == 0)//empty
- break;
- if (e->hash == hash && e->rowidsize == rowidsize && e->tableidsize == tableidsize)
- if (memcmp(e->key, keyarray, keyarraysize)==0)
- return e->value;
- }
- return 0;
-}
-
-/*
- * Class: CommitHashMap
- * Method: put
- * Signature: (JJJI)Z
- */
-JNIEXPORT jlong JNICALL Java_com_yahoo_omid_tso_CommitHashMap_put
-(JNIEnv * env , jobject jobj, jbyteArray rowId, jbyteArray tableId, jlong value, jint hash, jlong largestDeletedTimestamp) {
- totalput++;
- int index = (hash & 0x7FFFFFFF) % tableLength;
- Entry* firstBucket = &(table[index].e1);
- bool keyarrayloaded = false;
- jsize rowidsize, tableidsize;
- unsigned int keyarraysize;
-
- Entry* lastEntry = NULL;//after the loop, it points to the last entry
- for (Entry* e = firstBucket; e != NULL; lastEntry = e, e = e->next) {
- totalwalkforput++;
-
- bool isOld = e->order == 0 ?
- true :
- largestOrder - e->order > threshold;
- if (isOld) {
- if (e->value > largestDeletedTimestamp) {
- largestDeletedTimestamp = e->value;
- }
-
- if (keyarrayloaded == false) {
- rowidsize = env->GetArrayLength(rowId);
- tableidsize = env->GetArrayLength(tableId);
- }
- if (e->key == NULL || (e->rowidsize + e->tableidsize) < (rowidsize + tableidsize)) {//not reusable
- free(e->key);
- //jbyte* key = (jbyte *)malloc(len * sizeof(jbyte));
- e->key = (jbyte *)malloc((rowidsize + tableidsize) * sizeof(jbyte));
- }
- if (keyarrayloaded == false) {
- env->GetByteArrayRegion(rowId,0,rowidsize,e->key);
- env->GetByteArrayRegion(tableId,0,tableidsize,e->key + rowidsize * sizeof(jbyte));
- }
- else memcpy(e->key, keyarray, keyarraysize);
-
- e->rowidsize = rowidsize;
- e->tableidsize = tableidsize;
- e->hash = hash;
- //e->tag = tag;
- e->value = value;
- e->order = ++largestOrder;
- return largestDeletedTimestamp;
- }
-
- if (keyarrayloaded == false) {
- rowidsize = env->GetArrayLength(rowId);
- tableidsize = env->GetArrayLength(tableId);
- keyarraysize = (rowidsize + tableidsize) * sizeof(jbyte);
- env->GetByteArrayRegion(rowId,0,rowidsize,keyarray);
- env->GetByteArrayRegion(tableId,0,tableidsize,keyarray + rowidsize * sizeof(jbyte));
- keyarrayloaded = true;
- }
- if (e->hash == hash && e->rowidsize == rowidsize && e->tableidsize == tableidsize) {
- if (memcmp(e->key, keyarray, keyarraysize)==0) {
- //e->tag = tag;
- e->value = value;
- e->order = ++largestOrder;
- return largestDeletedTimestamp;
- }
- }
- }
-
- //printf("new entry");
- // Creates the new entry.
- LargeEntry* le = new LargeEntry();
- Entry* newentry = &(le->e1);
- lastEntry->next = newentry;
- if (keyarrayloaded == false) {
- rowidsize = env->GetArrayLength(rowId);
- tableidsize = env->GetArrayLength(tableId);
- }
- newentry->key = (jbyte *)malloc((rowidsize + tableidsize) * sizeof(jbyte));
- if (keyarrayloaded == false) {
- env->GetByteArrayRegion(rowId,0,rowidsize,newentry->key);
- env->GetByteArrayRegion(tableId,0,tableidsize,newentry->key + rowidsize * sizeof(jbyte));
- }
- else memcpy(newentry->key, keyarray, keyarraysize);
- newentry->rowidsize = rowidsize;
- newentry->tableidsize = tableidsize;
- newentry->hash = hash;
- //newentry->tag = tag;
- newentry->value = value;
- newentry->order = ++largestOrder;
- //newentry->next = e;
- if (count % 100000 == 0) {
- printf("NNNNNNNNNNNNNNNNNNNew Entry %d\n" , count);
- fflush(stdout);
- }
- count++;
- return largestDeletedTimestamp;
-}
-
-
-JNIEXPORT jlong JNICALL Java_com_yahoo_omid_tso_CommitHashMap_getCommittedTimestamp(JNIEnv *, jobject, jlong startTimestamp) {
- int key = startTimestamp % gmaxCommits;
- StartCommit& entry = commitTable[key];
- if (entry.start == startTimestamp)
- return entry.commit;
- return 0;//no such entry in the array: it was either deleted or never inserted
-}
-
-JNIEXPORT jlong JNICALL Java_com_yahoo_omid_tso_CommitHashMap_setCommitted(JNIEnv * env , jobject jobj, jlong startTimestamp, jlong commitTimestamp, jlong largestDeletedTimestamp) {
- int key = startTimestamp % gmaxCommits;
- StartCommit& entry = commitTable[key];
- //assume(entry.start != startTimestamp);
- if (entry.start != startTimestamp && entry.commit > largestDeletedTimestamp)
- largestDeletedTimestamp = entry.commit;
- entry.start = startTimestamp;
- entry.commit = commitTimestamp;
- return largestDeletedTimestamp;
-}
-
-
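For reference, the last two functions in the deleted file implement a fixed-size start/commit table: a start timestamp hashes to slot startTimestamp % gmaxCommits, and overwriting an occupied slot promotes the evicted commit timestamp into largestDeletedTimestamp so callers know that entry can no longer be queried. A minimal Java sketch of the same logic (the class and field names are assumptions for illustration, not the code that replaces the native library):

    // Sketch of the start/commit table from nativelib.cc; assumes non-negative
    // timestamps, as the native code did.
    class CommitTableSketch {
        private final long[] startTimestamps;
        private final long[] commitTimestamps;

        CommitTableSketch(int maxCommits) {
            startTimestamps = new long[maxCommits];
            commitTimestamps = new long[maxCommits];
        }

        // Mirrors Java_com_yahoo_omid_tso_CommitHashMap_getCommittedTimestamp:
        // 0 means the entry was deleted or never inserted.
        long getCommittedTimestamp(long startTimestamp) {
            int key = (int) (startTimestamp % startTimestamps.length);
            return startTimestamps[key] == startTimestamp ? commitTimestamps[key] : 0;
        }

        // Mirrors Java_com_yahoo_omid_tso_CommitHashMap_setCommitted: evicting a
        // different transaction's entry raises largestDeletedTimestamp.
        long setCommitted(long startTimestamp, long commitTimestamp,
                          long largestDeletedTimestamp) {
            int key = (int) (startTimestamp % startTimestamps.length);
            if (startTimestamps[key] != startTimestamp
                    && commitTimestamps[key] > largestDeletedTimestamp) {
                largestDeletedTimestamp = commitTimestamps[key];
            }
            startTimestamps[key] = startTimestamp;
            commitTimestamps[key] = commitTimestamp;
            return largestDeletedTimestamp;
        }
    }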
6 src/test/java/com/yahoo/omid/tso/TestClientHandler.java
@@ -45,6 +45,7 @@
import com.yahoo.omid.tso.messages.CommitRequest;
import com.yahoo.omid.tso.messages.CommitResponse;
import com.yahoo.omid.tso.messages.FullAbortRequest;
+import com.yahoo.omid.tso.messages.LargestDeletedTimestampReport;
import com.yahoo.omid.tso.messages.TimestampRequest;
/**
@@ -156,7 +157,10 @@ public Object receiveMessage() {
@SuppressWarnings("unchecked")
public <T extends TSOMessage> T receiveMessage(Class<T> type) {
try {
- TSOMessage msg = messageQueue.poll(5000, TimeUnit.SECONDS);
+ TSOMessage msg;
+ do {
+ msg = messageQueue.poll(5000, TimeUnit.SECONDS);
+ } while (msg instanceof LargestDeletedTimestampReport);
assertNotNull("Reception of message timed out", msg);
assertThat(msg, is(type));
return (T) msg;
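The loop added above is needed because the server can now interleave unsolicited LargestDeletedTimestampReport messages with the replies the tests assert on; polling until a non-report message arrives keeps the assertions deterministic. The same pattern as a stand-alone helper (hypothetical, for illustration only; it assumes the repository's TSOMessage and LargestDeletedTimestampReport types):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class ReportSkippingPoll {
        // Polls until a message that is not a LargestDeletedTimestampReport
        // arrives; returns null if the poll times out, like the test code.
        static TSOMessage nextNonReport(BlockingQueue<TSOMessage> queue)
                throws InterruptedException {
            TSOMessage msg;
            do {
                msg = queue.poll(5000, TimeUnit.SECONDS);
            } while (msg instanceof LargestDeletedTimestampReport);
            return msg;
        }
    }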
1 src/test/java/com/yahoo/omid/tso/TestCommitQuery.java
@@ -1,4 +1,5 @@
/**
+
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");