From 185b8a401dd6f1b7d64f7354c38deb3a539b1d04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20GLON?= Date: Fri, 26 Aug 2016 12:04:10 +0200 Subject: [PATCH] end of dev --- Transceiver.go | 6 - ipc_test.go => examples/flume/client.go | 24 +-- ipc.go | 264 ------------------------ protocol.go | 14 ++ protocol_test.go | 2 +- requestor.go | 53 ++++- requestor_test.go | 41 ++++ 7 files changed, 115 insertions(+), 289 deletions(-) rename ipc_test.go => examples/flume/client.go (72%) delete mode 100644 ipc.go diff --git a/Transceiver.go b/Transceiver.go index fb847ca..38d73b5 100644 --- a/Transceiver.go +++ b/Transceiver.go @@ -5,7 +5,6 @@ import ( "net" "encoding/binary" "fmt" - "os" "io" ) @@ -75,18 +74,13 @@ func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) { } func (t *NettyTransceiver) Unpack(frame []byte) ([]io.Reader, error) { - nettyNumberFame := binary.BigEndian.Uint32(frame[4:8]) result := make([]io.Reader, nettyNumberFame) startFrame := uint32(8) i:=uint32(0) for i < nettyNumberFame { - - messageSize := uint32(binary.BigEndian.Uint32(frame[startFrame:startFrame+4])) - fmt.Fprintf(os.Stdout, "\nnettyNumberFrame %v %v ", startFrame, frame[startFrame:startFrame+4]) message := frame[startFrame+4:startFrame+4+messageSize] - fmt.Fprintf(os.Stdout, "\nmessage: %v", message) startFrame = startFrame+4+messageSize br := bytes.NewReader(message) result[i] = br diff --git a/ipc_test.go b/examples/flume/client.go similarity index 72% rename from ipc_test.go rename to examples/flume/client.go index a1e1b29..8093a68 100644 --- a/ipc_test.go +++ b/examples/flume/client.go @@ -1,45 +1,45 @@ -package goavro - +package main import ( - "testing" + "github.com/sebglon/goavro" "net" + "log" ) -func TestRequestor(t *testing.T) { +func main() { //t.SkipNow() rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001") conn, err := net.DialTCP("tcp", nil, rAddr) if err != nil { - t.Fatal(err) + log.Fatal(err) } defer conn.Close() - transceiver := NewNettyTransceiver(conn) - protocol, err := NewProtocol() + transceiver := goavro.NewNettyTransceiver(conn) + protocol, err := goavro.NewProtocol() if err != nil { - t.Fatal(err) + log.Fatal(err) } flumeRecord, errFlume := protocol.NewRecord("AvroFlumeEvent") if errFlume != nil { - t.Fatal(errFlume) + log.Fatal(errFlume) } headers := make(map[string]interface{}) headers["host_header"] = "127.0.0.1" flumeRecord.Set("headers", headers) flumeRecord.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")) - requestor := NewRequestor(protocol, transceiver) + requestor := goavro.NewRequestor(protocol, transceiver) err = requestor.Request("append", flumeRecord) if err != nil { - t.Fatal("Request: ", err) + log.Fatal("Request: ", err) } err = requestor.Request("append", flumeRecord) if err != nil { - t.Fatal("Request: ", err) + log.Fatal("Request: ", err) } } diff --git a/ipc.go b/ipc.go deleted file mode 100644 index 18f2e7e..0000000 --- a/ipc.go +++ /dev/null @@ -1,264 +0,0 @@ -// Package avro provides th log driver for forwarding server logs to -// flume endpoints. -package goavro - -import ( - "fmt" - "net/rpc" - "bytes" - "io" -) -type Client struct { - hostname string - extra map[string]interface{} - conn *rpc.Client - codecFlume Codec - codecDocker Codec - codecMeta Codec - codecString Codec - protocol Protocol -} - -const ( - defaultHost = "10.98.80.113" - defaultPort = "63001" - - hostKey = "avro-host" - portKey = "avro-port" -) - -const AVRO_SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal" - - -const stringSchema = ` -{"type": "string"} -` -const flumeSchema = ` -{ - "type": "record", - "name": "AvroFlumeEvent", - "fields": [{ - "name": "headers", - "type": { - "type": "map", - "values": "string" - } - }, { - "name": "body", - "type": "bytes" - }] -} -` - -const recordSchema = ` -{ - "type": "record", - "name": "docker_logs", - "doc:": "A basic schema for storing docker container logs", - "namespace": "docker", - "fields": [ - { - "doc": "Docker container ID", - "type": "string", - "name": "container_id" - }, - { - "doc": "Docker container Name", - "type": "string", - "name": "container_name" - }, - { - "doc": "Docker image ID", - "type": "string", - "name": "image_id" - }, - { - "doc": "Docker image Name", - "type": "string", - "name": "image_name" - }, - { - "doc": "Docker container commmand", - "type": "string", - "name": "command" - }, - { - "doc": "Docker container created timestamp", - "type": "long", - "name": "created" - }, - { - "doc": "Source of log (stdout, stderr)", - "type": "string", - "name": "source" - }, - { - "doc": "Docker host", - "type": "string", - "name": "docker_host" - }, - { - "doc": "Log message", - "type": "string", - "name": "log" - }, - { - "doc": "Unix timestamp in milliseconds", - "type": "long", - "name": "timestamp" - } - ] -} -` - -func NewDefaultClient() (*Client, error) { - return NewClient(defaultHost, defaultPort) - -} -// New create a avro logger using the configuration passed in on -// the context. -func NewClient(host string, port string) (*Client, error) { - servAddr := host+":"+port - conn, err := rpc.Dial("tcp", servAddr) - if err != nil { - return nil, err - } - - - codec, err := NewCodec(flumeSchema) - if err != nil { - return nil, err - } - - codecDocker, err := NewCodec(recordSchema) - if err != nil { - return nil, err - } - - codecMeta, err := NewCodec(metadataSchema) - if err != nil { - return nil, err - } - - codecString, err := NewCodec(stringSchema) - if err != nil { - return nil, err - } - - proto, err := NewProtocol() - if err != nil { - return nil, err - } - - return &Client { - extra: make(map[string]interface{}), - hostname: "TODO", - conn: conn, - codecFlume: codec, - codecDocker: codecDocker, - codecMeta: codecMeta, - codecString: codecString, - protocol: proto, - }, nil -} - -func (a *Client) Write_handshake_request( buffer io.Writer ) (err error) { - local_hash :=a.protocol.MD5 - // remote_name :="" // only setted by handshake response - remote_hash := make([]byte,0) - if len(remote_hash)==0 { - remote_hash = local_hash - } - - record, err := NewRecord(RecordSchema(handshakeRequestshema)) - if err != nil { - return fmt.Errorf("Avro fail to init record handshakeRequest",err) - } - - record.Set("clientHash", local_hash) - record.Set("serverHash", remote_hash) -// record.Set("clientProtocol", a.protocol.Name) - codecHandshake, err := NewCodec(handshakeRequestshema) - if err != nil { - return err - } - - if err = codecHandshake.Encode(buffer, record); err !=nil { - return fmt.Errorf("Encode handshakeRequest ",err) - } - return nil -} - - -func (a *Client) Log(msg string) error { - - bb := new(bytes.Buffer) - a.Write_handshake_request(bb) - - flumeRecord, errFlume := NewRecord(RecordSchema(flumeSchema)) - if errFlume != nil { - return fmt.Errorf("Avro fail to init record",errFlume) - } - headers := make(map[string]interface{}) - headers[AVRO_SCHEMA_LITERAL_HEADER] = stringSchema - headers["host_header"] = "127.0.0.1" - flumeRecord.Set("headers", headers) - flumeRecord.Set("body", []byte("test")) - - - // encode metadata - if err := a.codecMeta.Encode(bb, make(map[string]interface{})); err !=nil { - return fmt.Errorf("Encode metadata ",err) - } - - // encode message name - if err := a.codecString.Encode(bb, "append"); err !=nil { - return fmt.Errorf("Encode message name ",err) - } - - - // encode message parameters - - if err := a.codecFlume.Encode(bb, flumeRecord); err !=nil { - return fmt.Errorf("Encode flumeRecord ",err) - } - - return a.Send(bb.Bytes()) -} - -func (a *Client) Send(bytes []byte ) error { - var reply []byte - err := a.conn.Call("", bytes,&reply) - if err !=nil { - return err - } - return nil -} - -func (a *Client) Close() error { - return a.conn.Close() -} - - -// ValidateLogOpt looks for avro specific log option avro-host avro-port. -func ValidateLogOpt(cfg map[string]string) error { - for key := range cfg { - switch key { - case "env": - case "labes": - case hostKey: - case portKey: - // Accepted - default: - return fmt.Errorf("unknown log opt '%s' for avro log driver", key) - } - } - if len(cfg[hostKey]) == 0 { - cfg[hostKey] = defaultHost - } - if len(cfg[portKey]) == 0 { - cfg[portKey] = defaultPort - } - return nil -} - diff --git a/protocol.go b/protocol.go index a97312b..df62a46 100644 --- a/protocol.go +++ b/protocol.go @@ -118,6 +118,20 @@ func (p *Protocol) Json() (string, error) { return string(bb), nil } + +func (p *Protocol) MessageResponseCodec(messageName string) (Codec, error) { + json, err := p.MessageResponseJson(messageName) + if err!= nil { + return nil, err + } + return NewCodec(json) +} +func (p *Protocol) MessageResponseJson(messageName string) (string, error) { + field := p.Messages[messageName].Response + avroType := TYPES_CACHE[field] + json, err := json.Marshal(avroType) + return string(json), err +} func (p *Protocol) MessageRequestCodec(messageName string) (Codec, error) { json, err := p.MessageRequestJson(messageName) if err!= nil { diff --git a/protocol_test.go b/protocol_test.go index 7572862..3b677ef 100644 --- a/protocol_test.go +++ b/protocol_test.go @@ -82,7 +82,7 @@ func TestToJson(t *testing.T) { t.Fatal("%#v", err) } if result!= jsonCompact(proto) { - t.Errorf("Proto to Json not equals; Expected %#v, actual %#v",jsonCompact(proto), result) + t.Errorf("Proto to Json not equals; Expected \n%#v\nactual \n%#v",jsonCompact(proto), result) } } diff --git a/requestor.go b/requestor.go index 96c025b..7ac652d 100644 --- a/requestor.go +++ b/requestor.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "log" - "os" ) var REMOTE_HASHES map[string][]byte @@ -95,7 +94,6 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er } //buffer_decoder := bytes.NewBuffer(decoder) // process the handshake and call response - fmt.Fprintf(os.Stdout, "\nresponsee %#v", responses) ok, err := a.read_handshake_response(responses[0]) if err!=nil { return err @@ -103,8 +101,10 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er a.send_handshake= !ok if ok { - // a.read_call_response(message_name, buffer_decoder) - //} else { + a.read_call_responseCode(responses[1]) + if err!=nil { + return err + } // a.Request(message_name, request_datum) } return nil @@ -253,7 +253,7 @@ func (a *Requestor) read_handshake_response(decoder io.Reader) (bool, error) { return we_have_matching_schema, nil } -func (a *Requestor) read_call_response(message_name string, decoder io.Writer) { +func (a *Requestor) read_call_responseCode(decoder io.Reader) error { // The format of a call response is: // * response metadata, a map with values of type bytes // * a one-byte error flag boolean, followed by either: @@ -261,8 +261,49 @@ func (a *Requestor) read_call_response(message_name string, decoder io.Writer) { // the message response, serialized per the message's response schema. // * if the error flag is true, // the error, serialized per the message's error union schema. -// META_READER.Decode(decoder) + _, err := META_READER.Decode(decoder) + + if err != nil { + return fmt.Errorf("Decode metadata ", err) + } + return nil + } +func (a *Requestor) read_call_responseMessage(message_name string, decoder io.Reader ) error { + codec, err := a.local_protocol.MessageResponseCodec(message_name) + + if err != nil { + return fmt.Errorf("fail to get response codec for message %s: %v", message_name, err) + } + + datum, err := codec.Decode(decoder); + if err != nil { + + return fmt.Errorf("Fail to decode %v with error %v", decoder, err) + } + status, ok := datum.(string) + if !ok { + return fmt.Errorf("Fail to decode Status response %v", datum) + } + + switch status { + case "OK": + err = nil + case "FAILED": + err = fmt.Errorf("Reponse failure. status == %v", status) + + case "UNKNOWN": + err = fmt.Errorf("Reponse failure. match == %v", status) + + default: + err = fmt.Errorf("Unexpected status: %v", status) + } + + return err + + +} + diff --git a/requestor_test.go b/requestor_test.go index 2df1726..36b2da3 100644 --- a/requestor_test.go +++ b/requestor_test.go @@ -172,4 +172,45 @@ func TestWrite_call_requestHeader(t *testing.T) { } } +func TestRead_call_responseMessage(t *testing.T) { + //t.SkipNow() + + rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001") + conn, err := net.DialTCP("tcp", nil, rAddr) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + transceiver := NewNettyTransceiver(conn) + protocol, err := NewProtocol() + if err != nil { + t.Fatal(err) + } + requestor := NewRequestor(protocol, transceiver) + + + codec, err := protocol.MessageResponseCodec("append") + if err != nil { + t.Fatal(err) + } + bb := new(bytes.Buffer) + codec.Encode(bb, Enum{"Status", "OK"}) + t.Logf("Bytes for OK %x",bb.Bytes() ) + + + err = requestor.read_call_responseMessage("append", bb) + if err != nil { + t.Fatal(err) + } + + codec.Encode(bb, Enum{"Status", "FAILED"}) + t.Logf("Bytes for FAILED %x",bb.Bytes() ) + err = requestor.read_call_responseMessage("append", bb) + if err == nil || err.Error() != "Reponse failure. status == FAILED"{ + t.Fatalf("Status FAILED can return error") + } + +} +