Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More progress on file round-trips #23

Closed
wants to merge 11 commits into from
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,11 @@ set(FEATHER_TEST_LINK_LIBS ${FEATHER_MIN_TEST_LIBS})
# Library config

set(LIBFEATHER_SRCS
src/feather/buffer.cc
src/feather/io.cc
src/feather/metadata.cc
src/feather/reader.cc
src/feather/types.cc
src/feather/writer.cc
)

Expand Down
8 changes: 8 additions & 0 deletions src/feather/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@
# limitations under the License.

install(FILES
buffer.h
common.h
exception.h
io.h
metadata.h
reader.h
types.h
writer.h
DESTINATION include/feather)

add_library(feather_test_main
Expand Down
41 changes: 41 additions & 0 deletions src/feather/buffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2016 Feather Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "feather/buffer.h"

namespace feather {

// Builds a view onto memory belonging to `parent`. The view begins `offset`
// bytes past parent->data() and reports `size` bytes. A copy of `parent` is
// kept in parent_, so the parent stays referenced for as long as this view
// exists. The arguments are used as given: no null or bounds checks are made.
Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset,
    int64_t size)
    : data_(parent->data() + offset),
      size_(size),
      parent_(parent) {}

std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() {
return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size());
}

// Creates a buffer of `size` bytes. All of the setup is delegated to Resize(),
// which sizes the backing storage and assigns size_, data_ and mutable_data_.
OwnedMutableBuffer::OwnedMutableBuffer(int64_t size) {
  this->Resize(size);
}

// Changes the buffer so that it holds `new_size` bytes.
//
// The backing vector is resized first and the bookkeeping fields are updated
// only afterwards: if the resize throws (for example on allocation failure),
// size_, data_ and mutable_data_ still describe the previous storage instead
// of advertising a size that was never allocated.
//
// Both data pointers are re-read from the vector after the resize because the
// vector may have moved its storage.
void OwnedMutableBuffer::Resize(int64_t new_size) {
  buffer_owner_.resize(new_size);
  uint8_t* const storage = buffer_owner_.data();
  mutable_data_ = storage;
  data_ = storage;
  size_ = new_size;
}

} // namespace feather
108 changes: 108 additions & 0 deletions src/feather/buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2016 Feather Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef FEATHER_BUFFER_H
#define FEATHER_BUFFER_H

#include <cstdint>
#include <cstdlib>
#include <memory>
#include <vector>

namespace feather {

// ----------------------------------------------------------------------
// Buffer classes

// Immutable API for a chunk of bytes which may or may not be owned by the
// class instance
class Buffer : public std::enable_shared_from_this<Buffer> {
public:
// Stores the given pointer and length as-is; nothing is copied. parent_ keeps
// its default (null) value.
Buffer(const uint8_t* data, int64_t size)
    : data_{data}, size_{size} {}

// Constructs a view of memory that belongs to another buffer. The new Buffer
// keeps its own shared_ptr to `parent`, so that we retain a valid pointer to
// the data even after other shared_ptr's to the parent buffer have been
// destroyed.
//
// parent: buffer whose memory is referenced; taken by const reference and
//         copied into parent_
// offset: added to parent->data() to form this buffer's data pointer
// size:   the length this buffer reports through size()
Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should that just be a std::shared_ptr<Buffer>?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, otherwise you end up copying the passed argument twice (once on pass-by-value, then again when assigning to Buffer::parent_)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but why do you care about that?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, copying a shared pointer is not totally trivial, since there are C++11 atomics involved with managing the internal reference count.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://msdn.microsoft.com/en-us/library/hh279669.aspx

"Pass the shared_ptr by reference or const reference. In this case, the reference count is not incremented, and the callee can access the pointer as long as the caller does not go out of scope. Or, the callee can decide to create a shared_ptr based on the reference, and thereby become a shared owner. Use this option when the caller has no knowledge of the callee, or when you must pass a shared_ptr and want to avoid the copy operation for performance reasons."


std::shared_ptr<Buffer> get_shared_ptr() {
return shared_from_this();
}

const uint8_t* data() const {
return data_;
}

int64_t size() const {
return size_;
}

// True when parent_ is set, i.e. this Buffer refers to memory that is
// (possibly) owned by some other buffer.
bool is_shared() const {
  return parent_ != nullptr;
}

const std::shared_ptr<Buffer> parent() const {
return parent_;
}

protected:
const uint8_t* data_;
int64_t size_;

// nullptr by default, but may be set
std::shared_ptr<Buffer> parent_;
};

// A Buffer whose contents can be mutated
class MutableBuffer : public Buffer {
public:
MutableBuffer(uint8_t* data, int64_t size) :
Buffer(data, size) {
mutable_data_ = data;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me who ends up owning the data for this class

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No owner indicated here, only that you can mutate the buffer. I can add more detailed comments

}

uint8_t* mutable_data() {
return mutable_data_;
}

// Get a read-only view of this buffer: a new Buffer that starts at offset 0,
// reports size() bytes, and holds this buffer as its parent.
// NOTE(review): the implementation calls get_shared_ptr(), so presumably this
// object must already be managed by a std::shared_ptr -- confirm with callers.
std::shared_ptr<Buffer> GetImmutableView();

protected:
// Default state for subclasses that set up their storage themselves: a
// zero-length buffer whose read-only and writable data pointers are both null.
MutableBuffer()
    : Buffer(nullptr, 0),
      mutable_data_{nullptr} {}

uint8_t* mutable_data_;
};

// A MutableBuffer whose memory is owned by the class instance: the bytes live
// in the buffer_owner_ vector, which is a member of this object. For example,
// for reading data out of files that you want to deallocate when this class is
// destroyed.
class OwnedMutableBuffer : public MutableBuffer {
public:
// Creates a buffer holding `size` bytes (implemented by calling Resize).
explicit OwnedMutableBuffer(int64_t size);

// Resizes the owned storage to `new_size` bytes and reassigns the size and
// data pointers inherited from Buffer / MutableBuffer. Pointers obtained from
// data() or mutable_data() before the call should be fetched again afterwards.
void Resize(int64_t new_size);

private:
// Backing storage that data_ and mutable_data_ point into.
std::vector<uint8_t> buffer_owner_;
};

} // namespace feather

