Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# Release 0.22.29

- Improve session refresh behavior.

## Changes to API session refresh
* Limit refreshes of both api and sessions to at most every 30 seconds.
* Base faster refreshes not on number of available ERs but on usable ER endpoints, since some ERs may not have usable endpoints
* Only refresh API sessions if session is no longer valid, as opposed if an api session refresh fails, which can happen if the
controller is down or busy.
* Only allow one api session refresh at a time
* Use exponential backoff for api session refresh retries

# Release 0.22.12

- Deprecate ListenOptions.MaxConnections in favor of ListenOptions.MaxTerminators
Expand Down
182 changes: 131 additions & 51 deletions ziti/ziti.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
rest_session "github.com/openziti/edge-api/rest_client_api_client/session"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/foundation/v2/stringz"
apis "github.com/openziti/sdk-golang/edge-apis"
"github.com/openziti/secretstream/kx"
"math"
"math/rand"
"net"
"reflect"
"strconv"
Expand Down Expand Up @@ -187,6 +189,8 @@ type ContextImpl struct {
authQueryHandlers map[string]func(query *rest_model.AuthQueryDetail, response MfaCodeResponse) error

events.EventEmmiter
apiSessionLock sync.Mutex
lastSuccessfulApiSessionRefresh time.Time
}

func (context *ContextImpl) AddServiceAddedListener(handler func(Context, *rest_model.ServiceDetail)) func() {
Expand Down Expand Up @@ -844,10 +848,17 @@ func (context *ContextImpl) Reauthenticate() error {
}

func (context *ContextImpl) Authenticate() error {
context.apiSessionLock.Lock()
defer context.apiSessionLock.Unlock()

if context.CtrlClt.GetCurrentApiSession() != nil {
if time.Since(context.lastSuccessfulApiSessionRefresh) < 5*time.Second {
return nil
}
logrus.Debug("previous apiSession detected, checking if valid")
if _, err := context.CtrlClt.Refresh(); err == nil {
if err := context.RefreshApiSessionWithBackoff(); err == nil {
logrus.Info("previous apiSession refreshed")
context.lastSuccessfulApiSessionRefresh = time.Now()
return nil
} else {
logrus.WithError(err).Info("previous apiSession failed to refresh, attempting to authenticate")
Expand All @@ -857,6 +868,31 @@ func (context *ContextImpl) Authenticate() error {
return context.authenticate()
}

func (context *ContextImpl) RefreshApiSessionWithBackoff() error {
expBackoff := backoff.NewExponentialBackOff()

expBackoff.InitialInterval = 5 * time.Second
expBackoff.MaxInterval = 5 * time.Minute
expBackoff.MaxElapsedTime = 24 * time.Hour

operation := func() error {
_, err := context.CtrlClt.Refresh()
if err == nil {
return nil
}

unauthorizedErr := &current_api_session.GetCurrentAPISessionUnauthorized{}
if errors.As(err, &unauthorizedErr) {
logrus.Info("previous apiSession expired")
return backoff.Permanent(err)
}
logrus.WithError(err).Info("unable to refresh apiSession, will retry")
return err
}

return backoff.Retry(operation, expBackoff)
}

func (context *ContextImpl) CloseAllEdgeRouterConns() {
for entry := range context.routerConnections.IterBuffered() {
key, val := entry.Key, entry.Val
Expand Down Expand Up @@ -1240,7 +1276,7 @@ func (context *ContextImpl) connectEdgeRouter(routerName, ingressUrl string, ret
return
}

pfxlog.Logger().Debugf("connection to edge router using api session token %v", currentApiSession.GetToken())
pfxlog.Logger().Debugf("connection to edge router using api session token %s", string(currentApiSession.GetToken()))
id, err := context.CtrlClt.GetIdentity()

if err != nil {
Expand Down Expand Up @@ -1616,18 +1652,20 @@ func newListenerManager(service *rest_model.ServiceDetail, context *ContextImpl,
}

type listenerManager struct {
service *rest_model.ServiceDetail
context *ContextImpl
session *rest_model.SessionDetail
options *edge.ListenOptions
routerConnections map[string]edge.RouterConn
connects map[string]time.Time
listener network.MultiListener
connectChan chan *edgeRouterConnResult
eventChan chan listenerEvent
sessionRefreshTime time.Time
disconnectedTime *time.Time
observers concurrenz.CopyOnWriteSlice[ListenEventObserver]
service *rest_model.ServiceDetail
context *ContextImpl
session *rest_model.SessionDetail
options *edge.ListenOptions
routerConnections map[string]edge.RouterConn
connects map[string]time.Time
listener network.MultiListener
connectChan chan *edgeRouterConnResult
eventChan chan listenerEvent
sessionRefreshInterval time.Duration
restartSessionRefresh bool
lastSessionRefresh time.Time
disconnectedTime *time.Time
observers concurrenz.CopyOnWriteSlice[ListenEventObserver]
}

func (mgr *listenerManager) AddObserver(observer ListenEventObserver) {
Expand All @@ -1645,6 +1683,7 @@ func (mgr *listenerManager) notify(eventType ListenEventType) {
}

func (mgr *listenerManager) run() {
log := pfxlog.Logger().WithField("service", stringz.OrEmpty(mgr.service.Name))
// need to either establish a session, or fail if we can't create one
for mgr.session == nil {
mgr.createSessionWithBackoff()
Expand All @@ -1665,41 +1704,21 @@ func (mgr *listenerManager) run() {

identitySecret, err := signing.AssertIdentityWithSecret(id.Cert().PrivateKey)
if err != nil {
pfxlog.Logger().Errorf("failed to sign identity: %v", err)
log.WithError(err).Error("failed to sign identity")
} else {
mgr.options.IdentitySecret = string(identitySecret)
}
}

ticker := time.NewTicker(250 * time.Millisecond)

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

refreshSessionTimerInterval := 10 * time.Second
var refreshSessionChan <-chan time.Time

for !mgr.listener.IsClosed() {
var refreshSessionTimer *time.Timer
if len(mgr.session.EdgeRouters) == 0 {
refreshSessionTimer = time.NewTimer(refreshSessionTimerInterval)
} else if len(mgr.session.EdgeRouters) < mgr.options.MaxTerminators {
if refreshSessionTimerInterval < 5*time.Minute {
refreshSessionTimerInterval = 5 * time.Minute
}
refreshSessionTimer = time.NewTimer(refreshSessionTimerInterval)
}

if refreshSessionTimer != nil {
refreshSessionTimerInterval *= 2
if refreshSessionTimerInterval > 30*time.Minute {
refreshSessionTimerInterval = 30 * time.Minute
}
} else {
refreshSessionTimerInterval = 10 * time.Second
}

var refreshSessionTimerC <-chan time.Time
if refreshSessionTimer != nil {
refreshSessionTimerC = refreshSessionTimer.C
if mgr.restartSessionRefresh {
refreshSessionChan = time.After(mgr.sessionRefreshInterval)
mgr.restartSessionRefresh = false
}

//goland:noinspection GoNilness
Expand All @@ -1708,20 +1727,62 @@ func (mgr *listenerManager) run() {
mgr.handleRouterConnectResult(routerConnectionResult)
case event := <-mgr.eventChan:
event.handle(mgr)
case <-refreshSessionTimerC:
case <-refreshSessionChan:
mgr.refreshSession()
log.Debugf("next refresh in %s", mgr.sessionRefreshInterval.String())
refreshSessionChan = time.After(mgr.sessionRefreshInterval)
mgr.sessionRefreshInterval *= 2
if mgr.sessionRefreshInterval > 30*time.Minute {
mgr.sessionRefreshInterval = 30 * time.Minute
}
case <-ticker.C:
mgr.makeMoreListeners()
case <-mgr.options.GetEventChannel():
mgr.notify(ListenerEstablished)
case <-mgr.context.closeNotify:
mgr.listener.CloseWithError(errors.New("context closed"))
}
}
}

func (mgr *listenerManager) sessionRefreshed(session *rest_model.SessionDetail) {
oldUsableCount := mgr.getUsableEndpointCount(mgr.session)
newUsableCount := mgr.getUsableEndpointCount(session)

if oldUsableCount >= 0 && newUsableCount == 0 {
mgr.sessionRefreshInterval = time.Duration(5+rand.Intn(10)) * time.Second
} else if newUsableCount == 0 || newUsableCount < mgr.options.MaxTerminators {
mgr.sessionRefreshInterval = time.Duration(5+rand.Intn(5)) * time.Minute
} else {
mgr.sessionRefreshInterval = 30 * time.Minute
}

mgr.session = session
mgr.restartSessionRefresh = true
mgr.lastSessionRefresh = time.Now()

if refreshSessionTimer != nil {
refreshSessionTimer.Stop()
log := pfxlog.Logger().
WithField("service", stringz.OrEmpty(mgr.service.Name)).
WithField("sessionId", stringz.OrEmpty(mgr.session.ID)).
WithField("usableEndpoints", newUsableCount).
WithField("nextRefresh", mgr.sessionRefreshInterval.String())
log.Debug("session refreshed")
}

func (mgr *listenerManager) getUsableEndpointCount(session *rest_model.SessionDetail) int {
if session == nil {
return 0
}

count := 0
for _, edgeRouter := range session.EdgeRouters {
for _, routerUrl := range edgeRouter.Urls {
if mgr.context.options.isEdgeRouterUrlAccepted(routerUrl) {
count++
}
}
}
return count
}

func (mgr *listenerManager) handleRouterConnectResult(result *edgeRouterConnResult) {
Expand Down Expand Up @@ -1779,38 +1840,55 @@ func (mgr *listenerManager) createListener(routerConnection edge.RouterConn, ses
}

func (mgr *listenerManager) makeMoreListeners() {
log := pfxlog.Logger().WithField("service", *mgr.service.Name).WithField("erCount", len(mgr.session.EdgeRouters))
if mgr.listener.IsClosed() || len(mgr.routerConnections) >= mgr.options.MaxTerminators || len(mgr.session.EdgeRouters) <= len(mgr.routerConnections) {
log.Trace("not trying to make more connections")
return
}

for _, edgeRouter := range mgr.session.EdgeRouters {
if _, ok := mgr.routerConnections[*edgeRouter.Name]; ok {
log.WithField("router", *edgeRouter.Name).Trace("already connected")
// already connected to this router
continue
}

for _, routerUrl := range edgeRouter.Urls {
if !mgr.context.options.isEdgeRouterUrlAccepted(routerUrl) {
log.WithField("router", *edgeRouter.Name).WithField("url", routerUrl).
Trace("skipping unusable url")
continue
}

if _, ok := mgr.connects[routerUrl]; ok {
// this url already has a connect in progress
log.WithField("router", *edgeRouter.Name).WithField("url", routerUrl).
Trace("connect already in progress")
continue
}

log.WithField("router", *edgeRouter.Name).WithField("url", routerUrl).
Trace("attempting to connect to router")
mgr.connects[routerUrl] = time.Now()
go mgr.context.connectEdgeRouter(*edgeRouter.Name, routerUrl, mgr.connectChan)
}
}
}

func (mgr *listenerManager) refreshSession() {
if time.Since(mgr.lastSessionRefresh) < 30*time.Second {
return
}

log := pfxlog.Logger().WithField("service", stringz.OrEmpty(mgr.service.Name))
if mgr.session == nil {
log.Debug("establishing initial session")
mgr.createSessionWithBackoff()
return
}

log = log.WithField("sessionId", stringz.OrEmpty(mgr.session.ID)).WithField("erCount", len(mgr.session.EdgeRouters))
log.Debug("starting session refresh")
session, err := mgr.context.refreshSession(mgr.session)

if err != nil {
Expand All @@ -1823,7 +1901,7 @@ func (mgr *listenerManager) refreshSession() {

target = &rest_session.DetailSessionUnauthorized{}
if errors.As(err, &target) {
pfxlog.Logger().WithError(err).Debugf("failure refreshing bind session for service %v", mgr.listener.GetServiceName())
log.WithError(err).Debugf("failure refreshing bind session for service %v", mgr.listener.GetServiceName())
if err := mgr.context.EnsureAuthenticated(mgr.options); err != nil {
err := fmt.Errorf("unable to establish API session (%w)", err)
if len(mgr.routerConnections) == 0 {
Expand All @@ -1837,7 +1915,7 @@ func (mgr *listenerManager) refreshSession() {
if err != nil {
target = &rest_session.DetailSessionUnauthorized{}
if errors.As(err, &target) {
pfxlog.Logger().WithError(err).Errorf(
log.WithError(err).Errorf(
"failure refreshing bind session even after re-authenticating api session. service %v",
mgr.listener.GetServiceName())
if len(mgr.routerConnections) == 0 {
Expand All @@ -1846,7 +1924,7 @@ func (mgr *listenerManager) refreshSession() {
return
}

pfxlog.Logger().WithError(err).Errorf("failed to to refresh session %v", *mgr.session.ID)
log.WithError(err).Errorf("failed to to refresh session %v", *mgr.session.ID)

// try to create new session
mgr.createSessionWithBackoff()
Expand All @@ -1856,8 +1934,7 @@ func (mgr *listenerManager) refreshSession() {
// token only returned on created, so if we refreshed the session (as opposed to creating a new one) we have to back-fill it on lookups
if session != nil {
session.Token = mgr.session.Token
mgr.session = session
mgr.sessionRefreshTime = time.Now()
mgr.sessionRefreshed(session)
}
}

Expand All @@ -1874,8 +1951,7 @@ func (mgr *listenerManager) createSessionWithBackoff() {

session, err := mgr.context.createSessionWithBackoff(mgr.service, SessionType(SessionBind), mgr.options)
if session != nil {
mgr.session = session
mgr.sessionRefreshTime = time.Now()
mgr.sessionRefreshed(session)
pfxlog.Logger().WithField("session token", *session.Token).Info("new service session")
} else {
pfxlog.Logger().WithError(err).Errorf("failed to create bind session for service %v", mgr.service.Name)
Expand Down Expand Up @@ -1923,7 +1999,11 @@ func (event *routerConnectionListenFailedEvent) handle(mgr *listenerManager) {
mgr.disconnectedTime = &now
}
mgr.notify(ListenerRemoved)
mgr.refreshSession() // if a listener failed, ensure our session is valid
if mgr.sessionRefreshInterval > 10*time.Second && time.Since(mgr.lastSessionRefresh) > 10*time.Second {
mgr.sessionRefreshInterval = time.Duration(100+(rand.Intn(10)*1000)) * time.Millisecond
mgr.restartSessionRefresh = true
}
mgr.refreshSession()
mgr.makeMoreListeners()
}

Expand Down