Skip to content

Commit

Permalink
Attempting to make parquet support better for Windows.
Browse files Browse the repository at this point in the history
  • Loading branch information
cnuernber committed May 17, 2021
1 parent 0f0d000 commit 490500c
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 2 deletions.
193 changes: 193 additions & 0 deletions java/tech/v3/dataset/LocalInputFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright (c) 2010-2020 Haifeng Li. All rights reserved.
*
* Smile is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Smile is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Smile. If not, see <https://www.gnu.org/licenses/>.
*/

package tech.v3.dataset;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

/**
* Parquet InputFile with a local java.nio.Path.
* Adapted from https://github.com/tideworks/arvo2parquet
*
* @author Haifeng Li
* @hacks Chris Nuernberger
*/
public class LocalInputFile implements InputFile {
/** Local file object. */
private final RandomAccessFile input;

/**
* Constructor.
* @param path the input file path.
* @throws FileNotFoundException when file cannot be found.
*/
public LocalInputFile(Path path) throws FileNotFoundException {
input = new RandomAccessFile(path.toFile(), "r");
}

@Override
public long getLength() throws IOException {
return input.length();
}

@Override
public SeekableInputStream newStream() throws IOException {
return new SeekableInputStream() {
private final byte[] page = new byte[8192];
private long markPos = 0;

@Override
public int read() throws IOException {
return input.read();
}

@Override
public int read(byte[] b) throws IOException {
return input.read(b);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return input.read(b, off, len);
}

@Override
public long skip(long n) throws IOException {
final long savPos = input.getFilePointer();
final long amtLeft = input.length() - savPos;
n = Math.min(n, amtLeft);
final long newPos = savPos + n;
input.seek(newPos);
final long curPos = input.getFilePointer();
return curPos - savPos;
}

@Override
public int available() {
return 0;
}

@Override
public void close() throws IOException {
input.close();
}

@SuppressWarnings({"unchecked", "unused", "UnusedReturnValue"})
private <T extends Throwable, R> R uncheckedExceptionThrow(Throwable t) throws T {
throw (T) t;
}

@Override
public synchronized void mark(int readlimit) {
try {
markPos = input.getFilePointer();
} catch (IOException e) {
uncheckedExceptionThrow(e);
}
}

@Override
public synchronized void reset() throws IOException {
input.seek(markPos);
}

@Override
public boolean markSupported() {
return true;
}

@Override
public long getPos() throws IOException {
return input.getFilePointer();
}

@Override
public void seek(long l) throws IOException {
input.seek(l);
}

@Override
public void readFully(byte[] bytes) throws IOException {
input.readFully(bytes);
}

@Override
public void readFully(byte[] bytes, int i, int i1) throws IOException {
input.readFully(bytes, i, i1);
}

@Override
public int read(ByteBuffer byteBuffer) throws IOException {
return readDirectBuffer(byteBuffer, page, input::read);
}

@Override
public void readFully(ByteBuffer byteBuffer) throws IOException {
readFullyDirectBuffer(byteBuffer, page, input::read);
}
};
}

private interface ByteBufferReader {
int read(byte[] b, int off, int len) throws IOException;
}

private int readDirectBuffer(ByteBuffer byteBuffer, byte[] page, ByteBufferReader reader) throws IOException {
// copy all the bytes that return immediately, stopping at the first
// read that doesn't return a full buffer.
int nextReadLength = Math.min(byteBuffer.remaining(), page.length);
int totalBytesRead = 0;
int bytesRead;

while ((bytesRead = reader.read(page, 0, nextReadLength)) == page.length) {
byteBuffer.put(page);
totalBytesRead += bytesRead;
nextReadLength = Math.min(byteBuffer.remaining(), page.length);
}

if (bytesRead < 0) {
// return -1 if nothing was read
return totalBytesRead == 0 ? -1 : totalBytesRead;
} else {
// copy the last partial buffer
byteBuffer.put(page, 0, bytesRead);
totalBytesRead += bytesRead;
return totalBytesRead;
}
}

private static void readFullyDirectBuffer(ByteBuffer byteBuffer, byte[] page, ByteBufferReader reader) throws IOException {
int nextReadLength = Math.min(byteBuffer.remaining(), page.length);
int bytesRead = 0;

while (nextReadLength > 0 && (bytesRead = reader.read(page, 0, nextReadLength)) >= 0) {
byteBuffer.put(page, 0, bytesRead);
nextReadLength = Math.min(byteBuffer.remaining(), page.length);
}

if (bytesRead < 0 && byteBuffer.remaining() > 0) {
throw new EOFException("Reached the end of stream with " + byteBuffer.remaining() + " bytes left to read");
}
}
}
14 changes: 12 additions & 2 deletions src/tech/v3/libs/parquet.clj
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ org.apache.hadoop/hadoop-mapreduce-client-core {:mvn/version \"3.3.0\"
;; Behold my Kindom of Nouns...And Tremble!!!!
(:import [smile.io HadoopInput]
[tech.v3.datatype PrimitiveList Buffer]
[tech.v3.dataset Text ParquetRowWriter ParquetRowWriter$WriterBuilder]
[tech.v3.dataset Text ParquetRowWriter ParquetRowWriter$WriterBuilder
LocalInputFile]
[tech.v3.dataset.io.column_parsers PParser]
[org.apache.hadoop.conf Configuration]
[java.time.temporal TemporalAccessor TemporalField ChronoField]
Expand Down Expand Up @@ -706,7 +707,16 @@ org.apache.hadoop/hadoop-mapreduce-client-core {:mvn/version \"3.3.0\"
(instance? ParquetFileReader data)
data
(string? data)
(ParquetFileReader/open (HadoopInput/file data))
;;unwrap url if exists
(-> (io/file data)
;;get canonical path
(.getPath)
;;make a path out of it
(Paths/get (into-array String []))
;;from a path we get a local input file
(LocalInputFile.)
;;from that we get a parquet reader
(ParquetFileReader/open))
(instance? org.apache.hadoop.fs.Path data)
(ParquetFileReader/open ^org.apache.hadoop.fs.Path data)
(instance? InputFile data)
Expand Down

0 comments on commit 490500c

Please sign in to comment.