Skip to content

Commit

Permalink
HA: trying fixing problem with indexes and writeQuorum=1
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Nov 1, 2016
1 parent 930ecb1 commit 8ea0358
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 32 deletions.
Expand Up @@ -42,7 +42,7 @@ public Boolean call(ODatabaseDocumentTx db) {
waitFor(2, new OCallable<Boolean, ODatabaseDocumentTx>() {
@Override
public Boolean call(ODatabaseDocumentTx db) {
final int node2Expected = lastNodeIsUp.get() ? expected : expected - (count * writerCount * (serverInstance.size() - 1));
final long node2Expected = lastNodeIsUp.get() ? expected : expected - (count * writerCount * (serverInstance.size() - 1));

final boolean ok = db.countClass("Person") >= node2Expected;
if (!ok)
Expand Down
Expand Up @@ -34,22 +34,19 @@
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import org.junit.Assert;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.*;

/**
* Insert records concurrently against the cluster
*/
public abstract class AbstractServerClusterInsertTest extends AbstractDistributedWriteTest {
protected volatile int delayWriter = 0;
protected volatile int delayReader = 1000;
protected static int writerCount = 5;
protected int baseCount = 0;
protected int expected;
protected OIndex<?> idx;
protected volatile int delayWriter = 0;
protected volatile int delayReader = 1000;
protected static int writerCount = 5;
protected int baseCount = 0;
protected long expected;
protected OIndex<?> idx;
protected int maxRetries = 5;
protected boolean useTransactions = false;
protected List<ServerRun> executeTestsOnServers = serverInstance;
Expand Down Expand Up @@ -146,8 +143,9 @@ protected ODocument createRecord(ODatabaseDocumentTx database, int i, final Stri
checkClusterStrategy(database);

final String uniqueId = serverId + "-" + threadId + "-" + i;
ODocument person = new ODocument("Person").fields("id", uid, "name", "Billy" + uniqueId, "surname", "Mayes" + uniqueId,
"birthday", new Date(), "children", uniqueId);
ODocument person = new ODocument("Person")
.fields("id", uid, "name", "Billy" + uniqueId, "surname", "Mayes" + uniqueId, "birthday", new Date(), "children",
uniqueId);
database.save(person);

if (!useTransactions)
Expand Down Expand Up @@ -186,8 +184,8 @@ protected ODocument loadRecord(ODatabaseDocumentTx database, int i) {

final String uniqueId = serverId + "-" + threadId + "-" + i;

List<ODocument> result = database.query(new OSQLSynchQuery<ODocument>("select from Person where name = ?"),
"Billy" + uniqueId);
List<ODocument> result = database
.query(new OSQLSynchQuery<ODocument>("select from Person where name = ?"), "Billy" + uniqueId);
if (result.size() == 0)
Assert.assertTrue("No record found with name = 'Billy" + uniqueId + "'!", false);
else if (result.size() > 1)
Expand Down Expand Up @@ -368,8 +366,7 @@ protected Callable<Void> createReader(String databaseURL) {
/**
* Event called right after the database has been created and right before to be replicated to the X servers
*
* @param db
* Current database
* @param db Current database
*/
@Override
protected void onAfterDatabaseCreation(final OrientBaseGraph db) {
Expand Down Expand Up @@ -443,32 +440,68 @@ protected void checkIndexedEntries() {
if (indexName == null)
return;

ODatabaseDocumentTx database;
final Map<String, Long> result = new HashMap<String, Long>();

for (ServerRun server : serverInstance) {
if (server.isActive()) {
database = poolFactory.get(getDatabaseURL(server), "admin", "admin").acquire();
final ODatabaseDocumentTx database = poolFactory.get(getDatabaseURL(server), "admin", "admin").acquire();
try {
final long indexSize = database.getMetadata().getIndexManager().getIndex(indexName).getSize();

result.put(server.serverId, indexSize);

List<ODocument> qResult = database.query(new OSQLSynchQuery<OIdentifiable>("select count(*) from index:" + indexName));
Assert.assertEquals("Index count is different by index content", indexSize,
((Long) qResult.get(0).field("count")).longValue());

if (indexSize != expected) {
// ERROR: DUMP ALL THE RECORDS
List<ODocument> result = database.query(new OSQLSynchQuery<OIdentifiable>("select from index:" + indexName));
int i = 0;
for (ODocument d : result) {
System.out.println((i++) + ": " + ((OIdentifiable) d.field("rid")).getRecord());
}
}
// ERROR: CHECK WHAT'S MISSING
for (int s = 0; s < executeTestsOnServers.size(); ++s) {
ServerRun srv = executeTestsOnServers.get(s);
final int srvId = Integer.parseInt(srv.serverId);

for (int threadId = srvId * writerCount; threadId < (srvId + 1) * writerCount; ++threadId) {

for (int i = 0; i < count; ++i) {
final String key = "Billy" + srvId + "-" + threadId + "-" + i;

Assert.assertEquals("Indexed items on server " + server + " are " + indexSize + ", but " + expected + " was expected",
expected, indexSize);
qResult = database
.query(new OSQLSynchQuery<OIdentifiable>("select from index:" + indexName + " where key='" + key + "'"));

List<ODocument> result = database.query(new OSQLSynchQuery<OIdentifiable>("select count(*) from index:" + indexName));
Assert.assertEquals(expected, ((Long) result.get(0).field("count")).longValue());
if (qResult.isEmpty())
System.out.println("Missing key: " + key + " on server: " + server);
}
}
}
}
} finally {
database.close();
}
}
}

// CHECK IF RESULT IS COHERENT BETWEEN SERVER
String server = null;
long value = -1;
for (Map.Entry<String, Long> entry : result.entrySet()) {
if (value == -1) {
server = entry.getKey();
value = result.values().iterator().next();
} else if (entry.getValue() != value) {
Assert.assertEquals(
"Not coherent result between servers. Server " + entry.getKey() + " has " + entry.getValue() + " indexed entries, but server " + server
+ " has " + value, (Long) value, entry.getValue());
}
}

// CHECK IF RESULT IS EXPECTED
for (Map.Entry<String, Long> entry : result.entrySet()) {
if (entry.getValue() != expected) {
Assert.assertEquals(
"Indexed items on server " + entry.getKey() + " are " + entry.getValue() + ", but " + expected + " was expected",
(Long) expected, entry.getValue());
}
}
}

protected void checkInsertedEntries() {
Expand Down
Expand Up @@ -106,7 +106,7 @@ public Boolean call(ODatabaseDocumentTx db) {
waitFor(2, new OCallable<Boolean, ODatabaseDocumentTx>() {
@Override
public Boolean call(ODatabaseDocumentTx db) {
final int node2Expected = lastNodeIsUp.get() ? expected : expected - (count * writerCount * (SERVERS - 1));
final long node2Expected = lastNodeIsUp.get() ? expected : expected - (count * writerCount * (SERVERS - 1));

final boolean ok = db.countClass("Person") >= node2Expected;
if (!ok)
Expand Down
Expand Up @@ -77,7 +77,8 @@ public void executeTest() throws Exception {
OHazelcastPlugin manager = (OHazelcastPlugin) server.getServerInstance().getDistributedManager();
ODistributedConfiguration databaseConfiguration = manager.getDatabaseConfiguration(getDatabaseName());
cfg = databaseConfiguration.getDocument();
cfg.field("writeQuorum", 1);
cfg.field("writeQuorum", 2);
// cfg.field("executionMode", "asynchronous");
cfg.field("autoDeploy", true);
cfg.field("version", (Integer) cfg.field("version") + 1);
manager.updateCachedDatabaseConfiguration(getDatabaseName(), cfg, true, true);
Expand Down

0 comments on commit 8ea0358

Please sign in to comment.