From ef219b6ecaefa88852cd0bd842143063c2196b6f Mon Sep 17 00:00:00 2001 From: Akbar Shaikh Date: Tue, 6 Sep 2022 11:29:36 +0530 Subject: [PATCH] feat: message ack sync changes improvements (#43) * feat: message ack performance improvements --- docs/docs/concepts/architecture.md | 4 +-- services/rest/service.go | 2 ++ services/rest/websocket/ack.go | 38 ++++++++++++++++++++++ services/rest/websocket/connection/conn.go | 5 ++- services/rest/websocket/handler.go | 26 +++++++-------- worker/worker.go | 5 ++- 6 files changed, 62 insertions(+), 18 deletions(-) create mode 100644 services/rest/websocket/ack.go diff --git a/docs/docs/concepts/architecture.md b/docs/docs/concepts/architecture.md index abdcc9ff..77a19e41 100644 --- a/docs/docs/concepts/architecture.md +++ b/docs/docs/concepts/architecture.md @@ -64,11 +64,11 @@ Event data loss happens in the following scenarios: ## Acknowledging events -Event acknowledgements was designed to signify if the events batch is received and sent to Kafka successfully. This will enable the clients to retry on failed event delivery. Raccoon chooses when to send event acknoledgements based on the configuration parameter `EVENT_ACK`. +Event acknowledgements was designed to signify if the events batch is received and sent to Kafka successfully. This will enable the clients to retry on failed event delivery. Raccoon chooses when to send event acknowledgement based on the configuration parameter `EVENT_ACK`. ### EVENT_ACK = 0 -Raccoon sends the acknowledgments as soon as it receives and deserializes the events successfully using the proto `SendEventRequest`. This configuration is recommended when low latency takes precedence over end to end acknoledment. The acks are sent even before it is produced to Kafka. The following picture depicts the sequence of the event ack. +Raccoon sends the acknowledgments as soon as it receives and deserializes the events successfully using the proto `SendEventRequest`. This configuration is recommended when low latency takes precedence over end to end acknowledgement. The acks are sent even before it is produced to Kafka. The following picture depicts the sequence of the event ack. ![](/assets/raccoon_sync.png) diff --git a/services/rest/service.go b/services/rest/service.go index bfd5c334..632c525f 100644 --- a/services/rest/service.go +++ b/services/rest/service.go @@ -26,6 +26,8 @@ func NewRestService(c collection.Collector) *Service { go reportConnectionMetrics(*wh.Table()) + go websocket.AckHandler(websocket.AckChan) + restHandler := NewHandler(c) router := mux.NewRouter() router.Path("/ping").HandlerFunc(pingHandler).Methods(http.MethodGet) diff --git a/services/rest/websocket/ack.go b/services/rest/websocket/ack.go new file mode 100644 index 00000000..824f88be --- /dev/null +++ b/services/rest/websocket/ack.go @@ -0,0 +1,38 @@ +package websocket + +import ( + "time" + + "github.com/odpf/raccoon/metrics" + "github.com/odpf/raccoon/serialization" + "github.com/odpf/raccoon/services/rest/websocket/connection" +) + +var AckChan = make(chan AckInfo) + +type AckInfo struct { + MessageType int + RequestGuid string + Err error + Conn connection.Conn + serializer serialization.SerializeFunc + TimeConsumed time.Time + AckTimeConsumed time.Time +} + +func AckHandler(ch <-chan AckInfo) { + for c := range ch { + ackTim := time.Since(c.AckTimeConsumed) + metrics.Timing("ack_event_rtt_ms", ackTim.Milliseconds(), "") + + tim := time.Since(c.TimeConsumed) + if c.Err != nil { + metrics.Timing("event_rtt_ms", tim.Milliseconds(), "") + writeFailedResponse(c.Conn, c.serializer, c.MessageType, c.RequestGuid, c.Err) + continue + } + + metrics.Timing("event_rtt_ms", tim.Milliseconds(), "") + writeSuccessResponse(c.Conn, c.serializer, c.MessageType, c.RequestGuid) + } +} diff --git a/services/rest/websocket/connection/conn.go b/services/rest/websocket/connection/conn.go index d5b2e264..54dc63f7 100644 --- a/services/rest/websocket/connection/conn.go +++ b/services/rest/websocket/connection/conn.go @@ -30,7 +30,10 @@ func (c *Conn) Ping(writeWaitInterval time.Duration) error { } func (c *Conn) Close() { - c.conn.Close() + if err := c.conn.Close(); err != nil { + logger.Errorf("[Connection Error] %v", err) + metrics.Increment("conn_close_err_count", "") + } c.calculateSessionTime() c.closeHook(*c) } diff --git a/services/rest/websocket/handler.go b/services/rest/websocket/handler.go index 39a877f4..1ea6b3ee 100644 --- a/services/rest/websocket/handler.go +++ b/services/rest/websocket/handler.go @@ -67,7 +67,7 @@ func (h *Handler) Table() *connection.Table { return h.upgrader.Table } -//HandlerWSEvents handles the upgrade and the events sent by the peers +// HandlerWSEvents handles the upgrade and the events sent by the peers func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) { conn, err := h.upgrader.Upgrade(w, r) if err != nil { @@ -104,37 +104,35 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) { } metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", conn.Identifier.Group)) h.sendEventCounters(payload.Events, conn.Identifier.Group) - resChannel := make(chan struct{}, 1) + h.collector.Collect(r.Context(), &collection.CollectRequest{ ConnectionIdentifier: conn.Identifier, TimeConsumed: timeConsumed, SendEventRequest: payload, - AckFunc: h.Ack(conn, resChannel, s, messageType, payload.ReqGuid), + AckFunc: h.Ack(conn, AckChan, s, messageType, payload.ReqGuid, timeConsumed), }) - <-resChannel } } -func (h *Handler) Ack(conn connection.Conn, resChannel chan struct{}, s serialization.SerializeFunc, messageType int, reqGuid string) collection.AckFunc { +func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serialization.SerializeFunc, messageType int, reqGuid string, timeConsumed time.Time) collection.AckFunc { switch config.Event.Ack { case config.Asynchronous: writeSuccessResponse(conn, s, messageType, reqGuid) - resChannel <- struct{}{} return nil case config.Synchronous: return func(err error) { - if err != nil { - logger.Errorf("[websocket.Ack] publish message failed for %s: %v", conn.Identifier.Group, err) - writeFailedResponse(conn, s, messageType, reqGuid, err) - resChannel <- struct{}{} - return + AckChan <- AckInfo{ + MessageType: messageType, + RequestGuid: reqGuid, + Err: err, + Conn: conn, + serializer: h.serdeMap[messageType].serializer, + TimeConsumed: timeConsumed, + AckTimeConsumed: time.Now(), } - writeSuccessResponse(conn, s, messageType, reqGuid) - resChannel <- struct{}{} } default: writeSuccessResponse(conn, s, messageType, reqGuid) - resChannel <- struct{}{} return nil } } diff --git a/worker/worker.go b/worker/worker.go index b5944ec1..8f639469 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -46,6 +46,9 @@ func (w *Pool) StartWorkers() { err := w.kafkaProducer.ProduceBulk(request.GetEvents(), request.ConnectionIdentifier.Group, deliveryChan) + produceTime := time.Since(batchReadTime) + metrics.Timing("kafka_producebulk_tt_ms", produceTime.Milliseconds(), "") + if request.AckFunc != nil { request.AckFunc(err) } @@ -75,7 +78,7 @@ func (w *Pool) StartWorkers() { } // FlushWithTimeOut waits for the workers to complete the pending the messages -//to be flushed to the publisher within a timeout. +// to be flushed to the publisher within a timeout. // Returns true if waiting timed out, meaning not all the events could be processed before this timeout. func (w *Pool) FlushWithTimeOut(timeout time.Duration) bool { c := make(chan struct{})