Skip to content

Commit

Permalink
Add optional session drain
Browse files Browse the repository at this point in the history
  • Loading branch information
rsafonseca committed Sep 30, 2023
1 parent 3689999 commit 5d3bf8e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
23 changes: 23 additions & 0 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,29 @@ func (app *App) Start() {
logger.Log.Warn("the app will shutdown in a few seconds")
case s := <-sg:
logger.Log.Warn("got signal: ", s, ", shutting down...")
if app.config.Session.Drain.Enabled && s == syscall.SIGTERM {
logger.Log.Info("Session drain is enabled, draining all sessions before shutting down")
timeoutTimer := time.NewTimer(app.config.Session.Drain.Timeout)
loop:
for {
if app.sessionPool.GetSessionCount() == 0 {
logger.Log.Info("All sessions drained")
break loop
}
select {
case s := <-sg:
logger.Log.Warn("got signal: ", s)
if s == syscall.SIGINT {
logger.Log.Warnf("Bypassing session draing due to SIGINT. %d sessions will be immediately terminated", app.sessionPool.GetSessionCount())
}
break loop
case <-timeoutTimer.C:
logger.Log.Warnf("Session drain has reached maximum timeout. %d sessions will be immediately terminated", app.sessionPool.GetSessionCount())
case <-time.After(app.config.Session.Drain.Timeout):
logger.Log.Infof("Waiting for all sessions to finish: %d sessions remaining...", app.sessionPool.GetSessionCount())
}
}
}
close(app.dieChan)
}

Expand Down
25 changes: 22 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type PitayaConfig struct {
} `mapstructure:"concurrency"`
Session struct {
Unique bool `mapstructure:"unique"`
Drain struct {
Enabled bool `mapstructure:"enabled"`
Timeout time.Duration `mapstructure:"timeout"`
Period time.Duration `mapstructure:"period"`
} `mapstructure:"drain"`
} `mapstructure:"session"`
Metrics struct {
Period time.Duration `mapstructure:"period"`
Expand Down Expand Up @@ -95,8 +100,22 @@ func NewDefaultPitayaConfig() *PitayaConfig {
},
Session: struct {
Unique bool `mapstructure:"unique"`
Drain struct {
Enabled bool `mapstructure:"enabled"`
Timeout time.Duration `mapstructure:"timeout"`
Period time.Duration `mapstructure:"period"`
} `mapstructure:"drain"`
}{
Unique: true,
Drain: struct {
Enabled bool `mapstructure:"enabled"`
Timeout time.Duration `mapstructure:"timeout"`
Period time.Duration `mapstructure:"period"`
}{
Enabled: false,
Timeout: time.Duration(6 * time.Hour),
Period: time.Duration(5 * time.Second),
},
},
Metrics: struct {
Period time.Duration `mapstructure:"period"`
Expand Down Expand Up @@ -181,9 +200,9 @@ func NewBuilderConfig(config *Config) *BuilderConfig {
if err := config.Unmarshal(&conf); err != nil {
panic(err)
}
if err := config.UnmarshalKey("pitaya", &conf); err != nil {
panic(err)
}
if err := config.UnmarshalKey("pitaya", &conf); err != nil {
panic(err)
}
return conf
}

Expand Down
3 changes: 3 additions & 0 deletions config/viper_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func (c *Config) fillDefaultValues() {
"pitaya.conn.ratelimiting.interval": rateLimitingConfig.Interval,
"pitaya.conn.ratelimiting.forcedisable": rateLimitingConfig.ForceDisable,
"pitaya.session.unique": pitayaConfig.Session.Unique,
"pitaya.session.drain.enabled": pitayaConfig.Session.Drain.Enabled,
"pitaya.session.drain.timeout": pitayaConfig.Session.Drain.Timeout,
"pitaya.session.drain.period": pitayaConfig.Session.Drain.Period,
"pitaya.worker.concurrency": workerConfig.Concurrency,
"pitaya.worker.redis.pool": workerConfig.Redis.Pool,
"pitaya.worker.redis.url": workerConfig.Redis.ServerURL,
Expand Down

0 comments on commit 5d3bf8e

Please sign in to comment.