Fix errcheck in ./host/
MichaelSnowden committed Dec 20, 2022
1 parent c6a89c0 commit 18c6fed
Showing 10 changed files with 68 additions and 59 deletions.
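Context for the diff below: errcheck flags any call whose returned error is dropped. The changes follow two recurring remediation patterns, illustrated in hedged form here and after the relevant files: in suite-based tests, capture the error and assert on it; in workflow or shutdown code, propagate or aggregate it. A minimal sketch of the test-side pattern, using a hypothetical client and suite (names are illustrative, not from this commit):

```go
package host_example

import (
	"context"
	"testing"

	"github.com/stretchr/testify/suite"
)

// fakeClient stands in for a service client whose calls return (response, error).
type fakeClient struct{}

func (fakeClient) RequestCancel(ctx context.Context, workflowID string) (string, error) {
	return "ok", nil
}

type exampleSuite struct {
	suite.Suite
	client fakeClient
}

func (s *exampleSuite) TestRequestCancel() {
	// Before: s.client.RequestCancel(ctx, "wf-1")  // errcheck: error return dropped
	// After: capture the error and assert on it, mirroring the test fixes below.
	_, err := s.client.RequestCancel(context.Background(), "wf-1")
	s.NoError(err)
}

func TestExampleSuite(t *testing.T) {
	suite.Run(t, new(exampleSuite))
}
```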
host/cancel_workflow_test.go: 22 changes (12 additions, 10 deletions)
@@ -513,15 +513,17 @@ func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
s.NoError(err0)
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

s.engine.RequestCancelWorkflowExecution(NewContext(), &workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
Identity: identity,
RequestId: uuid.New(),
})
_, err := s.engine.RequestCancelWorkflowExecution(NewContext(),
&workflowservice.RequestCancelWorkflowExecutionRequest{
Namespace: s.namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.RunId,
},
Identity: identity,
RequestId: uuid.New(),
})
s.NoError(err)

childCancelled := false
var initiatedEvent *historypb.HistoryEvent
@@ -620,7 +622,7 @@ func (s *integrationSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
}

s.Logger.Info("Process first workflow task which starts and request cancels child workflow")
_, err := poller.PollAndProcessWorkflowTask(false, false)
_, err = poller.PollAndProcessWorkflowTask(false, false)
s.Error(err)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Equal("BadRequestCancelExternalWorkflowExecutionAttributes: Start and RequestCancel for child workflow is not allowed in same workflow task.", err.Error())
host/client_integration_test.go: 19 changes (11 additions, 8 deletions)
@@ -884,9 +884,11 @@ func (s *clientIntegrationSuite) Test_StickyWorkerRestartWorkflowTask() {
for _, tt := range testCases {
s.Run(tt.name, func() {
workflowFn := func(ctx workflow.Context) (string, error) {
workflow.SetQueryHandler(ctx, "test", func() (string, error) {
if err := workflow.SetQueryHandler(ctx, "test", func() (string, error) {
return "query works", nil
})
}); err != nil {
return "", err
}

signalCh := workflow.GetSignalChannel(ctx, "test")
var msg string
@@ -1293,9 +1295,11 @@ func (s *clientIntegrationSuite) Test_BufferedQuery() {
workflowFn := func(ctx workflow.Context) error {
wfStarted.Done()
status := "init"
workflow.SetQueryHandler(ctx, "foo", func() (string, error) {
if err := workflow.SetQueryHandler(ctx, "foo", func() (string, error) {
return status, nil
})
}); err != nil {
return err
}
ctx1 := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
ScheduleToCloseTimeout: 10 * time.Second,
})
@@ -1305,9 +1309,7 @@
err1 := f1.Get(ctx1, nil)
status = "done"

workflow.Sleep(ctx, 5*time.Second)

return err1
return multierr.Combine(err1, workflow.Sleep(ctx, 5*time.Second))
}

s.worker.RegisterWorkflow(workflowFn)
@@ -1335,13 +1337,14 @@ func (s *clientIntegrationSuite) Test_BufferedQuery() {
// sleep 2s to make sure DescribeMutableState is called after QueryWorkflow
time.Sleep(2 * time.Second)
// make DescribeMutableState call, which force mutable state to reload from db
s.adminClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
_, err := s.adminClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
Namespace: s.namespace,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: workflowRun.GetRunID(),
},
})
s.Assert().NoError(err)
}()

// this query will be buffered in mutable state because workflow task is in-flight.
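The workflow-side fixes in this file cannot use suite assertions, so errors from SDK calls such as workflow.SetQueryHandler and workflow.Sleep are now returned from the workflow function (in Test_BufferedQuery, folded together with multierr.Combine). A minimal sketch of that shape, assuming an illustrative query name and sleep duration:

```go
package host_example

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

// statusWorkflow registers a query handler and sleeps; every error return
// value is either propagated immediately or becomes the workflow result.
func statusWorkflow(ctx workflow.Context) error {
	status := "init"
	if err := workflow.SetQueryHandler(ctx, "status", func() (string, error) {
		return status, nil
	}); err != nil {
		return err
	}

	status = "done"
	// Previously a Sleep error like this could be dropped; now it is returned.
	return workflow.Sleep(ctx, 2*time.Second)
}
```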
host/cron_test.go: 2 changes (1 addition, 1 deletion)
@@ -410,7 +410,7 @@ func (s *clientIntegrationSuite) TestCronWorkflowCompletionStates() {
s.Equal(lcr, "pass")
s.NotNil(workflow.GetLastError(ctx))
s.Equal(workflow.GetLastError(ctx).Error(), "second error")
workflow.Sleep(ctx, 10*time.Second) // cause wft timeout
s.NoError(workflow.Sleep(ctx, 10*time.Second)) // cause wft timeout
panic("should have been timed out on server already")

case 4:
host/gethistory_test.go: 2 changes (1 addition, 1 deletion)
@@ -701,7 +701,7 @@ func (s *clientIntegrationSuite) TestGetHistoryReverse_MultipleBranches() {
err1 = f1.Get(ctx1, nil)
s.NoError(err1)

workflow.Sleep(ctx, time.Second*2)
s.NoError(workflow.Sleep(ctx, time.Second*2))

f2 := workflow.ExecuteActivity(ctx1, activityFn)
err2 = f2.Get(ctx1, nil)
host/max_buffered_event_test.go: 4 changes (3 additions, 1 deletion)
@@ -64,7 +64,9 @@ func (s *clientIntegrationSuite) TestMaxBufferedEventsLimit() {
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1},
})
f1 := workflow.ExecuteLocalActivity(ctx1, localActivityFn)
f1.Get(ctx, nil)
if err := f1.Get(ctx, nil); err != nil {
return 0, err
}

sigCh := workflow.GetSignalChannel(ctx, "test-signal")

host/onebox.go: 45 changes (21 additions, 24 deletions)
@@ -27,14 +27,13 @@ package host
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/exp/maps"
"google.golang.org/grpc"

@@ -244,38 +243,28 @@ func (c *temporalImpl) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

var errs []string
var errs error

if c.enableWorker() {
c.shutdownWG.Add(1)
err := c.workerApp.Stop(ctx)
if err != nil {
errs = append(errs, err.Error())
}
errs = multierr.Combine(errs, c.workerApp.Stop(ctx))
}

c.shutdownWG.Add(3)

c.frontendApp.Stop(ctx)
if err := c.frontendApp.Stop(ctx); err != nil {
return err
}
for _, historyApp := range c.historyApps {
err := historyApp.Stop(ctx)
if err != nil {
errs = append(errs, err.Error())
}
errs = multierr.Combine(errs, historyApp.Stop(ctx))
}

err := c.matchingApp.Stop(ctx)
if err != nil {
errs = append(errs, err.Error())
}
errs = multierr.Combine(errs, c.matchingApp.Stop(ctx))

close(c.shutdownCh)
c.shutdownWG.Wait()

if len(errs) > 0 {
return errors.New("shutdown errors: " + strings.Join(errs, "; "))
}
return nil
return errs
}

func (c *temporalImpl) FrontendGRPCAddress() string {
@@ -443,7 +432,9 @@ func (c *temporalImpl) startFrontend(hosts map[primitives.ServiceName][]string,
c.adminClient = NewAdminClient(connection)
c.operatorClient = operatorservice.NewOperatorServiceClient(connection)

feApp.Start(context.Background())
if err := feApp.Start(context.Background()); err != nil {
c.logger.Fatal("unable to start frontend service", tag.Error(err))
}

startWG.Done()
<-c.shutdownCh
@@ -537,7 +528,9 @@ func (c *temporalImpl) startHistory(
c.historyServices = append(c.historyServices, historyService)
c.historyNamespaceRegistries = append(c.historyNamespaceRegistries, namespaceRegistry)

app.Start(context.Background())
if err := app.Start(context.Background()); err != nil {
c.logger.Fatal("unable to start history service", tag.Error(err))
}
}

startWG.Done()
@@ -600,7 +593,9 @@ func (c *temporalImpl) startMatching(hosts map[primitives.ServiceName][]string,
c.matchingApp = app
c.matchingService = matchingService
c.matchingNamespaceRegistry = namespaceRegistry
app.Start(context.Background())
if err := app.Start(context.Background()); err != nil {
c.logger.Fatal("unable to start matching service", tag.Error(err))
}

startWG.Done()
<-c.shutdownCh
@@ -679,7 +674,9 @@ func (c *temporalImpl) startWorker(hosts map[primitives.ServiceName][]string, st
c.workerApp = app
c.workerService = workerService
c.workerNamespaceRegistry = namespaceRegistry
app.Start(context.Background())
if err := app.Start(context.Background()); err != nil {
c.logger.Fatal("unable to start worker service", tag.Error(err))
}

startWG.Done()
<-c.shutdownCh
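Two approaches appear in the onebox.go hunks above: per-service Start failures are logged fatally (the test fixture cannot proceed without a service), while Stop now aggregates shutdown errors with go.uber.org/multierr instead of a hand-built []string. multierr.Combine ignores nil operands and returns nil when all inputs are nil, so Stop can return the combined value directly. A small standalone sketch of that aggregation, with stand-in stop functions rather than the real fx apps:

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func stopHistory() error  { return errors.New("history: shutdown timed out") }
func stopMatching() error { return nil } // nil contributions are skipped

func main() {
	var errs error
	errs = multierr.Combine(errs, stopHistory())
	errs = multierr.Combine(errs, stopMatching())

	fmt.Println(errs)                       // history: shutdown timed out
	fmt.Println(len(multierr.Errors(errs))) // 1
	// When every operand is nil, Combine returns nil, so `return errs`
	// behaves like the old `return nil` on a clean shutdown.
}
```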
host/schedule_test.go: 2 changes (1 addition, 1 deletion)
@@ -508,7 +508,7 @@ func (s *scheduleIntegrationSuite) TestRefresh() {
atomic.AddInt32(&runs, 1)
return 0
})
workflow.Sleep(ctx, 10*time.Second) // longer than execution timeout
s.NoError(workflow.Sleep(ctx, 10*time.Second)) // longer than execution timeout
return nil
}
s.worker.RegisterWorkflowWithOptions(workflowFn, workflow.RegisterOptions{Name: wt})
host/test_cluster.go: 13 changes (9 additions, 4 deletions)
@@ -31,6 +31,7 @@ import (

"github.com/pborman/uuid"
"go.temporal.io/api/operatorservice/v1"
"go.uber.org/multierr"

"go.temporal.io/server/api/adminservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
@@ -309,12 +310,16 @@ func (tc *TestCluster) SetFaultInjectionRate(rate float64) {
// TearDownCluster tears down the test cluster
func (tc *TestCluster) TearDownCluster() error {
tc.SetFaultInjectionRate(0)
err := tc.host.Stop()
errs := tc.host.Stop()
tc.host = nil
tc.testBase.TearDownWorkflowStore()
os.RemoveAll(tc.archiverBase.historyStoreDirectory)
os.RemoveAll(tc.archiverBase.visibilityStoreDirectory)
return err
if err := os.RemoveAll(tc.archiverBase.historyStoreDirectory); err != nil {
errs = multierr.Combine(errs, err)
}
if err := os.RemoveAll(tc.archiverBase.visibilityStoreDirectory); err != nil {
errs = multierr.Combine(errs, err)
}
return errs
}

// GetFrontendClient returns a frontend client from the test cluster
host/workflow_failures_test.go: 6 changes (3 additions, 3 deletions)
@@ -181,9 +181,9 @@ func (s *integrationSuite) TestWorkflowTaskFailed() {

// Send signals during workflow task
if sendSignal {
s.sendSignal(s.namespace, workflowExecution, "signalC", nil, identity)
s.sendSignal(s.namespace, workflowExecution, "signalD", nil, identity)
s.sendSignal(s.namespace, workflowExecution, "signalE", nil, identity)
s.NoError(s.sendSignal(s.namespace, workflowExecution, "signalC", nil, identity))
s.NoError(s.sendSignal(s.namespace, workflowExecution, "signalD", nil, identity))
s.NoError(s.sendSignal(s.namespace, workflowExecution, "signalE", nil, identity))
sendSignal = false
}

host/xdc/integration_failover_test.go: 12 changes (6 additions, 6 deletions)
@@ -1899,7 +1899,7 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
}
worker1.RegisterWorkflow(testWorkflowFn)
worker1.RegisterActivity(activityWithHB)
worker1.Start()
s.NoError(worker1.Start())

// Start a workflow
startTime := time.Now()
@@ -1954,7 +1954,7 @@ func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
// start worker2
worker2.RegisterWorkflow(testWorkflowFn)
worker2.RegisterActivity(activityWithHB)
worker2.Start()
s.NoError(worker2.Start())
defer worker2.Stop()

// ExecuteWorkflow return existing running workflow if it already started
@@ -1998,7 +1998,7 @@ func (s *integrationClustersTestSuite) TestLocalNamespaceMigration() {
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()
s.NoError(worker1.Start())

// Start wf1 (in local ns)
workflowID := "local-ns-wf-1"
@@ -2353,7 +2353,7 @@ func (s *integrationClustersTestSuite) TestForceMigration_ClosedWorkflow() {
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()
s.NoError(worker1.Start())

// Start wf1
workflowID := "force-replication-test-wf-1"
@@ -2438,7 +2438,7 @@ func (s *integrationClustersTestSuite) TestForceMigration_ClosedWorkflow() {
s.Equal(clusterName[1], nsResp.ReplicationConfig.ActiveClusterName)

worker2.RegisterWorkflow(testWorkflowFn)
worker2.Start()
s.NoError(worker2.Start())

// Test reset workflow in cluster 2
resetResp, err := client2.ResetWorkflowExecution(testCtx, &workflowservice.ResetWorkflowExecutionRequest{
@@ -2477,7 +2477,7 @@ func (s *integrationClustersTestSuite) TestForceMigration_ResetWorkflow() {
}

worker1.RegisterWorkflow(testWorkflowFn)
worker1.Start()
s.NoError(worker1.Start())

// Start wf1
workflowID := "force-replication-test-reset-wf-1"
