Skip to content

Commit

Permalink
feat: message ack sync changes improvements (#43)
Browse files Browse the repository at this point in the history
* feat: message ack performance improvements
  • Loading branch information
AkbaraliShaikh committed Sep 6, 2022
1 parent b895a6c commit ef219b6
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 18 deletions.
4 changes: 2 additions & 2 deletions docs/docs/concepts/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ Event data loss happens in the following scenarios:

## Acknowledging events

Event acknowledgements was designed to signify if the events batch is received and sent to Kafka successfully. This will enable the clients to retry on failed event delivery. Raccoon chooses when to send event acknoledgements based on the configuration parameter `EVENT_ACK`.
Event acknowledgements were designed to signify whether the events batch has been received and sent to Kafka successfully. This enables clients to retry on failed event delivery. Raccoon chooses when to send the event acknowledgement based on the configuration parameter `EVENT_ACK`.

### EVENT_ACK = 0

Raccoon sends the acknowledgments as soon as it receives and deserializes the events successfully using the proto `SendEventRequest`. This configuration is recommended when low latency takes precedence over end to end acknoledment. The acks are sent even before it is produced to Kafka. The following picture depicts the sequence of the event ack.
Raccoon sends the acknowledgements as soon as it receives and deserializes the events successfully using the proto `SendEventRequest`. This configuration is recommended when low latency takes precedence over end-to-end acknowledgement. The acks are sent even before the events are produced to Kafka. The following picture depicts the sequence of the event ack.

![](/assets/raccoon_sync.png)

Expand Down
2 changes: 2 additions & 0 deletions services/rest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func NewRestService(c collection.Collector) *Service {

go reportConnectionMetrics(*wh.Table())

go websocket.AckHandler(websocket.AckChan)

restHandler := NewHandler(c)
router := mux.NewRouter()
router.Path("/ping").HandlerFunc(pingHandler).Methods(http.MethodGet)
Expand Down
38 changes: 38 additions & 0 deletions services/rest/websocket/ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package websocket

import (
"time"

"github.com/odpf/raccoon/metrics"
"github.com/odpf/raccoon/serialization"
"github.com/odpf/raccoon/services/rest/websocket/connection"
)

// AckChan is the package-level channel on which AckInfo values are handed to
// AckHandler. It is created without a buffer, so a send blocks until a
// receiver is ready to take the value.
var AckChan = make(chan AckInfo)

// AckInfo carries everything AckHandler needs to report timing metrics and
// write a success or failure response for a single request.
type AckInfo struct {
// MessageType is passed through unchanged to the response writers.
// NOTE(review): presumably the websocket frame type of the request — confirm against the sender.
MessageType int
// RequestGuid is passed through to the response writers to identify the request being acknowledged.
RequestGuid string
// Err selects the response: nil produces a success response, non-nil a failed response carrying this error.
Err error
// Conn is the connection the response is written to.
Conn connection.Conn
// serializer is handed to the response writers; being unexported, it can only be set from within this package.
serializer serialization.SerializeFunc
// TimeConsumed is the start instant for the "event_rtt_ms" metric (AckHandler reports time.Since of it).
TimeConsumed time.Time
// AckTimeConsumed is the start instant for the "ack_event_rtt_ms" metric (AckHandler reports time.Since of it).
AckTimeConsumed time.Time
}

// AckHandler consumes AckInfo values from ch until the channel is closed.
// For each value it records two timings and then writes a response on the
// value's connection:
//
//   - "ack_event_rtt_ms": time elapsed since AckTimeConsumed.
//   - "event_rtt_ms": time elapsed since TimeConsumed.
//
// A nil Err results in a success response; a non-nil Err results in a failed
// response carrying that error. The function blocks while ranging over ch, so
// callers are expected to run it in its own goroutine.
func AckHandler(ch <-chan AckInfo) {
	for ack := range ch {
		// Time between the ack being created and it being picked up here.
		sinceAck := time.Since(ack.AckTimeConsumed)
		metrics.Timing("ack_event_rtt_ms", sinceAck.Milliseconds(), "")

		// Overall round trip is reported for both outcomes, before the response is written.
		sinceConsumed := time.Since(ack.TimeConsumed)
		metrics.Timing("event_rtt_ms", sinceConsumed.Milliseconds(), "")

		if ack.Err == nil {
			writeSuccessResponse(ack.Conn, ack.serializer, ack.MessageType, ack.RequestGuid)
		} else {
			writeFailedResponse(ack.Conn, ack.serializer, ack.MessageType, ack.RequestGuid, ack.Err)
		}
	}
}
5 changes: 4 additions & 1 deletion services/rest/websocket/connection/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func (c *Conn) Ping(writeWaitInterval time.Duration) error {
}

func (c *Conn) Close() {
c.conn.Close()
if err := c.conn.Close(); err != nil {
logger.Errorf("[Connection Error] %v", err)
metrics.Increment("conn_close_err_count", "")
}
c.calculateSessionTime()
c.closeHook(*c)
}
Expand Down
26 changes: 12 additions & 14 deletions services/rest/websocket/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (h *Handler) Table() *connection.Table {
return h.upgrader.Table
}

//HandlerWSEvents handles the upgrade and the events sent by the peers
// HandlerWSEvents handles the upgrade and the events sent by the peers
func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) {
conn, err := h.upgrader.Upgrade(w, r)
if err != nil {
Expand Down Expand Up @@ -104,37 +104,35 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) {
}
metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", conn.Identifier.Group))
h.sendEventCounters(payload.Events, conn.Identifier.Group)
resChannel := make(chan struct{}, 1)

h.collector.Collect(r.Context(), &collection.CollectRequest{
ConnectionIdentifier: conn.Identifier,
TimeConsumed: timeConsumed,
SendEventRequest: payload,
AckFunc: h.Ack(conn, resChannel, s, messageType, payload.ReqGuid),
AckFunc: h.Ack(conn, AckChan, s, messageType, payload.ReqGuid, timeConsumed),
})
<-resChannel
}
}

