-
Notifications
You must be signed in to change notification settings - Fork 5
/
MergeCall.java
146 lines (125 loc) · 3.6 KB
/
MergeCall.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package org.easyj.easyjcommon.concurrent;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.easyj.easyjlog.util.LoggerUtil;
/**
 * Merges concurrent calls that share the same key into a single execution.
 *
 * <p>While one caller (the leader) is running the supplied function for a key, every other
 * caller that arrives with the same key (a follower) does not run its own function; it waits
 * for the leader and receives the leader's result instead.
 *
 * <p>Usage example:
 * <pre>{@code
 * MergeCall.execute("key", () -> {
 *     return ref.invoke();
 * });
 * }</pre>
 *
 * <p>Caveat: a nested {@code execute} call with the same key made from inside the supplied
 * function joins its own in-flight call as a follower and can only end by timing out.
 *
 * @author dengjianjun
 */
public class MergeCall {

    /** Maximum number of polling rounds a follower performs before it gives up. */
    private static final int MAX_CYCLE = 1000;

    /** Maximum time, in milliseconds, a follower waits for the leader's result. */
    private static final long MAX_TIMEOUT = 3 * 1000;

    /** Maximum number of concurrent callers per in-flight call; followers beyond it fail fast. */
    private static final int MAX_REQUEST = 10000;

    /** Pause, in milliseconds, between two polls of the leader's result. */
    private static final long SLEEP_TIME = 10;

    /**
     * Calls currently being executed, by key. The map is a {@link ConcurrentHashMap}, so
     * {@code putIfAbsent} and {@code remove(key, value)} are atomic. An entry exists only
     * while its leader is running, so the map does not grow with the number of distinct keys.
     */
    private static final Map<String, Flight> IN_FLIGHT = new ConcurrentHashMap<>();

    /**
     * State of one in-flight call. Followers keep a reference to the instance they joined,
     * so the outcome stays reachable for them after the leader has unregistered the flight
     * and cannot be overwritten or deleted by a later call for the same key.
     */
    private static final class Flight {
        /** Callers attached to this flight; starts at 1 for the leader. */
        final AtomicInteger callers = new AtomicInteger(1);

        /**
         * {@code null} while the leader is still running; afterwards the leader's result,
         * with an empty Optional standing for a {@code null} result or a failed leader.
         * Volatile so that polling followers observe the leader's write.
         */
        volatile Optional<Object> outcome;
    }

    /**
     * Runs {@code function} unless another caller is already running one for the same key,
     * in which case the result of that running call is returned instead.
     *
     * @param key      identifies calls that may be merged; a {@code null} or blank key makes
     *                 the method return {@code null} without invoking {@code function}
     * @param function the business logic; invoked only by the leader
     * @param <E>      result type; all callers sharing a key must expect the same type
     * @return the leader's result, or {@code null} if the key is blank, the leader returned
     *         {@code null} or failed, or this follower stopped waiting (timeout, too many
     *         polling rounds, or too many concurrent callers)
     * @throws RuntimeException whatever {@code function} throws, rethrown to the leader only
     */
    public static <E extends Object> E execute(String key, Supplier<E> function) {
        if (key == null || key.trim().length() == 0) {
            return null;
        }
        // Start time, used for timeout handling and for the log messages.
        long startTime = System.currentTimeMillis();

        Flight own = new Flight();
        Flight joined = IN_FLIGHT.putIfAbsent(key, own);
        if (joined == null) {
            // No call was in flight for this key: this caller is the leader.
            return runAsLeader(key, own, function, startTime);
        }
        return awaitLeader(key, joined, startTime);
    }

    /*
     * Executes the business logic, publishes the outcome to the followers of this flight and
     * unregisters the flight so that the next caller starts a fresh execution.
     */
    private static <E> E runAsLeader(String key, Flight flight, Supplier<E> function, long startTime) {
        // The leader never polls; it always reports a single round.
        int cycleCount = 1;
        E value = null;
        try {
            value = function.get();
        } catch (Exception e) {
            LoggerUtil.error("execute failed [{}]({}ms {}num): {}", key,
                    (System.currentTimeMillis() - startTime), cycleCount, e.getMessage());
            throw e;
        } finally {
            // Publish first, then unregister: any follower that managed to join this flight
            // is guaranteed to see an outcome, even when the function failed.
            flight.outcome = Optional.ofNullable(value);
            IN_FLIGHT.remove(key, flight);
        }
        LoggerUtil.debug("execute success [{}]({}ms {}num): {}", key, (System.currentTimeMillis() - startTime),
                cycleCount, value);
        return value;
    }

    /*
     * Polls the joined flight until the leader has published an outcome or a wait limit is hit.
     */
    private static <E> E awaitLeader(String key, Flight flight, long startTime) {
        int cur = flight.callers.incrementAndGet();
        // Number of polling rounds performed so far.
        int cycleCount = 0;
        boolean interrupted = false;
        Optional<Object> outcome = null;
        try {
            while (outcome == null) {
                cycleCount++;
                outcome = flight.outcome;
                if (outcome != null) {
                    break;
                }
                if (isBreak(key, cur, cycleCount, startTime)) {
                    // Stop waiting; the caller receives null.
                    outcome = Optional.empty();
                } else {
                    // Sleep for a while, then poll again.
                    try {
                        Thread.sleep(SLEEP_TIME);
                    } catch (InterruptedException e) {
                        // Keep waiting as before, but remember the interrupt so it can be
                        // restored once this method is done.
                        interrupted = true;
                        LoggerUtil.warn(e, "线程等待被中断");
                    }
                }
            }
        } finally {
            flight.callers.decrementAndGet();
            if (interrupted) {
                // Thread.sleep cleared the flag when it threw; hand it back to the caller.
                Thread.currentThread().interrupt();
            }
        }
        // Safe as long as all callers sharing a key expect the same result type.
        @SuppressWarnings("unchecked")
        E value = (E) outcome.orElse(null);
        LoggerUtil.debug("execute success [{}]({}ms {}num): {}", key, (System.currentTimeMillis() - startTime),
                cycleCount, value);
        return value;
    }

    /*
     * Decides whether a follower must stop waiting: too many concurrent callers, the timeout
     * has elapsed, or too many polling rounds were performed. Logs a warning when it does.
     */
    private static boolean isBreak(String key, int cur, int cycleCount, long startTime) {
        long elapsed = System.currentTimeMillis() - startTime;
        boolean flag = cur > MAX_REQUEST || elapsed > MAX_TIMEOUT || cycleCount > MAX_CYCLE;
        if (flag) {
            LoggerUtil.warn("execute break [{}]({}ms {}num): {}", key, (System.currentTimeMillis() - startTime),
                    cycleCount, cur);
        }
        return flag;
    }
}