Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(channel): performance improvements #36

Merged
merged 8 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions collection/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,23 @@ import (
"time"

"github.com/odpf/raccoon/identification"
pb "github.com/odpf/raccoon/proto"
)

type Event struct {
chakravarthyvp marked this conversation as resolved.
Show resolved Hide resolved
Type string
EventBytes []byte
siddhanta-rath marked this conversation as resolved.
Show resolved Hide resolved
}

type CollectRequest struct {
ConnectionIdentifier identification.Identifier
TimeConsumed time.Time
TimePushed time.Time
*pb.SendEventRequest
SentTime time.Time
chakravarthyvp marked this conversation as resolved.
Show resolved Hide resolved
Events []Event
}

// GetEvents returns the events carried by this CollectRequest. It exists to
// preserve the accessor previously provided by the embedded
// *pb.SendEventRequest, so callers (e.g. worker.Pool.StartWorkers) keep
// working after the request switched to the local Events slice.
func (c CollectRequest) GetEvents() []Event {
return c.Events
}

type Collector interface {
Expand Down
6 changes: 3 additions & 3 deletions publisher/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (pr *Kafka) ProduceBulk(request collection.CollectRequest, deliveryChannel

err := pr.kp.Produce(message, deliveryChannel)
if err != nil {
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, event.GetType()))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, event.Type))
if err.Error() == "Local: Unknown topic" {
errors[order] = fmt.Errorf("%v %s", err, topic)
metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, request.ConnectionIdentifier.Group))
Expand All @@ -73,15 +73,15 @@ func (pr *Kafka) ProduceBulk(request collection.CollectRequest, deliveryChannel
}
continue
}
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, event.GetType()))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, event.Type))
totalProcessed++
}
// Wait for deliveryChannel as many as processed
for i := 0; i < totalProcessed; i++ {
d := <-deliveryChannel
m := d.(*kafka.Message)
if m.TopicPartition.Error != nil {
eventType := events[i].GetType()
eventType := events[i].Type
metrics.Decrement("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, eventType))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, eventType))
order := m.Opaque.(int)
Expand Down
9 changes: 4 additions & 5 deletions publisher/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/odpf/raccoon/collection"
"github.com/odpf/raccoon/logger"
pb "github.com/odpf/raccoon/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
Expand Down Expand Up @@ -54,7 +53,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
})
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk(collection.CollectRequest{Events: []collection.Event{collection.Event{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}, make(chan kafka.Event, 2))
assert.NoError(t, err)
})
})
Expand All @@ -78,7 +77,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk(collection.CollectRequest{Events: []collection.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}, make(chan kafka.Event, 2))
assert.Len(t, err.(BulkError).Errors, 3)
assert.Error(t, err.(BulkError).Errors[0])
assert.Empty(t, err.(BulkError).Errors[1])
Expand All @@ -90,7 +89,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk(collection.CollectRequest{Events: []collection.Event{{EventBytes: []byte{}, Type: topic}}}, make(chan kafka.Event, 2))
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
})
})
Expand All @@ -114,7 +113,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
}).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk(collection.CollectRequest{Events: []collection.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}, make(chan kafka.Event, 2))
assert.NotEmpty(t, err)
assert.Len(t, err.(BulkError).Errors, 2)
assert.Equal(t, "buffer full", err.(BulkError).Errors[0].Error())
Expand Down
10 changes: 9 additions & 1 deletion services/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,19 @@ func (h *Handler) SendEvent(ctx context.Context, req *pb.SendEventRequest) (*pb.

metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", identifier.Group))
h.sendEventCounters(req.Events, identifier.Group)
events := make([]collection.Event, len(req.GetEvents()))

for i, event := range req.GetEvents() {
events[i] = collection.Event{
Type: event.Type,
EventBytes: event.GetEventBytes(),
}
}
h.C.Collect(ctx, &collection.CollectRequest{
ConnectionIdentifier: identifier,
TimeConsumed: timeConsumed,
SendEventRequest: req,
Events: events,
SentTime: req.SentTime.AsTime(),
})

return &pb.SendEventResponse{
Expand Down
11 changes: 10 additions & 1 deletion services/rest/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,19 @@ func (h *Handler) RESTAPIHandler(rw http.ResponseWriter, r *http.Request) {

metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", identifier.Group))
h.sendEventCounters(req.Events, identifier.Group)
events := make([]collection.Event, len(req.GetEvents()))

for i, event := range req.GetEvents() {
events[i] = collection.Event{
Type: event.Type,
EventBytes: event.GetEventBytes(),
}
}
h.collector.Collect(r.Context(), &collection.CollectRequest{
ConnectionIdentifier: identifier,
TimeConsumed: timeConsumed,
SendEventRequest: req,
Events: events,
SentTime: req.SentTime.AsTime(),
})

_, err = res.SetCode(pb.Code_CODE_OK).SetStatus(pb.Status_STATUS_SUCCESS).SetSentTime(time.Now().Unix()).
Expand Down
11 changes: 10 additions & 1 deletion services/rest/websocket/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,19 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) {
}
metrics.Increment("batches_read_total", fmt.Sprintf("status=success,conn_group=%s", conn.Identifier.Group))
h.sendEventCounters(payload.Events, conn.Identifier.Group)

events := make([]collection.Event, len(payload.GetEvents()))
for i, event := range payload.GetEvents() {
events[i] = collection.Event{
Type: event.Type,
EventBytes: event.GetEventBytes(),
}
}
h.collector.Collect(r.Context(), &collection.CollectRequest{
ConnectionIdentifier: conn.Identifier,
TimeConsumed: timeConsumed,
SendEventRequest: payload,
Events: events,
SentTime: payload.SentTime.AsTime(),
})
writeSuccessResponse(conn, s, messageType, payload.ReqGuid)
}
Expand Down
2 changes: 1 addition & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (w *Pool) StartWorkers() {
lenBatch := int64(len(request.GetEvents()))
logger.Debug(fmt.Sprintf("Success sending messages, %v", lenBatch-int64(totalErr)))
if lenBatch > 0 {
eventTimingMs := time.Since(time.Unix(request.SentTime.Seconds, 0)).Milliseconds() / lenBatch
eventTimingMs := time.Since(request.SentTime).Milliseconds() / lenBatch
metrics.Timing("event_processing_duration_milliseconds", eventTimingMs, fmt.Sprintf("conn_group=%s", request.ConnectionIdentifier.Group))
now := time.Now()
metrics.Timing("worker_processing_duration_milliseconds", (now.Sub(batchReadTime).Milliseconds())/lenBatch, "worker="+workerName)
Expand Down
6 changes: 1 addition & 5 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"testing"
"time"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/odpf/raccoon/collection"
"github.com/odpf/raccoon/identification"
pb "github.com/odpf/raccoon/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand All @@ -19,9 +17,7 @@ func TestWorker(t *testing.T) {
ID: "12345",
Group: "viewer",
},
SendEventRequest: &pb.SendEventRequest{
SentTime: &timestamp.Timestamp{Seconds: 1593574343},
},
SentTime: time.Unix(1593574343, 0),
}

t.Run("StartWorkers", func(t *testing.T) {
Expand Down