From a9b11c6858e6b6ab8984075bb8da131dec484a93 Mon Sep 17 00:00:00 2001 From: Tglman Date: Fri, 17 May 2024 14:47:46 +0200 Subject: [PATCH] feat: first implementation of preserve rids in database import --- .../db/document/ODatabaseDocumentRemote.java | 10 +++ .../core/db/ODatabaseDocumentInternal.java | 5 ++ .../document/ODatabaseDocumentEmbedded.java | 68 +++++++++++++++++++ .../core/db/document/ODatabaseDocumentTx.java | 11 +++ .../orient/core/db/tool/ODatabaseCompare.java | 2 +- .../core/db/tool/ODatabaseImpExpAbstract.java | 2 + .../orient/core/db/tool/ODatabaseImport.java | 49 ++++++++----- .../core/db/tool/importer/OLinkConverter.java | 3 +- .../core/tx/OTransactionOptimistic.java | 28 ++++++++ .../core/db/tool/ODatabaseImportTest.java | 57 ++++++++++++++++ .../impl/ODatabaseDocumentDistributed.java | 20 ++++++ 11 files changed, 236 insertions(+), 19 deletions(-) diff --git a/client/src/main/java/com/orientechnologies/orient/client/remote/db/document/ODatabaseDocumentRemote.java b/client/src/main/java/com/orientechnologies/orient/client/remote/db/document/ODatabaseDocumentRemote.java index 039abef577f..fbd3b5d336d 100755 --- a/client/src/main/java/com/orientechnologies/orient/client/remote/db/document/ODatabaseDocumentRemote.java +++ b/client/src/main/java/com/orientechnologies/orient/client/remote/db/document/ODatabaseDocumentRemote.java @@ -1283,4 +1283,14 @@ public long truncateClass(String name, boolean polimorfic) { } return count; } + + @Override + public void commitPreallocate() { + throw new UnsupportedOperationException(); + } + + @Override + public void internalCommitPreallocate(OTransactionOptimistic oTransactionOptimistic) { + throw new UnsupportedOperationException(); + } } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseDocumentInternal.java b/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseDocumentInternal.java index b768726ef1e..8389a535899 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseDocumentInternal.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/ODatabaseDocumentInternal.java @@ -46,6 +46,7 @@ import com.orientechnologies.orient.core.tx.OTransactionAbstract; import com.orientechnologies.orient.core.tx.OTransactionData; import com.orientechnologies.orient.core.tx.OTransactionInternal; +import com.orientechnologies.orient.core.tx.OTransactionOptimistic; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -304,4 +305,8 @@ default void queryStartUsingViewIndex(String index) {} public long truncateClass(String name, boolean polimorfic); public long truncateClusterInternal(String name); + + void commitPreallocate(); + + void internalCommitPreallocate(OTransactionOptimistic oTransactionOptimistic); } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentEmbedded.java b/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentEmbedded.java index 2b5b2a52e30..b264c90722d 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentEmbedded.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentEmbedded.java @@ -20,8 +20,10 @@ package com.orientechnologies.orient.core.db.document; +import com.orientechnologies.common.concur.ONeedRetryException; import com.orientechnologies.common.concur.lock.OLockException; import com.orientechnologies.common.exception.OException; +import com.orientechnologies.common.exception.OHighLevelException; import com.orientechnologies.common.io.OIOUtils; import com.orientechnologies.common.log.OLogManager; import com.orientechnologies.orient.core.Orient; @@ -2086,4 +2088,70 @@ public long truncateClusterInternal(String clusterName) { public void truncateCluster(String clusterName) { truncateClusterInternal(clusterName); } + + @Override + public void commitPreallocate() { + checkOpenness(); + checkIfActive(); + + if (!currentTx.isActive()) return; + + if (currentTx.amountOfNestedTxs() > 1) { + // This just do count down no real commit here + ((OTransactionOptimistic) currentTx).commitPreallocate(); + return; + } + + // WAKE UP LISTENERS + + try { + beforeCommitOperations(); + } catch (OException e) { + try { + rollback(); + } catch (Exception re) { + OLogManager.instance() + .error(this, "Exception during rollback `%08X`", re, System.identityHashCode(re)); + } + throw e; + } + try { + ((OTransactionOptimistic) currentTx).commitPreallocate(); + } catch (RuntimeException e) { + + if ((e instanceof OHighLevelException) || (e instanceof ONeedRetryException)) + OLogManager.instance() + .debug(this, "Error on transaction commit `%08X`", e, System.identityHashCode(e)); + else + OLogManager.instance() + .error(this, "Error on transaction commit `%08X`", e, System.identityHashCode(e)); + + // WAKE UP ROLLBACK LISTENERS + beforeRollbackOperations(); + + try { + // ROLLBACK TX AT DB LEVEL + ((OTransactionAbstract) currentTx).internalRollback(); + } catch (Exception re) { + OLogManager.instance() + .error( + this, "Error during transaction rollback `%08X`", re, System.identityHashCode(re)); + } + + // WAKE UP ROLLBACK LISTENERS + afterRollbackOperations(); + throw e; + } + + // WAKE UP LISTENERS + afterCommitOperations(); + + return; + } + + @Override + public void internalCommitPreallocate(OTransactionOptimistic oTransactionOptimistic) { + ((OAbstractPaginatedStorage) getStorage()).preallocateRids(oTransactionOptimistic); + ((OAbstractPaginatedStorage) getStorage()).commitPreAllocated(oTransactionOptimistic); + } } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentTx.java b/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentTx.java index e0e6e32df92..99bf13bf51f 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentTx.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/document/ODatabaseDocumentTx.java @@ -66,6 +66,7 @@ import com.orientechnologies.orient.core.tx.OTransaction; import com.orientechnologies.orient.core.tx.OTransactionAbstract; import com.orientechnologies.orient.core.tx.OTransactionInternal; +import com.orientechnologies.orient.core.tx.OTransactionOptimistic; import com.orientechnologies.orient.core.util.OURLConnection; import com.orientechnologies.orient.core.util.OURLHelper; import java.io.IOException; @@ -1755,4 +1756,14 @@ public long truncateClusterInternal(String name) { public long truncateClass(String name, boolean polimorfic) { return internal.truncateClass(name, polimorfic); } + + @Override + public void commitPreallocate() { + internal.commitPreallocate(); + } + + @Override + public void internalCommitPreallocate(OTransactionOptimistic oTransactionOptimistic) { + internal.internalCommitPreallocate(oTransactionOptimistic); + } } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseCompare.java b/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseCompare.java index b21650bc0d3..15c4b9f63c6 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseCompare.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseCompare.java @@ -147,7 +147,7 @@ public boolean compare() { + " where key = ?", rid.toString())) { if (resultSet.hasNext()) { - return new ORecordId(resultSet.next().getProperty("value")); + return (ORID) resultSet.next().getProperty("value"); } return null; } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseImpExpAbstract.java b/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseImpExpAbstract.java index f7764beb5ad..21c245b20e6 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseImpExpAbstract.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseImpExpAbstract.java @@ -252,6 +252,8 @@ protected void parseSetting(final String option, final List items) { } else if (option.equalsIgnoreCase("-useLineFeedForRecords")) { useLineFeedForRecords = Boolean.parseBoolean(items.get(0)); + } else if (option.equalsIgnoreCase("-preserveRids")) { + setPreserveRids(Boolean.parseBoolean(items.get(0))); } } } diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseImport.java b/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseImport.java index f96ccf413ab..33f23324ee6 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseImport.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/tool/ODatabaseImport.java @@ -33,6 +33,7 @@ import com.orientechnologies.orient.core.db.document.ODocumentFieldWalker; import com.orientechnologies.orient.core.db.record.OClassTrigger; import com.orientechnologies.orient.core.db.record.OIdentifiable; +import com.orientechnologies.orient.core.db.record.ORecordOperation; import com.orientechnologies.orient.core.db.record.ridbag.ORidBag; import com.orientechnologies.orient.core.db.tool.importer.OConverterData; import com.orientechnologies.orient.core.db.tool.importer.OLinksRewriter; @@ -72,6 +73,7 @@ import com.orientechnologies.orient.core.sql.executor.ORidSet; import com.orientechnologies.orient.core.storage.OPhysicalPosition; import com.orientechnologies.orient.core.storage.OStorage; +import com.orientechnologies.orient.core.tx.OTransactionInternal; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -105,7 +107,6 @@ public class ODatabaseImport extends ODatabaseImpExpAbstract { private final Map linkedClasses = new HashMap<>(); private final Map> superClasses = new HashMap<>(); private OJSONReader jsonReader; - private ORecord record; private boolean schemaImported = false; private int exporterVersion = -1; private ORID schemaRecordId; @@ -372,6 +373,13 @@ public void setPreserveClusterIDs(boolean preserveClusterIDs) { this.preserveClusterIDs = preserveClusterIDs; } + public void setPreserveRids(boolean preserveRids) { + super.setPreserveRids(preserveRids); + if (preserveRids) { + setPreserveClusterIDs(true); + } + } + public boolean isMerge() { return merge; } @@ -463,8 +471,9 @@ private void removeDefaultNonSecurityClasses() { for (final OClass dbClass : classes) { final String className = dbClass.getName(); if (!dbClass.isSuperClassOf(role) - && !dbClass.isSuperClassOf(user) - && !dbClass.isSuperClassOf(identity) /*&& !dbClass.isSuperClassOf(oSecurityPolicy)*/) { + && !dbClass.isSuperClassOf(user) + && !dbClass.isSuperClassOf(identity) + || isPreserveRids()) { classesToDrop.put(className, dbClass); for (final OIndex index : dbClass.getIndexes()) { indexNames.add(index.getName()); @@ -557,7 +566,7 @@ private void importManualIndexes() throws IOException, ParseException { if (!result.hasNext()) { newRid = oldRid; } else { - newRid = new ORecordId(result.next().getProperty("value")); + newRid = result.next().getProperty("value"); } } @@ -574,7 +583,7 @@ private void importManualIndexes() throws IOException, ParseException { if (!result.hasNext()) { newRid = document.field("rid"); } else { - newRid = new ORecordId(result.next().getProperty("value")); + newRid = result.next().getProperty("value"); } } @@ -1160,7 +1169,7 @@ private ORID importRecord(final HashSet recordsBeforeImport) throws Except value = value.substring(1); } - record = null; + ORecord record = null; // big ridbags (ie. supernodes) sometimes send the system OOM, so they have to be discarded at // this stage @@ -1308,21 +1317,31 @@ record = ORecordInternal.setVersion(record, loadedRecord.getVersion()); } else { ORecordInternal.setVersion(record, 0); - ORecordInternal.setIdentity(record, new ORecordId()); + if (!preserveRids) { + ORecordInternal.setIdentity(record, new ORecordId()); + } } record.setDirty(); - - if (!preserveRids - && record instanceof ODocument - && ODocumentInternal.getImmutableSchemaClass(database, ((ODocument) record)) != null) + if (preserveRids) { + database.begin(); record.save(); - else record.save(database.getClusterNameById(clusterId)); + ((OTransactionInternal) database.getTransaction()) + .getRecordEntry(record.getIdentity()) + .type = + ORecordOperation.CREATED; + database.commitPreallocate(); + } else if (record instanceof ODocument + && ODocumentInternal.getImmutableSchemaClass(database, ((ODocument) record)) != null) { + record.save(); + } else { + record.save(database.getClusterNameById(clusterId)); + } if (!rid.equals(record.getIdentity())) { // SAVE IT ONLY IF DIFFERENT new ODocument(EXPORT_IMPORT_CLASS_NAME) .field("key", rid.toString()) - .field("value", record.getIdentity().toString()) + .field("value", record) .save(); } } @@ -1376,7 +1395,7 @@ private long importRecords() throws Exception { } final OClass cls = schema.createClass(EXPORT_IMPORT_CLASS_NAME); cls.createProperty("key", OType.STRING); - cls.createProperty("value", OType.STRING); + cls.createProperty("value", OType.LINK); cls.createIndex(EXPORT_IMPORT_INDEX_NAME, OClass.INDEX_TYPE.DICTIONARY, "key"); jsonReader.readNext(OJSONReader.BEGIN_COLLECTION); @@ -1439,8 +1458,6 @@ private long importRecords() throws Exception { lastLapRecords = 0; involvedClusters.clear(); } - - record = null; } if (!merge) { diff --git a/core/src/main/java/com/orientechnologies/orient/core/db/tool/importer/OLinkConverter.java b/core/src/main/java/com/orientechnologies/orient/core/db/tool/importer/OLinkConverter.java index a36068a2ac0..b54b2071033 100644 --- a/core/src/main/java/com/orientechnologies/orient/core/db/tool/importer/OLinkConverter.java +++ b/core/src/main/java/com/orientechnologies/orient/core/db/tool/importer/OLinkConverter.java @@ -3,7 +3,6 @@ import com.orientechnologies.orient.core.db.record.OIdentifiable; import com.orientechnologies.orient.core.db.tool.ODatabaseImport; import com.orientechnologies.orient.core.id.ORID; -import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.sql.executor.OResultSet; /** Created by tglman on 28/07/17. */ @@ -26,7 +25,7 @@ public OIdentifiable convert(OIdentifiable value) { "select value from " + ODatabaseImport.EXPORT_IMPORT_CLASS_NAME + " where key = ?", rid.toString())) { if (resultSet.hasNext()) { - return new ORecordId(resultSet.next().getProperty("value")); + return resultSet.next().getProperty("value"); } return value; } diff --git a/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionOptimistic.java b/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionOptimistic.java index 9e295abd1c0..cf8582a3511 100755 --- a/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionOptimistic.java +++ b/core/src/main/java/com/orientechnologies/orient/core/tx/OTransactionOptimistic.java @@ -122,6 +122,34 @@ public void commit(final boolean force) { } } + public void commitPreallocate() { + checkTransactionValid(); + if (txStartCounter < 0) { + throw new OStorageException("Invalid value of tx counter: " + txStartCounter); + } + txStartCounter--; + + if (txStartCounter == 0) { + if (status == TXSTATUS.ROLLED_BACK || status == TXSTATUS.ROLLBACKING) { + throw new ORollbackException( + "Given transaction was rolled back, and thus cannot be committed."); + } + status = TXSTATUS.COMMITTING; + + if (sentToServer || !allEntries.isEmpty() || !indexEntries.isEmpty()) { + database.internalCommitPreallocate(this); + } + invokeCallbacks(); + close(); + status = TXSTATUS.COMPLETED; + } else if (txStartCounter > 0) { + OLogManager.instance() + .debug(this, "Nested transaction was closed but transaction itself was not committed."); + } else { + throw new OTransactionException("Transaction was committed more times than it was started."); + } + } + @Override public int amountOfNestedTxs() { return txStartCounter; diff --git a/core/src/test/java/com/orientechnologies/orient/core/db/tool/ODatabaseImportTest.java b/core/src/test/java/com/orientechnologies/orient/core/db/tool/ODatabaseImportTest.java index c5f63387440..436605e9ea7 100755 --- a/core/src/test/java/com/orientechnologies/orient/core/db/tool/ODatabaseImportTest.java +++ b/core/src/test/java/com/orientechnologies/orient/core/db/tool/ODatabaseImportTest.java @@ -1,10 +1,14 @@ package com.orientechnologies.orient.core.db.tool; +import static org.junit.Assert.assertNotNull; + import com.orientechnologies.orient.core.OCreateDatabaseUtil; import com.orientechnologies.orient.core.command.OCommandOutputListener; import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal; import com.orientechnologies.orient.core.db.ODatabaseSession; import com.orientechnologies.orient.core.db.OrientDB; +import com.orientechnologies.orient.core.id.ORID; +import com.orientechnologies.orient.core.record.ORecord; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -105,4 +109,57 @@ public void onMessage(String iText) {} orientDB.drop(databaseName); orientDB.close(); } + + @Test + public void importPreserveRids() throws IOException { + final String databaseName = "test"; + final String exportDbUrl = + "embedded:target/export_" + ODatabaseImportTest.class.getSimpleName() + "_preserveRids"; + final OrientDB orientDB = + OCreateDatabaseUtil.createDatabase( + databaseName, exportDbUrl, OCreateDatabaseUtil.TYPE_PLOCAL); + ORID toCheck; + final ByteArrayOutputStream output = new ByteArrayOutputStream(); + try (final ODatabaseSession db = + orientDB.open(databaseName, "admin", OCreateDatabaseUtil.NEW_ADMIN_PASSWORD)) { + db.createClass("SimpleClass"); + db.save(db.newElement("SimpleClass")); + ORID toDelete = db.save(db.newElement("SimpleClass")).getIdentity(); + toCheck = db.save(db.newElement("SimpleClass")).getIdentity(); + db.delete(toDelete); + + final ODatabaseExport export = + new ODatabaseExport( + (ODatabaseDocumentInternal) db, + output, + new OCommandOutputListener() { + @Override + public void onMessage(String iText) {} + }); + export.exportDatabase(); + } + + final String importDbUrl = + "embedded:target/import_" + ODatabaseImportTest.class.getSimpleName() + "_preserveRids"; + OCreateDatabaseUtil.createDatabase(databaseName, importDbUrl, OCreateDatabaseUtil.TYPE_PLOCAL); + + try (final ODatabaseSession db = + orientDB.open(databaseName, "admin", OCreateDatabaseUtil.NEW_ADMIN_PASSWORD)) { + final ODatabaseImport importer = + new ODatabaseImport( + (ODatabaseDocumentInternal) db, + new ByteArrayInputStream(output.toByteArray()), + new OCommandOutputListener() { + @Override + public void onMessage(String iText) {} + }); + importer.setPreserveRids(true); + importer.importDatabase(); + ORecord read = db.load(toCheck); + assertNotNull(read); + Assert.assertEquals(read.getIdentity(), toCheck); + } + orientDB.drop(databaseName); + orientDB.close(); + } } diff --git a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODatabaseDocumentDistributed.java b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODatabaseDocumentDistributed.java index 75e7381cb12..6be727e164f 100755 --- a/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODatabaseDocumentDistributed.java +++ b/distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODatabaseDocumentDistributed.java @@ -353,6 +353,26 @@ public void internalCommit(OTransactionInternal iTx) { } } + @Override + public void internalCommitPreallocate(OTransactionOptimistic iTx) { + int protocolVersion = DISTRIBUTED_REPLICATION_PROTOCOL_VERSION.getValueAsInteger(); + if (OScenarioThreadLocal.INSTANCE.isRunModeDistributed() + || (iTx.isSequenceTransaction() && protocolVersion == 2)) { + // Exclusive for handling schema manipulation, remove after refactor for distributed schema + super.internalCommitPreallocate(iTx); + } else { + switch (protocolVersion) { + case 1: + distributedCommitV1(iTx); + break; + default: + throw new IllegalStateException( + "Invalid distributed replicaiton protocol version: " + + DISTRIBUTED_REPLICATION_PROTOCOL_VERSION.getValueAsInteger()); + } + } + } + @Override public T sendSequenceAction(OSequenceAction action) throws ExecutionException, InterruptedException {