func (h *Handler) Ack(conn connection.Conn, resChannel chan struct{}, s serialization.SerializeFunc, messageType int, reqGuid string) collection.AckFunc {
func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serialization.SerializeFunc, messageType int, reqGuid string, timeConsumed time.Time) collection.AckFunc {
switch config.Event.Ack {
case config.Asynchronous:
writeSuccessResponse(conn, s, messageType, reqGuid)
resChannel <- struct{}{}
return nil
case config.Synchronous:
return func(err error) {
if err != nil {
logger.Errorf("[websocket.Ack] publish message failed for %s: %v", conn.Identifier.Group, err)
writeFailedResponse(conn, s, messageType, reqGuid, err)
resChannel <- struct{}{}
return
AckChan <- AckInfo{
MessageType: messageType,
RequestGuid: reqGuid,
Err: err,
Conn: conn,
serializer: h.serdeMap[messageType].serializer,
TimeConsumed: timeConsumed,
AckTimeConsumed: time.Now(),
}
writeSuccessResponse(conn, s, messageType, reqGuid)
resChannel <- struct{}{}
}
default:
writeSuccessResponse(conn, s, messageType, reqGuid)
resChannel <- struct{}{}
return nil
}
}
Expand Down
5 changes: 4 additions & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (w *Pool) StartWorkers() {

err := w.kafkaProducer.ProduceBulk(request.GetEvents(), request.ConnectionIdentifier.Group, deliveryChan)

produceTime := time.Since(batchReadTime)
metrics.Timing("kafka_producebulk_tt_ms", produceTime.Milliseconds(), "")

if request.AckFunc != nil {
request.AckFunc(err)
}
Expand Down Expand Up @@ -75,7 +78,7 @@ func (w *Pool) StartWorkers() {
}

// FlushWithTimeOut waits for the workers to complete the pending messages
//to be flushed to the publisher within a timeout.
// to be flushed to the publisher within a timeout.
// Returns true if waiting timed out, meaning not all the events could be processed before this timeout.
func (w *Pool) FlushWithTimeOut(timeout time.Duration) bool {
c := make(chan struct{})
Expand Down

0 comments on commit ef219b6

Please sign in to comment.