Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix errcheck in ./host/ #3741

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions host/cancel_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,15 +513,17 @@ func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
s.NoError(err0)
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

s.engine.RequestCancelWorkflowExecution(NewContext(), &workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
Identity: identity,
RequestId: uuid.New(),
})
_, err := s.engine.RequestCancelWorkflowExecution(NewContext(),
&workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
Identity: identity,
RequestId: uuid.New(),
})
s.NoError(err)

childCancelled := false
var initiatedEvent *historypb.HistoryEvent
Expand Down Expand Up @@ -620,7 +622,7 @@ func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
}

s.Logger.Info("Process first workflow task which starts and request cancels child workflow")
_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.Error(err)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Equal("BadRequestCancelExternalWorkflowExecutionAttributes: Start and RequestCancel for child workflow is not allowed in same workflow task.", err.Error())
Expand Down
19 changes: 11 additions & 8 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,9 +884,11 @@ func (s *clientIntegrationSuite) Test_StickyWorkerRestartWorkflowTask() {
for _, tt := range testCases {
s.Run(tt.name, func() {
workflowFn := func(ctx workflow.Context) (string, error) {
workflow.SetQueryHandler(ctx, "test", func() (string, error) {
if err := workflow.SetQueryHandler(ctx, "test", func() (string, error) {
return "query works", nil
})
}); err != nil {
return "", err
}

signalCh := workflow.GetSignalChannel(ctx, "test")
var msg string
Expand Down Expand Up @@ -1293,9 +1295,11 @@ func (s *clientIntegrationSuite) Test_BufferedQuery() {
workflowFn := func(ctx workflow.Context) error {
wfStarted.Done()
status := "init"
workflow.SetQueryHandler(ctx, "foo", func() (string, error) {
if err := workflow.SetQueryHandler(ctx, "foo", func() (string, error) {
return status, nil
})
}); err != nil {
return err
}
ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
ScheduleToCloseTimeout: 10 * time.Second,
})
Expand All @@ -1305,9 +1309,7 @@ func (s *clientIntegrationSuite) Test_BufferedQuery() {
err1 := f1.Get(ctx1, nil)
status = "done"

workflow.Sleep(ctx, 5*time.Second)

return err1
return multierr.Combine(err1, workflow.Sleep(ctx, 5*time.Second))
}

s.worker.RegisterWorkflow(workflowFn)
Expand Down Expand Up @@ -1335,13 +1337,14 @@ func (s *clientIntegrationSuite) Test_BufferedQuery() {
// sleep 2s to make sure DescribeMutableState is called after QueryWorkflow
time.Sleep(2 * time.Second)
// make DescribeMutableState call, which force mutable state to reload from db
s.adminClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
_, err := s.adminClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: workflowRun.GetRunID(),
},
})
s.Assert().NoError(err)
}()

// this query will be buffered in mutable state because workflow task is in-flight.
Expand Down
2 changes: 1 addition & 1 deletion host/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (s *clientIntegrationSuite) TestCronWorkflowCompletionStates() {
s.Equal(lcr, "pass")
s.NotNil(workflow.GetLastError(ctx))
s.Equal(workflow.GetLastError(ctx).Error(), "second error")
workflow.Sleep(ctx, 10*time.Second) // cause wft timeout
s.NoError(workflow.Sleep(ctx, 10*time.Second)) // cause wft timeout
panic("should have been timed out on server already")

case 4:
Expand Down
2 changes: 1 addition & 1 deletion host/gethistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func (s *clientIntegrationSuite) TestGetHistoryReverse_MultipleBranches() {
err1 = f1.Get(ctx1, nil)
s.NoError(err1)

workflow.Sleep(ctx, time.Second*2)
s.NoError(workflow.Sleep(ctx, time.Second*2))

f2 := workflow.ExecuteActivity(ctx1, activityFn)
err2 = f2.Get(ctx1, nil)
Expand Down
4 changes: 3 additions & 1 deletion host/max_buffered_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ func (s *clientIntegrationSuite) TestMaxBufferedEventsLimit() {
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
})
f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn)
f1.Get(ctx, nil)
if err := f1.Get(ctx, nil); err != nil {
return 0, err
}

sigCh := workflow.GetSignalChannel(ctx, "test-signal")

Expand Down
45 changes: 21 additions & 24 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ package host
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/exp/maps"
"google.golang.org/grpc"

Expand Down Expand Up @@ -244,38 +243,28 @@ func (c *temporalImpl) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

var errs []string
var errs error

if c.enableWorker() {
c.shutdownWG.Add(1)
err := c.workerApp.Stop(ctx)
if err != nil {
errs = append(errs, err.Error())
}
errs = multierr.Combine(errs, c.workerApp.Stop(ctx))
}

c.shutdownWG.Add(3)

c.frontendApp.Stop(ctx)
if err := c.frontendApp.Stop(ctx); err != nil {
return err
}
for _, historyApp := range c.historyApps {
err := historyApp.Stop(ctx)
if err != nil {
errs = append(errs, err.Error())
}
errs = multierr.Combine(errs, historyApp.Stop(ctx))
}

err := c.matchingApp.Stop(ctx)
if err != nil {
errs = append(errs, err.Error())
}
errs = multierr.Combine(errs, c.matchingApp.Stop(ctx))

close(c.shutdownCh)
c.shutdownWG.Wait()

if len(errs) > 0 {
return errors.New("shutdown errors: " + strings.Join(errs, "; "))
}
return nil
return errs
}

func (c *temporalImpl) FrontendGRPCAddress() string {
Expand Down Expand Up @@ -443,7 +432,9 @@ func (c *temporalImpl) startFrontend(hosts map[primitives.ServiceName][]string,
c.adminClient = NewAdminClient(connection)
c.operatorClient = operatorservice.NewOperatorServiceClient(connection)

feApp.Start(context.Background())
if err := feApp.Start(context.Background()); err != nil {
c.logger.Fatal("unable to start frontend service", tag.Error(err))
}

startWG.Done()
<-c.shutdownCh
Expand Down Expand Up @@ -537,7 +528,9 @@ func (c *temporalImpl) startHistory(
c.historyServices = append(c.historyServices, historyService)
c.historyNamespaceRegistries = append(c.historyNamespaceRegistries, namespaceRegistry)

app.Start(context.Background())
if err := app.Start(context.Background()); err != nil {
c.logger.Fatal("unable to start history service", tag.Error(err))
}
}

startWG.Done()
Expand Down Expand Up @@ -600,7 +593,9 @@ func (c *temporalImpl) startMatching(hosts map[primitives.ServiceName][]string,
c.matchingApp = app
c.matchingService = matchingService
c.matchingNamespaceRegistry = namespaceRegistry
app.Start(context.Background())
if err := app.Start(context.Background()); err != nil {
c.logger.Fatal("unable to start matching service", tag.Error(err))
}

startWG.Done()
<-c.shutdownCh
Expand Down Expand Up @@ -679,7 +674,9 @@ func (c *temporalImpl) startWorker(hosts map[primitives.ServiceName][]string, st
c.workerApp = app
c.workerService = workerService
c.workerNamespaceRegistry = namespaceRegistry
app.Start(context.Background())
if err := app.Start(context.Background()); err != nil {
c.logger.Fatal("unable to start worker service", tag.Error(err))
}

startWG.Done()
<-c.shutdownCh
Expand Down
2 changes: 1 addition & 1 deletion host/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (s *scheduleIntegrationSuite) TestRefresh() {
atomic.AddInt32(&runs, 1)
return 0
})
workflow.Sleep(ctx, 10*time.Second) // longer than execution timeout
s.NoError(workflow.Sleep(ctx, 10*time.Second)) // longer than execution timeout
return nil
}
s.worker.RegisterWorkflowWithOptions(workflowFn, workflow.RegisterOptions{Name: wt})
Expand Down
13 changes: 9 additions & 4 deletions host/test_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/pborman/uuid"
"go.temporal.io/api/operatorservice/v1"
"go.uber.org/multierr"

"go.temporal.io/server/api/adminservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -309,12 +310,16 @@ func (tc *TestCluster) SetFaultInjectionRate(rate float64) {
// TearDownCluster tears down the test cluster
func (tc *TestCluster) TearDownCluster() error {
tc.SetFaultInjectionRate(0)
err := tc.host.Stop()
errs := tc.host.Stop()
tc.host = nil
tc.testBase.TearDownWorkflowStore()
os.RemoveAll(tc.archiverBase.historyStoreDirectory)
os.RemoveAll(tc.archiverBase.visibilityStoreDirectory)
return err
if err := os.RemoveAll(tc.archiverBase.historyStoreDirectory); err != nil {
errs = multierr.Combine(errs, err)
}
if err := os.RemoveAll(tc.archiverBase.visibilityStoreDirectory); err != nil {
errs = multierr.Combine(errs, err)
}
return errs
}

// GetFrontendClient returns a frontend client from the test cluster
Expand Down
6 changes: 3 additions & 3 deletions host/workflow_failures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ func (s *integrationSuite) TestWorkflowTaskFailed() {

// Send signals during workflow task
if sendSignal {
s.sendSignal(s.namespace, workflowExecution, "signalC", nil, identity)
s.sendSignal(s.namespace, workflowExecution, "signalD", nil, identity)
s.sendSignal(s.namespace, workflowExecution, "signalE", nil, identity)
s.NoError(s.sendSignal(s.namespace, workflowExecution, "signalC", nil, identity))
s.NoError(s.sendSignal(s.namespace, workflowExecution, "signalD", nil, identity))
s.NoError(s.sendSignal(s.namespace, workflowExecution, "signalE", nil, identity))
sendSignal = false
}

Expand Down
12 changes: 6 additions & 6 deletions host/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1899,7 +1899,7 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
}
worker1.RegisterWorkflow(testWorkflowFn)
worker1.RegisterActivity(activityWithHB)
worker1.Start()
s.NoError(worker1.Start())

// Start a workflow
startTime := time.Now()
Expand Down Expand Up @@ -1954,7 +1954,7 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
// start worker2
worker2.RegisterWorkflow(testWorkflowFn)
worker2.RegisterActivity(activityWithHB)
worker2.Start()
s.NoError(worker2.Start())
defer worker2.Stop()

// ExecuteWorkflow return existing running workflow if it already started
Expand Down Expand Up @@ -1998,7 +1998,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()
s.NoError(worker1.Start())

// Start wf1 (in local ns)
workflowID := "local-ns-wf-1"
Expand Down Expand Up @@ -2353,7 +2353,7 @@ func (s *integrationClustersTestSuite) TestForceMigration_ClosedWorkflow() {
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()
s.NoError(worker1.Start())

// Start wf1
workflowID := "force-replication-test-wf-1"
Expand Down Expand Up @@ -2438,7 +2438,7 @@ func (s *integrationClustersTestSuite) TestForceMigration_ClosedWorkflow() {
s.Equal(clusterName[1], nsResp.ReplicationConfig.ActiveClusterName)

worker2.RegisterWorkflow(testWorkflowFn)
worker2.Start()
s.NoError(worker2.Start())

// Test reset workflow in cluster 2
resetResp, err := client2.ResetWorkflowExecution(testCtx, &workflowservice.ResetWorkflowExecutionRequest{
Expand Down Expand Up @@ -2477,7 +2477,7 @@ func (s *integrationClustersTestSuite) TestForceMigration_ResetWorkflow() {
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()
s.NoError(worker1.Start())

// Start wf1
workflowID := "force-replication-test-reset-wf-1"
Expand Down