Skip to content

Commit

Permalink
Workflow Delete: remove workflow before artifacts
Browse files Browse the repository at this point in the history
Otherwise the cancel/delete and cleanup work can fail.

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed May 20, 2024
1 parent 9911880 commit 0b12d82
Showing 1 changed file with 36 additions and 38 deletions.
74 changes: 36 additions & 38 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1616,7 +1616,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesComplete")
defer span.Finish()

ts, state, err := s.getWorkflowState(ctx, req.TargetKeyspace, req.Workflow)
ts, state, err := s.getWorkflowState(ctx, req.GetTargetKeyspace(), req.GetWorkflow())
if err != nil {
return nil, err
}
Expand All @@ -1630,8 +1630,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
var dryRunResults *[]string

if state.WorkflowType == TypeMigrate {
dryRunResults, err = s.finalizeMigrateWorkflow(ctx, req.TargetKeyspace, req.Workflow, strings.Join(ts.tables, ","),
false, req.KeepData, req.KeepRoutingRules, req.DryRun)
dryRunResults, err = s.finalizeMigrateWorkflow(ctx, ts, strings.Join(ts.tables, ","), false, req.KeepData, req.KeepRoutingRules, req.DryRun)
if err != nil {
return nil, vterrors.Wrapf(err, "failed to finalize the %s workflow in the %s keyspace",
req.Workflow, req.TargetKeyspace)
Expand Down Expand Up @@ -1970,11 +1969,24 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
span.Annotate("keep_routing_rules", req.KeepRoutingRules)
span.Annotate("shards", req.Shards)

// Cleanup related data and artifacts.
if _, err := s.DropTargets(ctx, req.Keyspace, req.Workflow, req.KeepData, req.KeepRoutingRules, false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace)
}
ts, state, err := s.getWorkflowState(ctx, req.GetKeyspace(), req.GetWorkflow())
if err != nil {
log.Errorf("Failed to get VReplication workflow state for %s.%s: %v", req.GetKeyspace(), req.GetWorkflow(), err)
return nil, err
}

// There is nothing to drop for a LookupVindex workflow.
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
return nil, nil
}

// Return an error if the workflow traffic is partially switched.
if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 {
return nil, ErrWorkflowPartiallySwitched
}

if state.WorkflowType == TypeMigrate {
_, err := s.finalizeMigrateWorkflow(ctx, ts, "", true, req.GetKeepData(), req.GetKeepRoutingRules(), false)
return nil, err
}

Expand Down Expand Up @@ -2002,6 +2014,14 @@ func (s *Server) WorkflowDelete(ctx context.Context, req *vtctldatapb.WorkflowDe
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "the %s workflow does not exist in the %s keyspace", req.Workflow, req.Keyspace)
}

// Cleanup related data and artifacts.
if _, err := s.DropTargets(ctx, ts, req.GetKeepData(), req.GetKeepRoutingRules(), false); err != nil {
if topo.IsErrType(err, topo.NoNode) {
return nil, vterrors.Wrapf(err, "%s keyspace does not exist", req.Keyspace)
}
return nil, err
}

response := &vtctldatapb.WorkflowDeleteResponse{}
response.Summary = fmt.Sprintf("Successfully cancelled the %s workflow in the %s keyspace", req.Workflow, req.Keyspace)
details := make([]*vtctldatapb.WorkflowDeleteResponse_TabletInfo, 0, len(res))
Expand Down Expand Up @@ -2497,28 +2517,8 @@ func (s *Server) optimizeCopyStateTable(tablet *topodatapb.Tablet) {

// DropTargets cleans up target tables, shards and denied tables if a MoveTables/Reshard
// is cancelled.
func (s *Server) DropTargets(ctx context.Context, targetKeyspace, workflow string, keepData, keepRoutingRules, dryRun bool) (*[]string, error) {
ts, state, err := s.getWorkflowState(ctx, targetKeyspace, workflow)
if err != nil {
log.Errorf("Failed to get VReplication workflow state for %s.%s: %v", targetKeyspace, workflow, err)
return nil, err
}

// There is nothing to drop for a LookupVindex workflow.
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
return nil, nil
}

// Return an error if the workflow traffic is partially switched.
if state.WritesSwitched || len(state.ReplicaCellsSwitched) > 0 || len(state.RdonlyCellsSwitched) > 0 {
return nil, ErrWorkflowPartiallySwitched
}

if state.WorkflowType == TypeMigrate {
_, err := s.finalizeMigrateWorkflow(ctx, targetKeyspace, workflow, "", true, keepData, keepRoutingRules, dryRun)
return nil, err
}

func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, keepRoutingRules, dryRun bool) (*[]string, error) {
var err error
ts.keepRoutingRules = keepRoutingRules
var sw iswitcher
if dryRun {
Expand Down Expand Up @@ -2942,13 +2942,11 @@ func (s *Server) refreshPrimaryTablets(ctx context.Context, shards []*topo.Shard

// finalizeMigrateWorkflow deletes the streams for the Migrate workflow.
// We only cleanup the target for external sources.
func (s *Server) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, workflow, tableSpecs string, cancel, keepData, keepRoutingRules, dryRun bool) (*[]string, error) {
ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflow)
if err != nil {
ts.Logger().Errorf("buildTrafficSwitcher failed: %v", err)
return nil, err
}
var sw iswitcher
func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitcher, tableSpecs string, cancel, keepData, keepRoutingRules, dryRun bool) (*[]string, error) {
var (
sw iswitcher
err error
)
if dryRun {
sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()}
} else {
Expand All @@ -2966,7 +2964,7 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, targetKeyspace, wo
return nil, err
}
if !cancel {
if err := sw.addParticipatingTablesToKeyspace(ctx, targetKeyspace, tableSpecs); err != nil {
if err := sw.addParticipatingTablesToKeyspace(ctx, ts.targetKeyspace, tableSpecs); err != nil {
return nil, err
}
if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {
Expand Down

0 comments on commit 0b12d82

Please sign in to comment.