/
CrossrefClient.java
167 lines (148 loc) · 5.81 KB
/
CrossrefClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package org.grobid.core.utilities.crossref;
import java.io.Closeable;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.*;
import org.apache.commons.lang3.concurrent.TimedSemaphore;
import org.apache.http.client.ClientProtocolException;
import org.grobid.core.utilities.crossref.CrossrefRequestListener.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Request pool to get data from api.crossref.org without exceeding limits,
 * usable from several threads.
 *
 * <p>Requests are submitted to a cached pool of daemon threads. Before a request
 * is submitted, the caller waits until the number of active pool threads is
 * below {@link #getMax_pool_size()}. Every submitted request is tracked per
 * caller thread identifier so that {@link #finish(long)} can wait for all
 * requests issued on behalf of one thread.
 *
 * <p>Thread-safety: submissions are serialized on the client's monitor; the
 * per-thread bookkeeping lives in a concurrent map that is only updated through
 * atomic per-key operations, so {@link #finish(long)} never blocks submitters.
 *
 * @author Vincent Kaestle, Patrice
 */
public class CrossrefClient implements Closeable {
    public static final Logger logger = LoggerFactory.getLogger(CrossrefClient.class);

    /** Lazily created singleton; guarded by the class monitor for writes. */
    protected static volatile CrossrefClient instance;

    /** Pool running the request tasks (a {@link ThreadPoolExecutor} created in the constructor). */
    protected volatile ExecutorService executorService;

    /** Maximum number of simultaneously active request threads; always at least 1. */
    protected volatile int max_pool_size = 1;

    /** When true, {@link #updateLimits(int, int)} is allowed to adjust the pool size. */
    protected static boolean limitAuto = true;

    /**
     * Futures of the submitted requests, grouped by the identifier of the thread
     * that pushed them. Used to check whether the requests of a thread are completed.
     */
    protected volatile Map<Long, List<Future<?>>> futures = new ConcurrentHashMap<>();

    /**
     * Returns the shared client, creating it on first use.
     *
     * @return the singleton instance, never null
     */
    public static CrossrefClient getInstance() {
        CrossrefClient current = instance;
        if (current == null) {
            current = getNewInstance();
        }
        return current;
    }

    /**
     * Creates the singleton if it does not exist yet. The existence check is
     * repeated under the class lock so that concurrent first callers share
     * one instance instead of each creating (and leaking) a thread pool.
     */
    private static synchronized CrossrefClient getNewInstance() {
        CrossrefClient current = instance;
        if (current == null) {
            logger.debug("Get new instance of CrossrefClient");
            current = new CrossrefClient();
            instance = current;
        }
        return current;
    }

    /**
     * Hidden constructor: sets up a cached pool of daemon threads and the
     * initial limit of one active request.
     */
    protected CrossrefClient() {
        this.executorService = Executors.newCachedThreadPool(r -> {
            Thread t = Executors.defaultThreadFactory().newThread(r);
            t.setDaemon(true);
            return t;
        });
        setLimits(1, 1000);
    }

    /**
     * Logs a message at info level, prefixed with the request when one is given.
     *
     * @param request the request the message relates to, may be null
     * @param message the text to log
     */
    public static void printLog(CrossrefRequest<?> request, String message) {
        logger.info((request != null ? request + ": " : "") + message);
    }

    /**
     * Sets the maximum number of simultaneously active requests.
     *
     * @param iterations maximum number of active requests
     * @param interval   unused: the client waits for the termination of running
     *                   requests rather than for a time window
     */
    public void setLimits(int iterations, int interval) {
        this.setMax_pool_size(iterations);
    }

    /**
     * Updates the limits if automatic limit handling is enabled; half of the
     * given number of iterations is used as the new maximum.
     *
     * @param iterations number of requests allowed per interval
     * @param interval   passed through to {@link #setLimits(int, int)}
     */
    public void updateLimits(int iterations, int interval) {
        if (limitAuto) {
            this.setLimits(iterations / 2, interval);
        }
    }

    /**
     * Push a request in pool to be executed as soon as possible, then wait a response through the listener.
     * API Documentation : https://github.com/CrossRef/rest-api-doc/blob/master/rest_api.md
     *
     * @param request  the request to execute
     * @param listener optional listener registered on the request, may be null
     * @param threadId identifier under which the request is tracked for {@link #finish(long)}
     * @throws IOException if the calling thread is interrupted while waiting for a free
     *                     slot (as {@link java.io.InterruptedIOException}, interrupt flag restored)
     */
    public <T extends Object> void pushRequest(CrossrefRequest<T> request, CrossrefRequestListener<T> listener,
            long threadId) throws URISyntaxException, ClientProtocolException, IOException {
        if (listener != null) {
            request.addListener(listener);
        }

        synchronized (this) {
            final ThreadPoolExecutor pool = (ThreadPoolExecutor) executorService;

            // limit the number of active threads depending on the crossref api limits
            while (pool.getActiveCount() >= this.getMax_pool_size()) {
                try {
                    TimeUnit.MICROSECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the flag and give up instead of spinning uninterruptibly.
                    Thread.currentThread().interrupt();
                    throw new java.io.InterruptedIOException(
                        "Interrupted while waiting to submit a Crossref request for thread " + threadId);
                }
            }

            final Future<?> submitted = pool.submit(new CrossrefRequestTask<T>(this, request));

            // Atomic per-key update: cannot interleave with the removal done by finish().
            this.futures.compute(Long.valueOf(threadId), (id, pending) -> {
                List<Future<?>> target = pending;
                if (target == null) {
                    target = new ArrayList<Future<?>>();
                }
                target.add(submitted);
                return target;
            });

            logger.debug("add request to thread {}, active threads count is now {}",
                threadId, pool.getActiveCount());
        }
    }

    /**
     * Push a request in pool to be executed soon as possible, then wait a response through the listener.
     * @see <a href="https://github.com/CrossRef/rest-api-doc/blob/master/rest_api.md">Crossref API Documentation</a>
     *
     * @param model        model part of the request, passed to the {@code CrossrefRequest} constructor
     * @param params       query parameters, can be null, ex: ?query.title=[title]&amp;query.author=[author]
     * @param deserializer json response deserializer, ex: WorkDeserializer to convert Work to BiblioItem
     * @param threadId     the java identifier of the thread providing the request (e.g. via Thread.currentThread().getId())
     * @param listener     catch response from request
     */
    public <T extends Object> void pushRequest(String model, Map<String, String> params, CrossrefDeserializer<T> deserializer,
            long threadId, CrossrefRequestListener<T> listener) throws URISyntaxException, ClientProtocolException, IOException {
        CrossrefRequest<T> request = new CrossrefRequest<T>(model, params, deserializer);
        // the delegate takes the client's monitor itself
        this.<T>pushRequest(request, listener, threadId);
    }

    /**
     * Wait for all requests from a specific thread to be completed.
     *
     * <p>The tracked futures of the thread are detached first, so the entry is
     * always cleaned up and no lock is held while blocking. A failed request is
     * logged and does not prevent waiting for the remaining ones. If the calling
     * thread is interrupted, its interrupt flag is restored and the requests not
     * yet waited for stay tracked for a later call.
     *
     * @param threadId identifier that was given to {@code pushRequest}
     */
    public void finish(long threadId) {
        final Long key = Long.valueOf(threadId);
        final List<Future<?>> threadFutures = this.futures.remove(key);
        if (threadFutures == null) {
            return;
        }

        for (int i = 0; i < threadFutures.size(); i++) {
            try {
                // get will block until the future is done
                threadFutures.get(i).get();
            } catch (InterruptedException ie) {
                // Preserve interrupt status
                Thread.currentThread().interrupt();
                // Keep tracking what has not been waited for, ahead of anything pushed meanwhile.
                List<Future<?>> remaining =
                    new ArrayList<Future<?>>(threadFutures.subList(i, threadFutures.size()));
                this.futures.merge(key, remaining, (current, rest) -> {
                    rest.addAll(current);
                    return rest;
                });
                return;
            } catch (ExecutionException ee) {
                logger.error("CrossRef request execution fails", ee);
            }
        }
    }

    /**
     * @return the maximum number of simultaneously active requests
     */
    public int getMax_pool_size() {
        return max_pool_size;
    }

    /**
     * Sets the maximum number of simultaneously active requests. Values below 1
     * are raised to 1, because a limit of 0 would make every submission wait forever.
     *
     * @param max_pool_size requested maximum
     */
    public void setMax_pool_size(int max_pool_size) {
        this.max_pool_size = Math.max(1, max_pool_size);
    }

    /**
     * Shuts the request pool down; already submitted requests are still executed.
     * If this client is the shared singleton it is unregistered, so that the next
     * {@link #getInstance()} call creates a usable client instead of returning a closed one.
     */
    @Override
    public void close() throws IOException {
        synchronized (CrossrefClient.class) {
            if (instance == this) {
                instance = null;
            }
        }
        executorService.shutdown();
    }
}