Skip to content

Commit

Permalink
*: Implement /upgrade/start and /upgrade/finish APIs (pingcap#47000)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Oct 11, 2023
1 parent 9fdf362 commit a1ea04c
Show file tree
Hide file tree
Showing 15 changed files with 581 additions and 97 deletions.
25 changes: 15 additions & 10 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,43 +143,48 @@ func NewMockStateSyncer() syncer.StateSyncer {
return &MockStateSyncer{}
}

// clusterState mocks the cluster state.
// It was moved out of MockStateSyncer to package level so that it is unaffected by DDL close.
var clusterState *atomicutil.Pointer[syncer.StateInfo]

// MockStateSyncer is a mock state syncer, it is exported for testing.
type MockStateSyncer struct {
clusterState *atomicutil.Pointer[syncer.StateInfo]
globalVerCh chan clientv3.WatchResponse
mockSession chan struct{}
globalVerCh chan clientv3.WatchResponse
mockSession chan struct{}
}

// Init implements StateSyncer.Init interface.
func (s *MockStateSyncer) Init(context.Context) error {
s.globalVerCh = make(chan clientv3.WatchResponse, 1)
s.mockSession = make(chan struct{}, 1)
state := syncer.NewStateInfo(syncer.StateNormalRunning)
s.clusterState = atomicutil.NewPointer(state)
if clusterState == nil {
clusterState = atomicutil.NewPointer(state)
}
return nil
}

// UpdateGlobalState implements StateSyncer.UpdateGlobalState interface.
func (s *MockStateSyncer) UpdateGlobalState(_ context.Context, stateInfo *syncer.StateInfo) error {
failpoint.Inject("mockUpgradingState", func(val failpoint.Value) {
if val.(bool) {
s.clusterState.Store(stateInfo)
clusterState.Store(stateInfo)
failpoint.Return(nil)
}
})
s.globalVerCh <- clientv3.WatchResponse{}
s.clusterState.Store(stateInfo)
clusterState.Store(stateInfo)
return nil
}

// GetGlobalState implements StateSyncer.GetGlobalState interface.
func (s *MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
return s.clusterState.Load(), nil
func (*MockStateSyncer) GetGlobalState(context.Context) (*syncer.StateInfo, error) {
return clusterState.Load(), nil
}

// IsUpgradingState implements StateSyncer.IsUpgradingState interface.
func (s *MockStateSyncer) IsUpgradingState() bool {
return s.clusterState.Load().State == syncer.StateUpgrading
func (*MockStateSyncer) IsUpgradingState() bool {
return clusterState.Load().State == syncer.StateUpgrading
}

// WatchChan implements StateSyncer.WatchChan interface.
Expand Down
12 changes: 11 additions & 1 deletion docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,6 @@ timezone.*
# reset the size of the ballast object (2GB in this example)
curl -v -X POST -d "2147483648" http://{TiDBIP}:10080/debug/ballast-object-sz
```
1. Set deadlock history table capacity
Expand All @@ -591,3 +590,14 @@ timezone.*
```shell
curl -X POST -d "transaction_summary_capacity={number}" http://{TiDBIP}:10080/settings
```
1. Send upgrade operations to the cluster. The operations here include `start` and `finish`.
```shell
curl -X POST http://{TiDBIP}:10080/upgrade/{op}
```
    ```shell
    $ curl -X POST http://127.0.0.1:10080/upgrade/start
    "success!"
    ```
1 change: 1 addition & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"stat.go",
"statistics_handler.go",
"tokenlimiter.go",
"upgrade_handler.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/server",
Expand Down
Empty file added server/handler/BUILD.bazel
Empty file.
Empty file.
85 changes: 85 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,3 +1243,88 @@ func TestSetLabelsConcurrentWithGetLabel(t *testing.T) {
conf.Labels = map[string]string{}
})
}

// TestUpgrade exercises the /upgrade/{op} HTTP API end to end: a GET is
// rejected with 400, then the start/finish operations (including duplicated
// invocations) are issued via POST and the resulting cluster upgrading state
// is verified through a session against the store.
func TestUpgrade(t *testing.T) {
	ts := createBasicHTTPHandlerTestSuite()
	ts.startServer(t)
	defer ts.stopServer(t)

	// Only POST is accepted for upgrade operations; a plain GET must fail.
	resp, err := ts.fetchStatus("/upgrade/start")
	require.NoError(t, err)
	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
	require.NoError(t, resp.Body.Close())

	// postUpgradeOp sends POST /upgrade/{op} and asserts a 200 response
	// whose body equals wantBody.
	postUpgradeOp := func(op, wantBody string) {
		resp, err := ts.postStatus("/upgrade/"+op, "application/x-www-form-urlencoded", nil)
		require.NoError(t, err)
		require.Equal(t, http.StatusOK, resp.StatusCode)
		// DumpResponse(resp, true) drains and restores the body, so the
		// subsequent ReadAll still sees the full payload.
		b, err := httputil.DumpResponse(resp, true)
		require.NoError(t, err)
		require.Greater(t, len(b), 0)
		body, err := io.ReadAll(resp.Body)
		require.NoError(t, err)
		require.NoError(t, resp.Body.Close())
		require.Equal(t, wantBody, string(body))
	}
	// checkUpgrading asserts the upgrading state persisted in the store.
	checkUpgrading := func(want bool) {
		se, err := session.CreateSession(ts.store)
		require.NoError(t, err)
		isUpgrading, err := session.IsUpgradingClusterState(se)
		require.NoError(t, err)
		require.Equal(t, want, isUpgrading)
	}

	// Start the upgrade.
	postUpgradeOp("start", "\"success!\"")
	checkUpgrading(true)
	// Starting again is a duplicated operation and leaves the state unchanged.
	postUpgradeOp("start", "\"It's a duplicated operation and the cluster is already in upgrading state.\"")
	checkUpgrading(true)

	// Finish the upgrade.
	postUpgradeOp("finish", "\"success!\"")
	checkUpgrading(false)
	// Finishing again is likewise a duplicated no-op.
	postUpgradeOp("finish", "\"It's a duplicated operation and the cluster is already in normal state.\"")
	checkUpgrading(false)
}
3 changes: 3 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ func (s *Server) startHTTPServer() {
// HTTP path for get table tiflash replica info.
router.Handle("/tiflash/replica-deprecated", flashReplicaHandler{tikvHandlerTool})

// HTTP path for upgrade operations.
router.Handle("/upgrade/{op}", NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations")

if s.cfg.Store == "tikv" {
// HTTP path for tikv.
router.Handle("/tables/{db}/{table}/regions", tableHandler{tikvHandlerTool, opTableRegions})
Expand Down
Loading

0 comments on commit a1ea04c

Please sign in to comment.