Skip to content

Commit

Permalink
adding changes to disable automatic bind throttle (#392)
Browse files Browse the repository at this point in the history
* adding changes to disable automatic bind throttle

* updating values bind throttle decrese per sec and removed unused code

* updating bind eviction test

* fixing review comment

* fixing review comment

* adding test for if rate limit table not exist or empty

* move tests to different package to avoid running them in parallel

* updating sleep time in tests

* added changes for increase throttling recovery speed

* changes for updating text check condition in test code

* reverting changes for bind throttle

* reverted partial changes for local copy of bindEvict object it is going taken care separate change request

* changes for simplifying test code for qbb

---------

Co-authored-by: Rajesh S <samal.rajesh@gmail.com>
  • Loading branch information
rasamala83 and rajesh-1983 committed May 16, 2024
1 parent 36771d0 commit e2f3601
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 115 deletions.
16 changes: 8 additions & 8 deletions lib/bindevict.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ type BindEvict struct {
// evicted binds get throttled to have overall steady state during bad bind queries
// nested map uses sqlhash "bindName|bindValue"
BindThrottle map[uint32]map[string]*BindThrottle
lock sync.Mutex
lock sync.Mutex
}

func GetBindEvict() *BindEvict {
cfg := gBindEvict.Load()
if cfg == nil {
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
out := BindEvict{BindThrottle: make(map[uint32]map[string]*BindThrottle)}
gBindEvict.Store(&out)
return &out
}
return cfg.(*BindEvict)
}
func (this *BindEvict) Copy() *BindEvict {
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
for k,v := range this.BindThrottle {
out := BindEvict{BindThrottle: make(map[uint32]map[string]*BindThrottle)}
for k, v := range this.BindThrottle {
out.BindThrottle[k] = v
}
return &out
Expand All @@ -77,7 +77,7 @@ func NormalizeBindName(bindName0 string) string {

func (entry *BindThrottle) decrAllowEveryX(y int) {
if y >= 2 && logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s allowEveryX:%d-%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX, y)
info := fmt.Sprintf("hash:%d bindName:%s val:%s allowEveryX:%d-%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX, y)
logger.GetLogger().Log(logger.Warning, "bind throttle decr", info)
}
entry.AllowEveryX -= y
Expand All @@ -96,7 +96,7 @@ func (entry *BindThrottle) decrAllowEveryX(y int) {
// copy everything except bindKV (skipping it is deleting it)
bindKV := fmt.Sprintf("%s|%s", entry.Name, entry.Value)
updateCopy := make(map[string]*BindThrottle)
for k,v := range GetBindEvict().BindThrottle[entry.Sqlhash] {
for k, v := range GetBindEvict().BindThrottle[entry.Sqlhash] {
if k == bindKV {
continue
}
Expand All @@ -107,7 +107,7 @@ func (entry *BindThrottle) decrAllowEveryX(y int) {
}
func (entry *BindThrottle) incrAllowEveryX() {
if logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
}
entry.AllowEveryX = 3*entry.AllowEveryX + 1
Expand Down Expand Up @@ -149,7 +149,7 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy
entry.RecentAttempt.Store(&now)
entry.AllowEveryXCount++
if entry.AllowEveryXCount < entry.AllowEveryX {
return true/*block*/, entry
return true /*block*/, entry
}
entry.AllowEveryXCount = 0

Expand Down
26 changes: 12 additions & 14 deletions lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ type Config struct {
// time_skew_threshold_error(15)
TimeSkewThresholdErrorSec int
// max_stranded_time_interval(2000)
StrandedWorkerTimeoutMs int
StrandedWorkerTimeoutMs int
HighLoadStrandedWorkerTimeoutMs int
HighLoadSkipInitiateRecoverPct int
HighLoadPct int
InitLimitPct int
HighLoadSkipInitiateRecoverPct int
HighLoadPct int
InitLimitPct int

// the worker scheduler policy
LifoScheduler bool
Expand All @@ -110,7 +110,7 @@ type Config struct {
HostnamePrefix map[string]string
ShardingCrossKeysErr bool

CfgFromTns bool
CfgFromTns bool
CfgFromTnsOverrideNumShards int // -1 no-override
CfgFromTnsOverrideTaf int // -1 no-override, 0 override-false, 1 override-true
CfgFromTnsOverrideRWSplit int // -1 no-override, readChildPct
Expand Down Expand Up @@ -156,8 +156,8 @@ type Config struct {
// when numWorkers changes, it will write to this channel, for worker manager to update
numWorkersCh chan int

EnableConnLimitCheck bool
EnableQueryBindBlocker bool
EnableConnLimitCheck bool
EnableQueryBindBlocker bool
QueryBindBlockerMinSqlPrefix int

// taf testing
Expand All @@ -169,7 +169,7 @@ type Config struct {
EnableDanglingWorkerRecovery bool

GoStatsInterval int
RandomStartMs int
RandomStartMs int

// The max number of database connections to be established per second
MaxDbConnectsPerSec int
Expand Down Expand Up @@ -274,10 +274,9 @@ func InitConfig() error {
gAppConfig.StrandedWorkerTimeoutMs = cdb.GetOrDefaultInt("max_stranded_time_interval", 2000)
gAppConfig.HighLoadStrandedWorkerTimeoutMs = cdb.GetOrDefaultInt("high_load_max_stranded_time_interval", 600111)
gAppConfig.HighLoadSkipInitiateRecoverPct = cdb.GetOrDefaultInt("high_load_skip_initiate_recover_pct", 80)
gAppConfig.HighLoadPct = cdb.GetOrDefaultInt("high_load_pct", 130) // >100 disabled
gAppConfig.HighLoadPct = cdb.GetOrDefaultInt("high_load_pct", 130) // >100 disabled
gAppConfig.InitLimitPct = cdb.GetOrDefaultInt("init_limit_pct", 125) // >100 disabled


gAppConfig.StateLogInterval = cdb.GetOrDefaultInt("state_log_interval", 1)
if gAppConfig.StateLogInterval <= 0 {
gAppConfig.StateLogInterval = 1
Expand All @@ -300,7 +299,7 @@ func InitConfig() error {
gAppConfig.ChildExecutable = "postgresworker"
}
} else {
// db type is not supported
// db type is not supported
return errors.New("database type must be either Oracle or MySQL")
}

Expand Down Expand Up @@ -425,9 +424,8 @@ func InitConfig() error {
fmt.Sscanf(cdb.GetOrDefaultString("bind_eviction_decr_per_sec", "10.0"),
"%f", &gAppConfig.BindEvictionDecrPerSec)

gAppConfig.SkipEvictRegex= cdb.GetOrDefaultString("skip_eviction_host_prefix","")
gAppConfig.EvictRegex= cdb.GetOrDefaultString("eviction_host_prefix", "")

gAppConfig.SkipEvictRegex = cdb.GetOrDefaultString("skip_eviction_host_prefix", "")
gAppConfig.EvictRegex = cdb.GetOrDefaultString("eviction_host_prefix", "")

gAppConfig.BouncerEnabled = cdb.GetOrDefaultBool("bouncer_enabled", true)
gAppConfig.BouncerStartupDelay = cdb.GetOrDefaultInt("bouncer_startup_delay", 10)
Expand Down
66 changes: 37 additions & 29 deletions lib/querybindblocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ import (
"github.com/paypal/hera/utility/logger"
)


type QueryBindBlockerEntry struct {
Herasqlhash uint32
Herasqltext string // prefix since some sql is too long
Bindvarname string // prefix for in clause
Herasqlhash uint32
Herasqltext string // prefix since some sql is too long
Bindvarname string // prefix for in clause
Bindvarvalue string // when set to "BLOCKALLVALUES" should block all sqltext queries
Blockperc int
Heramodule string
Blockperc int
Heramodule string
}

type QueryBindBlockerCfg struct {
Expand All @@ -48,7 +47,10 @@ type QueryBindBlockerCfg struct {
// check by sqltext prefix (delay to end)
}

func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool,string) {
var lastLoggingTime time.Time
var defaultQBBTableMissingErrorLoggingInterval = 2 * time.Hour

func (cfg *QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool, string) {
sqlhash := uint32(utility.GetSQLHash(sqltext))
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, fmt.Sprintf("query bind blocker sqlhash and text %d %s", sqlhash, sqltext))
Expand All @@ -70,7 +72,7 @@ func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (
byBindValue, ok := byBindName[bindPairs[i]]
if !ok {
// strip numeric suffix to try to match
withoutNumSuffix := regexp.MustCompile("[_0-9]*$").ReplaceAllString(bindPairs[i],"")
withoutNumSuffix := regexp.MustCompile("[_0-9]*$").ReplaceAllString(bindPairs[i], "")
byBindValue, ok = byBindName[withoutNumSuffix]
if !ok {
continue
Expand Down Expand Up @@ -118,28 +120,27 @@ func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (
var g_module string
var gQueryBindBlockerCfg atomic.Value

func GetQueryBindBlockerCfg() (*QueryBindBlockerCfg) {
cfg := gQueryBindBlockerCfg.Load()
if cfg == nil {
return nil
}
return cfg.(*QueryBindBlockerCfg)
func GetQueryBindBlockerCfg() *QueryBindBlockerCfg {
cfg := gQueryBindBlockerCfg.Load()
if cfg == nil {
return nil
}
return cfg.(*QueryBindBlockerCfg)
}


func InitQueryBindBlocker(modName string) {
g_module = modName

db, err := sql.Open("heraloop", fmt.Sprintf("0:0:0"))
if err != nil {
db, err := sql.Open("heraloop", fmt.Sprintf("0:0:0"))
if err != nil {
logger.GetLogger().Log(logger.Alert, "Loading query bind blocker - conn err ", err)
return
}
db.SetMaxIdleConns(0)

return
}
db.SetMaxIdleConns(0)
go func() {
time.Sleep(4*time.Second)
time.Sleep(4 * time.Second)
logger.GetLogger().Log(logger.Info, "Loading query bind blocker - initial")

loadBlockQueryBind(db)
c := time.Tick(11 * time.Second)
for now := range c {
Expand All @@ -152,11 +153,12 @@ func InitQueryBindBlocker(modName string) {
func loadBlockQueryBind(db *sql.DB) {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
conn, err := db.Conn(ctx);
conn, err := db.Conn(ctx)
if err != nil {
logger.GetLogger().Log(logger.Alert, "Error (conn) loading query bind blocker:", err)
return
}

defer conn.Close()
q := fmt.Sprintf("SELECT /*queryBindBlocker*/ %ssqlhash, %ssqltext, bindvarname, bindvarvalue, blockperc, %smodule FROM %s_rate_limiter where %smodule='%s'", GetConfig().StateLogPrefix, GetConfig().StateLogPrefix, GetConfig().StateLogPrefix, GetConfig().ManagementTablePrefix, GetConfig().StateLogPrefix, g_module)
logger.GetLogger().Log(logger.Info, "Loading query bind blocker meta-sql "+q)
Expand All @@ -167,12 +169,18 @@ func loadBlockQueryBind(db *sql.DB) {
}
rows, err := stmt.QueryContext(ctx)
if err != nil {
logger.GetLogger().Log(logger.Alert, "Error (query) loading query bind blocker:", err)
return
if lastLoggingTime.IsZero() || time.Since(lastLoggingTime) > defaultQBBTableMissingErrorLoggingInterval {
//In case table missing log alert event for every 2 hour
logger.GetLogger().Log(logger.Alert, "Error (query) loading query bind blocker:", err)
lastLoggingTime = time.Now()
return
} else {
return
}
}
defer rows.Close()

cfgLoad := QueryBindBlockerCfg{BySqlHash:make(map[uint32]map[string]map[string][]QueryBindBlockerEntry)}
cfgLoad := QueryBindBlockerCfg{BySqlHash: make(map[uint32]map[string]map[string][]QueryBindBlockerEntry)}

rowCount := 0
for rows.Next() {
Expand All @@ -182,9 +190,9 @@ func loadBlockQueryBind(db *sql.DB) {
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker:", err)
continue
}

if len(entry.Herasqltext) < GetConfig().QueryBindBlockerMinSqlPrefix {
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker - sqltext must be ", GetConfig().QueryBindBlockerMinSqlPrefix," bytes or more - sqlhash:", entry.Herasqlhash)
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker - sqltext must be ", GetConfig().QueryBindBlockerMinSqlPrefix, " bytes or more - sqlhash:", entry.Herasqlhash)
continue
}
rowCount++
Expand All @@ -200,7 +208,7 @@ func loadBlockQueryBind(db *sql.DB) {
}
bindVal, ok := bindName[entry.Bindvarvalue]
if !ok {
bindVal = make([]QueryBindBlockerEntry,0)
bindVal = make([]QueryBindBlockerEntry, 0)
bindName[entry.Bindvarvalue] = bindVal
}
bindName[entry.Bindvarvalue] = append(bindName[entry.Bindvarvalue], entry)
Expand Down
Loading

0 comments on commit e2f3601

Please sign in to comment.