Skip to content

Commit

Permalink
Fixed distributed alignment
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jan 29, 2016
1 parent 7e5e872 commit 02637e2
Show file tree
Hide file tree
Showing 18 changed files with 316 additions and 242 deletions.

Large diffs are not rendered by default.

Expand Up @@ -29,7 +29,10 @@
import com.orientechnologies.orient.core.storage.impl.local.paginated.atomicoperations.OAtomicOperationMetadata;

import java.io.IOException;
import java.util.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* @author Andrey Lomakin
Expand Down Expand Up @@ -124,8 +127,7 @@ public int fromStream(final byte[] content, int offset) {
}

atomicOperationMetadataMap.put(recordOperationMetadata.getKey(), recordOperationMetadata);
} else
throw new IllegalStateException("Invalid metadata entry id " + metadataId);
}

return offset;
}
Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.orientechnologies.common.io.OFileUtils;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandDistributedReplicateRequest;
import com.orientechnologies.orient.core.command.OCommandExecutor;
import com.orientechnologies.orient.core.command.OCommandRequest;
import com.orientechnologies.orient.core.command.OCommandRequestText;
import com.orientechnologies.orient.core.compression.impl.OZIPCompressionUtil;
Expand Down Expand Up @@ -188,7 +187,7 @@ public static Object replaceCluster(final OHazelcastPlugin dManager, final OServ
fileSize = writeDatabaseChunk(nodeName, 1, chunk, out);
for (int chunkNum = 2; !chunk.last; chunkNum++) {
final Object result = dManager.sendRequest(databaseName, null, Collections.singleton(r.getKey()),
new OCopyDatabaseChunkTask(chunk.filePath, chunkNum, chunk.offset + chunk.buffer.length),
new OCopyDatabaseChunkTask(chunk.filePath, chunkNum, chunk.offset + chunk.buffer.length, false),
ODistributedRequest.EXECUTION_MODE.RESPONSE);

if (result instanceof Boolean)
Expand Down
Expand Up @@ -179,15 +179,15 @@ protected ODistributedRequest readRequest() throws InterruptedException {
if (req != null) {
if (req.getId() >= distributed.waitForMessageId.get()) {
// ARRIVED, RESET IT
ODistributedServerLog.debug(this, manager.getLocalNodeName(), req.getSenderNodeName(), DIRECTION.IN,
ODistributedServerLog.info(this, manager.getLocalNodeName(), req.getSenderNodeName(), DIRECTION.IN,
"reached waited request %d on request=%s sourceNode=%s", distributed.waitForMessageId.get(), req,
req.getSenderNodeName());

distributed.waitForMessageId.set(-1);
break;
} else {
// SKIP IT
ODistributedServerLog.debug(this, manager.getLocalNodeName(), req.getSenderNodeName(), DIRECTION.IN,
ODistributedServerLog.info(this, manager.getLocalNodeName(), req.getSenderNodeName(), DIRECTION.IN,
"discarded request %d because waiting for %d request=%s sourceNode=%s", req.getId(),
distributed.waitForMessageId.get(), req, req.getSenderNodeName());

Expand Down
Expand Up @@ -87,7 +87,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.zip.GZIPInputStream;

/**
* Hazelcast implementation for clustering.
Expand Down Expand Up @@ -184,7 +183,7 @@ else if (!m.equals(hazelcastInstance.getCluster().getLocalMember()))
if (!configurationMap.containsKey(CONFIG_NODE_PREFIX + nodeId)) {
// NODE NOT REGISTERED, FORCING SHUTTING DOWN
ODistributedServerLog.error(this, localNodeName, null, DIRECTION.NONE, "Error on registering local node on cluster");
throw new ODistributedStartupException("Error on registering local node on cluster");
throw new ODistributedStartupException("Error on registering local node on cluster");
}

messageService = new OHazelcastDistributedMessageService(this);
Expand All @@ -209,7 +208,7 @@ public void run() {

} catch (Exception e) {
ODistributedServerLog.error(this, localNodeName, null, DIRECTION.NONE, "Error on starting distributed plugin", e);
throw OException.wrapException(new ODistributedStartupException("Error on starting distributed plugin"),e);
throw OException.wrapException(new ODistributedStartupException("Error on starting distributed plugin"), e);
}
}

Expand Down Expand Up @@ -1016,8 +1015,7 @@ protected boolean requestDatabaseDelta(final OHazelcastDistributedDatabase distr
final Map<String, Object> results = (Map<String, Object>) sendRequest(databaseName, null, targetNodes, deployTask,
EXECUTION_MODE.RESPONSE);

ODistributedServerLog.info(this, nodeName, entry.getKey(), DIRECTION.IN, "Receiving delta sync for '%s'...",
databaseName);
ODistributedServerLog.info(this, nodeName, entry.getKey(), DIRECTION.IN, "Receiving delta sync for '%s'...", databaseName);

ODistributedServerLog.debug(this, nodeName, selectedNodes.toString(), DIRECTION.OUT, "Database delta sync returned: %s",
results);
Expand Down Expand Up @@ -1224,7 +1222,7 @@ public void run() {
long fileSize = writeDatabaseChunk(1, chunk, fOut);
for (int chunkNum = 2; !chunk.last; chunkNum++) {
final Object result = sendRequest(databaseName, null, Collections.singleton(iNode),
new OCopyDatabaseChunkTask(chunk.filePath, chunkNum, chunk.offset + chunk.buffer.length),
new OCopyDatabaseChunkTask(chunk.filePath, chunkNum, chunk.offset + chunk.buffer.length, false),
EXECUTION_MODE.RESPONSE);

if (result instanceof Boolean)
Expand Down Expand Up @@ -1759,10 +1757,10 @@ public Object call() throws Exception {

long total = 0;

final GZIPInputStream gzipInput = new GZIPInputStream(in);
// final GZIPInputStream gzipInput = new GZIPInputStream(in);
try {

final DataInputStream input = new DataInputStream(gzipInput);
final DataInputStream input = new DataInputStream(in);
try {

final long records = input.readLong();
Expand Down Expand Up @@ -1792,21 +1790,31 @@ public Object call() throws Exception {
OLogManager.instance().info(this,
"DELTA <- other rid=" + rid + " type=" + recordType + " size=" + recordSize + " v=" + recordVersion);

final int forcedVersion = ORecordVersionHelper.setRollbackMode(recordVersion);
ORecord newRecord = Orient.instance().getRecordFactoryManager().newInstance((byte) recordType);

final ORecord record;
if (recordVersion == 1) {
final ORecord loadedRecord = rid.getRecord();
if (loadedRecord == null) {
// CREATE
record = Orient.instance().getRecordFactoryManager().newInstance((byte) recordType);

ORecordInternal.fill(record, rid, forcedVersion, recordContent, true);
ORecordInternal.fill(newRecord, new ORecordId(rid.getClusterId(), -1), 0, recordContent, true);
} else {
// UPDATE
record = rid.getRecord();
ORecordInternal.fill(record, rid, forcedVersion, recordContent, true);
ORecordInternal.fill(newRecord, rid, ORecordVersionHelper.setRollbackMode(recordVersion), recordContent, true);

if (loadedRecord instanceof ODocument) {
// APPLY CHANGES FIELD BY FIELD TO MARK DIRTY FIELDS FOR INDEXES/HOOKS
ODocument loadedDocument = (ODocument) loadedRecord;
loadedDocument.merge((ODocument) newRecord, false, false).getVersion();
loadedDocument.setDirty();
newRecord = loadedDocument;
}
}

record.save();
newRecord.save();

if (!newRecord.getIdentity().equals(rid))
throw new ODistributedDatabaseDeltaSyncException(
"Error on synchronization of records, rids are different: saved " + newRecord.getIdentity()
+ ", but it should be " + rid);
}
}

Expand All @@ -1822,7 +1830,7 @@ record = rid.getRecord();
throw OException.wrapException(
new ODistributedException("Error on installing database delta '" + db.getName() + "' on local server"), e);
} finally {
gzipInput.close();
// gzipInput.close();
}

ODistributedServerLog.info(this, nodeName, null, DIRECTION.IN, "Installed database delta for '%s', %d total records",
Expand Down
Expand Up @@ -19,40 +19,42 @@
*/
package com.orientechnologies.orient.server.distributed;

import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;

import java.io.Externalizable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;
import java.util.zip.GZIPInputStream;

public class ODistributedDatabaseChunk implements Externalizable {
public long lastOperationId;
public String filePath;
public long offset;
public byte[] buffer;
public OLogSequenceNumber lsn;
public boolean compressed;
public boolean gzipCompressed;
public boolean last;

public ODistributedDatabaseChunk() {
}

public ODistributedDatabaseChunk(final long iLastOperationId, final File iFile, final long iOffset, final int iMaxSize,
final OLogSequenceNumber iLSN, final boolean compressed) throws IOException {
final OLogSequenceNumber iLSN, final boolean gzipCompressed) throws IOException {
lastOperationId = iLastOperationId;
filePath = iFile.getAbsolutePath();
offset = iOffset;
lsn = iLSN;
this.compressed = compressed;
this.gzipCompressed = gzipCompressed;

long fileSize = iFile.length();

final File completedFile = new File(iFile.getAbsolutePath() + ".completed");

// WHILE UNTIL THE CHUCK IS AVAILABLE
// WHILE UNTIL THE CHUNK IS AVAILABLE
for (int retry = 0; fileSize <= iOffset; ++retry) {
if (fileSize == 0 || iOffset > fileSize)
try {
Expand All @@ -69,11 +71,10 @@ public ODistributedDatabaseChunk(final long iLastOperationId, final File iFile,
break;
}

final FileInputStream in = new FileInputStream(iFile);

final int toRead = (int) Math.min(iMaxSize, fileSize - offset);
buffer = new byte[toRead];

final InputStream in = gzipCompressed ? new GZIPInputStream(new FileInputStream(iFile)) : new FileInputStream(iFile);
try {
in.skip(offset);
in.read(buffer);
Expand Down Expand Up @@ -113,7 +114,7 @@ public void writeExternal(final ObjectOutput out) throws IOException {
out.writeBoolean(lsn != null);
if (lsn != null)
lsn.writeExternal(out);
out.writeBoolean(compressed);
out.writeBoolean(gzipCompressed);
out.writeBoolean(last);
}

Expand All @@ -127,7 +128,7 @@ public void readExternal(final ObjectInput in) throws IOException, ClassNotFound
in.readFully(buffer);
final boolean lsnNotNull = in.readBoolean();
lsn = lsnNotNull ? new OLogSequenceNumber(in) : null;
compressed = in.readBoolean();
gzipCompressed = in.readBoolean();
last = in.readBoolean();
}
}
Expand Up @@ -681,16 +681,18 @@ protected void fixNodesInConflict(final List<ODistributedResponse> bestResponses
if (responseGroup != bestResponsesGroup) {
// CONFLICT GROUP: FIX THEM ONE BY ONE
for (ODistributedResponse r : responseGroup) {
final OAbstractRemoteTask fixTask = ((OAbstractReplicatedTask) request.getTask()).getFixTask(request, request.getTask(),
r.getPayload(), goodResponse.getPayload());
final List<OAbstractRemoteTask> fixTasks = ((OAbstractReplicatedTask) request.getTask()).getFixTask(request,
request.getTask(), r.getPayload(), goodResponse.getPayload());

if (fixTask != null) {
ODistributedServerLog.warn(this, dManager.getLocalNodeName(), null, DIRECTION.NONE,
"sending fix message (%s) for response (%s) on request (%s) in server %s to be: %s", fixTask, r, request,
r.getExecutorNodeName(), goodResponse);
if (fixTasks != null) {
for (OAbstractRemoteTask fixTask : fixTasks) {
ODistributedServerLog.warn(this, dManager.getLocalNodeName(), null, DIRECTION.NONE,
"sending fix message (%s) for response (%s) on request (%s) in server %s to be: %s", fixTask, r, request,
r.getExecutorNodeName(), goodResponse);

dManager.sendRequest(request.getDatabaseName(), null, Collections.singleton(r.getExecutorNodeName()), fixTask,
ODistributedRequest.EXECUTION_MODE.NO_RESPONSE);
dManager.sendRequest(request.getDatabaseName(), null, Collections.singleton(r.getExecutorNodeName()), fixTask,
ODistributedRequest.EXECUTION_MODE.NO_RESPONSE);
}
}
}
}
Expand Down
Expand Up @@ -79,6 +79,11 @@ public void setLSN(final String iNode, final OLogSequenceNumber iLSN) throws IOE
embedded.field("segment", iLSN.getSegment(), OType.LONG);
embedded.field("position", iLSN.getPosition(), OType.LONG);

if (!file.exists()) {
file.getParentFile().mkdirs();
file.createNewFile();
}

final OutputStream os = new FileOutputStream(file, false);
try {

Expand Down
Expand Up @@ -21,6 +21,9 @@

import com.orientechnologies.orient.server.distributed.ODistributedRequest;

import java.util.Collections;
import java.util.List;

/**
* Base class for Replicated tasks.
*
Expand All @@ -30,9 +33,9 @@
public abstract class OAbstractReplicatedTask extends OAbstractRemoteTask {
private static final long serialVersionUID = 1L;

public OAbstractRemoteTask getFixTask(ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask,
Object iBadResponse, Object iGoodResponse) {
return null;
public List<OAbstractRemoteTask> getFixTask(ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask, Object iBadResponse,
Object iGoodResponse) {
return Collections.EMPTY_LIST;
}

public OAbstractRemoteTask getUndoTask(ODistributedRequest iRequest, Object iBadResponse) {
Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -79,7 +80,7 @@ public OCommandDistributedReplicateRequest.QUORUM_TYPE getQuorumType() {
}

@Override
public OFixTxTask getFixTask(final ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask, final Object iBadResponse,
public List<OAbstractRemoteTask> getFixTask(final ODistributedRequest iRequest, OAbstractRemoteTask iOriginalTask, final Object iBadResponse,
final Object iGoodResponse) {
return null;
}
Expand Down
Expand Up @@ -43,17 +43,19 @@
public class OCopyDatabaseChunkTask extends OAbstractReplicatedTask {
private static final long serialVersionUID = 1L;

private String fileName;
private int chunkNum;
private long offset;
private String fileName;
private int chunkNum;
private long offset;
private boolean compressed;

public OCopyDatabaseChunkTask() {
}

public OCopyDatabaseChunkTask(final String iFileName, final int iChunkNum, final long iOffset) {
public OCopyDatabaseChunkTask(final String iFileName, final int iChunkNum, final long iOffset, final boolean iCompressed) {
fileName = iFileName;
chunkNum = iChunkNum;
offset = iOffset;
compressed = iCompressed;
}

@Override
Expand All @@ -64,7 +66,7 @@ public Object execute(final OServer iServer, ODistributedServerManager iManager,
throw new IllegalArgumentException("File name '" + fileName + "' not found");

final ODistributedDatabaseChunk result = new ODistributedDatabaseChunk(0, f, offset, OSyncDatabaseTask.CHUNK_MAX_SIZE,
new OLogSequenceNumber(-1, -1), true);
new OLogSequenceNumber(-1, -1), false);

ODistributedServerLog.info(this, iManager.getLocalNodeName(), getNodeSource(), ODistributedServerLog.DIRECTION.OUT,
"- transferring chunk #%d offset=%d size=%s...", chunkNum, result.offset, OFileUtils.getSizeAsNumber(result.buffer.length));
Expand Down

0 comments on commit 02637e2

Please sign in to comment.