Skip to content

Commit

Permalink
fix: ngrok服务器端100%的bug,啊啊啊
Browse files Browse the repository at this point in the history
  • Loading branch information
wendal committed Mar 14, 2017
1 parent 488d258 commit fdb2465
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 12 deletions.
Expand Up @@ -193,7 +193,7 @@ protected void handle() {
try {
// 看看服务器想干啥
NgrokMsg msg = NgrokAgent.readMsg(ctlIn);
String type = msg.getString("Type");
String type = msg.getType();
// 服务器要求我们发送新的代理链接
if ("ReqProxy".equals(type)) {
ProxyConn pc = new ProxyConn();
Expand Down Expand Up @@ -296,7 +296,7 @@ public Object call() throws Exception {
// 等待服务器响应StartProxy
NgrokMsg msg = NgrokAgent.readMsg(srvIn);
// 如果真的响应了StartProxy,开始桥接Socket
if ("StartProxy".equals(msg.getString("Type"))) {
if ("StartProxy".equals(msg.getType())) {
try {
if (log.isDebugEnabled())
log.debug("start socket pipe ...");
Expand Down
Expand Up @@ -28,9 +28,10 @@ public class NgrokAgent {

public static void writeMsg(OutputStream out, NgrokMsg msg) throws IOException {
synchronized (out) {
NutMap map = new NutMap("Type", msg.remove("Type")).setv("Payload", msg);
String type = (String) msg.remove("Type");
NutMap map = new NutMap("Type", type).setv("Payload", msg);
String cnt = Json.toJson(map, JsonFormat.tidy().setQuoteName(true));
if (log.isDebugEnabled() && !"Ping".equals(map.get("Type")))
if (log.isDebugEnabled() && !"Ping".equals(type) && !"Pong".equals(type))
log.debug("write msg = " + cnt);
byte[] buf = cnt.getBytes(Encoding.CHARSET_UTF8);
int len = buf.length;
Expand All @@ -55,7 +56,7 @@ public static NgrokMsg readMsg(InputStream ins) throws IOException {
Map<String, Object> payload = map.getAs("Payload", Map.class);
if (payload == null)
payload = new HashMap<String, Object>();
if (log.isDebugEnabled() && !"Pong".equals(msg.get("Type")))
if (log.isDebugEnabled() && !"Pong".equals(msg.get("Type")) && !"Ping".equals(msg.get("Type")))
log.debug("read msg = " + cnt);
msg.putAll(payload);
return msg;
Expand Down Expand Up @@ -158,13 +159,15 @@ public static InputStream gzip_in(boolean enable, InputStream ins) throws IOExce

public static void httpResp(OutputStream out , int code, String cnt) {
try {
byte[] buf = cnt.getBytes();
String respLine = String.format("HTTP/1.0 %d %s\r\n", code, Http.getStatusText(code, ""));
String content_len = "Content-Length: " + cnt.getBytes().length + "\r\n";
String content_len = "Content-Length: " + buf.length + "\r\n";
out.write(respLine.getBytes());
out.write(content_len.getBytes());
out.write("\r\n".getBytes());
out.write(cnt.getBytes());
out.write(buf);
out.flush();
out.close();
}
catch (IOException e) {
}
Expand Down
Expand Up @@ -117,4 +117,8 @@ public static NgrokMsg pong() {
public void write(OutputStream out) throws IOException {
NgrokAgent.writeMsg(out, this);
}

public String getType() {
return this.getString("Type");
}
}
Expand Up @@ -27,6 +27,7 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;

import org.nutz.lang.Stopwatch;
import org.nutz.lang.Streams;
import org.nutz.lang.Strings;
import org.nutz.lang.random.R;
Expand Down Expand Up @@ -176,7 +177,7 @@ public Object call() throws Exception {
this.out = socket.getOutputStream();
while (true) {
NgrokMsg msg = NgrokAgent.readMsg(ins);
String type = msg.getString("Type");
String type = msg.getType();
if ("Auth".equals(type)) {
if (authed) {
NgrokMsg.authResp("", "Auth Again?!!").write(out);
Expand Down Expand Up @@ -274,7 +275,9 @@ public boolean reqProxy(String host) throws IOException {
String reqId = reqIdMap.get(host);
if (reqId == null)
return false;
NgrokAgent.writeMsg(out, NgrokMsg.reqProxy(reqId, "http://" + host, "http", ""));
for (int i = 0; i < 5; i++) {
NgrokAgent.writeMsg(out, NgrokMsg.reqProxy(reqId, "http://" + host, "http", ""));
}
return true;
}

Expand All @@ -296,6 +299,8 @@ public ProxySocket getProxy(String host) throws Exception {
return ps;
}
if (ps == null) {
if (log.isDebugEnabled())
log.debugf("req proxy conn for host[%s]", host);
if (reqProxy(host))
ps = idleProxys.poll(client_proxy_wait_timeout, TimeUnit.MILLISECONDS);
}
Expand All @@ -308,6 +313,8 @@ public void clean() {
ProxySocket proxySocket = idleProxys.poll();
if (proxySocket != null)
Streams.safeClose(proxySocket.socket);
else
break;
}
}

Expand All @@ -326,14 +333,17 @@ public HttpThread(Socket socket) {
}

public Object call() throws Exception {
log.debug("NEW Http Request ...");
//if (log.isDebugEnabled())
// log.debug("NEW Http Request ...");
Stopwatch sw = Stopwatch.begin();
InputStream _ins = socket.getInputStream();
OutputStream _out = socket.getOutputStream();
ByteArrayOutputStream bao = new ByteArrayOutputStream(8192);
ByteArrayOutputStream line_buffer_bao = new ByteArrayOutputStream();
int line_len = 0;
int count = 0;
byte[] buf = new byte[1];
String firstLine = null;
while (true) {
int len = _ins.read(buf);
if (len == -1)
Expand All @@ -355,12 +365,17 @@ else if (len == 0)
if (line_len > 8) { // Host: wendal.cn 域名起码3位吧?
byte[] line_buf = line_buffer_bao.toByteArray();
String line = new String(line_buf).trim().toLowerCase();
if (firstLine == null) {
firstLine = line;
}
//log.debug("Header Line --> " + line);
// 看看是不是Host
// 有可能是Host或者host哦
if (line.startsWith("host") && line.contains(":")) {
else if (line.startsWith("host") && line.contains(":")) {
String host = line.split("[\\:]")[1].trim();
log.debug("Host --> " + host);
sw.tag("Read Host");
if (log.isDebugEnabled())
log.debugf("Host[%s] >> %s", host, firstLine);
String clientId = hostmap.get(host);
if (clientId == null) {
NgrokAgent.httpResp(_out, 404, "Tunnel " + host + " not found");
Expand All @@ -385,9 +400,11 @@ else if (len == 0)
socket.close();
return null;
}
sw.tag("After Get ProxySocket");
try {
NgrokAgent.writeMsg(proxySocket.socket.getOutputStream(),
NgrokMsg.startProxy("http://" + host, ""));
sw.tag("After Send Start Proxy");
proxySocket.socket.getOutputStream().write(bao.toByteArray());
// 服务器-->本地
PipedStreamThread srv2loc = new PipedStreamThread("http2proxy",
Expand All @@ -399,6 +416,9 @@ else if (len == 0)
NgrokAgent.gzip_in(client.gzip_proxy, proxySocket.socket.getInputStream()),
_out,
bufSize);
sw.tag("After PipedStream Make");
sw.stop();
log.debug("ProxyConn Timeline = " + sw.toString());
// 等待其中任意一个管道的关闭
String exitFirst = executorService.invokeAny(Arrays.asList(srv2loc,
loc2srv));
Expand Down

0 comments on commit fdb2465

Please sign in to comment.