Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
428 lines (396 sloc) 17.7 KB
---
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
title: "Document Processing"
---
<p>
This document describes how to develop and deploy <em>Document Processors</em>.
Document processing is a framework to create <a href="chained-components.html">chains</a>
of configurable <a href="jdisc/container-components.html">components</a>,
that read and modify document operations.
</p><p>
The input source, typically a crawler, a stream of incoming mail,
data generated from user actions, products or basically anything else,
splits the input data into logical units called <a href="documents.html">documents</a>.
A <a href="writing-to-vespa.html">feeder application</a>
sends the documents into a document processing chain.
This chain is an ordered list of document processors.
Document processing examples range from language detection,
HTML removal and natural language processing to mail attachment processing,
character set transcoding and image thumbnailing.
At the end of the processing chain, extracted data will typically be
set in some fields in the document, which will continue into a Vespa Content node.
</p><p>
The motivation for Document processing is that code and configuration is atomically deployed,
as like all Vespa components.
It is also easy to build components that access data in Vespa as part of processing.
</p><p>
To get started with development,
see <a href="getting-started-vespa-applications.html">Vespa Applications</a>.
Read <a href="indexing.html">indexing</a> first to understand deployment and routing.
As document processors are chained components just like Searchers,
read <a href="searcher-development.html">Searcher Development</a>.
For reference, see the
<a href="http://javadoc.io/doc/com.yahoo.vespa/docproc">Javadoc</a>,
and <a href="reference/services-docproc.html">services.xml</a>.
</p>
<h2 id="deploying-a-document-processor">Deploying a Document Processor</h2>
<p>
Refer to<a href="https://github.com/vespa-engine/sample-apps/tree/master/basic-search-java">
Basic Search Java</a> to get started,
<a href="https://github.com/vespa-engine/sample-apps/blob/master/basic-search-java/src/main/java/com/mydomain/example/Rot13DocumentProcessor.java">
Rot13DocumentProcessor.java</a> / <a href="https://github.com/vespa-engine/sample-apps/blob/master/basic-search-java/src/test/java/com/mydomain/example/Rot13DocumentProcessorTest.java">
Rot13DocumentProcessorTest.java</a> is a document processor example.
Add the document processor in <a href="reference/services-docproc.html">services.xml</a>,
using <em>"default"</em> to include it in the default indexing chain -
find more details in <a href="indexing.html#document-processing">indexing</a>:
<pre>
&lt;?xml version="1.0" encoding="utf-8"?&gt;
&lt;services version="1.0"&gt;
&lt;admin version="2.0"&gt;
&lt;adminserver hostalias="node1" /&gt;
&lt;/admin&gt;
&lt;container version="1.0" id="default"&gt;
&lt;nodes&gt;
&lt;node hostalias="node1"/&gt;
&lt;/nodes&gt;
<span style="background-color: yellow;"> &lt;document-processing&gt;
&lt;chain id="default"&gt;
&lt;documentprocessor id="com.yahoo.vespatest.Rot13DocumentProcessor"/&gt;
&lt;/chain&gt;
&lt;/document-processing&gt;</span>
&lt;/container&gt;
&lt;/services&gt;
</pre>
</p>
<h2 id="document-processors">Document Processors</h2>
<p>
A document processor is a component extending <code>com.yahoo.docproc.DocumentProcessor</code>.
All document processors must implement <code>process()</code>:
<pre>
public Progress process(Processing processing);
</pre>
When the container receives a document operation, it will create a
new <code>Processing</code>, and add
the <code>DocumentPut</code>s, <code>DocumentUpdate</code>s
or <code>DocumentRemove</code>s to the <code>List</code> accessible
through <code>Processing.getDocumentOperations()</code>. Furthermore, the
call stack of the document processing chain in question will
be <em>copied</em> to <code>Processing.callStack()</code>, so that
document processors may freely modify the flow of control for this
processing without affecting all other processings going on. After
creation, the <code>Processing</code> is added to an internal queue.
</p><p>
A worker thread will retrieve a <code>Processing</code> from the input
queue, and run its document operations through its call stack. A
minimal, no-op document processor implementation is thus:
</p>
<pre>
import com.yahoo.docproc.*;
public class SimpleDocumentProcessor extends DocumentProcessor {
public Progress process(Processing processing) {
return Progress.DONE;
}
}
</pre>
<p>
The <code>process()</code> method should loop through all
document operations in <code>Processing.getDocumentOperations()</code>, do
whatever it sees fit to them, and return a Progress:
<pre>
public Progress process(Processing processing) {
for (DocumentOperation op : processing.getDocumentOperations()) {
if (op instanceof DocumentPut) {
DocumentPut put = (DocumentPut) op;
// TODO do something to 'put here
} else if (op instanceof DocumentUpdate) {
DocumentUpdate update = (DocumentUpdate) op;
// TODO do something to 'update' here
} else if (op instanceof DocumentRemove) {
DocumentRemove remove = (DocumentRemove) op;
// TODO do something to 'remove' here
}
}
return Progress.DONE;
}
</pre>
Return codes:
<table class="table">
<thead></thead><tbody>
<tr>
<td><code>Progress.DONE</code></td>
<td>Returned if a document processor has successfully processed a <code>Processing</code></td>
</tr><tr>
<td><code>Progress.FAILED</code></td>
<td>
Processing failed and the input message should return a <em>fatal</em>
failure back to the feeding application,
meaning that this application will not try to re-feed this document operation
</td>
</tr><tr>
<td><code>Progress.LATER</code></td>
<td>
<p>
See <a href="#execution-model">execution model</a>.
The document processor wants to release the calling thread and be called again later.
This is useful if e.g. calling an external service with high latency.
The document processor may then save its state in the <code>Processing</code>
and resume when called again later.
There are no guarantees as to <em>when</em> the processor is called again
with this <code>Processing</code>; it is simply appended to the back of the input queue
</p><p>
By the use of <code>Progress.LATER</code>, this is an asynchronous model,
where the processing of a document operation
does not need to consume one thread for its entire lifespan.
Note, however, that the document processors themselves
are shared between all processing operations in a chain,
and must thus be implemented <a href="#state">thread-safe</a>
</p>
</td>
</tr>
</tbody>
</table>
Return an error message/reason by calling <code>withReason()</code>:
<pre>
if (op instanceof DocumentPut) {
return Progress.FAILED.withReason("PUT is not supported");
}
</pre>
A document processor can throw an exception:
<table class="table">
<thead></thead><tbody>
<tr>
<td><code>com.yahoo.docproc.TransientFailureException</code></td>
<td>
Processing failed and the input message should return a <em>transient</em> failure
back to the feeding application,
meaning that this application <em>may</em> try to re-feed this document operation
</td>
</tr><tr>
<td><code>RuntimeException</code></td>
<td>
Throwing any other <code>RuntimeException</code>
means same behavior as for <code>Progress.FAILED</code>
</td>
</tr>
</tbody>
</table>
</p>
<h2 id="Chains">Chains</h2>
<p>
The call stack mentioned above is another name for a <em>document
processor chain</em>. Document processor chains are a special case of
the general <a href="chained-components.html">component chains</a> -
to avoid confusion some concepts are explained here as well.
A document processor chain is nothing more than a list of document
processor instances, having an id, and represented as a stack. The
document processor chains are typically not created for every
processing, but are part of the configuration. Multiple ones may
exist at the same time, the chain to execute will be specified by the
message bus destination of the incoming message. The same document
processor instance may exist in multiple document processor chains,
which is why the <code>CallStack</code> of the <code>Processing</code>
is responsible for knowing the next document processor to invoke in a
particular message.
</p><p>
The execution order of the document processors in a chain are not ordered explicitly,
but by <a href="chained-components.html#ordering-components">ordering
constraints</a> declared in the document processors or their configuration.
</p>
<h2 id="execution-model">Execution model</h2>
<p>
The Document Processing Framework works like this:
<ol>
<li>
A thread from the message bus layer appends an incoming message to an internal priority queue,
shared between all document processing chains configured on a node.
The priority is set based on the message bus priority of the message.
Messages of the same priority are ordered FIFO
</li><li>
One worker thread from the docproc thread pool picks one message
from the head of the queue, deserializes it, copies the call stack
(chain) in question, and runs it through the document processors
</li><li>
Processing finishes if <strong>(a)</strong> the document(s) has
passed successfully through the whole chain,
or <strong>(b)</strong> a document processor in the chain has
returned <code>Progress.FAILED</code> or thrown an exception
</li><li>
The same thread passes the message on to the message bus layer for
further transport on to its destination
</li>
</ol>
There is a single instance of each document processor chain.
In every chain, there is a single instance of each document processor -
unless a chain is configured with multiple, identical document processors
- this is a rare case.
</p><p>
As is evident from the model above, multiple worker threads execute
the document processors in a chain concurrently.
Thus; many threads of execution can be going through <code>process()</code> in a
document processor, at the same time.
</p><p>
This model places a very important constraint on document processor classes:
<em>instance variables are not safe.</em>
They must be eliminated, or made threadsafe somehow.
</p><p>
Also see <a href="jdisc/container-components.html#resource-management">Resource management</a>,
use <code>deconstruct()</code> in order to not leak resources.
</p>
<h3 id="asynchronous-execution">Asynchronous execution</h3>
<p>
The execution model outlined above also shows one important restriction:
If a document processor performs any high-latency operation in its process() method,
a docproc worker thread will be occupied.
With all <em>n</em> worker threads blocking on an external resource,
throughput will be limited.
This can be fixed by saving the state in the Processing object,
and returning <code>Progress.LATER</code>.
A document processor doing a high-latency operation should use a pattern like this:
<ol>
<li>Check a self-defined context variable in Processing for status.
Basically, <em>have we seen this Processing before?</em></li>
<li>If no:
<ol>
<li>We have been given a Processing object fresh off the network,
we have not seen this before. Process it up until the
high-latency operation.</li>
<li>Start the high-latency operation (possibly in a separate
thread).</li>
<li>Save the state of the operation in a self-defined context
variable in the Processing.</li>
<li>Return <code>Progress.LATER</code>. This Processing is the appended to the
back of the input queue, and we will be called again
later.</li>
</ol></li>
<li>If yes:
<ol>
<li>Retrieve the reference that we set in our self-defined context variable in Processing</li>
<li>Is the high-latency operation done? If so, return <code>Progress.DONE</code></li>
<li>Is it not yet done? Return <code>Progress.LATER</code> again</li>
</ol></li>
</ol>
As is evident, this will let the finite set of document processing
threads to perform more work at the same time.
</p>
<h2 id="state">State</h2>
<p>
Any state in the document processor for the particular Processing
should be kept as local variables in the process method,
while state which should be shared by all Processings should
be kept as member variables. As the latter kind will be accessed by
multiple threads at any one time, the state of such member variables
must be <em>threadsafe</em>.
This critical restriction is similar to those of e.g. the Servlet API.
Options for implementing a multithread-safe document processor with instance variables:
<ol>
<li>Use immutable (and preferably final) objects: they never change
after they are constructed; no modifications to their state occurs
after the DocumentProcessor constructor returns</li>
<li>Use a single instance of a thread-safe class</li>
<li>Create a single instance and synchronize access to it across all
threads (but this will severely limit your scalability)</li>
<li>Arrange for each thread to have its own instance, e.g. with
a <code>ThreadLocal</code></li>
</ol>
</p>
<h3 id="processing-context-variables">Processing Context Variables</h3>
<p>
The <code>Processing</code> has a map <code>String -&gt; Object</code>
that can be used to pass information between document processors. It
is also very useful when using <code>Progress.LATER</code>, as
discussed above, to save the state of a processing.
<pre>
/** Returns a context variable, or null if it is not set */
public Object getVariable(String name);
/** Returns an iterator of all context variables that are set */
public Iterator&lt;Map.Entry&lt;String, Object&gt;&gt; getVariableAndNameIterator();
/** Clears all context variables that have been set */
public void clearVariables();
/** Sets a context variable. */
public void setVariable(String name, Object value);
/** Removes a context variable. */
public Object removeVariable(String name);
/** Returns true if this variable is present, even if it is null */
public boolean hasVariable(String name);
</pre>
</p>
<h2 id="reconfiguring-document-processing">(Re)configuring Document Processing</h2>
<p>
Consider the following configuration:
<pre>
&lt;?xml version="1.0" encoding="utf-8" ?&gt;
&lt;services version="1.0"&gt;
&lt;container version="1.0" id="default"&gt;
&lt;document-processing&gt;
&lt;chain id="default"&gt;
&lt;documentprocessor id="SomeDocumentProcessor"&gt;
<span style="background-color: yellow;"> &lt;config name="foo.something"&gt;
&lt;variable&gt;value&lt;/variable&gt;
&lt;/config&gt;</span>
&lt;/documentprocessor&gt
&lt;/chain&gt;
&lt;/document-processing&gt;
&lt;/container&gt;
&lt;/services&gt;
</pre>
Changing chain ids, components in a chain, component configuration and schema mapping
all takes effect after
<a href="cloudconfig/application-packages.html#deploy">vespa-deploy activate</a> -
no restart required.
Changing a <em>cluster name</em> (i.e. the container id)
requires a restart of docproc services after <em>vespa-deploy activate</em>.
</p><p>
Note when adding or modifying a processing chain in a running cluster;
if at the same time deploying a <em>new</em> document processor
(i.e. a document processor that was unknown to Vespa at the time the cluster was started),
the container must be restarted:
<pre>
$ <a href="reference/vespa-cmdline-tools.html#vespa-sentinel-cmd">vespa-sentinel-cmd</a> restart container
</pre>
</p>
<h2 id="class-diagram">Class diagram</h2>
<figure>
<img src="img/docproc/docproccore.gif" alt="Document processing core class diagram" />
</figure>
<p>
The framework core supports asynchronous processing, processing one or
multiple documents or document updates at the same time,
document processors that makes dynamic decisions about the processing flow and
passing of information between processors outside the document or document update:
</p>
<ul>
<li>
One or more named <strong>Docproc Services</strong> may be created.
One of the services is the <em>default</em>
</li><li>
A service accepts subclasses of <strong>DocumentOperation</strong> for processing,
meaning <strong>DocumentPuts</strong>, <strong>DocumentUpdates</strong>
and <strong>DocumentRemoves</strong>.
It has a <strong>Call Stack</strong> which lists the calls to make to
various <strong>Document Processors</strong> to process each
DocumentOperation handed to the service
</li><li>
Call Stacks consist of <strong>Calls</strong>,
which refer to the Document Processor instance to call
</li><li>
DocumentPuts and document updates are processed asynchronously,
the state is kept in a Processing for its duration
(instead of in a thread or process).
A Document Processor may make some asynchronous calls (typically to remote services)
and return to the framework that it should be called again later
for the same Processing to handle the outcome of the calls
</li><li>
A processing contains its own copy of the Call Stack of the Docproc Service
to keep track of what to call next.
Document Processors may modify this Call Stack
to dynamically decide the processing steps required to process a DocumentOperation
</li><li>
A Processing may contain one or more DocumentOperations to be processed as a unit
</li><li>
A Processing has a <em>context</em>, which is a Map of named values
which can be used to pass arguments between processors
</li><li>
Processings are prepared to be stored to disk,
to allow a high number of ongoing long-term processings per node
</li>
</ul>
You can’t perform that action at this time.