Skip to content

Commit

Permalink
为Storage和StorageMap增加一些新的API,并优化append操作,补充相关测试
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 4, 2018
1 parent 7b44ff4 commit 2793fb0
Show file tree
Hide file tree
Showing 30 changed files with 532 additions and 525 deletions.
69 changes: 13 additions & 56 deletions lealone-aose/src/main/java/org/lealone/storage/aose/AOStorage.java
Expand Up @@ -17,22 +17,15 @@
*/ */
package org.lealone.storage.aose; package org.lealone.storage.aose;


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;


import org.lealone.common.exceptions.DbException;
import org.lealone.common.util.DataUtils; import org.lealone.common.util.DataUtils;
import org.lealone.common.util.IOUtils;
import org.lealone.db.DataBuffer; import org.lealone.db.DataBuffer;
import org.lealone.db.IDatabase; import org.lealone.db.IDatabase;
import org.lealone.db.RunMode; import org.lealone.db.RunMode;
Expand All @@ -59,17 +52,16 @@ public class AOStorage extends StorageBase {
public static final String SUFFIX_AO_FILE = ".db"; public static final String SUFFIX_AO_FILE = ".db";
public static final int SUFFIX_AO_FILE_LENGTH = SUFFIX_AO_FILE.length(); public static final int SUFFIX_AO_FILE_LENGTH = SUFFIX_AO_FILE.length();


private final Map<String, Object> config; private final IDatabase db;
protected IDatabase db;


AOStorage(Map<String, Object> config) { AOStorage(Map<String, Object> config) {
this.config = config; super(config);
this.db = (IDatabase) config.get("db"); this.db = (IDatabase) config.get("db");
String storageName = (String) config.get("storageName"); String storagePath = getStoragePath();
DataUtils.checkArgument(storageName != null, "The storage name may not be null"); DataUtils.checkArgument(storagePath != null, "The storage path may not be null");
if (!FileUtils.exists(storageName)) if (!FileUtils.exists(storagePath))
FileUtils.createDirectories(storageName); FileUtils.createDirectories(storagePath);
FilePath dir = FilePath.get(storageName); FilePath dir = FilePath.get(storagePath);
for (FilePath fp : dir.newDirectoryStream()) { for (FilePath fp : dir.newDirectoryStream()) {
String mapFullName = fp.getName(); String mapFullName = fp.getName();
if (mapFullName.startsWith(TEMP_NAME_PREFIX)) { if (mapFullName.startsWith(TEMP_NAME_PREFIX)) {
Expand Down Expand Up @@ -115,6 +107,7 @@ public <K, V> AOMap<K, V> openAOMap(String name, StorageDataType keyType, Storag
Map<String, String> parameters) { Map<String, String> parameters) {
BTreeMap<K, V> btreeMap = openBTreeMap(name, keyType, valueType, parameters); BTreeMap<K, V> btreeMap = openBTreeMap(name, keyType, valueType, parameters);
AOMap<K, V> map = new AOMap<>(btreeMap); AOMap<K, V> map = new AOMap<>(btreeMap);
maps.put(name, map); // 覆盖btreeMap
AOStorageService.addAOMap(map); AOStorageService.addAOMap(map);
return map; return map;
} }
Expand All @@ -123,6 +116,7 @@ public <K, V> BufferedMap<K, V> openBufferedMap(String name, StorageDataType key
Map<String, String> parameters) { Map<String, String> parameters) {
BTreeMap<K, V> btreeMap = openBTreeMap(name, keyType, valueType, parameters); BTreeMap<K, V> btreeMap = openBTreeMap(name, keyType, valueType, parameters);
BufferedMap<K, V> map = new BufferedMap<>(btreeMap); BufferedMap<K, V> map = new BufferedMap<>(btreeMap);
maps.put(name, map); // 覆盖btreeMap
AOStorageService.addBufferedMap(map); AOStorageService.addBufferedMap(map);
return map; return map;
} }
Expand All @@ -147,38 +141,14 @@ private <M extends StorageMap<K, V>, K, V> M openMap(String name, StorageMapBuil
return map; return map;
} }


public boolean isReadOnly() { boolean isReadOnly() {
return config.containsKey("readOnly"); return config.containsKey("readOnly");
} }


@Override @Override
public void backupTo(String fileName) { public void save() {
try { AOStorageService.forceMerge();
save(); super.save();
close(); // TODO 如何在不关闭存储的情况下备份,现在每个文件与FileStorage相关的在打开时就用排它锁锁住了,所以读不了
OutputStream zip = FileUtils.newOutputStream(fileName, false);
ZipOutputStream out = new ZipOutputStream(zip);
String storageName = (String) config.get("storageName");
String storageShortName = storageName.replace('\\', '/');
storageShortName = storageShortName.substring(storageShortName.lastIndexOf('/') + 1);
FilePath dir = FilePath.get(storageName);
for (FilePath map : dir.newDirectoryStream()) {
String entryNameBase = storageShortName + "/" + map.getName();
for (FilePath file : map.newDirectoryStream()) {
backupFile(out, file.newInputStream(), entryNameBase + "/" + file.getName());
}
}
out.close();
zip.close();
} catch (IOException e) {
throw DbException.convertIOException(e, fileName);
}
}

private static void backupFile(ZipOutputStream out, InputStream in, String entryName) throws IOException {
out.putNextEntry(new ZipEntry(entryName));
IOUtils.copyAndCloseInput(in, out);
out.closeEntry();
} }


private List<NetEndpoint> getReplicationEndpoints(String[] replicationHostIds) { private List<NetEndpoint> getReplicationEndpoints(String[] replicationHostIds) {
Expand Down Expand Up @@ -245,19 +215,6 @@ private void replicateRootPage(IDatabase db, StorageMap<?, ?> map, DataBuffer p,
} }
} }


@Override
public void drop() {
close();
String storageName = (String) config.get("storageName");
FileUtils.deleteRecursive(storageName, false);
}

@Override
public void save() {
AOStorageService.forceMerge();
super.save();
}

@Override @Override
public void scaleIn(Object dbObject, RunMode oldRunMode, RunMode newRunMode, String[] oldEndpoints, public void scaleIn(Object dbObject, RunMode oldRunMode, RunMode newRunMode, String[] oldEndpoints,
String[] newEndpoints) { String[] newEndpoints) {
Expand Down
Expand Up @@ -36,14 +36,14 @@ public AOStorageBuilder(Map<String, String> defaultConfig) {
*/ */
@Override @Override
public AOStorage openStorage() { public AOStorage openStorage() {
String storageName = (String) config.get("storageName"); String storagePath = (String) config.get("storagePath");
AOStorage storage = cache.get(storageName); AOStorage storage = cache.get(storagePath);
if (storage == null) { if (storage == null) {
synchronized (cache) { synchronized (cache) {
storage = cache.get(storageName); storage = cache.get(storagePath);
if (storage == null) { if (storage == null) {
storage = new AOStorage(config); storage = new AOStorage(config);
cache.put(storageName, storage); cache.put(storagePath, storage);
} }
} }
} }
Expand Down
Expand Up @@ -23,14 +23,12 @@
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;


import org.lealone.common.util.DataUtils; import org.lealone.common.util.DataUtils;
import org.lealone.db.value.ValueLong; import org.lealone.db.value.ValueLong;
import org.lealone.storage.DelegatedStorageMap; import org.lealone.storage.DelegatedStorageMap;
import org.lealone.storage.PageKey; import org.lealone.storage.PageKey;
import org.lealone.storage.StorageMap; import org.lealone.storage.StorageMap;
import org.lealone.storage.StorageMapBase;
import org.lealone.storage.StorageMapCursor; import org.lealone.storage.StorageMapCursor;
import org.lealone.storage.type.StorageDataType; import org.lealone.storage.type.StorageDataType;


Expand All @@ -46,13 +44,10 @@
public class BufferedMap<K, V> extends DelegatedStorageMap<K, V> implements Callable<Void> { public class BufferedMap<K, V> extends DelegatedStorageMap<K, V> implements Callable<Void> {


private final ConcurrentSkipListMap<Object, Object> buffer = new ConcurrentSkipListMap<>(); private final ConcurrentSkipListMap<Object, Object> buffer = new ConcurrentSkipListMap<>();
private final AtomicLong lastKey = new AtomicLong(0);


public BufferedMap(StorageMap<K, V> map) { public BufferedMap(StorageMap<K, V> map) {
super(map); super(map);
if (map instanceof StorageMapBase) { map.setMaxKey(map.lastKey()); // 先读取上一次的最大key,用于支持append操作
lastKey.set(((StorageMapBase<K, V>) map).getLastKey());
}
} }


public StorageMap<K, V> getMap() { public StorageMap<K, V> getMap() {
Expand All @@ -69,28 +64,10 @@ public V get(K key) {


@Override @Override
public V put(K key, V value) { public V put(K key, V value) {
if (map instanceof StorageMapBase) { map.setMaxKey(key); // 更新最大key
((StorageMapBase<K, V>) map).setLastKey(key);
setLastKey(key);
}
return (V) buffer.put(key, value); return (V) buffer.put(key, value);
} }


private void setLastKey(Object key) {
if (key instanceof ValueLong) {
long k = ((ValueLong) key).getLong();
while (true) {
long old = lastKey.get();
if (k > old) {
if (lastKey.compareAndSet(old, k))
break;
} else {
break;
}
}
}
}

@Override @Override
public V putIfAbsent(K key, V value) { public V putIfAbsent(K key, V value) {
V old = get(key); V old = get(key);
Expand Down Expand Up @@ -317,8 +294,8 @@ public boolean needMerge() {


@Override @Override
public K append(V value) { public K append(V value) {
K key = (K) ValueLong.get(lastKey.incrementAndGet()); K key = (K) ValueLong.get(map.incrementAndGetMaxKeyAsLong());
put(key, value); buffer.put(key, value);
return key; return key;
} }


Expand Down
Expand Up @@ -153,7 +153,7 @@ public int binarySearch(Object key) {


@Override @Override
boolean needSplit() { boolean needSplit() {
return memory > map.storage.getPageSplitSize() && keys.length > 1; return memory > map.btreeStorage.getPageSplitSize() && keys.length > 1;
} }


/** /**
Expand Down Expand Up @@ -224,7 +224,7 @@ public void removePage() {
if (p == 0) { if (p == 0) {
removedInMemory = true; removedInMemory = true;
} }
map.storage.removePage(p, memory); map.btreeStorage.removePage(p, memory);
} }


void compressPage(DataBuffer buff, int compressStart, int type, int typePos) { void compressPage(DataBuffer buff, int compressStart, int type, int typePos) {
Expand Down

0 comments on commit 2793fb0

Please sign in to comment.