From 5338260536387a47b6123e39930908011c842411 Mon Sep 17 00:00:00 2001 From: virjar Date: Wed, 16 Sep 2020 13:04:14 +0800 Subject: [PATCH] =?UTF-8?q?sekiro=20client=20=E5=A2=9E=E5=8A=A0=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=EF=BC=8C=E5=B9=B6=E4=BD=BF=E7=94=A8=E5=A4=9A?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../virjar/sekiro/api/HandlerThreadPool.java | 168 ++++++++++++++++++ .../api/SekiroRequestHandlerManager.java | 14 +- 2 files changed, 175 insertions(+), 7 deletions(-) create mode 100644 sekiro-lib/src/main/java/com/virjar/sekiro/api/HandlerThreadPool.java diff --git a/sekiro-lib/src/main/java/com/virjar/sekiro/api/HandlerThreadPool.java b/sekiro-lib/src/main/java/com/virjar/sekiro/api/HandlerThreadPool.java new file mode 100644 index 0000000..dbe9166 --- /dev/null +++ b/sekiro-lib/src/main/java/com/virjar/sekiro/api/HandlerThreadPool.java @@ -0,0 +1,168 @@ +package com.virjar.sekiro.api; + +import com.virjar.sekiro.log.SekiroLogger; + +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.util.internal.ConcurrentSet; + +public class HandlerThreadPool { + private static HandlerThreadPool instance; + + static { + instance = new HandlerThreadPool(); + // 默认启动两个线程工作 + instance.increaseWorker(); + instance.increaseWorker(); + } + + /** + * 线程最多空转30s + */ + private static int idleSecond = 30; + + /** + * 默认最多15个线程,可以设置到100个 + */ + private static int maxWorkSize = 15; + + /** + * 超过三个任务排队就会增加线程池数量 + */ + private static int maxPendingTaskSize = 3; + + /** + * 任务等待时间超过5s就会增加线程池数量 + */ + private static int maxWaitingSecond = 5; + + public static void setIdleSecond(int idleSecond) { + if (idleSecond < 0) { + return; + } + HandlerThreadPool.idleSecond = idleSecond; + } + + public static void setMaxWorkSize(int maxWorkSize) { + if (maxWorkSize > 100) { + SekiroLogger.warn("the sekiro worker can not grater than 100"); + return; + } + HandlerThreadPool.maxWorkSize = maxWorkSize; + } + + public static void setMaxPendingTaskSize(int maxPendingTaskSize) { + if (maxPendingTaskSize < 0) { + return; + } + HandlerThreadPool.maxPendingTaskSize = maxPendingTaskSize; + } + + public static void setMaxWaitingSecond(int maxWaitingSecond) { + if (maxWaitingSecond < 0) { + return; + } + HandlerThreadPool.maxWaitingSecond = maxWaitingSecond; + } + + + private static class TaskHolder { + SekiroRequest sekiroRequest; + SekiroResponse sekiroResponse; + SekiroRequestHandler sekiroRequestHandler; + long enqueueTimestamp; + + public TaskHolder(SekiroRequest sekiroRequest, SekiroResponse sekiroResponse, SekiroRequestHandler sekiroRequestHandler) { + this.sekiroRequest = sekiroRequest; + this.sekiroResponse = sekiroResponse; + this.sekiroRequestHandler = sekiroRequestHandler; + enqueueTimestamp = System.currentTimeMillis(); + } + } + + private static long idSeed = 0; + + private class TaskExecutorThread extends Thread { + public TaskExecutorThread() { + super("sekiro-worker-" + idSeed); + setDaemon(true); + idSeed++; + workers.add(this); + start(); + } + + private void work() { + while (!isInterrupted()) { + TaskHolder taskHolder; + try { + taskHolder = taskQueue.poll(idleSecond, TimeUnit.SECONDS); + } catch (InterruptedException e) { + return; + } + if (taskHolder == null) { + if (workers.size() <= 2) { + // 有两个保底线程 + continue; + } + //任务为空,线程空转了20s,停止线程 + return; + } + + if (System.currentTimeMillis() - taskHolder.enqueueTimestamp + > maxWaitingSecond * 1000 || taskQueue.size() > maxPendingTaskSize + ) { + increaseWorker(); + } + + try { + taskHolder.sekiroRequestHandler.handleRequest( + taskHolder.sekiroRequest, taskHolder.sekiroResponse + ); + } catch (Throwable throwable) { + SekiroLogger.error("handle task", throwable); + taskHolder.sekiroResponse.failed(CommonRes.statusError, throwable); + } + } + } + + @Override + public void run() { + super.run(); + try { + work(); + } finally { + workers.remove(this); + } + } + } + + private void increaseWorker() { + if (workers.size() > maxWorkSize) { + SekiroLogger.warn("not enough thread resource to execute sekiro request,please setup your custom thread pool!!"); + return; + } + new TaskExecutorThread(); + } + + private BlockingQueue taskQueue = new LinkedBlockingQueue<>(); + private Set workers = new ConcurrentSet<>(); + + + public static void post(SekiroRequest sekiroRequest, SekiroResponse sekiroResponse, + SekiroRequestHandler sekiroRequestHandler) { + instance.taskQueue.add(new TaskHolder( + sekiroRequest, sekiroResponse, sekiroRequestHandler + )); + if (instance.workers.size() < 2 || instance.taskQueue.size() > maxPendingTaskSize) { + instance.increaseWorker(); + } + + if (instance.taskQueue.size() > 10) { + SekiroLogger.warn("too many pending task submit,please setup your custom thread pool!!"); + } + + } +} diff --git a/sekiro-lib/src/main/java/com/virjar/sekiro/api/SekiroRequestHandlerManager.java b/sekiro-lib/src/main/java/com/virjar/sekiro/api/SekiroRequestHandlerManager.java index 0347308..528cc64 100644 --- a/sekiro-lib/src/main/java/com/virjar/sekiro/api/SekiroRequestHandlerManager.java +++ b/sekiro-lib/src/main/java/com/virjar/sekiro/api/SekiroRequestHandlerManager.java @@ -8,7 +8,6 @@ import com.virjar.sekiro.api.databind.EmptyARCreateHelper; import com.virjar.sekiro.api.databind.FieldBindGenerator; import com.virjar.sekiro.api.databind.ICRCreateHelper; -import com.virjar.sekiro.log.SekiroLogger; import com.virjar.sekiro.netty.protocol.SekiroNatMessage; import com.virjar.sekiro.utils.Defaults; @@ -69,12 +68,13 @@ public void handleSekiroNatMessage(SekiroNatMessage sekiroNatMessage, Channel ch } } - try { - actionRequestHandlerGenerator.gen(sekiroRequest).handleRequest(sekiroRequest, sekiroResponse); - } catch (Throwable throwable) { - SekiroLogger.error("failed to generate action request handler", throwable); - sekiroResponse.failed(CommonRes.statusError, throwable); - } + HandlerThreadPool.post(sekiroRequest, sekiroResponse, actionRequestHandlerGenerator.gen(sekiroRequest)); +// try { +// actionRequestHandlerGenerator.gen(sekiroRequest).handleRequest(sekiroRequest, sekiroResponse); +// } catch (Throwable throwable) { +// SekiroLogger.error("failed to generate action request handler", throwable); +// sekiroResponse.failed(CommonRes.statusError, throwable); +// } }