Skip to content

Commit

Permalink
Changes to how I read properties
Browse files Browse the repository at this point in the history
  • Loading branch information
arunchaganty authored and Stanford NLP committed Sep 3, 2015
1 parent 94e16d1 commit 7041177
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 183 deletions.
151 changes: 16 additions & 135 deletions src/edu/stanford/nlp/pipeline/CoreNLPWebClient.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -29,17 +29,8 @@
*/ */
@SuppressWarnings("FieldCanBeLocal") @SuppressWarnings("FieldCanBeLocal")
public class CoreNLPWebClient extends AnnotationPipeline { public class CoreNLPWebClient extends AnnotationPipeline {

/** /**
* Information on how to connect to a backend. * Information on how to connect to a backend
* The semantics of one of these objects is as follows:
* <ul>
* <li>It should define a hostname and port to connect to.</li>
* <li>This represents ONE thread on the remote server. The client should
* treat it as such.</li>
* <li>Two backends that are .equals() point to the same endpoint, but there can be
* multiple of them if we want to run multiple threads on that endpoint.</li>
* </ul>
*/ */
private static class Backend { private static class Backend {
/** The protocol to connect to the server with. */ /** The protocol to connect to the server with. */
Expand All @@ -61,71 +52,25 @@ public boolean equals(Object o) {
} }
@Override @Override
public int hashCode() { public int hashCode() {
throw new IllegalStateException("Hashing backends is dangerous!"); int result = protocol.hashCode();
result = 31 * result + host.hashCode();
result = 31 * result + port;
return result;
} }
} }


/**
* A special type of {@link Thread}, which is responsible for scheduling jobs
* on the backend.
*/
private static class BackendScheduler extends Thread { private static class BackendScheduler extends Thread {
/**
* The list of backends that we can schedule on.
* This should not generally be called directly from anywhere
*/
public final List<Backend> backends; public final List<Backend> backends;


/**
* The queue of annotators (backends) that are free to be run on.
* Remember to lock access to this object with {@link BackendScheduler#freeAnnotatorsLock}.
*/
private final Queue<Backend> freeAnnotators; private final Queue<Backend> freeAnnotators;
/**
* The lock on access to {@link BackendScheduler#freeAnnotators}.
*/
private final Lock freeAnnotatorsLock = new ReentrantLock(); private final Lock freeAnnotatorsLock = new ReentrantLock();
/**
* Represents the event that an annotator has freed up and is available for
* work on the {@link BackendScheduler#freeAnnotators} queue.
* Linked to {@link BackendScheduler#freeAnnotatorsLock}.
*/
private final Condition newlyFree = freeAnnotatorsLock.newCondition(); private final Condition newlyFree = freeAnnotatorsLock.newCondition();


/**
* The queue on requests for the scheduler to handle.
* Each element of this queue is a function: calling the function signals
* that this backend is available to perform a task on the passed backend.
* It is then obligated to call the passed Consumer to signal that it has
* released control of the backend, and it can be used for other things.
* Remember to lock access to this object with {@link BackendScheduler#queueLock}.
*/
private final Queue<BiConsumer<Backend, Consumer<Backend>>> queue; private final Queue<BiConsumer<Backend, Consumer<Backend>>> queue;
/**
* The lock on access to {@link BackendScheduler#queue}.
*/
private final Lock queueLock = new ReentrantLock(); private final Lock queueLock = new ReentrantLock();
/**
* Represents the event that an item has been added to the work queue.
* Linked to {@link BackendScheduler#queueLock}.
*/
private final Condition enqueued = queueLock.newCondition(); private final Condition enqueued = queueLock.newCondition();
/**
* Represents the event that the queue has become empty, and this schedule is no
* longer needed.
*/
public final Condition queueEmpty = queueLock.newCondition();

/**
* While this is true, continue running the scheduler.
*/
private boolean doRun = true;

/**
* Create a new scheduler from a list of backends.
* These can contain duplicates -- in that case, that many concurrent
* calls can be made to that backend.
*/
public BackendScheduler(List<Backend> backends) { public BackendScheduler(List<Backend> backends) {
super(); super();
setDaemon(true); setDaemon(true);
Expand All @@ -134,26 +79,21 @@ public BackendScheduler(List<Backend> backends) {
this.queue = new LinkedList<>(); this.queue = new LinkedList<>();
} }


/** {@inheritDoc} */ @SuppressWarnings("InfiniteLoopStatement")
@Override @Override
public void run() { public void run() {
try { try {
while (doRun) { while (true) {
// Wait for a request // Wait for a request
queueLock.lock(); queueLock.lock();
while (queue.isEmpty()) { while (queue.isEmpty()) {
enqueued.await(); enqueued.await();
if (!doRun) { return; }
} }
BiConsumer<Backend, Consumer<Backend>> request = queue.poll(); BiConsumer<Backend, Consumer<Backend>> request = queue.poll();
// We have a request
// Signal if the queue is empty
if (queue.isEmpty()) {
queueEmpty.signalAll();
}
queueLock.unlock(); queueLock.unlock();
// We have a request


// Find a free annotator // Find a fre annotator
freeAnnotatorsLock.lock(); freeAnnotatorsLock.lock();
while (freeAnnotators.isEmpty()) { while (freeAnnotators.isEmpty()) {
newlyFree.await(); newlyFree.await();
Expand Down Expand Up @@ -181,13 +121,6 @@ public void run() {
} }
} }


/**
* Schedule a new job on the backend
* @param annotate A callback, which will be called when a backend is free
* to do some processing. The implementation of this callback
* MUST CALL the second argument when it is done processing,
* to register the backend as free for further work.
*/
public void schedule(BiConsumer<Backend, Consumer<Backend>> annotate) { public void schedule(BiConsumer<Backend, Consumer<Backend>> annotate) {
queueLock.lock(); queueLock.lock();
try { try {
Expand All @@ -207,7 +140,6 @@ public void schedule(BiConsumer<Backend, Consumer<Backend>> annotate) {
/** The properties file, serialized as JSON. */ /** The properties file, serialized as JSON. */
private final String propsAsJSON; private final String propsAsJSON;


/** The scheduler to use when running on multiple backends at a time */
private final BackendScheduler scheduler; private final BackendScheduler scheduler;


/** /**
Expand All @@ -216,13 +148,6 @@ public void schedule(BiConsumer<Backend, Consumer<Backend>> annotate) {
*/ */
private final ProtobufAnnotationSerializer serializer = new ProtobufAnnotationSerializer(true); private final ProtobufAnnotationSerializer serializer = new ProtobufAnnotationSerializer(true);


/**
* The main constructor. Create a client from a properties file and a list of backends.
* Note that this creates at least one Daemon thread.
*
* @param properties The properties file, as would be passed to {@link StanfordCoreNLP}.
* @param backends The backends to run on.
*/
public CoreNLPWebClient(Properties properties, List<Backend> backends) { public CoreNLPWebClient(Properties properties, List<Backend> backends) {
// Save the constructor variables // Save the constructor variables
this.properties = new Properties(properties); this.properties = new Properties(properties);
Expand Down Expand Up @@ -250,27 +175,10 @@ public CoreNLPWebClient(Properties properties, List<Backend> backends) {
this.scheduler.start(); this.scheduler.start();
} }


/**
* Run on a single backend.
*
* @see CoreNLPWebClient(Properties, List)
*/
public CoreNLPWebClient(Properties properties, String host, int port) { public CoreNLPWebClient(Properties properties, String host, int port) {
this(properties, Collections.singletonList(new Backend(host, port))); this(properties, Collections.singletonList(new Backend(host, port)));
} }


/**
* Run on a single backend, but with k threads on each backend.
*
* @see CoreNLPWebClient(Properties, List)
*/
public CoreNLPWebClient(Properties properties, String host, int port, int threads) {
this(properties, new ArrayList<Backend>() {{
for (int i = 0; i < threads; ++i) {
add(new Backend(host, port));
}
}});
}


/** /**
* {@inheritDoc} * {@inheritDoc}
Expand Down Expand Up @@ -439,7 +347,6 @@ private static void shell(CoreNLPWebClient pipeline) throws IOException {
*/ */
public void run() throws IOException { public void run() throws IOException {
StanfordRedwoodConfiguration.minimalSetup(); StanfordRedwoodConfiguration.minimalSetup();
StanfordCoreNLP.OutputFormat outputFormat = StanfordCoreNLP.OutputFormat.valueOf(properties.getProperty("outputFormat", "text").toUpperCase());


// //
// Process one file or a directory of files // Process one file or a directory of files
Expand All @@ -451,7 +358,7 @@ public void run() throws IOException {
} }
Collection<File> files = new FileSequentialCollection(new File(fileName), properties.getProperty("extension"), true); Collection<File> files = new FileSequentialCollection(new File(fileName), properties.getProperty("extension"), true);
StanfordCoreNLP.processFiles(null, files, 1, properties, this::annotate, StanfordCoreNLP.processFiles(null, files, 1, properties, this::annotate,
StanfordCoreNLP.createOutputter(properties, new AnnotationOutputter.Options()), outputFormat); StanfordCoreNLP.createOutputter(properties, AnnotationOutputter.Options::new));
} }


// //
Expand All @@ -469,7 +376,7 @@ else if (properties.containsKey("filelist")){
} }
} }
StanfordCoreNLP.processFiles(null, files, 1, properties, this::annotate, StanfordCoreNLP.processFiles(null, files, 1, properties, this::annotate,
StanfordCoreNLP.createOutputter(properties, new AnnotationOutputter.Options()), outputFormat); StanfordCoreNLP.createOutputter(properties, AnnotationOutputter.Options::new));
} }


// //
Expand All @@ -480,30 +387,6 @@ else if (properties.containsKey("filelist")){
} }
} }


/**
* <p>
* Good practice to call after you are done with this object.
* Shuts down the queue of annotations to run and the associated threads.
* </p>
*
* <p>
* If this is not called, any job which has been scheduled but not run will be
* cancelled.
* </p>
*/
public void shutdown() throws InterruptedException {
scheduler.queueLock.lock();
try {
while (!scheduler.queue.isEmpty()) {
scheduler.queueEmpty.await();
}
scheduler.doRun = false;
scheduler.enqueued.signalAll(); // In case the thread's waiting on this condition
} finally {
scheduler.queueLock.unlock();
}
}



/** /**
* This can be used just for testing or for command-line text processing. * This can be used just for testing or for command-line text processing.
Expand Down Expand Up @@ -554,10 +437,8 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
} }


// Run the pipeline // Run the pipeline
CoreNLPWebClient client = new CoreNLPWebClient(props, backends); new CoreNLPWebClient(props, backends).run();
client.run();
try {
client.shutdown(); // In case anything is pending on the server
} catch (InterruptedException ignored) { }
} }
} }


Loading

0 comments on commit 7041177

Please sign in to comment.