Skip to content

Commit

Permalink
Cherry-pick 4a7ad80 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Jun 18, 2024
1 parent d45b70b commit ede3240
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
36 changes: 36 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ const (
sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a"
// Update the configuration values for a workflow's vreplication stream.
sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a"
<<<<<<< HEAD
=======
// Update field values for multiple workflows. The final format specifier is
// used to optionally add any additional predicates to the query.
sqlUpdateVReplicationWorkflows = "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ %s.vreplication set%s where db_name = '%s'%s"
// Check if workflow is still copying.
sqlGetVReplicationCopyStatus = "select distinct vrepl_id from %s.copy_state where vrepl_id = %d"
)

var (
errNoFieldsToUpdate = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no field values provided to update")
errAllWithIncludeExcludeWorkflows = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot specify all workflows along with either of include or exclude workflows")
>>>>>>> 4a7ad80a3e (VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (#16217))
)

func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
Expand Down Expand Up @@ -227,6 +240,18 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl
return resp, nil
}

func isStreamCopying(tm *TabletManager, id int64) (bool, error) {
query := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), id)
res, err := tm.VREngine.Exec(query)
if err != nil {
return false, err
}
if res != nil && len(res.Rows) > 0 {
return true, nil
}
return false, nil
}

// UpdateVReplicationWorkflow updates the sidecar databases's vreplication
// record(s) for this tablet's vreplication workflow stream(s). If there
// are no streams for the given workflow on the tablet then a nil result
Expand Down Expand Up @@ -302,6 +327,17 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
if !textutil.ValueIsSimulatedNull(req.State) {
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
}
if state == binlogdatapb.VReplicationWorkflowState_Running.String() {
// `Workflow Start` sets the new state to Running. However, if stream is still copying tables, we should set
// the state as Copying.
isCopying, err := isStreamCopying(tm, id)
if err != nil {
return nil, err
}
if isCopying {
state = binlogdatapb.VReplicationWorkflowState_Copying.String()
}
}
bindVars = map[string]*querypb.BindVariable{
"st": sqltypes.StringBindVariable(state),
"sc": sqltypes.StringBindVariable(string(source)),
Expand Down
49 changes: 46 additions & 3 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func TestMoveTables(t *testing.T) {
tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, checkForJournal, &sqltypes.Result{})

for _, ftc := range targetShards {
<<<<<<< HEAD
tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{})
tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{})
tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getWorkflow, targetKs, wf),
Expand All @@ -296,6 +297,11 @@ func TestMoveTables(t *testing.T) {
"1",
),
)
=======
addInvariants(ftc.vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0])
getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), vreplID)
ftc.vrdbClient.AddInvariant(getCopyStateQuery, &sqltypes.Result{})
>>>>>>> 4a7ad80a3e (VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (#16217))
tenv.tmc.setVReplicationExecResults(ftc.tablet, getCopyState, &sqltypes.Result{})
tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getWorkflowStatus, wf, targetKs),
sqltypes.MakeTestResult(
Expand Down Expand Up @@ -478,6 +484,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
),
fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID, blsStr, cells[0], tabletTypes[0]),
)

idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a",
sqltypes.Int64BindVariable(int64(vreplID)))
require.NoError(t, err)
Expand All @@ -489,10 +496,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
fmt.Sprintf("%d", vreplID),
)

getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), int64(vreplID))
copyStatusFields := sqltypes.MakeTestFields(
"id",
"int64",
)
notCopying := sqltypes.MakeTestResult(copyStatusFields)
copying := sqltypes.MakeTestResult(copyStatusFields, "1")

tests := []struct {
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
query string
name string
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
query string
isCopying bool
}{
{
name: "update cells",
Expand Down Expand Up @@ -572,6 +588,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
{
name: "update to running while copying",
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflow,
State: binlogdatapb.VReplicationWorkflowState_Running,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
},
isCopying: true,
query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, cells[0], tabletTypes[0], vreplID),
},
}

for _, tt := range tests {
Expand All @@ -590,12 +619,26 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
// These are the same for each RPC call.
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
<<<<<<< HEAD
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil)
=======
if tt.request.State == binlogdatapb.VReplicationWorkflowState_Running ||
tt.request.State == binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt) {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
if tt.isCopying {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, copying, nil)
} else {
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, notCopying, nil)
>>>>>>> 4a7ad80a3e (VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (#16217))

}
}
// This is our expected query, which will also short circuit
// the test with an error as at this point we've tested what
// we wanted to test.
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil)
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(tt.query, &sqltypes.Result{RowsAffected: 1}, errShortCircuit)
_, err = tenv.tmc.tablets[tabletUID].tm.UpdateVReplicationWorkflow(ctx, tt.request)
tenv.tmc.tablets[tabletUID].vrdbClient.Wait()
Expand Down

0 comments on commit ede3240

Please sign in to comment.