Skip to content

Commit

Permalink
Merge pull request #1140 from nats-io/improve_sql_msg_insert_perf
Browse files Browse the repository at this point in the history
[IMPROVED] Bulk insert of messages
  • Loading branch information
kozlovic committed Jan 8, 2021
2 parents 71d1dbc + 20f9b79 commit 8be03bd
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 4 deletions.
1 change: 1 addition & 0 deletions nats-streaming-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ Streaming Server SQL Store Options:
--sql_source <string> Datasource used when opening an SQL connection to the database
--sql_no_caching <bool> Enable/Disable caching for improved performance
--sql_max_open_conns <int> Maximum number of opened connections to the database
--sql_bulk_insert_limit <int> Maximum number of messages stored with a single SQL "INSERT" statement
Streaming Server TLS Options:
-secure <bool> Use a TLS connection to the NATS server without
Expand Down
6 changes: 6 additions & 0 deletions server/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ func parseSQLOptions(itf interface{}, opts *Options) error {
return err
}
opts.SQLStoreOpts.MaxOpenConns = int(v.(int64))
case "bulk_insert_limit":
if err := checkType(name, reflect.Int64, v); err != nil {
return err
}
opts.SQLStoreOpts.BulkInsertLimit = int(v.(int64))
}
}
return nil
Expand Down Expand Up @@ -688,6 +693,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp,
defSQLOpts := stores.DefaultSQLStoreOptions()
fs.BoolVar(&sopts.SQLStoreOpts.NoCaching, "sql_no_caching", defSQLOpts.NoCaching, "Enable/Disable caching")
fs.IntVar(&sopts.SQLStoreOpts.MaxOpenConns, "sql_max_open_conns", defSQLOpts.MaxOpenConns, "Max opened connections to the database")
fs.IntVar(&sopts.SQLStoreOpts.BulkInsertLimit, "sql_bulk_insert_limit", 0, "Limit the number of messages inserted in one SQL query")
fs.StringVar(&sopts.SyslogName, "syslog_name", "", "Syslog Name")
fs.BoolVar(&sopts.Encrypt, "encrypt", false, "Specify if server should use encryption at rest")
fs.StringVar(&sopts.EncryptionCipher, "encryption_cipher", stores.CryptoCipherAutoSelect, "Encryption cipher. Supported are AES and CHACHA (default is AES)")
Expand Down
3 changes: 3 additions & 0 deletions server/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ func TestParseConfig(t *testing.T) {
if opts.SQLStoreOpts.MaxOpenConns != 5 {
t.Fatalf("Expected SQL MaxOpenConns to be 5, got %v", opts.SQLStoreOpts.MaxOpenConns)
}
if opts.SQLStoreOpts.BulkInsertLimit != 1000 {
t.Fatalf("Expected SQL BulkInsertLimit to be 1000, got %v", opts.SQLStoreOpts.BulkInsertLimit)
}
if !opts.Encrypt {
t.Fatal("Expected Encrypt to be true")
}
Expand Down
111 changes: 109 additions & 2 deletions stores/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ const (
// Limit of number of messages in the cache before message store
// is automatically flushed on a Store() call.
sqlDefaultMsgCacheLimit = 1024

// If the bulk insert limit is set but below this threshold, the server
// falls back to inserting messages within a transaction instead.
sqlMinBulkInsertLimit = 5
)

// These are initialized based on the constants that have reasonable values.
Expand Down Expand Up @@ -215,6 +219,11 @@ type SQLStoreOptions struct {
// APIs will cause execution of their respective SQL statements.
NoCaching bool

// If this is non 0, and NoCaching is not enabled, the server will perform
// bulk insert of messages. This is the limit of values added to the SQL statement
// "INSERT INTO Messages (..) VALUES (..)[,(..)*]".
BulkInsertLimit int

// Maximum number of open connections to the database.
// If <= 0, then there is no limit on the number of open connections.
// The default is 0 (unlimited).
Expand All @@ -240,6 +249,14 @@ func SQLNoCaching(noCaching bool) SQLStoreOption {
}
}

// SQLBulkInsertLimit sets the BulkInsertLimit option, the maximum
// number of messages grouped into a single SQL INSERT statement.
func SQLBulkInsertLimit(limit int) SQLStoreOption {
	opt := func(o *SQLStoreOptions) error {
		o.BulkInsertLimit = limit
		return nil
	}
	return opt
}

// SQLMaxOpenConns sets the MaxOpenConns option
func SQLMaxOpenConns(max int) SQLStoreOption {
return func(o *SQLStoreOptions) error {
Expand All @@ -254,6 +271,7 @@ func SQLAllOptions(opts *SQLStoreOptions) SQLStoreOption {
return func(o *SQLStoreOptions) error {
o.NoCaching = opts.NoCaching
o.MaxOpenConns = opts.MaxOpenConns
o.BulkInsertLimit = opts.BulkInsertLimit
return nil
}
}
Expand All @@ -275,6 +293,8 @@ type SQLStore struct {
wg sync.WaitGroup
preparedStmts []*sql.Stmt
ssFlusher *subStoresFlusher
postgres bool
bulkInserts []string
}

type sqlDBLock struct {
Expand Down Expand Up @@ -410,11 +430,20 @@ func NewSQLStore(log logger.Logger, driver, source string, limits *StoreLimits,
db: db,
doneCh: make(chan struct{}),
preparedStmts: make([]*sql.Stmt, 0, len(sqlStmts)),
postgres: driver == driverPostgres,
}
if err := s.init(TypeSQL, log, limits); err != nil {
s.Close()
return nil, err
}
if s.postgres && opts.BulkInsertLimit > 0 {
limit := opts.BulkInsertLimit
s.bulkInserts = make([]string, limit)
for i := 0; i < limit; i++ {
j := i * 5
s.bulkInserts[i] = fmt.Sprintf("($%d,$%d,$%d,$%d,$%d)", j+1, j+2, j+3, j+4, j+5)
}
}
if err := s.createPreparedStmts(); err != nil {
s.Close()
return nil, err
Expand Down Expand Up @@ -1633,6 +1662,9 @@ func (ms *SQLMsgStore) flush() error {
ps *sql.Stmt
)
defer func() {
if ms.limits.MaxAge > 0 && ms.expireTimer == nil {
ms.createExpireTimer()
}
ms.writeCache.transferToFreeList()
if ps != nil {
ps.Close()
Expand All @@ -1641,6 +1673,9 @@ func (ms *SQLMsgStore) flush() error {
tx.Rollback()
}
}()
if limit := ms.sqlStore.opts.BulkInsertLimit; limit >= sqlMinBulkInsertLimit {
return ms.bulkInsert(limit)
}
tx, err := ms.sqlStore.db.Begin()
if err != nil {
return err
Expand All @@ -1664,8 +1699,80 @@ func (ms *SQLMsgStore) flush() error {
return err
}
tx = nil
if ms.limits.MaxAge > 0 && ms.expireTimer == nil {
ms.createExpireTimer()
return nil
}

// Insert messages with INSERT INTO MESSAGES () VALUES (),(),()...
// This is faster than the original insert with transactions.
// It is done only if user configures the BulkInsertLimit option.
// Lock held on entry.
//
// The statement is built once with exactly `limit` value groups; cached
// messages are then executed against it in batches of up to `limit`.
// A final batch smaller than `limit` reuses the prebuilt statement by
// truncating its byte copy in place (see below).
func (ms *SQLMsgStore) bulkInsert(limit int) error {
	const insertStmt = "INSERT INTO Messages (id, seq, timestamp, size, data) VALUES "
	// Placeholder group used by non-postgres drivers; postgres instead uses
	// the positional "($1,$2,...)" groups precomputed in sqlStore.bulkInserts.
	const valArgs = "(?,?,?,?,?)"

	count := ms.writeCache.count

	// Pre-size the builder so assembling the statement does not reallocate.
	// NOTE(review): the separator estimate uses `count` while the statement
	// contains `limit` value groups; this is only a Grow() hint, so a
	// mismatch affects preallocation, not correctness.
	sb := strings.Builder{}
	size := len(insertStmt) + count // number of "," + last ";"
	if ms.sqlStore.postgres {
		for i := 0; i < limit; i++ {
			size += len(ms.sqlStore.bulkInserts[i])
		}
	} else {
		size += count * len(valArgs)
	}
	sb.Grow(size)
	sb.WriteString(insertStmt)

	// Append `limit` comma-separated value groups, then terminate with ";".
	for i := 0; i < limit; i++ {
		if i > 0 {
			sb.WriteString(",")
		}
		if ms.sqlStore.postgres {
			sb.WriteString(ms.sqlStore.bulkInserts[i])
		} else {
			sb.WriteString(valArgs)
		}
	}
	sb.WriteString(";")
	// Byte copy of the full statement, used to derive a shorter statement
	// when the last batch holds fewer than `limit` messages.
	stmtb := []byte(sb.String())

	// Five bound values per message: channel id, seq, timestamp, size, data.
	args := make([]interface{}, 0, 5*count)
	start := ms.writeCache.head
	for count > 0 {
		args = args[:0]
		i := 0
		// `l` tracks the statement length covering the value groups consumed
		// so far, so we know where to cut if this batch comes up short.
		l := len(insertStmt)
		// Iterate through the cache, but do not remove elements from the list.
		// They are needed by the caller.
		for cm := start; cm != nil; cm = cm.next {
			if i > 0 {
				l++ // account for the "," separator
			}
			if ms.sqlStore.postgres {
				l += len(ms.sqlStore.bulkInserts[i])
			} else {
				l += len(valArgs)
			}
			args = append(args, ms.channelID, cm.msg.Sequence, cm.msg.Timestamp, len(cm.data), cm.data)
			i++
			if i == limit {
				// Batch full: remember where the next batch starts.
				start = cm.next
				break
			}
		}
		count -= i
		var stmt string
		if i == limit {
			// Full batch: use the prebuilt statement as-is.
			stmt = sb.String()
		} else {
			// Short (final) batch: overwrite the byte right after the last
			// value group (where the next "," separator would sit) with ";"
			// and truncate the statement there.
			l++
			stmtb[l-1] = ';'
			stmt = string(stmtb[:l])
		}
		if _, err := ms.sqlStore.db.Exec(stmt, args[:i*5]...); err != nil {
			return err
		}
	}
	return nil
}
Expand Down
49 changes: 47 additions & 2 deletions stores/sqlstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ func TestSQLAllOptions(t *testing.T) {
defer cleanupSQLDatastore(t)

opts := &SQLStoreOptions{
NoCaching: true,
MaxOpenConns: 123,
NoCaching: true,
MaxOpenConns: 123,
BulkInsertLimit: 456,
}
s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil, SQLAllOptions(opts))
if err != nil {
Expand All @@ -178,6 +179,9 @@ func TestSQLAllOptions(t *testing.T) {
if so.MaxOpenConns != 123 {
t.Fatalf("MaxOpenConns should be 123, got %v", so.MaxOpenConns)
}
if so.BulkInsertLimit != 456 {
t.Fatalf("BulkInsertLimit should be 456, got %v", so.BulkInsertLimit)
}
}

func TestSQLPostgresDriverInit(t *testing.T) {
Expand Down Expand Up @@ -2140,3 +2144,44 @@ func TestSQLMaxAgeForMsgsWithTimestampInPast(t *testing.T) {
}
}
}

// TestSQLBulkInsertLimit verifies that messages stored with the bulk
// insert limit enabled are persisted and can all be looked up afterwards.
func TestSQLBulkInsertLimit(t *testing.T) {
	if !doSQL {
		t.SkipNow()
	}

	cleanupSQLDatastore(t)
	defer cleanupSQLDatastore(t)

	// Create store with caching enabled and bulk insert limit
	s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, nil,
		SQLNoCaching(false), SQLBulkInsertLimit(10))
	if err != nil {
		t.Fatalf("Error creating store: %v", err)
	}
	defer s.Close()

	// 126 messages: not a multiple of the limit, so the last batch is short.
	const lastSeq = uint64(126)

	cs := storeCreateChannel(t, s, "foo")
	for seq := uint64(1); seq <= lastSeq; seq++ {
		m := &pb.MsgProto{
			Sequence:  seq,
			Subject:   "foo",
			Data:      []byte(fmt.Sprintf("%v", seq)),
			Timestamp: time.Now().UnixNano(),
		}
		if _, err := cs.Msgs.Store(m); err != nil {
			t.Fatalf("Error storing message: %v", err)
		}
	}
	if err := cs.Msgs.Flush(); err != nil {
		t.Fatalf("Error on flush: %v", err)
	}

	// Read every message back and check its payload.
	for seq := uint64(1); seq <= lastSeq; seq++ {
		msg := msgStoreLookup(t, cs.Msgs, seq)
		want := fmt.Sprintf("%v", seq)
		if string(msg.Data) != want {
			t.Fatalf("Expected %q, got %q", want, msg.Data)
		}
	}
}
1 change: 1 addition & 0 deletions test/configs/test_parse.conf
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,6 @@ streaming: {
source: "ivan:pwd@/nss_db"
no_caching: true
max_open_conns: 5
bulk_insert_limit: 1000
}
}

0 comments on commit 8be03bd

Please sign in to comment.