Skip to content

Commit

Permalink
Added the LzoStreamingInputFormat and LzoStreamingLineRecordReader cl…
Browse files Browse the repository at this point in the history
…asses.

These classes are more appropriate than DeprecatedLzoTextInputFormat /
DeprecatedLzoLineRecordReader for use with the hadoop-streaming jar, since
they have the same behavior as the default streaming input format:

- input is broken into lines using any of '\n', '\r', or '\r\n'.
- line contents up to the first '\t' character are treated as the key
- the rest of the line is treated as the value

In contrast, the DeprecatedLzoTextInputFormat treats the file offset as the
key and the entire line as the value. This resulted in weird behavior when
using the DeprecatedLzoTextInputFormat with a streaming MR job. For example,
when using -mapper 'cat' and no reducer (which should produce an output
file that's identical to the input file), this input

key1	    value1
key2	    value2
key3	    value3

Produced this output:

0	 key1 value1
95	 key2 value2
95	 key3 value3

which is clearly wrong. Using LzoStreamingInputFormat produces the expected
output (same as input).
  • Loading branch information
Ilya Maykov committed Aug 18, 2011
1 parent c4d41c3 commit 806e494
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 0 deletions.
Expand Up @@ -152,6 +152,7 @@ public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
}
}

LOG.info("DeprecatedLzoTextInputFormat: returning " + result.size() + " input splits!");
return result.toArray(new FileSplit[result.size()]);
}

Expand Down
105 changes: 105 additions & 0 deletions src/java/com/hadoop/mapred/LzoStreamingInputFormat.java
@@ -0,0 +1,105 @@
/*
* This file is part of Hadoop-Gpl-Compression.
*
* Hadoop-Gpl-Compression is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Hadoop-Gpl-Compression is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty
* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hadoop-Gpl-Compression. If not, see
* <http://www.gnu.org/licenses/>.
*/

package com.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import com.hadoop.compression.lzo.LzoInputFormatCommon;

/**
* This class conforms to the old (org.apache.hadoop.mapred.*) hadoop API style
* which is deprecated but still required in places. Streaming, for example,
* does a check that the given input format is a descendant of
* org.apache.hadoop.mapred.InputFormat, which any InputFormat-derived class
* from the new API fails. In order for streaming to work, you must use
* com.hadoop.mapred.LzoStreamingInputFormat.
*
* See {@link LzoInputFormatCommon} for a description of the boolean property
* <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
* behavior of this input format.
*/

@SuppressWarnings("deprecation")
public class LzoStreamingInputFormat extends FileInputFormat<Text, Text>
implements JobConfigurable {

// Wrapper around DeprecatedLzoTextInputFormat that exposes a couple
// protected methods so we can delegate to them.
private class WrappedDeprecatedLzoTextInputFormat extends DeprecatedLzoTextInputFormat {
public boolean isSplitableWrapper(FileSystem fs, Path filename) {
return isSplitable(fs, filename);
}

public FileStatus[] listStatusWrapper(JobConf conf) throws IOException {
return listStatus(conf);
}
}

// This class delegates most calls to either DeprecatedLzoTextInputFormat
// (listStatus, getSplits, isSplitable) or KeyValueTextInputFormat
// (getRecordReader for non-LZO files).
private final WrappedDeprecatedLzoTextInputFormat lzoTextInputFormat =
new WrappedDeprecatedLzoTextInputFormat();
private final KeyValueTextInputFormat kvTextInputFormat =
new KeyValueTextInputFormat();

@Override
public void configure(JobConf conf) {
lzoTextInputFormat.configure(conf);
kvTextInputFormat.configure(conf);
}

@Override
protected FileStatus[] listStatus(JobConf conf) throws IOException {
// Delegate to DeprecatedLzoTextInputFormat
return lzoTextInputFormat.listStatusWrapper(conf);
}

@Override
public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
// Delegate to DeprecatedLzoTextInputFormat
return lzoTextInputFormat.getSplits(conf, numSplits);
}

