Skip to content

Commit

Permalink
StorageMap支撑append,用于实现rowKey递增
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 30, 2017
1 parent 4e8dc71 commit 2bf3ee8
Show file tree
Hide file tree
Showing 21 changed files with 382 additions and 47 deletions.
10 changes: 10 additions & 0 deletions lealone-aose/src/main/java/org/lealone/aose/storage/AOMap.java
Expand Up @@ -318,4 +318,14 @@ public void transferFrom(ReadableByteChannel src) throws IOException {
public Storage getStorage() { public Storage getStorage() {
return map.getStorage(); return map.getStorage();
} }

@Override
public K append(V value) {
beforeWrite();
try {
return map.append(value);
} finally {
afterWrite();
}
}
} }
Expand Up @@ -418,4 +418,9 @@ public void transferFrom(ReadableByteChannel src) throws IOException {
public Storage getStorage() { public Storage getStorage() {
return map.getStorage(); return map.getStorage();
} }

@Override
public K append(V value) {
return map.append(value);
}
} }
Expand Up @@ -13,12 +13,12 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;


import org.lealone.aose.config.ConfigDescriptor; import org.lealone.aose.config.ConfigDescriptor;
import org.lealone.aose.gms.Gossiper; import org.lealone.aose.gms.Gossiper;
import org.lealone.aose.server.P2pServer; import org.lealone.aose.server.P2pServer;
import org.lealone.aose.storage.AOStorage; import org.lealone.aose.storage.AOStorage;
import org.lealone.aose.storage.AOStorageEngine;
import org.lealone.aose.storage.StorageMapBuilder; import org.lealone.aose.storage.StorageMapBuilder;
import org.lealone.common.exceptions.DbException; import org.lealone.common.exceptions.DbException;
import org.lealone.common.util.DataUtils; import org.lealone.common.util.DataUtils;
Expand All @@ -27,6 +27,7 @@
import org.lealone.db.Database; import org.lealone.db.Database;
import org.lealone.db.ServerSession; import org.lealone.db.ServerSession;
import org.lealone.db.Session; import org.lealone.db.Session;
import org.lealone.db.value.ValueLong;
import org.lealone.net.NetEndpoint; import org.lealone.net.NetEndpoint;
import org.lealone.replication.Replication; import org.lealone.replication.Replication;
import org.lealone.replication.ReplicationSession; import org.lealone.replication.ReplicationSession;
Expand Down Expand Up @@ -81,6 +82,9 @@ public BTreeMap<K, V> openMap() {
*/ */
protected volatile BTreePage root; protected volatile BTreePage root;


// TODO 考虑是否要使用总是递增的数字
protected final AtomicLong lastKey = new AtomicLong(0);

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected BTreeMap(String name, DataType keyType, DataType valueType, Map<String, Object> config, protected BTreeMap(String name, DataType keyType, DataType valueType, Map<String, Object> config,
AOStorage aoStorage) { AOStorage aoStorage) {
Expand Down Expand Up @@ -111,6 +115,11 @@ protected BTreeMap(String name, DataType keyType, DataType valueType, Map<String
} }
} }
} }

K lastKey = lastKey();
if (lastKey != null && lastKey instanceof ValueLong) {
this.lastKey.set(((ValueLong) lastKey).getLong());
}
} }


@Override @Override
Expand Down Expand Up @@ -739,6 +748,16 @@ private List<NetEndpoint> getReplicationEndpoints(BTreePage p) {
return replicationEndpoints; return replicationEndpoints;
} }


private List<NetEndpoint> getLastPageReplicationEndpoints() {
BTreePage p = root;
while (true) {
if (p.isLeaf()) {
return getReplicationEndpoints(p);
}
p = p.getChildPage(getChildPageCount(p) - 1);
}
}

