This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Add column writer #138

Merged
merged 14 commits into from
Aug 14, 2018

Conversation

sadikovi
Collaborator

@sadikovi sadikovi commented Jul 29, 2018

This PR adds the column writer ColumnWriter and its typed implementation ColumnWriterImpl, mirroring how the column reader is structured. The column writer supports writing data pages v1 and v2, as well as dictionary pages.

This PR only adds the column writer and updates the page writer; tests will be added in a follow-up PR, to keep the amount of code to review manageable.

I updated PageWriter to have only 3 methods:

  • write_page, which writes a page and returns its metrics in PageWriteSpec.
  • write_metadata, which writes column metadata into the sink, once.
  • close, which closes the page writer and releases resources, if any.

This allows us to avoid maintaining state in the page writer; all of the previous logic has moved into the column writer.
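The stateless three-method shape described above can be sketched as follows. This is a rough sketch, not the PR's exact definitions: CompressedPage is a placeholder, the PageWriteSpec fields are an illustrative subset, and the in-memory implementation exists only to show the flow.

```rust
// Placeholder for the real compressed page type.
struct CompressedPage;

// Illustrative subset of the metrics returned per written page.
#[derive(Default)]
struct PageWriteSpec {
    bytes_written: u64, // total bytes the page occupied in the sink
    num_values: u32,    // number of values in the page
}

trait PageWriter {
    /// Writes a page and returns metrics for it.
    fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec, String>;
    /// Writes column metadata into the sink, once.
    fn write_metadata(&mut self) -> Result<(), String>;
    /// Closes the page writer and releases resources, if any.
    fn close(&mut self) -> Result<(), String>;
}

/// A trivial in-memory implementation: no state beyond the sink itself.
struct InMemoryPageWriter {
    sink: Vec<u8>,
}

impl PageWriter for InMemoryPageWriter {
    fn write_page(&mut self, _page: CompressedPage) -> Result<PageWriteSpec, String> {
        let start = self.sink.len() as u64;
        self.sink.extend_from_slice(&[0u8; 8]); // pretend to serialize the page
        Ok(PageWriteSpec {
            bytes_written: self.sink.len() as u64 - start,
            num_values: 0,
        })
    }
    fn write_metadata(&mut self) -> Result<(), String> {
        Ok(())
    }
    fn close(&mut self) -> Result<(), String> {
        Ok(())
    }
}
```

Because every call returns its own metrics, the column writer can accumulate totals itself instead of querying the page writer for state.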

The way the column writer works is as follows:

  1. The column writer is created from a column descriptor (mainly for the max definition and repetition levels), writer properties (for the various limits), and a page writer.
  2. The column writer provides a set of public methods: write_batch, get_total_bytes_written, get_total_rows_written, get_column_metadata, close.
  3. Every time write_batch is called, we split the batch into smaller batches and call write_mini_batch to write the data; this ensures fine-grained page writes.
  4. Within write_mini_batch we check whether we have enough data for a page; if so, we call the add_data_page method to create a new compressed data page. This encodes values, levels, etc. If dictionary encoding is enabled, the created page is added to an in-memory queue, otherwise it is sent to the page writer directly.
  5. Within write_mini_batch, after we have added a page, we check whether the dictionary is enabled and exceeds the dictionary size limit. If so, we create a dictionary page and write it to the page writer, then write all of the pages from the in-memory buffer to the page writer.
  6. After this dictionary fallback, all subsequent pages are written directly to the page writer.
  7. When we call close, all remaining data is converted into a data page and written to the page writer. If the dictionary is still not full, we create a dictionary page and flush all of the buffered data pages.

Note that we call flush_data_pages during dictionary fallback. This method may write another page depending on the values left in the buffer, but these values are not accounted for when deciding whether the dictionary is full. So, theoretically, the dictionary can end up slightly larger than the set limit.
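The splitting in step 3 can be sketched as below. The constant and helper name are hypothetical; in the PR the granularity comes from writer properties, not a hard-coded constant.

```rust
// Hypothetical per-call write granularity; the real limit comes from
// writer properties.
const MINI_BATCH_SIZE: usize = 1024;

/// Returns the mini-batch sizes a write_batch call would process, so
/// page-size checks in write_mini_batch run at a fine granularity.
fn mini_batch_sizes(num_values: usize) -> Vec<usize> {
    let mut sizes = Vec::new();
    let mut remaining = num_values;
    while remaining > 0 {
        let n = remaining.min(MINI_BATCH_SIZE);
        sizes.push(n);
        remaining -= n;
    }
    sizes
}
```

Without this split, a single huge write_batch call could blow well past the page size limit before the writer ever checks it.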

@sadikovi sadikovi requested a review from sunchao July 29, 2018 20:48
@sadikovi
Collaborator Author

@sunchao I refactored column writer and page writer. I am open to any suggestions or API changes, and would be glad if you could review the code. Thanks!

While writing tests, I found it challenging to create a page writer, pass it to the column writer as a Box, and then check the page writer's contents: the compiler complains that the Box requires a 'static lifetime, but the mutable reference I pass into the page writer does not live long enough. My test was going to write all of the data into the page writer, then take that buffer and use it to read the pages back in a page reader.

Would appreciate if you have any suggestions on how to solve this problem.

@coveralls

coveralls commented Jul 29, 2018

Pull Request Test Coverage Report for Build 581

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • 38 unchanged lines in 3 files lost coverage.
  • Overall coverage decreased (-0.04%) to 95.412%

Files with Coverage Reduction | New Missed Lines | %
column/page.rs | 3 | 96.43%
file/writer.rs | 5 | 97.67%
encodings/encoding.rs | 30 | 94.82%
Totals Coverage Status
Change from base Build 580: -0.04%
Covered Lines: 11916
Relevant Lines: 12489

💛 - Coveralls

@sadikovi
Collaborator Author

I had some issues with Boxes and lifetimes when testing the code in this PR. I am wondering whether it is worth revisiting all APIs (external and internal) and how we pass things between file reader/writer, row group reader/writer, column reader/writer, and page reader/writer, and opening an issue for it.

Owner

@sunchao sunchao left a comment

Overall looks great @sadikovi ! Left some comments.

FixedLenByteArrayColumnWriter(ColumnWriterImpl<FixedLenByteArrayType>)
}

/// Gets a specific column writer corresponding to column descriptor `col_descr`.
Owner

nit: col_descr -> descr?

@@ -162,6 +162,30 @@ impl CompressedPage {
}
}

/// Contains page write metrics.
pub struct PageWriteSpec {
Owner

Should we make this non-public?

Collaborator Author

Unfortunately, it is part of the page writer API:

fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

This is basically a container to return a bunch of metrics for a written page. Do you have any suggestions on how to implement it better?

Owner

I think this looks good. Didn't see it is used in the API.

}

/// Finalises writes and closes the column writer.
/// Returns total number of bytes written by this column writer.
Owner

Update the comment? the return type is Result<()>


/// Finalises writes and closes the column writer.
/// Returns total number of bytes written by this column writer.
pub fn close(&mut self) -> Result<()> {
Owner

Just wonder if we should consume self in the close() call and enforce this on the trait level. It seems cleaner to me since now this is enforced on the language level, so people will never be able to use the writer after it is closed.
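The suggestion above can be sketched like this, with illustrative fields and method bodies: when close takes self by value, using the writer after closing becomes a compile-time error rather than a runtime one.

```rust
// Minimal sketch of close-consumes-self; the real ColumnWriterImpl has
// far more state than this single counter.
struct ColumnWriter {
    total_bytes_written: u64,
}

impl ColumnWriter {
    fn write_batch(&mut self, values: &[i32]) {
        // Pretend each i32 value costs 4 bytes in the sink.
        self.total_bytes_written += (values.len() * 4) as u64;
    }

    /// Consumes the writer and returns the total bytes written.
    fn close(self) -> u64 {
        // `self` is moved into this call and dropped at the end, so any
        // later `w.write_batch(..)` after `w.close()` fails to compile.
        self.total_bytes_written
    }
}
```

This moves the "closed writer must not be reused" invariant from runtime checks into the type system.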

/// Writes batch of values, definition levels and repetition levels.
///
/// If values are non-nullable and non-repeated, then definition and repetition levels
/// can be omitted.
Owner

nit: perhaps state what is the returned result.

self.encoder.encoding()
};

let data_page = if self.props.writer_version() == WriterVersion::PARQUET_1_0 {
Owner

nit: perhaps we can use pattern matching here?
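The match form this nit suggests, sketched with a stand-in result type; WriterVersion uses the PR's variant names, but the function around it is hypothetical.

```rust
// Stand-in for the enum in writer properties; variant names follow the PR.
#[allow(non_camel_case_types)]
enum WriterVersion {
    PARQUET_1_0,
    PARQUET_2_0,
}

fn page_version(writer_version: &WriterVersion) -> u8 {
    // Exhaustive match: adding a new writer version forces this site to
    // be updated, unlike an if/else on equality.
    match writer_version {
        WriterVersion::PARQUET_1_0 => 1,
        WriterVersion::PARQUET_2_0 => 2,
    }
}
```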

None => compressed_values.extend_from_slice(values.data())
}
rep_levels.extend_from_slice(&def_levels[..]);
rep_levels.extend_from_slice(&compressed_values[..]);
Owner

@sunchao sunchao Aug 1, 2018

Wonder if we should keep the value buffer in a separate field in DataPageV2. Currently we are doing one extra copy in the write path (via extend_from_slice) and one extra copy in the read path (append in get_next_page).

Collaborator Author

Could you elaborate a bit more? Do you mean keeping 2 buffers for a data page v2?

When we write a data page v2, we collect the repetition + definition levels part, then take the values part, compress it, and append it to the levels part.

When we read, we take the full buffer, split off the values part, decompress it, and reattach it to the levels.
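The single-buffer layout described above can be sketched as follows, with hypothetical helper names. In data page v2 the levels are stored uncompressed up front, followed by the (possibly compressed) values, and the reader splits the buffer back apart using the level byte lengths from the page header.

```rust
// Write path: build one contiguous buffer [rep levels][def levels][values].
fn assemble_v2(rep_levels: &[u8], def_levels: &[u8], compressed_values: &[u8]) -> Vec<u8> {
    let mut buf =
        Vec::with_capacity(rep_levels.len() + def_levels.len() + compressed_values.len());
    buf.extend_from_slice(rep_levels);
    buf.extend_from_slice(def_levels);
    buf.extend_from_slice(compressed_values); // the extra copy flagged in the review
    buf
}

// Read path: split the buffer back apart using lengths from the header.
fn split_v2(buf: &[u8], rep_len: usize, def_len: usize) -> (&[u8], &[u8], &[u8]) {
    let (rep, rest) = buf.split_at(rep_len);
    let (def, values) = rest.split_at(def_len);
    (rep, def, values)
}
```

Keeping the values in a separate field, as suggested, would avoid the extend_from_slice copy on write and the re-append on read.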

Collaborator Author

I can try putting it on hold and optimise data page v2 in a separate PR, if you want.

@sadikovi
Collaborator Author

sadikovi commented Aug 1, 2018

@sunchao thanks! I am also working on unit tests at the moment. Will address your comments ASAP. Cheers!

@sadikovi
Collaborator Author

sadikovi commented Aug 1, 2018

@sunchao I addressed most of your review comments. Unresolved ones are about making PageWriteSpec internal, and Data Page v2 optimisations. Let me know how we need to proceed with those and would appreciate if you could have another look. Thanks!

Owner

@sunchao sunchao left a comment

Thanks @sadikovi . I have a few more comments - mostly minor. :)


pub compressed_size: usize,
pub num_values: u32,
pub offset: u64,
pub bytes_written: u64
Owner

Just curious: what's the difference between this and uncompressed_size?

Collaborator Author

@sadikovi sadikovi Aug 8, 2018

It is whatever was actually written to the sink; technically it should be min(uncompressed_size, compressed_size), I think.

Collaborator Author

It is the actual number of bytes written to the sink. This includes min(compressed_size, uncompressed_size) + header size + potential statistics or whatever is written as page metadata. The way we collect it is (end_pos - start_pos) of the sink.
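The (end_pos - start_pos) accounting described above can be sketched with an in-memory Vec as the sink; the function name and byte layout here are illustrative.

```rust
// Measure bytes_written as the sink position delta around one page write:
// it captures the header, any statistics, and the payload in one number.
fn write_page_to_sink(sink: &mut Vec<u8>, header: &[u8], body: &[u8]) -> u64 {
    let start_pos = sink.len() as u64;
    sink.extend_from_slice(header); // page header, statistics, etc.
    sink.extend_from_slice(body);   // compressed (or smaller) payload
    let end_pos = sink.len() as u64;
    end_pos - start_pos
}
```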

Owner

Thanks for the explanation!


// Check if we need to buffer data page or flush it to the sink directly.
if self.dict_encoder.is_some() {
self.data_pages.push_back(compressed_page);
Owner

Is it possible that you have so many data pages to buffer that you can't hold them all in memory? If so, should we add a threshold to flush them?

Collaborator Author

Nah, we cannot flush them, because we need to write dictionary page first. Data pages should be fairly small after dictionary encoding, and there are settings users can specify.


// Optionally set dictionary encoder.
let dict_encoder = if props.dictionary_enabled(descr.path()) {
Some(DictEncoder::new(descr.clone(), Rc::new(MemTracker::new())))
Owner

MemTracker now is just broken. At some point we should fix it so that we can track the memory usage by the writer.

Collaborator Author

Yes, I observed that we are not using it properly. Should we open an issue to track the progress? Do we need a memory tracker, and how does it relate to memory allocation?

Owner

Yes, we can open an issue to track this. Originally I tried to remove it and replace it with a memory allocator which, besides allocating memory in chunks, also tracks memory usage. However, I abandoned that later after finding it doesn't have much perf improvement.

IMO we still need to investigate memory management in general, and at least give users the option to specify parameters such as a maximum threshold for the writer, and track memory usage.

cmpr.compress(uncompressed_buf.data(), &mut output_buf)?;
ByteBufferPtr::new(output_buf)
},
None => encoder.write_dict()?
Owner

Can we just use uncompressed_buf?

buf: buf,
num_values: num_values as u32,
encoding: self.props.dictionary_page_encoding(),
// TODO: is our dictionary data sorted?
Owner

It's not. I think the sorted info needs to be passed down from somewhere? Not sure how parquet-mr handles it.

Collaborator Author

I can add a method for dictionary encoder to return if dictionary is sorted or not. Will that work?

Owner

Let me check how parquet-mr does it. How does the dictionary encoder know whether the values are sorted?

Collaborator Author

I guess it depends on the implementation; maybe parquet-mr has an option of having dictionary values sorted somehow.

@sadikovi
Collaborator Author

sadikovi commented Aug 8, 2018

@sunchao I updated this PR with new level encoders and added tests. Can you review again? Thanks!

I also think there are quite a few unnecessary byte copies, but I am not sure how critical those are. Would appreciate if you could also have a look at the write_batch and add_data_page methods and let me know. Thanks!

@sunchao
Owner

sunchao commented Aug 10, 2018

Thanks @sadikovi ! will take a look at this soon.

Owner

@sunchao sunchao left a comment

LGTM


@sadikovi
Collaborator Author

Honestly, I still think there is a lot of work to do to make this code reasonably efficient... I wrote unit tests that write into a temp file and read it back through the column reader - works so far, but it is a bit hacky :). I also added an is_sorted method to the DictEncoder, which returns false for now.

I am still not quite sure about the actual functional differences between writer version 1 and version 2. I am treating them as data pages v1 and data pages v2, but it could be something else.

Anyway, it should be okay for now.

@sunchao sunchao merged commit 2d52502 into sunchao:master Aug 14, 2018
@sunchao
Owner

sunchao commented Aug 14, 2018

Thanks @sadikovi . Yes, we can improve the unit tests and implement the is_sorted logic later. Looking at parquet-mr, I think we are doing it the right way - but we can always fix it later if it turns out we aren't :)

@sadikovi
Collaborator Author

Sure, thanks for review and merge!

@sadikovi sadikovi deleted the column-writer branch August 14, 2018 06:12