Merge pull request vitessio#8261 from tinyspeck/am_vreplication_log
[workflow] Add vreplication_log data to workflow protos, and `VtctldServer.GetWorkflows` method

Signed-off-by: Andrew Mason <amason@slack-corp.com>
ajm188 committed Jun 15, 2021
1 parent 4710d2f commit f7be4b4
Showing 10 changed files with 3,796 additions and 103,422 deletions.
462 changes: 295 additions & 167 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

151 changes: 151 additions & 0 deletions go/vt/vtctl/workflow/server.go
@@ -20,14 +20,17 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"

"github.com/golang/protobuf/proto"
"k8s.io/apimachinery/pkg/util/sets"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/workflow/vexec"
"vitess.io/vitess/go/vt/vtgate/evalengine"
@@ -283,6 +286,147 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
if err := scanWorkflow(ctx, workflow, row, tablet); err != nil {
return nil, err
}

// Sort shard streams by stream_id ASC, to support an optimization
// in fetchStreamLogs below.
for _, shardStreams := range workflow.ShardStreams {
sort.Slice(shardStreams.Streams, func(i, j int) bool {
return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id
})
}
}
}

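// vrepLogQuery fetches log rows from _vt.vreplication_log, ordered by
// (vrepl_id, id) so that fetchStreamLogs can merge them into the
// already-sorted streams of each shard in a single pass.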
var (
wg sync.WaitGroup
vrepLogQuery = strings.TrimSpace(`
SELECT
id,
vrepl_id,
type,
state,
message,
created_at,
updated_at,
count
FROM
_vt.vreplication_log
ORDER BY
vrepl_id ASC,
id ASC
`)
)

fetchStreamLogs := func(ctx context.Context, workflow *vtctldatapb.Workflow) {
defer wg.Done()

results, err := vx.WithWorkflow(workflow.Name).QueryContext(ctx, vrepLogQuery)
if err != nil {
// Note that we do not return here. If there are any query results
// in the map (i.e. some tablets returned successfully), we will
// still try to read log rows from them on a best-effort basis. But,
// we will also pre-emptively record the top-level fetch error on
// every stream in every shard in the workflow. Further processing
// below may override the error message for certain streams.
for _, streams := range workflow.ShardStreams {
for _, stream := range streams.Streams {
stream.LogFetchError = err.Error()
}
}
}

for target, p3qr := range results {
qr := sqltypes.Proto3ToResult(p3qr)
shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString())

ss, ok := workflow.ShardStreams[shardStreamKey]
if !ok || ss == nil {
continue
}

streams := ss.Streams
streamIdx := 0
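// markErrors records a row-level parse error on the stream the cursor
// currently points at; the offending log row is skipped rather than
// failing the whole fetch.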
markErrors := func(err error) {
if streamIdx >= len(streams) {
return
}

streams[streamIdx].LogFetchError = err.Error()
}

for _, row := range qr.Rows {
id, err := evalengine.ToInt64(row[0])
if err != nil {
markErrors(err)
continue
}

streamID, err := evalengine.ToInt64(row[1])
if err != nil {
markErrors(err)
continue
}

typ := row[2].ToString()
state := row[3].ToString()
message := row[4].ToString()

createdAt, err := time.Parse("2006-01-02 15:04:05", row[5].ToString())
if err != nil {
markErrors(err)
continue
}

updatedAt, err := time.Parse("2006-01-02 15:04:05", row[6].ToString())
if err != nil {
markErrors(err)
continue
}

count, err := evalengine.ToInt64(row[7])
if err != nil {
markErrors(err)
continue
}

streamLog := &vtctldatapb.Workflow_Stream_Log{
Id: id,
StreamId: streamID,
Type: typ,
State: state,
CreatedAt: &vttime.Time{
Seconds: createdAt.Unix(),
},
UpdatedAt: &vttime.Time{
Seconds: updatedAt.Unix(),
},
Message: message,
Count: count,
}

// Earlier, in the main loop where we called scanWorkflow for
// each _vt.vreplication row, we also sorted each ShardStreams
// slice by ascending id, and our _vt.vreplication_log query
// is ordered by (vrepl_id ASC, id ASC), so we can walk the
// streams in index order in O(n) amortized over all the rows
// for this tablet.
for streamIdx < len(streams) {
stream := streams[streamIdx]
if stream.Id < streamLog.StreamId {
streamIdx++
continue
}

if stream.Id > streamLog.StreamId {
log.Warningf("Found stream log for nonexistent stream: %+v", streamLog)
break
}

// stream.Id == streamLog.StreamId
stream.Logs = append(stream.Logs, streamLog)
break
}
}
}
}

@@ -326,9 +470,16 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows

workflow.MaxVReplicationLag = int64(maxVReplicationLag)

// Fetch logs for all streams associated with this workflow in the background.
wg.Add(1)
go fetchStreamLogs(ctx, workflow)

workflows = append(workflows, workflow)
}

// Wait for all the log fetchers to finish.
wg.Wait()

return &vtctldatapb.GetWorkflowsResponse{
Workflows: workflows,
}, nil
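To make the walk described in the comments above concrete, here is a minimal, self-contained sketch (not part of this commit) of the same single-cursor merge, using simplified stand-in types in place of the vtctldata protos. Because the streams are sorted by id and the log rows arrive sorted by (stream id, log id), the cursor only ever moves forward, so the attach step is linear in the number of streams plus log rows.

// Illustrative sketch only: the single-cursor merge used by fetchStreamLogs,
// with simplified stand-in types instead of the protobuf messages.
package main

import "fmt"

type stream struct {
	id   int64
	logs []int64 // ids of the log rows attached to this stream
}

// attachLogs assumes streams is sorted by id ASC and logStreamIDs is the
// stream id column of the log rows, sorted ASC, mirroring the query order.
func attachLogs(streams []*stream, logStreamIDs []int64) {
	streamIdx := 0
	for _, sid := range logStreamIDs {
		for streamIdx < len(streams) {
			s := streams[streamIdx]
			if s.id < sid {
				// No more log rows for this stream; advance the cursor.
				streamIdx++
				continue
			}
			if s.id > sid {
				// Log row for a stream we do not know about; skip it.
				break
			}
			// s.id == sid
			s.logs = append(s.logs, sid)
			break
		}
	}
}

func main() {
	streams := []*stream{{id: 1}, {id: 2}, {id: 4}}
	attachLogs(streams, []int64{1, 1, 2, 3, 4})
	for _, s := range streams {
		fmt.Printf("stream %d has %d log rows\n", s.id, len(s.logs))
	}
}
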
114 changes: 105 additions & 9 deletions go/vt/vtctl/workflow/vexec/query_plan.go
@@ -31,17 +31,28 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
)

// QueryPlan wraps a planned query produced by a QueryPlanner. It is safe to
// execute a QueryPlan repeatedly and in multiple goroutines.
type QueryPlan struct {
// QueryPlan defines the interface for executing a prepared vexec query on one
// or more tablets. Implementations should ensure that it is safe to call the
// various Execute* methods repeatedly and in multiple goroutines.
type QueryPlan interface {
// Execute executes the planned query on a single target.
Execute(ctx context.Context, target *topo.TabletInfo) (*querypb.QueryResult, error)
// ExecuteScatter executes the planned query on the specified targets concurrently,
// returning a mapping of the target tablet to a querypb.QueryResult.
ExecuteScatter(ctx context.Context, targets ...*topo.TabletInfo) (map[*topo.TabletInfo]*querypb.QueryResult, error)
}

// FixedQueryPlan wraps a planned query produced by a QueryPlanner. It executes
// the same query with the same bind vals, regardless of the target.
type FixedQueryPlan struct {
ParsedQuery *sqlparser.ParsedQuery

workflow string
tmc tmclient.TabletManagerClient
}

// Execute executes a QueryPlan on a single target.
func (qp *QueryPlan) Execute(ctx context.Context, target *topo.TabletInfo) (qr *querypb.QueryResult, err error) {
// Execute is part of the QueryPlan interface.
func (qp *FixedQueryPlan) Execute(ctx context.Context, target *topo.TabletInfo) (qr *querypb.QueryResult, err error) {
if qp.ParsedQuery == nil {
return nil, fmt.Errorf("%w: call PlanQuery on a query planner first", ErrUnpreparedQuery)
}
@@ -71,10 +82,10 @@ func (qp *QueryPlan) Execute(ctx context.Context, target *topo.TabletInfo) (qr *
return qr, nil
}

// ExecuteScatter executes a QueryPlan on multiple targets concurrently,
// returning a mapping of target tablet to querypb.QueryResult. Errors from
// individual targets are aggregated into a singular error.
func (qp *QueryPlan) ExecuteScatter(ctx context.Context, targets ...*topo.TabletInfo) (map[*topo.TabletInfo]*querypb.QueryResult, error) {
// ExecuteScatter is part of the QueryPlan interface. For a FixedQueryPlan, the
// exact same query is executed on each target, and errors from individual
// targets are aggregated into a singular error.
func (qp *FixedQueryPlan) ExecuteScatter(ctx context.Context, targets ...*topo.TabletInfo) (map[*topo.TabletInfo]*querypb.QueryResult, error) {
if qp.ParsedQuery == nil {
// This check is an "optimization" on error handling. We check here,
// even though we will check this during the individual Execute calls,
@@ -114,3 +125,88 @@ func (qp *QueryPlan) ExecuteScatter(ctx context.Context, targets ...*topo.Tablet

return results, rec.AggrError(vterrors.Aggregate)
}

// PerTargetQueryPlan implements the QueryPlan interface. Unlike FixedQueryPlan,
// this implementation executes a different query on each target, keyed by
// tablet alias.
//
// It is the caller's responsibility to ensure that the shape of the QueryResult
// (i.e. fields returned) is consistent for each target's planned query, but
// this is not enforced.
type PerTargetQueryPlan struct {
ParsedQueries map[string]*sqlparser.ParsedQuery

tmc tmclient.TabletManagerClient
}

// Execute is part of the QueryPlan interface.
//
// It returns ErrUnpreparedQuery if there is no ParsedQuery for the target's
// tablet alias.
func (qp *PerTargetQueryPlan) Execute(ctx context.Context, target *topo.TabletInfo) (qr *querypb.QueryResult, err error) {
if qp.ParsedQueries == nil {
return nil, fmt.Errorf("%w: call PlanQuery on a query planner first", ErrUnpreparedQuery)
}

targetAliasStr := target.AliasString()
query, ok := qp.ParsedQueries[targetAliasStr]
if !ok {
return nil, fmt.Errorf("%w: no prepared query for target %s", ErrUnpreparedQuery, targetAliasStr)
}

defer func() {
if err != nil {
log.Warningf("Result on %v: %v", targetAliasStr, err)
return
}
}()

qr, err = qp.tmc.VReplicationExec(ctx, target.Tablet, query.Query)
if err != nil {
return nil, err
}

return qr, nil
}

// ExecuteScatter is part of the QueryPlan interface.
func (qp *PerTargetQueryPlan) ExecuteScatter(ctx context.Context, targets ...*topo.TabletInfo) (map[*topo.TabletInfo]*querypb.QueryResult, error) {
if qp.ParsedQueries == nil {
// This check is an "optimization" on error handling. We check here,
// even though we will check this during the individual Execute calls,
// so that we return one error, rather than the same error aggregated
// len(targets) times.
return nil, fmt.Errorf("%w: call PlanQuery on a query planner first", ErrUnpreparedQuery)
}

var (
m sync.Mutex
wg sync.WaitGroup
rec concurrency.AllErrorRecorder
results = make(map[*topo.TabletInfo]*querypb.QueryResult, len(targets))
)

for _, target := range targets {
wg.Add(1)

go func(ctx context.Context, target *topo.TabletInfo) {
defer wg.Done()

qr, err := qp.Execute(ctx, target)
if err != nil {
rec.RecordError(err)

return
}

m.Lock()
defer m.Unlock()

results[target] = qr
}(ctx, target)
}

wg.Wait()

return results, rec.AggrError(vterrors.Aggregate)
}
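For illustration only (not part of this commit): because QueryPlan is now an interface, calling code can be written against it and work unchanged with either a FixedQueryPlan or a PerTargetQueryPlan. The helper below is hypothetical; it assumes a plan has already been produced by a QueryPlanner and simply fans it out with ExecuteScatter, logging the per-tablet row counts.

// Hypothetical caller, shown only to illustrate use of the QueryPlan
// interface added in this commit; logResultCounts is not part of the commit.
package workflowutil

import (
	"context"

	"vitess.io/vitess/go/vt/log"
	"vitess.io/vitess/go/vt/topo"
	"vitess.io/vitess/go/vt/vtctl/workflow/vexec"
)

// logResultCounts executes qp on every target and logs how many rows each
// tablet returned. It does not care whether qp is a FixedQueryPlan or a
// PerTargetQueryPlan.
func logResultCounts(ctx context.Context, qp vexec.QueryPlan, targets []*topo.TabletInfo) error {
	results, err := qp.ExecuteScatter(ctx, targets...)
	if err != nil {
		return err
	}

	for target, qr := range results {
		log.Infof("tablet %s returned %d rows", target.AliasString(), len(qr.Rows))
	}

	return nil
}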
