Skip to content

Commit

Permalink
Distributed: fixed ThreeClientsRecordUpdateWithTransactionsOnMultiple…
Browse files Browse the repository at this point in the history
…ServersScenarioTest
  • Loading branch information
lvca committed Apr 15, 2016
1 parent f86cae2 commit 7442819
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 64 deletions.
Expand Up @@ -334,6 +334,8 @@ public ODistributedTxContext registerTxContext(final ODistributedRequestId reqId

@Override
public ODistributedTxContext popTxContext(final ODistributedRequestId requestId) {
ODistributedServerLog.debug(this, getLocalNodeName(), null, DIRECTION.NONE,
"Distributed transaction: pop request %s for database %s", requestId, databaseName);
return activeTxContexts.remove(requestId);
}

Expand Down
Expand Up @@ -652,7 +652,7 @@ protected boolean fixNodesInConflict(final List<ODistributedResponse> bestRespon
}
}
}
return false;
return true;
}

protected boolean checkNoWinnerCase(final List<ODistributedResponse> bestResponsesGroup) {
Expand Down
Expand Up @@ -132,7 +132,7 @@ public Object call() throws Exception {
final OTxTaskResult localResult = createLocalTxResult(uResult);

final OTxTask txTask = createTxTask(uResult);
txTask.setUndoTasks(undoTasks);
txTask.setLocalUndoTasks(undoTasks);

OTransactionInternal.setStatus((OTransactionAbstract) iTx, OTransaction.TXSTATUS.COMMITTING);

Expand Down
Expand Up @@ -52,7 +52,7 @@ public class OTxTask extends OAbstractReplicatedTask {

private List<OAbstractRecordReplicatedTask> tasks = new ArrayList<OAbstractRecordReplicatedTask>();

private transient List<OAbstractRemoteTask> undoTasks;
private transient List<OAbstractRemoteTask> localUndoTasks = new ArrayList<OAbstractRemoteTask>();
private transient OTxTaskResult result;

public OTxTask() {
Expand Down Expand Up @@ -191,7 +191,7 @@ public ORemoteTask getFixTask(final ODistributedRequest iRequest, final ORemoteT
public ORemoteTask getUndoTask(final ODistributedRequestId reqId) {
final OCompletedTxTask fixTask = new OCompletedTxTask(reqId, false);

for (ORemoteTask undoTask : undoTasks)
for (ORemoteTask undoTask : localUndoTasks)
fixTask.addFixTask(undoTask);

return fixTask;
Expand Down Expand Up @@ -249,7 +249,7 @@ public void setNodeSource(final String nodeSource) {
}
}

public void setUndoTasks(final List<OAbstractRemoteTask> undoTasks) {
this.undoTasks = undoTasks;
public void setLocalUndoTasks(final List<OAbstractRemoteTask> undoTasks) {
this.localUndoTasks = undoTasks;
}
}
Expand Up @@ -375,16 +375,17 @@ protected void waitFor(final int serverId, final OCallable<Boolean, ODatabaseDoc
}
}

protected void waitFor(final long timeout, final OCallable<Boolean, Void> condition) {
protected void waitFor(final long timeout, final OCallable<Boolean, Void> condition, final String message) {
final long startTime = System.currentTimeMillis();

while (true) {
if (condition.call(null)) {
// SUCCEED
break;
}

if (timeout > 0 && System.currentTimeMillis() - startTime > timeout)
throw new OTimeoutException("Timeout waiting for test condition");
throw new OTimeoutException("Timeout waiting for test condition: " + message);

try {
Thread.sleep(1000);
Expand Down
Expand Up @@ -141,7 +141,7 @@ protected void onBeforeChecks() throws InterruptedException {
public Boolean call(Void nothing) {
return nodeLefts.get() > 0;
}
});
}, "Nodes left are " + nodeLefts.get());
}

@Override
Expand Down
Expand Up @@ -16,16 +16,19 @@

package com.orientechnologies.orient.server.distributed.scenariotest;

import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.common.util.OCallable;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.exception.OQueryParsingException;
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.metadata.OMetadataInternal;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.sql.query.OSQLSynchQuery;
import com.orientechnologies.orient.server.OServer;
import com.orientechnologies.orient.server.distributed.AbstractServerClusterInsertTest;
import com.orientechnologies.orient.server.distributed.ODistributedStorageEventListener;
import com.orientechnologies.orient.server.distributed.ServerRun;

import java.util.*;
Expand All @@ -40,13 +43,13 @@

public abstract class AbstractScenarioTest extends AbstractServerClusterInsertTest {

protected final static int SERVERS = 3;
protected List<ServerRun> executeWritesOnServers = new LinkedList<ServerRun>();
protected final static ODocument MISSING_DOCUMENT = new ODocument();
protected final static int SERVERS = 3;
protected List<ServerRun> executeWritesOnServers = new LinkedList<ServerRun>();
protected final static ODocument MISSING_DOCUMENT = new ODocument();

// FIXME: these should be parameters read from configuration file (or, if missing, defaulted to some values)
private final long PROPAGATION_DOCUMENT_RETRIEVE_TIMEOUT = 5000;
protected final long DOCUMENT_WRITE_TIMEOUT = 5000;
private final long PROPAGATION_DOCUMENT_RETRIEVE_TIMEOUT = 15000;
protected final long DOCUMENT_WRITE_TIMEOUT = 5000;

protected ODocument loadRecord(ODatabaseDocumentTx database, int serverId, int threadId, int i) {
final String uniqueId = serverId + "-" + threadId + "-" + i;
Expand Down Expand Up @@ -193,9 +196,8 @@ protected void checkWritesAboveCluster(List<ServerRun> checkConsistencyOnServers

List<ODocument> docsToCompare = new LinkedList<ODocument>();

super.banner(
"Checking consistency among servers...\nChecking on servers {" + checkOnServer + "} that all the records written on {"
+ writtenServer + "} are consistent.");
super.banner("Checking consistency among servers...\nChecking on servers {" + checkOnServer
+ "} that all the records written on {" + writtenServer + "} are consistent.");

try {

Expand Down Expand Up @@ -273,7 +275,7 @@ public Boolean call(Void iArgument) {
}
return true;
}
});
}, "Record " + recordId);

}

Expand All @@ -298,11 +300,12 @@ public Boolean call(Void iArgument) {
|| document.field(fieldName) == null && expectedFieldValue != null)
return false;

System.out.println("Waiting for updated document propagation..");
OLogManager.instance().info(this, "Waiting for updated document propagation on record %s (%s=%s)...", recordId, fieldName,
expectedFieldValue);
}
return true;
}
});
}, String.format("expected %s=%s", recordId, fieldName, expectedFieldValue));

}

