diff --git a/examples/flume/client.go b/examples/flume/client.go index b687292..80684c4 100644 --- a/examples/flume/client.go +++ b/examples/flume/client.go @@ -8,7 +8,7 @@ import ( func main() { //t.SkipNow() - transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:true, NettyHost:"10.98.80.113"}) + transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:false, NettyHost:"192.168.11.152"}) if err != nil { log.Fatal(err) } @@ -24,20 +24,27 @@ func main() { headers := make(map[string]interface{}) headers["host_header"] = "127.0.0.1" flumeRecord.Set("headers", headers) - flumeRecord.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")) - requestor := goavro.NewRequestor(protocol, transceiver) + requestor := goavro.NewRequestor(protocol, transceiver) + flumeRecord.Set("body", []byte("test 1")) err = requestor.Request("append", flumeRecord) if err != nil { - log.Fatal("Request: ", err) + log.Fatal("Request 1: ", err) } + + log.Printf("Test 1 OK") + + time.Sleep(5 * time.Second) + flumeRecord.Set("body", []byte("test 2")) err = requestor.Request("append", flumeRecord) if err != nil { - log.Fatal("Request: ", err) + log.Fatal("Request 2: ", err) } + log.Printf("Test 2 OK") + } diff --git a/requestor.go b/requestor.go index 9359882..8eaefa9 100644 --- a/requestor.go +++ b/requestor.go @@ -45,7 +45,8 @@ func init() { } func NewRequestor(localProto Protocol, transceiver transceiver.Transceiver) *Requestor { - return &Requestor{ + + r := &Requestor{ local_protocol: localProto, transceiver: transceiver, // remote_protocol: nil, @@ -53,6 +54,8 @@ func NewRequestor(localProto Protocol, transceiver transceiver.Transceiver) *Req send_protocol: false, send_handshake: true, } + transceiver.InitHandshake(r.write_handshake_request, r.read_handshake_response) + return r } @@ -72,12 +75,7 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er frame1 := new(bytes.Buffer) frame2 := new(bytes.Buffer) - err := a.write_handshake_request(frame1) - if err!=nil { - return err - } - - err = a.write_call_requestHeader(message_name, frame1) + err := a.write_call_requestHeader(message_name, frame1) if err!=nil { return err } @@ -91,33 +89,24 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er responses, err := a.transceiver.Transceive(buffer_writers) if err!=nil { - return err + return fmt.Errorf("Fail to transceive %v", err) } //buffer_decoder := bytes.NewBuffer(decoder) // process the handshake and call response if len(responses) >0 { - ok, err := a.read_handshake_response(responses[0]) + a.read_call_responseCode(responses[1]) if err != nil { return err } - a.send_handshake = !ok - - if ok { - a.read_call_responseCode(responses[1]) - if err != nil { - return err - } - // a.Request(message_name, request_datum) - } + // a.Request(message_name, request_datum) } return nil } -func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) { - if !a.send_handshake { - return nil - } +func (a *Requestor) write_handshake_request() (handshake []byte ,err error) { + buffer := new(bytes.Buffer) + defer buffer.Write(handshake) local_hash :=a.local_protocol.MD5 remote_name := a.remote_protocol.Name remote_hash := REMOTE_HASHES[remote_name] @@ -128,7 +117,8 @@ func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) { record, err := NewRecord(RecordSchema(handshakeRequestshema)) if err != nil { - return fmt.Errorf("Avro fail to init record handshakeRequest %v",err) + err = fmt.Errorf("Avro fail to init record handshakeRequest %v",err) + return } record.Set("clientHash", local_hash) @@ -136,22 +126,27 @@ func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) { record.Set("meta", make(map[string]interface{})) codecHandshake, err := NewCodec(handshakeRequestshema) if err != nil { - return fmt.Errorf("Avro fail to get codec handshakeRequest %v",err) + err = fmt.Errorf("Avro fail to get codec handshakeRequest %v",err) + return } if a.send_protocol { json, err := a.local_protocol.Json() if err!=nil { - return err + return nil ,err } record.Set("clientProtocol", json) } + + if err = codecHandshake.Encode(buffer, record); err !=nil { - return fmt.Errorf("Encode handshakeRequest ",err) + err = fmt.Errorf("Encode handshakeRequest ",err) + return } - return nil + + return } func (a *Requestor) write_call_request(message_name string, request_datum interface{}, frame io.Writer) (err error) { diff --git a/requestor_test.go b/requestor_test.go index 98beac3..149c33b 100644 --- a/requestor_test.go +++ b/requestor_test.go @@ -6,18 +6,15 @@ import ( "bytes" "reflect" netty "github.com/sebglon/goavro/transceiver/netty" + "github.com/sebglon/goavro/transceiver" + "runtime" + "strconv" ) func TestWrite_handshake_request(t *testing.T) { //t.SkipNow() - rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001") - conn, err := net.DialTCP("tcp", nil, rAddr) - if err != nil { - t.Fatal(err) - } - defer conn.Close() - transceiver, err := netty.NewTransceiver(netty.Config{}) + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) if err != nil { t.Fatal(err) } @@ -27,11 +24,11 @@ func TestWrite_handshake_request(t *testing.T) { } requestor := NewRequestor(protocol, transceiver) - bb := new(bytes.Buffer) - requestor.write_handshake_request(bb) + + hds, err := requestor.write_handshake_request() // conn.Write(bb.Bytes()) - t.Logf("Handshake_request size %v %x\n",bb.Len(), bb.Bytes()) - t.Logf( "Handshake_request %v\n", bb.String()) + t.Logf("Handshake_request size %v %x\n",len(hds), hds) + t.Logf( "Handshake_request %v\n", hds) refHandshake := []byte("\x86\xaa\xda\xe2\xc4\x54\x74\xc0\xfe\x93\xff\xd0\xf2\x35\x0a\x65\x00\x86\xaa\xda\xe2\xc4\x54\x74\xc0\xfe\x93\xff\xd0\xf2\x35\x0a\x65\x02\x00") //bytes := bb.Bytes() @@ -61,7 +58,7 @@ func TestRead_handshake_reponse(t *testing.T) { if err != nil { t.Fatal(err) } - record.Set("match", Enum{"match","BOTH"}) + record.Set("match", Enum{"match", "BOTH"}) record.Set("serverProtocol", nil) record.Set("serverHash", nil) record.Set("meta", nil) @@ -73,14 +70,12 @@ func TestRead_handshake_reponse(t *testing.T) { } t.Logf("Encode HandshakeResponse %v", bb.Bytes()) - _, err = codecHandshake.Decode(bytes.NewReader(bb.Bytes())) if err != nil { t.Fatal(err) } - - transceiver, err := netty.NewTransceiver(netty.Config{}) + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) if err != nil { t.Fatal(err) } @@ -95,120 +90,150 @@ func TestRead_handshake_reponse(t *testing.T) { if err != nil { t.Fatal(err) } - -} - -type Conn struct { - bytes.Buffer -} -func (c *Conn) Close() error { - return nil } -func TestWrite_call_request(t *testing.T) { - //t.SkipNow() - +func TestWrite_call_request(t * testing.T) { + //t.SkipNow() + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) + + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + bb := new(bytes.Buffer) + datum, err := protocol.NewRecord("AvroFlumeEvent") + if err != nil { + t.Fatal(err) + } + + headers := make(map[string]interface{}) + headers["host_header"] = "127.0.0.1" + datum.Set("headers", headers) + datum.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")) + + requestor.write_call_request("append", datum, bb) + // conn.Write(bb.Bytes()) + t.Logf("\nCall_request size %v %v\n", bb.Len(), bb.Bytes()) + t.Logf("\nCall_request %v\n", bb.String()) + + codec, err := protocol.MessageRequestCodec("append") + if err != nil { + t.Fatal(err) + } + value, err := codec.Decode(bb) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(datum, value) { + t.Fatalf("Request not equals to ref %x, %x", datum, value) + } + } + +func TestWrite_call_requestHeader(t * testing.T) { + //t.SkipNow() + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) + + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + bb := new(bytes.Buffer) + + requestor.write_call_requestHeader("append", bb) + + refHeader := []byte("\x00\x0c\x61\x70\x70\x65\x6e\x64") + bytes := bb.Bytes() + if !reflect.DeepEqual(refHeader, bytes) { + t.Fatalf("Request_Header not equals to ref %n%x, %n%x", len(refHeader), refHeader, len(bytes), bytes) + } + } + +func TestRead_call_responseMessage(t * testing.T) { + + transceiver, err := netty.NewTransceiver(transceiver.Config{Host:HOST, Port:PORT}) + + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + codec, err := protocol.MessageResponseCodec("append") + if err != nil { + t.Fatal(err) + } + bb := new(bytes.Buffer) + codec.Encode(bb, Enum{"Status", "OK"}) + t.Logf("Bytes for OK %x", bb.Bytes()) + + err = requestor.read_call_responseMessage("append", bb) + if err != nil { + t.Fatal(err) + } + + codec.Encode(bb, Enum{"Status", "FAILED"}) + t.Logf("Bytes for FAILED %x", bb.Bytes()) + err = requestor.read_call_responseMessage("append", bb) + if err == nil || err.Error() != "Reponse failure. status == FAILED" { + t.Fatalf("Status FAILED can return error") + } + + } + + +const ( + RECV_BUF_LEN = 1024 + NETWORK = "tcp" + HOST = "127.0.0.1" + PORT=6666 + ADDR="127.0.0.1:6666" +) - transceiver, err := netty.NewTransceiver(netty.Config{}) - buf := &Conn{} - transceiver.Conn = buf - protocol, err := NewProtocol() - if err != nil { - t.Fatal(err) +func init() { + numProcs := runtime.NumCPU() + if numProcs < 2 { + numProcs = 2 } - requestor := NewRequestor(protocol, transceiver) + runtime.GOMAXPROCS(numProcs) - bb := new(bytes.Buffer) - datum, err := protocol.NewRecord("AvroFlumeEvent") + listener, err := net.Listen(NETWORK, "0.0.0.0:"+strconv.Itoa(PORT)) if err != nil { - t.Fatal(err) - } - - headers := make(map[string]interface{}) - headers["host_header"] = "127.0.0.1" - datum.Set("headers", headers) - datum.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")) - - - - requestor.write_call_request("append", datum,bb) - // conn.Write(bb.Bytes()) - t.Logf("\nCall_request size %v %v\n",bb.Len(), bb.Bytes()) - t.Logf("\nCall_request %v\n", bb.String()) - - codec, err := protocol.MessageRequestCodec("append") - if err != nil { - t.Fatal(err) - } - value, err := codec.Decode(bb) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(datum, value) { - t.Fatalf("Request not equals to ref %x, %x", datum, value) + println("error listening:", err.Error()) } + go func() { + for { + conn, err := listener.Accept() + if err != nil { + println("Error accept:", err.Error()) + return + } + go EchoFunc(conn) + } + }() } -func TestWrite_call_requestHeader(t *testing.T) { - //t.SkipNow() - transceiver, err := netty.NewTransceiver(netty.Config{}) - buf := &Conn{} - transceiver.Conn = buf - - protocol, err := NewProtocol() - if err != nil { - t.Fatal(err) - } - requestor := NewRequestor(protocol, transceiver) - - bb := new(bytes.Buffer) - - requestor.write_call_requestHeader("append", bb) - - refHeader := []byte("\x00\x0c\x61\x70\x70\x65\x6e\x64") - bytes := bb.Bytes() - if !reflect.DeepEqual(refHeader, bytes) { - t.Fatalf("Request_Header not equals to ref %n%x, %n%x", len(refHeader), refHeader, len(bytes), bytes) - } -} - -func TestRead_call_responseMessage(t *testing.T) { - - - transceiver, err := netty.NewTransceiver(netty.Config{}) - buf := &Conn{} - transceiver.Conn = buf - - protocol, err := NewProtocol() - if err != nil { - t.Fatal(err) +func EchoFunc(conn net.Conn) { + for { + buf := make([]byte, RECV_BUF_LEN) + n, err := conn.Read(buf) + if err != nil { + println("Error reading:", err.Error()) + return + } + println("received ", n, " bytes of data =", string(buf)) + n, err = conn.Write(buf) + if err != nil { + println("Error writing:", err.Error()) + return + } + println("sended ", n, " bytes of data =", string(buf)) } - requestor := NewRequestor(protocol, transceiver) - - - codec, err := protocol.MessageResponseCodec("append") - if err != nil { - t.Fatal(err) - } - bb := new(bytes.Buffer) - codec.Encode(bb, Enum{"Status", "OK"}) - t.Logf("Bytes for OK %x",bb.Bytes() ) - - - err = requestor.read_call_responseMessage("append", bb) - if err != nil { - t.Fatal(err) - } - - codec.Encode(bb, Enum{"Status", "FAILED"}) - t.Logf("Bytes for FAILED %x",bb.Bytes() ) - err = requestor.read_call_responseMessage("append", bb) - if err == nil || err.Error() != "Reponse failure. status == FAILED"{ - t.Fatalf("Status FAILED can return error") - } - } diff --git a/transceiver/Tranceiver.go b/transceiver/Tranceiver.go index bf2dffb..3a36ed1 100644 --- a/transceiver/Tranceiver.go +++ b/transceiver/Tranceiver.go @@ -3,9 +3,32 @@ package transceiver import ( "bytes" "io" + "time" ) +type WriteHandshake func() ([]byte, error) +type ReadHandshake func(io.Reader) (bool, error) type Transceiver interface { Transceive(request []bytes.Buffer) ([]io.Reader, error) + InitHandshake(WriteHandshake, ReadHandshake ) + + + +} + + + +type Config struct { + Port int `json:"port"` + Host string `json:"host"` + Network string `json:"network"` + SocketPath string `json:"socket_path"` + Timeout time.Duration `json:"timeout"` + AsyncConnect bool `json:"async_connect"` + BufferLimit int `json:"buffer_limit"` + RetryWait int `json:"retry_wait"` + MaxRetry int `json:"max_retry"` + InitialCap int `json:"initial_cap"` + MaxCap int `json:"max_cap"` +} -} \ No newline at end of file diff --git a/transceiver/connection.go b/transceiver/connection.go new file mode 100644 index 0000000..886a2c2 --- /dev/null +++ b/transceiver/connection.go @@ -0,0 +1,46 @@ +package transceiver + +import ( + "io" + "net" + "strconv" +) + +type HandShakeConn interface { + io.ReadWriteCloser + IsChecked() bool + Checked(bool) + GetConn() (net.Conn, error) +} + +type Connection struct { + net.Conn + checked bool + bad bool +} + + +func NewConnection(config Config) (*Connection, error) { + + conn := &Connection{} + var err error + switch config.Network { + case "tcp": + conn.Conn, err = net.DialTimeout(config.Network, config.Host+":"+strconv.Itoa(config.Port), config.Timeout) + case "unix": + conn.Conn, err = net.DialTimeout(config.Network, config.SocketPath, config.Timeout) + default: + err = net.UnknownNetworkError(config.Network) + } + + return conn, err +} + +func (c *Connection) Checked(check bool) { + c.checked = check +} + +func (c *Connection) IsChecked() bool{ + return c.checked +} + diff --git a/transceiver/netty/Netty.go b/transceiver/netty/Netty.go index 9b66ac4..d400d60 100644 --- a/transceiver/netty/Netty.go +++ b/transceiver/netty/Netty.go @@ -2,108 +2,85 @@ package netty import ( "bytes" - "net" "encoding/binary" - "fmt" "io" "sync" - "time" - "strconv" - "math" - "os" + "github.com/sebglon/goavro/transceiver" + "fmt" ) +type NettyTransceiver struct { + transceiver.Pool + pool *transceiver.Pool + mu sync.Mutex + pending []byte + alreadyCalled bool + writeHandShake transceiver.WriteHandshake + readHandshake transceiver.ReadHandshake +} +func NewTransceiver(config transceiver.Config) (f* NettyTransceiver, err error){ + f = &NettyTransceiver{} + f.pool, err = transceiver.NewPool(config) + return +} +func (t NettyTransceiver) InitHandshake(writer transceiver.WriteHandshake,reader transceiver.ReadHandshake ) { + t.writeHandShake=writer + t.readHandshake=reader +} -const ( - defaultHost = "127.0.0.1" - defaultNetwork = "tcp" - defaultSocketPath = "" - defaultPort = 63001 - defaultTimeout = 3 * time.Second - defaultBufferLimit = 8 * 1024 * 1024 - defaultRetryWait = 500 - defaultMaxRetry = 13 - defaultReconnectWaitIncreRate = 1.5 -) -type Config struct { - NettyPort int `json:"netty_port"` - NettyHost string `json:"netty_host"` - NettyNetwork string `json:"netty_network"` - NettySocketPath string `json:"netty_socket_path"` - Timeout time.Duration `json:"timeout"` - AsyncConnect bool `json:"async_connect"` - BufferLimit int `json:"buffer_limit"` - RetryWait int `json:"retry_wait"` - MaxRetry int `json:"max_retry"` -} +func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){ + nettyFrame := new(bytes.Buffer) + t.Pack(nettyFrame, requests) + conn, pc, err := t.Pool.Conn() -type NettyTransceiver struct { - Config - Conn io.ReadWriteCloser - reconnecting bool - mu sync.Mutex - pending []byte -} -func NewTransceiver(config Config) (f* NettyTransceiver, err error){ - if config.NettyNetwork == "" { - config.NettyNetwork = defaultNetwork - } - if config.NettyHost == "" { - config.NettyHost = defaultHost - } - if config.NettyPort == 0 { - config.NettyPort = defaultPort - } - if config.NettySocketPath == "" { - config.NettySocketPath = defaultSocketPath - } - if config.Timeout == 0 { - config.Timeout = defaultTimeout + if err!=nil { + return nil, err } - if config.BufferLimit == 0 { - config.BufferLimit = defaultBufferLimit - } - if config.RetryWait == 0 { - config.RetryWait = defaultRetryWait - } - if config.MaxRetry == 0 { - config.MaxRetry = defaultMaxRetry + defer pc.Close() + + if !conn.IsChecked() { + frame0 := requests[0] + handshake, err := t.writeHandShake() + if err!=nil { + return nil, err + } + + requests[0].Reset() + _, err = requests[0].Write(append( handshake, frame0.Bytes()...)) + if err!=nil { + return nil, err + } } - if config.AsyncConnect { - f = &NettyTransceiver{Config: config, reconnecting: true} - f.reconnect() - } else { - f = &NettyTransceiver{Config: config, reconnecting: false} - err = f.connect() + + bodyBytes, err := t.pool.Call(conn, pc, nettyFrame.Bytes()) + if err != nil { + return nil, err } - return -} -func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){ - nettyFrame := new(bytes.Buffer) - t.Pack(nettyFrame, requests) - // Send request - t.pending = append(t.pending, nettyFrame.Bytes()...) - if err := t.send(); err != nil { - t.close() - if len(t.pending) > t.Config.BufferLimit { - t.flushBuffer() + + resps, err := t.Unpack(bodyBytes) + if err != nil { + return nil, err + } + + if !conn.IsChecked() { + ok, err := t.readHandshake(resps[0]) + if err!=nil { + return nil, err + } + conn.Checked(ok) + if !ok { + return nil, fmt.Errorf("Fail to validate Handshake") } + return resps[1:], nil } else { - t.flushBuffer() + return resps, nil } - // Read Response - bodyBytes := make([]byte, 1024) - t.receive(bodyBytes) -// if err!=nil { -// return nil, fmt.Errorf("Fail to read on socket %v", err) -// } - return t.Unpack(bodyBytes) } func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) { @@ -142,110 +119,3 @@ func (t *NettyTransceiver) Unpack(frame []byte) ([]io.Reader, error) { return result, nil } - -// connect establishes a new connection using the specified transport. -func (f *NettyTransceiver) connect() (error) { - var err error - switch f.Config.NettyNetwork { - case "tcp": - f.Conn, err = net.DialTimeout(f.Config.NettyNetwork, f.Config.NettyHost+":"+strconv.Itoa(f.Config.NettyPort), f.Config.Timeout) - case "unix": - f.Conn, err = net.DialTimeout(f.Config.NettyNetwork, f.Config.NettySocketPath, f.Config.Timeout) - default: - err = net.UnknownNetworkError(f.Config.NettyNetwork) - } - return err -} - - -func e(x, y float64) int { - return int(math.Pow(x, y)) -} - -func (f *NettyTransceiver) reconnect() { - go func() { - - for i := 0; ; i++ { - - err := f.connect() - if err == nil { - f.mu.Lock() - f.reconnecting = false - f.mu.Unlock() - break - } else { - if i == f.Config.MaxRetry { - panic("Netty#reconnect: failed to reconnect!") - } - waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1)) - time.Sleep(time.Duration(waitTime) * time.Millisecond) - } - } - }() -} - -func (f *NettyTransceiver) flushBuffer() { - f.mu.Lock() - defer f.mu.Unlock() - f.pending = f.pending[0:0] -} - - -// Close closes the connection. -func (f *NettyTransceiver) Close() (err error) { - if len(f.pending) > 0 { - err = f.send() - } - f.close() - return -} - -// close closes the connection. -func (f *NettyTransceiver) close() (err error) { - if f.Conn != nil { - f.mu.Lock() - defer f.mu.Unlock() - } else { - return - } - if f.Conn != nil { - f.Conn.Close() - f.Conn = nil - } - return -} - -func (f *NettyTransceiver) send() (err error) { - if f.Conn == nil { - if f.reconnecting == false { - f.mu.Lock() - f.reconnecting = true - f.mu.Unlock() - f.reconnect() - } - err = fmt.Errorf("Netty#send: can't send logs, client is reconnecting") - } else { - f.mu.Lock() - _, err = f.Conn.Write(f.pending) - f.mu.Unlock() - } - return -} - -func (f *NettyTransceiver) receive(resp []byte) (err error) { - - if f.Conn == nil { - if f.reconnecting == false { - f.mu.Lock() - f.reconnecting = true - f.mu.Unlock() - f.reconnect() - } - err = fmt.Errorf("Netty#receive: can't send logs, client is reconnecting") - } else { - f.mu.Lock() - _, err = f.Conn.Read(resp) - f.mu.Unlock() - } - return -} \ No newline at end of file diff --git a/transceiver/netty/Netty_test.go b/transceiver/netty/Netty_test.go index 7b8de8b..6e6de40 100644 --- a/transceiver/netty/Netty_test.go +++ b/transceiver/netty/Netty_test.go @@ -8,19 +8,19 @@ import ( "io/ioutil" "runtime" "net" + "github.com/sebglon/goavro/transceiver" + + "strconv" ) const ( RECV_BUF_LEN = 1024 + NETWORK = "tcp" + HOST = "127.0.0.1" + PORT=6666 + ADDR="127.0.0.1:6666" ) -type Conn struct { - bytes.Buffer -} - -func (c *Conn) Close() error { - return nil -} func init() { numProcs := runtime.NumCPU() @@ -29,7 +29,7 @@ func init() { } runtime.GOMAXPROCS(numProcs) - listener, err := net.Listen("tcp", "0.0.0.0:6666") + listener, err := net.Listen(NETWORK, "0.0.0.0:"+strconv.Itoa(PORT)) if err != nil { println("error listening:", err.Error()) } @@ -54,14 +54,21 @@ func EchoFunc(conn net.Conn) { return } println("received ", n, " bytes of data =", string(buf)) + n, err = conn.Write(buf) + if err != nil { + println("Error writing:", err.Error()) + return + } + println("sended ", n, " bytes of data =", string(buf)) } } func TestTransceive(t *testing.T) { - f := &NettyTransceiver{Config: Config{}, reconnecting: false} + f, err := NewTransceiver(transceiver.Config{Network:NETWORK, SocketPath:ADDR, Host:HOST, Port:PORT}) + if err != nil { + t.Fatal(err) + } - buf := &Conn{} - f.Conn = buf msg := "This is test writing." bmsg := make([]bytes.Buffer, 1) @@ -69,9 +76,8 @@ func TestTransceive(t *testing.T) { resp, err := f.Transceive(bmsg) if err != nil { - t.Error(err) + t.Fatal(err.Error()) } - brcv := make([]byte, len([]byte(msg))) resp[0].Read(brcv) rcv := string(brcv) diff --git a/transceiver/pool.go b/transceiver/pool.go new file mode 100644 index 0000000..5d05edf --- /dev/null +++ b/transceiver/pool.go @@ -0,0 +1,135 @@ +package transceiver + +import ( + "gopkg.in/fatih/pool.v2" + "net" + "fmt" + "sync" + "errors" + "log" + "time" +) + + +var ( + errPoolClosed = errors.New("Avro transceiver: Pool Closed") +) +type Pool struct { + Config + pool pool.Pool + mu sync.RWMutex + closed bool +} + +const ( + defaultHost = "127.0.0.1" + defaultNetwork = "tcp" + defaultSocketPath = "" + defaultPort = 63001 + defaultTimeout = 3 * time.Second + defaultBufferLimit = 8 * 1024 * 1024 + defaultRetryWait = 500 + defaultMaxRetry = 13 + defaultInitialCap = 2 + defaultMaxCap = 5 + defaultReconnectWaitIncreRate = 1.5 +) + +func NewPool(config Config) (*Pool, error) { + if config.Network == "" { + config.Network = defaultNetwork + } + if config.Host == "" { + config.Host = defaultHost + } + if config.Port == 0 { + config.Port = defaultPort + } + if config.SocketPath == "" { + config.SocketPath = defaultSocketPath + } + if config.Timeout == 0 { + config.Timeout = defaultTimeout + } + if config.BufferLimit == 0 { + config.BufferLimit = defaultBufferLimit + } + if config.RetryWait == 0 { + config.RetryWait = defaultRetryWait + } + if config.MaxRetry == 0 { + config.MaxRetry = defaultMaxRetry + } + if config.InitialCap == 0 { + config.InitialCap = defaultInitialCap + } + if config.MaxCap == 0 { + config.MaxCap = defaultMaxCap + } + p, err := pool.NewChannelPool(config.InitialCap,config.MaxCap, func() (net.Conn, error) { + conn, err := NewConnection(config) + if err != nil { + return nil, fmt.Errorf("\nFail to init connec, %#v \n%v",config,err) + } + return conn, err + }) + if err != nil { + return nil, err + } + return &Pool{ + pool: p, + Config: config, + }, nil + +} + +func (p *Pool) Conn() (*Connection, *pool.PoolConn, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.closed { + return nil, nil, errPoolClosed + } + + + nc, err := p.pool.Get() + if err != nil { + return nil, nil, err + } + + log.Printf(" %T %#v", nc,nc) + + pc, ok := nc.(*pool.PoolConn) + if !ok { + // This should never happen! + return nil, nil, fmt.Errorf("Invalid connection in pool") + } + + conn, ok := pc.Conn.(*Connection) + if !ok { + // This should never happen! + return nil, nil, fmt.Errorf("Invalid connection in pool") + } + + return conn, pc, nil +} + +func (p *Pool) Call(conn *Connection, pc *pool.PoolConn, req []byte) (resp []byte, err error) { + if err != nil { + return + } + defer pc.Close() + + _,err= conn.Write(req) + + if err != nil { + return nil, err + } + resp = make([]byte, 1024) + _,err = conn.Read(resp) + + if err != nil { + return nil, err + } + return +} \ No newline at end of file