Skip to content
This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Update Thrift IO structs, add file sink #129

Merged
merged 4 commits into from
Jul 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 10 additions & 30 deletions src/file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use std::convert::TryFrom;
use std::fs::File;
use std::io::{self, BufReader, Read, Seek, SeekFrom};
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::Path;
use std::rc::Rc;

Expand All @@ -37,7 +37,7 @@ use parquet_format::{PageType, PageHeader};
use record::reader::RowIter;
use schema::types::{self, SchemaDescriptor, Type as SchemaType};
use thrift::protocol::TCompactInputProtocol;
use util::io::FileChunk;
use util::io::FileSource;
use util::memory::ByteBufferPtr;

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -86,24 +86,6 @@ pub trait RowGroupReader {
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
}

/// A thin wrapper on `T: Read` to be used by Thrift transport. Write is not supported.
struct TMemoryBuffer<'a, T> where T: 'a + Read {
    data: &'a mut T
}

impl<'a, T: 'a + Read> TMemoryBuffer<'a, T> {
    /// Wraps a mutable reference to an existing reader.
    fn new(data: &'a mut T) -> Self {
        // Field-init shorthand instead of `data: data`.
        Self { data }
    }
}

impl<'a, T: 'a + Read> Read for TMemoryBuffer<'a, T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Pure delegation; no need to unwrap and re-wrap the result.
        self.data.read(buf)
    }
}

// ----------------------------------------------------------------------
// Serialized impl for file & row group readers

Expand Down Expand Up @@ -159,10 +141,9 @@ impl SerializedFileReader {
}
buf.seek(SeekFrom::Start(metadata_start as u64))?;
let metadata_buf = buf.take(metadata_len as u64).into_inner();
let transport = TMemoryBuffer::new(metadata_buf);

