Skip to content

Commit

Permalink
PARQUET-525: Add test coverage for failure modes in ParseMetaData
Browse files Browse the repository at this point in the history
Author: Wes McKinney <wesm@apache.org>

Closes apache#60 from wesm/PARQUET-525 and squashes the following commits:

04eea0f [Wes McKinney] Test various invalid files checked in reader-internal.cc.
  • Loading branch information
wesm committed Sep 2, 2018
1 parent a73674a commit 1a6e207
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 17 deletions.
55 changes: 55 additions & 0 deletions cpp/src/parquet/file/file-deserialize-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,59 @@ TEST_F(TestPageSerde, LZONotSupported) {
ASSERT_THROW(InitSerializedPageReader(Compression::LZO), ParquetException);
}

// ----------------------------------------------------------------------
// File structure tests

class TestParquetFileReader : public ::testing::Test {
public:
void AssertInvalidFileThrows(const std::shared_ptr<Buffer>& buffer) {
std::unique_ptr<BufferReader> reader(new BufferReader(buffer));
reader_.reset(new ParquetFileReader());

ASSERT_THROW(reader_->Open(SerializedFile::Open(std::move(reader))),
ParquetException);
}

protected:
std::unique_ptr<ParquetFileReader> reader_;
};

TEST_F(TestParquetFileReader, InvalidHeader) {
const char* bad_header = "PAR2";

auto buffer = std::make_shared<Buffer>(
reinterpret_cast<const uint8_t*>(bad_header), strlen(bad_header));
AssertInvalidFileThrows(buffer);
}

TEST_F(TestParquetFileReader, InvalidFooter) {
// File is smaller than FOOTER_SIZE
const char* bad_file = "PAR1PAR";
auto buffer = std::make_shared<Buffer>(
reinterpret_cast<const uint8_t*>(bad_file), strlen(bad_file));
AssertInvalidFileThrows(buffer);

// Magic number incorrect
const char* bad_file2 = "PAR1PAR2";
buffer = std::make_shared<Buffer>(
reinterpret_cast<const uint8_t*>(bad_file2), strlen(bad_file2));
AssertInvalidFileThrows(buffer);
}

TEST_F(TestParquetFileReader, IncompleteMetadata) {
InMemoryOutputStream stream;

const char* magic = "PAR1";

stream.Write(reinterpret_cast<const uint8_t*>(magic), strlen(magic));
std::vector<uint8_t> bytes(10);
stream.Write(bytes.data(), bytes.size());
uint32_t metadata_len = 24;
stream.Write(reinterpret_cast<const uint8_t*>(&metadata_len), sizeof(uint32_t));
stream.Write(reinterpret_cast<const uint8_t*>(magic), strlen(magic));

auto buffer = stream.GetBuffer();
AssertInvalidFileThrows(buffer);
}

} // namespace parquet_cpp
9 changes: 2 additions & 7 deletions cpp/src/parquet/file/reader-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,8 @@ void SerializedFile::ParseMetaData() {
uint8_t footer_buffer[FOOTER_SIZE];
source_->Seek(filesize - FOOTER_SIZE);
size_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer);

if (bytes_read != FOOTER_SIZE) {
throw ParquetException("Invalid parquet file. Corrupt footer.");
}
if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
if (bytes_read != FOOTER_SIZE ||
memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
throw ParquetException("Invalid parquet file. Corrupt footer.");
}

Expand All @@ -262,15 +259,13 @@ void SerializedFile::ParseMetaData() {
throw ParquetException("Invalid parquet file. File is less than "
"file metadata size.");
}

source_->Seek(metadata_start);

std::vector<uint8_t> metadata_buffer(metadata_len);
bytes_read = source_->Read(metadata_len, &metadata_buffer[0]);
if (bytes_read != metadata_len) {
throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
}

DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_);

schema::FlatSchemaConverter converter(&metadata_.schema[0],
Expand Down
54 changes: 48 additions & 6 deletions cpp/src/parquet/util/input.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ void LocalFileSource::Open(const std::string& path) {
path_ = path;
file_ = fopen(path_.c_str(), "r");
is_open_ = true;
fseek(file_, 0L, SEEK_END);
size_ = Tell();
Seek(0);
}

void LocalFileSource::Close() {
Expand All @@ -58,16 +61,15 @@ void LocalFileSource::CloseFile() {
}
}

int64_t LocalFileSource::Size() {
fseek(file_, 0L, SEEK_END);
return Tell();
}

void LocalFileSource::Seek(int64_t pos) {
fseek(file_, pos, SEEK_SET);
}

int64_t LocalFileSource::Tell() {
int64_t LocalFileSource::Size() const {
return size_;
}

int64_t LocalFileSource::Tell() const {
return ftell(file_);
}

Expand All @@ -86,6 +88,46 @@ std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
return result;
}

// ----------------------------------------------------------------------
// BufferReader

BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer) :
buffer_(buffer),
data_(buffer->data()),
pos_(0) {
size_ = buffer->size();
}

int64_t BufferReader::Tell() const {
return pos_;
}

void BufferReader::Seek(int64_t pos) {
if (pos < 0 || pos >= size_) {
std::stringstream ss;
ss << "Cannot seek to " << pos
<< "File is length " << size_;
throw ParquetException(ss.str());
}
pos_ = pos;
}

int64_t BufferReader::Size() const {
return size_;
}

int64_t BufferReader::Read(int64_t nbytes, uint8_t* out) {
ParquetException::NYI("not implemented");
return 0;
}

std::shared_ptr<Buffer> BufferReader::Read(int64_t nbytes) {
int64_t bytes_available = std::min(nbytes, size_ - pos_);
auto result = std::make_shared<Buffer>(Head(), bytes_available);
pos_ += bytes_available;
return result;
}

// ----------------------------------------------------------------------
// InMemoryInputStream

Expand Down
37 changes: 33 additions & 4 deletions cpp/src/parquet/util/input.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,20 @@ class RandomAccessSource {
public:
virtual ~RandomAccessSource() {}

virtual int64_t Size() const = 0;

virtual void Close() = 0;
virtual int64_t Size() = 0;
virtual int64_t Tell() = 0;
virtual int64_t Tell() const = 0;
virtual void Seek(int64_t pos) = 0;

// Returns actual number of bytes read
virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;

virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
std::shared_ptr<Buffer> ReadAt(int64_t pos, int64_t nbytes);

protected:
int64_t size_;
};


Expand All @@ -57,8 +61,8 @@ class LocalFileSource : public RandomAccessSource {
void Open(const std::string& path);

virtual void Close();
virtual int64_t Size();
virtual int64_t Tell();
virtual int64_t Size() const;
virtual int64_t Tell() const;
virtual void Seek(int64_t pos);

// Returns actual number of bytes read
Expand All @@ -77,6 +81,31 @@ class LocalFileSource : public RandomAccessSource {
bool is_open_;
};

// ----------------------------------------------------------------------
// A file-like object that reads from virtual address space

class BufferReader : public RandomAccessSource {
public:
explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
virtual void Close() {}
virtual int64_t Tell() const;
virtual void Seek(int64_t pos);
virtual int64_t Size() const;

virtual int64_t Read(int64_t nbytes, uint8_t* out);

virtual std::shared_ptr<Buffer> Read(int64_t nbytes);

protected:
const uint8_t* Head() {
return data_ + pos_;
}

std::shared_ptr<Buffer> buffer_;
const uint8_t* data_;
int64_t pos_;
};

// ----------------------------------------------------------------------
// Streaming input interfaces

Expand Down

0 comments on commit 1a6e207

Please sign in to comment.