Skip to content

Commit

Permalink
renamed variable in the micro-transaction commit to prepare merge of …
Browse files Browse the repository at this point in the history
…commits
  • Loading branch information
tglman committed Oct 24, 2017
1 parent 394c9f7 commit 374a293
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 55 deletions.
Expand Up @@ -1590,12 +1590,12 @@ public List<ORecordOperation> commitPreAllocated(final OTransactionInternal clie
* <bold>other node commit</bold> is the commit that happen when a node execute a transaction of another node where all the rids
* are already allocated in the other node.
*
* @param clientTx the transaction to commit
* @param transaction the transaction to commit
* @param allocated true if the operation is pre-allocated commit
*
* @return The list of operations applied by the transaction
*/
private List<ORecordOperation> commit(final OTransactionInternal clientTx, boolean allocated) {
private List<ORecordOperation> commit(final OTransactionInternal transaction, boolean allocated) {
// XXX: At this moment, there are two implementations of the commit method. One for regular client transactions and one for
// implicit micro-transactions. The implementations are quite identical, but operate on slightly different data. If you change
// this method don't forget to change its counterpart:
Expand All @@ -1608,33 +1608,32 @@ private List<ORecordOperation> commit(final OTransactionInternal clientTx, boole

txBegun.incrementAndGet();

final ODatabaseDocumentInternal databaseRecord = (ODatabaseDocumentInternal) clientTx.getDatabase();
final OIndexManager indexManager = databaseRecord.getMetadata().getIndexManager();
final TreeMap<String, OTransactionIndexChanges> indexesToCommit = getSortedIndexOperations(clientTx);
final ODatabaseDocumentInternal database = transaction.getDatabase();
final OIndexManager indexManager = database.getMetadata().getIndexManager();
final TreeMap<String, OTransactionIndexChanges> indexOperations = getSortedIndexOperations(transaction);

databaseRecord.getMetadata().makeThreadLocalSchemaSnapshot();
database.getMetadata().makeThreadLocalSchemaSnapshot();

@SuppressWarnings("unchecked")
final Iterable<ORecordOperation> entries = (Iterable<ORecordOperation>) clientTx.getRecordOperations();
final Collection<ORecordOperation> recordOperations = transaction.getRecordOperations();
final TreeMap<Integer, OCluster> clustersToLock = new TreeMap<>();
final Map<ORecordOperation, Integer> clusterOverrides = new IdentityHashMap<>();

final Set<ORecordOperation> newRecords = new TreeSet<>(COMMIT_RECORD_OPERATION_COMPARATOR);

for (ORecordOperation txEntry : entries) {
if (txEntry.type == ORecordOperation.CREATED || txEntry.type == ORecordOperation.UPDATED) {
final ORecord record = txEntry.getRecord();
for (ORecordOperation recordOperation : recordOperations) {
if (recordOperation.type == ORecordOperation.CREATED || recordOperation.type == ORecordOperation.UPDATED) {
final ORecord record = recordOperation.getRecord();
if (record instanceof ODocument)
((ODocument) record).validate();
}

if (txEntry.type == ORecordOperation.UPDATED || txEntry.type == ORecordOperation.DELETED) {
final int clusterId = txEntry.getRecord().getIdentity().getClusterId();
if (recordOperation.type == ORecordOperation.UPDATED || recordOperation.type == ORecordOperation.DELETED) {
final int clusterId = recordOperation.getRecord().getIdentity().getClusterId();
clustersToLock.put(clusterId, getClusterById(clusterId));
} else if (txEntry.type == ORecordOperation.CREATED) {
newRecords.add(txEntry);
} else if (recordOperation.type == ORecordOperation.CREATED) {
newRecords.add(recordOperation);

final ORecord record = txEntry.getRecord();
final ORecord record = recordOperation.getRecord();
final ORID rid = record.getIdentity();

int clusterId = rid.getClusterId();
Expand All @@ -1645,7 +1644,7 @@ private List<ORecordOperation> commit(final OTransactionInternal clientTx, boole
final OImmutableClass class_ = ODocumentInternal.getImmutableSchemaClass(((ODocument) record));
if (class_ != null) {
clusterId = class_.getClusterForNewInstance((ODocument) record);
clusterOverrides.put(txEntry, clusterId);
clusterOverrides.put(recordOperation, clusterId);
}
}

Expand All @@ -1662,65 +1661,65 @@ private List<ORecordOperation> commit(final OTransactionInternal clientTx, boole
checkOpenness();

makeStorageDirty();
startStorageTx(clientTx);
startStorageTx(transaction);

lockClusters(clustersToLock);

Map<ORecordOperation, OPhysicalPosition> positions = new IdentityHashMap<>();
for (ORecordOperation txEntry : newRecords) {
ORecord rec = txEntry.getRecord();
for (ORecordOperation recordOperation : newRecords) {
ORecord rec = recordOperation.getRecord();

if (allocated) {
if (rec.getIdentity().isPersistent()) {
positions.put(txEntry, new OPhysicalPosition(rec.getIdentity().getClusterPosition()));
positions.put(recordOperation, new OPhysicalPosition(rec.getIdentity().getClusterPosition()));
} else {
throw new OStorageException("Impossible to commit a transaction with not valid rid in pre-allocated commit");
}
} else if (rec.isDirty() && !rec.getIdentity().isPersistent()) {
ORecordId rid = (ORecordId) rec.getIdentity().copy();
ORecordId oldRID = rid.copy();

final Integer clusterOverride = clusterOverrides.get(txEntry);
final Integer clusterOverride = clusterOverrides.get(recordOperation);
final int clusterId = clusterOverride == null ? rid.getClusterId() : clusterOverride;

final OCluster cluster = getClusterById(clusterId);
OPhysicalPosition ppos = cluster.allocatePosition(ORecordInternal.getRecordType(rec));
OPhysicalPosition physicalPosition = cluster.allocatePosition(ORecordInternal.getRecordType(rec));
rid.setClusterId(cluster.getId());

if (rid.getClusterPosition() > -1) {
// CREATE EMPTY RECORDS UNTIL THE POSITION IS REACHED. THIS IS THE CASE WHEN A SERVER IS OUT OF SYNC
// BECAUSE A TRANSACTION HAS BEEN ROLLED BACK BEFORE TO SEND THE REMOTE CREATES. SO THE OWNER NODE DELETED
// RECORD HAVING A HIGHER CLUSTER POSITION
while (rid.getClusterPosition() > ppos.clusterPosition) {
ppos = cluster.allocatePosition(ORecordInternal.getRecordType(rec));
while (rid.getClusterPosition() > physicalPosition.clusterPosition) {
physicalPosition = cluster.allocatePosition(ORecordInternal.getRecordType(rec));
}

if (rid.getClusterPosition() != ppos.clusterPosition)
throw new OConcurrentCreateException(rid, new ORecordId(rid.getClusterId(), ppos.clusterPosition));
if (rid.getClusterPosition() != physicalPosition.clusterPosition)
throw new OConcurrentCreateException(rid, new ORecordId(rid.getClusterId(), physicalPosition.clusterPosition));
}
positions.put(txEntry, ppos);
positions.put(recordOperation, physicalPosition);

rid.setClusterPosition(ppos.clusterPosition);
rid.setClusterPosition(physicalPosition.clusterPosition);

clientTx.updateIdentityAfterCommit(oldRID, rid);
transaction.updateIdentityAfterCommit(oldRID, rid);
}
}

lockRidBags(clustersToLock, indexesToCommit, indexManager);
lockRidBags(clustersToLock, indexOperations, indexManager);

for (ORecordOperation txEntry : entries) {
commitEntry(txEntry, positions.get(txEntry), databaseRecord.getSerializer());
result.add(txEntry);
for (ORecordOperation recordOperation : recordOperations) {
commitEntry(recordOperation, positions.get(recordOperation), database.getSerializer());
result.add(recordOperation);
}

lockIndexes(indexesToCommit);
lockIndexes(indexOperations);

commitIndexes(indexesToCommit);
commitIndexes(indexOperations);

final OLogSequenceNumber lsn = endStorageTx();
final DataOutputStream journaledStream = OAbstractPaginatedStorage.journaledStream;
if (journaledStream != null) { // send event to journaled tx stream if the streaming is on
final int txId = clientTx.getClientTransactionId();
final int txId = transaction.getClientTransactionId();
if (lsn == null || writeAheadLog == null) // if tx is not journaled
try {
journaledStream.writeInt(txId);
Expand All @@ -1737,17 +1736,17 @@ private List<ORecordOperation> commit(final OTransactionInternal clientTx, boole
});
}

OTransactionAbstract.updateCacheFromEntries(clientTx.getDatabase(), entries, true);
OTransactionAbstract.updateCacheFromEntries(transaction.getDatabase(), recordOperations, true);

txCommit.incrementAndGet();

} catch (IOException | RuntimeException ioe) {
makeRollback(clientTx, ioe);
} catch (IOException | RuntimeException e) {
makeRollback(transaction, e);
} finally {
transaction.set(null);
this.transaction.set(null);
}
} finally {
databaseRecord.getMetadata().clearThreadLocalSchemaSnapshot();
database.getMetadata().clearThreadLocalSchemaSnapshot();
}
} finally {
stateLock.releaseReadLock();
Expand All @@ -1756,7 +1755,7 @@ private List<ORecordOperation> commit(final OTransactionInternal clientTx, boole
if (OLogManager.instance().isDebugEnabled())
OLogManager.instance()
.debug(this, "%d Committed transaction %d on database '%s' (result=%s)", Thread.currentThread().getId(),
clientTx.getId(), databaseRecord.getName(), result);
transaction.getId(), database.getName(), result);

return result;
} catch (RuntimeException ee) {
Expand All @@ -1771,9 +1770,9 @@ private List<ORecordOperation> commit(final OTransactionInternal clientTx, boole
/**
* Commits the given micro-transaction.
*
* @param microTransaction the micro-transaction to commit.
* @param transaction the micro-transaction to commit.
*/
public void commit(OMicroTransaction microTransaction) {
public void commit(OMicroTransaction transaction) {
// XXX: At this moment, there are two implementations of the commit method. One for regular client transactions and one for
// implicit micro-transactions. The implementations are quite identical, but operate on slightly different data. If you change
// this method don't forget to change its counterpart:
Expand All @@ -1786,13 +1785,13 @@ public void commit(OMicroTransaction microTransaction) {

txBegun.incrementAndGet();

final ODatabaseDocumentInternal database = microTransaction.getDatabase();
final ODatabaseDocumentInternal database = transaction.getDatabase();
final OIndexManager indexManager = database.getMetadata().getIndexManager();
final TreeMap<String, OTransactionIndexChanges> indexOperations = getSortedIndexOperations(microTransaction);
final TreeMap<String, OTransactionIndexChanges> indexOperations = getSortedIndexOperations(transaction);

database.getMetadata().makeThreadLocalSchemaSnapshot();

final Iterable<ORecordOperation> recordOperations = microTransaction.getRecordOperations();
final Collection<ORecordOperation> recordOperations = transaction.getRecordOperations();
final TreeMap<Integer, OCluster> clustersToLock = new TreeMap<>();
final Map<ORecordOperation, Integer> clusterOverrides = new IdentityHashMap<>();

Expand Down Expand Up @@ -1838,7 +1837,7 @@ public void commit(OMicroTransaction microTransaction) {
checkOpenness();

makeStorageDirty();
startStorageTx(microTransaction);
startStorageTx(transaction);

lockClusters(clustersToLock);

Expand Down Expand Up @@ -1872,29 +1871,30 @@ public void commit(OMicroTransaction microTransaction) {

rid.setClusterPosition(physicalPosition.clusterPosition);

microTransaction.updateIdentityAfterCommit(oldRID, rid);
transaction.updateIdentityAfterCommit(oldRID, rid);
}
}

lockRidBags(clustersToLock, indexOperations, indexManager);

for (ORecordOperation recordOperation : recordOperations)
for (ORecordOperation recordOperation : recordOperations) {
commitEntry(recordOperation, positions.get(recordOperation), database.getSerializer());
}

lockIndexes(indexOperations);

commitIndexes(indexOperations);

endStorageTx();

OTransactionAbstract.updateCacheFromEntries(microTransaction.getDatabase(), microTransaction.getRecordOperations(), true);
OTransactionAbstract.updateCacheFromEntries(transaction.getDatabase(), recordOperations, true);

txCommit.incrementAndGet();

} catch (IOException | RuntimeException e) {
makeRollback(microTransaction, e);
makeRollback(transaction, e);
} finally {
transaction.set(null);
this.transaction.set(null);
}
} finally {
database.getMetadata().clearThreadLocalSchemaSnapshot();
Expand Down
Expand Up @@ -121,7 +121,7 @@ public void testMultipleDbAllocationNotAlignedFailure() {
OVertex v = db.newVertex("V");
db.save(v);

((OAbstractPaginatedStorage) db.getStorage()).preallocateRids(db.getTransaction());
((OAbstractPaginatedStorage) db.getStorage()).preallocateRids((OTransactionOptimistic) db.getTransaction());
OTransaction transaction = db.getTransaction();
second.activateOnCurrentThread();
second.begin();
Expand Down

0 comments on commit 374a293

Please sign in to comment.