Skip to content

Commit

Permalink
Merge branch 'master' into queue-reader-persistence-api
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 1, 2023
2 parents 7e23744 + 7086b94 commit 9bc74f6
Show file tree
Hide file tree
Showing 45 changed files with 2,080 additions and 718 deletions.
32 changes: 2 additions & 30 deletions common/archiver/s3store/README.md
Expand Up @@ -74,33 +74,5 @@ s3://<bucket-name>/<namespace-id>/
1. Install awscli from [here](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)
2. Install localstack from [here](https://github.com/localstack/localstack#installing)
3. Launch localstack with `SERVICES=s3 localstack start`
4. Create a bucket using `aws --endpoint-url=http://localhost:4572 s3 mb s3://temporal-development`
5. Configure archival and namespaceDefaults with the following configuration
```
archival:
history:
state: "enabled"
enableRead: true
provider:
s3store:
region: "us-east-1"
endpoint: "http://127.0.0.1:4572"
s3ForcePathStyle: true
visibility:
state: "enabled"
enableRead: true
provider:
s3store:
region: "us-east-1"
endpoint: "http://127.0.0.1:4572"
s3ForcePathStyle: true
namespaceDefaults:
archival:
history:
state: "enabled"
URI: "s3://temporal-development"
visibility:
state: "enabled"
URI: "s3://temporal-development"
```
4. Create a bucket using `aws --endpoint-url=http://localhost:4566 s3 mb s3://temporal-development`
5. Launch the server with the localstack s3 environment config`--env development-cass-s3 start`
6 changes: 3 additions & 3 deletions common/membership/grpc_resolver.go
Expand Up @@ -36,7 +36,7 @@ import (
"go.temporal.io/server/common/primitives"
)

const GRPCResolverScheme = "membership"
const grpcResolverScheme = "membership"

// Empty type used to enforce a dependency using fx so that we're guaranteed to have
// initialized the global builder before we use it.
Expand All @@ -62,15 +62,15 @@ func initializeBuilder(monitor Monitor) GRPCResolver {
}

func (g *GRPCResolver) MakeURL(service primitives.ServiceName) string {
return fmt.Sprintf("%s://%s", GRPCResolverScheme, string(service))
return fmt.Sprintf("%s://%s", grpcResolverScheme, string(service))
}

type grpcBuilder struct {
monitor atomic.Value // Monitor
}

func (m *grpcBuilder) Scheme() string {
return GRPCResolverScheme
return grpcResolverScheme
}

func (m *grpcBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
Expand Down
16 changes: 8 additions & 8 deletions common/membership/hostinfo_provider.go
Expand Up @@ -31,24 +31,24 @@ import (
)

var HostInfoProviderModule = fx.Options(
fx.Provide(NewHostInfoProvider),
fx.Invoke(HostInfoProviderLifetimeHooks),
fx.Provide(newHostInfoProvider),
fx.Invoke(hostInfoProviderLifetimeHooks),
)

type (
CachingHostInfoProvider struct {
cachingHostInfoProvider struct {
hostInfo *HostInfo
membershipMonitor Monitor
}
)

func NewHostInfoProvider(membershipMonitor Monitor) HostInfoProvider {
return &CachingHostInfoProvider{
func newHostInfoProvider(membershipMonitor Monitor) HostInfoProvider {
return &cachingHostInfoProvider{
membershipMonitor: membershipMonitor,
}
}

func (hip *CachingHostInfoProvider) Start() error {
func (hip *cachingHostInfoProvider) Start() error {
var err error
hip.hostInfo, err = hip.membershipMonitor.WhoAmI()
if err != nil {
Expand All @@ -57,11 +57,11 @@ func (hip *CachingHostInfoProvider) Start() error {
return nil
}

func (hip *CachingHostInfoProvider) HostInfo() *HostInfo {
func (hip *cachingHostInfoProvider) HostInfo() *HostInfo {
return hip.hostInfo
}

func HostInfoProviderLifetimeHooks(
func hostInfoProviderLifetimeHooks(
lc fx.Lifecycle,
provider HostInfoProvider,
) {
Expand Down
2 changes: 1 addition & 1 deletion common/membership/ringpop.go → common/membership/rp.go
Expand Up @@ -69,7 +69,7 @@ func NewRingPop(
}

// Start start ring pop
func (r *RingPop) Start(
func (r *RingPop) start(
bootstrapHostPostRetriever func() ([]string, error),
bootstrapRetryBackoffInterval time.Duration,
) {
Expand Down
Expand Up @@ -135,7 +135,7 @@ func (rpo *ringpopMonitor) Start() {
rpo.logger.Fatal("unable to initialize membership heartbeats", tag.Error(err))
}

rpo.rp.Start(
rpo.rp.start(
func() ([]string, error) { return rpo.fetchCurrentBootstrapHostports() },
healthyHostLastHeartbeatCutoff/2)

Expand All @@ -144,7 +144,7 @@ func (rpo *ringpopMonitor) Start() {
rpo.logger.Fatal("unable to get ring pop labels", tag.Error(err))
}

if err = labels.Set(RolePort, strconv.Itoa(rpo.services[rpo.serviceName])); err != nil {
if err = labels.Set(rolePort, strconv.Itoa(rpo.services[rpo.serviceName])); err != nil {
rpo.logger.Fatal("unable to set ring pop ServicePort label", tag.Error(err))
}

Expand All @@ -164,7 +164,7 @@ func (rpo *ringpopMonitor) WaitUntilInitialized(ctx context.Context) error {
return err
}

func ServiceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.ServiceType, error) {
func serviceNameToServiceTypeEnum(name primitives.ServiceName) (persistence.ServiceType, error) {
switch name {
case primitives.AllServices:
return persistence.All, nil
Expand Down Expand Up @@ -199,8 +199,8 @@ func (rpo *ringpopMonitor) upsertMyMembership(
return err
}

// SplitHostPortTyped expands upon net.SplitHostPort by providing type parsing.
func SplitHostPortTyped(hostPort string) (net.IP, uint16, error) {
// splitHostPortTyped expands upon net.SplitHostPort by providing type parsing.
func splitHostPortTyped(hostPort string) (net.IP, uint16, error) {
ipstr, portstr, err := net.SplitHostPort(hostPort)
if err != nil {
return nil, 0, err
Expand All @@ -225,13 +225,13 @@ func (rpo *ringpopMonitor) startHeartbeat(broadcastHostport string) error {
sessionStarted := time.Now().UTC()

// Parse and validate broadcast hostport
broadcastAddress, broadcastPort, err := SplitHostPortTyped(broadcastHostport)
broadcastAddress, broadcastPort, err := splitHostPortTyped(broadcastHostport)
if err != nil {
return err
}

// Parse and validate existing service name
role, err := ServiceNameToServiceTypeEnum(rpo.serviceName)
role, err := serviceNameToServiceTypeEnum(rpo.serviceName)
if err != nil {
return err
}
Expand Down
File renamed without changes.
Expand Up @@ -51,10 +51,10 @@ const (
// ringpop instance. The data for this key is the service name
RoleKey = "serviceName"

// RolePort label is set by every single service as soon as it bootstraps its
// rolePort label is set by every single service as soon as it bootstraps its
// ringpop instance. The data for this key represents the TCP port through which
// the service can be accessed.
RolePort = "servicePort"
rolePort = "servicePort"

minRefreshInternal = time.Second * 4
defaultRefreshInterval = time.Second * 10
Expand Down Expand Up @@ -299,7 +299,7 @@ func (r *ringpopServiceResolver) getReachableMembers() ([]string, error) {
// Each temporal service in the ring should advertise which port it has its gRPC listener
// on via a RingPop label. If we cannot find the label, we will assume that that the
// temporal service is listening on the same port that this node is listening on.
servicePortLabel, ok := member.Label(RolePort)
servicePortLabel, ok := member.Label(rolePort)
if ok {
servicePort, err = strconv.Atoi(servicePortLabel)
if err != nil {
Expand Down
Expand Up @@ -107,7 +107,7 @@ func NewTestRingpopCluster(
}
logger.Info("seedNode", tag.Name(cluster.seedNode))

seedAddress, seedPort, err := SplitHostPortTyped(cluster.seedNode)
seedAddress, seedPort, err := splitHostPortTyped(cluster.seedNode)
if err != nil {
logger.Error("unable to split host port", tag.Error(err))
return nil
Expand Down Expand Up @@ -154,7 +154,7 @@ func NewTestRingpopCluster(
return nil
}
rpWrapper := NewRingPop(ringPop, time.Second*2, logger)
_, port, _ := SplitHostPortTyped(cluster.hostAddrs[i])
_, port, _ := splitHostPortTyped(cluster.hostAddrs[i])
cluster.rings[i] = NewRingpopMonitor(
serviceName,
map[primitives.ServiceName]int{serviceName: int(port)}, // use same port for "grpc" port
Expand Down
6 changes: 0 additions & 6 deletions common/metrics/metric_defs.go
Expand Up @@ -480,12 +480,6 @@ const (
PersistenceAppendRawHistoryNodesScope = "AppendRawHistoryNodes"
// PersistenceDeleteHistoryNodesScope tracks DeleteHistoryNodes calls made by service to persistence layer
PersistenceDeleteHistoryNodesScope = "DeleteHistoryNodes"
// PersistenceParseHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceParseHistoryBranchInfoScope = "ParseHistoryBranchInfo"
// PersistenceUpdateHistoryBranchInfoScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceUpdateHistoryBranchInfoScope = "UpdateHistoryBranchInfo"
// PersistenceNewHistoryBranchScope tracks NewHistoryBranch calls made by service to persistence layer
PersistenceNewHistoryBranchScope = "NewHistoryBranch"
// PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer
PersistenceReadHistoryBranchScope = "ReadHistoryBranch"
// PersistenceReadHistoryBranchReverseScope tracks ReadHistoryBranchReverse calls made by service to persistence layer
Expand Down
52 changes: 1 addition & 51 deletions common/persistence/cassandra/history_store.go
Expand Up @@ -30,7 +30,6 @@ import (

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/log"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
Expand Down Expand Up @@ -72,6 +71,7 @@ type (
HistoryStore struct {
Session gocql.Session
Logger log.Logger
p.HistoryBranchUtilImpl
}
)

Expand Down Expand Up @@ -162,56 +162,6 @@ func (h *HistoryStore) DeleteHistoryNodes(
return nil
}

// ParseHistoryBranchInfo parses the history branch for branch information
func (h *HistoryStore) ParseHistoryBranchInfo(
ctx context.Context,
request *p.ParseHistoryBranchInfoRequest,
) (*p.ParseHistoryBranchInfoResponse, error) {

branchInfo, err := p.ParseHistoryBranchToken(request.BranchToken)
if err != nil {
return nil, err
}
return &p.ParseHistoryBranchInfoResponse{
BranchInfo: branchInfo,
}, nil
}

// UpdateHistoryBranchInfo updates the history branch with branch information
func (h *HistoryStore) UpdateHistoryBranchInfo(
ctx context.Context,
request *p.UpdateHistoryBranchInfoRequest,
) (*p.UpdateHistoryBranchInfoResponse, error) {

branchToken, err := p.UpdateHistoryBranchToken(request.BranchToken, request.BranchInfo)
if err != nil {
return nil, err
}
return &p.UpdateHistoryBranchInfoResponse{
BranchToken: branchToken,
}, nil
}

// NewHistoryBranch initializes a new history branch
func (h *HistoryStore) NewHistoryBranch(
ctx context.Context,
request *p.NewHistoryBranchRequest,
) (*p.NewHistoryBranchResponse, error) {
var branchID string
if request.BranchID == nil {
branchID = primitives.NewUUID().String()
} else {
branchID = *request.BranchID
}
branchToken, err := p.NewHistoryBranchToken(request.TreeID, branchID, request.Ancestors)
if err != nil {
return nil, err
}
return &p.NewHistoryBranchResponse{
BranchToken: branchToken,
}, nil
}

// ReadHistoryBranch returns history node data for a branch
// NOTE: For branch that has ancestors, we need to query Cassandra multiple times, because it doesn't support OR/UNION operator
func (h *HistoryStore) ReadHistoryBranch(
Expand Down
31 changes: 1 addition & 30 deletions common/persistence/client/fault_injection.go
Expand Up @@ -71,6 +71,7 @@ type (
}

FaultInjectionExecutionStore struct {
persistence.HistoryBranchUtilImpl
baseExecutionStore persistence.ExecutionStore
ErrorGenerator ErrorGenerator
}
Expand Down Expand Up @@ -674,36 +675,6 @@ func (e *FaultInjectionExecutionStore) DeleteHistoryNodes(
return e.baseExecutionStore.DeleteHistoryNodes(ctx, request)
}

func (e *FaultInjectionExecutionStore) ParseHistoryBranchInfo(
ctx context.Context,
request *persistence.ParseHistoryBranchInfoRequest,
) (*persistence.ParseHistoryBranchInfoResponse, error) {
if err := e.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return e.baseExecutionStore.ParseHistoryBranchInfo(ctx, request)
}

func (e *FaultInjectionExecutionStore) UpdateHistoryBranchInfo(
ctx context.Context,
request *persistence.UpdateHistoryBranchInfoRequest,
) (*persistence.UpdateHistoryBranchInfoResponse, error) {
if err := e.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return e.baseExecutionStore.UpdateHistoryBranchInfo(ctx, request)
}

func (e *FaultInjectionExecutionStore) NewHistoryBranch(
ctx context.Context,
request *persistence.NewHistoryBranchRequest,
) (*persistence.NewHistoryBranchResponse, error) {
if err := e.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return e.baseExecutionStore.NewHistoryBranch(ctx, request)
}

func (e *FaultInjectionExecutionStore) ReadHistoryBranch(
ctx context.Context,
request *persistence.InternalReadHistoryBranchRequest,
Expand Down

0 comments on commit 9bc74f6

Please sign in to comment.