#endif // FEATHER_BUFFER_H
28 changes: 14 additions & 14 deletions src/feather/io-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>

#include <gtest/gtest.h>

#include <cstdint>
#include <memory>

#include "feather/buffer.h"
#include "feather/io.h"
#include "feather/test-common.h"

Expand All @@ -24,20 +26,20 @@ namespace feather {
TEST(TestBufferReader, Basics) {
std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

std::unique_ptr<BufferReader> reader(new BufferReader(&data[0], data.size()));
auto data_buffer = std::make_shared<Buffer>(&data[0], data.size());
std::unique_ptr<BufferReader> reader(new BufferReader(data_buffer));

ASSERT_EQ(0, reader->Tell());
ASSERT_EQ(10, reader->size());

size_t bytes_read;
const uint8_t* buffer = reader->ReadNoCopy(4, &bytes_read);
ASSERT_EQ(4, bytes_read);
ASSERT_EQ(0, memcmp(buffer, &data[0], bytes_read));
std::shared_ptr<Buffer> buffer = reader->Read(4);
ASSERT_EQ(4, buffer->size());
ASSERT_EQ(0, memcmp(buffer->data(), &data[0], buffer->size()));
ASSERT_EQ(4, reader->Tell());

buffer = reader->ReadNoCopy(10, &bytes_read);
ASSERT_EQ(6, bytes_read);
ASSERT_EQ(0, memcmp(buffer, &data[4], bytes_read));
buffer = reader->Read(10);
ASSERT_EQ(6, buffer->size());
ASSERT_EQ(0, memcmp(buffer->data(), &data[4], buffer->size()));
ASSERT_EQ(10, reader->Tell());
}

Expand All @@ -50,10 +52,8 @@ TEST(TestInMemoryOutputStream, Basics) {
ASSERT_EQ(4, stream->Tell());
stream->Write(&data[4], data.size() - 4);

std::vector<uint8_t> out;
stream->Transfer(&out);

assert_vector_equal(data, out);
std::shared_ptr<Buffer> buffer = stream->Finish();
ASSERT_EQ(0, memcmp(buffer->data(), &data[0], data.size()));
}

} // namespace feather
70 changes: 41 additions & 29 deletions src/feather/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,25 @@