// TODO: row group filtering
let mut prot = TCompactInputProtocol::new(transport);
let mut prot = TCompactInputProtocol::new(metadata_buf);
let mut t_file_metadata: TFileMetaData =
TFileMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| ParquetError::General(format!("Could not parse metadata: {}", e)))?;
Expand Down Expand Up @@ -301,8 +282,8 @@ impl RowGroupReader for SerializedRowGroupReader {
col_start = col.dictionary_page_offset().unwrap();
}
let col_length = col.compressed_size();
let file_chunk = FileChunk::new(
self.buf.get_ref(), col_start as usize, col_length as usize);
let file_chunk = FileSource::new(
self.buf.get_ref(), col_start as u64, col_length as usize);
let page_reader = SerializedPageReader::new(
file_chunk,
col.num_values(),
Expand Down Expand Up @@ -344,9 +325,9 @@ impl RowGroupReader for SerializedRowGroupReader {

/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader {
// The file chunk buffer which references exactly the bytes for the column chunk
// The file source buffer which references exactly the bytes for the column chunk
// to be read by this page reader.
buf: FileChunk,
buf: FileSource,

// The compression codec for this column chunk. Only set for non-PLAIN codec.
decompressor: Option<Box<Codec>>,
Expand All @@ -362,9 +343,9 @@ pub struct SerializedPageReader {
}

impl SerializedPageReader {
/// Creates a new serialized page reader from file chunk.
/// Creates a new serialized page reader from file source.
fn new(
buf: FileChunk,
buf: FileSource,
total_num_values: i64,
compression: Compression,
physical_type: Type
Expand All @@ -382,8 +363,7 @@ impl SerializedPageReader {

/// Reads Page header from Thrift.
fn read_page_header(&mut self) -> Result<PageHeader> {
    // The file source implements `Read` directly, so the Thrift compact
    // protocol can consume it without an intermediate transport wrapper.
    let mut prot = TCompactInputProtocol::new(&mut self.buf);
    let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
    Ok(page_header)
}
Expand Down
141 changes: 123 additions & 18 deletions src/util/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@

use std::cmp;
use std::fs::File;
use std::io::{BufReader, Error, ErrorKind, Read, Result, Seek, SeekFrom};
use std::io::*;
use std::sync::Mutex;

// ----------------------------------------------------------------------
// Read/Write wrappers for `File`.

/// Position trait returns the current position in the stream, in bytes.
/// Should be viewed as a lighter version of `Seek` that does not allow seek operations,
/// and does not require mutable reference for the current position.
pub trait Position {
    /// Returns position in the stream.
    fn pos(&self) -> u64;
}

/// Struct that represents a slice of file data with independent start position and
/// length. Internally clones provided file handle, wraps with BufReader and resets
/// position before any read.
///
/// This is a workaround and alternative for `file.try_clone()` method. It clones `File`
/// while preserving independent position, which is not available with `try_clone()`.
///
/// Designed after `arrow::io::RandomAccessFile`.
pub struct FileSource {
    reader: Mutex<BufReader<File>>,
    start: u64, // start position in a file
    end: u64 // end position in a file
}

impl FileSource {
    /// Creates new file reader with start offset and length from a file handle.
    pub fn new(fd: &File, start: u64, length: usize) -> Self {
        Self {
            // try_clone() shares the OS cursor; `start` keeps our own position.
            reader: Mutex::new(BufReader::new(fd.try_clone().unwrap())),
            start,
            end: start + length as u64
        }
    }
}

impl Read for FileChunk {
impl Read for FileSource {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let mut reader = self.reader.lock()
.map_err(|err| Error::new(ErrorKind::Other, err.to_string()))?;

let bytes_to_read = cmp::min(buf.len(), self.end - self.start);
let bytes_to_read = cmp::min(buf.len(), (self.end - self.start) as usize);
let buf = &mut buf[0..bytes_to_read];

reader.seek(SeekFrom::Start(self.start as u64))?;
let res = reader.read(buf);
if let Ok(bytes_read) = res {
self.start += bytes_read;
self.start += bytes_read as u64;
}

res
}
}

impl Position for FileSource {
    // Absolute offset of the next read in the underlying file;
    // advances as bytes are consumed by `Read::read`.
    fn pos(&self) -> u64 {
        self.start
    }
}

/// Struct that represents `File` output stream with position tracking.
/// Used as a sink in file writer.
pub struct FileSink {
    buf: BufWriter<File>,
    // This is not necessarily position in the underlying file,
    // but rather current position in the sink.
    pos: u64
}

impl FileSink {
    /// Creates new file sink.
    /// Position is set to whatever position the file currently has.
    pub fn new(file: &File) -> Self {
        let mut owned_file = file.try_clone().unwrap();
        // SeekFrom::Current(0) is a portable way to query the current offset.
        let pos = owned_file.seek(SeekFrom::Current(0)).unwrap();
        Self {
            buf: BufWriter::new(owned_file),
            // Field-init shorthand instead of `pos: pos`.
            pos
        }
    }
}

impl Write for FileSink {
    // Forward the write to the buffered file, then advance the tracked
    // position by however many bytes were actually accepted.
    fn write(&mut self, data: &[u8]) -> Result<usize> {
        let written = self.buf.write(data)?;
        self.pos += written as u64;
        Ok(written)
    }

    // Flush only touches the underlying writer; position is unaffected.
    fn flush(&mut self) -> Result<()> {
        self.buf.flush()
    }
}

impl Position for FileSink {
    // Current logical position in the sink; `& self` normalized to `&self`.
    fn pos(&self) -> u64 {
        self.pos
    }
}


#[cfg(test)]
mod tests {
use super::*;
use util::test_common::get_test_file;
use util::test_common::{get_temp_file, get_test_file};

#[test]
fn test_io_read_fully() {
    let mut buf = vec![0; 8];
    let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);

    // Only 4 bytes exist in this source even though the buffer has room for 8.
    let bytes_read = src.read(&mut buf[..]).unwrap();
    assert_eq!(bytes_read, 4);
    assert_eq!(buf, vec![b'P', b'A', b'R', b'1', 0, 0, 0, 0]);
}

#[test]
fn test_io_read_in_chunks() {
    let mut buf = vec![0; 4];
    let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);

    // Two partial reads must pick up where the previous one stopped.
    let bytes_read = src.read(&mut buf[0..2]).unwrap();
    assert_eq!(bytes_read, 2);
    let bytes_read = src.read(&mut buf[2..]).unwrap();
    assert_eq!(bytes_read, 2);
    assert_eq!(buf, vec![b'P', b'A', b'R', b'1']);
}

#[test]
fn test_io_read_pos() {
    let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);

    // A one-byte read advances the position by one.
    let mut scratch = vec![0; 1];
    src.read(&mut scratch).unwrap();
    assert_eq!(src.pos(), 1);

    // The next read is capped at the remaining 3 bytes, ending at the limit.
    let mut scratch = vec![0; 4];
    src.read(&mut scratch).unwrap();
    assert_eq!(src.pos(), 4);
}

#[test]
fn test_io_read_over_limit() {
    let mut src = FileSource::new(&get_test_file("alltypes_plain.parquet"), 0, 4);

    // Draining the source stops at its 4-byte limit regardless of buffer size.
    let mut scratch = vec![0; 128];
    src.read(&mut scratch).unwrap();
    assert_eq!(src.pos(), 4);

    // Subsequent reads return 0 bytes and leave the position untouched.
    let bytes_read = src.read(&mut scratch).unwrap();
    assert_eq!(bytes_read, 0);
    assert_eq!(src.pos(), 4);
}

#[test]
fn test_io_seek_switch() {
    let mut buf = vec![0; 4];
    let mut file = get_test_file("alltypes_plain.parquet");
    let mut src = FileSource::new(&file, 0, 4);

    // Seeking the original handle must not disturb the source's own position.
    file.seek(SeekFrom::Start(5 as u64)).expect("File seek to a position");

    let bytes_read = src.read(&mut buf[..]).unwrap();
    assert_eq!(bytes_read, 4);
    assert_eq!(buf, vec![b'P', b'A', b'R', b'1']);
}

#[test]
fn test_io_write_with_pos() {
    let mut file = get_temp_file("file_sink_test", &[b'a', b'b', b'c']);
    file.seek(SeekFrom::Current(3)).unwrap();

    // The sink picks up the file's current offset (just past the first 3 bytes).
    let mut sink = FileSink::new(&file);
    assert_eq!(sink.pos(), 3);

    sink.write(&[b'd', b'e', b'f', b'g']).unwrap();
    assert_eq!(sink.pos(), 7);

    // After a flush the sink's position matches the file's real offset.
    sink.flush().unwrap();
    assert_eq!(sink.pos(), file.seek(SeekFrom::Current(0)).unwrap());

    // Read everything back through a FileSource to verify the bytes on disk.
    let mut content = vec![0u8; 7];
    let mut reader = FileSource::new(&file, 0, file.metadata().unwrap().len() as usize);
    reader.read(&mut content[..]).unwrap();

    assert_eq!(content, vec![b'a', b'b', b'c', b'd', b'e', b'f', b'g']);
}
}
7 changes: 5 additions & 2 deletions src/util/test_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
tmp_file.write_all(content).unwrap();
tmp_file.sync_all().unwrap();

// read file and return file handle
let file = fs::File::open(path_buf.as_path());
// return file handle for both read and write
let file = fs::OpenOptions::new()
.read(true)
.write(true)
.open(path_buf.as_path());
assert!(file.is_ok());
file.unwrap()
}