Skip to content

Commit

Permalink
Fixed issue #4999
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Nov 5, 2015
1 parent e393f42 commit e168a9a
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 22 deletions.
Expand Up @@ -2739,7 +2739,8 @@ public void restore(final InputStream in, final Map<String, Object> options, fin


getStorage().restore(in, options, callable, iListener); getStorage().restore(in, options, callable, iListener);


getMetadata().reload(); if (!isClosed())
getMetadata().reload();
} }


/** /**
Expand Down
Expand Up @@ -347,6 +347,11 @@ public DB_STATUS getDatabaseStatus(final String iNode, final String iDatabaseNam
@Override @Override
public void setDatabaseStatus(final String iNode, final String iDatabaseName, final DB_STATUS iStatus) { public void setDatabaseStatus(final String iNode, final String iDatabaseName, final DB_STATUS iStatus) {
getConfigurationMap().put(OHazelcastPlugin.CONFIG_DBSTATUS_PREFIX + iNode + "." + iDatabaseName, iStatus); getConfigurationMap().put(OHazelcastPlugin.CONFIG_DBSTATUS_PREFIX + iNode + "." + iDatabaseName, iStatus);

// NOTIFY DB/NODE IS CHANGING STATUS
for (ODistributedLifecycleListener l : listeners) {
l.onDatabaseChangeStatus(iNode, iDatabaseName, iStatus);
}
} }


@Override @Override
Expand Down
Expand Up @@ -32,11 +32,11 @@
* Test class that creates and executes distributed operations against a cluster of servers created in the same JVM. * Test class that creates and executes distributed operations against a cluster of servers created in the same JVM.
*/ */
public abstract class AbstractServerClusterTest { public abstract class AbstractServerClusterTest {
protected int delayServerStartup = 0; protected int delayServerStartup = 0;
protected int delayServerAlign = 0; protected int delayServerAlign = 0;
protected String rootDirectory = "target/servers/"; protected String rootDirectory = "target/servers/";


protected List<ServerRun> serverInstance = new ArrayList<ServerRun>(); protected List<ServerRun> serverInstance = new ArrayList<ServerRun>();


protected AbstractServerClusterTest() { protected AbstractServerClusterTest() {
} }
Expand Down Expand Up @@ -93,10 +93,13 @@ public void execute() throws Exception {


try { try {


for (ServerRun server : serverInstance) { for (final ServerRun server : serverInstance) {
banner("STARTING SERVER -> " + server.getServerId() + "..."); banner("STARTING SERVER -> " + server.getServerId() + "...");

server.startServer(getDistributedServerConfiguration(server)); server.startServer(getDistributedServerConfiguration(server));


onServerStarting(server);

if (delayServerStartup > 0) if (delayServerStartup > 0)
try { try {
Thread.sleep(delayServerStartup * serverInstance.size()); Thread.sleep(delayServerStartup * serverInstance.size());
Expand All @@ -108,8 +111,8 @@ public void execute() throws Exception {


if (delayServerAlign > 0) if (delayServerAlign > 0)
try { try {
System.out.println("Server started, waiting for synchronization (" + (delayServerAlign * serverInstance.size() / 1000) System.out.println(
+ "secs)..."); "Server started, waiting for synchronization (" + (delayServerAlign * serverInstance.size() / 1000) + "secs)...");
Thread.sleep(delayServerAlign * serverInstance.size()); Thread.sleep(delayServerAlign * serverInstance.size());
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
Expand Down Expand Up @@ -160,10 +163,13 @@ protected void banner(final String iMessage) {


protected void log(final String iMessage) { protected void log(final String iMessage) {
OLogManager.instance().flush(); OLogManager.instance().flush();
System.out.println("\n"+iMessage); System.out.println("\n" + iMessage);
System.out.flush(); System.out.flush();
} }


protected void onServerStarting(ServerRun server) {
}

protected void onServerStarted(ServerRun server) { protected void onServerStarted(ServerRun server) {
} }


Expand Down
Expand Up @@ -20,20 +20,26 @@


package com.orientechnologies.orient.server.distributed; package com.orientechnologies.orient.server.distributed;


import junit.framework.Assert; import com.orientechnologies.common.util.OPair;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;


import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;


/** /**
* Tests the behavior of hooks in distributed configuration. * Tests the behavior of hooks in distributed configuration.
*/ */
public class DistributedLifecycleListenerTest extends AbstractServerClusterTest implements ODistributedLifecycleListener { public class DistributedLifecycleListenerTest extends AbstractServerClusterTest implements ODistributedLifecycleListener {
private final static int SERVERS = 2; private final static int SERVERS = 2;


private final AtomicLong beforeNodeJoin = new AtomicLong(); private final AtomicLong beforeNodeJoin = new AtomicLong();
private final AtomicLong afterNodeJoin = new AtomicLong(); private final AtomicLong afterNodeJoin = new AtomicLong();
private final AtomicLong nodeLeft = new AtomicLong(); private final AtomicLong nodeLeft = new AtomicLong();
private final List<OPair<String, ODistributedServerManager.DB_STATUS>> changeStatus = Collections
.synchronizedList(new ArrayList<OPair<String, ODistributedServerManager.DB_STATUS>>());


@Override @Override
public boolean onNodeJoining(String iNode) { public boolean onNodeJoining(String iNode) {
Expand All @@ -51,6 +57,11 @@ public void onNodeLeft(String iNode) {
nodeLeft.incrementAndGet(); nodeLeft.incrementAndGet();
} }


@Override
public void onDatabaseChangeStatus(String iNode, String iDatabaseName, ODistributedServerManager.DB_STATUS iNewStatus) {
changeStatus.add(new OPair<String, ODistributedServerManager.DB_STATUS>(iNode + "." + iDatabaseName, iNewStatus));
}

public String getDatabaseName() { public String getDatabaseName() {
return "distributed-lifecycle"; return "distributed-lifecycle";
} }
Expand All @@ -63,7 +74,7 @@ public void test() throws Exception {
} }


@Override @Override
protected void onServerStarted(ServerRun server) { protected void onServerStarting(ServerRun server) {
server.getServerInstance().getDistributedManager().registerLifecycleListener(this); server.getServerInstance().getDistributedManager().registerLifecycleListener(this);
} }


Expand All @@ -77,15 +88,23 @@ protected void onTestEnded() {
Assert.assertEquals(SERVERS - 1, beforeNodeJoin.get()); Assert.assertEquals(SERVERS - 1, beforeNodeJoin.get());
Assert.assertEquals(SERVERS - 1, afterNodeJoin.get()); Assert.assertEquals(SERVERS - 1, afterNodeJoin.get());


for (int attempt = 0; attempt < 30; ++attempt) { for (int attempt = 0; attempt < 50; ++attempt) {
if (nodeLeft.get() == SERVERS) if (nodeLeft.get() >= SERVERS - 1)
break; break;
try { try {
Thread.sleep(300); Thread.sleep(500);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
Assert.assertEquals(SERVERS - 1, nodeLeft.get()); Assert.assertEquals(SERVERS - 1, nodeLeft.get());

Assert.assertEquals(3, changeStatus.size());
Assert.assertEquals("europe1." + getDatabaseName(), changeStatus.get(0).getKey());
Assert.assertEquals(ODistributedServerManager.DB_STATUS.SYNCHRONIZING, changeStatus.get(0).getValue());
Assert.assertEquals("europe0." + getDatabaseName(), changeStatus.get(1).getKey());
Assert.assertEquals(ODistributedServerManager.DB_STATUS.SYNCHRONIZING, changeStatus.get(1).getValue());
Assert.assertEquals("europe0." + getDatabaseName(), changeStatus.get(2).getKey());
Assert.assertEquals(ODistributedServerManager.DB_STATUS.ONLINE, changeStatus.get(2).getValue());
} }
} }
@@ -1,14 +1,12 @@
package com.orientechnologies.orient.server.distributed; package com.orientechnologies.orient.server.distributed;


import junit.framework.Assert;

import org.junit.Test;

import com.orientechnologies.orient.core.sql.OCommandSQL; import com.orientechnologies.orient.core.sql.OCommandSQL;
import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory;
import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
import com.tinkerpop.blueprints.impls.orient.OrientVertex; import com.tinkerpop.blueprints.impls.orient.OrientVertex;
import com.tinkerpop.blueprints.impls.orient.OrientVertexType; import com.tinkerpop.blueprints.impls.orient.OrientVertexType;
import junit.framework.Assert;
import org.junit.Test;


public class TestShardingManualSync extends AbstractServerClusterTest { public class TestShardingManualSync extends AbstractServerClusterTest {


Expand Down
Expand Up @@ -51,4 +51,17 @@ public interface ODistributedLifecycleListener {
* Node name that left * Node name that left
*/ */
void onNodeLeft(String iNode); void onNodeLeft(String iNode);

/**
* Called upon change of database status on a node. Available statuses are defined in ODistributedServerManager.DB_STATUS.
*
* @since 2.2.0
* @param iNode
* The node name
* @param iDatabaseName
* Database name
* @param iNewStatus
* The new status
*/
void onDatabaseChangeStatus(String iNode, String iDatabaseName, ODistributedServerManager.DB_STATUS iNewStatus);
} }

0 comments on commit e168a9a

Please sign in to comment.