diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index a9ce8b841a5..bd6bf724707 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -49,6 +49,19 @@ const ( sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a" // Update the configuration values for a workflow's vreplication stream. sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a" +<<<<<<< HEAD +======= + // Update field values for multiple workflows. The final format specifier is + // used to optionally add any additional predicates to the query. + sqlUpdateVReplicationWorkflows = "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ %s.vreplication set%s where db_name = '%s'%s" + // Check if workflow is still copying. + sqlGetVReplicationCopyStatus = "select distinct vrepl_id from %s.copy_state where vrepl_id = %d" +) + +var ( + errNoFieldsToUpdate = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no field values provided to update") + errAllWithIncludeExcludeWorkflows = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot specify all workflows along with either of include or exclude workflows") +>>>>>>> 4a7ad80a3e (VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (#16217)) ) func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { @@ -227,6 +240,18 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl return resp, nil } +func isStreamCopying(tm *TabletManager, id int64) (bool, error) { + query := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), id) + res, err := tm.VREngine.Exec(query) + if err != nil { + return false, err + } + if res != nil && len(res.Rows) > 0 { + return true, nil + } + return false, nil +} + // UpdateVReplicationWorkflow updates the sidecar databases's vreplication // record(s) for this tablet's vreplication workflow stream(s). If there // are no streams for the given workflow on the tablet then a nil result @@ -302,6 +327,17 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta if !textutil.ValueIsSimulatedNull(req.State) { state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)] } + if state == binlogdatapb.VReplicationWorkflowState_Running.String() { + // `Workflow Start` sets the new state to Running. However, if stream is still copying tables, we should set + // the state as Copying. + isCopying, err := isStreamCopying(tm, id) + if err != nil { + return nil, err + } + if isCopying { + state = binlogdatapb.VReplicationWorkflowState_Copying.String() + } + } bindVars = map[string]*querypb.BindVariable{ "st": sqltypes.StringBindVariable(state), "sc": sqltypes.StringBindVariable(string(source)), diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 5ef9b4cd8c6..aa1141418b9 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -290,6 +290,7 @@ func TestMoveTables(t *testing.T) { tenv.tmc.setVReplicationExecResults(sourceTablet.tablet, checkForJournal, &sqltypes.Result{}) for _, ftc := range targetShards { +<<<<<<< HEAD tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(checkForWorkflow, targetKs, wf), &sqltypes.Result{}) tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(checkForFrozenWorkflow, targetKs), &sqltypes.Result{}) tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getWorkflow, targetKs, wf), @@ -301,6 +302,11 @@ func TestMoveTables(t *testing.T) { "1", ), ) +======= + addInvariants(ftc.vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) + getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), vreplID) + ftc.vrdbClient.AddInvariant(getCopyStateQuery, &sqltypes.Result{}) +>>>>>>> 4a7ad80a3e (VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (#16217)) tenv.tmc.setVReplicationExecResults(ftc.tablet, getCopyState, &sqltypes.Result{}) tenv.tmc.setVReplicationExecResults(ftc.tablet, fmt.Sprintf(getWorkflowStatus, wf, targetKs), sqltypes.MakeTestResult( @@ -485,6 +491,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { ), fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID, blsStr, cells[0], tabletTypes[0]), ) + idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a", sqltypes.Int64BindVariable(int64(vreplID))) require.NoError(t, err) @@ -496,10 +503,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { fmt.Sprintf("%d", vreplID), ) + getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), int64(vreplID)) + copyStatusFields := sqltypes.MakeTestFields( + "id", + "int64", + ) + notCopying := sqltypes.MakeTestResult(copyStatusFields) + copying := sqltypes.MakeTestResult(copyStatusFields, "1") + tests := []struct { - name string - request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest - query string + name string + request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest + query string + isCopying bool }{ { name: "update cells", @@ -579,6 +595,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID), }, + { + name: "update to running while copying", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState_Running, + Cells: textutil.SimulatedNullStringSlice, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, + OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt), + }, + isCopying: true, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + keyspace, shard, cells[0], tabletTypes[0], vreplID), + }, } for _, tt := range tests { @@ -597,12 +626,26 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { // These are the same for each RPC call. tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil) tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil) +<<<<<<< HEAD tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil) tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil) +======= + if tt.request.State == binlogdatapb.VReplicationWorkflowState_Running || + tt.request.State == binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt) { + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil) + if tt.isCopying { + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, copying, nil) + } else { + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, notCopying, nil) +>>>>>>> 4a7ad80a3e (VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (#16217)) + } + } // This is our expected query, which will also short circuit // the test with an error as at this point we've tested what // we wanted to test. + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil) + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil) tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(tt.query, &sqltypes.Result{RowsAffected: 1}, errShortCircuit) _, err = tenv.tmc.tablets[tabletUID].tm.UpdateVReplicationWorkflow(ctx, tt.request) tenv.tmc.tablets[tabletUID].vrdbClient.Wait()