@Override @Override
public NetEndpoint getLocalEndpoint() { public NetEndpoint getLocalEndpoint() {
return ConfigDescriptor.getLocalEndpoint(); return ConfigDescriptor.getLocalEndpoint();
Expand Down Expand Up @@ -779,7 +798,7 @@ public Object put(Object key, Object value, DataType valueType, Session session)
valueBuffer.put(buffer); valueBuffer.put(buffer);
valueBuffer.flip(); valueBuffer.flip();
WriteBufferPool.offer(writeBuffer); WriteBufferPool.offer(writeBuffer);
byte[] oldValue = (byte[]) c.executePut(AOStorageEngine.NAME, getName(), keyBuffer, valueBuffer); byte[] oldValue = (byte[]) c.executePut(null, getName(), keyBuffer, valueBuffer);
if (oldValue == null) if (oldValue == null)
return null; return null;
return valueType.read(ByteBuffer.wrap(oldValue)); return valueType.read(ByteBuffer.wrap(oldValue));
Expand Down Expand Up @@ -914,4 +933,49 @@ private void removeLeafPage(BTreePage p, Object key) {
public Storage getStorage() { public Storage getStorage() {
return aoStorage; return aoStorage;
} }

@SuppressWarnings("unchecked")
@Override
public K append(V value) {
K key = (K) ValueLong.get(lastKey.incrementAndGet());
put(key, value);
return key;
}

@Override
public Object append(Object value, DataType valueType, Session session) {
List<NetEndpoint> replicationEndpoints = getLastPageReplicationEndpoints();
NetEndpoint localEndpoint = getLocalEndpoint();

Session[] sessions = new Session[replicationEndpoints.size()];
ServerSession s = (ServerSession) session;
int i = 0;
for (NetEndpoint e : replicationEndpoints) {
String hostId = P2pServer.instance.getTopologyMetaData().getHostId(e);
sessions[i++] = s.getNestedSession(hostId, !localEndpoint.equals(e));
}

ReplicationSession rs = new ReplicationSession(sessions);
rs.setRpcTimeout(ConfigDescriptor.getRpcTimeout());
rs.setAutoCommit(session.isAutoCommit());
StorageCommand c = null;
try {
c = rs.createStorageCommand();
WriteBuffer writeBuffer = WriteBufferPool.poll();
valueType.write(writeBuffer, value);
ByteBuffer buffer = writeBuffer.getBuffer();
buffer = writeBuffer.getBuffer();
buffer.flip();
ByteBuffer valueBuffer = ByteBuffer.allocateDirect(buffer.limit());
valueBuffer.put(buffer);
valueBuffer.flip();
WriteBufferPool.offer(writeBuffer);
return c.executeAppend(null, getName(), valueBuffer, null);
} catch (Exception e) {
throw DbException.convert(e);
} finally {
if (c != null)
c.close();
}
}
} }
58 changes: 48 additions & 10 deletions lealone-client/src/main/java/org/lealone/client/ClientCommand.java
Expand Up @@ -9,6 +9,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


import org.lealone.api.ErrorCode; import org.lealone.api.ErrorCode;
Expand All @@ -28,6 +29,7 @@
import org.lealone.db.SysProperties; import org.lealone.db.SysProperties;
import org.lealone.db.result.Result; import org.lealone.db.result.Result;
import org.lealone.db.value.Value; import org.lealone.db.value.Value;
import org.lealone.db.value.ValueLong;
import org.lealone.net.AsyncCallback; import org.lealone.net.AsyncCallback;
import org.lealone.net.Transfer; import org.lealone.net.Transfer;
import org.lealone.storage.StorageCommand; import org.lealone.storage.StorageCommand;
Expand Down Expand Up @@ -444,8 +446,8 @@ public Object executePut(String replicationName, String mapName, ByteBuffer key,
try { try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) { if (isDistributedUpdate) {
session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_PUT", id); session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_PUT", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_PUT); transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_PUT);
} else if (replicationName != null) { } else if (replicationName != null) {
session.traceOperation("COMMAND_STORAGE_REPLICATION_PUT", id); session.traceOperation("COMMAND_STORAGE_REPLICATION_PUT", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_REPLICATION_PUT); transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_REPLICATION_PUT);
Expand All @@ -454,8 +456,7 @@ public Object executePut(String replicationName, String mapName, ByteBuffer key,
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_PUT); transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_PUT);
} }
transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key).writeByteBuffer(value); transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key).writeByteBuffer(value);
if (replicationName != null) transfer.writeString(replicationName);
transfer.writeString(replicationName);


