
Caching parquet metadata in an off-process cache #135

Closed
annanay25 opened this issue Apr 13, 2022 · 15 comments

@annanay25
Contributor

As discussed earlier, in Grafana Tempo we are planning to build an object-store-backed parquet database. At higher volumes, querying could mean retrieving metadata from a large number of parquet files, and we would like to optimise this by caching the metadata in an off-process cache (redis/memcached/...).

There are two strategies we have considered to implement this:

  1. Build in support for metadata caching directly into this library so the details are abstracted away from the user.
  2. Provide methods to marshal/unmarshal metadata and handle all the caching on the application side.

Thoughts?

Using this ticket as a placeholder to start discussion.

@mdisibio
Contributor

Adding a few additional thoughts:

  1. The library currently takes an io.ReaderAt, which we already wrap for various reasons (metrics, etc.). We considered whether there is a way to redirect some reads to a cache but did not see a general solution there (a rough sketch of what such a wrapper could look like follows this list).
  2. We expect our largest blocks to be 20+GB, and it looks like the metadata for them is 50-100MB. This is quite large for a single cache entry, so it might be best to support granularity of individual row groups, or even column chunks, which would allow reading the minimum necessary data for the columns involved.
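For illustration, a minimal sketch of what redirecting reads through a cache could look like at the io.ReaderAt layer, keyed on file, offset, and length. The Cache interface here is a stand-in for a memcached/redis client; nothing in this sketch is an existing parquet-go API.

```go
package readercache

import (
	"fmt"
	"io"
)

// Cache is a placeholder for an off-process cache client (memcached, redis, ...).
type Cache interface {
	Get(key string) ([]byte, bool)
	Set(key string, value []byte)
}

// cachingReaderAt wraps an io.ReaderAt and serves repeated reads of the same
// (offset, length) span from the cache instead of the backing object store.
type cachingReaderAt struct {
	inner io.ReaderAt
	cache Cache
	name  string // identifies the underlying file in cache keys
}

func (r *cachingReaderAt) ReadAt(p []byte, off int64) (int, error) {
	key := fmt.Sprintf("%s:%d:%d", r.name, off, len(p))
	if b, ok := r.cache.Get(key); ok {
		return copy(p, b), nil
	}
	n, err := r.inner.ReadAt(p, off)
	if err == nil {
		r.cache.Set(key, append([]byte(nil), p[:n]...))
	}
	return n, err
}
```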

@achille-roussel
Contributor

Do you have more details to share about what metadata you would like to optimize access to?

@annanay25
Contributor Author

I believe we would need to cache the entire metadata in order to avoid hitting object storage to fetch anything other than data. Going through the code, I think we use the Schema and RowGroup information from the reader, but even so we might want to cache the entire metadata.

@achille-roussel
Contributor

OK, I was wondering if you intended to also cache other parts of the file like column indexes or bloom filters.

> The library currently takes an io.ReaderAt, which we already wrap for various reasons (metrics, etc.). We considered whether there is a way to redirect some reads to a cache but did not see a general solution there.

This would be hard to do without duplicating the logic to decode footers of parquet files. A lower-level caching solution for the io.ReaderAt layer would be more appropriate in my opinion.

> We expect our largest blocks to be 20+GB, and it looks like the metadata for them is 50-100MB. This is quite large for a single cache entry, so it might be best to support granularity of individual row groups, or even column chunks, which would allow reading the minimum necessary data for the columns involved.

This is really large indeed; it's almost questionable whether caching the metadata would improve access here. Assuming the data is on some long-term storage medium, loading large blocks will be bound by I/O bandwidth, not latency, so it may be just as efficient to read the original files.

> Build in support for metadata caching directly into this library so the details are abstracted away from the user.

I'm curious if you have ideas of what that could look like. For reference, parquet already attempts to offer a solution to these problems with file references in the file metadata, where one file may contain a schema only and a reference to a remote file for the data. This is currently not supported in parquet-go but could be something to explore https://github.com/segmentio/parquet-go/blob/main/format/parquet.go#L763-L765

> Provide methods to marshal/unmarshal metadata and handle all the caching on the application side.

We could easily expose the format.FileMetaData loaded when reading a parquet file. It's already serializable to thrift, but other formats could be used as well; it's just Go structs and slices.
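As a sketch of what that application-side option could look like, assuming the application can get hold of the decoded format.FileMetaData (which the library does not yet expose at the time of this discussion). The Cache interface is again a placeholder for an external cache client, and JSON is used only for brevity since the struct is plain exported fields:

```go
package metacache

import (
	"encoding/json"

	"github.com/segmentio/parquet-go/format"
)

// Cache is a placeholder for an off-process cache client.
type Cache interface {
	Get(key string) ([]byte, bool)
	Set(key string, value []byte)
}

// StoreMetadata serializes the decoded file metadata and writes it to the
// cache. JSON is used here only because format.FileMetaData is plain Go
// structs and slices; thrift or any other codec would work just as well.
func StoreMetadata(c Cache, key string, md *format.FileMetaData) error {
	b, err := json.Marshal(md)
	if err != nil {
		return err
	}
	c.Set(key, b)
	return nil
}

// LoadMetadata returns the cached metadata, or nil when it is not cached.
func LoadMetadata(c Cache, key string) (*format.FileMetaData, error) {
	b, ok := c.Get(key)
	if !ok {
		return nil, nil
	}
	md := new(format.FileMetaData)
	if err := json.Unmarshal(b, md); err != nil {
		return nil, err
	}
	return md, nil
}
```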

@annanay25
Contributor Author

> OK, I was wondering if you intended to also cache other parts of the file like column indexes or bloom filters.

My bad, I forgot to mention that: we do intend to cache the dictionaries/bloom filters as well. The metadata definition seems to only store offsets for the indexes, but in a conversation with @mdisibio I think we discussed that the indexes are read as part of the metadata read as well, so I will dig deeper into that.

> I'm curious if you have ideas of what that could look like. For reference, parquet already attempts to offer a solution to these problems with file references in the file metadata, where one file may contain a schema only and a reference to a remote file for the data. This is currently not supported in parquet-go but could be something to explore https://github.com/segmentio/parquet-go/blob/main/format/parquet.go#L763-L765

TIL! This is really interesting. But it's also a string and not an io.ReaderAt, so I'm not sure how we can integrate a remote file like S3/GCS. And then again, this still does not give us a solution for caching the indexes, which will have to be fetched from the backend. So IIUC the access pattern of the file is to first read the metadata to figure out the layout of the file, next read the column indexes, and third access the data. We need to cache access to the first two, and figure out a way to have fine-grained caching policies for each column index.

Will give this some thought.

@achille-roussel
Contributor

> But it's also a string and not an io.ReaderAt, so I'm not sure how we can integrate a remote file like S3/GCS

The string is an identifier in some file-system-like interface. We could have something similar to the standard library's fs.FS interface, which we would use to acquire the secondary io.ReaderAt instances allowing access to the column chunk data, for example.

This model would not allow for caching of the dictionary pages though; those are part of the column chunks, so they wouldn't be available in the file containing only the metadata. This might call for having two caching mechanisms: one for the parquet metadata (via the column chunk's FilePath), and another, more specific to parquet-go, for data pages.
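A rough sketch of what such a file-system-like hook might look like; the FileOpener name and method are hypothetical, not an existing parquet-go interface:

```go
package parquetfs

import "io"

// FileOpener is a hypothetical hook, similar in spirit to fs.FS, that
// resolves the FilePath string stored in a column chunk's metadata to a
// reader for that file. The size is returned as well because it is needed
// to locate the footer of the secondary file.
type FileOpener interface {
	OpenFilePath(path string) (r io.ReaderAt, size int64, err error)
}
```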

I'm also curious how effective it would be to add a caching layer. Latency of reads from an object store would definitely be higher, but considering the large volumes of metadata to read from the storage layer, bandwidth would likely be the bottleneck. Would the performance then be roughly equivalent regardless of the remote backend serving the data?

@annanay25
Contributor Author

> The string is an identifier in some file-system-like interface. We could have something similar to the standard library's fs.FS interface, which we would use to acquire the secondary io.ReaderAt instances allowing access to the column chunk data, for example.

Nice. It makes the read pattern slightly more complex, but it should nevertheless do the trick with the least effort. I believe it will require an addition of the following form:

GetIOReaderForFilepath func(filepath string) io.ReaderAt

to get the secondary file handlers for a given filepath. These can then be used to read data from the object store.
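If it helps, a toy implementation of that kind of callback, mapping file paths onto a local directory (the error return is an addition over the sketch above; a real deployment would return readers backed by S3/GCS range requests instead):

```go
package fileopeners

import (
	"io"
	"os"
	"path/filepath"
)

// newLocalOpener returns a GetIOReaderForFilepath-style callback that maps
// FilePath strings onto files under a local directory. An object-store
// variant would open an S3/GCS object and expose it as an io.ReaderAt.
func newLocalOpener(root string) func(path string) (io.ReaderAt, error) {
	return func(path string) (io.ReaderAt, error) {
		f, err := os.Open(filepath.Join(root, path))
		if err != nil {
			return nil, err
		}
		return f, nil // *os.File implements io.ReaderAt
	}
}
```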

Also, we need to get an estimate of how large the bloom-filters/indexes are for the larger blocks, so we can decide whether we should make smaller blocks in Tempo. I understand that if the bloom-filters/indexes get too large we might be better off reading them from the object store.

@annanay25
Contributor Author

@achille-roussel I've started making some changes in this branch over here: main...annanay25:add-support-for-secondary-io-readerAts

It seemed easy enough to edit the reader/writers to support this new Filepath parameter but please let me know if I'm heading down the wrong path.

@annanay25
Contributor Author

So, with the above changes I was able to use two different io.ReaderAt instances for reading metadata and columnChunk data.

But now I'm beginning to get concerned about the small reads happening as part of building metadata:
https://github.com/segmentio/parquet-go/blob/main/file.go#L52-L120

This results in the following access pattern:

cached reader called. offset 0 bytes_read 4
cached reader called. offset 4697519 bytes_read 8
cached reader called. offset 4693014 bytes_read 4096
cached reader called. offset 4697110 bytes_read 409
cached reader called. offset 4691193 bytes_read 1821
cached reader called. offset 4 bytes_read 1
cached reader called. offset 5 bytes_read 1
cached reader called. offset 6 bytes_read 1
cached reader called. offset 7 bytes_read 1
cached reader called. offset 8 bytes_read 1
cached reader called. offset 9 bytes_read 1
cached reader called. offset 10 bytes_read 1
cached reader called. offset 11 bytes_read 1
cached reader called. offset 12 bytes_read 1
cached reader called. offset 13 bytes_read 1
cached reader called. offset 14 bytes_read 1
cached reader called. offset 15 bytes_read 1
cached reader called. offset 16 bytes_read 1
cached reader called. offset 17 bytes_read 1
cached reader called. offset 18 bytes_read 1
cached reader called. offset 19 bytes_read 1

Now, we absolutely want to cache all of these, but it's making me wonder whether it's worth having two io.ReaderAts -- one cached and the other reading from the object store.

One solution, if we want to continue down this path, is that our caching strategy will somehow need to encode offset & bytes_read in the object key. But is there a case where we read a different number of bytes from the same offset? Is there a recommended pattern here?

@achille-roussel
Contributor

Opening a file is quite a busy operation; lots of reads happen indeed.


cached reader called. offset 0 bytes_read 4

This is reading the 4-byte "PAR1" file header; it's just a sanity check. We could make it optional since it provides low value in the common case where the program knows it's dealing with parquet files.


cached reader called. offset 4697519 bytes_read 8
cached reader called. offset 4693014 bytes_read 4096
cached reader called. offset 4697110 bytes_read 409
cached reader called. offset 4691193 bytes_read 1821

This one looks like reading of the footer and page index, which is somewhat buffered? The first 8-byte read is for the "PAR1" footer magic + the length of the file metadata. We can't really do without it, but we could optimistically buffer a few KiB of footer data to avoid making small reads here.
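For context, those trailing 8 bytes are the 4-byte little-endian length of the thrift-encoded FileMetaData followed by the "PAR1" magic. A small sketch of how a reader can locate the footer from them (not the library's actual code):

```go
package footer

import (
	"encoding/binary"
	"fmt"
	"io"
)

// readFooterSpan reads the last 8 bytes of a parquet file and returns the
// offset and length of the thrift-encoded FileMetaData footer.
func readFooterSpan(r io.ReaderAt, fileSize int64) (offset, length int64, err error) {
	var trailer [8]byte
	if _, err = r.ReadAt(trailer[:], fileSize-8); err != nil {
		return 0, 0, err
	}
	if string(trailer[4:]) != "PAR1" {
		return 0, 0, fmt.Errorf("not a parquet file: missing PAR1 magic")
	}
	length = int64(binary.LittleEndian.Uint32(trailer[:4]))
	offset = fileSize - 8 - length
	return offset, length, nil
}
```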


cached reader called. offset 4 bytes_read 1
cached reader called. offset 5 bytes_read 1
cached reader called. offset 6 bytes_read 1
cached reader called. offset 7 bytes_read 1
cached reader called. offset 8 bytes_read 1
cached reader called. offset 9 bytes_read 1
cached reader called. offset 10 bytes_read 1
cached reader called. offset 11 bytes_read 1
cached reader called. offset 12 bytes_read 1
cached reader called. offset 13 bytes_read 1
cached reader called. offset 14 bytes_read 1
cached reader called. offset 15 bytes_read 1
cached reader called. offset 16 bytes_read 1
cached reader called. offset 17 bytes_read 1
cached reader called. offset 18 bytes_read 1
cached reader called. offset 19 bytes_read 1

This looks like reading of a page header? The thrift decoder will consume bytes one by one; offset 4 should have a page header.

We could also reduce the number of small sequential reads happening with a small buffering mechanism (e.g. reading 4KiB pages through a bufio.Reader).

@achille-roussel
Contributor

> One solution, if we want to continue down this path, is that our caching strategy will somehow need to encode offset & bytes_read in the object key. But is there a case where we read a different number of bytes from the same offset? Is there a recommended pattern here?

I would look at having better buffering first, so we read data in pages of a few KiB instead of having the small read pattern we see here.
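A minimal sketch of the kind of buffering being suggested here: an io.ReaderAt wrapper that fetches fixed-size, aligned pages from the underlying reader and serves small sequential reads from the last page fetched. A production version would keep several pages and be safe for concurrent use; this is only to illustrate the idea, not the library's buffering strategy.

```go
package bufreader

import "io"

const pageSize = 4096

// bufferedReaderAt coalesces small reads by fetching aligned pageSize
// chunks from the underlying reader. Not safe for concurrent use.
type bufferedReaderAt struct {
	inner   io.ReaderAt
	pageOff int64  // offset of the buffered page, -1 when empty
	page    []byte // last page fetched
}

func newBufferedReaderAt(r io.ReaderAt) *bufferedReaderAt {
	return &bufferedReaderAt{inner: r, pageOff: -1}
}

func (b *bufferedReaderAt) ReadAt(p []byte, off int64) (int, error) {
	n := 0
	for n < len(p) {
		pageOff := (off + int64(n)) / pageSize * pageSize
		if pageOff != b.pageOff {
			// Fetch the aligned page containing the next byte we need.
			buf := make([]byte, pageSize)
			m, err := b.inner.ReadAt(buf, pageOff)
			if m == 0 && err != nil {
				return n, err
			}
			b.page, b.pageOff = buf[:m], pageOff
		}
		start := int(off + int64(n) - b.pageOff)
		if start >= len(b.page) {
			return n, io.EOF
		}
		n += copy(p[n:], b.page[start:])
	}
	return n, nil
}
```

Reading in aligned pages also sidesteps the earlier cache-key question, since a cache sitting underneath this wrapper could key entries on the page index alone.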

@annanay25
Contributor Author

Thanks, I like the idea of using a small buffer. I'm on PTO for the next few days now, will continue to experiment once I'm back.

@mdisibio
Contributor

Hi all, I've been following along; great discussion. We definitely want to find a way to benefit from memcached/redis eventually if there is something straightforward, but for now I agree that basic buffering is a good solution to investigate. I spent some time on that and have some thoughts.

  1. I didn't see that bufio supports io.ReaderAt, so I wrote a custom implementation.
  • Definitely helped to reduce I/Os but ended up more complex than expected. We read multiple columns concurrently and I don't think there's a way to inject a buffered io.ReaderAt per goroutine, so the custom implementation maintains several buffers and matches up reads automatically.
  • Extending reads forward is easy enough, but some reads seem to go backwards a bit. Do you know what activity that might be? Wondering if it makes sense to extend reads backwards and forwards.
  • Some numbers from local testing on a variety of buffer settings:
    • No buffering (baseline): 873 reads, 89.71 MB
    • 16K, 1x buffer: 757 reads, 94.25 MB
    • 64K, 32x buffer: 595 reads, 93.27 MB
    • 1M, 16x buffer (extreme): 164 reads, 161.26 MB
  • Good range here, I'm sure we can find a balance between I/O and over-fetch ourselves, but do you have any thoughts on what is likely to be ideal?
  2. Is the 4K defaultBufferReadSize worth configuring? Agree that external buffering is the main solution, but increasing this might provide a further improvement on top. Thoughts?

@achille-roussel
Contributor

> Definitely helped to reduce I/Os but ended up more complex than expected. We read multiple columns concurrently and I don't think there's a way to inject a buffered io.ReaderAt per goroutine, so the custom implementation maintains several buffers and matches up reads automatically.

I made this code public not too long ago; it serves a similar purpose to what you described, providing a caching layer over some underlying io.ReaderAt. Maybe we can consider bringing something similar into parquet-go if it turns out to be a tool we commonly need when using the package.

> Extending reads forward is easy enough, but some reads seem to go backwards a bit. Do you know what activity that might be? Wondering if it makes sense to extend reads backwards and forwards.

If more than one column is being read, the read pattern would be somewhat random: the data for column chunk A sits before column chunk B in the file, so reading both columns causes reads at interleaving positions.

I don't have much context on the test you ran, so I'm speculating, but generally there are plenty of reasons why read access may be somewhat random.

> Good range here, I'm sure we can find a balance between I/O and over-fetch ourselves, but do you have any thoughts on what is likely to be ideal?

This is going to depend on the physical properties of the underlying storage layer: if latency is high we may prefer larger buffers. The size of contiguous reads is also an important metric to look at; if scans are very short there is not much to gain from using large buffers. For S3, I wouldn't be shy about using a 1 MiB buffer size: from what you showed it causes roughly 2x size amplification but a 5x drop in the number of reads, which is an interesting ratio in my opinion. If you are reading from a local file system though, a buffer size in the 64-256 KiB range seems like a better trade-off: small size amplification while still bringing a 2-3x reduction in I/O operations.

> Is the 4K defaultBufferReadSize worth configuring? Agree that external buffering is the main solution, but increasing this might provide a further improvement on top. Thoughts?

We could definitely make it configurable, though maybe the existing buffering strategy isn't very effective? For example, I'm not sure why the logs shared by @annanay25 showed a one-byte-at-a-time read pattern; the sequential reads should have been buffered and require only a single read from the underlying io.ReaderAt.

@annanay25
Contributor Author

Addressed in #249
