diff --git a/_example/app.psgi b/_example/app.psgi index 13de0e3..a9fa635 100644 --- a/_example/app.psgi +++ b/_example/app.psgi @@ -33,6 +33,17 @@ $router->add("/connect", sub { return ["200", ["X-Kuiperbelt-Session" => $session], ["joined success"]]; }); +$router->add("/close", sub { + my $env = shift; + my $req = Plack::Request->new($env); + + my @session = $req->header("X-Kuiperbelt-Session"); + + $redis->srem("sessions", @session); + + return ["200", [], ["success closed"]]; +}); + $router->add("/recent", sub { my $env = shift; my $req = Plack::Request->new($env); diff --git a/_example/config.yml b/_example/config.yml index 8943170..8347f0b 100644 --- a/_example/config.yml +++ b/_example/config.yml @@ -1,4 +1,4 @@ port: 12345 callback: connect: "http://localhost:12346/connect" - receive: "http://localhost:12346/receive" + close: "http://localhost:12346/close" diff --git a/cmd/ekbo/config.yml b/cmd/ekbo/config.yml index 8943170..bb962b8 100644 --- a/cmd/ekbo/config.yml +++ b/cmd/ekbo/config.yml @@ -1,4 +1,3 @@ port: 12345 callback: connect: "http://localhost:12346/connect" - receive: "http://localhost:12346/receive" diff --git a/config.go b/config.go index 216a631..f700a0e 100644 --- a/config.go +++ b/config.go @@ -19,7 +19,7 @@ type Config struct { type Callback struct { Connect string `yaml:"connect"` - Receive string `yaml:"receive"` + Close string `yaml:"close"` } func NewConfig(filename string) (*Config, error) { diff --git a/config_test.go b/config_test.go index 16cb72a..e63c0f3 100644 --- a/config_test.go +++ b/config_test.go @@ -10,14 +10,13 @@ session_header: "X-Kuiperbelt-Session-Key" port: 12345 callback: connect: "http://localhost:12346/connect" - receive: "http://localhost:12346/receive" `) var TestConfig = Config{ Port: ":12345", SessionHeader: "X-Kuiperbelt-Session-Key", Callback: Callback{ Connect: "http://localhost:12346/connect", - Receive: "http://localhost:12346/receive", + Close: "", }, } diff --git a/server.go b/server.go index 7efd713..b8e87db 100644 --- a/server.go +++ b/server.go @@ -15,7 +15,8 @@ import ( ) const ( - ENDPOINT_HEADER_NAME = "X-Kuiperbelt-Endpoint" + ENDPOINT_HEADER_NAME = "X-Kuiperbelt-Endpoint" + CALLBACK_CLIENT_MAX_CONNS_PER_HOST = 32 ) var ( @@ -60,6 +61,10 @@ func (s *WebSocketServer) Handler(w http.ResponseWriter, r *http.Request) { } func (s *WebSocketServer) Register() { + callbackClient.Transport = &http.Transport{ + MaxIdleConnsPerHost: CALLBACK_CLIENT_MAX_CONNS_PER_HOST, + } + http.HandleFunc("/connect", s.Handler) } @@ -121,7 +126,7 @@ func (s *WebSocketServer) NewWebSocketHandler(resp *http.Response) func(ws *webs defer DelSession(session.Key()) log.WithFields(log.Fields{ - "session_key": session.Key(), + "session": session.Key(), }).Info("connected session") go session.WatchClose() session.WaitClose() @@ -163,19 +168,59 @@ func (s *WebSocketSession) WaitClose() { func (s *WebSocketSession) WatchClose() { defer s.Close() _, err := io.Copy(new(blackholeWriter), s) - if err == io.EOF { + if err == nil { + go s.SendCloseCallback() return } // ignore closed session error + if strings.HasSuffix(err.Error(), "use of closed network connection") { + return + } + + log.WithFields(log.Fields{ + "error": err.Error(), + }).Error("watch close frame error") +} + +func (s *WebSocketSession) SendCloseCallback() { + // cancel sending when not set callback url + if s.Config.Callback.Close == "" { + return + } + + callbackRequest, err := http.NewRequest("POST", s.Config.Callback.Close, nil) if err != nil { - if strings.HasSuffix(err.Error(), "use of closed network connection") { - return - } + log.WithFields(log.Fields{ + "session": s.Key(), + "error": err.Error(), + }).Error("cannot create close callback request.") + return + } + callbackRequest.Header.Add(s.Config.SessionHeader, s.Key()) + resp, err := callbackClient.Do(callbackRequest) + if err != nil { log.WithFields(log.Fields{ - "error": err.Error(), - }).Error("watch close frame error") + "session": s.Key(), + "error": err.Error(), + }).Error("failed send close callback request.") + return } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + b := new(bytes.Buffer) + b.ReadFrom(resp.Body) + log.WithFields(log.Fields{ + "session": s.Key(), + "status": resp.Status, + "error": b.String(), + }).Error("invalid close callback status.") + return + } + + log.WithFields(log.Fields{ + "session": s.Key(), + }).Info("success close callback.") } type blackholeWriter struct { diff --git a/server_test.go b/server_test.go index 273115d..7a9e6ea 100644 --- a/server_test.go +++ b/server_test.go @@ -20,6 +20,7 @@ const ( type testSuccessConnectCallbackServer struct { IsCallbacked bool + IsClosed bool Header http.Header } @@ -38,6 +39,13 @@ func (s *testSuccessConnectCallbackServer) FailHandler(w http.ResponseWriter, r io.WriteString(w, "fail authorization!") } +func (s *testSuccessConnectCallbackServer) CloseHandler(w http.ResponseWriter, r *http.Request) { + s.IsClosed = true + s.Header = r.Header + w.WriteHeader(http.StatusOK) + io.WriteString(w, "") +} + func newTestWebSocketRequest(url string) (*http.Request, error) { req, err := http.NewRequest("GET", url, nil) if err != nil { @@ -133,10 +141,12 @@ func TestWebSocketServer__Handler__FailAuthorized(t *testing.T) { func TestWebSocketServer__Handler__CloseByClient(t *testing.T) { callbackServer := new(testSuccessConnectCallbackServer) - tcc := httptest.NewServer(http.HandlerFunc(callbackServer.SuccessHandler)) + tcc1 := httptest.NewServer(http.HandlerFunc(callbackServer.SuccessHandler)) + tcc2 := httptest.NewServer(http.HandlerFunc(callbackServer.CloseHandler)) c := TestConfig - c.Callback.Connect = tcc.URL + c.Callback.Connect = tcc1.URL + c.Callback.Close = tcc2.URL server := WebSocketServer{c} @@ -183,4 +193,8 @@ func TestWebSocketServer__Handler__CloseByClient(t *testing.T) { t.Error("not removed session:", err) } + if !callbackServer.IsClosed { + t.Error("not receive close callback") + } + }