This repository has been archived by the owner on Nov 9, 2017. It is now read-only.
/
RestCallLimiter.java
157 lines (141 loc) · 5.11 KB
/
RestCallLimiter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package org.zanata.limits;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
/**
* @author Patrick Huang <a
* href="mailto:pahuang@redhat.com">pahuang@redhat.com</a>
*/
@Slf4j
class RestCallLimiter {
private volatile Semaphore maxConcurrentSemaphore;
private volatile Semaphore maxActiveSemaphore;
private int maxConcurrent;
private int maxActive;
RestCallLimiter(int maxConcurrent, int maxActive) {
this.maxConcurrent = maxConcurrent;
this.maxActive = maxActive;
this.maxConcurrentSemaphore = makeSemaphore(maxConcurrent);
this.maxActiveSemaphore = makeSemaphore(maxActive);
}
/**
* May throw an exception if it takes too long to obtain one of the
* semaphores
*
* @param taskAfterAcquire
* task to perform after acquire
*/
public boolean tryAcquireAndRun(Runnable taskAfterAcquire) {
// hang on to the semaphore, so that we can be certain of
// releasing the same one we acquired
final Semaphore concSem = maxConcurrentSemaphore;
boolean gotConcurrentPermit = concSem.tryAcquire();
if (gotConcurrentPermit) {
// if acquired, immediately enter try finally (release)
try {
log.debug("acquired [concurrent] permit");
if (!acquireActiveAndRatePermit(taskAfterAcquire)) {
throw new RuntimeException(
"Couldn't get an [active] permit before timeout");
}
} finally {
concSem.release();
log.debug("released [concurrent] semaphore");
}
} else {
log.debug("failed to acquire [concurrent] permit");
}
return gotConcurrentPermit;
}
private boolean acquireActiveAndRatePermit(Runnable taskAfterAcquire) {
log.debug("before acquire [active] semaphore:{}", maxActiveSemaphore);
try {
// hang on to the semaphore, so that we can be certain of
// releasing the same one we acquired
final Semaphore activeSem = maxActiveSemaphore;
boolean gotActivePermit =
activeSem.tryAcquire(5, TimeUnit.MINUTES);
if (gotActivePermit) {
// if acquired, immediately enter try finally (release)
try {
log.debug("got [active] semaphore");
taskAfterAcquire.run();
} finally {
activeSem.release();
log.debug("released [active] semaphore");
}
}
return gotActivePermit;
} catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
public synchronized void changeConfig(int maxConcurrent, int maxActive) {
if (maxConcurrent != this.maxConcurrent) {
log.debug(
"change max [concurrent] semaphore with new permit {}",
maxConcurrent);
maxConcurrentSemaphore =
makeSemaphore(maxConcurrent);
this.maxConcurrent = maxConcurrent;
}
if (maxActive != this.maxActive) {
log.debug(
"change max [active] semaphore with new permit {}",
maxActive);
maxActiveSemaphore = makeSemaphore(maxActive);
this.maxActive = maxActive;
}
}
public int availableConcurrentPermit() {
return maxConcurrentSemaphore.availablePermits();
}
public int availableActivePermit() {
return maxActiveSemaphore.availablePermits();
}
private static Semaphore makeSemaphore(int permit) {
if (permit == 0) {
return NoLimitSemaphore.INSTANCE;
} else {
return new Semaphore(permit, true);
}
}
@Override
public String toString() {
return Objects
.toStringHelper(this)
.add("id", super.toString())
.add("maxConcurrent(available)",
maxConcurrentSemaphore.availablePermits())
.add("maxActive(available)",
maxActiveSemaphore.availablePermits())
.add("maxActive(queue)", maxActiveSemaphore.getQueueLength())
.toString();
}
/**
* Overrides tryAcquire method to return true all the time.
*/
private static class NoLimitSemaphore extends Semaphore {
private static final long serialVersionUID = 1L;
private static final NoLimitSemaphore INSTANCE =
new NoLimitSemaphore();
private NoLimitSemaphore() {
super(1);
}
@Override
public void release() {
// do nothing
}
@Override
public boolean tryAcquire() {
return true;
}
@Override
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return true;
}
}
}