From 3d90c89abab0de21d596e56da07872758cd93de7 Mon Sep 17 00:00:00 2001 From: art2ip <126820369+art2ip@users.noreply.github.com> Date: Tue, 10 Oct 2023 10:16:36 -0700 Subject: [PATCH] Enhancements in go client library. --- cmd/dbscanserv/prime/replicate.go | 35 ++--- cmd/dbscanserv/prime/result.go | 12 +- cmd/tools/cmd/cfg/rtcfg.go | 6 +- cmd/tools/cmd/cli/cli.go | 11 +- cmd/tools/cmd/cli/sscli.go | 9 +- internal/cli/conn.go | 107 +++++++++++++ internal/cli/{proc.go => connector.go} | 122 ++++++++++----- internal/cli/error.go | 29 +++- internal/cli/processor.go | 201 ++++++++++++++++++++----- internal/cli/tracker.go | 8 +- pkg/client/README.md | 164 ++++++++++++++++++++ pkg/client/client.go | 65 ++------ pkg/client/clientimpl.go | 188 +++++++++++++---------- pkg/client/config.go | 87 ++++------- pkg/client/error.go | 24 +-- test/cli/basic_test.go | 145 ++++++++++++++++++ test/cli/cond_test.go | 54 +++++++ test/cli/define.go | 28 ++++ test/cli/main_test.go | 81 ++++++++++ test/cli/util.go | 181 ++++++++++++++++++++++ test/drv/bulkload/bulkload.go | 10 +- test/drv/junoload/junoload.go | 48 +++--- test/functest/setup_test.go | 12 +- test/testutil/server/server.go | 3 +- test/unittest/setup_test.go | 4 +- 25 files changed, 1287 insertions(+), 347 deletions(-) create mode 100644 internal/cli/conn.go rename internal/cli/{proc.go => connector.go} (72%) create mode 100644 pkg/client/README.md create mode 100644 test/cli/basic_test.go create mode 100644 test/cli/cond_test.go create mode 100644 test/cli/define.go create mode 100644 test/cli/main_test.go create mode 100644 test/cli/util.go diff --git a/cmd/dbscanserv/prime/replicate.go b/cmd/dbscanserv/prime/replicate.go index 2a4d392c..134ce707 100644 --- a/cmd/dbscanserv/prime/replicate.go +++ b/cmd/dbscanserv/prime/replicate.go @@ -21,7 +21,6 @@ package prime import ( "errors" - "runtime" "time" "github.com/paypal/junodb/third_party/forked/golang/glog" @@ -34,7 +33,7 @@ import ( var ( secConfig *sec.Config - processor []*cli.Processor + processor *cli.Processor inChan = make(chan *proto.OperationalMessage, 1000) outChan = make(chan bool, 1000) ) @@ -57,39 +56,31 @@ func InitReplicator(proxyAddr string, numConns int) { } if numConns <= 0 { - numConns = 1 + numConns = 2 } if numConns > 4 { numConns = 4 } - processor = make([]*cli.Processor, numConns) - for i := 0; i < numConns; i++ { + processor = cli.NewProcessor( + io.ServiceEndpoint{Addr: proxyAddr, SSLEnabled: secConfig != nil}, + "dbscan", + numConns, // connPoolSize + time.Duration(500*time.Millisecond), // ConnectTimeout + time.Duration(1000*time.Millisecond), // ResponseTimeout + nil) // GetTLSConfig - processor[i] = cli.NewProcessor( - io.ServiceEndpoint{Addr: proxyAddr, SSLEnabled: secConfig != nil}, - "dbscan", - time.Duration(500*time.Millisecond), // ConnectTimeout - time.Duration(1000*time.Millisecond), // RequestTimeout - time.Duration(60*time.Second)) // ConnectRecycleTimeout - - processor[i].Start() - - runtime.SetFinalizer(processor[i], func(p *cli.Processor) { - p.Close() - }) - - go processRequest(i) - } + processor.Start() + go processRequest() } -func processRequest(k int) { +func processRequest() { count := uint64(0) for { select { case op := <-inChan: for i := 0; i < 3; i++ { - reply, err := processor[k].ProcessRequest(op) + reply, err := processor.ProcessRequest(op) if err != nil { if i < 2 { time.Sleep(10 * time.Millisecond) diff --git a/cmd/dbscanserv/prime/result.go b/cmd/dbscanserv/prime/result.go index 9e589903..6d629916 100644 --- a/cmd/dbscanserv/prime/result.go +++ b/cmd/dbscanserv/prime/result.go @@ -88,14 +88,10 @@ func RepairKey(key []byte, display bool) bool { if !found { clientCfg := client.Config{ - Appname: "dbscan", - Namespace: ns, - RetryCount: 1, - ConnectTimeout: util.Duration{500 * time.Millisecond}, - ReadTimeout: util.Duration{500 * time.Millisecond}, - WriteTimeout: util.Duration{500 * time.Millisecond}, - RequestTimeout: util.Duration{1000 * time.Millisecond}, - ConnRecycleTimeout: util.Duration{60 * time.Second}, + Appname: "dbscan", + Namespace: ns, + ConnectTimeout: util.Duration{500 * time.Millisecond}, + ResponseTimeout: util.Duration{1000 * time.Millisecond}, } clientCfg.Server.Addr = proxyAddr diff --git a/cmd/tools/cmd/cfg/rtcfg.go b/cmd/tools/cmd/cfg/rtcfg.go index b1bdb2e0..80bda9ca 100644 --- a/cmd/tools/cmd/cfg/rtcfg.go +++ b/cmd/tools/cmd/cfg/rtcfg.go @@ -31,6 +31,7 @@ import ( "github.com/paypal/junodb/pkg/client" "github.com/paypal/junodb/pkg/cmd" "github.com/paypal/junodb/pkg/etcd" + "github.com/paypal/junodb/pkg/util" ) const ( @@ -78,7 +79,10 @@ func (c *cmdRuntimeConfig) Parse(args []string) (err error) { return } } - c.clientConfig.SetDefault() + c.clientConfig.DefaultTimeToLive = 1800 + c.clientConfig.ConnPoolSize = 1 + c.clientConfig.ConnectTimeout = util.Duration{1000 * time.Millisecond} + c.clientConfig.ResponseTimeout = util.Duration{1000 * time.Millisecond} if cfg, e := c.config.GetConfig("Juno"); e == nil { cfg.WriteTo(&c.clientConfig) diff --git a/cmd/tools/cmd/cli/cli.go b/cmd/tools/cmd/cli/cli.go index d90f1490..3de3c6bf 100644 --- a/cmd/tools/cmd/cli/cli.go +++ b/cmd/tools/cmd/cli/cli.go @@ -47,13 +47,10 @@ const ( ) var defaultConfig = client.Config{ - RetryCount: 1, - DefaultTimeToLive: 1800, - ConnectTimeout: util.Duration{100 * time.Millisecond}, - ReadTimeout: util.Duration{500 * time.Millisecond}, - WriteTimeout: util.Duration{500 * time.Millisecond}, - RequestTimeout: util.Duration{1000 * time.Millisecond}, - ConnRecycleTimeout: util.Duration{9 * time.Second}, + DefaultTimeToLive: 1800, + ConnPoolSize: 1, + ConnectTimeout: util.Duration{100 * time.Millisecond}, + ResponseTimeout: util.Duration{1000 * time.Millisecond}, } type ( diff --git a/cmd/tools/cmd/cli/sscli.go b/cmd/tools/cmd/cli/sscli.go index 2d29078e..861765a7 100644 --- a/cmd/tools/cmd/cli/sscli.go +++ b/cmd/tools/cmd/cli/sscli.go @@ -37,8 +37,8 @@ import ( ) const ( - connectTimeout = 100 * time.Millisecond - requestTimeout = 1000 * time.Millisecond + connectTimeout = 100 * time.Millisecond + responseTimeout = 1000 * time.Millisecond ) type ( @@ -173,9 +173,10 @@ func (s *shardOptionsT) getShardId(key []byte) uint16 { func newProcessor(cfg *client.Config) *cli.Processor { processor := cli.NewProcessor(cfg.Server, cfg.Appname, + 1, // connPoolSize cfg.ConnectTimeout.Duration, - cfg.RequestTimeout.Duration, - cfg.ConnRecycleTimeout.Duration) + cfg.ResponseTimeout.Duration, + nil) // GetTLSConfig processor.Start() return processor } diff --git a/internal/cli/conn.go b/internal/cli/conn.go new file mode 100644 index 00000000..61f6f1eb --- /dev/null +++ b/internal/cli/conn.go @@ -0,0 +1,107 @@ +// +// Copyright 2023 PayPal Inc. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package cli + +import ( + "crypto/tls" + "errors" + "fmt" + "net" + "time" + + "github.com/paypal/junodb/pkg/logging" + "github.com/paypal/junodb/pkg/logging/cal" +) + +func TLSInitialized() bool { + return false +} + +func Dial(addr string, timeout time.Duration, getTLSConfig func() *tls.Config) (conn net.Conn, err error) { + var tlsConn *tls.Conn + + if getTLSConfig == nil { + return nil, errors.New("Unable to get TLS config") + } + timeStart := time.Now() + tlsCfg := getTLSConfig() + if tlsCfg == nil { + err = errors.New("Unable to get TLS config") + } else { + dialer := &net.Dialer{Timeout: timeout} + tlsConn, err = tls.DialWithDialer(dialer, "tcp", addr, tlsCfg) + conn = tlsConn + if tlsConn == nil && err == nil { + err = errors.New("Connect failed.") + } + } + + if !cal.IsEnabled() { + return conn, err + } + + // Cal logging + status := cal.StatusSuccess + b := logging.NewKVBuffer() + if err != nil { + status = cal.StatusError + b.Add([]byte("err"), err.Error()) + } else { + b.Add([]byte("ssl"), getConnectionState(tlsConn)) + } + + cal.AtomicTransaction(cal.TxnTypeConnect, addr, status, + time.Since(timeStart), b.Bytes()) + + return conn, err +} + +func getConnectionState(c *tls.Conn) string { + if c == nil { + return "" + } + + st := c.ConnectionState() + rid := 0 + if st.DidResume { + rid = 1 + } + msg := fmt.Sprintf("GoTLS:%s:%s:ssl_r=%d", getVersionName(st.Version), + tls.CipherSuiteName(st.CipherSuite), rid) + + return msg +} + +func getVersionName(ver uint16) string { + switch ver { + case tls.VersionSSL30: + return "SSLv3" + case tls.VersionTLS10: + return "TLSv1" + case tls.VersionTLS11: + return "TLSv1.1" + case tls.VersionTLS12: + return "TLSv1.2" + case tls.VersionTLS13: + return "TLSv1.3" + default: + return "" + } +} diff --git a/internal/cli/proc.go b/internal/cli/connector.go similarity index 72% rename from internal/cli/proc.go rename to internal/cli/connector.go index d67c13b9..913d5021 100644 --- a/internal/cli/proc.go +++ b/internal/cli/connector.go @@ -20,9 +20,13 @@ package cli import ( + "crypto/tls" "io" + + "math/rand" "net" "os" + "sync/atomic" "syscall" "time" @@ -42,6 +46,14 @@ type ( } ) +var ( + connCount int64 +) + +func init() { + rand.Seed(time.Now().UTC().UnixNano()) +} + func (c *Connection) Close() { if c.conn != nil { c.conn.Close() @@ -72,6 +84,8 @@ func (c *Connection) Shutdown() { c.conn.Close() } c.conn = nil + c.beingRecycle = false + glog.Debugf("Close connCount=%d", atomic.AddInt64(&connCount, -1)) } } @@ -137,53 +151,79 @@ func startResponseReader(r io.ReadCloser) <-chan *ReaderResponse { func StartRequestProcessor( server junoio.ServiceEndpoint, sourceName string, + connDone chan<- bool, + connIndex int, connectTimeout time.Duration, - requestTimeout time.Duration, - connRecycleTimeout time.Duration, + responseTimeout time.Duration, + getTLSConfig func() *tls.Config, chDone <-chan bool, chRequest <-chan *RequestContext) (chProcessorDone <-chan bool) { ch := make(chan bool) - go doRequestProcess(server, sourceName, connectTimeout, requestTimeout, connRecycleTimeout, chDone, ch, chRequest) + go doRequestProcess(server, sourceName, connDone, connIndex, + connectTimeout, responseTimeout, getTLSConfig, chDone, ch, chRequest) + return ch } +func resetRecycleTimer( + server junoio.ServiceEndpoint, + connIndex int, + responseTimeout time.Duration, + connRecycleTimer *util.TimerWrapper) { + if connRecycleTimeout <= 0 { + return + } + + t := connRecycleTimeout + // Reduced by a random value between 0 to 20%. + t = t * time.Duration(1000-rand.Intn(200)) / 1000 + if t < 2*responseTimeout { + t = 2 * responseTimeout + } + glog.Debugf("connection=%d addr=%s", connIndex, server.Addr) + connRecycleTimer.Reset(t) +} + func doRequestProcess( server junoio.ServiceEndpoint, sourceName string, + connDone chan<- bool, + connIndex int, connectTimeout time.Duration, - requestTimeout time.Duration, - connRecycleTimeout time.Duration, + responseTimeout time.Duration, + getTLSConfig func() *tls.Config, chDone <-chan bool, chDoneNotify chan<- bool, chRequest <-chan *RequestContext) { - if connRecycleTimeout != 0 { - if connRecycleTimeout < requestTimeout+requestTimeout { - connRecycleTimeout = requestTimeout + requestTimeout - glog.Infof("conntion recycle timeout adjusted to be %s", connRecycleTimeout) - } - } + glog.Debugf("Start connection %d", connIndex) + connRecycleTimer := util.NewTimerWrapper(connRecycleTimeout) active := &Connection{} recycled := &Connection{} - connect := func() (err error) { + connect := func() error { var conn net.Conn - conn, err = junoio.Connect(&server, connectTimeout) + var err error + if server.SSLEnabled && getTLSConfig != nil { + conn, err = Dial(server.Addr, connectTimeout, getTLSConfig) + } else { + conn, err = junoio.Connect(&server, connectTimeout) + } if err != nil { - return + return err } active.conn = conn - active.tracker = newPendingTracker(requestTimeout) + active.tracker = newPendingTracker(responseTimeout) active.chReaderResponse = startResponseReader(conn) - if connRecycleTimeout != 0 { - glog.Debugf("connection recycle in %s", connRecycleTimeout) - connRecycleTimer.Reset(connRecycleTimeout) - } else { - glog.Debugf("connection won't be recycled") - } - return + resetRecycleTimer( + server, + connIndex, + responseTimeout, + connRecycleTimer) + glog.Debugf("Open connCount=%d", atomic.AddInt64(&connCount, 1)) + return nil } var sequence uint32 @@ -191,24 +231,37 @@ func doRequestProcess( var err error connect() + connDone <- true -loop: for { select { case <-chDone: glog.Verbosef("proc done channel got notified") - active.Shutdown() - break loop + active.Shutdown() ///TODO to revisit + return case _, ok := <-connRecycleTimer.GetTimeoutCh(): + connRecycleTimer.Stop() if ok { glog.Debug("connection recycle timer fired") - connRecycleTimer.Stop() + recycled.Shutdown() recycled = active - recycled.beingRecycle = true active = &Connection{} err = connect() if err != nil { glog.Error(err) + active = recycled // reuse current one. + recycled = &Connection{} + resetRecycleTimer( + server, + connIndex, + responseTimeout, + connRecycleTimer) + } else { + recycled.beingRecycle = true + if recycled.tracker != nil && + len(recycled.tracker.pendingQueue) == 0 { + recycled.Shutdown() + } } } else { glog.Errorf("connection recycle timer not ok") @@ -224,7 +277,7 @@ loop: if ok { recycled.tracker.OnTimeout(now) if len(recycled.tracker.pendingQueue) == 0 { - glog.Debugf("close write for the recybled connection as it has handled all the pending request(s)") + // close write for the recybled connection as it has handled all the pending request(s)") recycled.Shutdown() } else { glog.Debugf("being recycled request timeout") @@ -235,10 +288,10 @@ loop: } case r, ok := <-chRequest: - if !ok { // shouldn't happen as it won't be closed - break loop + if !ok { // shouldn't happen + continue // ignore } - glog.Verbosef("processor got request") + glog.Debugf("connection %d got request", connIndex) var err error if active.conn == nil { @@ -250,14 +303,14 @@ loop: req := r.GetRequest() if req == nil { glog.Error("nil request") - return + continue // ignore } req.SetSource(saddr.IP, uint16(saddr.Port), []byte(sourceName)) sequence++ var raw proto.RawMessage if err = req.Encode(&raw); err != nil { glog.Errorf("encoding error %s", err) - return + continue // ignore } raw.SetOpaque(sequence) @@ -268,7 +321,8 @@ loop: active.Close() } } else { - r.ReplyError(err) + ErrConnect.SetError(err.Error()) + r.ReplyError(ErrConnect) } case readerResp, ok := <-active.chReaderResponse: if ok { diff --git a/internal/cli/error.go b/internal/cli/error.go index 3bd8d263..26cb04a9 100644 --- a/internal/cli/error.go +++ b/internal/cli/error.go @@ -19,7 +19,10 @@ package cli -import () +import ( + "errors" + "sync" +) type IRetryable interface { Retryable() bool @@ -29,6 +32,20 @@ type Error struct { What string } +var ( + ErrConnect = &Error{"connection error"} + ErrResponseTimeout = &Error{"response timeout"} + rwMutex sync.RWMutex +) + +func specialError(err error) bool { + if errors.Is(err, ErrConnect) || + errors.Is(err, ErrResponseTimeout) { + return true + } + return false +} + func (e *Error) Retryable() bool { return false } type RetryableError struct { @@ -38,6 +55,8 @@ type RetryableError struct { func (e *RetryableError) Retryable() bool { return true } func (e *Error) Error() string { + rwMutex.RLock() + defer rwMutex.RUnlock() return "error: " + e.What } @@ -45,6 +64,12 @@ func (e *RetryableError) Error() string { return "error: " + e.What } +func (e *Error) SetError(v string) { + rwMutex.Lock() + e.What = v + rwMutex.Unlock() +} + func NewError(err error) *Error { return &Error{ What: err.Error(), @@ -52,7 +77,7 @@ func NewError(err error) *Error { } func NewErrorWithString(err string) *Error { - return &Error{err} + return &Error{What: err} } /* diff --git a/internal/cli/processor.go b/internal/cli/processor.go index cb7ee3ad..27cb75a1 100644 --- a/internal/cli/processor.go +++ b/internal/cli/processor.go @@ -20,9 +20,10 @@ package cli import ( + "crypto/tls" "fmt" - "os" "sync" + "sync/atomic" "time" "github.com/paypal/junodb/third_party/forked/golang/glog" @@ -37,6 +38,34 @@ type IOError struct { Err error } +var ( + connRecycleTimeout = time.Duration(0 * time.Second) +) + +func SetConnectRecycleTimeout(recycleTimeout time.Duration) { + if recycleTimeout < 0 { + return + } + if recycleTimeout > 0 { + if recycleTimeout < time.Duration(5*time.Second) { + recycleTimeout = time.Duration(5 * time.Second) + } + if recycleTimeout > time.Duration(90*time.Second) { + recycleTimeout = time.Duration(90 * time.Second) + } + } + connRecycleTimeout = recycleTimeout + glog.Debugf("Set conn_recycle_timeout=%v", connRecycleTimeout) +} + +func SetDefaultRecycleTimeout(defaultTimeout time.Duration) { + if connRecycleTimeout > 0 { + glog.Infof("Default conn_recycle_timeout=%v", connRecycleTimeout) + return // already set + } + connRecycleTimeout = defaultTimeout +} + func (e *IOError) Retryable() bool { return true } func (e *IOError) Error() string { @@ -48,89 +77,194 @@ var ( kCalTxnType = "JUNO_CLIENT" kCalSslTxnType = "JUNO_SSL_CLIENT" + + poolMap = make(map[string]*Processor, 10) + execCount int64 + mutex sync.Mutex ) type Processor struct { + PoolSize int server io.ServiceEndpoint sourceName string + connIndex int - connectTimeout time.Duration - requestTimeout time.Duration - connRecycleTimeout time.Duration + connectTimeout time.Duration + responseTimeout time.Duration + getTLSConfig func() *tls.Config chDone chan bool chProcDone <-chan bool chRequest chan *RequestContext startOnce sync.Once + moreConns []*Processor +} + +func addProcessorExecutor(c *Processor) { + if c.PoolSize == 1 { + atomic.AddInt64(&execCount, 1) + return + } + poolMap[c.server.Addr] = c +} + +func getProcessor(addr string) *Processor { + c, found := poolMap[addr] + if !found { + return nil + } + return c +} + +func decrementExecutor() { + atomic.AddInt64(&execCount, -1) +} + +func ShowProcStats() { + glog.Infof("pool_count=%d", len(poolMap)) + mutex.Lock() + defer mutex.Unlock() + + for k, v := range poolMap { + glog.Infof("addr=%s, pool_size=%d", k, v.PoolSize) + } + + count := atomic.LoadInt64(&execCount) + glog.Infof("exec_count=%d", count) } func NewProcessor( server io.ServiceEndpoint, sourceName string, + connPoolSize int, connectTimeout time.Duration, - requestTimeout time.Duration, - connRecycleTimeout time.Duration) *Processor { + responseTimeout time.Duration, + getTLSConfig func() *tls.Config) *Processor { + + if connPoolSize < 1 { + connPoolSize = 2 + } + if connPoolSize > 20 { + connPoolSize = 20 + } + if connPoolSize > 1 { + mutex.Lock() + defer mutex.Unlock() + proc := getProcessor(server.Addr) + if proc != nil { + return proc + } // else need to create processor + } + + if connectTimeout == 0 { + connectTimeout = time.Duration(2000 * time.Millisecond) + } + + glog.Debugf("addr=%s pool_size=%d connect_timeout=%dms response_timeout=%dms", + server.Addr, connPoolSize, connectTimeout.Nanoseconds()/int64(1e6), + responseTimeout.Nanoseconds()/int64(1e6)) + chDone := make(chan bool) + chRequest := make(chan *RequestContext, kMaxRequestChanBufferSize) c := &Processor{ - server: server, - sourceName: sourceName, - connectTimeout: connectTimeout, - requestTimeout: requestTimeout, - connRecycleTimeout: connRecycleTimeout, - chDone: make(chan bool), - chRequest: make(chan *RequestContext, kMaxRequestChanBufferSize), + PoolSize: connPoolSize, + server: server, + sourceName: sourceName, + connIndex: 0, + connectTimeout: connectTimeout, + responseTimeout: responseTimeout, + getTLSConfig: getTLSConfig, + chDone: chDone, + chRequest: chRequest, + moreConns: make([]*Processor, connPoolSize-1), } + for i := 0; i < connPoolSize-1; i++ { + c.moreConns[i] = &Processor{ + PoolSize: connPoolSize, + server: server, + sourceName: sourceName, + connIndex: i + 1, + connectTimeout: connectTimeout, + responseTimeout: responseTimeout, + getTLSConfig: getTLSConfig, + chDone: chDone, + chRequest: chRequest, + } + } + addProcessorExecutor(c) return c } func (c *Processor) Start() { + var connDone = make(chan bool, len(c.moreConns)+1) c.startOnce.Do(func() { c.chProcDone = StartRequestProcessor( - c.server, c.sourceName, c.connectTimeout, c.requestTimeout, c.connRecycleTimeout, c.chDone, c.chRequest) + c.server, c.sourceName, connDone, c.connIndex, + c.connectTimeout, c.responseTimeout, c.getTLSConfig, c.chDone, c.chRequest) + for _, p := range c.moreConns { + p.chProcDone = StartRequestProcessor( + p.server, p.sourceName, connDone, p.connIndex, + p.connectTimeout, p.responseTimeout, c.getTLSConfig, p.chDone, p.chRequest) + } + if len(c.moreConns) > 0 { + glog.Infof("conn_count=%d", len(c.moreConns)+1) + } + if len(c.moreConns) == 0 { + return + } + timer := time.NewTimer(c.connectTimeout) + defer timer.Stop() + select { + case <-connDone: + case <-timer.C: + } }) } -///TODO revisit func (c *Processor) Close() { + if c.PoolSize > 1 { + // Connection pool is persistent + return + } close(c.chDone) <-c.chProcDone + decrementExecutor() } -func (c *Processor) sendWithResponseChannel(chResponse chan IResponseContext, m *proto.OperationalMessage) (ok bool) { +func (c *Processor) sendWithResponseChannel(chResponse chan IResponseContext, m *proto.OperationalMessage) error { + var err error select { case c.chRequest <- NewRequestContext(m, chResponse): - ok = true default: - ok = false + err = fmt.Errorf("Likely queue full, qlen=%d ps=%d", len(c.chRequest), c.PoolSize) } + if glog.LOG_VERBOSE { opcode := m.GetOpCode() buf := logging.NewKVBufferForLog() if opcode != proto.OpCodeNop { buf.AddReqIdString(m.GetRequestIDString()) } - if ok { + if err == nil { glog.Verbosef("proc <- %s %s", opcode.String(), buf.String()) } else { glog.Verbosef("Failed: proc <- %s %s", opcode.String(), buf.String()) } } - return + return err } -func (c *Processor) send(request *proto.OperationalMessage) (chResponse <-chan IResponseContext, ok bool) { +func (c *Processor) send(request *proto.OperationalMessage) (<-chan IResponseContext, error) { ch := make(chan IResponseContext) - chResponse = ch - ok = c.sendWithResponseChannel(ch, request) - return + return ch, c.sendWithResponseChannel(ch, request) } func (c *Processor) ProcessRequest(request *proto.OperationalMessage) (resp *proto.OperationalMessage, err error) { timeStart := time.Now() - glog.Verbosef("process request rid=%s", request.GetRequestIDString()) - if ch, sent := c.send(request); sent { + ch, err := c.send(request) + if err == nil { if r, ok := <-ch; ok { resp = r.GetResponse() err = r.GetError() @@ -138,10 +272,9 @@ func (c *Processor) ProcessRequest(request *proto.OperationalMessage) (resp *pro resp = nil err = fmt.Errorf("response channel closed by request processor") } - } else { - err = fmt.Errorf("fail to send request") } - if err != nil { + + if err != nil && !specialError(err) { err = &IOError{err} } if cal.IsEnabled() { @@ -153,16 +286,16 @@ func (c *Processor) ProcessRequest(request *proto.OperationalMessage) (resp *pro } rht := time.Since(timeStart) - // Get user name from OS to log in CAL - username := os.Getenv("USER") - if err == nil { status := resp.GetOpStatus() b := logging.NewKVBuffer() - b.AddOpRequestResponseInfoWithUser(request, resp, username) + b.AddOpRequestResponseInfo(request, resp) cal.AtomicTransaction(txnType, request.GetOpCode().String(), logging.CalStatus(status).CalStatus(), rht, b.Bytes()) } else { - cal.AtomicTransaction(txnType, request.GetOpCode().String(), cal.StatusError, rht, []byte(err.Error())) ///TODO to change: data to cal + tail := fmt.Sprintf("raddr=%s&res_timeout=%dms&ns=%s&%s", c.server.Addr, + c.responseTimeout.Nanoseconds()/int64(1e6), request.GetNamespace(), err.Error()) + cal.AtomicTransaction(txnType, request.GetOpCode().String(), + cal.StatusError, rht, []byte(tail)) ///TODO to change: data to cal } } diff --git a/internal/cli/tracker.go b/internal/cli/tracker.go index 28af2b13..fdced7dc 100644 --- a/internal/cli/tracker.go +++ b/internal/cli/tracker.go @@ -86,12 +86,12 @@ func (p *PendingTracker) OnTimeout(now time.Time) { } else { // st.cancelFunc() seq := pr.sequence - err := fmt.Errorf("request timeout") + err := ErrResponseTimeout if _, found := p.mapRequestsSent[seq]; found { req := pr.reqCtx.request if req != nil { - glog.Warningf("Timeout <- server: %s elapsed=%d,rid=%s", - req.GetOpCode(), now.Sub(pr.timeSent), pr.reqCtx.request.GetRequestIDString()) + glog.Debugf("Timeout <- server: %s elapsed=%dns,rid=%s", + req.GetOpCode(), now.Sub(pr.timeSent).Nanoseconds(), pr.reqCtx.request.GetRequestIDString()) } pr.reqCtx.ReplyError(err) delete(p.mapRequestsSent, seq) @@ -119,7 +119,7 @@ func (p *PendingTracker) OnResonseReceived(readerResp *ReaderResponse) { pending.reqCtx.Reply(resp) pending.reqCtx = nil } else { - glog.Warningf("no pending response found. seq:%d,rid=%s\n", connSequence, resp.GetRequestIDString()) + glog.Debugf("No pending response found. seq:%d,rid=%s", connSequence, resp.GetRequestIDString()) } } else { p.responseTimer.Stop() ///TODO diff --git a/pkg/client/README.md b/pkg/client/README.md new file mode 100644 index 00000000..742b9488 --- /dev/null +++ b/pkg/client/README.md @@ -0,0 +1,164 @@ +[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) + +# Juno Golang SDK + +## Sample Code + +``` +package main + +import ( + "crypto/tls" + "fmt" + "time" + + "github.com/paypal/junodb/pkg/client" + cal "github.com/paypal/junodb/pkg/logging/cal/config" + "github.com/paypal/junodb/pkg/util" +) + +// addr is a Juno server endpoint in the form "ip:port". +// getTLSConfig is a func to get *tls.Config. +func createClient(addr string, getTLSConfig func() *tls.Config) (client.IClient, error) { + + cfg := client.Config{ + Appname: "example", + Namespace: "example_namespace", + DefaultTimeToLive: 60, // seconds + ConnectTimeout: util.Duration{1000 * time.Millisecond}, + ResponseTimeout: util.Duration{500 * time.Millisecond}, + } + + cfg.Server.Addr = addr + cfg.Server.SSLEnabled = true // Set to true if addr has an SSL port. + + client, err := client.NewWithTLS(cfg, getTLSConfig) + return client, err +} + +// Show metadata. +func showInfo(ctx client.IContext) { + fmt.Printf("v=%d ct=%d ttl=%d\n", ctx.GetVersion(), ctx.GetCreationTime(), + ctx.GetTimeToLive()) +} + +func basicAPI(cli client.IClient) { + key := []byte("test_key") + val := []byte("test_payload") + ctx, err := cli.Create(key, val) + if err != nil { + // log error + } + + // Update val slice before call Update + ctx, err = cli.Update(key, val) + if err == nil { + showInfo(ctx) + } else if err != client.ErrNoKey { + // log error + } + + _, err = cli.Set(key, val) + if err != nil { + // log error + } + + val, _, err = cli.Get(key) + if err != nil && err != client.ErrNoKey { + // log error + } + + err = cli.Destroy(key) + if err != nil { + // log error + } +} + +// Extend TTL if the value of WithTTL is greater than the current. +func basicAPIwithTTL(cli client.IClient) { + + key := []byte("test_key") + val := []byte("test_Payload") + ctx, err := cli.Create(key, val, client.WithTTL(uint32(100))) + if err == nil { + showInfo(ctx) + } + + // Update val slice before call Update + ctx, err = cli.Update(key, val, client.WithTTL(uint32(150))) + if err == nil { + showInfo(ctx) + } + + ctx, err = cli.Set(key, val, client.WithTTL(uint32(200))) + if err == nil { + showInfo(ctx) + } + + val, ctx, err = cli.Get(key, client.WithTTL(uint32(500))) + if err == nil { + showInfo(ctx) + } + + err = cli.Destroy(key) + if err != nil { + // log error + } +} + +// Test conditional update based on record version. +func condUpdate(cli client.IClient) error { + key := []byte("new_key") + val := []byte("new_payload") + + ctx, err := cli.Create(key, val) + if err != nil { + return err + } + + ctx, err = cli.Update(key, val) + if err != nil { + return err + } + + // Update succeeds if current record version is equal to ctx.GetVersion(). + // After the update, record version is incremented. + _, err = cli.Update(key, val, client.WithCond(ctx)) + if err != nil { + return err + } + + // Expect ErrConditionViolation + // because current record version is not equal to ctx.GetVersion(). + _, err = cli.Update(key, val, client.WithCond(ctx)) + if err != client.ErrConditionViolation { + return err + } + + err = cli.Destroy(key) + if err != nil { + return err + } + return nil +} + +func main() { + // Init variables + var addr string // = ... + var getTLSConfig func() *tls.Config // = ... + + // A client object should be created only once per unique addr. + cli, err := createClient(addr, getTLSConfig) + if err != nil { + // log error + return + } + + basicAPI(cli) + basicAPIwithTTL(cli) + if err := condUpdate(cli); err != nil { + // log error + } +} +``` + diff --git a/pkg/client/client.go b/pkg/client/client.go index 9e078650..ed93763d 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -20,57 +20,42 @@ /* package client implements Juno client API. -possible returned error if client successfully received the response from Proxy +Possible returned errors. - Create + Common Errors * nil * ErrBadMsg * ErrBadParam - * ErrInternal * ErrBusy + * ErrConnect + * ErrInternal * ErrNoStorage - * ErrUniqueKeyViolation + * ErrResponseTimeout + + Create + * Common Errors * ErrRecordLocked * ErrWriteFailure + * ErrUniqueKeyViolation Get - * nil - * ErrBadMsg - * ErrBadParam - * ErrInternal - * ErrBusy - * ErrNoStorage - * ErrNoKey + * Common Errors + * ErrNoKey // Normal if key has not been created or has expired. * ErrTTLExtendFailure Update - * nil - * ErrBadMsg - * ErrBadParam - * ErrInternal - * ErrBusy - * ErrNoStorage - * ErrRecordLocked + * Common Errors * ErrConditionViolation + * ErrRecordLocked * ErrWriteFailure Set - * nil - * ErrBadMsg - * ErrBadParam - * ErrInternal - * ErrBusy - * ErrNoStorage + * Common Errors * ErrRecordLocked * ErrWriteFailure Destroy - * nil - * ErrBadMsg - * ErrBadParam - * ErrInternal - * ErrBusy - * ErrNoStorage + * Common Errors * ErrRecordLocked * ErrWriteFailure @@ -88,31 +73,13 @@ type IContext interface { PrettyPrint(w io.Writer) } -///TODO check API input arguments - type IClient interface { Create(key []byte, value []byte, opts ...IOption) (IContext, error) Get(key []byte, opts ...IOption) ([]byte, IContext, error) Update(key []byte, value []byte, opts ...IOption) (IContext, error) Set(key []byte, value []byte, opts ...IOption) (IContext, error) Destroy(key []byte, opts ...IOption) (err error) + UDFGet(key []byte, fname []byte, params []byte, opts ...IOption) ([]byte, IContext, error) UDFSet(key []byte, fname []byte, params []byte, opts ...IOption) (IContext, error) } - -//type IResult interface { -// Get() -// GetWithTimeout() -// Poll() -//} -//type IValueResult interface { -// IResult -//} -// -//type AsyncClient interface { -// Create(key []byte, value []byte, opts ...IOption) IResult -// Update(key []byte, value []byte, opts ...IOption) IResult -// Set(key []byte, value []byte, opts ...IOption) IResult -// Get(key []byte, value []byte, opts ...IOption) IResult -// Destroy(key []byte) IResult -//} diff --git a/pkg/client/clientimpl.go b/pkg/client/clientimpl.go index 7ab57804..13e635a0 100644 --- a/pkg/client/clientimpl.go +++ b/pkg/client/clientimpl.go @@ -21,14 +21,17 @@ package client import ( + "crypto/tls" + "errors" "fmt" "runtime" + "time" "github.com/paypal/junodb/third_party/forked/golang/glog" "github.com/paypal/junodb/internal/cli" - "github.com/paypal/junodb/pkg/io" "github.com/paypal/junodb/pkg/logging" + cal "github.com/paypal/junodb/pkg/logging/cal" "github.com/paypal/junodb/pkg/proto" ) @@ -40,98 +43,113 @@ type clientImplT struct { processor *cli.Processor } -// newProcessorWithConfig initializes a new Processor with the given configuration. -func newProcessorWithConfig(conf *Config) *cli.Processor { +func newProcessorWithConfig(conf *Config, getTLSConfig func() *tls.Config) *cli.Processor { if conf == nil { return nil } c := cli.NewProcessor( conf.Server, conf.Appname, + conf.ConnPoolSize, conf.ConnectTimeout.Duration, - conf.RequestTimeout.Duration, - conf.ConnRecycleTimeout.Duration) + conf.ResponseTimeout.Duration, + getTLSConfig) return c } -// New initializes a new IClient with the given configuration. Returns an error if configuration validation fails. -func New(conf Config) (IClient, error) { - if err := conf.validate(); err != nil { +func NewWithTLS(conf Config, getTLSConfig func() *tls.Config) (IClient, error) { + if conf.Server.SSLEnabled && getTLSConfig == nil { + return nil, errors.New("getTLSConfig is nil.") + } + if err := conf.validate(true); err != nil { return nil, err } + glog.Infof("client cfg=%v withTLS=%v", conf, getTLSConfig != nil) + if conf.ConnPoolSize < 2 { + conf.ConnPoolSize = 2 + } + cli.SetDefaultRecycleTimeout(time.Duration(30 * time.Second)) client := &clientImplT{ config: conf, - processor: newProcessorWithConfig(&conf), + processor: newProcessorWithConfig(&conf, getTLSConfig), appName: conf.Appname, namespace: conf.Namespace, } + if conf.Cal.Enabled { + cal.InitWithConfig(&conf.Cal) + } client.processor.Start() - runtime.SetFinalizer(client.processor, func(p *cli.Processor) { - p.Close() - }) return client, nil } -// NewClient initializes a new IClient with the provided server address, namespace and app name. -func NewClient(server string, ns string, app string) (IClient, error) { - c := &clientImplT{ - config: Config{ - Server: io.ServiceEndpoint{Addr: server, SSLEnabled: false}, - Namespace: ns, - Appname: app, - RetryCount: defaultConfig.RetryCount, - DefaultTimeToLive: defaultConfig.DefaultTimeToLive, - ConnectTimeout: defaultConfig.ConnectTimeout, - ReadTimeout: defaultConfig.ReadTimeout, - WriteTimeout: defaultConfig.WriteTimeout, - RequestTimeout: defaultConfig.RequestTimeout, - }, - appName: app, - namespace: ns, - } - c.processor = newProcessorWithConfig(&c.config) - if c.processor != nil { - c.processor.Start() - } else { - errstr := "fail to create processor" - glog.Error(errstr) - return nil, fmt.Errorf(errstr) - } - runtime.SetFinalizer(c.processor, func(p *cli.Processor) { - p.Close() - }) - return c, nil +func New(conf Config) (IClient, error) { + if err := conf.validate(false); err != nil { + return nil, err + } + glog.Debugf("client cfg=%v", conf) + if conf.ConnPoolSize <= 1 { + conf.ConnPoolSize = 1 + } + client := &clientImplT{ + config: conf, + processor: newProcessorWithConfig(&conf, nil), + appName: conf.Appname, + namespace: conf.Namespace, + } + if conf.Cal.Enabled { + cal.InitWithConfig(&conf.Cal) + } + client.processor.Start() + if conf.ConnPoolSize == 1 { + runtime.SetFinalizer(client.processor, func(p *cli.Processor) { + p.Close() + }) + } + return client, nil } -///TODO to revisit - -// Close closes the client and cleans up resources. func (c *clientImplT) Close() { if c.processor != nil { c.processor.Close() c.processor = nil - } + } } -// getOptions collects all provided options into an optionData object. func (c *clientImplT) getOptions(opts ...IOption) *optionData { - data := &optionData{} + data := &optionData{} for _, op := range opts { op(data) } return data } -// newContext creates a new context from the provided operational message. func newContext(resp *proto.OperationalMessage) IContext { recInfo := &cli.RecordInfo{} recInfo.SetFromOpMsg(resp) return recInfo } -// Create sends a Create operation request to the server. +func (c *clientImplT) logError(op string, err error) { + if err == nil || err == ErrNoKey || + err == ErrConditionViolation || + err == ErrUniqueKeyViolation { + return + } + + addr_type := "tcp" + if c.config.Server.SSLEnabled { + addr_type = "ssl" + } + msg := fmt.Sprintf("[ERROR] op=%s %s_addr=%s response_timeout=%dms ns=%s. %s", + op, addr_type, c.config.Server.Addr, + c.config.ResponseTimeout.Nanoseconds()/int64(1e6), c.config.Namespace, err.Error()) + glog.Error(msg) + if err == ErrBusy || err == ErrRecordLocked { + time.Sleep(20 * time.Millisecond) + } +} + func (c *clientImplT) Create(key []byte, value []byte, opts ...IOption) (context IContext, err error) { - glog.Verbosef("Create ") var resp *proto.OperationalMessage options := newOptionData(opts...) recInfo := &cli.RecordInfo{} @@ -140,10 +158,12 @@ func (c *clientImplT) Create(key []byte, value []byte, opts ...IOption) (context if len(options.correlationId) > 0 { request.SetCorrelationID([]byte(options.correlationId)) } - if resp, err = c.processor.ProcessRequest(request); err == nil { - if err = checkResponse(request, resp, recInfo); err != nil { - glog.Debug(err) - } + resp, err = c.processor.ProcessRequest(request) + if err == nil { + err = checkResponse(request, resp, recInfo) + } + if err != nil { + c.logError("Create", err) } return } @@ -158,16 +178,19 @@ func (c *clientImplT) Get(key []byte, opts ...IOption) (value []byte, context IC if len(options.correlationId) > 0 { request.SetCorrelationID([]byte(options.correlationId)) } - if resp, err = c.processor.ProcessRequest(request); err == nil { - if err = checkResponse(request, resp, recInfo); err == nil { - payload := resp.GetPayload() - sz := payload.GetLength() - if sz != 0 { - value, err = payload.GetClearValue() - } - } else { - glog.Debug(err) - } + resp, err = c.processor.ProcessRequest(request) + if err == nil { + err = checkResponse(request, resp, recInfo) + } + if err != nil { + c.logError("Get", err) + return + } + + payload := resp.GetPayload() + sz := payload.GetLength() + if sz != 0 { + value, err = payload.GetClearValue() } return } @@ -187,10 +210,12 @@ func (c *clientImplT) Update(key []byte, value []byte, opts ...IOption) (context r.SetRequestWithUpdateCond(request) } } - if resp, err = c.processor.ProcessRequest(request); err == nil { - if err = checkResponse(request, resp, recInfo); err != nil { - glog.Debug(err) - } + resp, err = c.processor.ProcessRequest(request) + if err == nil { + err = checkResponse(request, resp, recInfo) + } + if err != nil { + c.logError("Update", err) } return } @@ -205,26 +230,31 @@ func (c *clientImplT) Set(key []byte, value []byte, opts ...IOption) (context IC if len(options.correlationId) > 0 { request.SetCorrelationID([]byte(options.correlationId)) } - if resp, err = c.processor.ProcessRequest(request); err == nil { - if err = checkResponse(request, resp, recInfo); err != nil { - glog.Debug(err) - } + resp, err = c.processor.ProcessRequest(request) + if err == nil { + err = checkResponse(request, resp, recInfo) + } + if err != nil { + c.logError("Set", err) } return } // Destroy sends a Destroy operation request to the server. func (c *clientImplT) Destroy(key []byte, opts ...IOption) (err error) { + var resp *proto.OperationalMessage options := newOptionData(opts...) request := c.NewRequest(proto.OpCodeDestroy, key, nil, 0) if len(options.correlationId) > 0 { request.SetCorrelationID([]byte(options.correlationId)) } - if resp, err = c.processor.ProcessRequest(request); err == nil { - if err = checkResponse(request, resp, nil); err != nil { - glog.Debug(err) - } + resp, err = c.processor.ProcessRequest(request) + if err == nil { + err = checkResponse(request, resp, nil) + } + if err != nil { + c.logError("Destroy", err) } return } @@ -286,6 +316,9 @@ func (c *clientImplT) NewRequest(op proto.OpCode, key []byte, value []byte, ttl request = &proto.OperationalMessage{} var payload proto.Payload payload.SetWithClearValue(value) + if ttl == 0 && op == proto.OpCodeCreate { + ttl = uint32(c.config.DefaultTimeToLive) + } request.SetRequest(op, key, []byte(c.namespace), &payload, ttl) request.SetNewRequestID() return @@ -297,6 +330,9 @@ func (c *clientImplT) NewUDFRequest(op proto.OpCode, key []byte, fname []byte, p request = &proto.OperationalMessage{} var payload proto.Payload payload.SetWithClearValue(params) + if ttl == 0 { + ttl = uint32(c.config.DefaultTimeToLive) + } request.SetRequest(op, key, []byte(c.namespace), &payload, ttl) request.SetNewRequestID() request.SetUDFName(fname) diff --git a/pkg/client/config.go b/pkg/client/config.go index 860cf076..e93cca3f 100644 --- a/pkg/client/config.go +++ b/pkg/client/config.go @@ -1,29 +1,30 @@ -// Copyright 2023 PayPal Inc. +// Copyright 2023 PayPal Inc. // -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. // Package client handles the configuration for a Juno client. package client import ( "fmt" - "time" "github.com/paypal/junodb/pkg/io" "github.com/paypal/junodb/pkg/util" + "github.com/paypal/junodb/internal/cli" + cal "github.com/paypal/junodb/pkg/logging/cal/config" ) // Duration is a type alias for util.Duration. @@ -31,52 +32,19 @@ type Duration = util.Duration // Config holds the configuration values for the Juno client. type Config struct { - Server io.ServiceEndpoint // Server defines the ServiceEndpoint of the Juno server. - Appname string // Appname is the name of the application. - Namespace string // Namespace is the namespace of the application. - RetryCount int // RetryCount is the maximum number of retries. - DefaultTimeToLive int // DefaultTimeToLive is the default TTL (time to live) for requests. - ConnectTimeout Duration // ConnectTimeout is the timeout for establishing connections. - ReadTimeout Duration // ReadTimeout is the timeout for read operations. - WriteTimeout Duration // WriteTimeout is the timeout for write operations. - RequestTimeout Duration // RequestTimeout is the timeout for each request. - ConnRecycleTimeout Duration // ConnRecycleTimeout is the timeout for connection recycling. -} - -// defaultConfig defines the default configuration values. -var defaultConfig = Config{ - RetryCount: 1, - DefaultTimeToLive: 1800, - ConnectTimeout: Duration{100 * time.Millisecond}, - ReadTimeout: Duration{500 * time.Millisecond}, - WriteTimeout: Duration{500 * time.Millisecond}, - RequestTimeout: Duration{1000 * time.Millisecond}, - ConnRecycleTimeout: Duration{9 * time.Second}, -} - -// SetDefaultTimeToLive sets the default time to live (TTL) for the configuration. -func SetDefaultTimeToLive(ttl int) { - defaultConfig.DefaultTimeToLive = ttl -} + Server io.ServiceEndpoint + Appname string + Namespace string -// SetDefaultTimeout sets the default timeout durations for the configuration. -func SetDefaultTimeout(connect, read, write, request, connRecycle time.Duration) { - defaultConfig.ConnectTimeout.Duration = connect - defaultConfig.ReadTimeout.Duration = read - defaultConfig.WriteTimeout.Duration = write - defaultConfig.RequestTimeout.Duration = request - defaultConfig.ConnRecycleTimeout.Duration = connRecycle + DefaultTimeToLive int + ConnPoolSize int + ConnectTimeout Duration + ResponseTimeout Duration + BypassLTM bool + Cal cal.Config } -// SetDefault updates the current Config to match the default Config. -func (c *Config) SetDefault() { - *c = defaultConfig -} - -// validate checks if the required fields of the Config are correctly populated. -// It validates the Server field and checks if Appname and Namespace are specified. -// It returns an error if any of the above conditions are not met. -func (c *Config) validate() error { +func (c *Config) validate(useGetTLS bool) error { if err := c.Server.Validate(); err != nil { return err } @@ -86,6 +54,11 @@ func (c *Config) validate() error { if len(c.Namespace) == 0 { return fmt.Errorf("Config.Namespace not specified.") } - // TODO to validate others + if c.DefaultTimeToLive < 0 { + return fmt.Errorf("Config.DefaultTimeToLive is negative.") + } + if c.Server.SSLEnabled && !useGetTLS && !cli.TLSInitialized() { + return fmt.Errorf("getTLSConfig is nil.") + } return nil } diff --git a/pkg/client/error.go b/pkg/client/error.go index 5dd6991f..5a930446 100644 --- a/pkg/client/error.go +++ b/pkg/client/error.go @@ -27,10 +27,13 @@ import ( // Error variables for different scenarios in the application. var ( - ErrNoKey error // Error when no key is found. - ErrUniqueKeyViolation error // Error when there is a violation of a unique key. - ErrBadParam error // Error when a bad parameter is provided. - ErrConditionViolation error // Error when a condition violation occurs. + ErrConnect error + ErrResponseTimeout error + + ErrNoKey error + ErrUniqueKeyViolation error + ErrBadParam error + ErrConditionViolation error ErrBadMsg error // Error when a bad message is encountered. ErrNoStorage error // Error when no storage is available. @@ -48,11 +51,14 @@ var errorMapping map[proto.OpStatus]error // init function initializes the error variables and the errorMapping map. func init() { - ErrNoKey = &cli.Error{"no key"} // Error when the key does not exist. - ErrUniqueKeyViolation = &cli.Error{"unique key violation"} // Error when unique key constraint is violated. - ErrBadParam = &cli.Error{"bad parameter"} // Error when a bad parameter is passed. - ErrConditionViolation = &cli.Error{"condition violation"} // Error when there is a condition violation. - ErrTTLExtendFailure = &cli.Error{"fail to extend TTL"} // Error when TTL extension fails. + ErrResponseTimeout = cli.ErrResponseTimeout + ErrConnect = cli.ErrConnect + + ErrNoKey = &cli.Error{"no key"} + ErrUniqueKeyViolation = &cli.Error{"unique key violation"} + ErrBadParam = &cli.Error{"bad parameter"} + ErrConditionViolation = &cli.Error{"condition violation"} //version too old + ErrTTLExtendFailure = &cli.Error{"fail to extend TTL"} ErrBadMsg = &cli.RetryableError{"bad message"} // Error when an inappropriate message is received. ErrNoStorage = &cli.RetryableError{"no storage"} // Error when there is no storage available. diff --git a/test/cli/basic_test.go b/test/cli/basic_test.go new file mode 100644 index 00000000..016ead1e --- /dev/null +++ b/test/cli/basic_test.go @@ -0,0 +1,145 @@ +// +// Copyright 2023 PayPal Inc. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package cli + +import ( + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/paypal/junodb/pkg/client" + "github.com/paypal/junodb/third_party/forked/golang/glog" +) + +var ( + server *Cmd + server2 *Cmd +) + +func TestBasic(t *testing.T) { + glog.Info("") + glog.Info("=== TestBasic") + + server, _ = NewCmdWithConfig(serverAddr, 5) + + if server == nil { + t.Errorf("Failed to init") + return + } + sendRequests(server, 5, t) +} + +func TestTls(t *testing.T) { + glog.Info("") + glog.Info("=== TestTls") + + server, _ = NewCmdWithConfig(serverTls, 5) + if server == nil { + t.Errorf("Failed to init") + return + } + sendRequests(server, 5, t) +} + +func sendRequests(server *Cmd, count int, t *testing.T) { + for i := 0; i < count; i++ { + if server.deleteKey(i) != nil { + t.Errorf("Delete failed") + } + _, err := server.createKey(i) + if err != nil { + t.Errorf("Create failed") + } + _, err = server.updateKey(i) + if err != nil { + t.Errorf("Update failed") + } + if server.setKey(i) != nil { + t.Errorf("Set failed") + } + if server.getKey(i) != nil { + t.Errorf("Get failed") + } + if server.deleteKey(i) != nil { + t.Errorf("Delete failed") + } + } +} + +func TestConcurrent(t *testing.T) { + glog.Info("") + glog.Info("=== TestConcurrent") + + a := 20 + b := 1000 + total := a * b + + server, _ = NewCmdWithConfig(serverAddr, 5) + if server == nil { + t.Errorf("Failed to init") + return + } + + var wg sync.WaitGroup + wg.Add(a) + st := time.Now() + + failCount := int64(0) + timeoutCount := int64(0) + for i := 0; i < a; i++ { + go func(k int) { + for j := 0; j < b; j++ { + if (j % 300) == 0 { + glog.Infof("loop k=%d j=%d", k, j) + } + err := server.setKey(k*b + j) + if err != nil { + atomic.AddInt64(&failCount, 1) + if errors.Is(err, client.ErrResponseTimeout) { + atomic.AddInt64(&timeoutCount, 1) + } + } + } + wg.Done() + }(i) + } + wg.Wait() + elapsed := time.Since(st).Seconds() + + var wg2 sync.WaitGroup + wg2.Add(a) + for i := 0; i < a; i++ { + go func(k int) { + for j := 0; j < b; j++ { + server.deleteKey(k*b + j) + } + wg2.Done() + }(i) + } + wg2.Wait() + + glog.Infof("elapsed=%.1fs fails=%d timeouts=%d total=%d", elapsed, + failCount, timeoutCount, total) + if failCount > int64(total/10) { + t.Errorf("Concurrent test failed") + } +} diff --git a/test/cli/cond_test.go b/test/cli/cond_test.go new file mode 100644 index 00000000..8e00cc20 --- /dev/null +++ b/test/cli/cond_test.go @@ -0,0 +1,54 @@ +// +// Copyright 2023 PayPal Inc. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package cli + +import ( + "testing" + + "github.com/paypal/junodb/pkg/client" + "github.com/paypal/junodb/third_party/forked/golang/glog" +) + +func TestCond(t *testing.T) { + glog.Info("") + glog.Info("=== TestCond") + + server, _ = NewCmdWithConfig(serverAddr, 5) + + if server == nil { + t.Errorf("Failed to init") + return + } + for i := 100; i < 105; i++ { + ctx, err := server.createKey(i) + if err != nil { + t.Errorf("Create failed") + } + _, err = server.updateKeyWithCond(i, ctx) + if err != nil { + t.Errorf("Update WithCond failed") + continue + } + _, err = server.updateKeyWithCond(i, ctx) + if err == nil || err != client.ErrConditionViolation { + t.Errorf("Expected condition violation") + } + } +} diff --git a/test/cli/define.go b/test/cli/define.go new file mode 100644 index 00000000..3997e746 --- /dev/null +++ b/test/cli/define.go @@ -0,0 +1,28 @@ +// +// Copyright 2023 PayPal Inc. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package cli + + +// For TLS serevr, need to provide server.crt and server.pem. +// See main_test.go: cert, err := tls.LoadX509KeyPair("./server.crt", "./server.pem") +var ( // Change ip:port to that of JunoDB proxy service + serverAddr = "127.0.0.1:8080" // TCP + serverTls = "127.0.0.1:5080" // TLS +) diff --git a/test/cli/main_test.go b/test/cli/main_test.go new file mode 100644 index 00000000..39bc2743 --- /dev/null +++ b/test/cli/main_test.go @@ -0,0 +1,81 @@ +// +// Copyright 2023 PayPal Inc. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package cli + +import ( + "crypto/tls" + "crypto/x509" + "os" + "sync" + "testing" + "time" + + "github.com/paypal/junodb/third_party/forked/golang/glog" +) + +var ( + tlsConfig *tls.Config + mutex sync.RWMutex +) + +func GetTLSConfig() *tls.Config { + mutex.RLock() + defer mutex.RUnlock() + return tlsConfig +} + +func loadCertificate() { + caCert, err := os.ReadFile("./server.crt") + if err != nil { + glog.Exitf("%s", err.Error()) + } + rootCAs, _ := x509.SystemCertPool() + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + rootCAs.AppendCertsFromPEM(caCert) + + cert, err := tls.LoadX509KeyPair("./server.crt", "./server.pem") + if err != nil { + glog.Exitf("%s", err.Error()) + } + + mutex.Lock() + defer mutex.Unlock() + tlsConfig = &tls.Config{ + RootCAs: rootCAs, + Certificates: []tls.Certificate{cert}, + InsecureSkipVerify: true, + SessionTicketsDisabled: false, + ClientSessionCache: tls.NewLRUClientSessionCache(0), + } +} + +func TestMain(m *testing.M) { + + glog.Infof("Start testing") + + loadCertificate() + + code := m.Run() + glog.Finalize() + time.Sleep(1 * time.Second) + os.Exit(code) +} diff --git a/test/cli/util.go b/test/cli/util.go new file mode 100644 index 00000000..0d0bfbd1 --- /dev/null +++ b/test/cli/util.go @@ -0,0 +1,181 @@ +// +// Copyright 2023 PayPal Inc. +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package cli + +import ( + "encoding/binary" + "errors" + "time" + + "github.com/paypal/junodb/pkg/client" + "github.com/paypal/junodb/pkg/util" + "github.com/paypal/junodb/third_party/forked/golang/glog" +) + +type Cmd struct { + addr string + client client.IClient + payload []byte + ErrCount int +} + +func createPayload() []byte { + k := 1024 + p := make([]byte, k) + + for i := 0; i < k/2; i++ { + val := byte((uint(i) * 12345678) & 0xff) + p[i] = val + p[i+k/2] = val + } + return p +} + +func verifyPayload(p []byte) bool { + off := len(p) / 2 + for k := 0; k < off; k++ { + if p[k] != p[k+off] { + return false + } + } + return true +} + +func showMetaInfo(ctx client.IContext) { + glog.Infof("v=%d ct=%d ttl=%d", ctx.GetVersion(), ctx.GetCreationTime(), ctx.GetTimeToLive()) +} + +func NewCmdWithConfig(addr string, poolSize int) (*Cmd, error) { + cfg := client.Config{ + Appname: "testcli", + Namespace: "juno_cli_qa", + DefaultTimeToLive: 60, + ConnPoolSize: poolSize, + ResponseTimeout: util.Duration{1000 * time.Millisecond}, + } + + cfg.Server.Addr = addr + if addr == serverTls { + cfg.Server.SSLEnabled = true + } + client, err := client.NewWithTLS(cfg, GetTLSConfig) + if err != nil { + glog.Errorf("%s", err) + return nil, err + } + + cmd := &Cmd{ + addr: addr, + client: client, + payload: createPayload(), + } + if !verifyPayload(cmd.payload) { + cmd = nil + } + return cmd, err +} + +func (c *Cmd) newRandomKey(s int) []byte { + off := 2 + key := make([]byte, 16+off) + r := uint32(((int64(s+1)*25214903917 + 11) >> 5) & 0x7fffffff) + binary.BigEndian.PutUint32(key[0+off:], r) + binary.BigEndian.PutUint32(key[4+off:], uint32(s)) + binary.BigEndian.PutUint32(key[12+off:], 0xff) + + return key +} + +func (c *Cmd) createKey(ix int) (ctx client.IContext, err error) { + key := c.newRandomKey(ix) + + for i := 0; i < 2; i++ { + ctx, err = c.client.Create(key, c.payload, client.WithTTL(uint32(30))) + if err == nil { + break + } + } + + return ctx, err +} + +func (c *Cmd) getKey(ix int) error { + key := c.newRandomKey(ix) + + val, _, err := c.client.Get(key, client.WithTTL(uint32(30))) + if err == nil && !verifyPayload(val) { + glog.Errorf("validate failed") + err = errors.New("Bad payload") + } + return err +} + +func (c *Cmd) setKey(ix int) error { + key := c.newRandomKey(ix) + + var err error + var ctx client.IContext + stop := 2 + for i := 0; i < stop; i++ { + ctx, err = c.client.Set(key, c.payload, client.WithTTL(uint32(60))) + if err == nil { + showMetaInfo(ctx) + break + } + } + return err +} + +func (c *Cmd) updateKey(ix int) (ctx client.IContext, err error) { + key := c.newRandomKey(ix) + + for i := 0; i < 2; i++ { + ctx, err = c.client.Update(key, c.payload, client.WithTTL(uint32(60))) + if err == nil { + break + } + } + return ctx, err +} + +func (c *Cmd) updateKeyWithCond(ix int, ctxIn client.IContext) (ctx client.IContext, err error) { + key := c.newRandomKey(ix) + + for i := 0; i < 2; i++ { + ctx, err = c.client.Update(key, c.payload, client.WithCond(ctxIn)) + if err == nil { + break + } + } + return ctx, err +} + +func (c *Cmd) deleteKey(ix int) error { + key := c.newRandomKey(ix) + + var err error + for i := 0; i < 2; i++ { + err = c.client.Destroy(key) + if err == nil { + break + } + } + return err +} diff --git a/test/drv/bulkload/bulkload.go b/test/drv/bulkload/bulkload.go index f7460e3f..58b20e2c 100644 --- a/test/drv/bulkload/bulkload.go +++ b/test/drv/bulkload/bulkload.go @@ -74,13 +74,9 @@ func (c *CmdLine) Init(server string, ns string, prefix string, payloadLen int, var err error c.clientCfg = client.Config{ - RetryCount: 1, - DefaultTimeToLive: ttl, - ConnectTimeout: Duration{500 * time.Millisecond}, - ReadTimeout: Duration{500 * time.Millisecond}, - WriteTimeout: Duration{500 * time.Millisecond}, - RequestTimeout: Duration{500 * time.Millisecond}, - ConnRecycleTimeout: Duration{300 * time.Second}, + DefaultTimeToLive: ttl, + ConnectTimeout: Duration{500 * time.Millisecond}, + ResponseTimeout: Duration{500 * time.Millisecond}, } c.clientCfg.Server.Addr = server c.clientCfg.Appname = "bulkload" diff --git a/test/drv/junoload/junoload.go b/test/drv/junoload/junoload.go index ea3f2aa9..f86604fe 100644 --- a/test/drv/junoload/junoload.go +++ b/test/drv/junoload/junoload.go @@ -35,11 +35,12 @@ import ( "github.com/BurntSushi/toml" + "github.com/paypal/junodb/internal/cli" "github.com/paypal/junodb/pkg/client" "github.com/paypal/junodb/pkg/cmd" "github.com/paypal/junodb/pkg/logging/cal" "github.com/paypal/junodb/pkg/sec" - "github.com/paypal/junodb/pkg/util" + "github.com/paypal/junodb/pkg/util" "github.com/paypal/junodb/pkg/version" ) @@ -62,24 +63,25 @@ type ( CmdOptions struct { cfgFile string - server string - requestPattern string - sslEnabled bool - numExecutor int - payloadLen int - numReqPerSecond int - runningTime int - statOutputRate int - timeToLive int - httpMonAddr string - version bool - numKeys int - dbpath string - logLevel string - isVariable bool - disableGetTTL bool - keys string - randomize bool + server string + requestPattern string + sslEnabled bool + numExecutor int + payloadLen int + numReqPerSecond int + runningTime int + statOutputRate int + connRecycleTimeout int + timeToLive int + httpMonAddr string + version bool + numKeys int + dbpath string + logLevel string + isVariable bool + disableGetTTL bool + keys string + randomize bool } ) @@ -100,7 +102,11 @@ const ( ) func (d *SyncTestDriver) setDefaultConfig() { - d.config.SetDefault() + + d.config.DefaultTimeToLive = 1800 + d.config.ConnPoolSize = 1 + d.config.ConnectTimeout = util.Duration{1000 * time.Millisecond} + d.config.ResponseTimeout = util.Duration{1000 * time.Millisecond} d.config.Sec = sec.DefaultConfig d.config.Cal.Default() @@ -145,6 +151,7 @@ func (d *SyncTestDriver) Init(name string, desc string) { d.IntOption(&d.cmdOpts.runningTime, "t|running-time", kDefaultRunningTime, "specify driver's running time in second") d.IntOption(&d.cmdOpts.timeToLive, "ttl|record-time-to-live", kDefaultRecordTimeToLive, "specify record TTL in second") d.IntOption(&d.cmdOpts.statOutputRate, "o|stat-output-rate", kDefaultStatOutputRate, "specify how often to output statistic information in second\n\tfor the period of time.") + d.IntOption(&d.cmdOpts.connRecycleTimeout, "rt", 0, "connection recycle timeout") d.StringOption(&d.cmdOpts.httpMonAddr, "mon-addr|monitoring-address", "", "specify the http monitoring address. \n\toverride HttpMonAddr in config file") d.BoolOption(&d.cmdOpts.version, "version", false, "display version information.") d.StringOption(&d.cmdOpts.dbpath, "dbpath", "", "to display rocksdb stats") @@ -200,6 +207,7 @@ func (d *SyncTestDriver) Parse(args []string) (err error) { return } d.setDefaultConfig() + cli.SetConnectRecycleTimeout(time.Duration(d.cmdOpts.connRecycleTimeout) * time.Second) if len(d.cmdOpts.cfgFile) != 0 { if _, err := toml.DecodeFile(d.cmdOpts.cfgFile, &d.config); err != nil { diff --git a/test/functest/setup_test.go b/test/functest/setup_test.go index 27dd384e..08f1d480 100644 --- a/test/functest/setup_test.go +++ b/test/functest/setup_test.go @@ -34,6 +34,7 @@ import ( "github.com/BurntSushi/toml" + "github.com/paypal/junodb/internal/cli" "github.com/paypal/junodb/cmd/proxy/config" "github.com/paypal/junodb/pkg/client" "github.com/paypal/junodb/pkg/cluster" @@ -72,20 +73,12 @@ var ( defaultClientConfig = client.Config{ DefaultTimeToLive: 1800, ConnectTimeout: util.Duration{4000 * time.Millisecond}, - ReadTimeout: util.Duration{1500 * time.Millisecond}, - WriteTimeout: util.Duration{1500 * time.Millisecond}, - RequestTimeout: util.Duration{3000 * time.Millisecond}, + ResponseTimeout: util.Duration{3000 * time.Millisecond}, } ) func setup() { - client.SetDefaultTimeToLive(defaultClientConfig.DefaultTimeToLive) - client.SetDefaultTimeout(defaultClientConfig.ConnectTimeout.Duration, - defaultClientConfig.ReadTimeout.Duration, - defaultClientConfig.WriteTimeout.Duration, - defaultClientConfig.RequestTimeout.Duration, - defaultClientConfig.ConnRecycleTimeout.Duration) sigs := make(chan os.Signal) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func(sigCh chan os.Signal) { @@ -176,6 +169,7 @@ func TestMain(m *testing.M) { cal.InitWithConfig(&testConfig.CAL) } + cli.SetConnectRecycleTimeout(time.Duration(0 * time.Second)) sec.Initialize(&testConfig.Sec, sec.KFlagClientTlsEnabled|sec.KFlagEncryptionEnabled) ProxyAddr = testConfig.ProxyAddress diff --git a/test/testutil/server/server.go b/test/testutil/server/server.go index 9f1c0c10..2efacacb 100644 --- a/test/testutil/server/server.go +++ b/test/testutil/server/server.go @@ -144,7 +144,8 @@ func (s *ServerBase) IsUp() bool { if frwk.LOG_DEBUG { glog.DebugInfof("Testing if %s is up", s.Address()) } - clientProcessor := cli.NewProcessor(io.ServiceEndpoint{Addr: s.Address(), SSLEnabled: s.IsSSLEnabled()}, "testFramework", s.startWaitTime, s.startWaitTime, 0) + clientProcessor := cli.NewProcessor(io.ServiceEndpoint{Addr: s.Address(), SSLEnabled: s.IsSSLEnabled()}, "testFramework", 1, + s.startWaitTime, s.startWaitTime, nil) clientProcessor.Start() defer clientProcessor.Close() request := &proto.OperationalMessage{} diff --git a/test/unittest/setup_test.go b/test/unittest/setup_test.go index a7881275..3e2d30c2 100644 --- a/test/unittest/setup_test.go +++ b/test/unittest/setup_test.go @@ -95,9 +95,7 @@ func mainSetup() { Namespace: "ns", DefaultTimeToLive: 3600, ConnectTimeout: util.Duration{4000 * time.Millisecond}, - ReadTimeout: util.Duration{1500 * time.Millisecond}, - WriteTimeout: util.Duration{1500 * time.Millisecond}, - RequestTimeout: util.Duration{3000 * time.Millisecond}, + ResponseTimeout: util.Duration{3000 * time.Millisecond}, } Mockclient = mock.NewMockClient(config.Conf.ClusterInfo.ConnInfo, cliCfg) }