Skip to content

Commit

Permalink
KAA-1281: kaa node blocks startup if zookeeper is unavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
nocs00 committed Sep 6, 2016
1 parent f076993 commit da29bc9
Show file tree
Hide file tree
Showing 16 changed files with 411 additions and 352 deletions.
Expand Up @@ -21,9 +21,7 @@
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;


import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCache;
Expand Down Expand Up @@ -56,7 +54,7 @@ public abstract class ControlNodeTracker implements ControlNodeAware, Closeable
.getLogger(ControlNodeTracker.class); .getLogger(ControlNodeTracker.class);


/** The client. */ /** The client. */
protected CuratorFramework client; protected CuratorFramework zkClient;


/** The control cache. */ /** The control cache. */
private NodeCache controlCache; private NodeCache controlCache;
Expand Down Expand Up @@ -107,34 +105,9 @@ public void unhandledError(String message, Throwable e) {
/** /**
* Instantiates a new control node tracker. * Instantiates a new control node tracker.
* *
* @param zkHostPortList
* the zk host port list
* @param retryPolicy
* the retry policy
*/ */
public ControlNodeTracker(String zkHostPortList, RetryPolicy retryPolicy) { public ControlNodeTracker() {
this(zkHostPortList, -1, -1, retryPolicy);
}

/**
* Instantiates a new control node tracker.
*
* @param zkHostPortList
* the zk host port list
* @param sessionTimeoutMs
* session timeout
* @param connectionTimeoutMs
* connection timeout
* @param retryPolicy
* the retry policy
*/
public ControlNodeTracker(String zkHostPortList, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
super(); super();
if (sessionTimeoutMs > -1 && connectionTimeoutMs > -1) {
this.client = CuratorFrameworkFactory.newClient(zkHostPortList, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
} else {
this.client = CuratorFrameworkFactory.newClient(zkHostPortList, retryPolicy);
}
this.listeners = new CopyOnWriteArrayList<ControlNodeListener>(); this.listeners = new CopyOnWriteArrayList<ControlNodeListener>();
} }


Expand All @@ -146,10 +119,9 @@ public ControlNodeTracker(String zkHostPortList, int sessionTimeoutMs, int conne
*/ */
public void start() throws Exception { //NOSONAR public void start() throws Exception { //NOSONAR
LOG.info("Starting node tracker"); LOG.info("Starting node tracker");
client.start(); zkClient.getUnhandledErrorListenable().addListener(errorsListener);
client.getUnhandledErrorListenable().addListener(errorsListener);
if(createZkNode()){ if(createZkNode()){
controlCache = new NodeCache(client, CONTROL_SERVER_NODE_PATH); controlCache = new NodeCache(zkClient, CONTROL_SERVER_NODE_PATH);
controlCache.getListenable().addListener(new NodeCacheListener() { controlCache.getListenable().addListener(new NodeCacheListener() {


@Override @Override
Expand Down Expand Up @@ -204,7 +176,7 @@ protected void onMasterChange(ChildData currentData) {
* @return true, if is connected * @return true, if is connected
*/ */
public boolean isConnected() { public boolean isConnected() {
return client.getZookeeperClient().isConnected(); return zkClient.getZookeeperClient().isConnected();
} }


/** /**
Expand Down Expand Up @@ -250,14 +222,12 @@ public void close() throws IOException {


if(nodePath != null){ if(nodePath != null){
try { try {
client.delete().forPath(nodePath); zkClient.delete().forPath(nodePath);
LOG.debug("Node with path {} successfully deleted", nodePath); LOG.debug("Node with path {} successfully deleted", nodePath);
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Failed to delete node", e); LOG.debug("Failed to delete node", e);
} }
} }

client.close();
} }


/* /*
Expand Down Expand Up @@ -299,7 +269,7 @@ public boolean doZKClientAction(ZKClientAction action) throws IOException{


public boolean doZKClientAction(ZKClientAction action, boolean throwIOException) throws IOException{ public boolean doZKClientAction(ZKClientAction action, boolean throwIOException) throws IOException{
try{ try{
action.doWithZkClient(client); action.doWithZkClient(zkClient);
return true; return true;
} catch (Exception e) { } catch (Exception e) {
LOG.error("Unknown Error", e); LOG.error("Unknown Error", e);
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;


import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
Expand Down Expand Up @@ -65,36 +64,25 @@ public abstract class WorkerNodeTracker extends ControlNodeTracker {
/** /**
* Instantiates a new worker node tracker. * Instantiates a new worker node tracker.
* *
* @param zkHostPortList
* the zk host port list
* @param retryPolicy
* the retry policy
*/ */
public WorkerNodeTracker(String zkHostPortList, RetryPolicy retryPolicy) { public WorkerNodeTracker() {
super(zkHostPortList, retryPolicy); super();
init(); init();
} }


/** /**
* Instantiates a new worker node tracker. * Instantiates a new worker node tracker.
* * @param zkClient Zookeeper client
* @param zkHostPortList
* the zk host port list
* @param sessionTimeoutMs
* the session timeout
* @param connectionTimeoutMs
* the connection timeout
* @param retryPolicy
* the retry policy
*/ */
public WorkerNodeTracker(String zkHostPortList, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { public WorkerNodeTracker(CuratorFramework zkClient) {
super(zkHostPortList, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); super();
this.zkClient = zkClient;
init(); init();
} }


private void init() { private void init() {
endpointCache = new PathChildrenCache(client, OPERATIONS_SERVER_NODE_PATH, true); endpointCache = new PathChildrenCache(zkClient, OPERATIONS_SERVER_NODE_PATH, true);
bootstrapCache = new PathChildrenCache(client, BOOTSTRAP_SERVER_NODE_PATH, true); bootstrapCache = new PathChildrenCache(zkClient, BOOTSTRAP_SERVER_NODE_PATH, true);
endpointListeners = new CopyOnWriteArrayList<OperationsNodeListener>(); endpointListeners = new CopyOnWriteArrayList<OperationsNodeListener>();
bootstrapListeners = new CopyOnWriteArrayList<BootstrapNodeListener>(); bootstrapListeners = new CopyOnWriteArrayList<BootstrapNodeListener>();
operationNodesStartTimes = new HashMap<String, Long>(); operationNodesStartTimes = new HashMap<String, Long>();
Expand Down
Expand Up @@ -43,12 +43,20 @@ public class BootstrapNode extends WorkerNodeTracker {
* Instantiates a new bootstrap node. * Instantiates a new bootstrap node.
* *
* @param nodeInfo the node info * @param nodeInfo the node info
* @param zkHostPortList the zk host port list
* @param retryPolicy the retry policy
*/ */
public BootstrapNode(BootstrapNodeInfo nodeInfo, String zkHostPortList, public BootstrapNode(BootstrapNodeInfo nodeInfo) {
RetryPolicy retryPolicy) { super();
super(zkHostPortList, retryPolicy); this.nodeInfo = nodeInfo;
}

/**
* Instantiates a new bootstrap node.
*
* @param nodeInfo the node info
* @param zkClient Zookeeper client
*/
public BootstrapNode(BootstrapNodeInfo nodeInfo, CuratorFramework zkClient) {
super(zkClient);
this.nodeInfo = nodeInfo; this.nodeInfo = nodeInfo;
} }


Expand Down
Expand Up @@ -46,13 +46,9 @@ public class ControlNode extends WorkerNodeTracker {
* *
* @param currentNodeInfo * @param currentNodeInfo
* the current node info * the current node info
* @param zkHostPortList
* the zk host port list
* @param retryPolicy
* the retry policy
*/ */
public ControlNode(ControlNodeInfo currentNodeInfo, String zkHostPortList, RetryPolicy retryPolicy) { public ControlNode(ControlNodeInfo currentNodeInfo) {
super(zkHostPortList, retryPolicy); super();
this.currentNodeInfo = currentNodeInfo; this.currentNodeInfo = currentNodeInfo;
} }


Expand All @@ -61,17 +57,10 @@ public ControlNode(ControlNodeInfo currentNodeInfo, String zkHostPortList, Retry
* *
* @param currentNodeInfo * @param currentNodeInfo
* the current node info * the current node info
* @param zkHostPortList * @param zkClient Zookeeper client
* the zk host port list
* @param sessionTimeoutMs
* session timeout
* @param connectionTimeoutMs
* connection timeout
* @param retryPolicy
* the retry policy
*/ */
public ControlNode(ControlNodeInfo currentNodeInfo, String zkHostPortList, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { public ControlNode(ControlNodeInfo currentNodeInfo, CuratorFramework zkClient) {
super(zkHostPortList, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); super(zkClient);
this.currentNodeInfo = currentNodeInfo; this.currentNodeInfo = currentNodeInfo;
} }


Expand Down Expand Up @@ -127,7 +116,7 @@ public ControlNodeInfo getCurrentNodeInfo() {
@Override @Override
public boolean createZkNode() throws IOException { public boolean createZkNode() throws IOException {
try { try {
nodePath = client.create().withMode(CreateMode.EPHEMERAL) nodePath = zkClient.create().withMode(CreateMode.EPHEMERAL)
.forPath(ControlNodeTracker.CONTROL_SERVER_NODE_PATH, controlNodeAvroConverter.get().toByteArray(currentNodeInfo)); .forPath(ControlNodeTracker.CONTROL_SERVER_NODE_PATH, controlNodeAvroConverter.get().toByteArray(currentNodeInfo));
LOG.info("Created node with path: " + nodePath); LOG.info("Created node with path: " + nodePath);
} catch (NodeExistsException e) { } catch (NodeExistsException e) {
Expand Down
Expand Up @@ -43,13 +43,22 @@ public class OperationsNode extends WorkerNodeTracker {
* *
* @param nodeInfo * @param nodeInfo
* the node info * the node info
* @param zkHostPortList
* the zk host port list
* @param retryPolicy
* the retry policy
*/ */
public OperationsNode(OperationsNodeInfo nodeInfo, String zkHostPortList, RetryPolicy retryPolicy) { public OperationsNode(OperationsNodeInfo nodeInfo) {
super(zkHostPortList, retryPolicy); super();
this.nodeInfo = nodeInfo;
this.nodeInfo.setTimeStarted(System.currentTimeMillis());
}

/**
* Instantiates a new endpoint node.
*
* @param nodeInfo
* the node info
* @param zkClient Zookeeper client
*/
public OperationsNode(OperationsNodeInfo nodeInfo, CuratorFramework zkClient) {
super(zkClient);
this.nodeInfo = nodeInfo; this.nodeInfo = nodeInfo;
this.nodeInfo.setTimeStarted(System.currentTimeMillis()); this.nodeInfo.setTimeStarted(System.currentTimeMillis());
} }
Expand Down

0 comments on commit da29bc9

Please sign in to comment.