Skip to content

Commit

Permalink
[IMPROVED] Bulk insert of messages
Browse files Browse the repository at this point in the history
A new configuration `bulk_insert_limit` switches the server from
the current insertion of messages within a SQL transaction to
an "INSERT INTO MESSAGES () VALUES (),(),..." statement, which can
improve performance severalfold.

The server may still perform regular inserts within a transaction
if the configured limit is deemed too low.

This new configuration parameter is not enabled by default. It
needs to be set explicitly, either in the configuration file or
from the command line: `--sql_bulk_insert_limit <number here>`.

Resolves #1132

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jan 8, 2021
1 parent 71d1dbc commit 20f9b79
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 4 deletions.
1 change: 1 addition & 0 deletions nats-streaming-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Streaming Server SQL Store Options:
--sql_source <string> Datasource used when opening an SQL connection to the database
--sql_no_caching <bool> Enable/Disable caching for improved performance
--sql_max_open_conns <int> Maximum number of opened connections to the database
--sql_bulk_insert_limit <int> Maximum number of messages stored with a single SQL "INSERT" statement
Streaming Server TLS Options:
-secure <bool> Use a TLS connection to the NATS server without
Expand Down
6 changes: 6 additions & 0 deletions server/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ func parseSQLOptions(itf interface{}, opts *Options) error {
return err
}
opts.SQLStoreOpts.MaxOpenConns = int(v.(int64))
case "bulk_insert_limit":
if err := checkType(name, reflect.Int64, v); err != nil {
return err
}
opts.SQLStoreOpts.BulkInsertLimit = int(v.(int64))
}
}
return nil
Expand Down Expand Up @@ -688,6 +693,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
defSQLOpts := stores.DefaultSQLStoreOptions()
fs.BoolVar(&sopts.SQLStoreOpts.NoCaching, "sql_no_caching", defSQLOpts.NoCaching, "Enable/Disable caching")
fs.IntVar(&sopts.SQLStoreOpts.MaxOpenConns, "sql_max_open_conns", defSQLOpts.MaxOpenConns, "Max opened connections to the database")
fs.IntVar(&sopts.SQLStoreOpts.BulkInsertLimit, "sql_bulk_insert_limit", 0, "Limit the number of messages inserted in one SQL query")
fs.StringVar(&sopts.SyslogName, "syslog_name", "", "Syslog Name")
fs.BoolVar(&sopts.Encrypt, "encrypt", false, "Specify if server should use encryption at rest")
fs.StringVar(&sopts.EncryptionCipher, "encryption_cipher", stores.CryptoCipherAutoSelect, "Encryption cipher. Supported are AES and CHACHA (default is AES)")
Expand Down
3 changes: 3 additions & 0 deletions server/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ func TestParseConfig(t *testing.T) {
if opts.SQLStoreOpts.MaxOpenConns != 5 {
t.Fatalf("Expected SQL MaxOpenConns to be 5, got %v", opts.SQLStoreOpts.MaxOpenConns)
}
if opts.SQLStoreOpts.BulkInsertLimit != 1000 {
t.Fatalf("Expected SQL BulkInsertLimit to be 1000, got %v", opts.SQLStoreOpts.BulkInsertLimit)
}
if !opts.Encrypt {
t.Fatal("Expected Encrypt to be true")
}
Expand Down
111 changes: 109 additions & 2 deletions stores/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ const (
// Limit of number of messages in the cache before message store
// is automatically flushed on a Store() call.
sqlDefaultMsgCacheLimit = 1024

// If bulk insert limit is set, the server will still insert messages
// using tx if the limit is below this threshold.
sqlMinBulkInsertLimit = 5
)

// These are initialized based on the constants that have reasonable values.
Expand Down Expand Up @@ -215,6 +219,11 @@ type SQLStoreOptions struct {
// APIs will cause execution of their respective SQL statements.
NoCaching bool

// If this is non 0, and NoCaching is not enabled, the server will perform
// bulk insert of messages. This is the limit of values added to the SQL statement
// "INSERT INTO Messages (..) VALUES (..)[,(..)*]".
BulkInsertLimit int

// Maximum number of open connections to the database.
// If <= 0, then there is no limit on the number of open connections.
// The default is 0 (unlimited).
Expand All @@ -240,6 +249,14 @@ func SQLNoCaching(noCaching bool) SQLStoreOption {
}
}

// SQLBulkInsertLimit returns an SQLStoreOption that sets the
// BulkInsertLimit option, that is, the maximum number of value groups
// packed into a single bulk "INSERT" statement.
func SQLBulkInsertLimit(limit int) SQLStoreOption {
	return func(opts *SQLStoreOptions) error {
		opts.BulkInsertLimit = limit
		return nil
	}
}

// SQLMaxOpenConns sets the MaxOpenConns option
func SQLMaxOpenConns(max int) SQLStoreOption {
return func(o *SQLStoreOptions) error {
Expand All @@ -254,6 +271,7 @@ func SQLAllOptions(opts *SQLStoreOptions) SQLStoreOption {
return func(o *SQLStoreOptions) error {
o.NoCaching = opts.NoCaching
o.MaxOpenConns = opts.MaxOpenConns
o.BulkInsertLimit = opts.BulkInsertLimit
return nil
}
}
Expand All @@ -275,6 +293,8 @@ type SQLStore struct {
wg sync.WaitGroup
preparedStmts []*sql.Stmt
ssFlusher *subStoresFlusher
postgres bool
bulkInserts []string
}

type sqlDBLock struct {
Expand Down Expand Up @@ -410,11 +430,20 @@ func NewSQLStore(log logger.Logger, driver, source string, limits *StoreLimits,
db: db,
doneCh: make(chan struct{}),
preparedStmts: make([]*sql.Stmt, 0, len(sqlStmts)),
postgres: driver == driverPostgres,
}
if err := s.init(TypeSQL, log, limits); err != nil {
s.Close()
return nil, err
}
if s.postgres && opts.BulkInsertLimit > 0 {
limit := opts.BulkInsertLimit
s.bulkInserts = make([]string, limit)
for i := 0; i < limit; i++ {
j := i * 5
s.bulkInserts[i] = fmt.Sprintf("($%d,$%d,$%d,$%d,$%d)", j+1, j+2, j+3, j+4, j+5)
}
}
if err := s.createPreparedStmts(); err != nil {
s.Close()
return nil, err
Expand Down Expand Up @@ -1633,6 +1662,9 @@ func (ms *SQLMsgStore) flush() error {
ps *sql.Stmt
)
defer func() {
if ms.limits.MaxAge > 0 && ms.expireTimer == nil {
ms.createExpireTimer()
}
ms.writeCache.transferToFreeList()
if ps != nil {
ps.Close()
Expand All @@ -1641,6 +1673,9 @@ func (ms *SQLMsgStore) flush() error {
tx.Rollback()
}
}()
if limit := ms.sqlStore.opts.BulkInsertLimit; limit >= sqlMinBulkInsertLimit {
return ms.bulkInsert(limit)
}
tx, err := ms.sqlStore.db.Begin()
if err != nil {
return err
Expand All @@ -1664,8 +1699,80 @@ func (ms *SQLMsgStore) flush() error {
return err
}
tx = nil
if ms.limits.MaxAge > 0 && ms.expireTimer == nil {
ms.createExpireTimer()
return nil
}

// Insert messages with INSERT INTO MESSAGES () VALUES (),(),()...
// This is faster than the original insert with transactions.
// It is done only if user configures the BulkInsertLimit option.
// Lock held on entry.
//
// The write cache is traversed without draining it; the caller (flush)
// still needs its elements and transfers them to the free list in its
// deferred cleanup. Returns the first database error encountered, if any.
func (ms *SQLMsgStore) bulkInsert(limit int) error {
// MySQL-style "?" placeholders; for Postgres the precomputed "$n"
// placeholder groups in sqlStore.bulkInserts (built in NewSQLStore)
// are used instead.
const insertStmt = "INSERT INTO Messages (id, seq, timestamp, size, data) VALUES "
const valArgs = "(?,?,?,?,?)"

count := ms.writeCache.count

// Pre-size the builder: statement prefix, one placeholder group per
// row, plus separators. This is only a capacity hint for Grow.
sb := strings.Builder{}
size := len(insertStmt) + count // number of "," + last ";"
if ms.sqlStore.postgres {
for i := 0; i < limit; i++ {
size += len(ms.sqlStore.bulkInserts[i])
}
} else {
size += count * len(valArgs)
}
sb.Grow(size)
sb.WriteString(insertStmt)

// Build the full statement once, with `limit` placeholder groups.
// Partial batches reuse a truncated copy of it (see stmtb below).
for i := 0; i < limit; i++ {
if i > 0 {
sb.WriteString(",")
}
if ms.sqlStore.postgres {
sb.WriteString(ms.sqlStore.bulkInserts[i])
} else {
sb.WriteString(valArgs)
}
}
sb.WriteString(";")
// Byte copy used to derive shorter statements for partial batches.
stmtb := []byte(sb.String())

// 5 bound arguments per row: id, seq, timestamp, size, data.
args := make([]interface{}, 0, 5*count)
start := ms.writeCache.head
for count > 0 {
args = args[:0]
i := 0
// l tracks the statement length that covers the rows gathered so far,
// so a partial batch can be cut at the right byte offset.
l := len(insertStmt)
// Iterate through the cache, but do not remove elements from the list.
// They are needed by the caller.
for cm := start; cm != nil; cm = cm.next {
if i > 0 {
l++ // account for the "," separator before this group
}
if ms.sqlStore.postgres {
l += len(ms.sqlStore.bulkInserts[i])
} else {
l += len(valArgs)
}
args = append(args, ms.channelID, cm.msg.Sequence, cm.msg.Timestamp, len(cm.data), cm.data)
i++
if i == limit {
// Batch is full; resume from the next element on the
// following outer-loop iteration.
start = cm.next
break
}
}
count -= i
var stmt string
if i == limit {
// Full batch: use the prebuilt statement as-is.
stmt = sb.String()
} else {
// Partial (last) batch: truncate the prebuilt statement right
// after the i-th placeholder group and terminate it with ";"
// (overwriting what would have been the next "," separator).
l++
stmtb[l-1] = ';'
stmt = string(stmtb[:l])
}
if _, err := ms.sqlStore.db.Exec(stmt, args[:i*5]...); err != nil {
return err
}
}
return nil
}
Expand Down
49 changes: 47 additions & 2 deletions stores/sqlstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ func TestSQLAllOptions(t *testing.T) {
defer cleanupSQLDatastore(t)

opts := &SQLStoreOptions{
NoCaching: true,
MaxOpenConns: 123,
NoCaching: true,
MaxOpenConns: 123,
BulkInsertLimit: 456,
}
s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLAllOptions(opts))
if err != nil {
Expand All @@ -178,6 +179,9 @@ func TestSQLAllOptions(t *testing.T) {
if so.MaxOpenConns != 123 {
t.Fatalf("MaxOpenConns should be 123, got %v", so.MaxOpenConns)
}
if so.BulkInsertLimit != 456 {
t.Fatalf("BulkInsertLimit should be 456, got %v", so.BulkInsertLimit)
}
}

func TestSQLPostgresDriverInit(t *testing.T) {
Expand Down Expand Up @@ -2140,3 +2144,44 @@ func TestSQLMaxAgeForMsgsWithTimestampInPast(t *testing.T) {
}
}
}

// TestSQLBulkInsertLimit verifies that messages stored through the
// bulk-insert code path survive a flush and can be looked up intact.
func TestSQLBulkInsertLimit(t *testing.T) {
	if !doSQL {
		t.SkipNow()
	}

	cleanupSQLDatastore(t)
	defer cleanupSQLDatastore(t)

	// Caching must remain enabled for bulk inserts to kick in. A limit
	// of 10 with 126 messages exercises both full and partial batches.
	s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil,
		SQLNoCaching(false), SQLBulkInsertLimit(10))
	if err != nil {
		t.Fatalf("Error creating store: %v", err)
	}
	defer s.Close()

	cs := storeCreateChannel(t, s, "foo")
	const total = uint64(127)
	for seq := uint64(1); seq < total; seq++ {
		m := &pb.MsgProto{
			Sequence:  seq,
			Subject:   "foo",
			Data:      []byte(fmt.Sprintf("%v", seq)),
			Timestamp: time.Now().UnixNano(),
		}
		if _, err := cs.Msgs.Store(m); err != nil {
			t.Fatalf("Error storing message: %v", err)
		}
	}
	if err := cs.Msgs.Flush(); err != nil {
		t.Fatalf("Error on flush: %v", err)
	}

	// Read every message back and check its payload round-tripped.
	for seq := uint64(1); seq < total; seq++ {
		stored := msgStoreLookup(t, cs.Msgs, seq)
		if want := fmt.Sprintf("%v", seq); string(stored.Data) != want {
			t.Fatalf("Expected %q, got %q", want, stored.Data)
		}
	}
}
1 change: 1 addition & 0 deletions test/configs/test_parse.conf
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,6 @@ streaming: {
source: "ivan:pwd@/nss_db"
no_caching: true
max_open_conns: 5
bulk_insert_limit: 1000
}
}

0 comments on commit 20f9b79

Please sign in to comment.