An example application that computes phrase counts for unique documents using Fluo. Each new unique document that is added causes phrase counts to be incremented. Unique documents have reference counts based on the number of locations that point to them. When a unique document is no longer referenced by any location, then the phrase counts will be decremented appropriately.
After phrase counts are incremented, export transactions send phrase counts to an Accumulo table. The purpose of exporting data is to make it available for query. Percolator is not designed to support queries, because its transactions are designed for throughput and not responsiveness.
This example uses the following schema.
Row | Column | Value | Purpose |
---|---|---|---|
uri:<uri> | doc:hash | <hash> | Contains the hash of the document found at the URI |
doc:<hash> | doc:content | <document> | The contents of the document |
doc:<hash> | doc:refCount | <int> | The number of URIs that reference this document |
doc:<hash> | index:check | empty | Setting this column triggers the observer that indexes the document |
doc:<hash> | index:status | INDEXED or empty | Tracks whether this document has been indexed |
phrase:<phrase> | stat:check | empty | Triggers observer that handles high cardinality phrases |
phrase:<phrase> | stat:docCount | <int> | Total number of documents the phrase occurred in |
phrase:<phrase> | stat:sum | <int> | Total number of times the phrase was seen in all documents |
phrase:<phrase> | stat:docCount<int> | <int> | Random document count column used for high cardinality phrases |
phrase:<phrase> | stat:sum<int> | <int> | Random count column used for high cardinality phrases |
phrase:<phrase> | export:check | empty | Triggers export observer |
phrase:<phrase> | export:docCount | <int> | Phrase docCount queued for export |
phrase:<phrase> | export:seq | <int> | A sequence number used to order exports, as they may arrive out of order. |
phrase:<phrase> | export:sum | <int> | Phrase sum queued for export |
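The export:seq column exists because exported counts may arrive at the destination out of order; a receiver keeps only the value carrying the highest sequence number. A minimal sketch of that last-writer-wins rule in plain Java (this models the idea only and is not the example's actual export code):

```java
// Sketch of last-writer-wins merging by sequence number, as used when
// exported phrase counts may arrive out of order.
public class SeqMerge {
    long seq = -1;  // highest sequence number seen so far
    long value;     // value associated with that sequence number

    // Apply an incoming export; updates with stale sequence numbers
    // are ignored.
    public void apply(long incomingSeq, long incomingValue) {
        if (incomingSeq > seq) {
            seq = incomingSeq;
            value = incomingValue;
        }
    }
}
```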
Documents are loaded into the Fluo table by DocumentLoader, which is executed by Load. DocumentLoader handles reference counting of unique documents and may set a notification that causes PhraseCounter to execute. PhraseCounter increments or decrements the global phrase counts for all phrases found in a unique document. PhraseCounter is run by the Fluo worker process and is configured by Mini when using Java to run this example. PhraseCounter may set a notification that causes PhraseExporter to run. PhraseExporter exports phrases, along with a sequence number, to a file. The sequence number indicates which version of a phrase in the file is the most recent. PhraseExporter can also be configured to export to an Accumulo table.
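The per-document counting that PhraseCounter performs can be sketched outside of Fluo as follows. The three-word phrase length and whitespace tokenization here are illustrative assumptions, not the example's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of counting word phrases in a single document.
// The phrase length (3 words) and the tokenization are assumptions
// for illustration only.
public class PhraseSketch {

    public static Map<String, Integer> phraseCounts(String content, int phraseLen) {
        String[] words = content.toLowerCase().split("\\s+");
        Map<String, Integer> counts = new HashMap<>();
        // Slide a window of phraseLen words across the document.
        for (int i = 0; i + phraseLen <= words.length; i++) {
            StringBuilder phrase = new StringBuilder(words[i]);
            for (int j = 1; j < phraseLen; j++) {
                phrase.append(' ').append(words[i + j]);
            }
            counts.merge(phrase.toString(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "a b c" occurs twice; "b c a" and "c a b" occur once each.
        Map<String, Integer> counts = phraseCounts("a b c a b c", 3);
        System.out.println(counts.get("a b c")); // 2
    }
}
```

A map like this would feed the stat:sum updates (total occurrences), while the set of distinct keys would feed stat:docCount (one per document containing the phrase).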
For high cardinality phrases, PhraseCounter will update a random column and set a notification that causes HCCounter to run. HCCounter reads all of the random columns and updates the main count. This splits the updating of high cardinality phrases into two transactions, as suggested in the Percolator paper.
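The two-transaction scheme above can be modeled in plain Java. In this sketch the bucket array is a hypothetical stand-in for the stat:sum<int> columns and the fold step stands in for HCCounter; none of this is Fluo code:

```java
import java.util.Random;

// Models the two-step high-cardinality update outside of Fluo.
// The bucket count and names are hypothetical.
public class StripedCounter {
    private final long[] buckets; // stands in for the stat:sum<int> columns
    private long mainCount;       // stands in for stat:sum
    private final Random random = new Random();

    public StripedCounter(int numBuckets) {
        buckets = new long[numBuckets];
    }

    // Step 1: a transaction updates a randomly chosen bucket, avoiding
    // write contention on the single main count.
    public void increment(long delta) {
        buckets[random.nextInt(buckets.length)] += delta;
    }

    // Step 2: an HCCounter-style observer folds all buckets into the
    // main count in a second transaction.
    public void fold() {
        for (int i = 0; i < buckets.length; i++) {
            mainCount += buckets[i];
            buckets[i] = 0;
        }
    }

    public long count() {
        return mainCount;
    }
}
```

Because many concurrent transactions rarely pick the same random bucket, the expensive collisions that would occur on a single popular phrase count are mostly avoided.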
After cloning this repository, build it with the following command.

```
mvn package
```
If you do not have Accumulo, Hadoop, Zookeeper, and Fluo set up, then you can start a MiniFluo instance with the following command. This command will create a `fluo.properties` file that can be used by the other commands in this example.
```
mvn exec:java -Dexec.mainClass=phrasecount.cmd.Mini -Dexec.args="/tmp/mac fluo.properties" -Dexec.classpathScope=test
```
After the mini command prints out `Wrote : fluo.properties`, it is ready to use.
This command will automatically configure PhraseExporter to export phrases to an Accumulo table named `dataExport`.
`-Dexec.classpathScope=test` is set because it adds the test `log4j.properties` file to the classpath.
The following command will recursively scan the directory `$TXT_DIR` for `.txt` files to add.
```
mvn exec:java -Dexec.mainClass=phrasecount.cmd.Load -Dexec.args="fluo.properties $TXT_DIR" -Dexec.classpathScope=test
```
After documents are added, the following command will print out phrase counts. Try modifying a document you added and running the load command again; you should eventually see the phrase counts change.
```
mvn exec:java -Dexec.mainClass=phrasecount.cmd.Print -Dexec.args="fluo.properties" -Dexec.classpathScope=test
```
The command will print out the number of unique documents and the number of processed documents. If the number of processed documents is less than the number of unique documents, then there is still work to do. After the load command runs, the documents will have been added or updated; however, the phrase counts will not update until the observers run in the background.
After all export transactions have run, the phrase counts in the Accumulo export table should be the same as those stored in the Fluo table. The following utility will iterate over the two and look for differences.
```
mvn exec:java -Dexec.mainClass=phrasecount.cmd.Compare -Dexec.args="fluo.properties data dataExport" -Dexec.classpathScope=test
```
If this command prints nothing, then all is good. If differences are found, try enabling transaction trace logging and rerunning the scenario. Adding the following to `log4j.properties` will enable this tracing. This configuration is commented out in the test `log4j.properties` file.
```
log4j.logger.io.fluo.tx=TRACE
```
The following instructions cover running this example on an installed Fluo instance. Copy this jar to the Fluo observer directory.
```
cp target/phrasecount-0.0.1-SNAPSHOT.jar $FLUO_HOME/apps/<appname>/observers
```
Modify `$FLUO_HOME/apps/<appname>/fluo.properties` and replace the observer lines with the following:
```
io.fluo.observer.0=phrasecount.PhraseCounter
io.fluo.observer.1=phrasecount.PhraseExporter,sink=accumulo,instance=${io.fluo.client.accumulo.instance},zookeepers=${io.fluo.client.accumulo.zookeepers},user=${io.fluo.client.accumulo.user},password=${io.fluo.client.accumulo.password},table=pcExport
io.fluo.observer.weak.0=phrasecount.HCCounter
```
The PhraseExporter line contains configuration options that determine the Accumulo table to which phrases are exported; adjust them for your environment.
Now initialize and start Fluo as outlined in its docs. Once started, the load and print commands above can be run by passing in `$FLUO_HOME/apps/<appname>/conf/fluo.properties`.
There are two example bash scripts that run this test using the Fluo tar distribution; they serve as executable documentation for deployment. The previous Maven commands using the exec plugin are convenient in a development environment, while the following scripts show how things would work in a production environment.
- run-mini.sh : Runs this example using MiniFluo started from the tar distribution. Running this way does not require setting up Hadoop, Zookeeper, and Accumulo separately. Just download or build the Fluo tar distribution, untar it, and point the script to that directory. This is the easiest way to simulate a production environment.
- [run-cluster.sh](bin/run-cluster.sh) : Runs this example with YARN using the Fluo tar distribution. Running this way requires setting up Hadoop, Zookeeper, and Accumulo instances separately. The fluo-dev and fluo-deploy projects were created to ease setting up these external dependencies.
Need some data? Use elinks to generate text files from web pages.
```
mkdir data
elinks -dump 1 -no-numbering -no-references http://accumulo.apache.org > data/accumulo.txt
elinks -dump 1 -no-numbering -no-references http://hadoop.apache.org > data/hadoop.txt
elinks -dump 1 -no-numbering -no-references http://zookeeper.apache.org > data/zookeeper.txt
```