Skip to content

Commit

Permalink
增加Dns ZK node,ZKClient 代码优化
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Sep 23, 2016
1 parent 303ed34 commit 9d7b5ee
Show file tree
Hide file tree
Showing 17 changed files with 483 additions and 46 deletions.
17 changes: 15 additions & 2 deletions mpush-api/src/main/java/com/mpush/api/spi/net/DnsMapping.java
Expand Up @@ -24,8 +24,11 @@
import java.util.Objects;

public class DnsMapping {
private String ip;
private int port;
protected String ip;
protected int port;

public DnsMapping() {
}

public DnsMapping(String ip, int port) {
this.ip = ip;
Expand All @@ -40,6 +43,16 @@ public int getPort() {
return port;
}

public DnsMapping setIp(String ip) {
this.ip = ip;
return this;
}

public DnsMapping setPort(int port) {
this.port = port;
return this;
}

public static DnsMapping parse(String addr) {
String[] host_port = Objects.requireNonNull(addr, "dns mapping can not be null")
.split(":");
Expand Down
12 changes: 4 additions & 8 deletions mpush-boot/src/main/java/com/mpush/bootstrap/job/ServerBoot.java
Expand Up @@ -25,6 +25,7 @@
import com.mpush.tools.log.Logs;
import com.mpush.tools.thread.pool.ThreadPoolManager;
import com.mpush.zk.ZKClient;
import com.mpush.zk.ZKRegister;
import com.mpush.zk.node.ZKServerNode;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -52,8 +53,9 @@ public void start() {
@Override
public void onSuccess(Object... args) {
Logs.Console.info("start {} success listen:{}", serverName, args[0]);
if (node != null) {
registerServerToZk(node.getZkPath(), Jsons.toJson(node));
if (node != null) {//注册应用到zk
ZKRegister.build().setEphemeral(true).setNode(node).register();
Logs.Console.info("register server node={} to zk path={}", node, node.getNodePath());
}
startNext();
}
Expand All @@ -76,10 +78,4 @@ protected void stop() {
}
stopNext();
}

//注册应用到zk
private void registerServerToZk(String path, String value) {
ZKClient.I.registerEphemeralSequential(path, value);
Logs.Console.info("register server node={} to zk name={}", value, path);
}
}
1 change: 0 additions & 1 deletion mpush-client/.gitignore

This file was deleted.

Expand Up @@ -43,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -101,11 +102,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ErrorMessage errorMessage = new ErrorMessage(packet, connection);
LOGGER.error(">>> receive an error packet=" + errorMessage);
} else if (command == Command.BIND) {
OkMessage okMessage = new OkMessage(packet, connection);
LOGGER.warn(">>> receive an success packet=" + okMessage);
HttpRequestMessage message = new HttpRequestMessage(connection);
message.uri = "http://baidu.com";
message.send();

} else if (command == Command.PUSH) {
PushMessage message = new PushMessage(packet, connection);
LOGGER.warn(">>> receive an push message, content=" + new String(message.content, Constants.UTF_8));
Expand All @@ -117,8 +114,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

} else if (command == Command.HEARTBEAT) {
LOGGER.warn(">>> receive a heartbeat pong...");
} else {
LOGGER.warn(">>> receive a message, type=" + command + "," + packet);
} else if (command == Command.OK) {
OkMessage okMessage = new OkMessage(packet, connection);
LOGGER.warn(">>> receive an success packet=" + okMessage);
Map<String, String> headers = new HashMap<>();
headers.put(Constants.HTTP_HEAD_READ_TIMEOUT, "10000");
HttpRequestMessage message = new HttpRequestMessage(connection);
message.headers = headers;
message.uri = "http://baidu.com";
message.send();
} else if (command == Command.HTTP_PROXY) {
HttpResponseMessage message = new HttpResponseMessage(packet, connection);
LOGGER.warn(">>> receive a http response, message={}, body={}",
message, message.body == null ? null : new String(message.body, Constants.UTF_8));
}
}

Expand Down
Expand Up @@ -27,6 +27,12 @@
import com.mpush.api.spi.net.DnsMappingManager;
import com.mpush.tools.Jsons;
import com.mpush.tools.config.CC;
import com.mpush.zk.ZKPath;
import com.mpush.zk.ZKRegister;
import com.mpush.zk.cache.ZKDnsNodeCache;
import com.mpush.zk.listener.ZKDnsNodeWatcher;
import com.mpush.zk.node.ZKDnsNode;
import com.mpush.zk.node.ZKRedisNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,17 +46,20 @@

public class HttpProxyDnsMappingManager extends BaseService implements DnsMappingManager, Runnable {
private final Logger logger = LoggerFactory.getLogger(HttpProxyDnsMappingManager.class);

public HttpProxyDnsMappingManager() {
}
private final ZKDnsNodeWatcher watcher = new ZKDnsNodeWatcher();
private final ZKDnsNodeCache cache = watcher.getCache();

private final Map<String, List<DnsMapping>> all = Maps.newConcurrentMap();
private Map<String, List<DnsMapping>> available = Maps.newConcurrentMap();

private ScheduledExecutorService executorService;

public HttpProxyDnsMappingManager() {
}

@Override
protected void doStart(Listener listener) throws Throwable {
watcher.startWatch();
if (all.size() > 0) {
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(this, 1, 20, TimeUnit.SECONDS); //20秒 定时扫描dns
Expand Down Expand Up @@ -86,8 +95,13 @@ public Map<String, List<DnsMapping>> getAll() {
}

public DnsMapping lookup(String origin) {
if (available.isEmpty()) return null;
List<DnsMapping> list = available.get(origin);
List<? extends DnsMapping> list = cache.get(origin);

if (list == null || list.isEmpty()) {
if (available.isEmpty()) return null;
list = available.get(origin);
}

if (list == null || list.isEmpty()) return null;
int L = list.size();
if (L == 1) return list.get(0);
Expand Down
23 changes: 16 additions & 7 deletions mpush-zk/src/main/java/com/mpush/zk/ZKClient.java
Expand Up @@ -89,8 +89,10 @@ protected void doStop(Listener listener) throws Throwable {
*/
@Override
public void init() {
if (zkConfig != null) return;
zkConfig = ZKConfig.build();
if (client != null) return;
if (zkConfig == null) {
zkConfig = ZKConfig.build();
}
Builder builder = CuratorFrameworkFactory
.builder()
.connectString(zkConfig.getHosts())
Expand Down Expand Up @@ -163,12 +165,14 @@ public String get(final String key) {
* @return
*/
public String getFromRemote(final String key) {
try {
return new String(client.getData().forPath(key), Constants.UTF_8);
} catch (Exception ex) {
Logs.ZK.error("getFromRemote:{}", key, ex);
return null;
if (isExisted(key)) {
try {
return new String(client.getData().forPath(key), Constants.UTF_8);
} catch (Exception ex) {
Logs.ZK.error("getFromRemote:{}", key, ex);
}
}
return null;
}

/**
Expand Down Expand Up @@ -310,4 +314,9 @@ public void registerListener(ZKNodeCacheWatcher listener) {
public ZKConfig getZKConfig() {
return zkConfig;
}

public ZKClient setZKConfig(ZKConfig zkConfig) {
this.zkConfig = zkConfig;
return this;
}
}
23 changes: 19 additions & 4 deletions mpush-zk/src/main/java/com/mpush/zk/ZKPath.java
Expand Up @@ -22,10 +22,13 @@

import org.apache.curator.utils.ZKPaths;

import static org.apache.curator.utils.ZKPaths.PATH_SEPARATOR;

public enum ZKPath {
REDIS_SERVER("/redis", "machine", "redis注册的地方"),
CONNECT_SERVER("/cs/hosts", "machine", "connection server服务器应用注册的路径"),
GATEWAY_SERVER("/gs/hosts", "machine", "gateway server服务器应用注册的路径");
GATEWAY_SERVER("/gs/hosts", "machine", "gateway server服务器应用注册的路径"),
DNS_MAPPING("/dns/mapping", "machine", "dns mapping服务器应用注册的路径");

ZKPath(String root, String name, String desc) {
this.root = root;
Expand All @@ -40,12 +43,24 @@ public String getRootPath() {
}

public String getNodePath() {
return root + ZKPaths.PATH_SEPARATOR + name;
return root + PATH_SEPARATOR + name;
}

public String getNodePath(String... tails) {
String path = getNodePath();
for (String tail : tails) {
path += (PATH_SEPARATOR + tail);
}
return path;
}

//根据从zk中获取的app的值,拼装全路径
public String getFullPath(String tail) {
return root + ZKPaths.PATH_SEPARATOR + tail;
public String getFullPath(String childPaths) {
return root + PATH_SEPARATOR + childPaths;
}

public String getTail(String childPaths) {
return ZKPaths.getNodeFromPath(childPaths);
}

}
92 changes: 92 additions & 0 deletions mpush-zk/src/main/java/com/mpush/zk/ZKRegister.java
@@ -0,0 +1,92 @@
/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* ohun@live.cn (夜色)
*/

package com.mpush.zk;

import com.mpush.zk.node.ZKNode;

import java.util.Objects;

/**
* Created by ohun on 16/9/22.
*
* @author ohun@live.cn (夜色)
*/
public final class ZKRegister {
private ZKNode node;
private ZKPath path;
private boolean ephemeral = true;
private ZKClient client;

public void register() {
Objects.requireNonNull(this.node);
String path = node.getNodePath();
if (path == null) {
Objects.requireNonNull(this.path);
path = this.path.getNodePath();
}
if (ephemeral) {
client.registerEphemeralSequential(path, node.encode());
} else {
client.registerPersist(path, node.encode());
}
}

public static ZKRegister build() {
ZKRegister register = new ZKRegister();
register.client = ZKClient.I;
return register;
}

public ZKClient getClient() {
return client;
}

public ZKRegister setClient(ZKClient client) {
this.client = client;
return this;
}

public ZKNode getNode() {
return node;
}

public ZKRegister setNode(ZKNode node) {
this.node = node;
return this;
}

public ZKPath getPath() {
return path;
}

public ZKRegister setPath(ZKPath path) {
this.path = path;
return this;
}

public boolean isEphemeral() {
return ephemeral;
}

public ZKRegister setEphemeral(boolean ephemeral) {
this.ephemeral = ephemeral;
return this;
}
}

0 comments on commit 9d7b5ee

Please sign in to comment.