diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a261e18..a6973867 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +# Release 0.22.29 + +- Improve session refresh behavior. + +## Changes to API session refresh +* Limit refreshes of both api and sessions to at most every 30 seconds. +* Base faster refreshes not on number of available ERs but on usable ER endpoints, since some ERs may not have usable endpoints +* Only refresh API sessions if session is no longer valid, as opposed to whenever an api session refresh fails, which can happen if the + controller is down or busy. +* Only allow one api session refresh at a time +* Use exponential backoff for api session refresh retries + # Release 0.22.12 - Deprecate ListenOptions.MaxConnections in favor of ListenOptions.MaxTerminators diff --git a/ziti/ziti.go b/ziti/ziti.go index b8d31fb2..129fdb76 100644 --- a/ziti/ziti.go +++ b/ziti/ziti.go @@ -27,9 +27,11 @@ import ( rest_session "github.com/openziti/edge-api/rest_client_api_client/session" "github.com/openziti/foundation/v2/concurrenz" "github.com/openziti/foundation/v2/errorz" + "github.com/openziti/foundation/v2/stringz" apis "github.com/openziti/sdk-golang/edge-apis" "github.com/openziti/secretstream/kx" "math" + "math/rand" "net" "reflect" "strconv" @@ -187,6 +189,8 @@ type ContextImpl struct { authQueryHandlers map[string]func(query *rest_model.AuthQueryDetail, response MfaCodeResponse) error events.EventEmmiter + apiSessionLock sync.Mutex + lastSuccessfulApiSessionRefresh time.Time } func (context *ContextImpl) AddServiceAddedListener(handler func(Context, *rest_model.ServiceDetail)) func() { @@ -844,10 +848,17 @@ func (context *ContextImpl) Reauthenticate() error { } func (context *ContextImpl) Authenticate() error { + context.apiSessionLock.Lock() + defer context.apiSessionLock.Unlock() + if context.CtrlClt.GetCurrentApiSession() != nil { + if time.Since(context.lastSuccessfulApiSessionRefresh) < 5*time.Second { + return nil + } logrus.Debug("previous
apiSession detected, checking if valid") - if _, err := context.CtrlClt.Refresh(); err == nil { + if err := context.RefreshApiSessionWithBackoff(); err == nil { logrus.Info("previous apiSession refreshed") + context.lastSuccessfulApiSessionRefresh = time.Now() return nil } else { logrus.WithError(err).Info("previous apiSession failed to refresh, attempting to authenticate") @@ -857,6 +868,31 @@ func (context *ContextImpl) Authenticate() error { return context.authenticate() } +func (context *ContextImpl) RefreshApiSessionWithBackoff() error { + expBackoff := backoff.NewExponentialBackOff() + + expBackoff.InitialInterval = 5 * time.Second + expBackoff.MaxInterval = 5 * time.Minute + expBackoff.MaxElapsedTime = 24 * time.Hour + + operation := func() error { + _, err := context.CtrlClt.Refresh() + if err == nil { + return nil + } + + unauthorizedErr := &current_api_session.GetCurrentAPISessionUnauthorized{} + if errors.As(err, &unauthorizedErr) { + logrus.Info("previous apiSession expired") + return backoff.Permanent(err) + } + logrus.WithError(err).Info("unable to refresh apiSession, will retry") + return err + } + + return backoff.Retry(operation, expBackoff) +} + func (context *ContextImpl) CloseAllEdgeRouterConns() { for entry := range context.routerConnections.IterBuffered() { key, val := entry.Key, entry.Val @@ -1240,7 +1276,7 @@ func (context *ContextImpl) connectEdgeRouter(routerName, ingressUrl string, ret return } - pfxlog.Logger().Debugf("connection to edge router using api session token %v", currentApiSession.GetToken()) + pfxlog.Logger().Debugf("connection to edge router using api session token %s", string(currentApiSession.GetToken())) id, err := context.CtrlClt.GetIdentity() if err != nil { @@ -1616,18 +1652,20 @@ func newListenerManager(service *rest_model.ServiceDetail, context *ContextImpl, } type listenerManager struct { - service *rest_model.ServiceDetail - context *ContextImpl - session *rest_model.SessionDetail - options *edge.ListenOptions -
routerConnections map[string]edge.RouterConn - connects map[string]time.Time - listener network.MultiListener - connectChan chan *edgeRouterConnResult - eventChan chan listenerEvent - sessionRefreshTime time.Time - disconnectedTime *time.Time - observers concurrenz.CopyOnWriteSlice[ListenEventObserver] + service *rest_model.ServiceDetail + context *ContextImpl + session *rest_model.SessionDetail + options *edge.ListenOptions + routerConnections map[string]edge.RouterConn + connects map[string]time.Time + listener network.MultiListener + connectChan chan *edgeRouterConnResult + eventChan chan listenerEvent + sessionRefreshInterval time.Duration + restartSessionRefresh bool + lastSessionRefresh time.Time + disconnectedTime *time.Time + observers concurrenz.CopyOnWriteSlice[ListenEventObserver] } func (mgr *listenerManager) AddObserver(observer ListenEventObserver) { @@ -1645,6 +1683,7 @@ func (mgr *listenerManager) notify(eventType ListenEventType) { } func (mgr *listenerManager) run() { + log := pfxlog.Logger().WithField("service", stringz.OrEmpty(mgr.service.Name)) // need to either establish a session, or fail if we can't create one for mgr.session == nil { mgr.createSessionWithBackoff() @@ -1665,41 +1704,21 @@ func (mgr *listenerManager) run() { identitySecret, err := signing.AssertIdentityWithSecret(id.Cert().PrivateKey) if err != nil { - pfxlog.Logger().Errorf("failed to sign identity: %v", err) + log.WithError(err).Error("failed to sign identity") } else { mgr.options.IdentitySecret = string(identitySecret) } } - ticker := time.NewTicker(250 * time.Millisecond) - + ticker := time.NewTicker(time.Second) defer ticker.Stop() - refreshSessionTimerInterval := 10 * time.Second + var refreshSessionChan <-chan time.Time for !mgr.listener.IsClosed() { - var refreshSessionTimer *time.Timer - if len(mgr.session.EdgeRouters) == 0 { - refreshSessionTimer = time.NewTimer(refreshSessionTimerInterval) - } else if len(mgr.session.EdgeRouters) < mgr.options.MaxTerminators { - 
if refreshSessionTimerInterval < 5*time.Minute { - refreshSessionTimerInterval = 5 * time.Minute - } - refreshSessionTimer = time.NewTimer(refreshSessionTimerInterval) - } - - if refreshSessionTimer != nil { - refreshSessionTimerInterval *= 2 - if refreshSessionTimerInterval > 30*time.Minute { - refreshSessionTimerInterval = 30 * time.Minute - } - } else { - refreshSessionTimerInterval = 10 * time.Second - } - - var refreshSessionTimerC <-chan time.Time - if refreshSessionTimer != nil { - refreshSessionTimerC = refreshSessionTimer.C + if mgr.restartSessionRefresh { + refreshSessionChan = time.After(mgr.sessionRefreshInterval) + mgr.restartSessionRefresh = false } //goland:noinspection GoNilness @@ -1708,8 +1727,14 @@ func (mgr *listenerManager) run() { mgr.handleRouterConnectResult(routerConnectionResult) case event := <-mgr.eventChan: event.handle(mgr) - case <-refreshSessionTimerC: + case <-refreshSessionChan: mgr.refreshSession() + log.Debugf("next refresh in %s", mgr.sessionRefreshInterval.String()) + refreshSessionChan = time.After(mgr.sessionRefreshInterval) + mgr.sessionRefreshInterval *= 2 + if mgr.sessionRefreshInterval > 30*time.Minute { + mgr.sessionRefreshInterval = 30 * time.Minute + } case <-ticker.C: mgr.makeMoreListeners() case <-mgr.options.GetEventChannel(): @@ -1717,11 +1742,47 @@ func (mgr *listenerManager) run() { case <-mgr.context.closeNotify: mgr.listener.CloseWithError(errors.New("context closed")) } + } +} + +func (mgr *listenerManager) sessionRefreshed(session *rest_model.SessionDetail) { + oldUsableCount := mgr.getUsableEndpointCount(mgr.session) + newUsableCount := mgr.getUsableEndpointCount(session) + + if oldUsableCount >= 0 && newUsableCount == 0 { + mgr.sessionRefreshInterval = time.Duration(5+rand.Intn(10)) * time.Second + } else if newUsableCount == 0 || newUsableCount < mgr.options.MaxTerminators { + mgr.sessionRefreshInterval = time.Duration(5+rand.Intn(5)) * time.Minute + } else { + mgr.sessionRefreshInterval = 30 * time.Minute 
+ } + + mgr.session = session + mgr.restartSessionRefresh = true + mgr.lastSessionRefresh = time.Now() - if refreshSessionTimer != nil { - refreshSessionTimer.Stop() + log := pfxlog.Logger(). + WithField("service", stringz.OrEmpty(mgr.service.Name)). + WithField("sessionId", stringz.OrEmpty(mgr.session.ID)). + WithField("usableEndpoints", newUsableCount). + WithField("nextRefresh", mgr.sessionRefreshInterval.String()) + log.Debug("session refreshed") +} + +func (mgr *listenerManager) getUsableEndpointCount(session *rest_model.SessionDetail) int { + if session == nil { + return 0 + } + + count := 0 + for _, edgeRouter := range session.EdgeRouters { + for _, routerUrl := range edgeRouter.Urls { + if mgr.context.options.isEdgeRouterUrlAccepted(routerUrl) { + count++ + } } } + return count } func (mgr *listenerManager) handleRouterConnectResult(result *edgeRouterConnResult) { @@ -1779,26 +1840,35 @@ func (mgr *listenerManager) createListener(routerConnection edge.RouterConn, ses } func (mgr *listenerManager) makeMoreListeners() { + log := pfxlog.Logger().WithField("service", *mgr.service.Name).WithField("erCount", len(mgr.session.EdgeRouters)) if mgr.listener.IsClosed() || len(mgr.routerConnections) >= mgr.options.MaxTerminators || len(mgr.session.EdgeRouters) <= len(mgr.routerConnections) { + log.Trace("not trying to make more connections") return } for _, edgeRouter := range mgr.session.EdgeRouters { if _, ok := mgr.routerConnections[*edgeRouter.Name]; ok { + log.WithField("router", *edgeRouter.Name).Trace("already connected") // already connected to this router continue } for _, routerUrl := range edgeRouter.Urls { if !mgr.context.options.isEdgeRouterUrlAccepted(routerUrl) { + log.WithField("router", *edgeRouter.Name).WithField("url", routerUrl). + Trace("skipping unusable url") continue } if _, ok := mgr.connects[routerUrl]; ok { // this url already has a connect in progress + log.WithField("router", *edgeRouter.Name).WithField("url", routerUrl). 
+ Trace("connect already in progress") continue } + log.WithField("router", *edgeRouter.Name).WithField("url", routerUrl). + Trace("attempting to connect to router") mgr.connects[routerUrl] = time.Now() go mgr.context.connectEdgeRouter(*edgeRouter.Name, routerUrl, mgr.connectChan) } @@ -1806,11 +1876,19 @@ func (mgr *listenerManager) makeMoreListeners() { } func (mgr *listenerManager) refreshSession() { + if time.Since(mgr.lastSessionRefresh) < 30*time.Second { + return + } + + log := pfxlog.Logger().WithField("service", stringz.OrEmpty(mgr.service.Name)) if mgr.session == nil { + log.Debug("establishing initial session") mgr.createSessionWithBackoff() return } + log = log.WithField("sessionId", stringz.OrEmpty(mgr.session.ID)).WithField("erCount", len(mgr.session.EdgeRouters)) + log.Debug("starting session refresh") session, err := mgr.context.refreshSession(mgr.session) if err != nil { @@ -1823,7 +1901,7 @@ func (mgr *listenerManager) refreshSession() { target = &rest_session.DetailSessionUnauthorized{} if errors.As(err, &target) { - pfxlog.Logger().WithError(err).Debugf("failure refreshing bind session for service %v", mgr.listener.GetServiceName()) + log.WithError(err).Debugf("failure refreshing bind session for service %v", mgr.listener.GetServiceName()) if err := mgr.context.EnsureAuthenticated(mgr.options); err != nil { err := fmt.Errorf("unable to establish API session (%w)", err) if len(mgr.routerConnections) == 0 { @@ -1837,7 +1915,7 @@ func (mgr *listenerManager) refreshSession() { if err != nil { target = &rest_session.DetailSessionUnauthorized{} if errors.As(err, &target) { - pfxlog.Logger().WithError(err).Errorf( + log.WithError(err).Errorf( "failure refreshing bind session even after re-authenticating api session. 
service %v", mgr.listener.GetServiceName()) if len(mgr.routerConnections) == 0 { @@ -1846,7 +1924,7 @@ func (mgr *listenerManager) refreshSession() { return } - pfxlog.Logger().WithError(err).Errorf("failed to to refresh session %v", *mgr.session.ID) + log.WithError(err).Errorf("failed to to refresh session %v", *mgr.session.ID) // try to create new session mgr.createSessionWithBackoff() @@ -1856,8 +1934,7 @@ func (mgr *listenerManager) refreshSession() { // token only returned on created, so if we refreshed the session (as opposed to creating a new one) we have to back-fill it on lookups if session != nil { session.Token = mgr.session.Token - mgr.session = session - mgr.sessionRefreshTime = time.Now() + mgr.sessionRefreshed(session) } } @@ -1874,8 +1951,7 @@ func (mgr *listenerManager) createSessionWithBackoff() { session, err := mgr.context.createSessionWithBackoff(mgr.service, SessionType(SessionBind), mgr.options) if session != nil { - mgr.session = session - mgr.sessionRefreshTime = time.Now() + mgr.sessionRefreshed(session) pfxlog.Logger().WithField("session token", *session.Token).Info("new service session") } else { pfxlog.Logger().WithError(err).Errorf("failed to create bind session for service %v", mgr.service.Name) @@ -1923,7 +1999,11 @@ func (event *routerConnectionListenFailedEvent) handle(mgr *listenerManager) { mgr.disconnectedTime = &now } mgr.notify(ListenerRemoved) - mgr.refreshSession() // if a listener failed, ensure our session is valid + if mgr.sessionRefreshInterval > 10*time.Second && time.Since(mgr.lastSessionRefresh) > 10*time.Second { + mgr.sessionRefreshInterval = time.Duration(100+(rand.Intn(10)*1000)) * time.Millisecond + mgr.restartSessionRefresh = true + } + mgr.refreshSession() mgr.makeMoreListeners() }