From 7ea0b2821d1d13f5257327e62730fdcf358d53f1 Mon Sep 17 00:00:00 2001 From: Honghua Zhu Date: Fri, 25 Sep 2015 23:25:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=96=B0=E7=9A=84AOStorageEn?= =?UTF-8?q?gine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/lealone/db/Constants.java | 2 +- .../java/org/lealone/db/table/MVTable.java | 26 +- .../java/org/lealone/server/TcpServer.java | 2 +- .../java/org/lealone/aostore/AOStore.java | 148 +++- .../org/lealone/aostore/btree/BTreeMap.java | 7 +- .../org/lealone/aostore/btree/BTreeStore.java | 99 +-- .../org/lealone/storage/AOMapBuilder.java | 41 ++ .../org/lealone/storage/AOStorageEngine.java | 693 ++++++++++++++++++ .../org/lealone/storage/MVStorageEngine.java | 3 +- .../org.lealone.storage.StorageEngine | 1 + 10 files changed, 903 insertions(+), 119 deletions(-) create mode 100644 lealone-storage/engine/src/main/java/org/lealone/storage/AOMapBuilder.java create mode 100644 lealone-storage/engine/src/main/java/org/lealone/storage/AOStorageEngine.java diff --git a/lealone-common/src/main/java/org/lealone/db/Constants.java b/lealone-common/src/main/java/org/lealone/db/Constants.java index 321775f21..97fc3f395 100644 --- a/lealone-common/src/main/java/org/lealone/db/Constants.java +++ b/lealone-common/src/main/java/org/lealone/db/Constants.java @@ -23,7 +23,7 @@ public class Constants { */ public static final String PROJECT_NAME_PREFIX = PROJECT_NAME + "."; - public static final String DEFAULT_STORAGE_ENGINE_NAME = "MVStore"; + public static final String DEFAULT_STORAGE_ENGINE_NAME = "AOStore"; public static final String DEFAULT_SQL_ENGINE_NAME = PROJECT_NAME; diff --git a/lealone-db/src/main/java/org/lealone/db/table/MVTable.java b/lealone-db/src/main/java/org/lealone/db/table/MVTable.java index c9395fdc6..4c06e1ac7 100644 --- a/lealone-db/src/main/java/org/lealone/db/table/MVTable.java +++ b/lealone-db/src/main/java/org/lealone/db/table/MVTable.java @@ -40,9 +40,7 @@ import org.lealone.db.result.SortOrder; import org.lealone.db.schema.SchemaObject; import org.lealone.storage.TransactionStorageEngine; -import org.lealone.storage.type.ObjectDataType; import org.lealone.transaction.Transaction; -import org.lealone.transaction.TransactionMap; /** * A table stored in a MVStore. @@ -71,7 +69,7 @@ public class MVTable extends TableBase { private final TransactionStorageEngine storageEngine; private boolean containsGlobalUniqueIndex; - private final TransactionMap rowVersionMap; + // private final TransactionMap rowVersionMap; public MVTable(CreateTableData data, TransactionStorageEngine storageEngine) { super(data); @@ -85,8 +83,8 @@ public MVTable(CreateTableData data, TransactionStorageEngine storageEngine) { } traceLock = database.getTrace(Trace.LOCK); - rowVersionMap = storageEngine.openMap(data.session, getName() + "_row_version", new ObjectDataType(), - new ObjectDataType()); + // rowVersionMap = storageEngine.openMap(data.session, getName() + "_row_version", new ObjectDataType(), + // new ObjectDataType()); } /** @@ -801,13 +799,13 @@ public boolean containsGlobalUniqueIndex() { return containsGlobalUniqueIndex; } - @Override - public long getRowVersion(long rowKey) { - return rowVersionMap.get(rowKey); - } - - @Override - public TransactionMap getRowVersionMap() { - return rowVersionMap; - } + // @Override + // public long getRowVersion(long rowKey) { + // return rowVersionMap.get(rowKey); + // } + // + // @Override + // public TransactionMap getRowVersionMap() { + // return rowVersionMap; + // } } diff --git a/lealone-server/src/main/java/org/lealone/server/TcpServer.java b/lealone-server/src/main/java/org/lealone/server/TcpServer.java index 6600accc4..aede876df 100644 --- a/lealone-server/src/main/java/org/lealone/server/TcpServer.java +++ b/lealone-server/src/main/java/org/lealone/server/TcpServer.java @@ -128,7 +128,7 @@ private void initManagementDb() { managementDb = conn; SERVERS.put(port, this); } catch (SQLException e) { - DbException.convert(e); + throw DbException.convert(e); } finally { JdbcUtils.closeSilently(stat); } diff --git a/lealone-storage/aostore/src/main/java/org/lealone/aostore/AOStore.java b/lealone-storage/aostore/src/main/java/org/lealone/aostore/AOStore.java index dd813c980..c51e7297c 100644 --- a/lealone-storage/aostore/src/main/java/org/lealone/aostore/AOStore.java +++ b/lealone-storage/aostore/src/main/java/org/lealone/aostore/AOStore.java @@ -13,15 +13,16 @@ */ package org.lealone.aostore; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; import org.lealone.aostore.btree.BTreeMap; import org.lealone.aostore.btree.BTreeStore; import org.lealone.common.util.DataUtils; import org.lealone.common.util.New; -import org.lealone.storage.fs.FilePath; import org.lealone.storage.fs.FileUtils; // adaptive optimization store @@ -43,7 +44,7 @@ public class AOStore { public static final String SUFFIX_AO_STORE_TEMP_FILE = ".tempFile"; private final HashMap config; - private final HashMap> maps = new HashMap>(); + private final ConcurrentSkipListMap> maps = new ConcurrentSkipListMap>(); private int lastMapId; @@ -53,14 +54,20 @@ public class AOStore { if (storeName != null) { String sn = storeName.toString(); if (!FileUtils.exists(sn)) - FileUtils.createDirectory(sn); + FileUtils.createDirectories(sn); - FilePath dir = FilePath.get(sn); - for (FilePath file : dir.newDirectoryStream()) { - String name = file.getName(); - maps.put(name.substring(0, name.length() - SUFFIX_AO_FILE.length()), null); - } + // FilePath dir = FilePath.get(sn); + // for (FilePath file : dir.newDirectoryStream()) { + // String name = file.getName(); + // maps.put(name.substring(0, name.length() - SUFFIX_AO_FILE.length()), null); + // } } + + // setAutoCommitDelay starts the thread, but only if + // the parameter is different from the old value + Object delayObject = config.get("autoCommitDelay"); + int delay = delayObject == null ? 1000 : (Integer) delayObject; + setAutoCommitDelay(delay); } public HashMap getConfig() { @@ -76,12 +83,13 @@ public synchronized , K, V> M openMap(String name, BTre c.put("id", id); c.put("createVersion", 0L); map = builder.create(); + map.setName(name); BTreeStore store = new BTreeStore(name, config); map.init(store, c); - if (!maps.containsKey(name)) // map不存在时标数据改变 - store.markMetaChanged(); + // if (!maps.containsKey(name)) // map不存在时标数据改变 + store.markMetaChanged(); maps.put(name, map); } @@ -89,6 +97,8 @@ public synchronized , K, V> M openMap(String name, BTre } public synchronized void close() { + stopBackgroundThread(); + for (BTreeMap map : maps.values()) map.close(); } @@ -98,10 +108,18 @@ public synchronized void commit() { map.commit(); } - public synchronized Set getMapNames() { + public Set getMapNames() { return new HashSet(maps.keySet()); } + public Collection> getMaps() { + return maps.values(); + } + + public boolean hasMap(String name) { + return maps.containsKey(name); + } + /** * Open a store in exclusive mode. For a file-based store, the parent * directory must already exist. @@ -329,4 +347,112 @@ public static Builder fromString(String s) { } } + + /** + * The background thread, if any. + */ + volatile BackgroundWriterThread backgroundWriterThread; + + /** + * The delay in milliseconds to automatically commit and write changes. + */ + private int autoCommitDelay; + + /** + * Set the maximum delay in milliseconds to auto-commit changes. + *

+ * To disable auto-commit, set the value to 0. In this case, changes are + * only committed when explicitly calling commit. + *

+ * The default is 1000, meaning all changes are committed after at most one + * second. + * + * @param millis the maximum delay + */ + + public void setAutoCommitDelay(int millis) { + if (autoCommitDelay == millis) { + return; + } + autoCommitDelay = millis; + if (config.get("storeName") == null) // 内存存储不需要BackgroundWriterThread + return; + stopBackgroundThread(); + // start the background thread if needed + if (millis > 0) { + int sleep = Math.max(1, millis / 10); + BackgroundWriterThread t = new BackgroundWriterThread(this, sleep, config.get("storeName").toString()); + t.start(); + backgroundWriterThread = t; + } + } + + private void stopBackgroundThread() { + BackgroundWriterThread t = backgroundWriterThread; + if (t == null) { + return; + } + backgroundWriterThread = null; + if (Thread.currentThread() == t) { + // within the thread itself - can not join + return; + } + synchronized (t.sync) { + t.sync.notifyAll(); + } + if (Thread.holdsLock(this)) { + // called from storeNow: can not join, + // because that could result in a deadlock + return; + } + try { + t.join(); + } catch (Exception e) { + // ignore + } + } + + /** + * A background writer thread to automatically store changes from time to + * time. + */ + private static class BackgroundWriterThread extends Thread { + + public final Object sync = new Object(); + private final AOStore store; + private final int sleep; + + BackgroundWriterThread(AOStore store, int sleep, String fileStoreName) { + super("AOStore background writer " + fileStoreName); + this.store = store; + this.sleep = sleep; + setDaemon(true); + } + + @Override + public void run() { + while (true) { + Thread t = store.backgroundWriterThread; + if (t == null) { + break; + } + synchronized (sync) { + try { + sync.wait(sleep); + } catch (InterruptedException e) { + continue; + } + } + for (BTreeMap map : store.getMaps()) { + BTreeStore btreeStore = map.getStore(); + FileStore fileStore = btreeStore.getFileStore(); + if (fileStore == null || fileStore.isReadOnly()) { + return; + } + btreeStore.writeInBackground(); + } + } + } + + } } diff --git a/lealone-storage/aostore/src/main/java/org/lealone/aostore/btree/BTreeMap.java b/lealone-storage/aostore/src/main/java/org/lealone/aostore/btree/BTreeMap.java index 37b21f715..674115809 100644 --- a/lealone-storage/aostore/src/main/java/org/lealone/aostore/btree/BTreeMap.java +++ b/lealone-storage/aostore/src/main/java/org/lealone/aostore/btree/BTreeMap.java @@ -54,6 +54,7 @@ public class BTreeMap extends AbstractMap implements ConcurrentMap config) { } lastCommitTime = getTimeSinceCreation(); - // setAutoCommitDelay starts the thread, but only if - // the parameter is different from the old value - o = config.get("autoCommitDelay"); - int delay = o == null ? 1000 : (Integer) o; - setAutoCommitDelay(delay); + // // setAutoCommitDelay starts the thread, but only if + // // the parameter is different from the old value + // o = config.get("autoCommitDelay"); + // int delay = o == null ? 1000 : (Integer) o; + // setAutoCommitDelay(delay); } private void panic(IllegalStateException e) { @@ -770,7 +765,6 @@ public void close() { } FileStore f = fileStore; if (f != null && !f.isReadOnly()) { - stopBackgroundThread(); if (hasUnsavedChanges()) { commitAndSave(); } @@ -797,10 +791,10 @@ private void closeStore(boolean shrinkIfPossible) { if (closed) { return; } - // can not synchronize on this yet, because - // the thread also synchronized on this, which - // could result in a deadlock - stopBackgroundThread(); + // // can not synchronize on this yet, because + // // the thread also synchronized on this, which + // // could result in a deadlock + // stopBackgroundThread(); closed = true; if (fileStore == null) { return; @@ -2345,7 +2339,7 @@ public synchronized String getMapName(int id) { * needed. */ - void writeInBackground() { + public void writeInBackground() { if (closed) { return; } @@ -2409,31 +2403,6 @@ public boolean isClosed() { return closed; } - private void stopBackgroundThread() { - BackgroundWriterThread t = backgroundWriterThread; - if (t == null) { - return; - } - backgroundWriterThread = null; - if (Thread.currentThread() == t) { - // within the thread itself - can not join - return; - } - synchronized (t.sync) { - t.sync.notifyAll(); - } - if (Thread.holdsLock(this)) { - // called from storeNow: can not join, - // because that could result in a deadlock - return; - } - try { - t.join(); - } catch (Exception e) { - // ignore - } - } - /** * Set the maximum delay in milliseconds to auto-commit changes. *

@@ -2451,17 +2420,6 @@ public void setAutoCommitDelay(int millis) { return; } autoCommitDelay = millis; - if (fileStore == null || fileStore.isReadOnly()) { - return; - } - stopBackgroundThread(); - // start the background thread if needed - if (millis > 0) { - int sleep = Math.max(1, millis / 10); - BackgroundWriterThread t = new BackgroundWriterThread(this, sleep, fileStore.toString()); - t.start(); - backgroundWriterThread = t; - } } /** @@ -2547,41 +2505,4 @@ public int getCacheSize() { public CacheLongKeyLIRS getCache() { return cache; } - - /** - * A background writer thread to automatically store changes from time to - * time. - */ - private static class BackgroundWriterThread extends Thread { - - public final Object sync = new Object(); - private final BTreeStore store; - private final int sleep; - - BackgroundWriterThread(BTreeStore store, int sleep, String fileStoreName) { - super("MVStore background writer " + fileStoreName); - this.store = store; - this.sleep = sleep; - setDaemon(true); - } - - @Override - public void run() { - while (true) { - Thread t = store.backgroundWriterThread; - if (t == null) { - break; - } - synchronized (sync) { - try { - sync.wait(sleep); - } catch (InterruptedException e) { - continue; - } - } - store.writeInBackground(); - } - } - - } } diff --git a/lealone-storage/engine/src/main/java/org/lealone/storage/AOMapBuilder.java b/lealone-storage/engine/src/main/java/org/lealone/storage/AOMapBuilder.java new file mode 100644 index 000000000..a8d13344a --- /dev/null +++ b/lealone-storage/engine/src/main/java/org/lealone/storage/AOMapBuilder.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.storage; + +import org.lealone.aostore.AOStore; +import org.lealone.aostore.btree.BTreeMap; +import org.lealone.storage.type.DataType; + +public class AOMapBuilder extends StorageMap.BuilderBase { + private final AOStore store; + + public AOMapBuilder(AOStore store) { + this.store = store; + } + + @Override + public StorageMap openMap(String name, DataType keyType, DataType valueType) { + BTreeMap.Builder builder = new BTreeMap.Builder().keyType(keyType).valueType(valueType); + return store.openMap(name, builder); + } + + @Override + public String getMapName(int id) { + return null; // TODO + } +} diff --git a/lealone-storage/engine/src/main/java/org/lealone/storage/AOStorageEngine.java b/lealone-storage/engine/src/main/java/org/lealone/storage/AOStorageEngine.java new file mode 100644 index 000000000..113467b49 --- /dev/null +++ b/lealone-storage/engine/src/main/java/org/lealone/storage/AOStorageEngine.java @@ -0,0 +1,693 @@ +/* + * Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0, + * and the EPL 1.0 (http://h2database.com/html/license.html). + * Initial Developer: H2 Group + */ +package org.lealone.storage; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.Thread.UncaughtExceptionHandler; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.lealone.aostore.AOStore; +import org.lealone.aostore.AOStoreTool; +import org.lealone.aostore.btree.BTreeMap; +import org.lealone.aostore.btree.BTreeStore; +import org.lealone.api.ErrorCode; +import org.lealone.common.message.DbException; +import org.lealone.common.util.BitField; +import org.lealone.common.util.DataUtils; +import org.lealone.common.util.IOUtils; +import org.lealone.common.util.New; +import org.lealone.db.Constants; +import org.lealone.db.DatabaseEngine; +import org.lealone.db.InDoubtTransaction; +import org.lealone.db.Session; +import org.lealone.db.SessionInterface; +import org.lealone.db.index.ValueDataType; +import org.lealone.db.table.MVTable; +import org.lealone.db.table.Table; +import org.lealone.storage.fs.FileChannelInputStream; +import org.lealone.storage.fs.FileUtils; +import org.lealone.storage.type.DataType; +import org.lealone.transaction.MVCCTransactionEngine; +import org.lealone.transaction.Transaction; +import org.lealone.transaction.TransactionEngine; +import org.lealone.transaction.TransactionMap; + +/** + * A storage engine that internally uses the AOStore. + */ +public class AOStorageEngine extends StorageEngineBase implements TransactionStorageEngine { + public static final String NAME = "AOStore"; + private static HashMap stores = new HashMap<>(1); + + // 见StorageEngineManager.StorageEngineService中的注释 + public AOStorageEngine() { + StorageEngineManager.registerStorageEngine(this); + } + + @Override + public String getName() { + return NAME; + } + + @Override + public synchronized Table createTable(CreateTableData data0) { + org.lealone.db.table.CreateTableData data = (org.lealone.db.table.CreateTableData) data0; + org.lealone.db.Database db = data.session.getDatabase(); + Store store = stores.get(db.getName()); + if (store == null) { + store = init(this, db); + stores.put(db.getName(), store); + } + + MVTable table = new MVTable(data, this); + table.init(data.session); + store.tableMap.put(table.getMapName(), table); + return table; + } + + @Override + public synchronized void close(Database db0) { + org.lealone.db.Database db = (org.lealone.db.Database) db0; + stores.remove(db.getName()); + } + + @Override + public synchronized StorageMap.Builder createStorageMapBuilder(String dbName) { + return new AOMapBuilder(stores.get(dbName).getStore()); + } + + @Override + public TransactionEngine createTransactionEngine(DataType dataType, StorageMap.Builder mapBuilder, + String hostAndPort) { + return new MVCCTransactionEngine(dataType, mapBuilder, hostAndPort, Session.isClusterMode()); + } + + public static Store getStore(Session session) { + return getStore(session.getDatabase()); + } + + public static Store getStore(Database db0) { + org.lealone.db.Database db = (org.lealone.db.Database) db0; + return stores.get(db.getName()); + } + + /** + * Initialize the AOStore. + * + * @param db the database + * @return the store + */ + static Store init(StorageEngine storageEngine, final org.lealone.db.Database db) { + Store store = null; + byte[] key = db.getFileEncryptionKey(); + String dbPath = db.getDatabasePath(); + AOStore.Builder builder = new AOStore.Builder(); + if (dbPath == null) { + store = new Store(storageEngine, db, builder); + } else { + builder.pageSplitSize(db.getPageSize()); + // AOStoreTool.compactCleanUp(fileName); //TODO + builder.storeName(dbPath); + if (db.isReadOnly()) { + builder.readOnly(); + } + + if (key != null) { + char[] password = new char[key.length / 2]; + for (int i = 0; i < password.length; i++) { + password[i] = (char) (((key[i + i] & 255) << 16) | ((key[i + i + 1]) & 255)); + } + builder.encryptionKey(password); + } + if (db.getSettings().compressData) { + builder.compress(); + // use a larger page split size to improve the compression ratio + builder.pageSplitSize(64 * 1024); + } + builder.backgroundExceptionHandler(new UncaughtExceptionHandler() { + + @Override + public void uncaughtException(Thread t, Throwable e) { + db.setBackgroundException(DbException.convert(e)); + } + + }); + try { + store = new Store(storageEngine, db, builder); + } catch (IllegalStateException e) { + int errorCode = DataUtils.getErrorCode(e.getMessage()); + if (errorCode == DataUtils.ERROR_FILE_CORRUPT) { + if (key != null) { + throw DbException.get(ErrorCode.FILE_ENCRYPTION_ERROR_1, e, dbPath); + } + } else if (errorCode == DataUtils.ERROR_FILE_LOCKED) { + throw DbException.get(ErrorCode.DATABASE_ALREADY_OPEN_1, e, dbPath); + } else if (errorCode == DataUtils.ERROR_READING_FAILED) { + throw DbException.get(ErrorCode.IO_EXCEPTION_1, e, dbPath); + } + throw DbException.get(ErrorCode.FILE_CORRUPTED_1, e, dbPath); + } + } + return store; + } + + /** + * A store with open tables. + */ + public static class Store { + + /** + * The map of open tables. + * Key: the map name, value: the table. + */ + final ConcurrentHashMap tableMap = new ConcurrentHashMap<>(); + + /** + * The store. + */ + private final AOStore store; + + /** + * The transaction engine. + */ + private final TransactionEngine transactionEngine; + + private long statisticsStart; + + private int temporaryMapId; + + public Store(StorageEngine storageEngine, org.lealone.db.Database db, AOStore.Builder builder) { + store = builder.open(); + + stores.put(db.getName(), this); + + StorageMap.Builder mapBuilder = storageEngine.createStorageMapBuilder(db.getName()); + transactionEngine = storageEngine.createTransactionEngine(new ValueDataType(null, db, null), mapBuilder, + DatabaseEngine.getHostAndPort()); + + transactionEngine.init(store.getMapNames()); + // 不能过早初始化,需要等执行完MetaRecord之后生成所有Map了才行, + // 否则在执行undo log时对应map的sortTypes是null,在执行ValueDataType.compare(Object, Object)时出现空指针异常 + // initTransactions(); + db.setTransactionEngine(transactionEngine); + db.addStorageEngine(storageEngine); + db.setLobStorage(new LobStorageMap(db)); + } + + public AOStore getStore() { + return store; + } + + public HashMap getTables() { + return new HashMap(tableMap); + } + + /** + * Remove a table. + * + * @param table the table + */ + public void removeTable(MVTable table) { + tableMap.remove(table.getMapName()); + } + + /** + * Store all pending changes. + */ + public void flush() { + for (BTreeMap map : store.getMaps()) { + BTreeStore store = map.getStore(); + org.lealone.aostore.FileStore s = store.getFileStore(); + if (s == null || s.isReadOnly()) { + return; + } + if (!store.compact(50, 4 * 1024 * 1024)) { + store.commit(); + } + } + } + + /** + * Close the store, without persisting changes. + */ + public void closeImmediately() { + for (BTreeMap map : store.getMaps()) { + BTreeStore store = map.getStore(); + if (store.isClosed()) { + return; + } + store.closeImmediately(); + } + } + + /** + * Commit all transactions that are in the committing state, and + * rollback all open transactions. + */ + public void initTransactions() { + List list = transactionEngine.getOpenTransactions(); + for (Transaction t : list) { + if (t.getStatus() == Transaction.STATUS_COMMITTING) { + t.commit(); + } else if (t.getStatus() != Transaction.STATUS_PREPARED) { + t.rollback(); + } + } + } + + /** + * Remove all temporary maps. + * + * @param objectIds the ids of the objects to keep + */ + public void removeTemporaryMaps(BitField objectIds) { + for (BTreeMap map : store.getMaps()) { + String mapName = map.getName(); + + if (mapName.startsWith("temp.")) { + map.remove(); + } else if (mapName.startsWith("table.") || mapName.startsWith("index.")) { + int id = Integer.parseInt(mapName.substring(1 + mapName.indexOf("."))); + if (!objectIds.get(id)) { + ValueDataType keyType = new ValueDataType(null, null, null); + ValueDataType valueType = new ValueDataType(null, null, null); + Transaction t = transactionEngine.beginTransaction(false); + TransactionMap m = t.openMap(mapName, keyType, valueType); + transactionEngine.removeMap(m); + t.commit(); + } + } + } + } + + /** + * Get the name of the next available temporary map. + * + * @return the map name + */ + public synchronized String nextTemporaryMapName() { + return "temp." + temporaryMapId++; + } + + /** + * Prepare a transaction. + * + * @param session the session + * @param transactionName the transaction name (may be null) + */ + public void prepareCommit(Session session, String transactionName) { + Transaction t = session.getTransaction(); + t.setName(transactionName); + t.prepare(); + store.commit(); + } + + public ArrayList getInDoubtTransactions() { + List list = transactionEngine.getOpenTransactions(); + ArrayList result = New.arrayList(); + for (Transaction t : list) { + if (t.getStatus() == Transaction.STATUS_PREPARED) { + result.add(new AOInDoubtTransaction(store, t)); + } + } + return result; + } + + /** + * Set the maximum memory to be used by the cache. + * + * @param kb the maximum size in KB + */ + public void setCacheSize(int kb) { + for (BTreeMap map : store.getMaps()) { + BTreeStore store = map.getStore(); + store.setCacheSize(Math.max(1, kb / 1024)); + } + } + + public InputStream getInputStream(BTreeStore btreeStore) { + FileChannel fc = btreeStore.getFileStore().getEncryptedFile(); + if (fc == null) { + fc = btreeStore.getFileStore().getFile(); + } + return new FileChannelInputStream(fc, false); + } + + /** + * Force the changes to disk. + */ + public void sync() { + flush(); + for (BTreeMap map : store.getMaps()) { + BTreeStore store = map.getStore(); + store.sync(); + } + } + + /** + * Compact the database file, that is, compact blocks that have a low + * fill rate, and move chunks next to each other. This will typically + * shrink the database file. Changes are flushed to the file, and old + * chunks are overwritten. + * + * @param maxCompactTime the maximum time in milliseconds to compact + */ + public void compactFile(long maxCompactTime) { + for (BTreeMap map : store.getMaps()) { + BTreeStore store = map.getStore(); + store.setRetentionTime(0); + long start = System.currentTimeMillis(); + while (store.compact(95, 16 * 1024 * 1024)) { + store.sync(); + store.compactMoveChunks(95, 16 * 1024 * 1024); + long time = System.currentTimeMillis() - start; + if (time > maxCompactTime) { + break; + } + } + } + } + + /** + * Close the store. Pending changes are persisted. Chunks with a low + * fill rate are compacted, but old chunks are kept for some time, so + * most likely the database file will not shrink. + * + * @param maxCompactTime the maximum time in milliseconds to compact + */ + public void close(long maxCompactTime) { + for (BTreeMap map : store.getMaps()) { + BTreeStore store = map.getStore(); + try { + if (!store.isClosed() && store.getFileStore() != null) { + boolean compactFully = false; + if (!store.getFileStore().isReadOnly()) { + transactionEngine.close(); + if (maxCompactTime == Long.MAX_VALUE) { + compactFully = true; + } + } + String fileName = store.getFileStore().getFileName(); + store.close(); + if (compactFully && FileUtils.exists(fileName)) { + // the file could have been deleted concurrently, + // so only compact if the file still exists + AOStoreTool.compact(fileName, true); + } + } + } catch (IllegalStateException e) { + int errorCode = DataUtils.getErrorCode(e.getMessage()); + if (errorCode == DataUtils.ERROR_WRITING_FAILED) { + // disk full - ok + } else if (errorCode == DataUtils.ERROR_FILE_CORRUPT) { + // wrong encryption key - ok + } + store.closeImmediately(); + throw DbException.get(ErrorCode.IO_EXCEPTION_1, e, "Closing"); + } + } + } + + /** + * Start collecting statistics. + */ + public void statisticsStart() { + for (BTreeMap map : store.getMaps()) { + BTreeStore store = map.getStore(); + org.lealone.aostore.FileStore fs = store.getFileStore(); + statisticsStart = fs == null ? 0 : fs.getReadCount(); + return; + } + } + + /** + * Stop collecting statistics. + * + * @return the statistics + */ + public Map statisticsEnd() { + for (BTreeMap map : store.getMaps()) { + BTreeStore store = map.getStore(); + HashMap map2 = New.hashMap(); + org.lealone.aostore.FileStore fs = store.getFileStore(); + int reads = fs == null ? 0 : (int) (fs.getReadCount() - statisticsStart); + map2.put("reads", reads); + return map2; + } + + return New.hashMap(); + } + + } + + /** + * An in-doubt transaction. + */ + private static class AOInDoubtTransaction implements InDoubtTransaction { + + private final AOStore store; + private final Transaction transaction; + private int state = InDoubtTransaction.IN_DOUBT; + + AOInDoubtTransaction(AOStore store, Transaction transaction) { + this.store = store; + this.transaction = transaction; + } + + @Override + public void setState(int state) { + if (state == InDoubtTransaction.COMMIT) { + transaction.commit(); + } else { + transaction.rollback(); + } + store.commit(); + this.state = state; + } + + @Override + public String getState() { + switch (state) { + case IN_DOUBT: + return "IN_DOUBT"; + case COMMIT: + return "COMMIT"; + case ROLLBACK: + return "ROLLBACK"; + default: + throw DbException.throwInternalError("state=" + state); + } + } + + @Override + public String getTransactionName() { + return transaction.getName(); + } + + } + + @Override + public boolean hasMap(org.lealone.storage.Database db, String name) { + return getStore(db).getStore().hasMap(name); + } + + @Override + public boolean isInMemory(org.lealone.storage.Database db) { + return getStore(db) == null; + } + + @Override + public void removeTable(org.lealone.storage.Table table) { + getStore(((MVTable) table).getDatabase()).removeTable(((MVTable) table)); + } + + @Override + public String nextTemporaryMapName(org.lealone.storage.Database db) { + return getStore(db).nextTemporaryMapName(); + } + + @Override + public TransactionMap openMap(SessionInterface session, String name, DataType keyType, + DataType valueType) { + return ((Session) session).getTransaction().openMap(name, keyType, valueType); + } + + @Override + public void backupTo(Database db0, String fileName) { + org.lealone.db.Database db = (org.lealone.db.Database) db0; + if (!db.isPersistent()) { + throw DbException.get(ErrorCode.DATABASE_IS_NOT_PERSISTENT); + } + try { + Store store = getStore(db); + if (store != null) { + store.flush(); + } + // 生成fileName表示的文件,如果已存在则覆盖原有的,也就是文件为空 + OutputStream zip = FileUtils.newOutputStream(fileName, false); + ZipOutputStream out = new ZipOutputStream(zip); + + // synchronize on the database, to avoid concurrent temp file + // creation / deletion / backup + String base = FileUtils.getParent(db.getName()); + synchronized (db.getLobSyncObject()) { + String prefix = db.getDatabasePath(); // 返回E:/H2/baseDir/mydb + String dir = FileUtils.getParent(prefix); // 返回E:/H2/baseDir + dir = getDir(dir); // 返回E:/H2/baseDir + String name = db.getName(); // 返回E:/H2/baseDir/mydb + name = FileUtils.getName(name); // 返回mydb(也就是只取简单文件名) + ArrayList fileList = getDatabaseFiles(dir, name, true); + + // 把".lob.db"和".mv.db"文件备份到fileName表示的文件中(是一个zip文件) + for (String n : fileList) { + if (n.endsWith(Constants.SUFFIX_LOB_FILE)) { // 备份".lob.db"文件 + backupFile(out, base, n); + } else if (n.endsWith(AOStore.SUFFIX_AO_FILE) && store != null) { // 备份".mv.db"文件 + AOStore aoStore = store.getStore(); + for (BTreeMap map : aoStore.getMaps()) { + BTreeStore btreeStore = map.getStore(); + boolean before = btreeStore.getReuseSpace(); + btreeStore.setReuseSpace(false); + try { + InputStream in = store.getInputStream(btreeStore); + backupFile(out, base, n, in); + } finally { + btreeStore.setReuseSpace(before); + } + } + } + } + } + out.close(); + zip.close(); + } catch (IOException e) { + throw DbException.convertIOException(e, fileName); + } + } + + private static void backupFile(ZipOutputStream out, String base, String fn) throws IOException { + InputStream in = FileUtils.newInputStream(fn); + backupFile(out, base, fn, in); + } + + private static void backupFile(ZipOutputStream out, String base, String fn, InputStream in) throws IOException { + String f = FileUtils.toRealPath(fn); // 返回E:/H2/baseDir/mydb.mv.db + base = FileUtils.toRealPath(base); // 返回E:/H2/baseDir + if (!f.startsWith(base)) { + DbException.throwInternalError(f + " does not start with " + base); + } + f = f.substring(base.length()); // 返回/mydb.mv.db + f = correctFileName(f); // 返回mydb.mv.db + out.putNextEntry(new ZipEntry(f)); + IOUtils.copyAndCloseInput(in, out); + out.closeEntry(); + } + + /** + * Fix the file name, replacing backslash with slash. + * + * @param f the file name + * @return the corrected file name + */ + private static String correctFileName(String f) { + f = f.replace('\\', '/'); + if (f.startsWith("/")) { + f = f.substring(1); + } + return f; + } + + /** + * Normalize the directory name. + * + * @param dir the directory (null for the current directory) + * @return the normalized directory name + */ + private static String getDir(String dir) { + if (dir == null || dir.equals("")) { + return "."; + } + return FileUtils.toRealPath(dir); + } + + /** + * Get the list of database files. + * + * @param dir the directory (must be normalized) + * @param db the database name (null for all databases) + * @param all if true, files such as the lock, trace, and lob + * files are included. If false, only data, index, log, + * and lob files are returned + * @return the list of files + */ + private static ArrayList getDatabaseFiles(String dir, String db, boolean all) { + ArrayList files = New.arrayList(); + // for Windows, File.getCanonicalPath("...b.") returns just "...b" + String start = db == null ? null : (FileUtils.toRealPath(dir + "/" + db) + "."); + for (String f : FileUtils.newDirectoryStream(dir)) { + boolean ok = false; + if (f.endsWith(Constants.SUFFIX_LOBS_DIRECTORY)) { + if (start == null || f.startsWith(start)) { + files.addAll(getDatabaseFiles(f, null, all)); + ok = true; + } + } else if (f.endsWith(Constants.SUFFIX_LOB_FILE)) { + ok = true; + } else if (f.endsWith(AOStore.SUFFIX_AO_FILE)) { + ok = true; + } else if (all) { + if (f.endsWith(Constants.SUFFIX_LOCK_FILE)) { + ok = true; + } else if (f.endsWith(Constants.SUFFIX_TEMP_FILE)) { + ok = true; + } else if (f.endsWith(Constants.SUFFIX_TRACE_FILE)) { + ok = true; + } + } + if (ok) { + if (db == null || f.startsWith(start)) { + String fileName = f; + files.add(fileName); + } + } + } + return files; + } + + @Override + public void flush(Database db) { + getStore(db).flush(); + } + + @Override + public void sync(Database db) { + getStore(db).sync(); + } + + @Override + public void initTransactions(Database db) { + getStore(db).initTransactions(); + } + + @Override + public void removeTemporaryMaps(Database db, BitField objectIds) { + getStore(db).removeTemporaryMaps(objectIds); + } + + @Override + public void closeImmediately(Database db) { + getStore(db).closeImmediately(); + } +} diff --git a/lealone-storage/engine/src/main/java/org/lealone/storage/MVStorageEngine.java b/lealone-storage/engine/src/main/java/org/lealone/storage/MVStorageEngine.java index f9c49ce76..216d89ae9 100644 --- a/lealone-storage/engine/src/main/java/org/lealone/storage/MVStorageEngine.java +++ b/lealone-storage/engine/src/main/java/org/lealone/storage/MVStorageEngine.java @@ -48,6 +48,7 @@ */ public class MVStorageEngine extends StorageEngineBase implements TransactionStorageEngine { public static final String NAME = Constants.DEFAULT_STORAGE_ENGINE_NAME; + private static HashMap stores = new HashMap<>(1); // 见StorageEngineManager.StorageEngineService中的注释 public MVStorageEngine() { @@ -92,8 +93,6 @@ public TransactionEngine createTransactionEngine(DataType dataType, StorageMap.B return new MVCCTransactionEngine(dataType, mapBuilder, hostAndPort, Session.isClusterMode()); } - static HashMap stores = new HashMap<>(1); - public static Store getStore(Session session) { return getStore(session.getDatabase()); } diff --git a/lealone-storage/engine/src/main/resources/META-INF/services/org.lealone.storage.StorageEngine b/lealone-storage/engine/src/main/resources/META-INF/services/org.lealone.storage.StorageEngine index 103b2fdfc..8c2577750 100644 --- a/lealone-storage/engine/src/main/resources/META-INF/services/org.lealone.storage.StorageEngine +++ b/lealone-storage/engine/src/main/resources/META-INF/services/org.lealone.storage.StorageEngine @@ -1 +1,2 @@ org.lealone.storage.MVStorageEngine +org.lealone.storage.AOStorageEngine