Skip to content

Commit

Permalink
分布式事务模拟测试
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 24, 2015
1 parent 55c54fd commit d02baed
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 17 deletions.
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.test.transaction;

import org.junit.Test;
import org.lealone.storage.Storage;
import org.lealone.test.TestBase;
import org.lealone.transaction.Transaction;
import org.lealone.transaction.TransactionEngine;
import org.lealone.transaction.TransactionMap;

public class DistributedTransactionTest extends TestBase {

private static class ParticipantTest implements Transaction.Participant {
@Override
public void addSavepoint(String name) {
}

@Override
public void rollbackToSavepoint(String name) {
}

@Override
public void commitTransaction(String localTransactionName) {
}

@Override
public void rollbackTransaction() {
}
}

private static class ValidatorTest implements Transaction.Validator {
private final boolean validateResult;

public ValidatorTest(boolean validateResult) {
this.validateResult = validateResult;
}

@Override
public boolean validateTransaction(String localTransactionName) {
return validateResult;
}
}

@Test
public void run() {
TransactionEngine te = TransactionEngineTest.getTransactionEngine(true);
Storage storage = TransactionEngineTest.getStorage();

Transaction t = te.beginTransaction(false);
t.setLocal(false);

ParticipantTest p1 = new ParticipantTest();
ParticipantTest p2 = new ParticipantTest();
ValidatorTest validatorTrue = new ValidatorTest(true);
ValidatorTest validatorFalse = new ValidatorTest(false);

t.addParticipant(p1);
t.addParticipant(p2);
t.setValidator(validatorTrue);

long tid1 = t.getTransactionId() + 2;
long tid2 = t.getTransactionId() + 4;

String alocalTransactionName1 = "127.0.0.1:5210:" + tid1;
String alocalTransactionName2 = "127.0.0.1:5210:" + tid2;

t.addLocalTransactionNames(alocalTransactionName1);
t.addLocalTransactionNames(alocalTransactionName2);

TransactionMap<String, String> map = t.openMap("DistributedTransactionTest", storage);
map.clear();
map.put("1", "a");
map.put("2", "b");
t.commit();

t = te.beginTransaction(false);
t.setLocal(false);
t.setValidator(validatorTrue);
map = map.getInstance(t);
assertEquals("a", map.get("1"));
assertEquals("b", map.get("2"));

t.addParticipant(p1);
t.addParticipant(p2);
t.setValidator(validatorFalse);
t.addLocalTransactionNames(alocalTransactionName1);
t.addLocalTransactionNames(alocalTransactionName2);
map.put("3", "c");
map.put("4", "d");
t.commit();

t = te.beginTransaction(false);
t.setLocal(false);
t.setValidator(validatorFalse);
map = map.getInstance(t);
assertEquals(null, map.get("3"));
assertEquals(null, map.get("4"));

te.close();
}
}
Expand Up @@ -33,19 +33,12 @@
import org.lealone.transaction.TransactionMap;

public class TransactionEngineTest extends TestBase {
@Test
public void run() {

public static TransactionEngine getTransactionEngine(boolean isDistributed) {
TransactionEngine te = TransactionEngineManager.getInstance().getEngine(
Constants.DEFAULT_TRANSACTION_ENGINE_NAME);
assertEquals(Constants.DEFAULT_TRANSACTION_ENGINE_NAME, te.getName());

StorageEngine se = StorageEngineManager.getInstance().getEngine(Constants.DEFAULT_STORAGE_ENGINE_NAME);
assertEquals(Constants.DEFAULT_STORAGE_ENGINE_NAME, se.getName());

StorageBuilder storageBuilder = se.getStorageBuilder();
storageBuilder.storageName(joinDirs("transaction-test", "data"));
Storage storage = storageBuilder.openStorage();

Map<String, String> config = new HashMap<>();
config.put("base_dir", joinDirs("transaction-test"));
config.put("transaction_log_dir", "tlog");
Expand All @@ -55,7 +48,31 @@ public void run() {
// config.put("log_sync_type", "periodic");
// config.put("log_sync_period", "500"); // 500ms

if (isDistributed) {
config.put("is_cluster_mode", "true");
config.put("host_and_port", Constants.DEFAULT_HOST + ":" + Constants.DEFAULT_TCP_PORT);
}

te.init(config);

return te;
}

public static Storage getStorage() {
StorageEngine se = StorageEngineManager.getInstance().getEngine(Constants.DEFAULT_STORAGE_ENGINE_NAME);
assertEquals(Constants.DEFAULT_STORAGE_ENGINE_NAME, se.getName());

StorageBuilder storageBuilder = se.getStorageBuilder();
storageBuilder.storageName(joinDirs("transaction-test", "data"));
Storage storage = storageBuilder.openStorage();
return storage;
}

@Test
public void run() {
TransactionEngine te = getTransactionEngine(false);
Storage storage = getStorage();

Transaction t = te.beginTransaction(false);
TransactionMap<String, String> map = t.openMap("test", storage);
map.clear();
Expand Down
Expand Up @@ -65,13 +65,11 @@ public class MVCCTransaction implements Transaction {

LinkedList<LogRecord> logRecords = new LinkedList<>();

MVCCTransaction(MVCCTransactionEngine engine, long tid, int status, int logId) {
MVCCTransaction(MVCCTransactionEngine engine, long tid) {
transactionEngine = engine;
transactionId = tid;
transactionName = getTransactionName(engine.hostAndPort, tid);

this.status = status;
this.logId = logId;
status = MVCCTransaction.STATUS_OPEN;
}

static class LogRecord {
Expand Down
Expand Up @@ -125,6 +125,9 @@ void closeService() {
public void run() {
Long checkpoint = null;
while (true) {
if (isClosed)
break;

try {
semaphore.tryAcquire(sleep, TimeUnit.MILLISECONDS);
semaphore.drainPermits();
Expand Down Expand Up @@ -253,7 +256,7 @@ public MVCCTransaction beginTransaction(boolean autoCommit) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, "Not initialized");
}
long tid = getTransactionId(autoCommit);
MVCCTransaction t = new MVCCTransaction(this, tid, MVCCTransaction.STATUS_OPEN, 0);
MVCCTransaction t = new MVCCTransaction(this, tid);
t.setAutoCommit(autoCommit);
currentTransactions.put(tid, t);
return t;
Expand Down Expand Up @@ -323,7 +326,11 @@ void commitAfterValidate(long tid) {
}

private void commitFinal(long tid) {
LinkedList<LogRecord> logRecords = currentTransactions.get(tid).logRecords;
// 避免并发提交(TransactionValidator线程和其他读写线程都有可能在检查到分布式事务有效后帮助提交最终事务)
MVCCTransaction t = currentTransactions.remove(tid);
if (t == null)
return;
LinkedList<LogRecord> logRecords = t.logRecords;
StorageMap<Object, VersionedValue> map;
for (LogRecord r : logRecords) {
map = getMap(r.mapName);
Expand All @@ -342,7 +349,7 @@ private void commitFinal(long tid) {
}
}

currentTransactions.get(tid).endTransaction();
t.endTransaction();
}

RedoLogValue getRedoLog(MVCCTransaction t) {
Expand Down
Expand Up @@ -32,7 +32,7 @@ public class MVCCTransactionMap<K, V> implements TransactionMap<K, V> {
* The map used for writing (the latest version).
* <p>
* Key: the key of the data.
* Value: { transactionId, oldVersion, value }
* Value: { transactionId, logId, value }
*/
private final StorageMap<K, VersionedValue> map;

Expand Down

0 comments on commit d02baed

Please sign in to comment.