Skip to content

Commit

Permalink
Add support for forwarding Nexus HTTP requests (#5793)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
* Added a new component `cluster.HttpClientCache` which serves a similar
purpose to our gRPC `ClientBean` but provides HTTP clients for remote
clusters.
* Added logic to forward Nexus requests from standby clusters to active.
* Exposed frontend `namespace.Registry` from our test `temporalImpl` so
that tests can use `Eventually` functions to wait for namespace data to
be updated in-memory without having to use `time.Sleep`

## Why?
<!-- Tell your future self why have you made these changes -->
So that Nexus requests can be forwarded across clusters.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
New unit and functional tests.
  • Loading branch information
pdoerner committed May 24, 2024
1 parent c5cb979 commit b4c2cca
Show file tree
Hide file tree
Showing 14 changed files with 862 additions and 62 deletions.
114 changes: 114 additions & 0 deletions common/cluster/frontend_http_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cluster

import (
"crypto/tls"
"fmt"
"net/http"
"net/url"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/collection"
)

type tlsConfigProvider interface {
GetRemoteClusterClientConfig(hostname string) (*tls.Config, error)
}

type FrontendHTTPClient struct {
http.Client
Address string
}

type FrontendHTTPClientCache struct {
metadata Metadata
tlsProvider tlsConfigProvider
clients *collection.FallibleOnceMap[string, *FrontendHTTPClient]
}

func NewFrontendHTTPClientCache(
metadata Metadata,
tlsProvider tlsConfigProvider,
) *FrontendHTTPClientCache {
cache := &FrontendHTTPClientCache{
metadata: metadata,
tlsProvider: tlsProvider,
}
cache.clients = collection.NewFallibleOnceMap(cache.newClientForCluster)
metadata.RegisterMetadataChangeCallback(cache, cache.evictionCallback)
return cache
}

// Get returns a cached HttpClient if available, or constructs a new one for the given cluster name.
func (c *FrontendHTTPClientCache) Get(targetClusterName string) (*FrontendHTTPClient, error) {
return c.clients.Get(targetClusterName)
}

func (c *FrontendHTTPClientCache) newClientForCluster(targetClusterName string) (*FrontendHTTPClient, error) {
targetInfo, ok := c.metadata.GetAllClusterInfo()[targetClusterName]
if !ok {
return nil, serviceerror.NewNotFound(fmt.Sprintf("could not find cluster metadata for cluster %s", targetClusterName))
}

address, err := url.Parse(targetInfo.HTTPAddress)
if err != nil {
return nil, err
}

client := http.Client{}

if c.tlsProvider != nil {
tlsClientConfig, err := c.tlsProvider.GetRemoteClusterClientConfig(address.Hostname())
if err != nil {
return nil, err
}
client.Transport = &http.Transport{TLSClientConfig: tlsClientConfig}
}

return &FrontendHTTPClient{
Address: targetInfo.HTTPAddress,
Client: client,
}, nil
}

// evictionCallback is invoked by cluster.Metadata when cluster information changes.
// It invalidates clients which are either no longer present or have had their HTTP address changed.
// It is assumed that TLS information has not changed for clusters that are unmodified.
func (c *FrontendHTTPClientCache) evictionCallback(oldClusterMetadata map[string]*ClusterInformation, newClusterMetadata map[string]*ClusterInformation) {
for oldClusterName, oldClusterInfo := range oldClusterMetadata {
if oldClusterName == c.metadata.GetCurrentClusterName() || oldClusterInfo == nil {
continue
}

newClusterInfo, exists := newClusterMetadata[oldClusterName]
if !exists || oldClusterInfo.HTTPAddress != newClusterInfo.HTTPAddress {
// Cluster was removed or had its HTTP address changed, so invalidate the cached client for that cluster.
client, ok := c.clients.Pop(oldClusterName)
if ok {
client.CloseIdleConnections()
}
}
}
}
10 changes: 10 additions & 0 deletions common/collection/oncemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,13 @@ func (p *FallibleOnceMap[K, T]) Get(key K) (T, error) {

return value, nil
}

func (p *FallibleOnceMap[K, T]) Pop(key K) (T, bool) {
p.mu.Lock()
defer p.mu.Unlock()
val, ok := p.inner[key]
if ok {
delete(p.inner, key)
}
return val, ok
}
44 changes: 44 additions & 0 deletions common/nexus/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package nexus

import (
"errors"
"net/http"

"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -192,3 +193,46 @@ func AdaptAuthorizeError(err error) error {
}
return nexus.HandlerErrorf(nexus.HandlerErrorTypeUnauthorized, "permission denied")
}

