From 78596cc2ab652c7076a9c5d8fda5552b5ed96fdb Mon Sep 17 00:00:00 2001 From: zhangyongding <59941594@qq.com> Date: Tue, 1 Aug 2023 10:57:17 +0800 Subject: [PATCH] add watch raft log --- auth/credential_store.go | 2 + db/db.go | 4 ++ http/service.go | 80 ++++++++++++++++++++++++++++++++++++++++ http/service_test.go | 4 ++ store/store.go | 73 ++++++++++++++++++++++++++++++++++++ 5 files changed, 163 insertions(+) diff --git a/auth/credential_store.go b/auth/credential_store.go index 0d1de7f89..8910e240d 100644 --- a/auth/credential_store.go +++ b/auth/credential_store.go @@ -36,6 +36,8 @@ const ( PermBackup = "backup" // PermLoad means user can load a SQLite dump into a node. PermLoad = "load" + // PermWatch means user can watch raft log + PermWatch = "watch" ) // BasicAuther is the interface an object must support to return basic auth information. diff --git a/db/db.go b/db/db.go index 2a9e59428..320aca01e 100644 --- a/db/db.go +++ b/db/db.go @@ -1401,6 +1401,10 @@ func copyDatabaseConnection(dst, src *sqlite3.SQLiteConn) error { return bk.Finish() } +func ParametersToValues(parameters []*command.Parameter) ([]interface{}, error) { + return parametersToValues(parameters) +} + // parametersToValues maps values in the proto params to SQL driver values. func parametersToValues(parameters []*command.Parameter) ([]interface{}, error) { if parameters == nil { diff --git a/http/service.go b/http/service.go index e7b22b982..0672ef5ea 100644 --- a/http/service.go +++ b/http/service.go @@ -6,6 +6,7 @@ import ( "bufio" "context" "crypto/tls" + "database/sql" "encoding/json" "errors" "expvar" @@ -96,6 +97,9 @@ type Store interface { // Backup wites backup of the node state to dst Backup(br *command.BackupRequest, dst io.Writer) error + + // Watch query raft log + Watch(index uint64) (lastIndex uint64, statemets []*command.Statement, err error) } // Cluster is the interface node API services must provide @@ -1717,6 +1721,76 @@ func (s *Service) handlePprof(w http.ResponseWriter, r *http.Request) { } } +// handleWatch serves watch raft log +func (s *Service) handleWatch(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + if !s.CheckRequestPerm(r, auth.PermWatch) { + w.WriteHeader(http.StatusUnauthorized) + return + } + + if r.Method != "GET" { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + index, err := indexParam(r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + index, statements, err := s.store.Watch(index) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + var resp = struct { + Index uint64 `json:"index"` + Statements [][]interface{} `json:"statements,omitempty"` + }{ + Index: index, + } + + for _, stmt := range statements { + args, err := db.ParametersToValues(stmt.Parameters) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + var statement []interface{} + statement = append(statement, stmt.Sql) + for _, arg := range args { + statement = append(statement, arg.(sql.NamedArg).Value) + } + + resp.Statements = append(resp.Statements, statement) + } + + pretty, _ := isPretty(r) + var b []byte + if pretty { + b, err = json.MarshalIndent(resp, "", " ") + } else { + b, err = json.Marshal(resp) + } + if err != nil { + http.Error(w, fmt.Sprintf("JSON marshal: %s", err.Error()), + http.StatusInternalServerError) + return + } + + _, err = w.Write(b) + if err != nil { + http.Error(w, fmt.Sprintf("write: %s", err.Error()), + http.StatusInternalServerError) + return + } +} + // Addr returns the address on which the Service is listening func (s *Service) Addr() net.Addr { return s.ln.Addr() @@ -2026,6 +2100,12 @@ func keyParam(req *http.Request) string { return strings.TrimSpace(q.Get("key")) } +func indexParam(req *http.Request) (uint64, error) { + q := req.URL.Query() + i := strings.TrimSpace(q.Get("index")) + return strconv.ParseUint(i, 10, 64) +} + func getSubJSON(jsonBlob []byte, keyString string) (json.RawMessage, error) { if keyString == "" { return jsonBlob, nil diff --git a/http/service_test.go b/http/service_test.go index 54e925024..0f278e0f7 100644 --- a/http/service_test.go +++ b/http/service_test.go @@ -1336,6 +1336,10 @@ func (m *MockStore) LoadChunk(lc *command.LoadChunkRequest) error { return nil } +func (m *MockStore) Watch(index uint64) (lastIndex uint64, statemets []*command.Statement, err error) { + return +} + type mockClusterService struct { apiAddr string executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteResult, error) diff --git a/store/store.go b/store/store.go index 2cfcc967a..a48120326 100644 --- a/store/store.go +++ b/store/store.go @@ -1680,6 +1680,79 @@ func (s *Store) Restore(rc io.ReadCloser) error { return nil } +func (s *Store) Watch(index uint64) (lastIndex uint64, statemets []*command.Statement, err error) { + lastIndex, err = s.raftLog.LastIndex() + if err != nil { + lastIndex = index + return + } + + if index == 0 || index > lastIndex { + return + } + + after := time.After(time.Minute) + ticker := time.NewTicker(time.Millisecond * 10) + defer ticker.Stop() + for { + select { + case <-after: + return + case <-ticker.C: + lastIndex, err = s.raftLog.LastIndex() + if err != nil { + lastIndex = index + return + } + + for i := index + 1; i <= lastIndex; i++ { + var log raft.Log + if err = s.raftLog.GetLog(i, &log); err != nil { + err = nil + return + } + if log.Type != raft.LogCommand { + continue + } + + var c command.Command + if err = command.Unmarshal(log.Data, &c); err != nil { + err = fmt.Errorf("failed to unmarshal command: %s", err.Error()) + return + } + + switch c.Type { + case command.Command_COMMAND_TYPE_EXECUTE: + var er command.ExecuteRequest + if err = command.UnmarshalSubCommand(&c, &er); err != nil { + err = fmt.Errorf("failed to unmarshal execute subcommand: %s", err.Error()) + return + } + statemets = append(statemets, er.Request.Statements...) + case command.Command_COMMAND_TYPE_EXECUTE_QUERY: + var eqr command.ExecuteQueryRequest + if err = command.UnmarshalSubCommand(&c, &eqr); err != nil { + err = fmt.Errorf("failed to unmarshal execute and query subcommand: %s", err.Error()) + return + } + statemets = append(statemets, eqr.Request.Statements...) + default: + continue + } + + if len(statemets) >= 100 { + lastIndex = i + break + } + } + + if len(statemets) > 0 { + return + } + } + } +} + // RegisterObserver registers an observer of Raft events func (s *Store) RegisterObserver(o *raft.Observer) { s.raft.RegisterObserver(o)