Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
remove namespace waiting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jlegrone committed Sep 20, 2021
1 parent e9784c1 commit b54068f
Showing 1 changed file with 2 additions and 85 deletions.
87 changes: 2 additions & 85 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,21 @@ package temporalite
import (
"context"
"fmt"
"sync"
"time"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"github.com/DataDog/temporalite/internal/liteconfig"
"go.temporal.io/sdk/client"
"go.temporal.io/server/common/authorization"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/temporal"
"google.golang.org/grpc"

"github.com/DataDog/temporalite/internal/liteconfig"
)

// Server wraps a temporal.Server.
type Server struct {
internal *temporal.Server
frontendHostPort string
config *liteconfig.Config
setupWaitGroup sync.WaitGroup
}

type ServerOption interface {
Expand Down Expand Up @@ -70,60 +64,12 @@ func NewServer(opts ...ServerOption) (*Server, error) {
frontendHostPort: cfg.PublicClient.HostPort,
config: c,
}
s.setupWaitGroup.Add(1)

return s, nil
}

// Start temporal server.
func (s *Server) Start() error {
if len(s.config.Namespaces) > 0 {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
nsClient, err := s.newNamespaceClient(ctx)
if err != nil {
panic(err)
}
defer nsClient.Close()

// Wait for each namespace to be ready
for _, ns := range s.config.Namespaces {
c, err := s.newClient(ctx, client.Options{Namespace: ns})
if err != nil {
panic(err)
}

// Wait up to 1 minute (20ms backoff x 3000 attempts)
var (
maxAttempts = 3000
backoff = 20 * time.Millisecond
)
for i := 0; i < maxAttempts; i++ {
_, err = c.ListOpenWorkflow(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{
Namespace: ns,
})
if err == nil {
if _, err := c.DescribeTaskQueue(ctx, "_404", enumspb.TASK_QUEUE_TYPE_UNSPECIFIED); err == nil {
fmt.Println(err)
break
}
}
time.Sleep(backoff)
}
if err != nil {
panic(fmt.Sprintf("could not connect to namespace %q: %s", ns, err))
}

c.Close()
}

s.setupWaitGroup.Done()
}()
} else {
s.setupWaitGroup.Done()
}

return s.internal.Start()
}

Expand All @@ -135,7 +81,7 @@ func (s *Server) Stop() {
// NewClient initializes a client ready to communicate with the Temporal
// server in the target namespace.
func (s *Server) NewClient(ctx context.Context, namespace string) (client.Client, error) {
return s.newClientBlocking(ctx, client.Options{Namespace: namespace})
return s.NewClientWithOptions(ctx, client.Options{Namespace: namespace})
}

// NewClientWithOptions is the same as NewClient but allows further customization.
Expand All @@ -144,15 +90,6 @@ func (s *Server) NewClient(ctx context.Context, namespace string) (client.Client
//
// Note that the HostPort and ConnectionOptions fields of client.Options will always be overridden.
func (s *Server) NewClientWithOptions(ctx context.Context, options client.Options) (client.Client, error) {
return s.newClientBlocking(ctx, options)
}

func (s *Server) newClientBlocking(ctx context.Context, options client.Options) (client.Client, error) {
s.setupWaitGroup.Wait()
return s.newClient(ctx, options)
}

func (s *Server) newClient(ctx context.Context, options client.Options) (client.Client, error) {
options.HostPort = s.frontendHostPort
options.ConnectionOptions = client.ConnectionOptions{
DisableHealthCheck: false,
Expand All @@ -161,29 +98,9 @@ func (s *Server) newClient(ctx context.Context, options client.Options) (client.
return client.NewClient(options)
}

func (s *Server) newNamespaceClient(ctx context.Context) (client.NamespaceClient, error) {
if err := s.healthCheckFrontend(ctx); err != nil {
return nil, err
}
return client.NewNamespaceClient(client.Options{
HostPort: s.frontendHostPort,
ConnectionOptions: client.ConnectionOptions{
DisableHealthCheck: false,
HealthCheckTimeout: timeoutFromContext(ctx, time.Minute),
},
})
}

func timeoutFromContext(ctx context.Context, defaultTimeout time.Duration) time.Duration {
if deadline, ok := ctx.Deadline(); ok {
return deadline.Sub(time.Now())
}
return defaultTimeout
}

func (s *Server) healthCheckFrontend(ctx context.Context) error {
if _, err := grpc.DialContext(ctx, s.frontendHostPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
return fmt.Errorf("health check failed: %w", err)
}
return nil
}

0 comments on commit b54068f

Please sign in to comment.