Skip to content

Commit

Permalink
Fixes for Isabel notes
Browse files Browse the repository at this point in the history
  • Loading branch information
sbos committed Oct 6, 2011
1 parent 74f6f33 commit 6abbd11
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
 * Online Viterbi algorithm implementation which can decode the hidden variable sequence from the given
 * sequence of observed variables as soon as some part of the input sequence can be decoded. In some cases
 * this algorithm may perform in constant space and asymptotically the same time as the normal Viterbi
* Based on Rastislav Sramek's master thesis
* Based on "Rastislav Sramek. The Online Viterbi algorithm (Master's Thesis). 2007".
* @see HmmEvaluator
*/
public class HmmOnlineViterbi implements Writable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
import org.apache.commons.cli2.commandline.Parser;
import org.apache.mahout.common.CommandLineUtil;

import java.io.*;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.mahout.classifier.sequencelearning.hmm.mapreduce;

import org.apache.commons.lang.NullArgumentException;
import com.google.common.base.Preconditions;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
Expand Down Expand Up @@ -79,8 +79,8 @@ public int[] getData() {
}

public void setData(int[] data) {
if (data == null)
throw new NullArgumentException("data");
Preconditions.checkNotNull(data);

this.data = data;
this.length = data.length;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@

/**
* Performs all work for decoding hidden states from given sequence of observed variables for HMM using
* {@link HmmOnlineViterbi} functionality
* {@link HmmOnlineViterbi} functionality.
*
 * The logic is similar to {@link ForwardViterbiReducer} (since it's an online algorithm there's only
 * a forward pass). The reducer takes the current chunk of observed variables and the algorithm state from the
 * previous step and produces the new algorithm state after decoding at the output.
 * The decoded sequence of hidden variables (possibly an empty sequence if nothing could be decoded at the moment)
 * is written to the sequence file in the background.
*/
public class OnlineViterbiReducer extends Reducer<Text, ViterbiDataWritable, Text, ViterbiDataWritable> {
private String outputPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,37 +91,51 @@ public int run(String[] args) throws IOException {
FileSystem inputFs = FileSystem.get(URI.create(input), configuration);
FileSystem outputFs = FileSystem.get(URI.create(output), configuration);

OutputStream outputStream = outputFs.create(new Path(output));
PrintWriter writer = new PrintWriter(outputStream);

int chunkNumber = 0;
HiddenSequenceWritable decoded = new HiddenSequenceWritable();

while (true) {
Path chunkPath = new Path(input, String.valueOf(chunkNumber));
if (!inputFs.exists(chunkPath))
break;

log.info("Reading " + input + ", chunk number " + chunkNumber);
FileSystem fs = FileSystem.get(chunkPath.toUri(), configuration);
SequenceFile.Reader reader = new SequenceFile.Reader(fs, chunkPath, configuration);

IntWritable chunk = new IntWritable();
while (reader.next(chunk)) {
reader.getCurrentValue(decoded);

for (int state: decoded.get()) {
writer.print(state);
writer.print(' ');
OutputStream outputStream = null;
try {
outputStream = outputFs.create(new Path(output));
PrintWriter writer = new PrintWriter(outputStream);

int chunkNumber = 0;
HiddenSequenceWritable decoded = new HiddenSequenceWritable();

while (true) {
Path chunkPath = new Path(input, String.valueOf(chunkNumber));
if (!inputFs.exists(chunkPath))
break;

log.info("Reading " + input + ", chunk number " + chunkNumber);
FileSystem fs = FileSystem.get(chunkPath.toUri(), configuration);
SequenceFile.Reader reader = null;

try {
reader = new SequenceFile.Reader(fs, chunkPath, configuration);

IntWritable chunk = new IntWritable();
while (reader.next(chunk)) {
reader.getCurrentValue(decoded);

for (int state: decoded.get()) {
writer.print(state);
writer.print(' ');
}
}
}
finally {
if (reader != null)
reader.close();
}

++chunkNumber;
reader.close();
}

++chunkNumber;
reader.close();
writer.close();
}
finally {
if (outputStream != null)
outputStream.close();
}

writer.close();
outputStream.close();

} else {
int chunkSize = Integer.parseInt((String) commandLine.getValue(chunkSizeOption));
Expand Down Expand Up @@ -157,7 +171,6 @@ static class ChunkSplitter implements PathFilter {
Configuration configuration;
FileSystem outputFileSystem;
Map<String, List<SequenceFile.Writer>> outputs = new HashMap<String, List<SequenceFile.Writer>>();
//List<SequenceFile.Writer> outputs = new ArrayList<SequenceFile.Writer>();

public ChunkSplitter(int chunkSize, Path outputPath, Configuration configuration) throws IOException {
this.chunkSize = chunkSize;
Expand All @@ -169,55 +182,63 @@ public ChunkSplitter(int chunkSize, Path outputPath, Configuration configuration

public void process(Path inputPath, FileSystem fs) throws IOException {
log.info("Splitting " + inputPath.getName() + " to chunks with size " + chunkSize);
FSDataInputStream in = fs.open(inputPath);
FSDataInputStream in = null;
String inputName = inputPath.getName();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
Scanner scanner = new Scanner(reader);

for (int currentChunk = 0; ; ++currentChunk) {
int[] chunkObservations = new int[chunkSize];
int observationsRead;
for (observationsRead = 0;
observationsRead < chunkSize && scanner.hasNext(); ++observationsRead) {
chunkObservations[observationsRead] = scanner.nextInt();
}

if (observationsRead > 0) {
List<SequenceFile.Writer> chunkWriters = outputs.get(inputPath.toString());
if (chunkWriters == null) {
chunkWriters = new LinkedList<SequenceFile.Writer>();
outputs.put(inputPath.toString(), chunkWriters);
try {
in = fs.open(inputPath);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
Scanner scanner = new Scanner(reader);

for (int currentChunk = 0; ; ++currentChunk) {
int[] chunkObservations = new int[chunkSize];
int observationsRead;
for (observationsRead = 0;
observationsRead < chunkSize && scanner.hasNext(); ++observationsRead) {
chunkObservations[observationsRead] = scanner.nextInt();
}
if (chunkWriters.size() <= currentChunk) {
log.debug("Opening new sequence file for chunk #" + currentChunk);
Path chunkPath = new Path(outputPath, String.valueOf(currentChunk));
if (!fs.exists(chunkPath)) fs.mkdirs(chunkPath);
SequenceFile.Writer writer = SequenceFile.createWriter(outputFileSystem, configuration,
new Path(chunkPath, inputPath.getName()),
Text.class, ViterbiDataWritable.class, SequenceFile.CompressionType.RECORD);
chunkWriters.add(writer);

log.info("Splitting " + inputName + ", chunk #" + currentChunk);
if (observationsRead > 0) {
List<SequenceFile.Writer> chunkWriters = outputs.get(inputPath.toString());
if (chunkWriters == null) {
chunkWriters = new LinkedList<SequenceFile.Writer>();
outputs.put(inputPath.toString(), chunkWriters);
}
if (chunkWriters.size() <= currentChunk) {
log.debug("Opening new sequence file for chunk #" + currentChunk);
Path chunkPath = new Path(outputPath, String.valueOf(currentChunk));
if (!fs.exists(chunkPath)) fs.mkdirs(chunkPath);
SequenceFile.Writer writer = SequenceFile.createWriter(outputFileSystem, configuration,
new Path(chunkPath, inputPath.getName()),
Text.class, ViterbiDataWritable.class, SequenceFile.CompressionType.RECORD);
chunkWriters.add(writer);

log.info("Splitting " + inputName + ", chunk #" + currentChunk);

ObservedSequenceWritable chunk = new ObservedSequenceWritable(chunkObservations,
observationsRead, currentChunk, !scanner.hasNextInt());
ObservedSequenceWritable chunk = new ObservedSequenceWritable(chunkObservations,
observationsRead, currentChunk, !scanner.hasNextInt());

log.info(observationsRead + " observations to write to this chunk");
writer.append(new Text(inputPath.getName()),
ViterbiDataWritable.fromObservedSequence(chunk));
log.info(observationsRead + " observations to write to this chunk");
writer.append(new Text(inputPath.getName()),
ViterbiDataWritable.fromObservedSequence(chunk));

writer.close();
writer.close();
}
}

if (observationsRead < chunkSize)
break;
}

if (observationsRead < chunkSize)
break;
}
scanner.close();
reader.close();

scanner.close();
reader.close();
in.close();
log.info(inputName + " was splitted successfully");
log.info(inputName + " was splitted successfully");
}
finally {
if (in != null)
in.close();
}
}

@Override
Expand All @@ -231,12 +252,6 @@ public boolean accept(Path path) {
}
return true;
}

/*public void close() throws IOException {
for (List<SequenceFile.Writer> chunkWrites: outputs.values()) {
for (SequenceFile.Writer)
}
}*/
}

public static void main(String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
* Generic Writable wrapper for multiple inputs that could be used by
* @see org.apache.mahout.classifier.sequencelearning.hmm.mapreduce.BackwardViterbiReducer and
* @see org.apache.mahout.classifier.sequencelearning.hmm.mapreduce.ForwardViterbiReducer
*
 * This is an internal class and it should not be used by Mahout's users.
*/
class ViterbiDataWritable extends GenericWritable {
static Class[] classes = new Class[] {
static Class[] wrappedClasses = new Class[] {
ObservedSequenceWritable.class,
HiddenStateProbabilitiesWritable.class,
VarIntWritable.class, BackpointersWritable.class,
Expand Down Expand Up @@ -65,6 +67,6 @@ public static ViterbiDataWritable fromInitialProbabilities(HiddenStateProbabilit

@Override
protected Class<? extends Writable>[] getTypes() {
return classes;
return wrappedClasses;
}
}

0 comments on commit 6abbd11

Please sign in to comment.