Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding changes to disable automatic bind throttle #392

Merged
merged 13 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions lib/bindevict.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ type BindEvict struct {
// evicted binds get throttled to have overall steady state during bad bind queries
// nested map uses sqlhash "bindName|bindValue"
BindThrottle map[uint32]map[string]*BindThrottle
lock sync.Mutex
lock sync.Mutex
}

func GetBindEvict() *BindEvict {
cfg := gBindEvict.Load()
if cfg == nil {
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
out := BindEvict{BindThrottle: make(map[uint32]map[string]*BindThrottle)}
gBindEvict.Store(&out)
return &out
}
return cfg.(*BindEvict)
}
func (this *BindEvict) Copy() *BindEvict {
out := BindEvict{BindThrottle:make(map[uint32]map[string]*BindThrottle)}
for k,v := range this.BindThrottle {
out := BindEvict{BindThrottle: make(map[uint32]map[string]*BindThrottle)}
for k, v := range this.BindThrottle {
out.BindThrottle[k] = v
}
return &out
Expand All @@ -77,7 +77,7 @@ func NormalizeBindName(bindName0 string) string {

func (entry *BindThrottle) decrAllowEveryX(y int) {
if y >= 2 && logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s allowEveryX:%d-%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX, y)
info := fmt.Sprintf("hash:%d bindName:%s val:%s allowEveryX:%d-%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX, y)
logger.GetLogger().Log(logger.Warning, "bind throttle decr", info)
}
entry.AllowEveryX -= y
Expand All @@ -96,7 +96,7 @@ func (entry *BindThrottle) decrAllowEveryX(y int) {
// copy everything except bindKV (skipping it is deleting it)
bindKV := fmt.Sprintf("%s|%s", entry.Name, entry.Value)
updateCopy := make(map[string]*BindThrottle)
for k,v := range GetBindEvict().BindThrottle[entry.Sqlhash] {
for k, v := range GetBindEvict().BindThrottle[entry.Sqlhash] {
if k == bindKV {
continue
}
Expand All @@ -107,7 +107,7 @@ func (entry *BindThrottle) decrAllowEveryX(y int) {
}
func (entry *BindThrottle) incrAllowEveryX() {
if logger.GetLogger().V(logger.Warning) {
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d",entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
info := fmt.Sprintf("hash:%d bindName:%s val:%s prev:%d", entry.Sqlhash, entry.Name, entry.Value, entry.AllowEveryX)
logger.GetLogger().Log(logger.Warning, "bind throttle incr", info)
}
entry.AllowEveryX = 3*entry.AllowEveryX + 1
Expand Down Expand Up @@ -149,7 +149,7 @@ func (be *BindEvict) ShouldBlock(sqlhash uint32, bindKV map[string]string, heavy
entry.RecentAttempt.Store(&now)
entry.AllowEveryXCount++
if entry.AllowEveryXCount < entry.AllowEveryX {
return true/*block*/, entry
return true /*block*/, entry
}
entry.AllowEveryXCount = 0

Expand Down
26 changes: 12 additions & 14 deletions lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ type Config struct {
// time_skew_threshold_error(15)
TimeSkewThresholdErrorSec int
// max_stranded_time_interval(2000)
StrandedWorkerTimeoutMs int
StrandedWorkerTimeoutMs int
HighLoadStrandedWorkerTimeoutMs int
HighLoadSkipInitiateRecoverPct int
HighLoadPct int
InitLimitPct int
HighLoadSkipInitiateRecoverPct int
HighLoadPct int
InitLimitPct int

// the worker scheduler policy
LifoScheduler bool
Expand All @@ -110,7 +110,7 @@ type Config struct {
HostnamePrefix map[string]string
ShardingCrossKeysErr bool

CfgFromTns bool
CfgFromTns bool
CfgFromTnsOverrideNumShards int // -1 no-override
CfgFromTnsOverrideTaf int // -1 no-override, 0 override-false, 1 override-true
CfgFromTnsOverrideRWSplit int // -1 no-override, readChildPct
Expand Down Expand Up @@ -156,8 +156,8 @@ type Config struct {
// when numWorkers changes, it will write to this channel, for worker manager to update
numWorkersCh chan int

EnableConnLimitCheck bool
EnableQueryBindBlocker bool
EnableConnLimitCheck bool
EnableQueryBindBlocker bool
QueryBindBlockerMinSqlPrefix int

// taf testing
Expand All @@ -169,7 +169,7 @@ type Config struct {
EnableDanglingWorkerRecovery bool

GoStatsInterval int
RandomStartMs int
RandomStartMs int

// The max number of database connections to be established per second
MaxDbConnectsPerSec int
Expand Down Expand Up @@ -274,10 +274,9 @@ func InitConfig() error {
gAppConfig.StrandedWorkerTimeoutMs = cdb.GetOrDefaultInt("max_stranded_time_interval", 2000)
gAppConfig.HighLoadStrandedWorkerTimeoutMs = cdb.GetOrDefaultInt("high_load_max_stranded_time_interval", 600111)
gAppConfig.HighLoadSkipInitiateRecoverPct = cdb.GetOrDefaultInt("high_load_skip_initiate_recover_pct", 80)
gAppConfig.HighLoadPct = cdb.GetOrDefaultInt("high_load_pct", 130) // >100 disabled
gAppConfig.HighLoadPct = cdb.GetOrDefaultInt("high_load_pct", 130) // >100 disabled
gAppConfig.InitLimitPct = cdb.GetOrDefaultInt("init_limit_pct", 125) // >100 disabled


gAppConfig.StateLogInterval = cdb.GetOrDefaultInt("state_log_interval", 1)
if gAppConfig.StateLogInterval <= 0 {
gAppConfig.StateLogInterval = 1
Expand All @@ -300,7 +299,7 @@ func InitConfig() error {
gAppConfig.ChildExecutable = "postgresworker"
}
} else {
// db type is not supported
// db type is not supported
return errors.New("database type must be either Oracle or MySQL")
}

Expand Down Expand Up @@ -425,9 +424,8 @@ func InitConfig() error {
fmt.Sscanf(cdb.GetOrDefaultString("bind_eviction_decr_per_sec", "10.0"),
"%f", &gAppConfig.BindEvictionDecrPerSec)

gAppConfig.SkipEvictRegex= cdb.GetOrDefaultString("skip_eviction_host_prefix","")
gAppConfig.EvictRegex= cdb.GetOrDefaultString("eviction_host_prefix", "")

gAppConfig.SkipEvictRegex = cdb.GetOrDefaultString("skip_eviction_host_prefix", "")
gAppConfig.EvictRegex = cdb.GetOrDefaultString("eviction_host_prefix", "")

gAppConfig.BouncerEnabled = cdb.GetOrDefaultBool("bouncer_enabled", true)
gAppConfig.BouncerStartupDelay = cdb.GetOrDefaultInt("bouncer_startup_delay", 10)
Expand Down
66 changes: 37 additions & 29 deletions lib/querybindblocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ import (
"github.com/paypal/hera/utility/logger"
)


type QueryBindBlockerEntry struct {
Herasqlhash uint32
Herasqltext string // prefix since some sql is too long
Bindvarname string // prefix for in clause
Herasqlhash uint32
Herasqltext string // prefix since some sql is too long
Bindvarname string // prefix for in clause
Bindvarvalue string // when set to "BLOCKALLVALUES" should block all sqltext queries
Blockperc int
Heramodule string
Blockperc int
Heramodule string
}

type QueryBindBlockerCfg struct {
Expand All @@ -48,7 +47,10 @@ type QueryBindBlockerCfg struct {
// check by sqltext prefix (delay to end)
}

func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool,string) {
var lastLoggingTime time.Time
var defaultQBBTableMissingErrorLoggingInterval = 2 * time.Hour

func (cfg *QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (bool, string) {
sqlhash := uint32(utility.GetSQLHash(sqltext))
if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, fmt.Sprintf("query bind blocker sqlhash and text %d %s", sqlhash, sqltext))
Expand All @@ -70,7 +72,7 @@ func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (
byBindValue, ok := byBindName[bindPairs[i]]
if !ok {
// strip numeric suffix to try to match
withoutNumSuffix := regexp.MustCompile("[_0-9]*$").ReplaceAllString(bindPairs[i],"")
withoutNumSuffix := regexp.MustCompile("[_0-9]*$").ReplaceAllString(bindPairs[i], "")
byBindValue, ok = byBindName[withoutNumSuffix]
if !ok {
continue
Expand Down Expand Up @@ -118,28 +120,27 @@ func (cfg * QueryBindBlockerCfg) IsBlocked(sqltext string, bindPairs []string) (
var g_module string
var gQueryBindBlockerCfg atomic.Value

func GetQueryBindBlockerCfg() (*QueryBindBlockerCfg) {
cfg := gQueryBindBlockerCfg.Load()
if cfg == nil {
return nil
}
return cfg.(*QueryBindBlockerCfg)
func GetQueryBindBlockerCfg() *QueryBindBlockerCfg {
cfg := gQueryBindBlockerCfg.Load()
if cfg == nil {
return nil
}
return cfg.(*QueryBindBlockerCfg)
}


func InitQueryBindBlocker(modName string) {
g_module = modName

db, err := sql.Open("heraloop", fmt.Sprintf("0:0:0"))
if err != nil {
db, err := sql.Open("heraloop", fmt.Sprintf("0:0:0"))
if err != nil {
logger.GetLogger().Log(logger.Alert, "Loading query bind blocker - conn err ", err)
return
}
db.SetMaxIdleConns(0)

return
}
db.SetMaxIdleConns(0)
go func() {
time.Sleep(4*time.Second)
time.Sleep(4 * time.Second)
logger.GetLogger().Log(logger.Info, "Loading query bind blocker - initial")

loadBlockQueryBind(db)
c := time.Tick(11 * time.Second)
for now := range c {
Expand All @@ -152,11 +153,12 @@ func InitQueryBindBlocker(modName string) {
func loadBlockQueryBind(db *sql.DB) {
ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
defer cancel()
conn, err := db.Conn(ctx);
conn, err := db.Conn(ctx)
if err != nil {
logger.GetLogger().Log(logger.Alert, "Error (conn) loading query bind blocker:", err)
return
}

defer conn.Close()
q := fmt.Sprintf("SELECT /*queryBindBlocker*/ %ssqlhash, %ssqltext, bindvarname, bindvarvalue, blockperc, %smodule FROM %s_rate_limiter where %smodule='%s'", GetConfig().StateLogPrefix, GetConfig().StateLogPrefix, GetConfig().StateLogPrefix, GetConfig().ManagementTablePrefix, GetConfig().StateLogPrefix, g_module)
logger.GetLogger().Log(logger.Info, "Loading query bind blocker meta-sql "+q)
Expand All @@ -167,12 +169,18 @@ func loadBlockQueryBind(db *sql.DB) {
}
rows, err := stmt.QueryContext(ctx)
if err != nil {
logger.GetLogger().Log(logger.Alert, "Error (query) loading query bind blocker:", err)
return
if lastLoggingTime.IsZero() || time.Since(lastLoggingTime) > defaultQBBTableMissingErrorLoggingInterval {
//In case table missing log alert event for every 2 hour
logger.GetLogger().Log(logger.Alert, "Error (query) loading query bind blocker:", err)
lastLoggingTime = time.Now()
return
} else {
return
}
}
defer rows.Close()

cfgLoad := QueryBindBlockerCfg{BySqlHash:make(map[uint32]map[string]map[string][]QueryBindBlockerEntry)}
cfgLoad := QueryBindBlockerCfg{BySqlHash: make(map[uint32]map[string]map[string][]QueryBindBlockerEntry)}

rowCount := 0
for rows.Next() {
Expand All @@ -182,9 +190,9 @@ func loadBlockQueryBind(db *sql.DB) {
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker:", err)
continue
}

if len(entry.Herasqltext) < GetConfig().QueryBindBlockerMinSqlPrefix {
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker - sqltext must be ", GetConfig().QueryBindBlockerMinSqlPrefix," bytes or more - sqlhash:", entry.Herasqlhash)
logger.GetLogger().Log(logger.Alert, "Error (row scan) loading query bind blocker - sqltext must be ", GetConfig().QueryBindBlockerMinSqlPrefix, " bytes or more - sqlhash:", entry.Herasqlhash)
continue
}
rowCount++
Expand All @@ -200,7 +208,7 @@ func loadBlockQueryBind(db *sql.DB) {
}
bindVal, ok := bindName[entry.Bindvarvalue]
if !ok {
bindVal = make([]QueryBindBlockerEntry,0)
bindVal = make([]QueryBindBlockerEntry, 0)
bindName[entry.Bindvarvalue] = bindVal
}
bindName[entry.Bindvarvalue] = append(bindName[entry.Bindvarvalue], entry)
Expand Down