Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ type KeepAlive struct {
}

type ProxyServerOnline struct {
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty" reloadable:"true"`
ConnBufferSize int `yaml:"conn-buffer-size,omitempty" toml:"conn-buffer-size,omitempty" json:"conn-buffer-size,omitempty" reloadable:"true"`
FrontendKeepalive KeepAlive `yaml:"frontend-keepalive" toml:"frontend-keepalive" json:"frontend-keepalive"`
MaxConnections uint64 `yaml:"max-connections,omitempty" toml:"max-connections,omitempty" json:"max-connections,omitempty" reloadable:"true"`
HighMemoryUsageRejectThreshold float64 `yaml:"high-memory-usage-reject-threshold,omitempty" toml:"high-memory-usage-reject-threshold,omitempty" json:"high-memory-usage-reject-threshold,omitempty" reloadable:"true"`
ConnBufferSize int `yaml:"conn-buffer-size,omitempty" toml:"conn-buffer-size,omitempty" json:"conn-buffer-size,omitempty" reloadable:"true"`
FrontendKeepalive KeepAlive `yaml:"frontend-keepalive" toml:"frontend-keepalive" json:"frontend-keepalive"`
// BackendHealthyKeepalive applies when the observer treats the backend as healthy.
// The config values should be conservative to save CPU and tolerate network fluctuation.
BackendHealthyKeepalive KeepAlive `yaml:"backend-healthy-keepalive" toml:"backend-healthy-keepalive" json:"backend-healthy-keepalive"`
Expand Down Expand Up @@ -149,6 +150,7 @@ func NewConfig() *Config {

cfg.Proxy.Addr = "0.0.0.0:6000"
cfg.Proxy.FrontendKeepalive, cfg.Proxy.BackendHealthyKeepalive, cfg.Proxy.BackendUnhealthyKeepalive = DefaultKeepAlive()
cfg.Proxy.HighMemoryUsageRejectThreshold = 0.9
cfg.Proxy.PDAddrs = "127.0.0.1:2379"
cfg.Proxy.GracefulCloseConnTimeout = 15
cfg.Proxy.FailoverTimeout = 60
Expand Down Expand Up @@ -211,6 +213,12 @@ func (cfg *Config) Check() error {
}

func (ps *ProxyServer) Check() error {
if ps.HighMemoryUsageRejectThreshold < 0 || ps.HighMemoryUsageRejectThreshold > 1 {
return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.high-memory-usage-reject-threshold")
}
if ps.HighMemoryUsageRejectThreshold > 0 && ps.HighMemoryUsageRejectThreshold < 0.5 {
ps.HighMemoryUsageRejectThreshold = 0.5
}
if ps.FailoverTimeout < 0 {
return errors.Wrapf(ErrInvalidConfigValue, "proxy.failover-timeout must be greater than or equal to 0")
}
Expand Down
35 changes: 28 additions & 7 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ var testProxyConfig = Config{
Addr: "0.0.0.0:4000",
PDAddrs: "127.0.0.1:4089",
ProxyServerOnline: ProxyServerOnline{
MaxConnections: 1,
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 10,
FailBackendList: []string{"db-tidb-0", "db-tidb-1"},
FailoverTimeout: 60,
ConnBufferSize: 32 * 1024,
MaxConnections: 1,
HighMemoryUsageRejectThreshold: 0.9,
FrontendKeepalive: KeepAlive{Enabled: true},
ProxyProtocol: "v2",
GracefulWaitBeforeShutdown: 10,
FailBackendList: []string{"db-tidb-0", "db-tidb-1"},
FailoverTimeout: 60,
ConnBufferSize: 32 * 1024,
},
},
API: API{
Expand Down Expand Up @@ -92,6 +93,26 @@ func TestProxyCheck(t *testing.T) {
post func(*testing.T, *Config)
err error
}{
{
pre: func(t *testing.T, c *Config) {
c.Proxy.HighMemoryUsageRejectThreshold = -0.1
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.HighMemoryUsageRejectThreshold = 1.1
},
err: ErrInvalidConfigValue,
},
{
pre: func(t *testing.T, c *Config) {
c.Proxy.HighMemoryUsageRejectThreshold = 0.4
},
post: func(t *testing.T, c *Config) {
require.Equal(t, 0.5, c.Proxy.HighMemoryUsageRejectThreshold)
},
},
{
pre: func(t *testing.T, c *Config) {
c.Workdir = ""
Expand Down
144 changes: 122 additions & 22 deletions pkg/manager/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"runtime"
"runtime/pprof"
"sync/atomic"
"time"

"github.com/pingcap/tidb/pkg/util/memory"
Expand All @@ -18,16 +19,26 @@ import (
)

const (
// Check the memory usage every 30 seconds.
checkInterval = 30 * time.Second
// Refresh the memory usage every 5 seconds.
refreshInterval = 5 * time.Second
// No need to record too frequently.
recordMinInterval = 5 * time.Minute
// Record the profiles when the memory usage is higher than 60%.
alarmThreshold = 0.6
// Remove the oldest profiles when the number of profiles exceeds this limit.
maxSavedProfiles = 20
// Fail open if the latest sampled usage is too old.
snapshotExpireInterval = 3 * refreshInterval
)

// UsageSnapshot is an immutable point-in-time view of process memory usage.
// It is stored atomically (via latestUsage) by refreshUsage and read back by
// LatestUsage / ShouldRejectNewConn.
type UsageSnapshot struct {
	// Used is the sampled amount of memory in use, in bytes.
	Used uint64
	// Limit is the total memory available to the process, in bytes.
	Limit uint64
	// Usage is Used/Limit; it can exceed 1 after the connection-buffer
	// delta adjustment pushes Used above Limit.
	Usage float64
	// UpdateTime records when this snapshot was sampled; consumers use it
	// to detect stale snapshots.
	UpdateTime time.Time
	// Valid is false for the zero snapshot stored before the first
	// successful refresh.
	Valid bool
}

// MemManager is a manager for memory usage.
// Although the continuous profiling collects profiles periodically, when TiProxy runs in the replayer mode,
// the profiles are not collected.
Expand All @@ -38,77 +49,166 @@ type MemManager struct {
cfgGetter config.ConfigGetter
savedProfileNames []string
lastRecordTime time.Time
checkInterval time.Duration // used for test
refreshInterval time.Duration // used for test
recordMinInterval time.Duration // used for test
maxSavedProfiles int // used for test
snapshotExpire time.Duration // used for test
memoryLimit uint64
latestUsage atomic.Value
// connBufferMemDelta tracks the estimated buffer memory change since the latest refreshUsage.
connBufferMemDelta atomic.Int64
}

func NewMemManager(lg *zap.Logger, cfgGetter config.ConfigGetter) *MemManager {
return &MemManager{
mgr := &MemManager{
lg: lg,
cfgGetter: cfgGetter,
checkInterval: checkInterval,
refreshInterval: refreshInterval,
recordMinInterval: recordMinInterval,
maxSavedProfiles: maxSavedProfiles,
snapshotExpire: snapshotExpireInterval,
}
mgr.latestUsage.Store(UsageSnapshot{})
return mgr
}

func (m *MemManager) Start(ctx context.Context) {
// Call the memory.MemTotal and memory.MemUsed in TiDB repo because they have considered cgroup.
limit, err := memory.MemTotal()
if err != nil || limit == 0 {
m.lg.Error("get memory limit failed", zap.Uint64("limit", limit), zap.Error(err))
m.lg.Warn("get memory limit failed", zap.Uint64("limit", limit), zap.Error(err))
return
}
m.memoryLimit = limit
if _, err = m.refreshUsage(); err != nil {
return
}
childCtx, cancel := context.WithCancel(ctx)
m.cancel = cancel
m.wg.RunWithRecover(func() {
m.alarmLoop(childCtx)
m.refreshLoop(childCtx)
}, nil, m.lg)
}

func (m *MemManager) alarmLoop(ctx context.Context) {
ticker := time.NewTicker(m.checkInterval)
func (m *MemManager) refreshLoop(ctx context.Context) {
ticker := time.NewTicker(m.refreshInterval)
defer ticker.Stop()
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.checkAndAlarm()
m.refreshAndAlarm()
}
}
}

func (m *MemManager) checkAndAlarm() {
func (m *MemManager) refreshAndAlarm() {
snapshot, err := m.refreshUsage()
if err != nil || !snapshot.Valid {
return
}
if snapshot.Usage < alarmThreshold {
return
}
if time.Since(m.lastRecordTime) < m.recordMinInterval {
return
}
// The filename is hot-reloadable.
logPath := m.cfgGetter.GetConfig().Log.LogFile.Filename
cfg := m.cfgGetter.GetConfig()
if cfg == nil {
return
}
logPath := cfg.Log.LogFile.Filename
if logPath == "" {
return
}
recordDir := filepath.Dir(logPath)

m.lastRecordTime = snapshot.UpdateTime
m.lg.Warn("memory usage alarm", zap.Uint64("limit", snapshot.Limit), zap.Uint64("used", snapshot.Used), zap.Float64("usage", snapshot.Usage))
now := time.Now().Format(time.RFC3339)
m.recordHeap(filepath.Join(recordDir, "heap_"+now))
m.recordGoroutine(filepath.Join(recordDir, "goroutine_"+now))
m.rmExpiredProfiles()
}

func (m *MemManager) refreshUsage() (UsageSnapshot, error) {
if m.memoryLimit == 0 {
return UsageSnapshot{}, nil
}
used, err := memory.MemUsed()
if err != nil || used == 0 {
m.lg.Error("get used memory failed", zap.Uint64("used", used), zap.Error(err))
return
m.lg.Warn("get used memory failed", zap.Uint64("used", used), zap.Error(err))
return UsageSnapshot{}, err
}
// Start a new delta window from this sampled snapshot. Later connection create/close
// events only adjust the in-memory estimate relative to this refresh result.
m.connBufferMemDelta.Swap(0)
snapshot := UsageSnapshot{
Used: used,
Limit: m.memoryLimit,
Usage: float64(used) / float64(m.memoryLimit),
UpdateTime: time.Now(),
Valid: true,
}
memoryUsage := float64(used) / float64(m.memoryLimit)
if memoryUsage < alarmThreshold {
m.latestUsage.Store(snapshot)
return snapshot, nil
}

// LatestUsage returns the most recently published memory usage snapshot.
// Before the first successful refresh it returns the zero snapshot, whose
// Valid field is false.
func (m *MemManager) LatestUsage() UsageSnapshot {
	if snap, ok := m.latestUsage.Load().(UsageSnapshot); ok {
		return snap
	}
	return UsageSnapshot{}
}

// UpdateConnBufferMemory records a change, in bytes, of the estimated
// connection buffer memory accumulated since the last usage refresh.
// Positive deltas are added on connection creation, negative ones on close.
// It is a no-op on a nil manager or a zero delta, so callers may invoke it
// unconditionally.
func (m *MemManager) UpdateConnBufferMemory(delta int64) {
	if m == nil {
		return
	}
	if delta == 0 {
		return
	}
	m.connBufferMemDelta.Add(delta)
}

m.lastRecordTime = time.Now()
m.lg.Warn("memory usage alarm", zap.Uint64("limit", m.memoryLimit), zap.Uint64("used", used), zap.Float64("usage", memoryUsage))
now := time.Now().Format(time.RFC3339)
m.recordHeap(filepath.Join(recordDir, "heap_"+now))
m.recordGoroutine(filepath.Join(recordDir, "goroutine_"+now))
m.rmExpiredProfiles()
// adjustUsageByConnBuffer applies the connection buffer delta accumulated after the
// latest refreshUsage, so ShouldRejectNewConn can react before the next memory sample.
// adjustUsageByConnBuffer folds the connection-buffer delta accumulated since
// the latest refreshUsage into the given snapshot, so ShouldRejectNewConn can
// react to new/closed connections before the next memory sample arrives.
// The input snapshot is taken by value; the adjusted copy is returned.
func (m *MemManager) adjustUsageByConnBuffer(snapshot UsageSnapshot) UsageSnapshot {
	switch delta := m.connBufferMemDelta.Load(); {
	case delta == 0:
		// Nothing to adjust; return the snapshot untouched.
		return snapshot
	case delta > 0:
		snapshot.Used += uint64(delta)
	default:
		// Negative delta: clamp at zero so Used never underflows.
		if freed := uint64(-delta); freed < snapshot.Used {
			snapshot.Used -= freed
		} else {
			snapshot.Used = 0
		}
	}
	if snapshot.Limit > 0 {
		snapshot.Usage = float64(snapshot.Used) / float64(snapshot.Limit)
	}
	return snapshot
}

// ShouldRejectNewConn reports whether a new client connection should be
// rejected because memory usage has reached the configured threshold.
// It also returns the (possibly buffer-adjusted) snapshot it decided on and
// the threshold in effect, for logging by the caller.
// Rejection is disabled when the manager or config is unavailable, when the
// threshold is zero, or when the latest snapshot is missing or stale
// (fail open: without fresh data we must not refuse connections).
func (m *MemManager) ShouldRejectNewConn() (bool, UsageSnapshot, float64) {
	// A nil manager or missing config source disables the feature entirely.
	if m == nil || m.cfgGetter == nil {
		return false, UsageSnapshot{}, 0
	}
	cfg := m.cfgGetter.GetConfig()
	if cfg == nil {
		return false, UsageSnapshot{}, 0
	}
	threshold := cfg.Proxy.HighMemoryUsageRejectThreshold
	if threshold == 0 {
		// Zero threshold means the operator turned rejection off.
		return false, UsageSnapshot{}, 0
	}
	snapshot := m.LatestUsage()
	stale := !snapshot.Valid || time.Since(snapshot.UpdateTime) > m.snapshotExpire
	if stale {
		// Fail open on a missing or expired sample.
		return false, snapshot, threshold
	}
	adjusted := m.adjustUsageByConnBuffer(snapshot)
	return adjusted.Usage >= threshold, adjusted, threshold
}

func (m *MemManager) recordHeap(fileName string) {
Expand Down
Loading