namespace feather {

// ----------------------------------------------------------------------
// Buffer and its subclasses

// ----------------------------------------------------------------------
// BufferReader

size_t BufferReader::Tell() {
std::shared_ptr<Buffer> RandomAccessReader::ReadAt(int64_t position,
int64_t nbytes) {
// TODO(wesm): boundchecking
Seek(position);
return Read(nbytes);
}

int64_t BufferReader::Tell() const {
return pos_;
}

void BufferReader::Seek(size_t pos) {
if (pos >= size_) {
void BufferReader::Seek(int64_t pos) {
if (pos < 0 || pos >= size_) {
std::stringstream ss;
ss << "Cannot seek to " << pos
<< "File is length " << size_;
Expand All @@ -39,15 +49,10 @@ void BufferReader::Seek(size_t pos) {
pos_ = pos;
}

size_t BufferReader::ReadInto(size_t nbytes, uint8_t* out) {
FeatherException::NYI("not implemented");
return 0;
}

const uint8_t* BufferReader::ReadNoCopy(size_t nbytes, size_t* bytes_available) {
*bytes_available = std::min(nbytes, size_ - pos_);
const uint8_t* result = Head();
pos_ += *bytes_available;
// Returns a Buffer over the next bytes of the underlying data and advances
// the read position past them.
//
// nbytes: number of bytes requested. If fewer remain (size_ - pos_), only the
//         remaining bytes are returned. A negative request is treated as
//         zero, so the result never has a negative size and the position
//         never moves backwards.
//
// NOTE(review): the result is created with the (pointer, size) Buffer
// constructor from Head(), so it records no parent buffer -- confirm that
// callers keep the underlying data alive while they use the result.
std::shared_ptr<Buffer> BufferReader::Read(int64_t nbytes) {
  int64_t bytes_available = std::min(nbytes, size_ - pos_);
  if (bytes_available < 0) {
    bytes_available = 0;
  }
  auto result = std::make_shared<Buffer>(Head(), bytes_available);
  pos_ += bytes_available;
  return result;
}

Expand All @@ -70,7 +75,7 @@ std::unique_ptr<LocalFileReader> LocalFileReader::Open(const std::string& path)
throw FeatherException("Unable to seek to end of file");
}

size_t size = ftell(file);
int64_t size = ftell(file);

auto result = std::unique_ptr<LocalFileReader>(
new LocalFileReader(path, size, file));
Expand All @@ -86,63 +91,70 @@ void LocalFileReader::CloseFile() {
}
}

void LocalFileReader::Seek(size_t pos) {
void LocalFileReader::Seek(int64_t pos) {
fseek(file_, pos, SEEK_SET);
}

size_t LocalFileReader::Tell() {
// Reports the current file position as returned by ftell on file_.
int64_t LocalFileReader::Tell() const {
  const int64_t current = ftell(file_);
  return current;
}

size_t LocalFileReader::ReadInto(size_t nbytes, uint8_t* buffer) {
size_t bytes_read = fread(buffer, 1, nbytes, file_);
std::shared_ptr<Buffer> LocalFileReader::Read(int64_t nbytes) {
auto buffer = std::make_shared<OwnedMutableBuffer>(nbytes);
int64_t bytes_read = fread(buffer->mutable_data(), 1, nbytes, file_);
if (bytes_read < nbytes) {
// Exception if not EOF
if (!feof(file_)) {
throw FeatherException("Error reading bytes from file");
}

buffer->Resize(bytes_read);
}
return bytes_read;
return buffer;
}

// ----------------------------------------------------------------------
// In-memory output stream

InMemoryOutputStream::InMemoryOutputStream(size_t initial_capacity) :
InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity) :
size_(0),
capacity_(initial_capacity) {
if (initial_capacity == 0) {
initial_capacity = 1024;
}
buffer_.resize(initial_capacity);
buffer_.reset(new OwnedMutableBuffer(initial_capacity));
}

uint8_t* InMemoryOutputStream::Head() {
return &buffer_[size_];
return buffer_->mutable_data() + size_;
}

void InMemoryOutputStream::Write(const uint8_t* data, size_t length) {
void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
if (size_ + length > capacity_) {
size_t new_capacity = capacity_ * 2;
int64_t new_capacity = capacity_ * 2;
while (new_capacity < size_ + length) {
new_capacity *= 2;
}
buffer_.resize(new_capacity);
buffer_->Resize(new_capacity);
capacity_ = new_capacity;
}
memcpy(Head(), data, length);
size_ += length;
}

size_t InMemoryOutputStream::Tell() {
int64_t InMemoryOutputStream::Tell() const {
return size_;
}

void InMemoryOutputStream::Transfer(std::vector<uint8_t>* out) {
buffer_.resize(size_);
buffer_.swap(*out);
std::shared_ptr<Buffer> InMemoryOutputStream::Finish() {
buffer_->Resize(size_);
std::shared_ptr<Buffer> result = buffer_;
buffer_ = nullptr;

// TODO(wesm): raise exceptions if user calls Write after Finish
size_ = 0;
capacity_ = buffer_.size();
capacity_ = 0;
return result;
}

} // namespace feather
Loading