Expand All @@ -320,8 +323,7 @@ private ODocument retrieveRecord(String dbUrl, String uniqueId, boolean returnsM
assertTrue(result.size() + " records found with id = '" + uniqueId + "'!", false);

return (ODocument) result.get(0).reload();
}
finally {
} finally {
ODatabaseRecordThreadLocal.INSTANCE.set(null);
}
}
Expand Down Expand Up @@ -458,23 +460,23 @@ public void executeFutures(Collection<Future<Void>> futures) {
}

/*
* A simple client that updates a record
*/
* A simple client that updates a record
*/
protected class RecordUpdater implements Callable<Void> {

private String dbServerUrl;
private ODocument recordToUpdate;
private Map<String, String> fields;
private Map<String, Object> fields;
private boolean useTransaction;

protected RecordUpdater(String dbServerUrl, ODocument recordToUpdate, Map<String, String> fields, boolean useTransaction) {
protected RecordUpdater(String dbServerUrl, ODocument recordToUpdate, Map<String, Object> fields, boolean useTransaction) {
this.dbServerUrl = dbServerUrl;
this.recordToUpdate = recordToUpdate;
this.fields = fields;
this.useTransaction = useTransaction;
}

protected RecordUpdater(String dbServerUrl, String rid, Map<String, String> fields, boolean useTransaction) {
protected RecordUpdater(String dbServerUrl, String rid, Map<String, Object> fields, boolean useTransaction) {
this.dbServerUrl = dbServerUrl;
this.useTransaction = useTransaction;
this.recordToUpdate = retrieveRecord(dbServerUrl, rid);
Expand Down Expand Up @@ -504,4 +506,35 @@ public Void call() throws Exception {
}
}

class AfterRecordLockDelayer implements ODistributedStorageEventListener {

private long delay;

public AfterRecordLockDelayer(long delay) {
this.delay = delay;
System.err.println("Delayer created with " + delay + "ms of delay");
}

public AfterRecordLockDelayer() {
this(DOCUMENT_WRITE_TIMEOUT);
}

@Override
public void onAfterRecordLock(ORecordId rid) {
try {
OLogManager.instance().info(this, "Waiting %s for %dms with locked record [%s]", Thread.currentThread().getId(), delay,
rid.toString());
Thread.sleep(delay);
OLogManager.instance().info(this, "Finished %s Waiting for %dms with locked record [%s]", Thread.currentThread().getId(),
delay, rid.toString());
} catch (InterruptedException e) {

}
}

@Override
public void onAfterRecordUnlock(ORecordId rid) {
}
}

}
Expand Up @@ -16,13 +16,10 @@

package com.orientechnologies.orient.server.distributed.scenariotest;

import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.server.distributed.ODistributedStorage;
import com.orientechnologies.orient.server.distributed.ODistributedStorageEventListener;
import org.junit.Test;

import java.util.HashMap;
Expand All @@ -44,19 +41,19 @@

public class SimultaneousRecordUpdateWithTransactionsOnMultipleServersScenarioTest extends AbstractScenarioTest {

private final String RECORD_ID = "R001";
private HashMap<String, String> lukeFields = new HashMap<String, String>() {
{
put("firstName", "Luke");
put("lastName", "Skywalker");
}
};
private HashMap<String, String> darthFields = new HashMap<String, String>() {
{
put("firstName", "Darth");
put("lastName", "Vader");
}
};
private final String RECORD_ID = "R001";
private HashMap<String, Object> lukeFields = new HashMap<String, Object>() {
{
put("firstName", "Luke");
put("lastName", "Skywalker");
}
};
private HashMap<String, Object> darthFields = new HashMap<String, Object>() {
{
put("firstName", "Darth");
put("lastName", "Vader");
}
};

@Test
public void test() throws Exception {
Expand Down Expand Up @@ -120,21 +117,21 @@ public void executeTest() throws Exception {
public String getDatabaseName() {
return "distributed-simultaneous-update";
}

class AfterRecordLockDelayer implements ODistributedStorageEventListener {

@Override
public void onAfterRecordLock(ORecordId rid) {
try {
OLogManager.instance().error(this, "Waiting for %dms with locked record [%s]", DOCUMENT_WRITE_TIMEOUT, rid.toString());
Thread.sleep(DOCUMENT_WRITE_TIMEOUT);
} catch (InterruptedException e) {

}
}

@Override
public void onAfterRecordUnlock(ORecordId rid) {
}
}
//
// class AfterRecordLockDelayer implements ODistributedStorageEventListener {
//
// @Override
// public void onAfterRecordLock(ORecordId rid) {
// try {
// OLogManager.instance().error(this, "Waiting for %dms with locked record [%s]", DOCUMENT_WRITE_TIMEOUT, rid.toString());
// Thread.sleep(DOCUMENT_WRITE_TIMEOUT);
// } catch (InterruptedException e) {
//
// }
// }
//
// @Override
// public void onAfterRecordUnlock(ORecordId rid) {
// }
// }
}

0 comments on commit 7442819

Please sign in to comment.