/
temporal.go
109 lines (92 loc) · 2.86 KB
/
temporal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package temporal
// <!-- START clutchdoc -->
// description: Workflow client for temporal.io.
// <!-- END clutchdoc -->
import (
"crypto/tls"
"crypto/x509"
"fmt"
"sync"
"github.com/uber-go/tally/v4"
temporalclient "go.temporal.io/sdk/client"
temporaltally "go.temporal.io/sdk/contrib/tally"
"go.temporal.io/sdk/log"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
temporalv1 "github.com/lyft/clutch/backend/api/config/service/temporal/v1"
"github.com/lyft/clutch/backend/service"
)
// Name is the identifier string for the Temporal service.
// NOTE(review): presumably the key under which this service is registered and looked up — confirm against the service registry.
const Name = "clutch.service.temporal"
// New builds the Temporal service from its serialized configuration.
//
// cfg is unpacked into a temporalv1.Config; if unpacking fails, that error is
// returned unchanged and no service is created. Otherwise construction is
// delegated to newClient and its result and error are returned as-is.
func New(cfg *anypb.Any, logger *zap.Logger, scope tally.Scope) (service.Service, error) {
	var config temporalv1.Config

	err := cfg.UnmarshalTo(&config)
	if err != nil {
		return nil, err
	}

	return newClient(&config, logger, scope)
}
// ClientManager hands out Temporal clients that are each scoped to a single namespace.
type ClientManager interface {
// GetNamespaceClient returns a Client for the given Temporal namespace.
GetNamespaceClient(namespace string) (Client, error)
}
// Client exists to protect users from creating a connection during instantiation of a component,
// since Temporal's NewClient function has the side effect of connecting to the server. See
// https://github.com/temporalio/sdk-go/issues/753 for more details.
//
// A Client is obtained from ClientManager.GetNamespaceClient.
type Client interface {
// GetConnection will connect to the server in order to check its capabilities on the first call.
// Subsequent calls to GetConnection will return a cached client.
// The error result reports a failure to connect.
GetConnection() (temporalclient.Client, error)
}
// newClient assembles a ClientManager from the parsed configuration.
//
// It records the "host:port" address, a metrics handler built from scope, a
// Temporal logger built from logger, and the connection options. This function
// does not dial the server itself. When the configuration requests the system
// CA bundle, TLS is enabled with the system certificate pool as the trusted
// roots and TLS 1.2 as the minimum version; a failure to load that pool is
// returned to the caller and no manager is produced.
func newClient(cfg *temporalv1.Config, logger *zap.Logger, scope tally.Scope) (ClientManager, error) {
	manager := &clientManagerImpl{
		hostPort:       fmt.Sprintf("%s:%d", cfg.Host, cfg.Port),
		metricsHandler: temporaltally.NewMetricsHandler(scope),
		logger:         newTemporalLogger(logger),
		copts:          temporalclient.ConnectionOptions{},
	}

	// Unless the system CA bundle was explicitly requested, TLS stays unset.
	connOpts := cfg.ConnectionOptions
	if connOpts == nil || !connOpts.UseSystemCaBundle {
		return manager, nil
	}

	pool, err := x509.SystemCertPool()
	if err != nil {
		return nil, err
	}

	manager.copts.TLS = &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12,
	}
	return manager, nil
}
// clientManagerImpl is the ClientManager implementation; it stores the settings
// that are copied into every namespace client it creates.
type clientManagerImpl struct {
// hostPort is the server address in "host:port" form.
hostPort string
// logger is handed to each client as its Temporal SDK logger.
logger log.Logger
// metricsHandler is handed to each client as its metrics handler.
metricsHandler temporalclient.MetricsHandler
// copts holds the connection options, including TLS settings when configured.
copts temporalclient.ConnectionOptions
}
// GetNamespaceClient returns a lazy Client bound to the given namespace.
//
// The client is configured with the manager's address, logger, metrics handler
// and connection options. Each call builds a new Client value, and the error
// result is always nil.
func (c *clientManagerImpl) GetNamespaceClient(namespace string) (Client, error) {
	options := temporalclient.Options{
		HostPort:          c.hostPort,
		Logger:            c.logger,
		MetricsHandler:    c.metricsHandler,
		Namespace:         namespace,
		ConnectionOptions: c.copts,
	}

	return &lazyClientImpl{opts: &options}, nil
}
// lazyClientImpl is a Client that holds the options for a connection and
// defers dialing the server until GetConnection is called.
type lazyClientImpl struct {
// mu guards cachedClient.
mu sync.Mutex
// cachedClient is the established connection; it stays nil until a GetConnection call succeeds.
cachedClient temporalclient.Client
// opts are the options passed to the dial call.
opts *temporalclient.Options
}
// GetConnection returns the Temporal client for this namespace, dialing the
// server the first time it is needed.
//
// The mutex is held for the whole call, so concurrent callers are serialized.
// A successful dial is cached and returned by later calls; a failed dial is
// not cached, so the next call tries again.
func (l *lazyClientImpl) GetConnection() (temporalclient.Client, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	// Fast path: a connection was already established by an earlier call.
	if l.cachedClient != nil {
		return l.cachedClient, nil
	}

	conn, err := temporalclient.Dial(*l.opts)
	if err != nil {
		return nil, err
	}

	l.cachedClient = conn
	return conn, nil
}