Skip to content

Commit

Permalink
插件化事务引擎API,增加一个MemoryTransactionEngine骨架代码用于演示
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed May 10, 2015
1 parent b347ad2 commit 778f5b5
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 77 deletions.
Expand Up @@ -21,6 +21,39 @@


public interface Transaction { public interface Transaction {


/**
* The status of a closed transaction (committed or rolled back).
*/
public static final int STATUS_CLOSED = 0;

/**
* The status of an open transaction.
*/
public static final int STATUS_OPEN = 1;

/**
* The status of a prepared transaction.
*/
public static final int STATUS_PREPARED = 2;

/**
* The status of a transaction that is being committed, but possibly not
* yet finished. A transactions can go into this state when the store is
* closed while the transaction is committing. When opening a store,
* such transactions should be committed.
*/
public static final int STATUS_COMMITTING = 3;

String getName(); //用于2pc

void setName(String name); //用于2pc

void prepare(); //用于2pc

int getStatus();

void setStatus(int status);

//long getTransactionId(); //long getTransactionId();


//long getCommitTimestamp(); //long getCommitTimestamp();
Expand Down
Expand Up @@ -17,10 +17,19 @@
*/ */
package org.lealone.transaction; package org.lealone.transaction;


import java.util.List;
import java.util.Set;

public interface TransactionEngine { public interface TransactionEngine {
Transaction beginTransaction(boolean autoCommit); Transaction beginTransaction(boolean autoCommit);


void close(); void close();


boolean isValid(String localTransactionName); boolean isValid(String localTransactionName);

void init(Set<String> storageMapNames);

List<Transaction> getOpenTransactions();

<K, V> void removeMap(TransactionMap<K, V> map);
} }
Expand Up @@ -196,4 +196,6 @@ public interface TransactionMap<K, V> {
* @return the result * @return the result
*/ */
public K lowerKey(K key); public K lowerKey(K key);

public int getMapId();
} }
Expand Up @@ -55,7 +55,6 @@ public class MVPrimaryIndex extends IndexBase {


private final MVTable mvTable; private final MVTable mvTable;
private final String mapName; private final String mapName;
//private final TransactionMap<Value, Value> dataMap;
private final TransactionMap<Value, Value> dataMap; private final TransactionMap<Value, Value> dataMap;
private long lastKey; private long lastKey;
private int mainIndexColumn = -1; private int mainIndexColumn = -1;
Expand Down
Expand Up @@ -30,9 +30,9 @@
import org.lealone.mvstore.MVMap; import org.lealone.mvstore.MVMap;
import org.lealone.mvstore.MVStore; import org.lealone.mvstore.MVStore;
import org.lealone.mvstore.MVStoreTool; import org.lealone.mvstore.MVStoreTool;
import org.lealone.transaction.MVCCTransaction;
import org.lealone.transaction.MVCCTransactionEngine; import org.lealone.transaction.MVCCTransactionEngine;
import org.lealone.transaction.MVCCTransactionMap; import org.lealone.transaction.Transaction;
import org.lealone.transaction.TransactionEngine;
import org.lealone.transaction.TransactionMap; import org.lealone.transaction.TransactionMap;
import org.lealone.type.DataType; import org.lealone.type.DataType;
import org.lealone.util.BitField; import org.lealone.util.BitField;
Expand All @@ -46,37 +46,52 @@ public class MVStorageEngine extends StorageEngineBase implements TransactionSto
public static final String NAME = Constants.DEFAULT_STORAGE_ENGINE_NAME; public static final String NAME = Constants.DEFAULT_STORAGE_ENGINE_NAME;


private StorageMap.Builder mapBuilder; private StorageMap.Builder mapBuilder;

private TransactionEngine transactionEngine;
public MVStorageEngine() {
this(null);
}


//见StorageEngineManager.StorageEngineService中的注释 //见StorageEngineManager.StorageEngineService中的注释
public MVStorageEngine(StorageMap.Builder mapBuilder) { public MVStorageEngine() {
StorageEngineManager.registerStorageEngine(this); StorageEngineManager.registerStorageEngine(this);
setMapBuilder(mapBuilder);
} }


@Override @Override
public String getName() { public String getName() {
return NAME; return NAME;
} }


protected void setMapBuilder(StorageMap.Builder mapBuilder) { public void setMapBuilder(StorageMap.Builder mapBuilder) {
this.mapBuilder = mapBuilder; this.mapBuilder = mapBuilder;
} }


public StorageMap.Builder getMapBuilder() {
return mapBuilder;
}

public void setTransactionEngine(TransactionEngine transactionEngine) {
this.transactionEngine = transactionEngine;
}

public TransactionEngine getTransactionEngine() {
return transactionEngine;
}

@Override @Override
public Table createTable(CreateTableData data) { public Table createTable(CreateTableData data) {
Database db = data.session.getDatabase(); Database db = data.session.getDatabase();
Store store = stores.get(db.getName()); Store store = stores.get(db.getName());
if (store == null) { if (store == null) {
synchronized (stores) { synchronized (stores) {
if (stores.get(db.getName()) == null) { if (stores.get(db.getName()) == null) {
store = init(db, mapBuilder);
store = init(db, mapBuilder, transactionEngine);
stores.put(db.getName(), store); stores.put(db.getName(), store);

if (mapBuilder == null)
mapBuilder = store.getMapBuilder();
if (transactionEngine == null)
transactionEngine = store.getTransactionEngine();

db.setStorageEngine(this); db.setStorageEngine(this);
db.setTransactionEngine(store.getTransactionEngine()); db.setTransactionEngine(transactionEngine);
db.setLobStorage(new LobStorageMap(db)); db.setLobStorage(new LobStorageMap(db));
} }
} }
Expand Down Expand Up @@ -109,17 +124,13 @@ public static Store getStore(Database db) {
* @param db the database * @param db the database
* @return the store * @return the store
*/ */
public static Store init(final Database db) { static Store init(final Database db, StorageMap.Builder mapBuilder, TransactionEngine transactionEngine) {
return init(db, null);
}

public static Store init(final Database db, StorageMap.Builder mapBuilder) {
Store store = null; Store store = null;
byte[] key = db.getFileEncryptionKey(); byte[] key = db.getFileEncryptionKey();
String dbPath = db.getDatabasePath(); String dbPath = db.getDatabasePath();
MVStore.Builder builder = new MVStore.Builder(); MVStore.Builder builder = new MVStore.Builder();
if (dbPath == null) { if (dbPath == null) {
store = new Store(db, builder); store = new Store(db, builder, mapBuilder, transactionEngine);
} else { } else {
String fileName = dbPath + Constants.SUFFIX_MV_FILE; String fileName = dbPath + Constants.SUFFIX_MV_FILE;
builder.pageSplitSize(db.getPageSize()); builder.pageSplitSize(db.getPageSize());
Expand Down Expand Up @@ -158,7 +169,7 @@ public void uncaughtException(Thread t, Throwable e) {


}); });
try { try {
store = new Store(db, builder, mapBuilder); store = new Store(db, builder, mapBuilder, transactionEngine);
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
int errorCode = DataUtils.getErrorCode(e.getMessage()); int errorCode = DataUtils.getErrorCode(e.getMessage());
if (errorCode == DataUtils.ERROR_FILE_CORRUPT) { if (errorCode == DataUtils.ERROR_FILE_CORRUPT) {
Expand Down Expand Up @@ -195,22 +206,28 @@ public static class Store {
/** /**
* The transaction engine. * The transaction engine.
*/ */
private final MVCCTransactionEngine transactionEngine; private final TransactionEngine transactionEngine;
private final StorageMap.Builder mapBuilder;


private long statisticsStart; private long statisticsStart;


private int temporaryMapId; private int temporaryMapId;


public Store(Database db, MVStore.Builder builder) { public Store(Database db, MVStore.Builder builder, StorageMap.Builder mapBuilder,
this(db, builder, null); TransactionEngine transactionEngine) {
}

public Store(Database db, MVStore.Builder builder, StorageMap.Builder mapBuilder) {
this.store = builder.open(); this.store = builder.open();

if (mapBuilder == null) if (mapBuilder == null)
mapBuilder = new MVMapBuilder(store); this.mapBuilder = mapBuilder = new MVMapBuilder(store);
this.transactionEngine = new MVCCTransactionEngine(new ValueDataType(null, db, null), mapBuilder, else
DatabaseEngine.getHostAndPort()); this.mapBuilder = mapBuilder;

if (transactionEngine == null)
this.transactionEngine = new MVCCTransactionEngine(new ValueDataType(null, db, null), mapBuilder,
DatabaseEngine.getHostAndPort());
else
this.transactionEngine = transactionEngine;

transactionEngine.init(store.getMapNames()); transactionEngine.init(store.getMapNames());
initTransactions(); initTransactions();
} }
Expand All @@ -219,10 +236,14 @@ public MVStore getStore() {
return store; return store;
} }


public MVCCTransactionEngine getTransactionEngine() { public TransactionEngine getTransactionEngine() {
return transactionEngine; return transactionEngine;
} }


public StorageMap.Builder getMapBuilder() {
return mapBuilder;
}

public HashMap<String, MVTable> getTables() { public HashMap<String, MVTable> getTables() {
return new HashMap<String, MVTable>(tableMap); return new HashMap<String, MVTable>(tableMap);
} }
Expand Down Expand Up @@ -264,11 +285,11 @@ public void closeImmediately() {
* rollback all open transactions. * rollback all open transactions.
*/ */
public void initTransactions() { public void initTransactions() {
List<MVCCTransaction> list = transactionEngine.getOpenTransactions(); List<Transaction> list = transactionEngine.getOpenTransactions();
for (MVCCTransaction t : list) { for (Transaction t : list) {
if (t.getStatus() == MVCCTransaction.STATUS_COMMITTING) { if (t.getStatus() == Transaction.STATUS_COMMITTING) {
t.commit(); t.commit();
} else if (t.getStatus() != MVCCTransaction.STATUS_PREPARED) { } else if (t.getStatus() != Transaction.STATUS_PREPARED) {
t.rollback(); t.rollback();
} }
} }
Expand All @@ -289,8 +310,8 @@ public void removeTemporaryMaps(BitField objectIds) {
if (!objectIds.get(id)) { if (!objectIds.get(id)) {
ValueDataType keyType = new ValueDataType(null, null, null); ValueDataType keyType = new ValueDataType(null, null, null);
ValueDataType valueType = new ValueDataType(null, null, null); ValueDataType valueType = new ValueDataType(null, null, null);
MVCCTransaction t = transactionEngine.beginTransaction(false); Transaction t = transactionEngine.beginTransaction(false);
MVCCTransactionMap<?, ?> m = t.openMap(mapName, keyType, valueType); TransactionMap<?, ?> m = t.openMap(mapName, keyType, valueType);
transactionEngine.removeMap(m); transactionEngine.removeMap(m);
t.commit(); t.commit();
} }
Expand All @@ -314,17 +335,17 @@ public synchronized String nextTemporaryMapName() {
* @param transactionName the transaction name (may be null) * @param transactionName the transaction name (may be null)
*/ */
public void prepareCommit(Session session, String transactionName) { public void prepareCommit(Session session, String transactionName) {
MVCCTransaction t = (MVCCTransaction) session.getTransaction(); Transaction t = session.getTransaction();
t.setName(transactionName); t.setName(transactionName);
t.prepare(); t.prepare();
store.commit(); store.commit();
} }


public ArrayList<InDoubtTransaction> getInDoubtTransactions() { public ArrayList<InDoubtTransaction> getInDoubtTransactions() {
List<MVCCTransaction> list = transactionEngine.getOpenTransactions(); List<Transaction> list = transactionEngine.getOpenTransactions();
ArrayList<InDoubtTransaction> result = New.arrayList(); ArrayList<InDoubtTransaction> result = New.arrayList();
for (MVCCTransaction t : list) { for (Transaction t : list) {
if (t.getStatus() == MVCCTransaction.STATUS_PREPARED) { if (t.getStatus() == Transaction.STATUS_PREPARED) {
result.add(new MVInDoubtTransaction(store, t)); result.add(new MVInDoubtTransaction(store, t));
} }
} }
Expand Down Expand Up @@ -443,10 +464,10 @@ public Map<String, Integer> statisticsEnd() {
private static class MVInDoubtTransaction implements InDoubtTransaction { private static class MVInDoubtTransaction implements InDoubtTransaction {


private final MVStore store; private final MVStore store;
private final MVCCTransaction transaction; private final Transaction transaction;
private int state = InDoubtTransaction.IN_DOUBT; private int state = InDoubtTransaction.IN_DOUBT;


MVInDoubtTransaction(MVStore store, MVCCTransaction transaction) { MVInDoubtTransaction(MVStore store, Transaction transaction) {
this.store = store; this.store = store;
this.transaction = transaction; this.transaction = transaction;
} }
Expand Down Expand Up @@ -505,7 +526,7 @@ public String nextTemporaryMapName(Database db) {


@Override @Override
public <K, V> TransactionMap<K, V> openMap(Session session, String name, DataType keyType, DataType valueType) { public <K, V> TransactionMap<K, V> openMap(Session session, String name, DataType keyType, DataType valueType) {
return ((MVCCTransaction) session.getTransaction()).openMap(name, keyType, valueType); return session.getTransaction().openMap(name, keyType, valueType);
} }


} }
Expand Up @@ -53,7 +53,7 @@ public Table createTable(CreateTableData data) {
conn = createConnection(dbName); conn = createConnection(dbName);
connections.put(dbName, conn); connections.put(dbName, conn);


Store store = init(db, new WTMapBuilder(conn.open_session(null))); Store store = init(db, new WTMapBuilder(conn.open_session(null)), getTransactionEngine());
stores.put(db.getName(), store); stores.put(db.getName(), store);
db.setStorageEngine(this); db.setStorageEngine(this);
db.setTransactionEngine(store.getTransactionEngine()); db.setTransactionEngine(store.getTransactionEngine());
Expand Down
Expand Up @@ -52,7 +52,9 @@ public static void register() {
} }


public MemoryStorageEngine() { public MemoryStorageEngine() {
super(new MemoryMapBuilder()); super();
setMapBuilder(new MemoryMapBuilder());
//setTransactionEngine(new MemoryTransactionEngine());
} }


@Override @Override
Expand Down Expand Up @@ -124,7 +126,7 @@ public int compare(K k1, K k2) {


} }


static class MemoryMap<K, V> extends ConcurrentSkipListMap<K, V> implements StorageMap<K, V> { public static class MemoryMap<K, V> extends ConcurrentSkipListMap<K, V> implements StorageMap<K, V> {


private static final AtomicInteger counter = new AtomicInteger(0); private static final AtomicInteger counter = new AtomicInteger(0);


Expand Down
Expand Up @@ -18,33 +18,33 @@
package org.lealone.test.transaction; package org.lealone.test.transaction;


import org.junit.Assert; import org.junit.Assert;
import org.junit.Test;
import org.lealone.storage.StorageMap; import org.lealone.storage.StorageMap;
import org.lealone.test.TestBase; import org.lealone.test.TestBase;
import org.lealone.test.storage.MemoryStorageEngine; import org.lealone.test.storage.MemoryStorageEngine;
import org.lealone.transaction.MVCCTransaction;
import org.lealone.transaction.MVCCTransactionEngine; import org.lealone.transaction.MVCCTransactionEngine;
import org.lealone.transaction.TransactionEngine; import org.lealone.transaction.MVCCTransactionMap;
import org.lealone.transaction.Transaction;
import org.lealone.transaction.TransactionMap;
import org.lealone.type.ObjectDataType; import org.lealone.type.ObjectDataType;


public class TransactionTest { public class MVCCTransactionTest {
public static void main(String[] args) { @Test
public void run() {
StorageMap.Builder mapBuilder = new MemoryStorageEngine.MemoryMapBuilder(); StorageMap.Builder mapBuilder = new MemoryStorageEngine.MemoryMapBuilder();
String hostAndPort = TestBase.getHost() + ":" + TestBase.getPort(); String hostAndPort = TestBase.getHost() + ":" + TestBase.getPort();
MVCCTransactionEngine e = new MVCCTransactionEngine(new ObjectDataType(), mapBuilder, hostAndPort); MVCCTransactionEngine e = new MVCCTransactionEngine(new ObjectDataType(), mapBuilder, hostAndPort);
e.init(null); e.init(null);
TransactionEngine te = e;


Transaction t = te.beginTransaction(false); MVCCTransaction t = e.beginTransaction(false);
TransactionMap<String, String> map = t.openMap("test"); MVCCTransactionMap<String, String> map = t.openMap("test");
map.put("1", "a"); map.put("1", "a");
map.put("2", "b"); map.put("2", "b");
Assert.assertEquals("a", map.get("1")); Assert.assertEquals("a", map.get("1"));
Assert.assertEquals("b", map.get("2")); Assert.assertEquals("b", map.get("2"));


t.rollback(); t.rollback();


t = te.beginTransaction(false); t = e.beginTransaction(false);


Assert.assertNull(map.get("1")); Assert.assertNull(map.get("1"));
Assert.assertNull(map.get("2")); Assert.assertNull(map.get("2"));
Expand Down

0 comments on commit 778f5b5

Please sign in to comment.