Skip to content

Commit

Permalink
允许查找之前的事务日志
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 14, 2015
1 parent eab256a commit f28e6f0
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 50 deletions.
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.test.transaction;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.lealone.test.UnitTestBase;
import org.lealone.transaction.log.LogMap;
import org.lealone.transaction.log.LogStorage;

public class LogMapTest extends UnitTestBase {
@Test
public void run() {
Map<String, String> config = new HashMap<>();
config.put("base_dir", joinDirs("transaction-test"));
config.put("transaction_log_dir", "tlog");
config.put("log_sync_type", "none");
config.put("log_chunk_size", "128");

LogStorage ls = new LogStorage(config);
LogMap<Integer, Integer> map = ls.openLogMap("test", null, null);// 自动侦测key/value的类型

for (int i = 10; i < 100; i++) {
if (i > 10 && i % 10 == 0)
map.save();
map.put(i, i * 10);
}

assertEquals(10 * 10, map.get(10).intValue());
assertEquals(50 * 10, map.get(50).intValue());
assertEquals(99 * 10, map.get(99).intValue());

assertNull(map.get(100));

map.remove();
ls.close();
}
}
Expand Up @@ -5,7 +5,6 @@
*/ */
package org.lealone.transaction; package org.lealone.transaction;


import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
Expand Down Expand Up @@ -86,10 +85,6 @@ public synchronized void init(Map<String, String> config) {
isClusterMode = Boolean.parseBoolean(config.get("is_cluster_mode")); isClusterMode = Boolean.parseBoolean(config.get("is_cluster_mode"));
hostAndPort = config.get("host_and_port"); hostAndPort = config.get("host_and_port");


String baseDir = config.get("base_dir");
String logDir = config.get("transaction_log_dir");
String storageName = baseDir + File.separator + logDir;
config.put("storageName", storageName);
logStorage = new LogStorage(config); logStorage = new LogStorage(config);


// undoLog中存放的是所有事务的事务日志, // undoLog中存放的是所有事务的事务日志,
Expand Down
Expand Up @@ -36,7 +36,7 @@
* *
* @author zhh * @author zhh
*/ */
public class LogChunkMap<K, V> extends MemoryMap<K, V> { public class LogChunkMap<K, V> extends MemoryMap<K, V> implements Comparable<LogChunkMap<K, V>> {
public static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); public static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);


protected final FileStorage fileStorage; protected final FileStorage fileStorage;
Expand All @@ -47,15 +47,20 @@ public class LogChunkMap<K, V> extends MemoryMap<K, V> {
public LogChunkMap(int id, String name, DataType keyType, DataType valueType, Map<String, String> config) { public LogChunkMap(int id, String name, DataType keyType, DataType valueType, Map<String, String> config) {
super(id, name, keyType, valueType); super(id, name, keyType, valueType);


String storageName = config.get("storageName"); name = getChunkFileName(config, id, name);
name = storageName + File.separator + name + LogStorage.MAP_NAME_ID_SEPARATOR + id;
fileStorage = new FileStorage(); fileStorage = new FileStorage();
fileStorage.open(name, config); fileStorage.open(name, config);
pos = fileStorage.size(); pos = fileStorage.size();
if (pos > 0) if (pos > 0)
read(); read();
} }


static String getChunkFileName(Map<String, String> config, int id, String name) {
String storageName = config.get("storageName");
name = storageName + File.separator + name + LogStorage.MAP_NAME_ID_SEPARATOR + id;
return name;
}

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void read() { private void read() {
ByteBuffer buffer = fileStorage.readFully(0, (int) pos); ByteBuffer buffer = fileStorage.readFully(0, (int) pos);
Expand All @@ -70,7 +75,8 @@ private void read() {
public synchronized void save() { public synchronized void save() {
WriteBuffer buff = WriteBufferPool.poll(); WriteBuffer buff = WriteBufferPool.poll();
K lastKey = this.lastSyncKey; K lastKey = this.lastSyncKey;
Set<Entry<K, V>> entrySet = lastKey == null ? skipListMap.entrySet() : skipListMap.tailMap(lastKey).entrySet(); Set<Entry<K, V>> entrySet = lastKey == null ? skipListMap.entrySet() : skipListMap.tailMap(lastKey, false)
.entrySet();
for (Entry<K, V> e : entrySet) { for (Entry<K, V> e : entrySet) {
lastKey = e.getKey(); lastKey = e.getKey();
keyType.write(buff, lastKey); keyType.write(buff, lastKey);
Expand All @@ -91,7 +97,6 @@ public synchronized void save() {
@Override @Override
public void close() { public void close() {
save(); save();
clear();
super.close(); super.close();
fileStorage.close(); fileStorage.close();
} }
Expand All @@ -114,4 +119,14 @@ Set<Entry<K, V>> entrySet() {
K getLastSyncKey() { K getLastSyncKey() {
return lastSyncKey; return lastSyncKey;
} }

@Override
public int compareTo(LogChunkMap<K, V> o) {
return this.getId() - o.getId();
}

@Override
public String toString() {
return "LogChunkMap[" + getId() + ", " + getName() + "]";
}
} }
Expand Up @@ -20,10 +20,11 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ConcurrentSkipListSet;


import org.lealone.storage.StorageMap; import org.lealone.storage.StorageMap;
import org.lealone.storage.StorageMapCursor; import org.lealone.storage.StorageMapCursor;
import org.lealone.storage.fs.FileUtils;
import org.lealone.storage.type.DataType; import org.lealone.storage.type.DataType;
import org.lealone.storage.type.ObjectDataType; import org.lealone.storage.type.ObjectDataType;


Expand All @@ -39,7 +40,7 @@ public class LogMap<K, V> implements StorageMap<K, V> {


private static final long DEFAULT_LOG_CHUNK_SIZE = 32 * 1024 * 1024; private static final long DEFAULT_LOG_CHUNK_SIZE = 32 * 1024 * 1024;


private final CopyOnWriteArrayList<LogChunkMap<K, V>> chunks = new CopyOnWriteArrayList<>(); private final ConcurrentSkipListSet<LogChunkMap<K, V>> chunks = new ConcurrentSkipListSet<>();
private LogChunkMap<K, V> current; private LogChunkMap<K, V> current;


private int id; private int id;
Expand Down Expand Up @@ -91,15 +92,37 @@ public DataType getValueType() {
@Override @Override
public V get(K key) { public V get(K key) {
V v = current.get(key); V v = current.get(key);
if (v == null && chunks.isEmpty()) { if (v == null) {
// TODO read old if (chunks.isEmpty()) {
LogChunkMap<K, V> chunk = new LogChunkMap<>(id, name, keyType, valueType, config); return getFromPreviousChunk(key, id);
chunks.add(chunk); } else {
v = chunks.get(0).get(key); for (LogChunkMap<K, V> c : chunks) {
v = c.get(key);
if (v != null)
return v;
}
return getFromPreviousChunk(key, chunks.first().getId());
}
} }
return v; return v;
} }


private V getFromPreviousChunk(K key, Integer currentId) {
V v;
while (true) {
Integer previousId = LogStorage.getPreviousId(getName(), currentId);
if (previousId == null)
return null;

LogChunkMap<K, V> chunk = new LogChunkMap<>(previousId, name, keyType, valueType, config);
chunks.add(chunk);
v = chunk.get(key);
if (v != null)
return v;
currentId = previousId;
}
}

@Override @Override
public V put(K key, V value) { public V put(K key, V value) {
return current.put(key, value); return current.put(key, value);
Expand Down Expand Up @@ -183,11 +206,25 @@ public StorageMapCursor<K, V> cursor(K from) {
@Override @Override
public void clear() { public void clear() {
current.clear(); current.clear();
for (LogChunkMap<K, V> c : chunks) {
c.clear();
}
} }


@Override @Override
public void remove() { public void remove() {
current.remove(); current.close();
for (LogChunkMap<K, V> c : chunks) {
c.close();
}

Integer id = current.getId();

do {
FileUtils.delete(LogChunkMap.getChunkFileName(config, id, name));
id = LogStorage.getPreviousId(name, id);
} while (id != null);

LogStorage.logMaps.remove(this); LogStorage.logMaps.remove(this);
} }


Expand All @@ -206,6 +243,7 @@ public void save() {
current.save(); current.save();
if (current.logChunkSize() > logChunkSize) { if (current.logChunkSize() > logChunkSize) {
current.close(); current.close();
LogStorage.addMapId(name, id);
current = new LogChunkMap<>(++id, name, keyType, valueType, config); current = new LogChunkMap<>(++id, name, keyType, valueType, config);
} }
} }
Expand All @@ -217,4 +255,9 @@ public Set<Entry<K, V>> entrySet() {
public K getLastSyncKey() { public K getLastSyncKey() {
return current.getLastSyncKey(); return current.getLastSyncKey();
} }

@Override
public String toString() {
return "LogMap[" + getId() + ", " + getName() + "]";
}
} }
Expand Up @@ -17,8 +17,10 @@
*/ */
package org.lealone.transaction.log; package org.lealone.transaction.log;


import java.io.File;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;


import org.lealone.storage.StorageMap; import org.lealone.storage.StorageMap;
Expand All @@ -37,9 +39,12 @@ public class LogStorage {


private static final String TEMP_MAP_NAME_PREFIX = "temp" + MAP_NAME_ID_SEPARATOR; private static final String TEMP_MAP_NAME_PREFIX = "temp" + MAP_NAME_ID_SEPARATOR;


private static final ConcurrentHashMap<String, ConcurrentSkipListSet<Integer>> ids = new ConcurrentHashMap<>();

static final CopyOnWriteArrayList<LogMap<?, ?>> logMaps = new CopyOnWriteArrayList<>(); static final CopyOnWriteArrayList<LogMap<?, ?>> logMaps = new CopyOnWriteArrayList<>();


private final ConcurrentHashMap<String, Integer> ids = new ConcurrentHashMap<>(); static LogMap<?, ?> redoLog;

private final Map<String, String> config; private final Map<String, String> config;


public final LogSyncService logSyncService; public final LogSyncService logSyncService;
Expand All @@ -49,44 +54,43 @@ public class LogStorage {
*/ */
private int nextTempMapId; private int nextTempMapId;


static LogMap<?, ?> redoLog;

public LogStorage(Map<String, String> config) { public LogStorage(Map<String, String> config) {
this.config = config; this.config = config;
String storageName = config.get("storageName"); String baseDir = config.get("base_dir");
if (storageName != null) { String logDir = config.get("transaction_log_dir");
if (!FileUtils.exists(storageName)) String storageName = baseDir + File.separator + logDir;
FileUtils.createDirectories(storageName); config.put("storageName", storageName);


FilePath dir = FilePath.get(storageName); if (!FileUtils.exists(storageName))
for (FilePath fp : dir.newDirectoryStream()) { FileUtils.createDirectories(storageName);
String mapFullName = fp.getName();
if (mapFullName.startsWith(TEMP_MAP_NAME_PREFIX)) { FilePath dir = FilePath.get(storageName);
fp.delete(); for (FilePath fp : dir.newDirectoryStream()) {
continue; String mapFullName = fp.getName();
} if (mapFullName.startsWith(TEMP_MAP_NAME_PREFIX)) {

fp.delete();
int mapIdStartPos = mapFullName.lastIndexOf(MAP_NAME_ID_SEPARATOR); continue;
if (mapIdStartPos > 0) { }
String mapName = mapFullName.substring(0, mapIdStartPos);
int mapId = Integer.parseInt(mapFullName.substring(mapIdStartPos + 1));
Integer oldMapId = ids.put(mapName, mapId);
if (oldMapId != null && mapId < oldMapId)
ids.put(mapName, oldMapId);
}


int mapIdStartPos = mapFullName.lastIndexOf(MAP_NAME_ID_SEPARATOR);
if (mapIdStartPos > 0) {
String mapName = mapFullName.substring(0, mapIdStartPos);
int mapId = Integer.parseInt(mapFullName.substring(mapIdStartPos + 1));
addMapId(mapName, mapId);
} }
} }

String logSyncType = config.get("log_sync_type"); String logSyncType = config.get("log_sync_type");
if (logSyncType == null || "periodic".equalsIgnoreCase(logSyncType)) if (logSyncType == null || "periodic".equalsIgnoreCase(logSyncType))
logSyncService = new PeriodicLogSyncService(config); logSyncService = new PeriodicLogSyncService(config);
else if ("batch".equalsIgnoreCase(logSyncType)) else if ("batch".equalsIgnoreCase(logSyncType))
logSyncService = new BatchLogSyncService(config); logSyncService = new BatchLogSyncService(config);
else if ("none".equalsIgnoreCase(logSyncType))
logSyncService = null;
else else
throw new IllegalArgumentException("Unknow log_sync_type:" + logSyncType); throw new IllegalArgumentException("Unknow log_sync_type: " + logSyncType);


logSyncService.start(); if (logSyncService != null)
logSyncService.start();
} }


public synchronized StorageMap<Object, Integer> createTempMap() { public synchronized StorageMap<Object, Integer> createTempMap() {
Expand All @@ -97,7 +101,7 @@ public synchronized StorageMap<Object, Integer> createTempMap() {
public <K, V> LogMap<K, V> openLogMap(String name, DataType keyType, DataType valueType) { public <K, V> LogMap<K, V> openLogMap(String name, DataType keyType, DataType valueType) {
int mapId = 1; int mapId = 1;
if (ids.containsKey(name)) if (ids.containsKey(name))
mapId = ids.get(name); mapId = ids.get(name).last();
LogMap<K, V> m = new LogMap<>(mapId, name, keyType, valueType, config); LogMap<K, V> m = new LogMap<>(mapId, name, keyType, valueType, config);
logMaps.add(m); logMaps.add(m);
if ("redoLog".equals(name)) if ("redoLog".equals(name))
Expand All @@ -108,10 +112,13 @@ public <K, V> LogMap<K, V> openLogMap(String name, DataType keyType, DataType va
public synchronized void close() { public synchronized void close() {
for (StorageMap<?, ?> map : logMaps) for (StorageMap<?, ?> map : logMaps)
map.save(); map.save();
logSyncService.close();
try { if (logSyncService != null) {
logSyncService.join(); logSyncService.close();
} catch (InterruptedException e) { try {
logSyncService.join();
} catch (InterruptedException e) {
}
} }


for (StorageMap<?, ?> map : logMaps) for (StorageMap<?, ?> map : logMaps)
Expand All @@ -120,4 +127,21 @@ public synchronized void close() {
logMaps.clear(); logMaps.clear();
ids.clear(); ids.clear();
} }

public static Integer getPreviousId(String name, Integer currentId) {
Integer id = null;
if (ids.containsKey(name))
id = ids.get(name).lower(currentId);
return id;
}

public static void addMapId(String mapName, Integer mapId) {
ConcurrentSkipListSet<Integer> set = ids.get(mapName);
if (set == null) {
set = new ConcurrentSkipListSet<Integer>();
ids.putIfAbsent(mapName, set);
set = ids.get(mapName);
}
set.add(mapId);
}
} }

0 comments on commit f28e6f0

Please sign in to comment.