Skip to content

Commit

Permalink
Put initial denied tables entries in place on MoveTables create
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed May 22, 2024
1 parent 7dd69c2 commit 371c90c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 3 deletions.
31 changes: 28 additions & 3 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
if cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if cerr = ts.dropTargetDeniedTables(ctx); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup denied table entries: %v", cerr)
}
if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
Expand All @@ -1473,9 +1476,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}()

// Now that the streams have been successfully created, let's put the associated
// routing rules in place.
// routing rules and denied tables entries in place.
if externalTopo == nil {
if err := s.setupInitialRoutingRules(ctx, req, mz, tables, vschema); err != nil {
if err := s.setupInitialRoutingRules(ctx, req, mz, tables); err != nil {
return nil, err
}

Expand All @@ -1484,6 +1487,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}
}
if err := s.setupInitialDeniedTables(ctx, req); err != nil {
return nil, vterrors.Wrapf(err, "failed to put initial denied tables entries in place on the target shards")
}
if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1547,7 +1553,26 @@ func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateReque
return nil
}

func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string, vschema *vschemapb.Keyspace) error {
func (s *Server) setupInitialDeniedTables(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest) error {
ts, err := s.buildTrafficSwitcher(ctx, req.GetTargetKeyspace(), req.GetWorkflow())
if err != nil {
return err
}
err = ts.ForAllTargets(func(target *MigrationTarget) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables())
}); err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(ctx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
return err
})
return err
}

func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string) error {
if err := s.validateRoutingRuleFlags(req, mz); err != nil {
return err
}
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,22 @@ func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error {
})
}

// preventTableTargetWrites puts denied table entries in place for the given tables on the
// target tablets. This should be used when a MoveTables workflow is initially created.
func (ts *trafficSwitcher) preventTableTargetWrites(ctx context.Context) error {
return ts.ForAllTargets(func(target *MigrationTarget) error {
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, false, ts.Tables())
}); err != nil {
return err
}
rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout)
defer cancel()
_, _, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger())
return err
})
}

func (ts *trafficSwitcher) changeRouting(ctx context.Context) error {
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
return ts.changeWriteRoute(ctx)
Expand Down

0 comments on commit 371c90c

Please sign in to comment.