Skip to content

Commit

Permalink
[feat]: add get write ahead log replica state api
Browse files Browse the repository at this point in the history
  • Loading branch information
stone1100 committed Nov 24, 2021
1 parent 6d2a054 commit 300dbf2
Show file tree
Hide file tree
Showing 20 changed files with 500 additions and 98 deletions.
20 changes: 10 additions & 10 deletions app/broker/api/metadata/explore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,54 +78,54 @@ func TestExploreAPI_ExploreRepo(t *testing.T) {
resp := mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath, "")
assert.Equal(t, http.StatusInternalServerError, resp.Code)
// not found
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=broker&type=LiveNode1", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Broker&type=LiveNode1", "")
assert.Equal(t, http.StatusNotFound, resp.Code)

// case 2: walk entry err
repo.EXPECT().WalkEntry(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("err"))
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=broker&type=LiveNode", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Broker&type=LiveNode", "")
assert.Equal(t, http.StatusInternalServerError, resp.Code)
// case 3: walk entry value format err
repo.EXPECT().WalkEntry(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, prefix string, fn func(key, value []byte)) error {
fn([]byte("key"), []byte("value"))
return nil
})
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=broker&type=LiveNode", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Broker&type=LiveNode", "")
assert.Equal(t, http.StatusOK, resp.Code)
// case 4: walk entry ok
repo.EXPECT().WalkEntry(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, prefix string, fn func(key, value []byte)) error {
fn([]byte("key"), encoding.JSONMarshal(&models.StatelessNode{HostIP: "1.1.1.1"}))
return nil
})
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=broker&type=LiveNode", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Broker&type=LiveNode", "")
assert.Equal(t, http.StatusOK, resp.Code)
// case 6: explore master
repo.EXPECT().WalkEntry(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, prefix string, fn func(key, value []byte)) error {
fn([]byte("key"), encoding.JSONMarshal(&models.Master{ElectTime: 11}))
return nil
})
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=master&type=Master", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Master&type=Master", "")
assert.Equal(t, http.StatusOK, resp.Code)
// case 7: explore storage, storage name is nil
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=storage&type=LiveNode", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Storage&type=LiveNode", "")
assert.Equal(t, http.StatusInternalServerError, resp.Code)
// case 8: explore storage, current master, state not found
master.EXPECT().IsMaster().Return(true).MaxTimes(2)
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=storage&type=LiveNode1&storageName=test", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Storage&type=LiveNode1&storageName=test", "")
assert.Equal(t, http.StatusNotFound, resp.Code)
stateMgr.EXPECT().GetStorageCluster("test").Return(nil)
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=storage&type=LiveNode&storageName=test", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Storage&type=LiveNode&storageName=test", "")
assert.Equal(t, http.StatusNotFound, resp.Code)
// case 9: explore storage, current master, ok
master.EXPECT().IsMaster().Return(true)
storage := masterpkg.NewMockStorageCluster(ctrl)
stateMgr.EXPECT().GetStorageCluster(gomock.Any()).Return(storage).AnyTimes()
storage.EXPECT().GetRepo().Return(repo)
repo.EXPECT().WalkEntry(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=storage&type=LiveNode&storageName=test", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Storage&type=LiveNode&storageName=test", "")
assert.Equal(t, http.StatusOK, resp.Code)
// case 10: explore storage, current is not master, need forward to master
backend := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -141,6 +141,6 @@ func TestExploreAPI_ExploreRepo(t *testing.T) {

master.EXPECT().IsMaster().Return(false)
master.EXPECT().GetMaster().Return(&models.Master{Node: &models.StatelessNode{HostIP: "127.0.0.1", HTTPPort: 8089}})
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=storage&type=LiveNode&storageName=test", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreRepoPath+"?role=Storage&type=LiveNode&storageName=test", "")
assert.Equal(t, http.StatusOK, resp.Code)
}
3 changes: 3 additions & 0 deletions app/broker/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type API struct {
storage *admin.StorageClusterAPI
explore *metadata.ExploreAPI
stateExplore *state.ExploreAPI
replica *state.ReplicaAPI
metricExplore *monitoring.ExploreAPI
influxIngestion *ingest.InfluxWriter
protoIngestion *ingest.ProtoWriter
Expand All @@ -55,6 +56,7 @@ func NewAPI(deps *deps.HTTPDeps) *API {
storage: admin.NewStorageClusterAPI(deps),
explore: metadata.NewExploreAPI(deps),
stateExplore: state.NewExploreAPI(deps),
replica: state.NewReplicaAPI(deps),
metricExplore: monitoring.NewExploreAPI(deps.GlobalKeyValues),
influxIngestion: ingest.NewInfluxWriter(deps),
protoIngestion: ingest.NewProtoWriter(deps),
Expand All @@ -74,6 +76,7 @@ func (api *API) RegisterRouter(router *gin.RouterGroup) {

api.stateExplore.Register(router)
api.metricExplore.Register(router)
api.replica.Register(router)

api.metadata.Register(router)
api.metric.Register(router)
Expand Down
22 changes: 10 additions & 12 deletions app/broker/api/state/explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package state
import (
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -147,12 +148,14 @@ func (d *ExploreAPI) fetchStateData(c *gin.Context, nodes []models.Node) {
go func() {
defer wait.Done()
node := nodes[i]
req, _ := http.NewRequest("GET", node.HTTPAddress(), nil)
req, _ := http.NewRequest(http.MethodGet, node.HTTPAddress(), nil)
req.URL.Path = c.Request.URL.Path + "/current"
req.URL.RawQuery = params
rs, err := d.get(req)
if err == nil {
result[i] = rs
var metric map[string][]*models.StateMetric
if err := get(req, func(body io.Reader) error {
return json.NewDecoder(body).Decode(&metric)
}); err == nil {
result[i] = metric
}
}()
}
Expand All @@ -176,20 +179,15 @@ func (d *ExploreAPI) fetchStateData(c *gin.Context, nodes []models.Node) {
}

// get does http get request, then returns the internal metric for given target node.
func (d *ExploreAPI) get(req *http.Request) (map[string][]*models.StateMetric, error) {
func get(req *http.Request, decoder func(body io.Reader) error) error {
req.Header.Set("Content-Type", "application/json;charset=UTF-8")

resp, err := doRequest(req)
if err != nil {
return nil, err
return err
}
defer func() {
_ = resp.Body.Close()
}()
var metric map[string][]*models.StateMetric
err = json.NewDecoder(resp.Body).Decode(&metric)
if err != nil {
return nil, err
}
return metric, nil
return decoder(resp.Body)
}
22 changes: 11 additions & 11 deletions app/broker/api/state/explore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestExploreAPI_Explore(t *testing.T) {
// case 1: params invalid
resp := mock.DoRequest(t, r, http.MethodGet, ExplorePath, "")
assert.Equal(t, http.StatusInternalServerError, resp.Code)
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=broker1&names=cpu", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=Broker1&names=cpu", "")
assert.Equal(t, http.StatusNotFound, resp.Code)
// case 2: fetch err
stateMgr.EXPECT().GetLiveNodes().Return([]models.StatelessNode{{
Expand All @@ -59,33 +59,33 @@ func TestExploreAPI_Explore(t *testing.T) {
doRequest = func(req *http.Request) (*http.Response, error) {
return nil, fmt.Errorf("err")
}
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=broker&names=cpu", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=Broker&names=cpu", "")
assert.Equal(t, http.StatusOK, resp.Code)

// case 3: fetch ok, resp data invalid
doRequest = func(req *http.Request) (*http.Response, error) {
return &http.Response{Body: io.NopCloser(strings.NewReader("a"))}, nil
}
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=broker&names=cpu", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=Broker&names=cpu", "")
assert.Equal(t, http.StatusOK, resp.Code)
// case 4: broker success
buf := io.NopCloser(strings.NewReader(`{
"cpu":[{"fields":[{"value":1}]},{"fields":[{"value":1}]}]}`))
doRequest = func(req *http.Request) (*http.Response, error) {
return &http.Response{Body: buf}, nil
}
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=broker&names=cpu", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=Broker&names=cpu", "")
assert.Equal(t, http.StatusOK, resp.Code)
// case 5: storage name is nil, err
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=storage&names=cpu", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=Storage&names=cpu", "")
assert.Equal(t, http.StatusInternalServerError, resp.Code)
// case 6: storage name not exist
stateMgr.EXPECT().GetStorage(gomock.Any()).Return(nil, false)
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=storage&names=cpu&storageName=xx", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=Storage&names=cpu&storageName=xx", "")
assert.Equal(t, http.StatusNotFound, resp.Code)
// case 6: storage no live node
stateMgr.EXPECT().GetStorage(gomock.Any()).Return(&models.StorageState{}, true)
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=storage&names=cpu&storageName=xx", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=Storage&names=cpu&storageName=xx", "")
assert.Equal(t, http.StatusNotFound, resp.Code)

// case 7: storage success
Expand All @@ -95,7 +95,7 @@ func TestExploreAPI_Explore(t *testing.T) {
}
stateMgr.EXPECT().GetStorage(gomock.Any()).
Return(&models.StorageState{LiveNodes: map[models.NodeID]models.StatefulNode{1: {}, 2: {}}}, true)
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=storage&names=cpu&storageName=xx", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExplorePath+"?role=Storage&names=cpu&storageName=xx", "")
assert.Equal(t, http.StatusOK, resp.Code)
}

Expand All @@ -114,14 +114,14 @@ func TestExploreAPI_ExploreLiveNode(t *testing.T) {
// case 1: params invalid
resp := mock.DoRequest(t, r, http.MethodGet, ExploreLiveNodePath, "")
assert.Equal(t, http.StatusInternalServerError, resp.Code)
resp = mock.DoRequest(t, r, http.MethodGet, ExploreLiveNodePath+"?role=broker1", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreLiveNodePath+"?role=Broker1", "")
assert.Equal(t, http.StatusNotFound, resp.Code)
// case 2: broker ok
stateMgr.EXPECT().GetLiveNodes().Return(nil)
resp = mock.DoRequest(t, r, http.MethodGet, ExploreLiveNodePath+"?role=broker", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreLiveNodePath+"?role=Broker", "")
assert.Equal(t, http.StatusOK, resp.Code)
// case 3: storage ok
stateMgr.EXPECT().GetStorageList().Return(nil)
resp = mock.DoRequest(t, r, http.MethodGet, ExploreLiveNodePath+"?role=storage", "")
resp = mock.DoRequest(t, r, http.MethodGet, ExploreLiveNodePath+"?role=Storage", "")
assert.Equal(t, http.StatusOK, resp.Code)
}
116 changes: 116 additions & 0 deletions app/broker/api/state/replica.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package state

import (
"encoding/json"
"io"
"net/http"
"sync"

"github.com/gin-gonic/gin"

"github.com/lindb/lindb/app/broker/deps"
"github.com/lindb/lindb/models"
httppkg "github.com/lindb/lindb/pkg/http"
"github.com/lindb/lindb/pkg/logger"
)

var (
ReplicaPath = "/state/replica"
)

// ReplicaAPI represents internal wal replica state rest api.
type ReplicaAPI struct {
deps *deps.HTTPDeps
logger *logger.Logger
}

// NewReplicaAPI creates replica api instance.
func NewReplicaAPI(deps *deps.HTTPDeps) *ReplicaAPI {
return &ReplicaAPI{
deps: deps,
logger: logger.GetLogger("broker", "NewReplicaAPI"),
}
}

// Register adds replica state url route.
func (d *ReplicaAPI) Register(route gin.IRoutes) {
route.GET(ReplicaPath, d.GetReplicaState)
}

// GetReplicaState returns wal replica state.
func (d *ReplicaAPI) GetReplicaState(c *gin.Context) {
var param struct {
StorageName string `form:"storageName" binding:"required"`
DB string `form:"db" binding:"required"`
}
err := c.ShouldBind(&param)
if err != nil {
httppkg.Error(c, err)
return
}
storage, ok := d.deps.StateMgr.GetStorage(param.StorageName)
if !ok {
httppkg.NotFound(c)
return
}
liveNodes := storage.LiveNodes
var nodes []models.Node
for id := range liveNodes {
n := liveNodes[id]
nodes = append(nodes, &n)
}
d.fetchStateData(c, nodes)
}

// fetchStateData fetches the state metric from each live nodes.
func (d *ReplicaAPI) fetchStateData(c *gin.Context, nodes []models.Node) {
size := len(nodes)
if size == 0 {
httppkg.NotFound(c)
return
}
q := c.Request.URL.Query()
params := q.Encode()
result := make([][]models.FamilyLogReplicaState, size)
var wait sync.WaitGroup
wait.Add(size)
for idx := range nodes {
i := idx
go func() {
defer wait.Done()
node := nodes[i]
req, _ := http.NewRequest(http.MethodGet, node.HTTPAddress(), nil)
req.URL.Path = c.Request.URL.Path
req.URL.RawQuery = params
var state []models.FamilyLogReplicaState
if err := get(req, func(body io.Reader) error {
return json.NewDecoder(body).Decode(&state)
}); err == nil {
result[i] = state
}
}()
}
wait.Wait()
rs := make(map[string][]models.FamilyLogReplicaState)
for idx := range nodes {
rs[nodes[idx].Indicator()] = result[idx]
}
httppkg.OK(c, rs)
}
Loading

0 comments on commit 300dbf2

Please sign in to comment.