Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request vitessio#8272 from tinyspeck/am_vtctld_concurrent_…
Browse files Browse the repository at this point in the history
…scanworkflow

[workflow] Call `scanWorkflow` concurrently

Signed-off-by: Andrew Mason <amason@slack-corp.com>
  • Loading branch information
ajm188 committed Jun 15, 2021
1 parent f2d95dd commit 479197f
Showing 1 changed file with 49 additions and 17 deletions.
66 changes: 49 additions & 17 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/workflow/vexec"
Expand Down Expand Up @@ -120,6 +121,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
return nil, err
}

m := sync.Mutex{} // guards access to the following maps during concurrent calls to scanWorkflow
workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results))
sourceKeyspaceByWorkflow := make(map[string]string, len(results))
sourceShardsByWorkflow := make(map[string]sets.String, len(results))
Expand Down Expand Up @@ -205,6 +207,15 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
stream.State = "Lagging"
}

// At this point, we're going to start modifying the maps defined
// outside this function, as well as fields on the passed-in Workflow
// pointer. Since we're running concurrently, take the lock.
//
// We've already made the remote call to getCopyStates, so synchronizing
// here shouldn't hurt too badly, performance-wise.
m.Lock()
defer m.Unlock()

shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString())
shardStream, ok := workflow.ShardStreams[shardStreamKey]
if !ok {
Expand Down Expand Up @@ -255,6 +266,11 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
return nil
}

var (
scanWorkflowWg sync.WaitGroup
scanWorkflowErrors concurrency.FirstErrorRecorder
)

for tablet, result := range results {
qr := sqltypes.Proto3ToResult(result)

Expand Down Expand Up @@ -283,22 +299,23 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
targetShardsByWorkflow[workflowName] = sets.NewString()
}

if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
return nil, err
}

// Sort shard streams by stream_id ASC, to support an optimization
// in fetchStreamLogs below.
for _, shardStreams := range workflow.ShardStreams {
sort.Slice(shardStreams.Streams, func(i, j int) bool {
return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id
})
}
scanWorkflowWg.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) {
defer scanWorkflowWg.Done()
if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
scanWorkflowErrors.RecordError(err)
}
}(ctx, workflow, row, tablet)
}
}

scanWorkflowWg.Wait()
if scanWorkflowErrors.HasErrors() {
return nil, scanWorkflowErrors.Error()
}

var (
wg sync.WaitGroup
fetchLogsWG sync.WaitGroup
vrepLogQuery = strings.TrimSpace(`
SELECT
id,
Expand All @@ -318,7 +335,11 @@ ORDER BY
)

fetchStreamLogs := func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer wg.Done()
span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("workflow", workflow.Name)

results, err := vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery)
if err != nil {
Expand Down Expand Up @@ -470,15 +491,26 @@ ORDER BY

workflow.MaxVReplicationLag = int64(maxVReplicationLag)

// Fetch logs for all streams associated with this workflow in the background.
wg.Add(1)
go fetchStreamLogs(ctx, workflow)
// Sort shard streams by stream_id ASC, to support an optimization
// in fetchStreamLogs below.
for _, shardStreams := range workflow.ShardStreams {
sort.Slice(shardStreams.Streams, func(i, j int) bool {
return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id
})
}

workflows = append(workflows, workflow)

// Fetch logs for all streams associated with this workflow in the background.
fetchLogsWG.Add(1)
go func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer fetchLogsWG.Done()
fetchStreamLogs(ctx, workflow)
}(ctx, workflow)
}

// Wait for all the log fetchers to finish.
wg.Wait()
fetchLogsWG.Wait()

return &vtctldatapb.GetWorkflowsResponse{
Workflows: workflows,
Expand Down

0 comments on commit 479197f

Please sign in to comment.