Skip to content

Commit

Permalink
Adapt RocksDB 8.7.3 (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu committed Dec 12, 2023
1 parent 33f5fd7 commit 5aa204a
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 1 deletion.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version}

# Note: if you don't have a good reason, please do not set -DPORTABLE=ON
# This one is set here on purpose of compatibility with github action runtime processor
rocksdb_version="8.6.7"
rocksdb_version="8.7.3"
cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \
mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \
-DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \
Expand Down
42 changes: 42 additions & 0 deletions c.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ typedef struct rocksdb_memory_consumers_t rocksdb_memory_consumers_t;
typedef struct rocksdb_memory_usage_t rocksdb_memory_usage_t;
typedef struct rocksdb_statistics_histogram_data_t
rocksdb_statistics_histogram_data_t;
typedef struct rocksdb_wait_for_compact_options_t
rocksdb_wait_for_compact_options_t;

/* DB operations */

Expand Down Expand Up @@ -1944,6 +1946,8 @@ extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_timestamp(
rocksdb_readoptions_t*, const char* ts, size_t tslen);
extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_iter_start_ts(
rocksdb_readoptions_t*, const char* ts, size_t tslen);
extern ROCKSDB_LIBRARY_API void rocksdb_readoptions_set_auto_readahead_size(
rocksdb_readoptions_t*, unsigned char);

/* Write options */

Expand Down Expand Up @@ -2927,6 +2931,44 @@ extern ROCKSDB_LIBRARY_API uint64_t rocksdb_statistics_histogram_data_get_sum(
extern ROCKSDB_LIBRARY_API double rocksdb_statistics_histogram_data_get_min(
rocksdb_statistics_histogram_data_t* data);

extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact(
rocksdb_t* db, rocksdb_wait_for_compact_options_t* options, char** errptr);

extern ROCKSDB_LIBRARY_API rocksdb_wait_for_compact_options_t*
rocksdb_wait_for_compact_options_create(void);

extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact_options_destroy(
rocksdb_wait_for_compact_options_t* opt);

extern ROCKSDB_LIBRARY_API void
rocksdb_wait_for_compact_options_set_abort_on_pause(
rocksdb_wait_for_compact_options_t* opt, unsigned char v);

extern ROCKSDB_LIBRARY_API unsigned char
rocksdb_wait_for_compact_options_get_abort_on_pause(
rocksdb_wait_for_compact_options_t* opt);

extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact_options_set_flush(
rocksdb_wait_for_compact_options_t* opt, unsigned char v);

extern ROCKSDB_LIBRARY_API unsigned char
rocksdb_wait_for_compact_options_get_flush(
rocksdb_wait_for_compact_options_t* opt);

extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact_options_set_close_db(
rocksdb_wait_for_compact_options_t* opt, unsigned char v);

extern ROCKSDB_LIBRARY_API unsigned char
rocksdb_wait_for_compact_options_get_close_db(
rocksdb_wait_for_compact_options_t* opt);

extern ROCKSDB_LIBRARY_API void rocksdb_wait_for_compact_options_set_timeout(
rocksdb_wait_for_compact_options_t* opt, uint64_t microseconds);

extern ROCKSDB_LIBRARY_API uint64_t
rocksdb_wait_for_compact_options_get_timeout(
rocksdb_wait_for_compact_options_t* opt);

#ifdef __cplusplus
} /* end extern "C" */
#endif
18 changes: 18 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1850,6 +1850,24 @@ func (db *DB) GetColumnFamilyMetadataCF(cf *ColumnFamilyHandle) (m *ColumnFamily
return
}

// WaitForCompact waits for all flush and compactions jobs to finish. Jobs to wait
// include the unscheduled (queued, but not scheduled yet). If the db is shutting down,
// Status::ShutdownInProgress will be returned.
//
// NOTE: This may also never return if there's sufficient ongoing writes that
// keeps flush and compaction going without stopping. The user would have to
// cease all the writes to DB to make this eventually return in a stable
// state. The user may also use timeout option in WaitForCompactOptions to
// make this stop waiting and return when timeout expires.
func (db *DB) WaitForCompact(opts *WaitForCompactOptions) (err error) {
var cErr *C.char

C.rocksdb_wait_for_compact(db.c, opts.p, &cErr)
err = fromCError(cErr)

return
}

