Skip to content

Commit

Permalink
feat: first implementation of preserve rids in database import
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed May 20, 2024
1 parent 73e917f commit a9b11c6
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1283,4 +1283,14 @@ public long truncateClass(String name, boolean polimorfic) {
}
return count;
}

@Override
public void commitPreallocate() {
throw new UnsupportedOperationException();
}

@Override
public void internalCommitPreallocate(OTransactionOptimistic oTransactionOptimistic) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.orientechnologies.orient.core.tx.OTransactionAbstract;
import com.orientechnologies.orient.core.tx.OTransactionData;
import com.orientechnologies.orient.core.tx.OTransactionInternal;
import com.orientechnologies.orient.core.tx.OTransactionOptimistic;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -304,4 +305,8 @@ default void queryStartUsingViewIndex(String index) {}
public long truncateClass(String name, boolean polimorfic);

public long truncateClusterInternal(String name);

void commitPreallocate();

void internalCommitPreallocate(OTransactionOptimistic oTransactionOptimistic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

package com.orientechnologies.orient.core.db.document;

import com.orientechnologies.common.concur.ONeedRetryException;
import com.orientechnologies.common.concur.lock.OLockException;
import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.exception.OHighLevelException;
import com.orientechnologies.common.io.OIOUtils;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.Orient;
Expand Down Expand Up @@ -2086,4 +2088,70 @@ public long truncateClusterInternal(String clusterName) {
public void truncateCluster(String clusterName) {
truncateClusterInternal(clusterName);
}

@Override
public void commitPreallocate() {
checkOpenness();
checkIfActive();

if (!currentTx.isActive()) return;

if (currentTx.amountOfNestedTxs() > 1) {
// This just do count down no real commit here
((OTransactionOptimistic) currentTx).commitPreallocate();
return;
}

// WAKE UP LISTENERS

try {
beforeCommitOperations();
} catch (OException e) {
try {
rollback();
} catch (Exception re) {
OLogManager.instance()
.error(this, "Exception during rollback `%08X`", re, System.identityHashCode(re));
}
throw e;
}
try {
((OTransactionOptimistic) currentTx).commitPreallocate();
} catch (RuntimeException e) {

if ((e instanceof OHighLevelException) || (e instanceof ONeedRetryException))
OLogManager.instance()
.debug(this, "Error on transaction commit `%08X`", e, System.identityHashCode(e));
else
OLogManager.instance()
.error(this, "Error on transaction commit `%08X`", e, System.identityHashCode(e));

// WAKE UP ROLLBACK LISTENERS
beforeRollbackOperations();

try {
// ROLLBACK TX AT DB LEVEL
((OTransactionAbstract) currentTx).internalRollback();
} catch (Exception re) {
OLogManager.instance()
.error(
this, "Error during transaction rollback `%08X`", re, System.identityHashCode(re));
}

// WAKE UP ROLLBACK LISTENERS
afterRollbackOperations();
throw e;
}

// WAKE UP LISTENERS
afterCommitOperations();

return;
}

@Override
public void internalCommitPreallocate(OTransactionOptimistic oTransactionOptimistic) {
((OAbstractPaginatedStorage) getStorage()).preallocateRids(oTransactionOptimistic);
((OAbstractPaginatedStorage) getStorage()).commitPreAllocated(oTransactionOptimistic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.orientechnologies.orient.core.tx.OTransaction;
import com.orientechnologies.orient.core.tx.OTransactionAbstract;
import com.orientechnologies.orient.core.tx.OTransactionInternal;
import com.orientechnologies.orient.core.tx.OTransactionOptimistic;
import com.orientechnologies.orient.core.util.OURLConnection;
import com.orientechnologies.orient.core.util.OURLHelper;
import java.io.IOException;
Expand Down Expand Up @@ -1755,4 +1756,14 @@ public long truncateClusterInternal(String name) {
public long truncateClass(String name, boolean polimorfic) {
return internal.truncateClass(name, polimorfic);
}

@Override
public void commitPreallocate() {
internal.commitPreallocate();
}

@Override
public void internalCommitPreallocate(OTransactionOptimistic oTransactionOptimistic) {
internal.internalCommitPreallocate(oTransactionOptimistic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public boolean compare() {
+ " where key = ?",
rid.toString())) {
if (resultSet.hasNext()) {
return new ORecordId(resultSet.next().<String>getProperty("value"));
return (ORID) resultSet.next().getProperty("value");
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ protected void parseSetting(final String option, final List<String> items) {

} else if (option.equalsIgnoreCase("-useLineFeedForRecords")) {
useLineFeedForRecords = Boolean.parseBoolean(items.get(0));
} else if (option.equalsIgnoreCase("-preserveRids")) {
setPreserveRids(Boolean.parseBoolean(items.get(0)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.orientechnologies.orient.core.db.document.ODocumentFieldWalker;
import com.orientechnologies.orient.core.db.record.OClassTrigger;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.db.record.ridbag.ORidBag;
import com.orientechnologies.orient.core.db.tool.importer.OConverterData;
import com.orientechnologies.orient.core.db.tool.importer.OLinksRewriter;
Expand Down Expand Up @@ -72,6 +73,7 @@
import com.orientechnologies.orient.core.sql.executor.ORidSet;
import com.orientechnologies.orient.core.storage.OPhysicalPosition;
import com.orientechnologies.orient.core.storage.OStorage;
import com.orientechnologies.orient.core.tx.OTransactionInternal;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -105,7 +107,6 @@ public class ODatabaseImport extends ODatabaseImpExpAbstract {
private final Map<OPropertyImpl, String> linkedClasses = new HashMap<>();
private final Map<OClass, List<String>> superClasses = new HashMap<>();
private OJSONReader jsonReader;
private ORecord record;
private boolean schemaImported = false;
private int exporterVersion = -1;
private ORID schemaRecordId;
Expand Down Expand Up @@ -372,6 +373,13 @@ public void setPreserveClusterIDs(boolean preserveClusterIDs) {
this.preserveClusterIDs = preserveClusterIDs;
}

public void setPreserveRids(boolean preserveRids) {
super.setPreserveRids(preserveRids);
if (preserveRids) {
setPreserveClusterIDs(true);
}
}

public boolean isMerge() {
return merge;
}
Expand Down Expand Up @@ -463,8 +471,9 @@ private void removeDefaultNonSecurityClasses() {
for (final OClass dbClass : classes) {
final String className = dbClass.getName();
if (!dbClass.isSuperClassOf(role)
&& !dbClass.isSuperClassOf(user)
&& !dbClass.isSuperClassOf(identity) /*&& !dbClass.isSuperClassOf(oSecurityPolicy)*/) {
&& !dbClass.isSuperClassOf(user)
&& !dbClass.isSuperClassOf(identity)
|| isPreserveRids()) {
classesToDrop.put(className, dbClass);
for (final OIndex index : dbClass.getIndexes()) {
indexNames.add(index.getName());
Expand Down Expand Up @@ -557,7 +566,7 @@ private void importManualIndexes() throws IOException, ParseException {
if (!result.hasNext()) {
newRid = oldRid;
} else {
newRid = new ORecordId(result.next().<String>getProperty("value"));
newRid = result.next().getProperty("value");
}
}

Expand All @@ -574,7 +583,7 @@ private void importManualIndexes() throws IOException, ParseException {
if (!result.hasNext()) {
newRid = document.field("rid");
} else {
newRid = new ORecordId(result.next().<String>getProperty("value"));
newRid = result.next().getProperty("value");
}
}

Expand Down Expand Up @@ -1160,7 +1169,7 @@ private ORID importRecord(final HashSet<ORID> recordsBeforeImport) throws Except
value = value.substring(1);
}

record = null;
ORecord record = null;

// big ridbags (ie. supernodes) sometimes send the system OOM, so they have to be discarded at
// this stage
Expand Down Expand Up @@ -1308,21 +1317,31 @@ record =
ORecordInternal.setVersion(record, loadedRecord.getVersion());
} else {
ORecordInternal.setVersion(record, 0);
ORecordInternal.setIdentity(record, new ORecordId());
if (!preserveRids) {
ORecordInternal.setIdentity(record, new ORecordId());
}
}
record.setDirty();

if (!preserveRids
&& record instanceof ODocument
&& ODocumentInternal.getImmutableSchemaClass(database, ((ODocument) record)) != null)
if (preserveRids) {
database.begin();
record.save();
else record.save(database.getClusterNameById(clusterId));
((OTransactionInternal) database.getTransaction())
.getRecordEntry(record.getIdentity())
.type =
ORecordOperation.CREATED;
database.commitPreallocate();
} else if (record instanceof ODocument
&& ODocumentInternal.getImmutableSchemaClass(database, ((ODocument) record)) != null) {
record.save();
} else {
record.save(database.getClusterNameById(clusterId));
}

if (!rid.equals(record.getIdentity())) {
// SAVE IT ONLY IF DIFFERENT
new ODocument(EXPORT_IMPORT_CLASS_NAME)
.field("key", rid.toString())
.field("value", record.getIdentity().toString())
.field("value", record)
.save();
}
}
Expand Down Expand Up @@ -1376,7 +1395,7 @@ private long importRecords() throws Exception {
}
final OClass cls = schema.createClass(EXPORT_IMPORT_CLASS_NAME);
cls.createProperty("key", OType.STRING);
cls.createProperty("value", OType.STRING);
cls.createProperty("value", OType.LINK);
cls.createIndex(EXPORT_IMPORT_INDEX_NAME, OClass.INDEX_TYPE.DICTIONARY, "key");

jsonReader.readNext(OJSONReader.BEGIN_COLLECTION);
Expand Down Expand Up @@ -1439,8 +1458,6 @@ private long importRecords() throws Exception {
lastLapRecords = 0;
involvedClusters.clear();
}

record = null;
}

if (!merge) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.db.tool.ODatabaseImport;
import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.sql.executor.OResultSet;

/** Created by tglman on 28/07/17. */
Expand All @@ -26,7 +25,7 @@ public OIdentifiable convert(OIdentifiable value) {
"select value from " + ODatabaseImport.EXPORT_IMPORT_CLASS_NAME + " where key = ?",
rid.toString())) {
if (resultSet.hasNext()) {
return new ORecordId(resultSet.next().<String>getProperty("value"));
return resultSet.next().getProperty("value");
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,34 @@ public void commit(final boolean force) {
}
}

public void commitPreallocate() {
checkTransactionValid();
if (txStartCounter < 0) {
throw new OStorageException("Invalid value of tx counter: " + txStartCounter);
}
txStartCounter--;

if (txStartCounter == 0) {
if (status == TXSTATUS.ROLLED_BACK || status == TXSTATUS.ROLLBACKING) {
throw new ORollbackException(
"Given transaction was rolled back, and thus cannot be committed.");
}
status = TXSTATUS.COMMITTING;

if (sentToServer || !allEntries.isEmpty() || !indexEntries.isEmpty()) {
database.internalCommitPreallocate(this);
}
invokeCallbacks();
close();
status = TXSTATUS.COMPLETED;
} else if (txStartCounter > 0) {
OLogManager.instance()
.debug(this, "Nested transaction was closed but transaction itself was not committed.");
} else {
throw new OTransactionException("Transaction was committed more times than it was started.");
}
}

@Override
public int amountOfNestedTxs() {
return txStartCounter;
Expand Down
Loading

0 comments on commit a9b11c6

Please sign in to comment.