Add AbstractOrcDataSource
dain committed Feb 15, 2015
1 parent e39bbd9 commit 80c04cd
Showing 4 changed files with 138 additions and 171 deletions.
HdfsOrcDataSource.java
@@ -13,112 +13,27 @@
  */
 package com.facebook.presto.hive.orc;
 
-import com.facebook.presto.orc.DiskRange;
-import com.facebook.presto.orc.OrcDataSource;
-import com.google.common.collect.ImmutableMap;
-import io.airlift.slice.Slice;
+import com.facebook.presto.orc.AbstractOrcDataSource;
 import io.airlift.units.DataSize;
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
-import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 public class HdfsOrcDataSource
-        implements OrcDataSource
+        extends AbstractOrcDataSource
 {
     private final FSDataInputStream inputStream;
-    private final String path;
-    private final long size;
-    private final DataSize maxMergeDistance;
-    private final DataSize maxReadSize;
-    private long readTimeNanos;
 
-    public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, DataSize maxMergeDistance, DataSize maxReadSize)
-    {
-        this.path = checkNotNull(path, "path is null");
-        this.inputStream = checkNotNull(inputStream, "inputStream is null");
-        this.size = size;
-        checkArgument(size >= 0, "size is negative");
-
-        this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null");
-        this.maxReadSize = checkNotNull(maxReadSize, "maxReadSize is null");
-    }
-
-    @Override
-    public void close()
-            throws IOException
-    {
-        inputStream.close();
-    }
-
-    @Override
-    public long getReadTimeNanos()
+    public HdfsOrcDataSource(String name, long size, DataSize maxMergeDistance, DataSize maxReadSize, FSDataInputStream inputStream)
     {
-        return readTimeNanos;
-    }
-
-    @Override
-    public long getSize()
-    {
-        return size;
+        super(name, size, maxMergeDistance, maxReadSize);
+        this.inputStream = inputStream;
     }
 
     @Override
-    public void readFully(long position, byte[] buffer)
-            throws IOException
-    {
-        readFully(position, buffer, 0, buffer.length);
-    }
-
-    @Override
-    public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
+    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
             throws IOException
     {
-        long start = System.nanoTime();
-
         inputStream.readFully(position, buffer, bufferOffset, bufferLength);
-
-        readTimeNanos += System.nanoTime() - start;
-    }
-
-    @Override
-    public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
-            throws IOException
-    {
-        checkNotNull(diskRanges, "diskRanges is null");
-
-        if (diskRanges.isEmpty()) {
-            return ImmutableMap.of();
-        }
-
-        Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize);
-
-        // read ranges
-        Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
-        for (DiskRange mergedRange : mergedRanges) {
-            // read full range in one request
-            byte[] buffer = new byte[mergedRange.getLength()];
-            readFully(mergedRange.getOffset(), buffer);
-            buffers.put(mergedRange, buffer);
-        }
-
-        ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
-        for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
-            slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
-        }
-        return slices.build();
-    }
-
-    @Override
-    public String toString()
-    {
-        return path;
     }
 }
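
For orientation, a caller now opens the Hadoop stream itself and passes the name, size, and tuning limits to the new constructor. The sketch below is hypothetical and not part of this commit; it assumes only the standard Hadoop FileSystem API plus the constructor shown above, and the merge/read limits are illustrative defaults.

package com.facebook.presto.hive.orc;

import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public final class HdfsOrcDataSourceExample
{
    private HdfsOrcDataSourceExample() {}

    public static HdfsOrcDataSource open(FileSystem fileSystem, Path path)
            throws IOException
    {
        // the file length from HDFS metadata becomes getSize() on the data source
        long size = fileSystem.getFileStatus(path).getLen();
        FSDataInputStream inputStream = fileSystem.open(path);
        // merge distance and max read size are illustrative values, not defaults from this commit
        return new HdfsOrcDataSource(path.toString(), size, new DataSize(1, Unit.MEGABYTE), new DataSize(8, Unit.MEGABYTE), inputStream);
    }
}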
AbstractOrcDataSource.java (new file)
@@ -0,0 +1,116 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.orc;

import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public abstract class AbstractOrcDataSource
        implements OrcDataSource
{
    private final String name;
    private final long size;
    private final DataSize maxMergeDistance;
    private final DataSize maxReadSize;
    private long readTimeNanos;

    public AbstractOrcDataSource(String name, long size, DataSize maxMergeDistance, DataSize maxReadSize)
    {
        this.name = checkNotNull(name, "name is null");

        this.size = size;
        checkArgument(size >= 0, "size is negative");

        this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null");
        this.maxReadSize = checkNotNull(maxReadSize, "maxReadSize is null");
    }

    protected abstract void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
            throws IOException;

    @Override
    public final long getReadTimeNanos()
    {
        return readTimeNanos;
    }

    @Override
    public final long getSize()
    {
        return size;
    }

    @Override
    public final void readFully(long position, byte[] buffer)
            throws IOException
    {
        readFully(position, buffer, 0, buffer.length);
    }

    @Override
    public final void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
            throws IOException
    {
        long start = System.nanoTime();

        readInternal(position, buffer, bufferOffset, bufferLength);

        readTimeNanos += System.nanoTime() - start;
    }

    @Override
    public final <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
            throws IOException
    {
        checkNotNull(diskRanges, "diskRanges is null");

        if (diskRanges.isEmpty()) {
            return ImmutableMap.of();
        }

        Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize);

        // read ranges
        Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
        for (DiskRange mergedRange : mergedRanges) {
            // read full range in one request
            byte[] buffer = new byte[mergedRange.getLength()];
            readFully(mergedRange.getOffset(), buffer);
            buffers.put(mergedRange, buffer);
        }

        ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
        for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
            slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
        }
        return slices.build();
    }

    @Override
    public final String toString()
    {
        return name;
    }
}
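
AbstractOrcDataSource is a template method: the final readFully overloads handle timing, range merging, and slicing, and a concrete source only supplies readInternal. A minimal sketch of a subclass (a hypothetical in-memory source, not part of this commit; the DataSize limits are illustrative):

package com.facebook.presto.orc;

import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;

public class InMemoryOrcDataSource
        extends AbstractOrcDataSource
{
    private final byte[] data;

    public InMemoryOrcDataSource(String name, byte[] data)
    {
        // merge distance and max read size barely matter for in-memory data; values are illustrative
        super(name, data.length, new DataSize(1, Unit.MEGABYTE), new DataSize(8, Unit.MEGABYTE));
        this.data = data;
    }

    @Override
    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
    {
        // copy the requested range out of the backing array; the base class records read time
        System.arraycopy(data, (int) position, buffer, bufferOffset, bufferLength);
    }
}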
FileOrcDataSource.java
@@ -13,114 +13,44 @@
  */
 package com.facebook.presto.orc;
 
-import com.google.common.collect.ImmutableMap;
-import io.airlift.slice.Slice;
 import io.airlift.units.DataSize;
 import io.airlift.units.DataSize.Unit;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
-import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 public class FileOrcDataSource
-        implements OrcDataSource
+        extends AbstractOrcDataSource
 {
-    private final File path;
-    private final long size;
     private final RandomAccessFile input;
-    private final DataSize maxMergeDistance;
-    private long readTimeNanos;
 
     public FileOrcDataSource(File path, DataSize maxMergeDistance)
             throws IOException
     {
-        this.path = checkNotNull(path, "path is null");
-        this.size = path.length();
-        this.input = new RandomAccessFile(path, "r");
-
-        this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null");
+        this(path, maxMergeDistance, new DataSize(Integer.MAX_VALUE, Unit.BYTE));
    }
 
-    @Override
-    public void close()
-            throws IOException
+    public FileOrcDataSource(File path, DataSize maxMergeDistance, DataSize maxReadSize)
+            throws FileNotFoundException
     {
-        input.close();
-    }
-
-    @Override
-    public long getReadTimeNanos()
-    {
-        return readTimeNanos;
-    }
-
-    @Override
-    public long getSize()
-    {
-        return size;
+        super(path.getPath(), path.length(), maxMergeDistance, maxReadSize);
+        this.input = new RandomAccessFile(path, "r");
     }
 
     @Override
-    public void readFully(long position, byte[] buffer)
+    public void close()
             throws IOException
     {
-        readFully(position, buffer, 0, buffer.length);
+        input.close();
     }
 
     @Override
-    public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
+    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
             throws IOException
     {
-        long start = System.nanoTime();
-
         input.seek(position);
         input.readFully(buffer, bufferOffset, bufferLength);
-
-        readTimeNanos += System.nanoTime() - start;
-    }
-
-    @Override
-    public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
-            throws IOException
-    {
-        checkNotNull(diskRanges, "diskRanges is null");
-
-        if (diskRanges.isEmpty()) {
-            return ImmutableMap.of();
-        }
-
-        // TODO: benchmark alternatively strategies:
-        // 1) sort ranges and perform one read per range
-        // 2) single read with transferTo() using custom WritableByteChannel
-
-        Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, new DataSize(8, Unit.MEGABYTE));
-
-        // read ranges
-        Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
-        for (DiskRange mergedRange : mergedRanges) {
-            // read full range in one request
-            byte[] buffer = new byte[mergedRange.getLength()];
-            readFully(mergedRange.getOffset(), buffer);
-            buffers.put(mergedRange, buffer);
-        }
-
-        ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
-        for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
-            slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
-        }
-        return slices.build();
-    }
-
-    @Override
-    public String toString()
-    {
-        return path.getPath();
     }
 }
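
From a caller's point of view nothing changes; reading a few byte ranges through the merged-range path could look like the sketch below. The file name, offsets, and lengths are illustrative, and it assumes DiskRange exposes an (offset, length) constructor matching the getOffset()/getLength() accessors used above.

package com.facebook.presto.orc;

import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;

import java.io.File;
import java.io.IOException;
import java.util.Map;

public final class FileOrcDataSourceExample
{
    private FileOrcDataSourceExample() {}

    public static void main(String[] args)
            throws IOException
    {
        FileOrcDataSource dataSource = new FileOrcDataSource(new File("example.orc"), new DataSize(1, Unit.MEGABYTE));
        try {
            // ranges closer together than maxMergeDistance are fetched with a single read
            Map<String, DiskRange> ranges = ImmutableMap.of(
                    "stripe-0", new DiskRange(0, 1024),
                    "stripe-1", new DiskRange(2048, 512));
            Map<String, Slice> slices = dataSource.readFully(ranges);
            System.out.println(slices.get("stripe-0").length());
        }
        finally {
            dataSource.close();
        }
    }
}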
OrcDataSource.java
@@ -34,4 +34,10 @@ void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
 
     <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
             throws IOException;
+
+    @Override
+    default void close()
+            throws IOException
+    {
+    }
 }