// Close the database.
func (db *DB) Close() {
C.rocksdb_close(db.c)
Expand Down
6 changes: 6 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ func TestDBMultiGet(t *testing.T) {
require.EqualValues(t, values[1].Data(), givenVal1)
require.EqualValues(t, values[2].Data(), givenVal2)
require.EqualValues(t, values[3].Data(), givenVal3)

waitForCompactOpts := NewWaitForCompactOptions()
defer waitForCompactOpts.Destroy()

err = db.WaitForCompact(waitForCompactOpts)
require.NoError(t, err)
}

func TestLoadLatestOpts(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions options_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,3 +345,21 @@ func (opts *ReadOptions) SetIterStartTimestamp(ts []byte) {
cTSLen := C.size_t(len(ts))
C.rocksdb_readoptions_set_iter_start_ts(opts.c, cTS, cTSLen)
}

// SetAutoReadaheadSize (Experimental) if auto_readahead_size is set to true, it will auto tune
// the readahead_size during scans internally.
// For this feature to enabled, iterate_upper_bound must also be specified.
//
// NOTE: - Recommended for forward Scans only.
// - In case of backward scans like Prev or SeekForPrev, the
// cost of these backward operations might increase and affect the
// performace. So this option should not be enabled if workload
// contains backward scans.
// - If there is a backward scans, this option will be
// disabled internally and won't be reset if forward scan is done
// again.
//
// Default: false
func (opts *ReadOptions) SetAutoReadaheadSize(enable bool) {
C.rocksdb_readoptions_set_auto_readahead_size(opts.c, boolToChar(enable))
}
2 changes: 2 additions & 0 deletions options_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,6 @@ func TestReadOptions(t *testing.T) {
require.False(t, ro.IsAsyncIO())
ro.SetAsyncIO(true)
require.True(t, ro.IsAsyncIO())

ro.SetAutoReadaheadSize(true)
}
75 changes: 75 additions & 0 deletions options_wait_for_compact.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package grocksdb

// #include "rocksdb/c.h"
import "C"

type WaitForCompactOptions struct {
p *C.rocksdb_wait_for_compact_options_t
}

func NewWaitForCompactOptions() *WaitForCompactOptions {
return &WaitForCompactOptions{
p: C.rocksdb_wait_for_compact_options_create(),
}
}

func (w *WaitForCompactOptions) Destroy() {
C.rocksdb_wait_for_compact_options_destroy(w.p)
w.p = nil
}

// SetAbortOnPause toggles the abort_on_pause flag, to abort waiting in case of
// a pause (PauseBackgroundWork() called).
//
// - If true, Status::Aborted will be returned immediately.
// - If false, ContinueBackgroundWork() must be called to resume the background jobs.
//
// Otherwise, jobs that were queued, but not scheduled yet may never finish
// and WaitForCompact() may wait indefinitely (if timeout is set, it will
// expire and return Status::TimedOut).
func (w *WaitForCompactOptions) SetAbortOnPause(v bool) {
C.rocksdb_wait_for_compact_options_set_abort_on_pause(w.p, boolToChar(v))
}

// IsAbortOnPause checks if abort_on_pause flag is on.
func (w *WaitForCompactOptions) AbortOnPause() bool {
return charToBool(C.rocksdb_wait_for_compact_options_get_abort_on_pause(w.p))
}

// SetFlush toggles the "flush" flag to flush all column families before starting to wait.
func (w *WaitForCompactOptions) SetFlush(v bool) {
C.rocksdb_wait_for_compact_options_set_flush(w.p, boolToChar(v))
}

// IsFlush checks if "flush" flag is on.
func (w *WaitForCompactOptions) Flush() bool {
return charToBool(C.rocksdb_wait_for_compact_options_get_flush(w.p))
}

// SetCloseDB toggles the "close_db" flag to call Close() after waiting is done.
// By the time Close() is called here, there should be no background jobs in progress
// and no new background jobs should be added.
//
// DB may not have been closed if Close() returned Aborted status due to unreleased snapshots
// in the system.
func (w *WaitForCompactOptions) SetCloseDB(v bool) {
C.rocksdb_wait_for_compact_options_set_close_db(w.p, boolToChar(v))
}

// CloseDB checks if "close_db" flag is on.
func (w *WaitForCompactOptions) CloseDB() bool {
return charToBool(C.rocksdb_wait_for_compact_options_get_close_db(w.p))
}

// SetTimeout in microseconds for waiting for compaction to complete.
// Status::TimedOut will be returned if timeout expires.
// when timeout == 0, WaitForCompact() will wait as long as there's background
// work to finish.
func (w *WaitForCompactOptions) SetTimeout(microseconds uint64) {
C.rocksdb_wait_for_compact_options_set_timeout(w.p, C.uint64_t(microseconds))
}

// GetTimeout in microseconds.
func (w *WaitForCompactOptions) GetTimeout() uint64 {
return uint64(C.rocksdb_wait_for_compact_options_get_timeout(w.p))
}
30 changes: 30 additions & 0 deletions options_wait_for_compact_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package grocksdb

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestWaitForCompactOptions(t *testing.T) {
t.Parallel()

opts := NewWaitForCompactOptions()
defer opts.Destroy()

require.False(t, opts.AbortOnPause())
opts.SetAbortOnPause(true)
require.True(t, opts.AbortOnPause())

require.False(t, opts.Flush())
opts.SetFlush(true)
require.True(t, opts.Flush())

require.False(t, opts.CloseDB())
opts.SetCloseDB(true)
require.True(t, opts.CloseDB())

require.EqualValues(t, 0, opts.GetTimeout())
opts.SetTimeout(1234)
require.EqualValues(t, 1234, opts.GetTimeout())
}

0 comments on commit 5aa204a

Please sign in to comment.