Skip to content

Commit

Permalink
fix handling of empty lines, fix secondary index stream reset, extend…
Browse files Browse the repository at this point in the history
… test coverage
  • Loading branch information
rweng committed Dec 2, 2011
1 parent 41912e4 commit 8bd9f35
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 78 deletions.
65 changes: 56 additions & 9 deletions src/main/java/de/rwhq/hdfs/index/AbstractMultiFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@
* - the hdfs file name and path - the identifier to be indexed (column, xml-path, ...) - the starting position in the
* hdfs file for the index - the end position in the hdfs file for the index
* <p/>
* The file name and path are stored in the structure in the index directory. The identifier and starting position could
* The file name and path are stored in the structure in the index directory. The identifier and starting position
* could
* be stored directly in the file name. However, as it is unsure where the indexing will end at the time the index file
* is created, it is not possible to store the end position in the file name (assuming we dont want to rename). Thus, a
* properties file is required.
*/
public abstract class AbstractMultiFileIndex<K, V> implements Index<K,V> {
public abstract class AbstractMultiFileIndex<K, V> implements Index<K, V> {
private static Log LOG = LogFactory.getLog(AbstractMultiFileIndex.class);

protected String hdfsFile;
Expand All @@ -69,8 +70,6 @@ public abstract class AbstractMultiFileIndex<K, V> implements Index<K,V> {
protected FixLengthSerializer<K, byte[]> keySerializer;
protected KeyExtractor<K> keyExtractor;

/* if an exception occured during key extraction */
private boolean extractorException = false;
protected TreeSet<Range<K>> defaultSearchRanges;
private FixLengthSerializer<V, byte[]> valueSerializer;
private MFIProperties properties;
Expand All @@ -86,6 +85,10 @@ public boolean addLine(String line, long startPos, long endPos) {

ensureOpen();

if (line.equals("")) {
handleEmptyLine(line, startPos, endPos);
return false;
}

if (!ourLock && isLocked()) {
return lineMatchesSearchRange(line);
Expand All @@ -108,7 +111,6 @@ public boolean addLine(String line, long startPos, long endPos) {
saveWriteTree();
}

// LOG.info("adding tree to cache");
if (writingTreePropertyEntry.startPos == null)
writingTreePropertyEntry.startPos = startPos;

Expand All @@ -124,6 +126,52 @@ public boolean addLine(String line, long startPos, long endPos) {
}
}

/**
* an empty line could have multiple resons: it could be just an empty line, or it could be that the hdfs
* file got appended and now has a new line (where it wasn't terminated with one before)
* <p/>
* Independent, we ommit the line. There are three cases we got to consider:
* <ol>
* <li>we are currently creating an index ourselves and got the lock. Then we just extend this indexes coverage</li>
* <li>we are not an index but we can add it to a previous or next indexes coverage</li>
* <li>neither of both, in which case we just ommit the line. It will extend an coverage later anyway.</li>
* </ol>
*/
private void handleEmptyLine(String line, long startPos, long endPos) {

// first check case 1.
if (ourLock) {
if (writingTreePropertyEntry.startPos == null)
writingTreePropertyEntry.startPos = startPos;

writingTreePropertyEntry.endPos = endPos;
}

// case 2, previous index
MFIProperties.MFIProperty p = properties.propertyForPos(startPos - 1);
if (p != null) {
p.endPos = endPos;
try {
properties.write();
return;
} catch (IOException e) {
LOG.error("could not extend index: ", e);
}
}

// case 2, next index
p = properties.propertyForPos(endPos + 1);
if (p != null) {
p.startPos = startPos;
try {
properties.write();
return;
} catch (IOException e) {
LOG.error("could not extend index: ", e);
}
}
}

/** {@inheritDoc} */
@Override
public boolean isOpen() {
Expand Down Expand Up @@ -152,7 +200,7 @@ public void open() throws IOException {
if (properties.exists())
properties.read();

if(LOG.isDebugEnabled())
if (LOG.isDebugEnabled())
LOG.debug("Index opened. Properties: " + properties);

isOpen = true;
Expand Down Expand Up @@ -194,11 +242,10 @@ public SortedSet<Range<Long>> toRanges() {
}



@Override
public Iterator<String> getIterator() {
ensureOpen();

Iterator<Iterator<String>> iterator =
Iterators.transform(toRanges().iterator(), new Function<Range<Long>, Iterator<String>>() {

Expand Down Expand Up @@ -398,7 +445,7 @@ private List<BTree<K, V>> getTreeList() {
} catch (IOException e) {
LOG.error("Could not load properties, operating on old instance.", e);
}

// filter trees not in split range
Collection<MFIProperties.MFIProperty> filted =
Collections2.filter(properties.asList(), new Predicate<MFIProperties.MFIProperty>() {
Expand Down
50 changes: 40 additions & 10 deletions src/main/java/de/rwhq/hdfs/index/IndexedRecordReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import de.rwhq.btree.Range;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.hsqldb.index.RowIterator;
import org.apache.hadoop.util.LineReader;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Iterator;

Expand All @@ -18,6 +21,7 @@
/** Special kind of LineRecordReader. It tries to create an index over the hdfs-file. Therefore, */
public class IndexedRecordReader extends LineRecordReader {
private static final Log LOG = LogFactory.getLog(IndexedRecordReader.class);
private Configuration conf;

private static FileSplit inputToFileSplit(InputSplit inputSplit) {
FileSplit split;
Expand All @@ -32,20 +36,23 @@ private static FileSplit inputToFileSplit(InputSplit inputSplit) {

private Iterator<Range<Long>> rangesIterator;
private Range<Long> currentRange;
private Iterator<String> currentIterator;
private Iterator<String> currentRangeIterator;
private Index index;
private FileSplit split;


/** {@inheritDoc} */
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
super.initialize(genericSplit, context);
this.split = inputToFileSplit(genericSplit);
this.conf = context.getConfiguration();

int mb = 1024 * 1024;
LOG.info("Max memory: " + (Runtime.getRuntime().maxMemory() / mb));
LOG.info("Total Memory:" + (Runtime.getRuntime().totalMemory() / mb));

try {
LOG.info("genericSplit.getLocations(): " + Arrays.toString(genericSplit.getLocations()));
LOG.info("generic Split length: " + genericSplit.getLength());
Expand All @@ -63,7 +70,7 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context)
try {
IndexBuilder builder = (IndexBuilder) builderClass.getConstructor().newInstance();
index = builder
.hdfsFilePath(inputToFileSplit(genericSplit).getPath().toString())
.hdfsFilePath(split.getPath().toString())
.jobConfiguration(context.getConfiguration())
.inputStream(fileIn)
.fileSplit(inputToFileSplit(genericSplit))
Expand All @@ -78,11 +85,14 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context)
index.open();

// initialize ranges and iterator
if(LOG.isDebugEnabled())
LOG.debug("index ranges: " + index.toRanges());

rangesIterator = index.toRanges().iterator();
if(rangesIterator.hasNext())
currentRange = rangesIterator.next();
if(currentRange != null)
currentIterator = index.getIterator(currentRange);
currentRangeIterator = index.getIterator(currentRange);
}
// create a text object for efficiency
value = new Text();
Expand Down Expand Up @@ -126,21 +136,41 @@ private String nextFromIndex() throws IOException {
if(currentRange == null)
return null;

if(LOG.isDebugEnabled())
LOG.debug("nextFromIndex(): currentRange: " + currentRange + " - pos: " + pos);

// if the currentRange did not yet start
if(pos < currentRange.getFrom())
return null;

// if the current iterator does not have any more values, set to next range
if(!currentIterator.hasNext()){
// reset pos
if(!currentRangeIterator.hasNext()){
// set pos so that it can be compared when we call nextFromIndex() later
// only if this returns null, we are going to adjust the LineReader
pos = currentRange.getTo() + 1;

currentRange = rangesIterator.hasNext() ? rangesIterator.next() : null;
currentIterator = currentRange == null ? null : index.getIterator(currentRange);
return nextFromIndex();
currentRangeIterator = currentRange == null ? null : index.getIterator(currentRange);

String next = nextFromIndex();

// if the next index does not directly continue, reset pos etc
if(next == null){
// reset pos
fileIn.seek(pos);

CompressionCodec codec = compressionCodecs.getCodec(split.getPath());
if(codec == null)
in = new LineReader(fileIn, conf);
else
in = new LineReader(codec.createInputStream(fileIn), conf);
}


return next;
}

// if the currentIterator has more values
return currentIterator.next();
return currentRangeIterator.next();
}
}
4 changes: 2 additions & 2 deletions src/main/java/de/rwhq/hdfs/index/LineRecordReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(LineRecordReader.class);

@SuppressWarnings({"FieldCanBeLocal"})
private CompressionCodecFactory compressionCodecs = null;
protected CompressionCodecFactory compressionCodecs = null;
private long start;
long pos; // made package visible for getting and setting in IndexedRecordReader and ..!!!
private long end;
private LineReader in;
protected LineReader in; // made protected
private int maxLineLength;
protected LongWritable key = null;
protected Text value = null; // made protected for setting in IndexedRecordReader!!!
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/de/rwhq/hdfs/index/MFIProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public boolean exists() {
return new File(path).exists();
}

/**
* @param pos
* @return MFIProperty or null, if pos is not contained
*/
public MFIProperty propertyForPos(long pos) {
for(MFIProperty p : properties) {
if (p.startPos <= pos && p.endPos >= pos) {
Expand Down Expand Up @@ -99,13 +103,7 @@ public MFIProperty getPropertyForRange(Range<Long> range) {
}

public boolean contains(long startPos) {
for(MFIProperty p : properties){
if(p.startPos<=startPos && p.endPos >=startPos){
return true;
}
}

return false;
return propertyForPos(startPos) != null;
}


Expand Down
Loading

0 comments on commit 8bd9f35

Please sign in to comment.