Skip to content

Commit

Permalink
Merge branch 'develop' into laa_develop
Browse files Browse the repository at this point in the history
# Conflicts:
#	core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OAbstractPaginatedStorage.java
#	core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/atomicoperations/OAtomicOperation.java
  • Loading branch information
andrii0lomakin committed Mar 26, 2020
2 parents 3b8ba18 + 160fe80 commit a4ff9c0
Show file tree
Hide file tree
Showing 22 changed files with 267 additions and 98 deletions.
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ node {
try{
build job: "orientdb-gremlin-multibranch/${env.BRANCH_NAME}", wait: false
build job: "orientdb-security-multibranch/${env.BRANCH_NAME}", wait: false
build job: "orientdb-enterprise-multibranch/${env.BRANCH_NAME}", wait: false
build job: "orientdb-sap-enterprise-multibranch/${env.BRANCH_NAME}", wait: false
} catch (ex){
slackSend(color: '#FFAAAA', channel: '#jenkins-failures', message: "Error scheduling downstream builds: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' (${env.BUILD_URL})\n${ex}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,8 @@
import com.orientechnologies.orient.core.sql.executor.OResultSet;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;

/**
* Handles Multi-value types such as Arrays, Collections and Maps. It recognizes special Orient collections.
Expand Down Expand Up @@ -198,7 +189,7 @@ public static Object getValue(final Object iObject, final int iIndex) {
if (!isMultiValue(iObject))
return null;

if (iIndex > getSize(iObject))
if (iIndex >= getSize(iObject))
return null;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public static final String encryptPassword(final String iPassword) {
}

public static boolean encodePassword(final ODocument iDocument) {
if (iDocument.field("name") == null)
final String name = iDocument.field("name");
if (name == null)
throw new OSecurityException("User name not found");

final String password = (String) iDocument.field("password");
Expand All @@ -91,7 +92,7 @@ public static boolean encodePassword(final ODocument iDocument) {
throw new OSecurityException("User '" + iDocument.field("name") + "' has no password");

if (Orient.instance().getSecurity() != null) {
Orient.instance().getSecurity().validatePassword(password);
Orient.instance().getSecurity().validatePassword(name, password);
}

if (!password.startsWith("{")) {
Expand All @@ -105,7 +106,7 @@ public static boolean encodePassword(final ODocument iDocument) {
@Override
public void fromStream(final ODocument iSource) {
if (document != null)
return;
return;

document = iSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package com.orientechnologies.orient.core.security;

import com.orientechnologies.orient.core.metadata.security.OSecurityUser;
import com.orientechnologies.orient.core.metadata.security.OSystemUser;
import com.orientechnologies.orient.core.metadata.security.OUser;
import com.orientechnologies.orient.core.record.impl.ODocument;

Expand Down Expand Up @@ -105,5 +104,5 @@ default void reload(OSecurityUser user, final String jsonConfig) {
// If a password validator is registered with the security system, it will be called to validate
// the specified password. An OInvalidPasswordException is thrown if the password does not meet
// the password validator's requirements.
void validatePassword(final String password) throws OInvalidPasswordException;
void validatePassword(final String username, final String password) throws OInvalidPasswordException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,75 @@ public OBackgroundDelta recordsChangedAfterLSN(final OLogSequenceNumber lsn, fin
}
}

private List<OTransactionData> extractTransactionsFromWal(Set<byte[]> transactionsMetadata) {
List<OTransactionData> finished = new ArrayList<>();
stateLock.acquireReadLock();
try {
Set<byte[]> transactionsToRead = new HashSet<>(transactionsMetadata);
// we iterate till the last record is contained in wal at the moment when we call this method
OLogSequenceNumber beginLsn = writeAheadLog.end();
Map<Long, OTransactionData> units = new HashMap<>();

writeAheadLog.addCutTillLimit(beginLsn);
try {
List<WriteableWALRecord> records = writeAheadLog.next(beginLsn, 1_000);
// all information about changed records is contained in atomic operation metadata
while (!records.isEmpty()) {
for (final OWALRecord record : records) {

if (record instanceof OFileCreatedWALRecord) {
throw new ODatabaseException(
"Cannot execute delta-sync because a new file has been added. Filename: '" + ((OFileCreatedWALRecord) record)
.getFileName() + "' (id=" + ((OFileCreatedWALRecord) record).getFileId() + ")");
}

if (record instanceof OFileDeletedWALRecord) {
throw new ODatabaseException(
"Cannot execute delta-sync because a file has been deleted. File id: " + ((OFileDeletedWALRecord) record)
.getFileId());
}

if (record instanceof OAtomicUnitStartMetadataRecord) {
byte[] meta = ((OAtomicUnitStartMetadataRecord) record).getMetadata();
//TODO: This will not be a byte to byte compare, but should compare only the tx id not all status
if (transactionsToRead.contains(meta)) {
long unitId = ((OAtomicUnitStartMetadataRecord) record).getOperationUnitId();
units.put(unitId, new OTransactionData(meta));
}
transactionsToRead.remove(meta);
}
if (record instanceof OAtomicUnitEndRecord) {
long opId = ((OAtomicUnitEndRecord) record).getOperationUnitId();
OTransactionData opes = units.remove(opId);
finished.add(opes);
}
if (record instanceof OHighLevelTransactionChangeRecord) {
byte[] data = ((OHighLevelTransactionChangeRecord) record).getData();
long unitId = ((OHighLevelTransactionChangeRecord) record).getOperationUnitId();
OTransactionData tx = units.get(unitId);
tx.addRecord(data);
}
if (transactionsToRead.isEmpty() && units.isEmpty()) {
//all read stop scanning and return the transactions
return finished;
}
}

records = writeAheadLog.next(records.get(records.size() - 1).getLsn(), 1_000);
}
} finally {
writeAheadLog.removeCutTillLimit(beginLsn);
}

} catch (final IOException e) {
throw OException.wrapException(new OStorageException("Error of reading of records from WAL"), e);
} finally {
stateLock.releaseReadLock();
}

return finished;
}

protected void serializeDeltaContent(OutputStream stream, OCommandOutputListener outputListener, SortedSet<ORID> sortedRids,
OLogSequenceNumber lsn) {
try {
Expand Down Expand Up @@ -4345,7 +4414,8 @@ protected long checkIfStorageDirty() throws IOException {
return -1;
}

protected void initConfiguration(OAtomicOperation atomicOperation, final OContextConfiguration contextConfiguration) throws IOException {
protected void initConfiguration(OAtomicOperation atomicOperation, final OContextConfiguration contextConfiguration)
throws IOException {
}

@SuppressWarnings({ "WeakerAccess", "EmptyMethod" })
Expand Down Expand Up @@ -4476,11 +4546,16 @@ private void startStorageTx(final OTransactionInternal clientTx) throws IOExcept
assert OAtomicOperationsManager.getCurrentOperation() == null;
transaction.set(new OStorageTransaction(clientTx));
try {
atomicOperationsManager.startAtomicOperation(clientTx.getMetadata().orElse(null));
final OAtomicOperation atomicOperation = atomicOperationsManager.startAtomicOperation(clientTx.getMetadata().orElse(null));
if (clientTx.getMetadata().isPresent()) {
this.lastMetadata = clientTx.getMetadata().get();
}
clientTx.storageBegun();
Iterator<byte[]> ops = clientTx.getSerializedOperations();
while (ops.hasNext()) {
byte[] next = ops.next();
writeAheadLog.log(new OHighLevelTransactionChangeRecord(atomicOperation.getOperationUnitId(), next));
}
} catch (final RuntimeException e) {
transaction.set(null);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,4 +759,9 @@ public void setDatabase(ODatabaseDocumentInternal database) {
public void setMetadata(Optional<byte[]> metadata) {
throw new UnsupportedOperationException();
}

public Iterator<byte[]> getSerializedOperations() {
return Collections.emptyIterator();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.orientechnologies.orient.core.storage.impl.local;

import java.util.ArrayList;
import java.util.List;

public class OTransactionData {
private byte[] transactionId;
private List<byte[]> records = new ArrayList<>();

public OTransactionData(byte[] transactionId) {
this.transactionId = transactionId;
}

public void addRecord(byte[] record) {
records.add(record);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.orientechnologies.orient.core.storage.impl.local.paginated.wal;

import com.orientechnologies.common.serialization.types.OIntegerSerializer;

import java.nio.ByteBuffer;

import static com.orientechnologies.orient.core.storage.impl.local.paginated.wal.WALRecordTypes.HIGH_LEVEL_TRANSACTION_CHANGE_RECORD;

public class OHighLevelTransactionChangeRecord extends OOperationUnitRecord {
private byte[] data;

public OHighLevelTransactionChangeRecord() {

}

public OHighLevelTransactionChangeRecord(long operationUnitId, byte[] data) {
super(operationUnitId);
this.data = data;
}

@Override
protected void serializeToByteBuffer(ByteBuffer buffer) {
buffer.putInt(data.length);
buffer.put(data, 0, data.length);
}

@Override
protected void deserializeFromByteBuffer(ByteBuffer buffer) {
int size = buffer.getInt();
data = new byte[size];
buffer.get(data, 0, size);
}

@Override
public int serializedSize() {
return super.serializedSize() + OIntegerSerializer.INT_SIZE + data.length;
}

@Override
public boolean isUpdateMasterRecord() {
return false;
}

@Override
public int getId() {
return HIGH_LEVEL_TRANSACTION_CHANGE_RECORD;
}

public byte[] getData() {
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ private WriteableWALRecord walRecordById(final int recordId) {
case FUZZY_CHECKPOINT_START_METADATA_RECORD:
walRecord = new OFuzzyCheckpointStartMetadataRecord();
break;
case HIGH_LEVEL_TRANSACTION_CHANGE_RECORD:
walRecord = new OHighLevelTransactionChangeRecord();
break;
case CHECKPOINT_END_RECORD:
walRecord = new OCheckpointEndRecord();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public final class WALRecordTypes {
public static final int ATOMIC_UNIT_START_METADATA_RECORD = 15;
public static final int FULL_CHECKPOINT_START_METADATA_RECORD = 16;
public static final int FUZZY_CHECKPOINT_START_METADATA_RECORD = 17;
public static final int HIGH_LEVEL_TRANSACTION_CHANGE_RECORD = 18;

public static final int CLUSTER_POSITION_MAP_INIT_PO = 35;
public static final int CLUSTER_POSITION_MAP_ADD_PO = 36;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.storage.OBasicTransaction;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;

/**
* Expose the api for extract the internal details needed by the storage for perform the transaction commit
Expand Down Expand Up @@ -114,4 +111,6 @@ default Optional<byte[]> getMetadata() {
default void storageBegun() {

}

Iterator<byte[]> getSerializedOperations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -577,4 +577,8 @@ public Optional<byte[]> getMetadata() {
public void setMetadata(Optional<byte[]> metadata) {
this.metadata = metadata;
}

public Iterator<byte[]> getSerializedOperations() {
return Collections.emptyIterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ public void waitForOnline() {
}

public void reEnqueue(final int senderNodeId, final long msgSequence, final String databaseName, final ORemoteTask payload,
int retryCount) {
int retryCount, int autoRetryDelay) {

Orient.instance().scheduleTask(
() -> processRequest(new ODistributedRequest(getManager(), senderNodeId, msgSequence, databaseName, payload), false),
10 * retryCount, 0);
autoRetryDelay * retryCount, 0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.orientechnologies.orient.client.remote.message.tx.ORecordOperationRequest;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.exception.OConcurrentCreateException;
Expand Down Expand Up @@ -126,9 +127,10 @@ public Object execute(ODistributedRequestId requestId, OServer iServer, ODistrib
throw e;
}
if (res1 == null) {
final int autoRetryDelay = OGlobalConfiguration.DISTRIBUTED_CONCURRENT_TX_AUTORETRY_DELAY.getValueAsInteger();
retryCount++;
((ODatabaseDocumentDistributed) database).getStorageDistributed().getLocalDistributedDatabase()
.reEnqueue(requestId.getNodeId(), requestId.getMessageId(), database.getName(), this, retryCount);
.reEnqueue(requestId.getNodeId(), requestId.getMessageId(), database.getName(), this, retryCount, autoRetryDelay);
hasResponse = false;
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;
import com.orientechnologies.orient.server.OServer;
Expand Down Expand Up @@ -91,12 +92,13 @@ public Object execute(ODistributedRequestId requestId, OServer iServer, ODistrib
ODatabaseDocumentInternal database) throws Exception {
if (success) {
if (!((ODatabaseDocumentDistributed) database).commit2pc(transactionId, false, requestId)) {
final int autoRetryDelay = OGlobalConfiguration.DISTRIBUTED_CONCURRENT_TX_AUTORETRY_DELAY.getValueAsInteger();
retryCount++;
if (retryCount < database.getConfiguration().getValueAsInteger(DISTRIBUTED_CONCURRENT_TX_MAX_AUTORETRY)) {
OLogManager.instance()
.info(OTransactionPhase2Task.this, "Received second phase but not yet first phase, re-enqueue second phase");
((ODatabaseDocumentDistributed) database).getStorageDistributed().getLocalDistributedDatabase()
.reEnqueue(requestId.getNodeId(), requestId.getMessageId(), database.getName(), this, retryCount);
.reEnqueue(requestId.getNodeId(), requestId.getMessageId(), database.getName(), this, retryCount, autoRetryDelay);
hasResponse = false;
} else {
Orient.instance().submit(() -> {
Expand All @@ -113,12 +115,13 @@ public Object execute(ODistributedRequestId requestId, OServer iServer, ODistrib
}
} else {
if (!((ODatabaseDocumentDistributed) database).rollback2pc(transactionId)) {
final int autoRetryDelay = OGlobalConfiguration.DISTRIBUTED_CONCURRENT_TX_AUTORETRY_DELAY.getValueAsInteger();
retryCount++;
if (retryCount < database.getConfiguration().getValueAsInteger(DISTRIBUTED_CONCURRENT_TX_MAX_AUTORETRY)) {
OLogManager.instance()
.info(OTransactionPhase2Task.this, "Received second phase but not yet first phase, re-enqueue second phase");
((ODatabaseDocumentDistributed) database).getStorageDistributed().getLocalDistributedDatabase()
.reEnqueue(requestId.getNodeId(), requestId.getMessageId(), database.getName(), this, retryCount);
.reEnqueue(requestId.getNodeId(), requestId.getMessageId(), database.getName(), this, retryCount, autoRetryDelay);
hasResponse = false;
} else {
//ABORT THE OPERATION IF THERE IS A NOT VALID TRANSACTION ACTIVE WILL BE ROLLBACK ON RE-INSTALL
Expand Down
Loading

0 comments on commit a4ff9c0

Please sign in to comment.