AtomicReference<byte[]> resultRef = new AtomicReference<>(); AtomicReference<byte[]> resultRef = new AtomicReference<>();
AsyncCallback<Void> ac = new AsyncCallback<Void>() { AsyncCallback<Void> ac = new AsyncCallback<Void>() {
Expand All @@ -464,8 +465,7 @@ public void runInternal() {
try { try {
if (isDistributedUpdate) if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString()); session.getTransaction().addLocalTransactionNames(transfer.readString());
byte[] bytes = transfer.readBytes(); resultRef.set(transfer.readBytes());
resultRef.set(bytes);
} catch (IOException e) { } catch (IOException e) {
throw DbException.convert(e); throw DbException.convert(e);
} }
Expand All @@ -488,8 +488,8 @@ public Object executeGet(String mapName, ByteBuffer key) {
try { try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) { if (isDistributedUpdate) {
session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_GET", id); session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_GET", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_GET); transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_GET);
} else { } else {
session.traceOperation("COMMAND_STORAGE_GET", id); session.traceOperation("COMMAND_STORAGE_GET", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_GET); transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_GET);
Expand All @@ -502,8 +502,7 @@ public void runInternal() {
try { try {
if (isDistributedUpdate) if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString()); session.getTransaction().addLocalTransactionNames(transfer.readString());
byte[] bytes = transfer.readBytes(); resultRef.set(transfer.readBytes());
resultRef.set(bytes);
} catch (IOException e) { } catch (IOException e) {
throw DbException.convert(e); throw DbException.convert(e);
} }
Expand Down Expand Up @@ -546,6 +545,45 @@ public void removeLeafPage(String mapName, ByteBuffer key) {
} }
} }


@Override
public Object executeAppend(String replicationName, String mapName, ByteBuffer value,
CommandUpdateResult commandUpdateResult) {
AtomicLong resultAL = new AtomicLong();
int id = session.getNextId();
try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_APPEND", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_APPEND);
} else {
session.traceOperation("COMMAND_STORAGE_APPEND", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_APPEND);
}
transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(value);
transfer.writeString(replicationName);

AsyncCallback<Void> ac = new AsyncCallback<Void>() {
@Override
public void runInternal() {
try {
if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());
resultAL.set(transfer.readLong());
} catch (IOException e) {
throw DbException.convert(e);
}
}
};
transfer.addAsyncCallback(id, ac);
transfer.flush();
ac.await();
} catch (Exception e) {
session.handleException(e);
}
commandUpdateResult.addResult(this, resultAL.get());
return ValueLong.get(resultAL.get());
}

/** /**
* A client side parameter. * A client side parameter.
*/ */
Expand Down
7 changes: 5 additions & 2 deletions lealone-common/src/main/java/org/lealone/db/Session.java
Expand Up @@ -68,10 +68,13 @@ public interface Session extends Closeable, Transaction.Participant {


public static final int COMMAND_STORAGE_PUT = 160; public static final int COMMAND_STORAGE_PUT = 160;
public static final int COMMAND_STORAGE_REPLICATION_PUT = 161; public static final int COMMAND_STORAGE_REPLICATION_PUT = 161;
public static final int COMMAND_STORAGE_DISTRIBUTED_PUT = 162; public static final int COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_PUT = 162;
public static final int COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_REPLICATION_PUT = 163;
public static final int COMMAND_STORAGE_APPEND = 164;
public static final int COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_APPEND = 165;


public static final int COMMAND_STORAGE_GET = 170; public static final int COMMAND_STORAGE_GET = 170;
public static final int COMMAND_STORAGE_DISTRIBUTED_GET = 171; public static final int COMMAND_STORAGE_DISTRIBUTED_TRANSACTION_GET = 171;


public static final int COMMAND_STORAGE_MOVE_LEAF_PAGE = 180; public static final int COMMAND_STORAGE_MOVE_LEAF_PAGE = 180;
public static final int COMMAND_STORAGE_REMOVE_LEAF_PAGE = 181; public static final int COMMAND_STORAGE_REMOVE_LEAF_PAGE = 181;
Expand Down
Expand Up @@ -37,4 +37,6 @@ public interface Replication {
void addLeafPage(ByteBuffer splitKey, ByteBuffer page); void addLeafPage(ByteBuffer splitKey, ByteBuffer page);


void removeLeafPage(ByteBuffer key); void removeLeafPage(ByteBuffer key);

Object append(Object value, DataType valueType, Session session);
} }
Expand Up @@ -345,4 +345,55 @@ public Command prepare() {
return this; return this;
} }