func HandlerErrorFromClientError(err error) error {
var unexpectedRespErr *nexus.UnexpectedResponseError
if errors.As(err, &unexpectedRespErr) {
failure := unexpectedRespErr.Failure
if unexpectedRespErr.Failure == nil {
failure = &nexus.Failure{
Message: unexpectedRespErr.Error(),
}
}
handlerErr := &nexus.HandlerError{
Failure: failure,
}

switch unexpectedRespErr.Response.StatusCode {
case http.StatusBadRequest:
handlerErr.Type = nexus.HandlerErrorTypeBadRequest
case http.StatusUnauthorized:
handlerErr.Type = nexus.HandlerErrorTypeUnauthenticated
case http.StatusForbidden:
handlerErr.Type = nexus.HandlerErrorTypeUnauthorized
case http.StatusNotFound:
handlerErr.Type = nexus.HandlerErrorTypeNotFound
case http.StatusTooManyRequests:
handlerErr.Type = nexus.HandlerErrorTypeResourceExhausted
case http.StatusInternalServerError:
handlerErr.Type = nexus.HandlerErrorTypeInternal
case http.StatusNotImplemented:
handlerErr.Type = nexus.HandlerErrorTypeNotImplemented
case http.StatusServiceUnavailable:
handlerErr.Type = nexus.HandlerErrorTypeUnavailable
case nexus.StatusDownstreamError:
handlerErr.Type = nexus.HandlerErrorTypeDownstreamError
case nexus.StatusDownstreamTimeout:
handlerErr.Type = nexus.HandlerErrorTypeDownstreamTimeout
}

return handlerErr
}

// Let the nexus SDK handle this for us (log and convert to an internal error).
return err
}
8 changes: 8 additions & 0 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ var Module = fx.Options(
fx.Provide(MatchingRawClientProvider),
fx.Provide(MatchingClientProvider),
membership.GRPCResolverModule,
fx.Provide(FrontendHTTPClientCacheProvider),
fx.Invoke(RegisterBootstrapContainer),
fx.Provide(PersistenceConfigProvider),
fx.Provide(health.NewServer),
Expand Down Expand Up @@ -408,6 +409,13 @@ func RPCFactoryProvider(
), nil
}

func FrontendHTTPClientCacheProvider(
metadata cluster.Metadata,
tlsConfigProvider encryption.TLSConfigProvider,
) *cluster.FrontendHTTPClientCache {
return cluster.NewFrontendHTTPClientCache(metadata, tlsConfigProvider)
}

