Skip to content

Commit

Permalink
Default annotators if none provided
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabor Angeli authored and Stanford NLP committed Sep 3, 2015
1 parent 7041177 commit 6d8ee82
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 63 deletions.
157 changes: 140 additions & 17 deletions src/edu/stanford/nlp/pipeline/CoreNLPWebClient.java
Expand Up @@ -29,8 +29,17 @@
*/ */
@SuppressWarnings("FieldCanBeLocal") @SuppressWarnings("FieldCanBeLocal")
public class CoreNLPWebClient extends AnnotationPipeline { public class CoreNLPWebClient extends AnnotationPipeline {

/** /**
* Information on how to connect to a backend * Information on how to connect to a backend.
* The semantics of one of these objects is as follows:
* <ul>
* <li>It should define a hostname and port to connect to.</li>
* <li>This represents ONE thread on the remote server. The client should
* treat it as such.</li>
* <li>Two backends that are .equals() point to the same endpoint, but there can be
* multiple of them if we want to run multiple threads on that endpoint.</li>
* </ul>
*/ */
private static class Backend { private static class Backend {
/** The protocol to connect to the server with. */ /** The protocol to connect to the server with. */
Expand All @@ -52,25 +61,71 @@ public boolean equals(Object o) {
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = protocol.hashCode(); throw new IllegalStateException("Hashing backends is dangerous!");
result = 31 * result + host.hashCode();
result = 31 * result + port;
return result;
} }
} }


/**
* A special type of {@link Thread}, which is responsible for scheduling jobs
* on the backend.
*/
private static class BackendScheduler extends Thread { private static class BackendScheduler extends Thread {
/**
* The list of backends that we can schedule on.
* This should not generally be called directly from anywhere
*/
public final List<Backend> backends; public final List<Backend> backends;


/**
* The queue of annotators (backends) that are free to be run on.
* Remember to lock access to this object with {@link BackendScheduler#freeAnnotatorsLock}.
*/
private final Queue<Backend> freeAnnotators; private final Queue<Backend> freeAnnotators;
/**
* The lock on access to {@link BackendScheduler#freeAnnotators}.
*/
private final Lock freeAnnotatorsLock = new ReentrantLock(); private final Lock freeAnnotatorsLock = new ReentrantLock();
/**
* Represents the event that an annotator has freed up and is available for
* work on the {@link BackendScheduler#freeAnnotators} queue.
* Linked to {@link BackendScheduler#freeAnnotatorsLock}.
*/
private final Condition newlyFree = freeAnnotatorsLock.newCondition(); private final Condition newlyFree = freeAnnotatorsLock.newCondition();


/**
* The queue on requests for the scheduler to handle.
* Each element of this queue is a function: calling the function signals
* that this backend is available to perform a task on the passed backend.
* It is then obligated to call the passed Consumer to signal that it has
* released control of the backend, and it can be used for other things.
* Remember to lock access to this object with {@link BackendScheduler#queueLock}.
*/
private final Queue<BiConsumer<Backend, Consumer<Backend>>> queue; private final Queue<BiConsumer<Backend, Consumer<Backend>>> queue;

/**
* The lock on access to {@link BackendScheduler#queue}.
*/
private final Lock queueLock = new ReentrantLock(); private final Lock queueLock = new ReentrantLock();
/**
* Represents the event that an item has been added to the work queue.
* Linked to {@link BackendScheduler#queueLock}.
*/
private final Condition enqueued = queueLock.newCondition(); private final Condition enqueued = queueLock.newCondition();

/**
* Represents the event that the queue has become empty, and this schedule is no
* longer needed.
*/
public final Condition queueEmpty = queueLock.newCondition();

/**
* While this is true, continue running the scheduler.
*/
private boolean doRun = true;

/**
* Create a new scheduler from a list of backends.
* These can contain duplicates -- in that case, that many concurrent
* calls can be made to that backend.
*/
public BackendScheduler(List<Backend> backends) { public BackendScheduler(List<Backend> backends) {
super(); super();
setDaemon(true); setDaemon(true);
Expand All @@ -79,21 +134,26 @@ public BackendScheduler(List<Backend> backends) {
this.queue = new LinkedList<>(); this.queue = new LinkedList<>();
} }


@SuppressWarnings("InfiniteLoopStatement") /** {@inheritDoc} */
@Override @Override
public void run() { public void run() {
try { try {
while (true) { while (doRun) {
// Wait for a request // Wait for a request
queueLock.lock(); queueLock.lock();
while (queue.isEmpty()) { while (queue.isEmpty()) {
enqueued.await(); enqueued.await();
if (!doRun) { return; }
} }
BiConsumer<Backend, Consumer<Backend>> request = queue.poll(); BiConsumer<Backend, Consumer<Backend>> request = queue.poll();
queueLock.unlock();
// We have a request // We have a request
// Signal if the queue is empty
if (queue.isEmpty()) {
queueEmpty.signalAll();
}
queueLock.unlock();


// Find a fre annotator // Find a free annotator
freeAnnotatorsLock.lock(); freeAnnotatorsLock.lock();
while (freeAnnotators.isEmpty()) { while (freeAnnotators.isEmpty()) {
newlyFree.await(); newlyFree.await();
Expand Down Expand Up @@ -121,6 +181,13 @@ public void run() {
} }
} }


/**
* Schedule a new job on the backend
* @param annotate A callback, which will be called when a backend is free
* to do some processing. The implementation of this callback
* MUST CALL the second argument when it is done processing,
* to register the backend as free for further work.
*/
public void schedule(BiConsumer<Backend, Consumer<Backend>> annotate) { public void schedule(BiConsumer<Backend, Consumer<Backend>> annotate) {
queueLock.lock(); queueLock.lock();
try { try {
Expand All @@ -140,6 +207,7 @@ public void schedule(BiConsumer<Backend, Consumer<Backend>> annotate) {
/** The properties file, serialized as JSON. */ /** The properties file, serialized as JSON. */
private final String propsAsJSON; private final String propsAsJSON;


/** The scheduler to use when running on multiple backends at a time */
private final BackendScheduler scheduler; private final BackendScheduler scheduler;


/** /**
Expand All @@ -148,6 +216,13 @@ public void schedule(BiConsumer<Backend, Consumer<Backend>> annotate) {
*/ */
private final ProtobufAnnotationSerializer serializer = new ProtobufAnnotationSerializer(true); private final ProtobufAnnotationSerializer serializer = new ProtobufAnnotationSerializer(true);


/**
* The main constructor. Create a client from a properties file and a list of backends.
* Note that this creates at least one Daemon thread.
*
* @param properties The properties file, as would be passed to {@link StanfordCoreNLP}.
* @param backends The backends to run on.
*/
public CoreNLPWebClient(Properties properties, List<Backend> backends) { public CoreNLPWebClient(Properties properties, List<Backend> backends) {
// Save the constructor variables // Save the constructor variables
this.properties = new Properties(properties); this.properties = new Properties(properties);
Expand Down Expand Up @@ -175,10 +250,27 @@ public CoreNLPWebClient(Properties properties, List<Backend> backends) {
this.scheduler.start(); this.scheduler.start();
} }


/**
* Run on a single backend.
*
* @see CoreNLPWebClient(Properties, List)
*/
public CoreNLPWebClient(Properties properties, String host, int port) { public CoreNLPWebClient(Properties properties, String host, int port) {
this(properties, Collections.singletonList(new Backend(host, port))); this(properties, Collections.singletonList(new Backend(host, port)));
} }


/**
* Run on a single backend, but with k threads on each backend.
*
* @see CoreNLPWebClient(Properties, List)
*/
public CoreNLPWebClient(Properties properties, String host, int port, int threads) {
this(properties, new ArrayList<Backend>() {{
for (int i = 0; i < threads; ++i) {
add(new Backend(host, port));
}
}});
}


/** /**
* {@inheritDoc} * {@inheritDoc}
Expand Down Expand Up @@ -209,7 +301,8 @@ public void annotate(Annotation annotation) {
} }


/** /**
* This method fires off a request to the server. Upon returning, * This method fires off a request to the server. Upon returning, it calls the provided
* callback method.
* *
* @param annotations The input annotations to process * @param annotations The input annotations to process
* @param numThreads The number of threads to run on. IGNORED in this class. * @param numThreads The number of threads to run on. IGNORED in this class.
Expand Down Expand Up @@ -311,6 +404,9 @@ public Annotation process(String text) {
private static void shell(CoreNLPWebClient pipeline) throws IOException { private static void shell(CoreNLPWebClient pipeline) throws IOException {
System.err.println("Entering interactive shell. Type q RETURN or EOF to quit."); System.err.println("Entering interactive shell. Type q RETURN or EOF to quit.");
final StanfordCoreNLP.OutputFormat outputFormat = StanfordCoreNLP.OutputFormat.valueOf(pipeline.properties.getProperty("outputFormat", "text").toUpperCase()); final StanfordCoreNLP.OutputFormat outputFormat = StanfordCoreNLP.OutputFormat.valueOf(pipeline.properties.getProperty("outputFormat", "text").toUpperCase());
if (pipeline.properties.getProperty("annotators") == null) {
pipeline.properties.setProperty("annotators", "tokenize,ssplit,pos,depparse,ner");
}
IOUtils.console("NLP> ", line -> { IOUtils.console("NLP> ", line -> {
if (line.length() > 0) { if (line.length() > 0) {
Annotation anno = pipeline.process(line); Annotation anno = pipeline.process(line);
Expand Down Expand Up @@ -347,6 +443,7 @@ private static void shell(CoreNLPWebClient pipeline) throws IOException {
*/ */
public void run() throws IOException { public void run() throws IOException {
StanfordRedwoodConfiguration.minimalSetup(); StanfordRedwoodConfiguration.minimalSetup();
StanfordCoreNLP.OutputFormat outputFormat = StanfordCoreNLP.OutputFormat.valueOf(properties.getProperty("outputFormat", "text").toUpperCase());


// //
// Process one file or a directory of files // Process one file or a directory of files
Expand All @@ -358,7 +455,7 @@ public void run() throws IOException {
} }
Collection<File> files = new FileSequentialCollection(new File(fileName), properties.getProperty("extension"), true); Collection<File> files = new FileSequentialCollection(new File(fileName), properties.getProperty("extension"), true);
StanfordCoreNLP.processFiles(null, files, 1, properties, this::annotate, StanfordCoreNLP.processFiles(null, files, 1, properties, this::annotate,
StanfordCoreNLP.createOutputter(properties, AnnotationOutputter.Options::new)); StanfordCoreNLP.createOutputter(properties, new AnnotationOutputter.Options()), outputFormat);
} }


// //
Expand All @@ -376,7 +473,7 @@ else if (properties.containsKey("filelist")){
} }
} }
StanfordCoreNLP.processFiles(null, files, 1, properties, this::annotate, StanfordCoreNLP.processFiles(null, files, 1, properties, this::annotate,
StanfordCoreNLP.createOutputter(properties, AnnotationOutputter.Options::new)); StanfordCoreNLP.createOutputter(properties, new AnnotationOutputter.Options()), outputFormat);
} }


// //
Expand All @@ -387,6 +484,30 @@ else if (properties.containsKey("filelist")){
} }
} }


/**
* <p>
* Good practice to call after you are done with this object.
* Shuts down the queue of annotations to run and the associated threads.
* </p>
*
* <p>
* If this is not called, any job which has been scheduled but not run will be
* cancelled.
* </p>
*/
public void shutdown() throws InterruptedException {
scheduler.queueLock.lock();
try {
while (!scheduler.queue.isEmpty()) {
scheduler.queueEmpty.await();
}
scheduler.doRun = false;
scheduler.enqueued.signalAll(); // In case the thread's waiting on this condition
} finally {
scheduler.queueLock.unlock();
}
}



/** /**
* This can be used just for testing or for command-line text processing. * This can be used just for testing or for command-line text processing.
Expand Down Expand Up @@ -437,8 +558,10 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
} }


// Run the pipeline // Run the pipeline
new CoreNLPWebClient(props, backends).run(); CoreNLPWebClient client = new CoreNLPWebClient(props, backends);
client.run();
try {
client.shutdown(); // In case anything is pending on the server
} catch (InterruptedException ignored) { }
} }
} }


0 comments on commit 6d8ee82

Please sign in to comment.