Skip to content

Commit

Permalink
[PinpiontWeb] fix a ActiveThread concurrency problem #1482
Browse files Browse the repository at this point in the history
  1. applied SimpleThreadPoolExecutor for separated thread from WebSocketSessionId.
  • Loading branch information
koo-taejin committed Jan 30, 2016
1 parent d8ddc0d commit e0f7a0e
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 35 deletions.
Expand Up @@ -21,6 +21,7 @@
import com.navercorp.pinpoint.rpc.util.MapUtils;
import com.navercorp.pinpoint.rpc.util.StringUtils;
import com.navercorp.pinpoint.web.service.AgentService;
import com.navercorp.pinpoint.web.util.SimpleOrderedThreadPool;
import com.navercorp.pinpoint.web.websocket.message.PinpointWebSocketMessage;
import com.navercorp.pinpoint.web.websocket.message.PinpointWebSocketMessageConverter;
import com.navercorp.pinpoint.web.websocket.message.PinpointWebSocketMessageType;
Expand All @@ -39,10 +40,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -68,7 +65,7 @@ public class ActiveThreadCountHandler extends TextWebSocketHandler implements Pi

private final AtomicBoolean onTimerTask = new AtomicBoolean(false);

private ExecutorService webSocketFlushThreadPool;
private SimpleOrderedThreadPool webSocketflushExecutor;

private java.util.Timer flushTimer;
private static final long DEFAULT_FLUSH_DELAY = 1000;
Expand Down Expand Up @@ -102,7 +99,7 @@ public ActiveThreadCountHandler(String requestMapping, AgentService agentService
@Override
public void start() {
PinpointThreadFactory flushThreadFactory = new PinpointThreadFactory(ClassUtils.simpleClassName(this) + "-Flush-Thread", true);
webSocketFlushThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE, DEFAULT_HEALTH_CHECk_DELAY, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), flushThreadFactory);
webSocketflushExecutor = new SimpleOrderedThreadPool(Runtime.getRuntime().availableProcessors(), 65535, flushThreadFactory);

flushTimer = new java.util.Timer(ClassUtils.simpleClassName(this) + "-Flush-Timer", true);
healthCheckTimer = new java.util.Timer(ClassUtils.simpleClassName(this) + "-HealthCheck-Timer", true);
Expand Down Expand Up @@ -130,8 +127,8 @@ public void stop() {
reactiveTimer.cancel();
}

if (webSocketFlushThreadPool != null) {
webSocketFlushThreadPool.shutdown();
if (webSocketflushExecutor != null) {
webSocketflushExecutor.shutdown();
}
}

