
Parquet file reading performance improvement

Parth Chandra edited this page Aug 5, 2016 · 11 revisions

Parquet file reading performance

1 Motivation

In performance benchmarks of Drill, disk-bound queries on Parquet [1] files perform worse than expected when compared with other engines that read from Parquet files. The slowdown appears to be concentrated in the Parquet reader: in particular, the reader sometimes pulls data from the file system more slowly than the file system can deliver it.

The aim of this improvement is to optimize the performance of the Parquet reader so that the reader is gated only by the speed at which data can be read from the file system. This also addresses the issue highlighted in DRILL-4800.

2 Requirements

The main requirement of this improvement is to get the performance of Drill’s vectorized Parquet reader to be at least as fast as other applications that read from Parquet files. While Drill’s Parquet reader is among the fastest implementations in Java, it may lag behind other implementations, e.g. the parquet-cpp reader.

For comparison, the performance of Drill should be benchmarked using the TPC-H data set and queries that are reading from disk.

2.1 Use Cases

There are no end user use cases for this as this is an internal platform improvement.

#3 Background and Research

We ran some baseline tests to establish Drill’s parquet reader performance and a reasonable target for this project.

The tests were run on a ten node cluster with 11 disks per node. There were two runs for a single query that read all the data from all the columns of a set of parquet files. The file system cache was flushed before the first run so that the first run read all data from disk. The second run was done with a warm cache to see whether differences in performance are a result of the engine’s in-memory processing. Care was taken to ensure that all file reads were local. The tests were run using Drill and one other Parquet reader.

In addition, we wrote a similar standalone program (Solo Reader) that reads all the columns from the files that are on a single node. The program reads the column chunks in parallel (matching the parallelization of the two parquet readers), reads from disk in 16 MB chunks, and discards the data after reading. This program effectively simulates the best speeds we can get from the distributed file system when a process has multiple threads reading from different subsets of a set of Parquet files.

The following table summarizes the results:

| SQL Engine | Data Size (MB) | # Nodes | # Disks/Node | Cached | Runtime (ms) | Est. MB/sec (per node) |
|---|---|---|---|---|---|---|
| Drill 1.7.0 | 72,023 | 10 | 11 | No | 26,859 | 268 |
| Drill 1.7.0 | 72,023 | 10 | 11 | Yes | 11,236 | 641 |
| Other Parquet Reader | 72,023 | 10 | 11 | No | 18,050 | 399 |
| Other Parquet Reader | 72,023 | 10 | 11 | Yes | 11,610 | 620 |
| SoloReader-16MB | 24,822 | 1 | 11 | No | 34,300 | 724 |
| SoloReader-16MB | 24,822 | 1 | 11 | Yes | 9,526 | 2606 |
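The estimated throughput figures appear to be per-node rates: data size divided by runtime and by node count, with the runtime taken to be in milliseconds. That reading is an inference from the numbers, not something the results state. A minimal sketch that reproduces the column under that assumption (the class and method names are hypothetical):

```java
// Hypothetical helper: reproduces the estimated MB/sec figures, ASSUMING the
// runtime is in milliseconds and the rate is reported per node.
class ThroughputEstimate {
  static long perNodeMBPerSec(double dataSizeMB, int nodes, double runtimeMs) {
    double seconds = runtimeMs / 1000.0;
    return Math.round(dataSizeMB / seconds / nodes);
  }

  public static void main(String[] args) {
    // Drill 1.7.0, cold cache, 10 nodes
    System.out.println(perNodeMBPerSec(72_023, 10, 26_859));
    // SoloReader-16MB, cold cache, 1 node
    System.out.println(perNodeMBPerSec(24_822, 1, 34_300));
  }
}
```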

We observe that when the cache is cold, Drill achieved a throughput of only 268 MB/sec, while the other Parquet reader achieved 399 MB/sec. When the cache is warm, Drill was marginally faster. The solo reader was able to read from disk at nearly three times Drill’s cold-cache speed (724 MB/sec). We conclude that when reading from disk, Drill is not pulling data as fast as it can be processed. We also see that the file system is not the bottleneck, as the solo reader can read data from disk faster than the Parquet readers are able to process in-memory data.

#4 Design Overview

##4.1 Optimizing disk accesses

A characteristic of the Parquet reader is the following read pattern -

  1. Seek to the offset where a column chunk begins.
  2. Repeat until the column is read:
       • Read byte by byte until the page header is read.
       • Read the page data (by default in Drill, the page size is a maximum of 1 MB).

This read pattern is essentially sequential but suboptimal, since many small reads are done followed by a larger read. This can lead to the file system not caching data effectively and can also prevent the file system from reading ahead as it would for a sequential read. Experimentally, we saw that by increasing the page size of the Parquet file, we would get better read speeds which indicates that we could get better read performance if the file system does larger sequential reads.

To address this, we can make two changes to the reader that will help the file system both read ahead and cache better.

###4.1.1 Buffering Reader

The common method to reduce the number of disk accesses is to wrap an InputStream in a BufferedInputStream. BufferedInputStream, though, uses byte arrays and does not work with ByteBuffer, and so will incur an additional copy when we copy from the byte array to the ByteBuffer. Additionally, the default buffer size of BufferedInputStream is only 8 KB, far too small for the data set sizes we will encounter with Drill. This improvement proposes to add a similar buffering capability that supports reading directly into a DrillBuf and also allows for a configurable buffer size.

###4.1.2 File System Read Ahead

A second strategy is to provide hints to the file system that the InputStream is going to read a specific range of data. The POSIX fadvise [2] call is implemented by some underlying implementations of HDFS, and when available this call can provide the file system with a hint to read ahead and cache as much as it can. This call has no effect on the semantics of the program itself, but will improve performance when available. This improvement intends to set the advice parameter to SEQUENTIAL.

This call will be made optional. For underlying implementations that do not have support for fadvise, no call will be made. The call is made optional for another reason: future improvements that push down filtering to the page level will essentially skip pages that do not need to be read, and in that case the file system should not read ahead unnecessarily (the same holds true for limit 0 queries). In that case, no fadvise call will be made. By default, fadvise will be turned off.
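The buffering idea from section 4.1.1 can be sketched with standard java.nio types. This is a minimal sketch, not the proposed implementation: BufferingChannelReader, its fields, and the use of ByteBuffer in place of DrillBuf and of a ReadableByteChannel in place of the file stream are all assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical stand-in for a buffering reader: many small reads by the caller
// are served from one large internal buffer that is refilled in big chunks.
class BufferingChannelReader {
  private final ReadableByteChannel in;
  private final ByteBuffer buf;   // internal read buffer, size chosen by the caller
  int underlyingReads = 0;        // number of refills, kept for illustration only

  BufferingChannelReader(ReadableByteChannel in, int bufSize) {
    this.in = in;
    this.buf = ByteBuffer.allocateDirect(bufSize);
    this.buf.limit(0);            // start empty so the first read triggers a refill
  }

  // Copies up to dst.remaining() bytes into dst; returns -1 at end of stream.
  int read(ByteBuffer dst) throws IOException {
    if (!dst.hasRemaining()) {
      return 0;
    }
    if (!buf.hasRemaining()) {
      buf.clear();
      int filled = in.read(buf);  // one large read from the underlying source
      underlyingReads++;
      buf.flip();
      if (filled < 0) {
        return -1;
      }
    }
    int n = Math.min(dst.remaining(), buf.remaining());
    ByteBuffer slice = buf.duplicate();
    slice.limit(slice.position() + n);
    dst.put(slice);               // buffer-to-buffer copy, no byte[] in this class
    buf.position(buf.position() + n);
    return n;
  }
}
```

With a 256-byte buffer, reading 1000 bytes one byte at a time touches the underlying channel only a handful of times instead of 1000, which is the effect the larger read buffer is meant to achieve for page headers.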

##4.2 Pipelining

The existing ParquetRecordReader reads all the columns for a file in a single thread, reading a page from a column at a time.

For every column
  Read Page
  Decompress
  Decode
  Copy to Value Vector
Until Value vector is full or all data is read.

This is illustrated in Figure 1 below:

Figure 1

This implementation has a bottleneck where each disk operation (Read Page in Figure 1) is followed by a time consuming Decode (actually a decompress+decode) and Copy operation. This can clearly be improved by parallelizing the Read Page operation for each column. Additionally by making the Read Page operation asynchronous, we can pipeline the operation and the Decode and Read Page operations can run without waiting.

The pipelining can be implemented in two phases:

###4.2.1 Asynchronous Page Reader

The first phase makes the Read Page step asynchronous, as this appears to be the bottleneck, as illustrated in Figure 2 below. Note that in this phase the Decode + Copy operation will still run in a single thread and will decode and copy data one column at a time.

Figure 2

###4.2.2 Parallel Column Reader

We are splitting this into two phases so that we are able to measure the improvement we see by parallelizing and pipelining just the disk operation. The second phase is to parallelize the Decode + Copy operation. This is trickier to implement, as the threads need to be synchronized to fill the same number of records for every value vector.

Figure 3

##4.3 Thread Pool

To prevent excessive use of threads, the implementation should use a thread pool. This cannot be the same as the thread pool used to execute fragments, as that can lead to excessive waiting by the scan threads, which in turn will cause the downstream operators to wait. The current proposal is to create a single scan thread pool per drillbit. A second scan decode thread pool may be required for implementing the Parallel Column Reader.

#5 Implementation Details

##5.1 Algorithms (and Data Structures)

###5.1.1 Buffering Reader

We will implement a new BufferedDirectBufInputStream that wraps the FSDataInputStream currently used by the Parquet reader. This class explicitly does not try to support other InputStream implementations, as testing them would be well beyond the scope of the project. Nothing specific to FSDataInputStream is used, however, except the assumption that seek is implemented. The Buffering Reader will also take as input a range (start offset, size) of data. The range is intended to be used to issue an fadvise as well as to limit the reading of the data (see 5.1.1.1 Limitation). As a minor optimization, if the range of data to be read is smaller than the default buffer size, the range size will be used as the buffer size.

####5.1.1.1 Limitation

Some older versions of Parquet data files do not have the correct Rowgroup size set in the metadata, which makes it impossible to limit the amount of data read without additional decoding of data. The Buffering Reader will therefore not actually return an EOF if it has read past the range of data indicated at creation time.

###5.1.2 Asynchronous Page Reader

org.apache.drill.exec.store.parquet.columnreaders.PageReader (referred to as PageReader) will be changed to use a BufferedDirectBufInputStream instead of the current ColumnDataReader. As with the ColumnDataReader in the current implementation, the BufferedDirectBufInputStream will be backed by an FSDataInputStream.

PageReader will also become asynchronous. At init time it will submit an asynchronous background task to the underlying thread pool. The background task will be implemented in the AsyncPageReader static nested class of PageReader and will implement the Callable interface. The call method will return a ReadStatus.

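import java.util.concurrent.Callable;

import io.netty.buffer.DrillBuf;

public class PageReader {
  // Background task that reads one page and reports the outcome.
  static class AsyncPageReader implements Callable<ReadStatus> {
    ...
    @Override
    public ReadStatus call() throws Exception {
      ...
    }
  }

  // Result of one asynchronous page read.
  static class ReadStatus {
    public long bytesRead;
    public int returnVal;
    public Exception e;        // set if the read failed
    public DrillBuf pageData;  // page bytes read from disk
  }
}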

Roughly:

Parquet Reader Thread:
  Submit page read task
  While not end of data
    Get page
    Submit page read task
    Decompress, Decode

Read thread:
  Read page header
  Read page data bytes
  Return page header + page data, or exception.
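The handoff outlined above can be sketched with java.util.concurrent. This is a simplified illustration under stated assumptions, not the PageReader implementation: pages are modelled as byte[] entries in an in-memory list, decoding is reduced to counting bytes, and AsyncPageReadSketch, readTask, and readAll are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the reader-thread / read-thread handoff.
class AsyncPageReadSketch {
  static class ReadStatus {
    byte[] pageData;   // null signals end of data
    Exception e;       // set if the read failed
  }

  // Read thread: reads one page and returns it (or the exception) in a ReadStatus.
  static Callable<ReadStatus> readTask(List<byte[]> file, int pageIndex) {
    return () -> {
      ReadStatus status = new ReadStatus();
      try {
        if (pageIndex < file.size()) {
          status.pageData = file.get(pageIndex);
        }
      } catch (Exception ex) {
        status.e = ex;
      }
      return status;
    };
  }

  // Reader thread: keeps one page read in flight while processing the previous page.
  static long readAll(List<byte[]> file, ExecutorService scanPool) throws Exception {
    long decodedBytes = 0;
    int next = 0;
    Future<ReadStatus> pending = scanPool.submit(readTask(file, next++));
    while (true) {
      ReadStatus status = pending.get();                  // "Get page"
      if (status.e != null) {
        throw status.e;
      }
      if (status.pageData == null) {
        break;                                            // end of data
      }
      pending = scanPool.submit(readTask(file, next++));  // prefetch the next page
      decodedBytes += status.pageData.length;             // stand-in for decompress + decode
    }
    return decodedBytes;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService scanPool = Executors.newFixedThreadPool(2);
    try {
      List<byte[]> file = Arrays.asList(new byte[10], new byte[20], new byte[30]);
      System.out.println(readAll(file, scanPool));
    } finally {
      scanPool.shutdown();
    }
  }
}
```

Submitting the next read before decoding the current page is what lets the disk read and the decode overlap.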
###5.1.3 Thread Pool

The thread pool for the scan will be started at Drillbit startup time and accessed by PageReader through the operator context.

###5.1.4 Parallelized Column Reader

ParquetRecordReader calls various methods from ColumnReader that implement decompressing, decoding, and copying to value vectors. These methods are called sequentially for each column and can easily be parallelized. It is hard to pipeline these though as the logic is rather intricately tied to maintaining the correct record count across all the columns.

####5.1.4.1 Fixed length fields

Fixed length fields are decoded by calling the ColumnReader.processPages method from the ParquetRecordReader.readAllFixedFields method. The ParquetRecordReader.readAllFixedFields method will be changed to submit ColumnReader.processPages tasks in parallel to a scan decoding thread pool. It will block until all ColumnReader.processPages tasks have completed.

####5.1.4.2 Variable Length fields

ParquetRecordReader uses a VarLengthBinaryReader to read variable length columns. The method VarLengthBinaryReader.readFields calls two methods, ColumnReader.determineSize and ColumnReader.readRecords, serially. The VarLengthBinaryReader.readFields method will be changed to submit ColumnReader.determineSize and ColumnReader.readRecords in parallel to a scan decoding thread pool.
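The submit-and-block pattern described for the column readers can be sketched with ExecutorService.invokeAll, which returns only once every submitted task has completed. This is a sketch under assumptions: ColumnTask is a hypothetical stand-in for a column reader, and the record-count check is just one simple way to illustrate that all columns must fill the same number of records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch: one decode task per column, run on a dedicated decode
// pool; the caller blocks until every column has finished.
class ParallelColumnDecodeSketch {
  // Stand-in for a column reader; returns the number of records it decoded.
  interface ColumnTask {
    long processPages(long recordsToRead) throws Exception;
  }

  static long readAllColumns(List<ColumnTask> columns, long recordsToRead,
                             ExecutorService decodePool) throws Exception {
    List<Callable<Long>> tasks = new ArrayList<>();
    for (ColumnTask column : columns) {
      tasks.add(() -> column.processPages(recordsToRead));
    }
    long recordsRead = -1;  // -1 is returned if there are no columns
    for (Future<Long> result : decodePool.invokeAll(tasks)) {
      long n = result.get();  // a decode failure surfaces as ExecutionException
      if (recordsRead >= 0 && n != recordsRead) {
        throw new IllegalStateException("columns decoded different record counts");
      }
      recordsRead = n;
    }
    return recordsRead;
  }
}
```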

Note that we cannot submit column decoding tasks to the scan thread pool, as these methods need to block waiting for data to be read. If the thread pool size is not large enough, the disk read task created by the column decoding task will be blocked waiting for a thread to execute it, and the system will deadlock.

##5.2 APIs and Protocols

The BufferedDirectBufInputStream class implements the API of InputStream [3], except the mark and reset functionality. Additionally, the following APIs are implemented.

| Return type | API | Description |
|---|---|---|
| (constructor) | public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset, long totalByteSize, int bufSize, boolean enableHints) | Creates a buffered input stream that reads the underlying InputStream starting at startOffset, reads up to totalByteSize bytes, and uses an internal buffer of bufSize bytes. If enableHints is set, the stream will try to issue an fadvise call if available. |
| int | read(DrillBuf b) | Reads some number of bytes from the input stream and stores them into the DrillBuf buffer b. |
| int | read(DrillBuf b, int off, int len) | Reads up to len bytes of data from the input stream into the DrillBuf buffer b. |

##5.3 Performance

This entire feature is to improve Parquet reader performance. We expect to get close to the 399 MB/sec achieved by the other Parquet reader for the case where the cache is cold. There should be no degradation in performance in the case where the data is already in the file system cache. Performance benchmarking should be done as part of this improvement.

##5.4 Error and Failure handling

The BufferedDirectBufInputStream will throw the same exceptions that InputStream does. These exceptions will be caught by the AsyncPageReader. Any Exception in AsyncPageReader will be caught by PageReader as IOExceptions and rethrown as a UserException.

##5.5 Memory management

This feature increases the minimum amount of memory required to read a single column of Parquet data to 8 MB. There is no way around this except by configuring a smaller read buffer size.

##5.6 Scalability Issues

###5.6.1 Resource utilization

The number of threads created as part of a query’s execution increases, and with an increased number of concurrent queries this can lead to excessive thread context switches. This will be limited by using a thread pool. A second consideration is that the BufferedDirectBufInputStream implementation will allocate an internal buffer to read and buffer data into. Based on experiments described in section 3, it would appear that 8 MB is a good size for this buffer. However, a large number of columns (say 300) can result in a large amount of memory (300 x 8 MB = 2,400 MB, or about 2.34 GB) being pre-allocated. This could potentially lead to out-of-memory conditions under high concurrency.

##5.7 Options and metrics

###5.7.1 Configurable Options

The following options need to be made configurable for the Parquet Record reader -

  • Scan Thread Pool Size - A global thread pool for use by the scan operation. Page reader will use this. Default size will be 2 x number of cores.
  • Scan Decode Thread Pool Size - A global thread pool for use by the scan decode operation. Column Reader will use this. Default size will be 2 x number of cores.
  • Parquet Scan Buffering Block Size - This sets the default size of the read buffer.
  • Parquet Scan Enable Async Read - Turns on the asynchronous read feature for the Parquet scan. If false, the Parquet reader will revert to the old synchronous read method. The feature will be on by default.
  • Parquet Scan enable fadvise - Turns on the fadvise hint to the file reader. Off by default.
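The two pool options, and the reason decode tasks must not share the scan pool, can be illustrated with a small sketch. All names here (ScanThreadPools, decodeOnePage) are hypothetical, and fixed-size pools from java.util.concurrent.Executors are an assumption rather than the planned implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a scan pool and a separate scan decode pool.
class ScanThreadPools {
  final ExecutorService scanPool;        // page reads
  final ExecutorService scanDecodePool;  // column decoding

  ScanThreadPools(int scanPoolSize, int scanDecodePoolSize) {
    scanPool = Executors.newFixedThreadPool(scanPoolSize);
    scanDecodePool = Executors.newFixedThreadPool(scanDecodePoolSize);
  }

  // Default described in the options above: 2 x number of cores.
  static int defaultPoolSize() {
    return 2 * Runtime.getRuntime().availableProcessors();
  }

  // A decode task blocks on a page read. Because the read runs in a different
  // pool, it can be scheduled even when every decode thread is busy waiting.
  int decodeOnePage() throws Exception {
    Future<Integer> decoded = scanDecodePool.submit(() -> {
      Future<Integer> page = scanPool.submit(() -> 1024);  // stand-in for a disk read
      return page.get();                                   // stand-in for decode
    });
    return decoded.get(5, TimeUnit.SECONDS);
  }

  void close() {
    scanPool.shutdown();
    scanDecodePool.shutdown();
  }
}
```

With a single shared pool of one thread, the inner read task could never start while the outer decode task occupies the only thread; with separate pools, even one thread per pool is enough for this example to complete.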

###5.7.2 Metrics

  • Parquet Scan PageReader Wait Time - The total amount of time the PageReader was waiting to get data from an async read.
  • Parquet Scan decode time - Time spent in decoding the data per minor fragment.
  • Parquet Scan decompress time - Time spent in decompressing the data. This is the total time spent across all columns per minor fragment.
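One possible way to collect a wait-time figure like the first metric is to time the blocking call on the pending read. The sketch below is an assumption about how this could be done, not the operator stats implementation; PageReaderWaitTimer is a hypothetical name, and an AtomicLong is used only so that the counter stays correct if several threads update it.

```java
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: accumulates the time spent blocked on asynchronous reads.
class PageReaderWaitTimer {
  private final AtomicLong waitNanos = new AtomicLong();

  // Waits for the pending read and adds the elapsed time to the running total.
  <T> T timedGet(Future<T> pending) throws Exception {
    long start = System.nanoTime();
    try {
      return pending.get();
    } finally {
      waitNanos.addAndGet(System.nanoTime() - start);
    }
  }

  long totalWaitNanos() {
    return waitNanos.get();
  }
}
```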

##5.8 Testing implications

Asynchronous reading of page data can have issues if dictionary data is not read fully before data pages are read and decoded. Functional tests should be run for all Parquet data types, especially dictionary encoded types and files with pages that are a mix of dictionary encoded and non-dictionary encoded. Any off-by-one errors in the buffering can cause data corruption. Functional tests should be run on files with data pages that are split on the boundary of the read buffer.

##5.9 Tradeoffs and Limitations

The faster implementation comes at the cost of more memory and more threads. A new pool of scan threads will increase the number of execution threads. The size of this pool is bounded, though. Since the implementation will use a buffering input stream, the memory requirement for the input streams can become much larger than it is at present. The biggest limitation is that in the pipeline, decompressing data has to be done at the level of (Parquet) page data. This can easily become the performance bottleneck, and there is no way to overcome that. For every data set, as the page size is changed, there is a point at which the disk is no longer the bottleneck and decompression becomes the slowest stage in the pipeline. There is currently no way of tuning this automatically, and the system will have to be tuned manually to determine the best compression type and page size.

#6 Implementation Plan

  1. Initial Experimentation - (outlined in the Background and Research section)
  2. Implement BufferedDirectBufInputStream and use that in PageReader
  3. Implement AsynchronousPageReader
  4. Run Benchmark
  5. Implement parallel ColumnReaders
  6. Run Benchmark
  7. Add unit tests

#7 Open items

The design of Parallelized Column Reader (section 5.1.4) may need more detail. Any additional metrics that need to be tracked need to be added. The Operator stats implementation is not designed for multiple threads of a minor fragment. This needs to be improved.

#8 References

[1] https://parquet.apache.org/documentation/latest/

[2] http://linux.die.net/man/2/posix_fadvise

[3] https://docs.oracle.com/javase/7/docs/api/java/io/InputStream.html

9 Document History

| Date | Author | Version | Description |
|---|---|---|---|
| 2016-07-17 | Parth S. Chandra, Kunal Khatua | 0.1 | Initial Draft |
| 2016-08-05 | Parth S. Chandra | 1.0 | Included feedback. Added additional metrics and configurable parameters. |