Skip to content

Commit

Permalink
add: close callback
Browse files Browse the repository at this point in the history
  • Loading branch information
mackee committed Nov 17, 2015
1 parent 73043d4 commit 54129a1
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 15 deletions.
11 changes: 11 additions & 0 deletions _example/app.psgi
Expand Up @@ -33,6 +33,17 @@ $router->add("/connect", sub {
return ["200", ["X-Kuiperbelt-Session" => $session], ["joined success"]];
});

$router->add("/close", sub {
my $env = shift;
my $req = Plack::Request->new($env);

my @session = $req->header("X-Kuiperbelt-Session");

$redis->srem("sessions", @session);

return ["200", [], ["success closed"]];
});

$router->add("/recent", sub {
my $env = shift;
my $req = Plack::Request->new($env);
Expand Down
2 changes: 1 addition & 1 deletion _example/config.yml
@@ -1,4 +1,4 @@
port: 12345
callback:
connect: "http://localhost:12346/connect"
receive: "http://localhost:12346/receive"
close: "http://localhost:12346/close"
1 change: 0 additions & 1 deletion cmd/ekbo/config.yml
@@ -1,4 +1,3 @@
port: 12345
callback:
connect: "http://localhost:12346/connect"
receive: "http://localhost:12346/receive"
2 changes: 1 addition & 1 deletion config.go
Expand Up @@ -19,7 +19,7 @@ type Config struct {

type Callback struct {
Connect string `yaml:"connect"`
Receive string `yaml:"receive"`
Close string `yaml:"close"`
}

func NewConfig(filename string) (*Config, error) {
Expand Down
3 changes: 1 addition & 2 deletions config_test.go
Expand Up @@ -10,14 +10,13 @@ session_header: "X-Kuiperbelt-Session-Key"
port: 12345
callback:
connect: "http://localhost:12346/connect"
receive: "http://localhost:12346/receive"
`)
var TestConfig = Config{
Port: ":12345",
SessionHeader: "X-Kuiperbelt-Session-Key",
Callback: Callback{
Connect: "http://localhost:12346/connect",
Receive: "http://localhost:12346/receive",
Close: "",
},
}

Expand Down
61 changes: 53 additions & 8 deletions server.go
Expand Up @@ -15,7 +15,8 @@ import (
)

const (
ENDPOINT_HEADER_NAME = "X-Kuiperbelt-Endpoint"
ENDPOINT_HEADER_NAME = "X-Kuiperbelt-Endpoint"
CALLBACK_CLIENT_MAX_CONNS_PER_HOST = 32
)

var (
Expand Down Expand Up @@ -60,6 +61,10 @@ func (s *WebSocketServer) Handler(w http.ResponseWriter, r *http.Request) {
}

func (s *WebSocketServer) Register() {
callbackClient.Transport = &http.Transport{
MaxIdleConnsPerHost: CALLBACK_CLIENT_MAX_CONNS_PER_HOST,
}

http.HandleFunc("/connect", s.Handler)
}

Expand Down Expand Up @@ -121,7 +126,7 @@ func (s *WebSocketServer) NewWebSocketHandler(resp *http.Response) func(ws *webs

defer DelSession(session.Key())
log.WithFields(log.Fields{
"session_key": session.Key(),
"session": session.Key(),
}).Info("connected session")
go session.WatchClose()
session.WaitClose()
Expand Down Expand Up @@ -163,19 +168,59 @@ func (s *WebSocketSession) WaitClose() {
func (s *WebSocketSession) WatchClose() {
defer s.Close()
_, err := io.Copy(new(blackholeWriter), s)
if err == io.EOF {
if err == nil {
go s.SendCloseCallback()
return
}
// ignore closed session error
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}

log.WithFields(log.Fields{
"error": err.Error(),
}).Error("watch close frame error")
}

func (s *WebSocketSession) SendCloseCallback() {
// cancel sending when not set callback url
if s.Config.Callback.Close == "" {
return
}

callbackRequest, err := http.NewRequest("POST", s.Config.Callback.Close, nil)
if err != nil {
if strings.HasSuffix(err.Error(), "use of closed network connection") {
return
}
log.WithFields(log.Fields{
"session": s.Key(),
"error": err.Error(),
}).Error("cannot create close callback request.")
return
}

callbackRequest.Header.Add(s.Config.SessionHeader, s.Key())
resp, err := callbackClient.Do(callbackRequest)
if err != nil {
log.WithFields(log.Fields{
"error": err.Error(),
}).Error("watch close frame error")
"session": s.Key(),
"error": err.Error(),
}).Error("failed send close callback request.")
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
b := new(bytes.Buffer)
b.ReadFrom(resp.Body)
log.WithFields(log.Fields{
"session": s.Key(),
"status": resp.Status,
"error": b.String(),
}).Error("invalid close callback status.")
return
}

log.WithFields(log.Fields{
"session": s.Key(),
}).Info("success close callback.")
}

type blackholeWriter struct {
Expand Down
18 changes: 16 additions & 2 deletions server_test.go
Expand Up @@ -20,6 +20,7 @@ const (

type testSuccessConnectCallbackServer struct {
IsCallbacked bool
IsClosed bool
Header http.Header
}

Expand All @@ -38,6 +39,13 @@ func (s *testSuccessConnectCallbackServer) FailHandler(w http.ResponseWriter, r
io.WriteString(w, "fail authorization!")
}

func (s *testSuccessConnectCallbackServer) CloseHandler(w http.ResponseWriter, r *http.Request) {
s.IsClosed = true
s.Header = r.Header
w.WriteHeader(http.StatusOK)
io.WriteString(w, "")
}

func newTestWebSocketRequest(url string) (*http.Request, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
Expand Down Expand Up @@ -133,10 +141,12 @@ func TestWebSocketServer__Handler__FailAuthorized(t *testing.T) {

func TestWebSocketServer__Handler__CloseByClient(t *testing.T) {
callbackServer := new(testSuccessConnectCallbackServer)
tcc := httptest.NewServer(http.HandlerFunc(callbackServer.SuccessHandler))
tcc1 := httptest.NewServer(http.HandlerFunc(callbackServer.SuccessHandler))
tcc2 := httptest.NewServer(http.HandlerFunc(callbackServer.CloseHandler))

c := TestConfig
c.Callback.Connect = tcc.URL
c.Callback.Connect = tcc1.URL
c.Callback.Close = tcc2.URL

server := WebSocketServer{c}

Expand Down Expand Up @@ -183,4 +193,8 @@ func TestWebSocketServer__Handler__CloseByClient(t *testing.T) {
t.Error("not removed session:", err)
}

if !callbackServer.IsClosed {
t.Error("not receive close callback")
}

}

0 comments on commit 54129a1

Please sign in to comment.