func getFrontendConnectionDetails(
cfg *config.Config,
tlsConfigProvider encryption.TLSConfigProvider,
Expand Down
24 changes: 12 additions & 12 deletions common/rpc/interceptor/redirection.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import (
)

const (
dcRedirectionContextHeaderName = "xdc-redirection"
dcRedirectionApiHeaderName = "xdc-redirection-api"
DCRedirectionContextHeaderName = "xdc-redirection"
DCRedirectionApiHeaderName = "xdc-redirection-api"
dcRedirectionMetricsPrefix = "DCRedirection"
)

Expand Down Expand Up @@ -185,7 +185,7 @@ func (i *Redirection) Intercept(
if !strings.HasPrefix(info.FullMethod, api.WorkflowServicePrefix) {
return handler(ctx, req)
}
if !i.redirectionAllowed(ctx) {
if !i.RedirectionAllowed(ctx) {
return handler(ctx, req)
}

Expand Down Expand Up @@ -213,9 +213,9 @@ func (i *Redirection) handleLocalAPIInvocation(
handler grpc.UnaryHandler,
methodName string,
) (_ any, retError error) {
scope, startTime := i.beforeCall(dcRedirectionMetricsPrefix + methodName)
scope, startTime := i.BeforeCall(dcRedirectionMetricsPrefix + methodName)
defer func() {
i.afterCall(scope, startTime, i.currentClusterName, retError)
i.AfterCall(scope, startTime, i.currentClusterName, retError)
}()
return handler(ctx, req)
}
Expand All @@ -233,9 +233,9 @@ func (i *Redirection) handleRedirectAPIInvocation(
var clusterName string
var err error

scope, startTime := i.beforeCall(dcRedirectionMetricsPrefix + methodName)
scope, startTime := i.BeforeCall(dcRedirectionMetricsPrefix + methodName)
defer func() {
i.afterCall(scope, startTime, clusterName, retError)
i.AfterCall(scope, startTime, clusterName, retError)
}()

err = i.redirectionPolicy.WithNamespaceRedirect(ctx, namespaceName, methodName, func(targetDC string) error {
Expand All @@ -248,7 +248,7 @@ func (i *Redirection) handleRedirectAPIInvocation(
return err
}
resp = respCtorFn()
ctx = metadata.AppendToOutgoingContext(ctx, dcRedirectionApiHeaderName, "true")
ctx = metadata.AppendToOutgoingContext(ctx, DCRedirectionApiHeaderName, "true")
err = remoteClient.Invoke(ctx, info.FullMethod, req, resp)
if err != nil {
return err
Expand All @@ -259,13 +259,13 @@ func (i *Redirection) handleRedirectAPIInvocation(
return resp, err
}

func (i *Redirection) beforeCall(
func (i *Redirection) BeforeCall(
operation string,
) (metrics.Handler, time.Time) {
return i.metricsHandler.WithTags(metrics.OperationTag(operation), metrics.ServiceRoleTag(metrics.DCRedirectionRoleTagValue)), i.timeSource.Now()
}

func (i *Redirection) afterCall(
func (i *Redirection) AfterCall(
metricsHandler metrics.Handler,
startTime time.Time,
clusterName string,
Expand All @@ -279,15 +279,15 @@ func (i *Redirection) afterCall(
}
}

func (i *Redirection) redirectionAllowed(
func (i *Redirection) RedirectionAllowed(
ctx context.Context,
) bool {
// default to allow dc redirection
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return true
}
values := md.Get(dcRedirectionContextHeaderName)
values := md.Get(DCRedirectionContextHeaderName)
if len(values) == 0 {
return true
}
Expand Down
14 changes: 7 additions & 7 deletions common/rpc/interceptor/redirection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,31 +333,31 @@ func (s *redirectionInterceptorSuite) TestHandleGlobalAPIInvocation_NamespaceNot

func (s *redirectionInterceptorSuite) TestRedirectionAllowed_Empty() {
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{}))
allowed := s.redirector.redirectionAllowed(ctx)
allowed := s.redirector.RedirectionAllowed(ctx)
s.True(allowed)
}

func (s *redirectionInterceptorSuite) TestRedirectionAllowed_Error() {
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
dcRedirectionContextHeaderName: "?",
DCRedirectionContextHeaderName: "?",
}))
allowed := s.redirector.redirectionAllowed(ctx)
allowed := s.redirector.RedirectionAllowed(ctx)
s.True(allowed)
}

func (s *redirectionInterceptorSuite) TestRedirectionAllowed_True() {
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
dcRedirectionContextHeaderName: "t",
DCRedirectionContextHeaderName: "t",
}))
allowed := s.redirector.redirectionAllowed(ctx)
allowed := s.redirector.RedirectionAllowed(ctx)
s.True(allowed)
}

func (s *redirectionInterceptorSuite) TestRedirectionAllowed_False() {
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{
dcRedirectionContextHeaderName: "f",
DCRedirectionContextHeaderName: "f",
}))
allowed := s.redirector.redirectionAllowed(ctx)
allowed := s.redirector.RedirectionAllowed(ctx)
s.False(allowed)
}

Expand Down
6 changes: 6 additions & 0 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,9 +726,12 @@ func RegisterNexusHTTPHandler(
serviceName primitives.ServiceName,
matchingClient resource.MatchingClient,
metricsHandler metrics.Handler,
clusterMetadata cluster.Metadata,
clientCache *cluster.FrontendHTTPClientCache,
namespaceRegistry namespace.Registry,
endpointRegistry *nexus.EndpointRegistry,
authInterceptor *authorization.Interceptor,
redirectionInterceptor *interceptor.Redirection,
namespaceRateLimiterInterceptor *interceptor.NamespaceRateLimitInterceptor,
namespaceCountLimiterInterceptor *interceptor.ConcurrentRequestLimitInterceptor,
namespaceValidatorInterceptor *interceptor.NamespaceValidatorInterceptor,
Expand All @@ -740,9 +743,12 @@ func RegisterNexusHTTPHandler(
serviceConfig,
matchingClient,
metricsHandler,
clusterMetadata,
clientCache,
namespaceRegistry,
endpointRegistry,
authInterceptor,
redirectionInterceptor,
namespaceValidatorInterceptor,
namespaceRateLimiterInterceptor,
namespaceCountLimiterInterceptor,
Expand Down

0 comments on commit b4c2cca

Please sign in to comment.