Expand Down Expand Up @@ -287,16 +284,11 @@ public void run() {

Collection<PinpointWebSocketResponseAggregator> values = aggregatorRepository.values();
for (final PinpointWebSocketResponseAggregator aggregator : values) {
webSocketFlushThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
aggregator.flush();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
});
try {
aggregator.flush(webSocketflushExecutor);
} catch (Exception e) {
logger.warn("failed while flushing ActiveThreadCount to aggregator. applicationName:{}, error:{}", aggregator.getApplicationName(), e.getMessage(), e);
}
}
} finally {
long waitTimeMillis = getWaitTimeMillis();
Expand Down Expand Up @@ -380,13 +372,11 @@ private void closeSession(WebSocketSession session, CloseStatus status) {

private void sendPingMessage(WebSocketSession session, TextMessage pingMessage) {
try {
session.sendMessage(pingMessage);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
closeSession(session, CloseStatus.SERVER_ERROR);
webSocketflushExecutor.execute(new OrderedWebSocketFlushRunnable(session, pingMessage, true));
} catch (RuntimeException e) {
logger.warn("failed while to execute. error:{}.", e.getMessage(), e);
}
}

}

}
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -205,6 +206,11 @@ public void response(AgentActiveThreadCount activeThreadCount) {

@Override
public void flush() throws Exception {
flush(null);
}

@Override
public void flush(Executor executor) throws Exception {
if ((flushCount.getAndIncrement() % flushLogRecordRate) == 0) {
logger.info("flush started. applicationName:{}", applicationName);
}
Expand All @@ -228,24 +234,45 @@ public void flush() throws Exception {
activeThreadCountMap = new HashMap<>(activeThreadCountWorkerRepository.size());
}

flush0(response);
TextMessage webSocketTextMessage = createWebSocketTextMessage(response);
if (webSocketTextMessage != null) {
if (executor == null) {
flush0(webSocketTextMessage);
} else {
flushAsync0(webSocketTextMessage, executor);
}
}
}

private void flush0(AgentActiveThreadCountList activeThreadCountList) {
private TextMessage createWebSocketTextMessage(AgentActiveThreadCountList activeThreadCountList) {
Map resultMap = createResultMap(activeThreadCountList, System.currentTimeMillis());
try {
TextMessage responseTextMessage = new TextMessage(messageConverter.getResponseTextMessage(ActiveThreadCountHandler.API_ACTIVE_THREAD_COUNT, resultMap));
return responseTextMessage;
} catch (JsonProcessingException e) {
logger.warn("failed while to convert message. applicationName:{}, original:{}, message:{}.", applicationName, resultMap, e.getMessage(), e);
}
return null;
}

for (WebSocketSession webSocketSession : webSocketSessions) {
try {
logger.debug("flush webSocketSession:{}, response:{}", webSocketSession, responseTextMessage);
webSocketSession.sendMessage(responseTextMessage);
} catch (Exception e) {
logger.warn("failed while flush message(applicationName:{}, session:{}). Error:{}", webSocketSession, applicationName, e.getMessage(), e);
}
private void flush0(TextMessage webSocketMessage) {
for (WebSocketSession webSocketSession : webSocketSessions) {
try {
logger.debug("flush webSocketSession:{}, response:{}", webSocketSession, webSocketMessage);
webSocketSession.sendMessage(webSocketMessage);
} catch (Exception e) {
logger.warn("failed while flushing message to webSocket. session:{}, message:{}, error:{}", webSocketSession, webSocketMessage, e.getMessage(), e);
}
} catch (JsonProcessingException e) {
logger.warn("failed while convert message. applicationName:{}, original:{}, message:{}.", applicationName, resultMap, e.getMessage(), e);
}
}

private void flushAsync0(TextMessage webSocketMessage, Executor executor) {
for (WebSocketSession webSocketSession : webSocketSessions) {
if (webSocketSession == null) {
logger.warn("failed caused webSocketSession is null. applicationName:{}", applicationName);
continue;
}
executor.execute(new OrderedWebSocketFlushRunnable(webSocketSession, webSocketMessage));
}
}

Expand All @@ -265,7 +292,7 @@ private Map createResultMap(AgentActiveThreadCountList activeThreadCount, long t
}

private String createEmptyResponseMessage(String applicationName, long timeStamp) {
StringBuilder emptyJsonMessage = new StringBuilder();
StringBuilder emptyJsonMessage = new StringBuilder(32);
emptyJsonMessage.append("{");
emptyJsonMessage.append("\"").append(APPLICATION_NAME).append("\"").append(":").append("\"").append(applicationName).append("\"").append(",");
emptyJsonMessage.append("\"").append(ACTIVE_THREAD_COUNTS).append("\"").append(":").append("{}").append(",");
Expand Down
@@ -0,0 +1,88 @@
/*
* Copyright 2016 NAVER Corp.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.web.websocket;

import com.navercorp.pinpoint.bootstrap.util.StringUtils;
import com.navercorp.pinpoint.web.util.SimpleOrderedThreadPool;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

/**
* @Author Taejin Koo
*/
public class OrderedWebSocketFlushRunnable implements Runnable, SimpleOrderedThreadPool.HashSelector {


private static final Logger LOGGER = LoggerFactory.getLogger(OrderedWebSocketFlushRunnable.class);

private final WebSocketSession webSocketSession;
private final TextMessage webSocketMessage;

private final boolean sessionCloseOnError;

public OrderedWebSocketFlushRunnable(WebSocketSession webSocketSession, TextMessage webSocketMessage) {
this(webSocketSession, webSocketMessage, false);
}

public OrderedWebSocketFlushRunnable(WebSocketSession webSocketSession, TextMessage webSocketMessage, boolean sessionCloseOnError) {
if (webSocketSession == null) {
throw new NullPointerException("webSocketSession null.");
}
if (webSocketMessage == null) {
throw new NullPointerException("webSocketMessage null.");

}

this.webSocketSession = webSocketSession;
this.webSocketMessage = webSocketMessage;
this.sessionCloseOnError = sessionCloseOnError;
}

@Override
public int select() {
String webSocketSessionId = webSocketSession.getId();
if (StringUtils.isEmpty(webSocketSessionId)) {
webSocketSessionId = RandomStringUtils.random(1);
}

return webSocketSessionId.hashCode();
}

@Override
public void run() {
try {
webSocketSession.sendMessage(webSocketMessage);
} catch (Exception e) {
LOGGER.warn("failed while flushing message to webSocket. session:{}, message:{}, error:{}", webSocketSession, webSocketMessage, e.getMessage(), e);
if (sessionCloseOnError) {
closeSession(webSocketSession);
}
}
}

private void closeSession(WebSocketSession session) {
try {
session.close(CloseStatus.SERVER_ERROR);
} catch (Exception e) {
LOGGER.warn(e.getMessage(), e);
}
}

}
Expand Up @@ -23,6 +23,8 @@
import com.navercorp.pinpoint.web.vo.AgentInfo;
import org.springframework.web.socket.WebSocketSession;

import java.util.concurrent.Executor;

/**
* @Author Taejin Koo
*/
Expand All @@ -34,6 +36,8 @@ public interface PinpointWebSocketResponseAggregator {

void flush() throws Exception;

void flush(Executor executor) throws Exception;

void response(AgentActiveThreadCount activeThreadCount);

void addWebSocketSession(WebSocketSession webSocketSession);
Expand Down

0 comments on commit e0f7a0e

Please sign in to comment.