public RecordReader<Text, Text> getRecordReader(InputSplit split,
JobConf conf, Reporter reporter) throws IOException {

FileSplit fileSplit = (FileSplit) split;
if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
reporter.setStatus(split.toString());
return new LzoStreamingLineRecordReader(conf, (FileSplit)split);
} else {
// delegate non-LZO files to KeyValueTextInputFormat
return kvTextInputFormat.getRecordReader(split, conf, reporter);
}
}
}
140 changes: 140 additions & 0 deletions src/java/com/hadoop/mapred/LzoStreamingLineRecordReader.java
@@ -0,0 +1,140 @@
/*
* This file is part of Hadoop-Gpl-Compression.
*
* Hadoop-Gpl-Compression is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Hadoop-Gpl-Compression is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty
* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hadoop-Gpl-Compression. If not, see
* <http://www.gnu.org/licenses/>.
*/

package com.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

/**
* This class treats a line in the input as a key/value pair separated by a
* separator character. The separator can be specified in config file
* under the attribute name key.value.separator.in.input.line. The default
* separator is the tab character ('\t').
*
* Note: this class is basically a copy of
* {@link org.apache.hadoop.mapred.KeyValueLineRecordReader}, except that it
* uses {@link DeprecatedLzoLineRecordReader} as the internal line reader.
*/
@SuppressWarnings("deprecation")
public class LzoStreamingLineRecordReader implements RecordReader<Text, Text> {
private final DeprecatedLzoLineRecordReader lzoLineRecordReader;
private byte separator = (byte) '\t';
private LongWritable dummyKey;
private Text innerValue;

public Class getKeyClass() {
return Text.class;
}

@Override
public Text createKey() {
return new Text();
}

@Override
public Text createValue() {
return new Text();
}

public LzoStreamingLineRecordReader(Configuration job, FileSplit split)
throws IOException {

lzoLineRecordReader = new DeprecatedLzoLineRecordReader(job, split);
dummyKey = lzoLineRecordReader.createKey();
innerValue = lzoLineRecordReader.createValue();
String sepStr = job.get("key.value.separator.in.input.line", "\t");
this.separator = (byte) sepStr.charAt(0);
}

/**
* Note: copied from org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader,
* which is a library class that's missing from the hadoop jar that's in the
* lib/ directory. Maybe it should be added.
*/
public static int findSeparator(byte[] utf, int start, int length,
byte sep) {
for (int i = start; i < (start + length); i++) {
if (utf[i] == sep) {
return i;
}
}
return -1;
}

/** Read key/value pair in a line. */
@Override
public synchronized boolean next(Text key, Text value)
throws IOException {
byte[] line = null;
int lineLen = -1;
if (lzoLineRecordReader.next(dummyKey, innerValue)) {
line = innerValue.getBytes();
lineLen = innerValue.getLength();
} else {
return false;
}
if (line == null)
return false;
int pos = findSeparator(line, 0, lineLen, this.separator);
LzoStreamingLineRecordReader.setKeyValue(key, value, line, lineLen, pos);
return true;
}

@Override
public float getProgress() throws IOException {
return lzoLineRecordReader.getProgress();
}

@Override
public synchronized long getPos() throws IOException {
return lzoLineRecordReader.getPos();
}

@Override
public synchronized void close() throws IOException {
lzoLineRecordReader.close();
}

/**
* Note: copied from org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader,
* which is a library class that's missing from the hadoop jar that's in the
* lib/ directory. Maybe it should be added.
*/
private static void setKeyValue(Text key, Text value, byte[] line,
int lineLen, int pos) {
if (pos == -1) {
key.set(line, 0, lineLen);
value.set("");
} else {
int keyLen = pos;
byte[] keyBytes = new byte[keyLen];
System.arraycopy(line, 0, keyBytes, 0, keyLen);
int valLen = lineLen - keyLen - 1;
byte[] valBytes = new byte[valLen];
System.arraycopy(line, pos + 1, valBytes, 0, valLen);
key.set(keyBytes);
value.set(valBytes);
}
}
}

0 comments on commit 806e494

Please sign in to comment.