Skip to content

Commit

Permalink
check active threads before submitting new ones.
Browse files Browse the repository at this point in the history
  • Loading branch information
achrafazharccsd committed Mar 3, 2021
1 parent bfc10f7 commit b702da9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 38 deletions.
Expand Up @@ -7,14 +7,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.*;

import org.apache.commons.lang3.concurrent.TimedSemaphore;
import org.apache.http.client.ClientProtocolException;
Expand All @@ -34,9 +27,9 @@ public class CrossrefClient implements Closeable {
protected static volatile CrossrefClient instance;

protected volatile ExecutorService executorService;


protected int max_pool_size = 1;
protected static boolean limitAuto = true;
protected volatile TimedSemaphore timedSemaphore;

// this list is used to maintain a list of Futures that were submitted,
// that we can use to check if the requests are completed
Expand Down Expand Up @@ -67,7 +60,6 @@ protected CrossrefClient() {
t.setDaemon(true);
return t;
});
this.timedSemaphore = null;
this.futures = new HashMap<>();
setLimits(1, 1000);
}
Expand All @@ -78,19 +70,8 @@ public static void printLog(CrossrefRequest<?> request, String message) {
}

public void setLimits(int iterations, int interval) {
if ((this.timedSemaphore == null)
|| (this.timedSemaphore.getLimit() != iterations)
|| (this.timedSemaphore.getPeriod() != interval)) {
// custom executor to prevent stopping JVM from exiting
this.timedSemaphore = new TimedSemaphore(new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
}
}), interval, TimeUnit.MILLISECONDS, iterations);
}
this.setMax_pool_size(iterations);
// interval is not useful here ! we should wait termination of each thread
}

public void updateLimits(int iterations, int interval) {
Expand All @@ -100,16 +81,6 @@ public void updateLimits(int iterations, int interval) {
}
}

public synchronized void checkLimits() throws InterruptedException {
if (this.limitAuto) {
synchronized(this.timedSemaphore) {
printLog(null, "timedSemaphore acquire... current total: " + this.timedSemaphore.getAcquireCount() +
", still available: " + this.timedSemaphore.getAvailablePermits() );
this.timedSemaphore.acquire();
}
}
}

/**
* Push a request in pool to be executed as soon as possible, then wait a response through the listener.
* API Documentation : https://github.com/CrossRef/rest-api-doc/blob/master/rest_api.md
Expand All @@ -119,12 +90,23 @@ public <T extends Object> void pushRequest(CrossrefRequest<T> request, CrossrefR
if (listener != null)
request.addListener(listener);
synchronized(this) {
// we should limite the number of active threads depending on crossref api limits
while(((ThreadPoolExecutor)executorService).getActiveCount()>=this.max_pool_size) {
try {
TimeUnit.MICROSECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Future<?> f = executorService.submit(new CrossrefRequestTask<T>(this, request));
List<Future<?>> localFutures = this.futures.get(new Long(threadId));
if (localFutures == null)
localFutures = new ArrayList<Future<?>>();
localFutures.add(f);
this.futures.put(new Long(threadId), localFutures);
logger.debug("add request to thread " + threadId +
"active threads count is now " + ((ThreadPoolExecutor) executorService).getActiveCount()
);
//System.out.println("add request to thread " + threadId + " / current total for the thread: " + localFutures.size());
}
}
Expand Down Expand Up @@ -170,9 +152,16 @@ public void finish(long threadId) {
}
}

public int getMax_pool_size() {
return max_pool_size;
}

public void setMax_pool_size(int max_pool_size) {
this.max_pool_size = max_pool_size;
}

@Override
public void close() throws IOException {
timedSemaphore.shutdown();
executorService.shutdown();
}
}
Expand Up @@ -21,9 +21,7 @@ public CrossrefRequestTask(CrossrefClient client, CrossrefRequest<T> request) {

@Override
public void run() {
try {
client.checkLimits();

try {
CrossrefClient.printLog(request, ".. executing");

request.addListener(this);
Expand Down

0 comments on commit b702da9

Please sign in to comment.