Skip to content

Commit

Permalink
Added new test case to simulate crash and recover
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jul 23, 2015
1 parent 41a6320 commit a5a6190
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 16 deletions.
Expand Up @@ -50,6 +50,7 @@ public abstract class AbstractServerClusterInsertTest extends AbstractDistribute
protected static final int delayWriter = 0;
protected static final int delayReader = 1000;
protected static final int writerCount = 5;
protected int baseCount = 0;
protected long expected;
protected long beginInstances;
protected OIndex<?> idx;
Expand All @@ -74,9 +75,11 @@ public Void call() throws Exception {
if ((i + 1) % 100 == 0)
System.out.println("\nWriter " + database.getURL() + " managed " + (i + 1) + "/" + count + " records so far");

final ODocument person = createRecord(database, i);
updateRecord(database, i);
checkRecord(database, i);
final int id = baseCount + i;

final ODocument person = createRecord(database, id);
updateRecord(database, id);
checkRecord(database, id);
checkIndex(database, (String) person.field("name"), person.getIdentity());

Thread.sleep(delayWriter);
Expand Down Expand Up @@ -187,30 +190,42 @@ public void executeTest() throws Exception {
database.close();
}

executeMultipleTest();
dropIndexNode1();
recreateIndexNode2();
}

protected void executeMultipleTest() throws InterruptedException, java.util.concurrent.ExecutionException {
System.out.println("Creating Writers and Readers threads...");

final ExecutorService writerExecutors = Executors.newCachedThreadPool();
final ExecutorService readerExecutors = Executors.newCachedThreadPool();

runningWriters = new CountDownLatch(serverInstance.size() * writerCount);
expected = writerCount * count * serverInstance.size() + beginInstances;

int serverId = 0;
int threadId = 0;
List<Callable<Void>> writerWorkers = new ArrayList<Callable<Void>>();
for (ServerRun server : serverInstance) {
for (int j = 0; j < writerCount; j++) {
Callable writer = createWriter(serverId, threadId++, getDatabaseURL(server));
writerWorkers.add(writer);
if (server.isActive()) {
for (int j = 0; j < writerCount; j++) {
Callable writer = createWriter(serverId, threadId++, getDatabaseURL(server));
writerWorkers.add(writer);
}
serverId++;
}
serverId++;
}

expected = writerCount * count * serverId + beginInstances + baseCount;

List<Future<Void>> futures = writerExecutors.invokeAll(writerWorkers);

List<Callable<Void>> readerWorkers = new ArrayList<Callable<Void>>();
for (ServerRun server : serverInstance) {
Callable<Void> reader = createReader(getDatabaseURL(server));
readerWorkers.add(reader);
if (server.isActive()) {
Callable<Void> reader = createReader(getDatabaseURL(server));
readerWorkers.add(reader);
}
}

List<Future<Void>> rFutures = readerExecutors.invokeAll(readerWorkers);
Expand All @@ -236,13 +251,13 @@ public void executeTest() throws Exception {
System.out.println("All threads have finished, shutting down server instances");

for (ServerRun server : serverInstance) {
printStats(getDatabaseURL(server));
if (server.isActive()) {
printStats(getDatabaseURL(server));
}
}

checkInsertedEntries();
checkIndexedEntries();
dropIndexNode1();
recreateIndexNode2();
}

protected Callable<Void> createReader(String databaseURL) {
Expand Down Expand Up @@ -394,7 +409,7 @@ private void printStats(final String databaseUrl) {
System.out.println("\nReader " + name + " sql count: " + result.get(0) + " counting class: " + database.countClass("Person")
+ " counting cluster: " + database.countClusterElements("Person"));

if (((OMetadataInternal)database.getMetadata()).getImmutableSchemaSnapshot().existsClass("ODistributedConflict"))
if (((OMetadataInternal) database.getMetadata()).getImmutableSchemaSnapshot().existsClass("ODistributedConflict"))
try {
List<ODocument> conflicts = database
.query(new OSQLSynchQuery<OIdentifiable>("select count(*) from ODistributedConflict"));
Expand Down
Expand Up @@ -147,7 +147,7 @@ protected void onServerStarted(ServerRun server) {
protected void onTestEnded() {
}

protected void onAfterExecution() {
protected void onAfterExecution() throws Exception {
}

protected abstract String getDatabaseName();
Expand Down
Expand Up @@ -44,7 +44,7 @@ public TxWriter(final int iServerId, final String db) {
@Override
public Void call() throws Exception {
String name = Integer.toString(serverId);
for (int i = 0; i < count; i++) {
for (int i = baseCount; i < count; i++) {
final ODatabaseDocumentTx database = poolFactory.get(databaseUrl, "admin", "admin").acquire();
try {
if ((i + 1) % 100 == 0)
Expand Down
@@ -0,0 +1,67 @@
/*
* Copyright 2010-2013 Luca Garulli (l.garulli--at--orientechnologies.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.orientechnologies.orient.server.distributed;

import org.junit.Ignore;

/**
* Distributed TX test against "plocal" protocol + shutdown and restart of a node.
*/
public class ServerClusterLocalTxHATest extends AbstractServerClusterTxTest {
final static int SERVERS = 3;

@Ignore
// @Test
public void test() throws Exception {
init(SERVERS);
prepare(false);
execute();
}

@Override
protected void onAfterExecution() throws Exception {
System.out.println("SIMULATE FAILURE ON SERVER " + (SERVERS - 1));
serverInstance.get(SERVERS - 1).shutdownServer();

Thread.sleep(1000);

System.out.println("RESTARTING TESTS...");

baseCount += count;
count = 1000000;

executeMultipleTest();

System.out.println("RESTART SERVER " + (SERVERS - 1) + "...");
serverInstance.get(SERVERS - 1).startServer(getDistributedServerConfiguration(serverInstance.get(SERVERS - 1)));

System.out.println("RESTARTING TESTS...");

baseCount += count;
count = 1000000;

executeMultipleTest();
}

protected String getDatabaseURL(final ServerRun server) {
return "plocal:" + server.getDatabasePath(getDatabaseName());
}

@Override
public String getDatabaseName() {
return "distributed-inserttxha";
}
}

0 comments on commit a5a6190

Please sign in to comment.