From bef32dc5658c34f032dd1da25afbdd4ca4c4112d Mon Sep 17 00:00:00 2001 From: Max Seiden Date: Tue, 25 Jan 2022 23:20:29 -0800 Subject: [PATCH] [Feature] add a cache to the FetchResult codepath to avoid API calls (#45) lint: fix `ConnectionId` Fix CI for current master (#47) * lint: fix `ConnectionId` * Test fixes for previous change (`41c90a09`) * nit: tweaks to dsn-test Co-authored-by: Agam Brahma fix var inits (#46) --- connection.go | 4 + dsn.go | 14 ++++ dsn_test.go | 192 +++++++++++++++++++++++++++------------------ exec_resp_cache.go | 99 +++++++++++++++++++++++ monitoring.go | 10 ++- 5 files changed, 240 insertions(+), 79 deletions(-) create mode 100644 exec_resp_cache.go diff --git a/connection.go b/connection.go index 7806d6205..0aac0543a 100644 --- a/connection.go +++ b/connection.go @@ -69,6 +69,7 @@ type snowflakeConn struct { SQLState string telemetry *snowflakeTelemetry internal InternalClient + execRespCache *execRespCache } var ( @@ -223,6 +224,9 @@ func (sc *snowflakeConn) cleanup() { // must flush log buffer while the process is running. sc.rest = nil sc.cfg = nil + + releaseExecRespCache(sc.execRespCache) + sc.execRespCache = nil } func (sc *snowflakeConn) Close() (err error) { diff --git a/dsn.go b/dsn.go index 791047e6c..12fad5ee7 100644 --- a/dsn.go +++ b/dsn.go @@ -12,6 +12,8 @@ import ( "strconv" "strings" "time" + + "github.com/google/uuid" ) const ( @@ -80,6 +82,10 @@ type Config struct { Transporter http.RoundTripper // RoundTripper to intercept HTTP requests and responses DisableTelemetry bool // indicates whether to disable telemetry + + // An identifier for this Config. Used to associate multiple connection instances with + // a single logical sql.DB connection. + ConnectionID string } // ocspMode returns the OCSP mode in string INSECURE, FAIL_OPEN, FAIL_CLOSED @@ -195,6 +201,10 @@ func DSN(cfg *Config) (dsn string, err error) { params.Add("validateDefaultParameters", strconv.FormatBool(cfg.ValidateDefaultParameters != ConfigBoolFalse)) + if cfg.ConnectionID != "" { + params.Add("connectionId", cfg.ConnectionID) + } + dsn = fmt.Sprintf("%v:%v@%v:%v", url.QueryEscape(cfg.User), url.QueryEscape(cfg.Password), cfg.Host, cfg.Port) if params.Encode() != "" { dsn += "?" + params.Encode() @@ -419,6 +429,10 @@ func fillMissingConfigParameters(cfg *Config) error { cfg.ValidateDefaultParameters = ConfigBoolTrue } + if cfg.ConnectionID == "" { + cfg.ConnectionID = uuid.New().String() + } + if strings.HasSuffix(cfg.Host, defaultDomain) && len(cfg.Host) == len(defaultDomain) { return &SnowflakeError{ Number: ErrCodeFailedToParseHost, diff --git a/dsn_test.go b/dsn_test.go index 037480809..88cbc7305 100644 --- a/dsn_test.go +++ b/dsn_test.go @@ -602,115 +602,129 @@ type tcDSN struct { func TestDSN(t *testing.T) { tmfmt := "MM-DD-YYYY" + testConnectionID := "abcd-0123-4567-1234" testcases := []tcDSN{ { cfg: &Config{ - User: "u", - Password: "p", - Account: "a-aofnadsf.somewhere.azure", + User: "u", + Password: "p", + Account: "a-aofnadsf.somewhere.azure", + ConnectionID: testConnectionID, }, - dsn: "u:p@a-aofnadsf.somewhere.azure.snowflakecomputing.com:443?ocspFailOpen=true®ion=somewhere.azure&validateDefaultParameters=true", + dsn: "u:p@a-aofnadsf.somewhere.azure.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=somewhere.azure&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a-aofnadsf.global", + User: "u", + Password: "p", + Account: "a-aofnadsf.global", + ConnectionID: testConnectionID, }, - dsn: "u:p@a-aofnadsf.global.snowflakecomputing.com:443?ocspFailOpen=true®ion=global&validateDefaultParameters=true", + dsn: "u:p@a-aofnadsf.global.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=global&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a-aofnadsf.global", - Region: "us-west-2", + User: "u", + Password: "p", + Account: "a-aofnadsf.global", + Region: "us-west-2", + ConnectionID: testConnectionID, }, - dsn: "u:p@a-aofnadsf.global.snowflakecomputing.com:443?ocspFailOpen=true®ion=global&validateDefaultParameters=true", + dsn: "u:p@a-aofnadsf.global.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=global&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a-aofnadsf.global", - Region: "r", + User: "u", + Password: "p", + Account: "a-aofnadsf.global", + Region: "r", + ConnectionID: testConnectionID, }, err: ErrInvalidRegion, }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a", + User: "u", + Password: "p", + Account: "a", + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?ocspFailOpen=true&validateDefaultParameters=true", + dsn: "u:p@a.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a", - Region: "us-west-2", + User: "u", + Password: "p", + Account: "a", + Region: "us-west-2", + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?ocspFailOpen=true&validateDefaultParameters=true", + dsn: "u:p@a.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a", - Region: "r", + User: "u", + Password: "p", + Account: "a", + Region: "r", + ConnectionID: testConnectionID, }, - dsn: "u:p@a.r.snowflakecomputing.com:443?ocspFailOpen=true®ion=r&validateDefaultParameters=true", + dsn: "u:p@a.r.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=r&validateDefaultParameters=true", }, { cfg: &Config{ - User: "", - Password: "p", - Account: "a", + User: "", + Password: "p", + Account: "a", + ConnectionID: testConnectionID, }, err: ErrEmptyUsername, }, { cfg: &Config{ - User: "u", - Password: "", - Account: "a", + User: "u", + Password: "", + Account: "a", + ConnectionID: testConnectionID, }, err: ErrEmptyPassword, }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "", + User: "u", + Password: "p", + Account: "", + ConnectionID: testConnectionID, }, err: ErrEmptyAccount, }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a.e", + User: "u", + Password: "p", + Account: "a.e", + ConnectionID: testConnectionID, }, - dsn: "u:p@a.e.snowflakecomputing.com:443?ocspFailOpen=true®ion=e&validateDefaultParameters=true", + dsn: "u:p@a.e.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=e&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a.e", - Region: "us-west-2", + User: "u", + Password: "p", + Account: "a.e", + Region: "us-west-2", + ConnectionID: testConnectionID, }, - dsn: "u:p@a.e.snowflakecomputing.com:443?ocspFailOpen=true®ion=e&validateDefaultParameters=true", + dsn: "u:p@a.e.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=e&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a.e", - Region: "r", + User: "u", + Password: "p", + Account: "a.e", + Region: "r", + ConnectionID: testConnectionID, }, err: ErrInvalidRegion, }, @@ -729,8 +743,9 @@ func TestDSN(t *testing.T) { LoginTimeout: 10 * time.Second, RequestTimeout: 300 * time.Second, Application: "special go", + ConnectionID: testConnectionID, }, - dsn: "u:p@a.b.snowflakecomputing.com:443?application=special+go&database=db&loginTimeout=10&ocspFailOpen=true&passcode=db&passcodeInPassword=true®ion=b&requestTimeout=300&role=ro&schema=sc&validateDefaultParameters=true", + dsn: "u:p@a.b.snowflakecomputing.com:443?application=special+go&connectionId=abcd-0123-4567-1234&database=db&loginTimeout=10&ocspFailOpen=true&passcode=db&passcodeInPassword=true&queryMonitoringThreshold=5®ion=b&requestTimeout=300&role=ro&schema=sc&validateDefaultParameters=true", }, { cfg: &Config{ @@ -738,8 +753,9 @@ func TestDSN(t *testing.T) { Password: "p", Account: "a", Authenticator: AuthTypeExternalBrowser, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?authenticator=externalbrowser&ocspFailOpen=true&validateDefaultParameters=true", + dsn: "u:p@a.snowflakecomputing.com:443?authenticator=externalbrowser&connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5&validateDefaultParameters=true", }, { cfg: &Config{ @@ -751,8 +767,9 @@ func TestDSN(t *testing.T) { Scheme: "https", Host: "sc.okta.com", }, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?authenticator=https%3A%2F%2Fsc.okta.com&ocspFailOpen=true&validateDefaultParameters=true", + dsn: "u:p@a.snowflakecomputing.com:443?authenticator=https%3A%2F%2Fsc.okta.com&connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5&validateDefaultParameters=true", }, { cfg: &Config{ @@ -762,8 +779,9 @@ func TestDSN(t *testing.T) { Params: map[string]*string{ "TIMESTAMP_OUTPUT_FORMAT": &tmfmt, }, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.e.snowflakecomputing.com:443?TIMESTAMP_OUTPUT_FORMAT=MM-DD-YYYY&ocspFailOpen=true®ion=e&validateDefaultParameters=true", + dsn: "u:p@a.e.snowflakecomputing.com:443?TIMESTAMP_OUTPUT_FORMAT=MM-DD-YYYY&connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=e&validateDefaultParameters=true", }, { cfg: &Config{ @@ -773,8 +791,9 @@ func TestDSN(t *testing.T) { Params: map[string]*string{ "TIMESTAMP_OUTPUT_FORMAT": &tmfmt, }, + ConnectionID: testConnectionID, }, - dsn: "u:%3A%40abc@a.e.snowflakecomputing.com:443?TIMESTAMP_OUTPUT_FORMAT=MM-DD-YYYY&ocspFailOpen=true®ion=e&validateDefaultParameters=true", + dsn: "u:%3A%40abc@a.e.snowflakecomputing.com:443?TIMESTAMP_OUTPUT_FORMAT=MM-DD-YYYY&connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=e&validateDefaultParameters=true", }, { cfg: &Config{ @@ -782,8 +801,9 @@ func TestDSN(t *testing.T) { Password: "p", Account: "a", OCSPFailOpen: OCSPFailOpenTrue, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?ocspFailOpen=true&validateDefaultParameters=true", + dsn: "u:p@a.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5&validateDefaultParameters=true", }, { cfg: &Config{ @@ -791,8 +811,9 @@ func TestDSN(t *testing.T) { Password: "p", Account: "a", OCSPFailOpen: OCSPFailOpenFalse, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?ocspFailOpen=false&validateDefaultParameters=true", + dsn: "u:p@a.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=false&queryMonitoringThreshold=5&validateDefaultParameters=true", }, { cfg: &Config{ @@ -800,8 +821,9 @@ func TestDSN(t *testing.T) { Password: "p", Account: "a", ValidateDefaultParameters: ConfigBoolFalse, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?ocspFailOpen=true&validateDefaultParameters=false", + dsn: "u:p@a.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5&validateDefaultParameters=false", }, { cfg: &Config{ @@ -809,8 +831,9 @@ func TestDSN(t *testing.T) { Password: "p", Account: "a", ValidateDefaultParameters: ConfigBoolTrue, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?ocspFailOpen=true&validateDefaultParameters=true", + dsn: "u:p@a.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5&validateDefaultParameters=true", }, { cfg: &Config{ @@ -818,32 +841,36 @@ func TestDSN(t *testing.T) { Password: "p", Account: "a", InsecureMode: true, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.snowflakecomputing.com:443?insecureMode=true&ocspFailOpen=true&validateDefaultParameters=true", + dsn: "u:p@a.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&insecureMode=true&ocspFailOpen=true&queryMonitoringThreshold=5&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a.b.c", + User: "u", + Password: "p", + Account: "a.b.c", + ConnectionID: testConnectionID, }, - dsn: "u:p@a.b.c.snowflakecomputing.com:443?ocspFailOpen=true®ion=b.c&validateDefaultParameters=true", + dsn: "u:p@a.b.c.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=b.c&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a.b.c", - Region: "us-west-2", + User: "u", + Password: "p", + Account: "a.b.c", + Region: "us-west-2", + ConnectionID: testConnectionID, }, - dsn: "u:p@a.b.c.snowflakecomputing.com:443?ocspFailOpen=true®ion=b.c&validateDefaultParameters=true", + dsn: "u:p@a.b.c.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=b.c&validateDefaultParameters=true", }, { cfg: &Config{ - User: "u", - Password: "p", - Account: "a.b.c", - Region: "r", + User: "u", + Password: "p", + Account: "a.b.c", + Region: "r", + ConnectionID: testConnectionID, }, err: ErrInvalidRegion, }, @@ -853,8 +880,19 @@ func TestDSN(t *testing.T) { Password: "p", Account: "a.b.c", ClientTimeout: 300 * time.Second, + ConnectionID: testConnectionID, + }, + dsn: "u:p@a.b.c.snowflakecomputing.com:443?clientTimeout=300&connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=5®ion=b.c&validateDefaultParameters=true", + }, + { + cfg: &Config{ + User: "u", + Password: "p", + Account: "a.e", + QueryMonitoringThreshold: 20 * time.Second, + ConnectionID: testConnectionID, }, - dsn: "u:p@a.b.c.snowflakecomputing.com:443?clientTimeout=300&ocspFailOpen=true®ion=b.c&validateDefaultParameters=true", + dsn: "u:p@a.e.snowflakecomputing.com:443?connectionId=abcd-0123-4567-1234&ocspFailOpen=true&queryMonitoringThreshold=20®ion=e&validateDefaultParameters=true", }, } for _, test := range testcases { diff --git a/exec_resp_cache.go b/exec_resp_cache.go new file mode 100644 index 000000000..77d492d10 --- /dev/null +++ b/exec_resp_cache.go @@ -0,0 +1,99 @@ +package gosnowflake + +import ( + "sync" + "sync/atomic" + "time" +) + +// A reference counted cache from string -> *execResponse. The +// refcount is a reference counter that is used to gc the cache +// so we don't leak memory. We use a sync.Map as the golang docs +// indicate the performance is better than a Mutex + native map. +type execRespCache struct { + id string + refcount int64 + table sync.Map +} + +// An entry in the exec response cache. The entry has a TTL +// since the URLs to S3 do have an access token that can +// expire. At the time of writing this TTL was 6 hours. +type execRespCacheEntry struct { + created time.Time + respd *execResponse +} + +const ( + execRespCacheEntryTTL = 1 * time.Hour +) + +// A global table of exec response caches. We need this since +// the gosnowflake driver does not do its own connection +// pooling and we want a shared cache across all sql.Conn +// instances created over the course of the sql.Driver lifetime. +// We use a native map + lock here to ensure there aren't race +// conditions in the acquire and release code. There should not be +// a performance implication since these fns are called infrequently. +var ( + globalExecRespCacheMu = sync.Mutex{} + globalExecRespCache = map[string]*execRespCache{} +) + +func acquireExecRespCache(id string) *execRespCache { + globalExecRespCacheMu.Lock() + defer globalExecRespCacheMu.Unlock() + + entry, found := globalExecRespCache[id] + if found { + atomic.AddInt64(&entry.refcount, 1) + return entry + } + + cache := &execRespCache{id, 1, sync.Map{}} + globalExecRespCache[id] = cache + return cache +} + +func releaseExecRespCache(cache *execRespCache) { + if cache == nil { + return + } + + globalExecRespCacheMu.Lock() + defer globalExecRespCacheMu.Unlock() + + refcount := atomic.AddInt64(&cache.refcount, -1) + if refcount <= 0 { + delete(globalExecRespCache, cache.id) + } +} + +func (c *execRespCache) load(key string) (*execResponse, bool) { + if c == nil { + return nil, false + } + + val, ok := c.table.Load(key) + if !ok { + return nil, false + } + + entry := val.(execRespCacheEntry) + if entry.isExpired() { + c.table.Delete(key) + return nil, false + } + return entry.respd, true +} + +func (c *execRespCache) store(key string, val *execResponse) { + if c == nil { + return + } + c.table.Store(key, execRespCacheEntry{time.Now(), val}) +} + +func (e execRespCacheEntry) isExpired() bool { + return time.Since(e.created) >= execRespCacheEntryTTL +} diff --git a/monitoring.go b/monitoring.go index b91f21855..c1d570796 100644 --- a/monitoring.go +++ b/monitoring.go @@ -210,8 +210,12 @@ func (sc *snowflakeConn) checkQueryStatus( func (sc *snowflakeConn) getQueryResultResp( ctx context.Context, - resultPath string) ( - *execResponse, error) { + resultPath string, +) (*execResponse, error) { + if respd, ok := sc.execRespCache.load(resultPath); ok { + return respd, nil + } + headers := getHeaders() if serviceName, ok := sc.cfg.Params[serviceName]; ok { headers[httpHeaderServiceName] = *serviceName @@ -235,6 +239,8 @@ func (sc *snowflakeConn) getQueryResultResp( logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err) return nil, err } + + sc.execRespCache.store(resultPath, respd) return respd, nil }