@Override
public Object executeAppend(String replicationName, String mapName, ByteBuffer value,
CommandUpdateResult commandUpdateResult) {
return executeAppend(mapName, value, 1);
}

private Object executeAppend(final String mapName, final ByteBuffer value, int tries) {
int n = session.n;
final String rn = session.createReplicationName();
final WriteResponseHandler writeResponseHandler = new WriteResponseHandler(n);
final ArrayList<Runnable> commands = New.arrayList(n);
final ArrayList<Exception> exceptions = New.arrayList(1);
final CommandUpdateResult commandUpdateResult = new CommandUpdateResult(session.n, session.w,
session.isAutoCommit(), this.commands);

for (int i = 0; i < n; i++) {
final StorageCommand c = (StorageCommand) this.commands[i];
Runnable command = new Runnable() {
@Override
public void run() {
try {
writeResponseHandler.response(c.executeAppend(rn, mapName, value.slice(), commandUpdateResult));
} catch (Exception e) {
if (writeResponseHandler != null)
writeResponseHandler.onFailure();
exceptions.add(e);
}
}
};
commands.add(command);
}

for (int i = 0; i < n; i++) {
ThreadPool.executor.submit(commands.get(i));
}

try {
Object result = writeResponseHandler.getResult(session.rpcTimeoutMillis);
commandUpdateResult.validate();
return result;
} catch (WriteTimeoutException | WriteFailureException e) {
if (tries < session.maxRries) {
value.rewind();
return executeAppend(mapName, value, ++tries);
} else {
if (!exceptions.isEmpty())
e.initCause(exceptions.get(0));
throw e;
}
}
}
} }
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;


import org.lealone.db.Command; import org.lealone.db.Command;
import org.lealone.db.CommandUpdateResult;


public interface StorageCommand extends Command { public interface StorageCommand extends Command {


Expand All @@ -31,4 +32,7 @@ public interface StorageCommand extends Command {


void removeLeafPage(String mapName, ByteBuffer key); void removeLeafPage(String mapName, ByteBuffer key);


Object executeAppend(String replicationName, String mapName, ByteBuffer value,
CommandUpdateResult commandUpdateResult);

} }
Expand Up @@ -219,4 +219,6 @@ public interface StorageMap<K, V> {
void transferFrom(ReadableByteChannel src) throws IOException; void transferFrom(ReadableByteChannel src) throws IOException;


Storage getStorage(); Storage getStorage();

K append(V value);
} }
Expand Up @@ -22,7 +22,9 @@
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;


import org.lealone.db.value.ValueLong;
import org.lealone.storage.Storage; import org.lealone.storage.Storage;
import org.lealone.storage.StorageMapBase; import org.lealone.storage.StorageMapBase;
import org.lealone.storage.StorageMapCursor; import org.lealone.storage.StorageMapCursor;
Expand Down Expand Up @@ -225,4 +227,14 @@ public void transferFrom(ReadableByteChannel src) throws IOException {
public Storage getStorage() { public Storage getStorage() {
return memoryStorage; return memoryStorage;
} }

private final AtomicLong lastKey = new AtomicLong(0);

@SuppressWarnings("unchecked")
@Override
public K append(V value) {
K key = (K) ValueLong.get(lastKey.incrementAndGet());
skipListMap.put(key, value);
return key;
}
} }
Expand Up @@ -112,6 +112,8 @@ <K, V> TransactionMap<K, V> openMap(String name, String mapType, DataType keyTyp


void rollbackToSavepoint(int savepointId); void rollbackToSavepoint(int savepointId);


void replicationPrepareCommit(long validKey);

interface Participant { interface Participant {
void addSavepoint(String name); void addSavepoint(String name);


Expand Down

0 comments on commit 2bf3ee8

Please sign in to comment.