Skip to content

Commit

Permalink
closes #9, but need more testing
Browse files Browse the repository at this point in the history
  • Loading branch information
sohlich committed Apr 28, 2016
1 parent 9bb9094 commit c79965a
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 40 deletions.
10 changes: 8 additions & 2 deletions client.go
Expand Up @@ -45,6 +45,8 @@ type Connector interface {
type NatsClient struct {
conn *nats.Conn
filters NatsHandlers
reqPool RequestPool
resPool ResponsePool
}

// NewNatsClient creates new NATS client
Expand All @@ -58,6 +60,8 @@ func NewNatsClient(conn *nats.Conn) (*NatsClient, error) {
return &NatsClient{
conn,
make([]NatsHandler, 0),
NewRequestPool(),
NewResponsePool(),
}, nil
}

Expand Down Expand Up @@ -97,12 +101,14 @@ func (nc *NatsClient) DELETE(url string, handler NatsHandler) {
func (nc *NatsClient) Subscribe(method, url string, handler NatsHandler) {
subscribeURL := SubscribeURLToNats(method, url)
nc.conn.Subscribe(subscribeURL, func(m *nats.Msg) {
request := &Request{}
request := nc.reqPool.GetRequest()
defer nc.reqPool.Put(request)
if err := request.UnmarshallFrom(m.Data); err != nil {
log.Println(err)
return
}
response := NewResponse()
response := nc.resPool.GetResponse()
defer nc.resPool.Put(response)
c := newContext(url, response, request)

// Iterate through filters
Expand Down
25 changes: 9 additions & 16 deletions proxy.go
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"net/http"
"regexp"
"sync"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -47,8 +46,8 @@ type NatsProxy struct {
conn *nats.Conn
hooks map[string]hookGroup
wsMapper *webSocketMapper
requestPool sync.Pool
responsePool sync.Pool
requestPool RequestPool
responsePool ResponsePool
}

type hookGroup struct {
Expand All @@ -69,24 +68,17 @@ func NewNatsProxy(conn *nats.Conn) (*NatsProxy, error) {
make(map[*websocket.Conn]string, 0),
make(map[string]*websocket.Conn, 0),
},
sync.Pool{
New: func() interface{} { return NewRequest() },
},
sync.Pool{
New: func() interface{} { return NewResponse() },
},
NewRequestPool(),
NewResponsePool(),
}, nil
}

func (np *NatsProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {

// Transform the HTTP request to
// NATS proxy request.
request, _ := np.requestPool.Get().(*Request)
defer func() {
request.reset()
np.requestPool.Put(request)
}()
request := np.requestPool.GetRequest()
defer np.requestPool.Put(request)

err := request.FromHTTP(req)
if err != nil {
Expand All @@ -110,8 +102,9 @@ func (np *NatsProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
http.Error(rw, "No response", http.StatusInternalServerError)
return
}
var response *Response
response, err = DecodeResponse(msg.Data)
response := np.responsePool.GetResponse()
err = response.ReadFrom(msg.Data)
defer np.responsePool.Put(response)
if err != nil {
http.Error(rw, "Cannot deserialize response", http.StatusInternalServerError)
return
Expand Down
32 changes: 17 additions & 15 deletions proxy_test.go
Expand Up @@ -165,42 +165,44 @@ func TestProxyServeHttpError(t *testing.T) {

// Test if the pool and reset of request
// works correctly
func BenchmarkProxyPool(b *testing.B) {
func BenchProxyPool(b *testing.B) {

b.StopTimer()
proxyConn, _ := nats.Connect(nats_url)
proxyHandler, _ := NewNatsProxy(proxyConn)

clientConn, _ := nats.Connect(nats_url)
natsClient, _ := NewNatsClient(clientConn)

testVal := struct {
Data string
}{}
assertChan := make(chan string, 1)

natsClient.Subscribe("POST", "/test/:event/:session", func(c *Context) {
fmt.Println("Getting request")
c.ParseForm()
reqEvent := c.FormVariable("post")
if reqEvent != testVal.Data {
fmt.Printf("Not getting "+testVal.Data+" get %s instead\n", reqEvent)
b.FailNow()
}
fmt.Println("Posting value")
assertChan <- reqEvent
})

b.StopTimer()
b.ResetTimer()
b.StartTimer()
for i := 0; i < b.N; i++ {
testVal.Data = fmt.Sprintf("%v", time.Now().Unix())
testVal.Data = testVal.Data[0:rand.Intn(len(testVal.Data))]
fmt.Printf("Expected %s\n", testVal.Data)
reader := strings.NewReader("post=" + testVal.Data)
testVal := fmt.Sprintf("%v", time.Now().Unix())
testVal = testVal[0:rand.Intn(len(testVal))]
fmt.Printf("Expected %s\n", testVal)
reader := strings.NewReader("post=" + testVal)
req, _ := http.NewRequest("POST", "http://127.0.0.1:3000/test/12324/2222", reader)
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
rw := httptest.NewRecorder()
b.StartTimer()
proxyHandler.ServeHTTP(rw, req)
assert := <-assertChan
fmt.Printf("Assert value %s", assert)
if assert != testVal {
fmt.Printf("Not getting "+assert+" get %s instead\n", testVal)
b.FailNow()
}
b.StopTimer()
}
b.StopTimer()

}

Expand Down
22 changes: 22 additions & 0 deletions request.go
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"net/http"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/satori/go.uuid"
Expand Down Expand Up @@ -90,3 +91,24 @@ func (req *Request) reset() {
req.RemoteAddr = req.RemoteAddr[0:0]
req.URL = req.URL[0:0]
}

type RequestPool struct {
sync.Pool
}

func (r *RequestPool) GetRequest() *Request {
request, _ := r.Get().(*Request)
return request
}

func (r *RequestPool) PutRequest(req *Request) {
req.reset()
r.Put(req)
}

func NewRequestPool() RequestPool {
return RequestPool{
sync.Pool{
New: func() interface{} { return NewRequest() },
}}
}
31 changes: 26 additions & 5 deletions response.go
Expand Up @@ -2,6 +2,7 @@ package natsproxy

import (
"errors"
"sync"

"github.com/gogo/protobuf/proto"
)
Expand All @@ -27,15 +28,14 @@ func NewResponse() *Response {
// DecodeResponse decodes the
// marshalled Response struct
// back to struct.
func DecodeResponse(responseData []byte) (*Response, error) {
func (r *Response) ReadFrom(responseData []byte) error {
if responseData == nil || len(responseData) == 0 {
return nil, errors.New("natsproxy: No response content found")
return errors.New("natsproxy: No response content found")
}
r := &Response{}
if err := proto.Unmarshal(responseData, r); err != nil {
return nil, err
return err
}
return r, nil
return nil
}

func (res *Response) reset() {
Expand All @@ -44,3 +44,24 @@ func (res *Response) reset() {
res.DoUpgrade = false
res.StatusCode = int32(0)
}

type ResponsePool struct {
sync.Pool
}

func (r *ResponsePool) GetResponse() *Response {
res, _ := r.Get().(*Response)
return res
}

func (r *ResponsePool) PutResponse(res *Response) {
res.reset()
r.Put(res)
}

func NewResponsePool() ResponsePool {
return ResponsePool{
sync.Pool{
New: func() interface{} { return NewResponse() },
}}
}
5 changes: 3 additions & 2 deletions response_test.go
Expand Up @@ -4,13 +4,14 @@ import "testing"

func TestDecodeResponseError(t *testing.T) {
response := make([]byte, 5)
_, err := DecodeResponse(response)
r := NewResponse()
err := r.ReadFrom(response)
if err == nil {
t.Error("Bad content assertion failed")
}

var nilData []byte
_, err = DecodeResponse(nilData)
err = r.ReadFrom(nilData)
if err == nil {
t.Error("Nil content assertion failed")
}
Expand Down

0 comments on commit c79965a

Please sign in to comment.