Skip to content

Commit

Permalink
实现增量恢复: 只有打开某个Map时才恢复它的数据
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 21, 2015
1 parent 9a40f48 commit b71ddac
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 13 deletions.
Expand Up @@ -58,6 +58,7 @@ public void run() {
te.init(config);
Transaction t = te.beginTransaction(false);
TransactionMap<String, String> map = t.openMap("test", storage);
map.clear();
map.put("1", "a");
map.put("2", "b");
assertEquals("a", map.get("1"));
Expand Down
Expand Up @@ -164,6 +164,7 @@ public <K, V> MVCCTransactionMap<K, V> openMap(String name, String mapType, Data
checkNotClosed();
StorageMap<K, VersionedValue> map = storage.openMap(name, mapType, keyType, new VersionedValueType(valueType),
null);
transactionEngine.redo(map);
transactionEngine.addMap((StorageMap<Object, VersionedValue>) map);
return new MVCCTransactionMap<>(this, map);
}
Expand Down
Expand Up @@ -7,9 +7,11 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -62,6 +64,8 @@ public class MVCCTransactionEngine extends TransactionEngineBase {
*/
LogMap<Long, RedoLogValue> redoLog;

HashMap<String, ArrayList<RedoLogValue>> pendingRedoLog = new HashMap<>();

public MVCCTransactionEngine() {
super(Constants.DEFAULT_TRANSACTION_ENGINE_NAME);
}
Expand Down Expand Up @@ -97,6 +101,7 @@ public synchronized void init(Map<String, String> config) {
undoLog = logStorage.openLogMap("undoLog", new ObjectDataType(), undoLogValueType);

redoLog = logStorage.openLogMap("redoLog", new ObjectDataType(), new RedoLogValueType());
initPendingRedoLog();

initTransactions();

Expand All @@ -110,6 +115,17 @@ public synchronized void init(Map<String, String> config) {
TransactionValidator.getInstance().start();
}

private void initPendingRedoLog() {
for (Entry<Long, RedoLogValue> e : redoLog.entrySet()) {
ArrayList<RedoLogValue> logs = pendingRedoLog.get(e.getValue().mapName);
if (logs == null) {
logs = new ArrayList<>();
pendingRedoLog.put(e.getValue().mapName, logs);
}
logs.add(e.getValue());
}
}

private void initTransactions() {
List<Transaction> list = getOpenTransactions();
for (Transaction t : list) {
Expand Down Expand Up @@ -322,23 +338,47 @@ void commitAfterValidate(int tid) {
removeUndoLog(tid, Long.MAX_VALUE);
}

private void redoLog(Long operationId, String mapName, Object key, VersionedValue value) {
@SuppressWarnings("unchecked")
<K, V> void redo(StorageMap<K, V> map) {
ArrayList<RedoLogValue> logs = pendingRedoLog.remove(map.getName());
if (logs != null) {
K key;
DataType kt = map.getKeyType();
DataType dt = ((VersionedValueType) map.getValueType()).valueType;
for (RedoLogValue log : logs) {
key = (K) kt.read(log.key);
if (log.value == LogChunkMap.EMPTY_BUFFER)
map.remove(key);
else {
map.put(key, (V) new VersionedValue(dt.read(log.value)));
}
}
}
}

private void addRedoLog(Long operationId, String mapName, Object key, VersionedValue value) {
WriteBuffer writeBuffer = WriteBufferPool.poll();
StorageMap<?, ?> map = maps.get(mapName);

map.getKeyType().write(writeBuffer, key);
ByteBuffer keyBuffer = writeBuffer.getBuffer();

ByteBuffer buffer = writeBuffer.getBuffer();
buffer.flip();

ByteBuffer keyBuffer = ByteBuffer.allocateDirect(buffer.limit());
keyBuffer.put(buffer);
keyBuffer.flip();
keyBuffer = keyBuffer.duplicate();

writeBuffer.clear();

ByteBuffer valueBuffer;
if (value != null) {
((VersionedValueType) map.getValueType()).valueType.write(writeBuffer, value.value);
valueBuffer = writeBuffer.getBuffer();
buffer = writeBuffer.getBuffer();
buffer.flip();
valueBuffer = ByteBuffer.allocateDirect(buffer.limit());
valueBuffer.put(buffer);
valueBuffer.flip();
valueBuffer = valueBuffer.duplicate();
} else {
valueBuffer = LogChunkMap.EMPTY_BUFFER;
}
Expand Down Expand Up @@ -377,15 +417,15 @@ private void removeUndoLog(int tid, long maxLogId) {
if (value == null) {
// nothing to do
} else if (value.value == null) {
redoLog(undoKey, mapName, key, null);
addRedoLog(undoKey, mapName, key, null);
// remove the value
// map.remove(key);
logs.add(new Object[] { map, key, undoKey });
lastOperationId = undoKey;
} else {
VersionedValue v2 = new VersionedValue();
v2.value = value.value;
redoLog(undoKey, mapName, key, v2);
addRedoLog(undoKey, mapName, key, v2);
// map.put(key, v2);
logs.add(new Object[] { map, key, v2, undoKey });
lastOperationId = undoKey;
Expand Down
Expand Up @@ -20,6 +20,13 @@ class VersionedValue {
*/
public Object value;

public VersionedValue() {
}

public VersionedValue(Object value) {
this.value = value;
}

@Override
public String toString() {
StringBuilder buff = new StringBuilder();
Expand Down
Expand Up @@ -69,6 +69,7 @@ private void read() {
K k = (K) keyType.read(buffer);
V v = (V) valueType.read(buffer);
put(k, v);
lastSyncKey = k;
}
}

Expand Down
Expand Up @@ -20,9 +20,9 @@
import java.nio.ByteBuffer;

public class RedoLogValue {
final String mapName;
final ByteBuffer key;
final ByteBuffer value;
public final String mapName;
public final ByteBuffer key;
public final ByteBuffer value;

public RedoLogValue(String mapName, ByteBuffer key, ByteBuffer value) {
this.mapName = mapName;
Expand Down
Expand Up @@ -62,9 +62,15 @@ public Object read(ByteBuffer buff) {
buff.get(key);
ByteBuffer keyBuffer = ByteBuffer.wrap(key);

byte[] value = new byte[DataUtils.readVarInt(buff)];
buff.get(value);
ByteBuffer valueBuffer = ByteBuffer.wrap(value);
ByteBuffer valueBuffer;
int len = DataUtils.readVarInt(buff);
if (len > 0) {
byte[] value = new byte[len];
buff.get(value);
valueBuffer = ByteBuffer.wrap(value);
} else {
valueBuffer = LogChunkMap.EMPTY_BUFFER;
}
return new RedoLogValue(mapName, keyBuffer, valueBuffer);
}

Expand Down

0 comments on commit b71ddac

Please sign in to comment.