Skip to content

Commit

Permalink
Merge branch 'master' into siddontang/tso-bench
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang committed Apr 5, 2017
2 parents a68cefe + a36602b commit bad44a0
Show file tree
Hide file tree
Showing 19 changed files with 4 additions and 4,746 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ bin
default.etcd
default.pd
*.swp
.DS_Store
534 changes: 0 additions & 534 deletions server/api/bindata_assetfs.go

This file was deleted.

107 changes: 0 additions & 107 deletions server/api/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@
package api

import (
"encoding/json"
"net/http"
"strconv"
"sync"
"time"

"github.com/gorilla/websocket"
"github.com/ngaut/log"
"github.com/pingcap/pd/server"
"github.com/unrolled/render"
)
Expand Down Expand Up @@ -83,105 +78,3 @@ func (h *eventsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
evts := cluster.FetchEvents(0, true)
h.rd.JSON(w, http.StatusOK, evts)
}

type wsHandler struct {
sync.RWMutex

upgrader websocket.Upgrader
chs map[*http.Request]chan server.LogEvent
evtCh chan server.LogEvent

offset uint64
svr *server.Server
}

func newWSHandler(svr *server.Server) *wsHandler {
h := &wsHandler{
chs: make(map[*http.Request]chan server.LogEvent, 1000),
evtCh: make(chan server.LogEvent, 100),
svr: svr,
}

go h.fanout()
go h.fetchEventFeed()

return h
}

func (h *wsHandler) fanout() {
for evt := range h.evtCh {
h.RLock()
for _, ch := range h.chs {
select {
case ch <- evt:
default:
}
}
h.RUnlock()
}
}

func (h *wsHandler) fetchEventFeed() {
for {
time.Sleep(time.Second)

cluster := h.svr.GetRaftCluster()
if cluster == nil {
continue
}

evts := cluster.FetchEvents(h.offset, false)

for _, evt := range evts {
h.evtCh <- evt
h.offset = evt.ID
}
}
}

func (h *wsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c, err := h.upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer c.Close()

// Make sure the client is alive
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

ch := make(chan server.LogEvent, 100)
h.Lock()
h.chs[r] = ch
h.Unlock()

defer func() {
h.Lock()
log.Info("client is closed, removing channel")
close(h.chs[r])
delete(h.chs, r)
h.Unlock()
}()

for {
select {
case <-ticker.C:
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Error(err)
return
}
case event := <-ch:
logMsg, err := json.Marshal(event)
if err != nil {
log.Error(err)
return
}

err = c.WriteMessage(websocket.TextMessage, logMsg)
if err != nil {
log.Error(err)
return
}
}
}
}
38 changes: 0 additions & 38 deletions server/api/home.go

This file was deleted.

14 changes: 3 additions & 11 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,16 @@
package api

import (
"net/http"

"github.com/gorilla/mux"
"github.com/pingcap/pd/server"
"github.com/unrolled/render"
)

func createRouter(prefix string, svr *server.Server) *mux.Router {
rd := render.New(render.Options{
Directory: "templates",
Extensions: []string{".html"},
Asset: Asset,
AssetNames: func() []string {
return []string{"templates/index.html"}
},
IndentJSON: true,
Delims: render.Delims{"[[", "]]"},
})

router := mux.NewRouter().PathPrefix(prefix).Subrouter()
Expand Down Expand Up @@ -79,9 +74,6 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
router.Handle("/api/v1/members/{name}", newMemberDeleteHandler(svr, rd)).Methods("DELETE")
router.Handle("/api/v1/leader", newLeaderHandler(svr, rd)).Methods("GET")

router.Handle("/", newHomeHandler(rd)).Methods("GET")
router.Handle("/ws", newWSHandler(svr))

router.HandleFunc("/ping", newHomeHandler(rd).Ping).Methods("GET")
router.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {}).Methods("GET")
return router
}
4 changes: 0 additions & 4 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ func NewHandler(svr *server.Server) http.Handler {
recovery := negroni.NewRecovery()
engine.Use(recovery)

static := negroni.NewStatic(assetFS())
static.Prefix = apiPrefix + "/web"
engine.Use(static)

router := mux.NewRouter()
router.PathPrefix(apiPrefix).Handler(negroni.New(
newRedirector(svr),
Expand Down
Loading

0 comments on commit bad44a0

Please sign in to comment.