perf(channel): performance improvements (#36)
* perf(channel): fixed degradation issues
ramey authored Jun 22, 2022
1 parent 0231b53 commit 2e34f07
Showing 27 changed files with 141 additions and 147 deletions.
2 changes: 1 addition & 1 deletion app/server.go
@@ -65,7 +65,7 @@ func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices
logger.Info(fmt.Sprintf("Outstanding unprocessed events in the channel, data lost ~ (No batches %d * 5 events) = ~%d", len(bufferChannel), eventsInChannel))
metrics.Count("kafka_messages_delivered_total", eventsInChannel+eventsInProducer, "success=false")
logger.Info("Exiting server")
os.Exit(0)
cancel()
default:
logger.Info(fmt.Sprintf("[App.Server] Received a unexpected signal %s", sig))
}
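The hunk above swaps the hard `os.Exit(0)` in the signal handler for `cancel()`, so shutdown propagates through the server's context and remaining goroutines get a chance to drain instead of the process dying mid-flush. Below is a minimal sketch of that pattern, not the repository's actual shutdown code; the worker goroutine and log messages are illustrative.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Illustrative worker that flushes and exits when the context is cancelled.
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-ctx.Done()
		log.Println("flushing outstanding events")
	}()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig

	// Instead of os.Exit(0): cancel the shared context and let goroutines drain.
	cancel()
	<-done
	log.Println("Exiting server")
}
```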
8 changes: 0 additions & 8 deletions deserialization/deserializer.go
@@ -1,11 +1,3 @@
package deserialization

type Deserializer interface {
Deserialize(b []byte, i interface{}) error
}

type DeserializeFunc func(b []byte, i interface{}) error

func (f DeserializeFunc) Deserialize(b []byte, i interface{}) error {
return f(b, i)
}
4 changes: 1 addition & 3 deletions deserialization/json.go
@@ -2,8 +2,6 @@ package deserialization

import "encoding/json"

type JSONDeserializer struct{}

func (j *JSONDeserializer) Deserialize(b []byte, i interface{}) error {
func DeserializeJSON(b []byte, i interface{}) error {
return json.Unmarshal(b, i)
}
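Together, the two hunks above drop the `Deserializer` interface and its `JSONDeserializer` struct in favor of a bare function, `DeserializeJSON`, that matches the `DeserializeFunc` signature kept in deserializer.go. A small self-contained sketch of how the function-type style is used after the change; the `main` wrapper is illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Kept by the commit: a function type instead of an interface.
type DeserializeFunc func(b []byte, i interface{}) error

// A plain function with a matching signature; no struct or method set needed.
func DeserializeJSON(b []byte, i interface{}) error {
	return json.Unmarshal(b, i)
}

func main() {
	// Functions are assignable to the type directly, so call sites just pass them around.
	var d DeserializeFunc = DeserializeJSON

	var out struct{ A string }
	if err := d([]byte(`{"A": "a"}`), &out); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.A) // prints: a
}
```

The practical effect is mostly simpler call sites; any saving from skipping the interface indirection is minor.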
7 changes: 3 additions & 4 deletions deserialization/json_test.go
@@ -9,13 +9,13 @@ func TestJSONDeserializer_Deserialize(t *testing.T) {
}
tests := []struct {
name string
j *JSONDeserializer
j DeserializeFunc
args args
wantErr bool
}{
{
name: "Use JSON Deserializer",
j: &JSONDeserializer{},
j: DeserializeJSON,
args: args{
b: []byte(`{"A": "a"}`),
i: &struct {
@@ -27,8 +27,7 @@ func TestJSONDeserializer_Deserialize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
j := &JSONDeserializer{}
if err := j.Deserialize(tt.args.b, tt.args.i); (err != nil) != tt.wantErr {
if err := tt.j(tt.args.b, tt.args.i); (err != nil) != tt.wantErr {
t.Errorf("JSONDeserializer.Deserialize() error = %v, wantErr %v", err, tt.wantErr)
}
})
4 changes: 2 additions & 2 deletions deserialization/proto.go
@@ -8,9 +8,9 @@ import (

var ErrInvalidProtoMessage = errors.New("invalld proto message")

type ProtoDeserilizer struct{}
// type ProtoDeserilizer struct{}

func (d *ProtoDeserilizer) Deserialize(b []byte, i interface{}) error {
func DeserializeProto(b []byte, i interface{}) error {
msg, ok := i.(proto.Message)
if !ok {
return ErrInvalidProtoMessage
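`DeserializeProto` carries the same behavior over to the new function form: it type-asserts the target to `proto.Message` and rejects anything else with `ErrInvalidProtoMessage`. The hunk only shows the assertion, so the sketch below fills in the rest under the assumption that the body ends with `proto.Unmarshal` from `google.golang.org/protobuf/proto`.

```go
package deserialization

import (
	"errors"

	"google.golang.org/protobuf/proto"
)

var ErrInvalidProtoMessage = errors.New("invalid proto message")

// Sketch of the whole function; only the type assertion appears in the hunk,
// the trailing proto.Unmarshal call is assumed.
func DeserializeProto(b []byte, i interface{}) error {
	msg, ok := i.(proto.Message)
	if !ok {
		return ErrInvalidProtoMessage
	}
	return proto.Unmarshal(b, msg)
}
```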
7 changes: 3 additions & 4 deletions deserialization/proto_test.go
@@ -13,13 +13,13 @@ func TestProtoDeserilizer_Deserialize(t *testing.T) {
}
tests := []struct {
name string
d *ProtoDeserilizer
d DeserializeFunc
args args
wantErr bool
}{
{
name: "Deserialize a proto message",
d: &ProtoDeserilizer{},
d: DeserializeProto,
args: args{
b: []byte{},
i: &pb.SendEventRequest{},
@@ -29,8 +29,7 @@ func TestProtoDeserilizer_Deserialize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &ProtoDeserilizer{}
if err := d.Deserialize(tt.args.b, tt.args.i); (err != nil) != tt.wantErr {
if err := tt.d(tt.args.b, tt.args.i); (err != nil) != tt.wantErr {
t.Errorf("ProtoDeserilizer.Deserialize() error = %v, wantErr %v", err, tt.wantErr)
}
})
3 changes: 1 addition & 2 deletions go.mod
@@ -4,15 +4,14 @@ go 1.14

require (
github.com/confluentinc/confluent-kafka-go v1.4.2 // indirect
github.com/golang/protobuf v1.5.0
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.4.2
github.com/sirupsen/logrus v1.6.0
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.7.0
golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3 // indirect
google.golang.org/grpc v1.41.0
google.golang.org/protobuf v1.27.1
google.golang.org/protobuf v1.28.0
gopkg.in/alexcesaro/statsd.v2 v2.0.0
gopkg.in/confluentinc/confluent-kafka-go.v1 v1.4.2
)
4 changes: 2 additions & 2 deletions go.sum
@@ -358,8 +358,8 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alexcesaro/statsd.v2 v2.0.0 h1:FXkZSCZIH17vLCO5sO2UucTHsH9pc+17F6pl3JVCwMc=
gopkg.in/alexcesaro/statsd.v2 v2.0.0/go.mod h1:i0ubccKGzBVNBpdGV5MocxyA/XlLUJzA7SLonnE4drU=
24 changes: 11 additions & 13 deletions publisher/kafka.go
@@ -3,23 +3,22 @@ package publisher
import (
"encoding/json"
"fmt"
"strings"

"github.com/odpf/raccoon/collection"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

// Importing librd to make it work on vendor mode
"strings"
_ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka"

"github.com/odpf/raccoon/config"
"github.com/odpf/raccoon/logger"
"github.com/odpf/raccoon/metrics"
_ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka"
pb "github.com/odpf/raccoon/proto"
)

// KafkaProducer Produce data to kafka synchronously
type KafkaProducer interface {
// ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
ProduceBulk(request collection.CollectRequest, deliveryChannel chan kafka.Event) error
ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error
}

func NewKafka() (*Kafka, error) {
@@ -50,8 +49,7 @@ type Kafka struct {

// ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed.
// DeliveryChannel needs to be exclusive. DeliveryChannel is exposed for recyclability purpose.
func (pr *Kafka) ProduceBulk(request collection.CollectRequest, deliveryChannel chan kafka.Event) error {
events := request.GetEvents()
func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChannel chan kafka.Event) error {
errors := make([]error, len(events))
totalProcessed := 0
for order, event := range events {
@@ -64,26 +62,26 @@ func (pr *Kafka) ProduceBulk(request collection.CollectRequest, deliveryChannel

err := pr.kp.Produce(message, deliveryChannel)
if err != nil {
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, event.GetType()))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type))
if err.Error() == "Local: Unknown topic" {
errors[order] = fmt.Errorf("%v %s", err, topic)
metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, request.ConnectionIdentifier.Group))
metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
} else {
errors[order] = err
}
continue
}
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, event.GetType()))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", connGroup, event.Type))
totalProcessed++
}
// Wait for deliveryChannel as many as processed
for i := 0; i < totalProcessed; i++ {
d := <-deliveryChannel
m := d.(*kafka.Message)
if m.TopicPartition.Error != nil {
eventType := events[i].GetType()
metrics.Decrement("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, eventType))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", request.ConnectionIdentifier.Group, eventType))
eventType := events[i].Type
metrics.Decrement("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", connGroup, eventType))
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, eventType))
order := m.Opaque.(int)
errors[order] = m.TopicPartition.Error
}
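`ProduceBulk` now takes the event slice and the connection group directly rather than a whole `collection.CollectRequest`, which removes the publisher's dependency on the collection package and the repeated `request.ConnectionIdentifier.Group` / `GetType()` lookups inside the loop. A hedged sketch of a call site under the new signature; `publishBatch` and the logging are illustrative, and the import paths follow the repository's existing ones.

```go
package example

import (
	"log"

	pb "github.com/odpf/raccoon/proto"
	"github.com/odpf/raccoon/publisher"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// publishBatch is a hypothetical caller of the new signature.
func publishBatch(producer *publisher.Kafka, events []*pb.Event, connGroup string) {
	// The delivery channel is exposed so callers can size and recycle it per batch;
	// it must be exclusive to this call.
	deliveryChannel := make(chan kafka.Event, len(events))

	if err := producer.ProduceBulk(events, connGroup, deliveryChannel); err != nil {
		// Per-event failures come back as a BulkError whose Errors slice
		// preserves the original ordering of the batch.
		if bulkErr, ok := err.(publisher.BulkError); ok {
			for i, e := range bulkErr.Errors {
				if e != nil {
					log.Printf("event %d failed: %v", i, e)
				}
			}
		}
	}
}
```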
13 changes: 8 additions & 5 deletions publisher/kafka_test.go
@@ -5,14 +5,17 @@ import (
"os"
"testing"

"github.com/odpf/raccoon/collection"
"github.com/odpf/raccoon/logger"
pb "github.com/odpf/raccoon/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

const (
group1 = "group-1"
)

type void struct{}

func (v void) Write(_ []byte) (int, error) {
@@ -54,7 +57,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
})
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
assert.NoError(t, err)
})
})
@@ -78,7 +81,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
assert.Len(t, err.(BulkError).Errors, 3)
assert.Error(t, err.(BulkError).Errors[0])
assert.Empty(t, err.(BulkError).Errors[1])
@@ -90,7 +93,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
})
})
@@ -114,7 +117,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
}).Once()
kp := NewKafkaFromClient(client, 10, "%s")

err := kp.ProduceBulk(collection.CollectRequest{SendEventRequest: &pb.SendEventRequest{Events: []*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}}}, make(chan kafka.Event, 2))
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
assert.NotEmpty(t, err)
assert.Len(t, err.(BulkError).Errors, 2)
assert.Equal(t, "buffer full", err.(BulkError).Errors[0].Error())
4 changes: 1 addition & 3 deletions serialization/json.go
@@ -2,8 +2,6 @@ package serialization

import "encoding/json"

type JSONSerializer struct{}

func (s *JSONSerializer) Serialize(m interface{}) ([]byte, error) {
func SerializeJSON(m interface{}) ([]byte, error) {
return json.Marshal(m)
}
7 changes: 3 additions & 4 deletions serialization/json_test.go
@@ -14,14 +14,14 @@ func TestJSONSerializer_Serialize(t *testing.T) {
}
tests := []struct {
name string
s *JSONSerializer
s SerializeFunc
args args
want []byte
wantErr bool
}{
{
name: "Serialize JSON",
s: &JSONSerializer{},
s: SerializeJSON,
args: args{
m: &pb.SendEventRequest{},
},
@@ -31,8 +31,7 @@ func TestJSONSerializer_Serialize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &JSONSerializer{}
got, err := s.Serialize(tt.args.m)
got, err := tt.s(tt.args.m)
if (err != nil) != tt.wantErr {
t.Errorf("JSONSerializer.Serialize() error = %v, wantErr %v", err, tt.wantErr)
return
13 changes: 0 additions & 13 deletions serialization/mock.go

This file was deleted.

4 changes: 1 addition & 3 deletions serialization/proto.go
@@ -10,9 +10,7 @@ var (
ErrInvalidProtoMessage = errors.New("invalld proto message")
)

type ProtoSerilizer struct{}

func (p *ProtoSerilizer) Serialize(m interface{}) ([]byte, error) {
func SerializeProto(m interface{}) ([]byte, error) {
msg, ok := m.(proto.Message)
if !ok {
return nil, ErrInvalidProtoMessage
1 change: 0 additions & 1 deletion serialization/proto_test.go

This file was deleted.

4 changes: 1 addition & 3 deletions serialization/serializer.go
@@ -1,5 +1,3 @@
package serialization

type Serializer interface {
Serialize(m interface{}) ([]byte, error)
}
type SerializeFunc func(m interface{}) ([]byte, error)
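The serialization package mirrors the deserialization change: the `Serializer` interface (and the now-deleted mock) give way to the `SerializeFunc` type, with `SerializeJSON` and `SerializeProto` as plain functions. A short sketch of picking a serializer as a function value; `chooseSerializer` is a hypothetical helper, not code from this commit.

```go
package example

import (
	"github.com/odpf/raccoon/serialization"
)

// chooseSerializer is a hypothetical helper: with the interface gone, picking a
// serializer is just picking a function value of type SerializeFunc.
func chooseSerializer(format string) serialization.SerializeFunc {
	if format == "proto" {
		return serialization.SerializeProto
	}
	return serialization.SerializeJSON
}
```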
20 changes: 13 additions & 7 deletions services/grpc/service.go
@@ -16,22 +16,28 @@ type Service struct {
s *grpc.Server
}

func (s Service) Init(ctx context.Context) error {
func NewGRPCService(c collection.Collector) *Service {
server := grpc.NewServer()
pb.RegisterEventServiceServer(server, &Handler{C: c})
return &Service{
s: server,
Collector: c,
}
}

func (s *Service) Init(context.Context) error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%s", config.ServerGRPC.Port))
if err != nil {
return err
}
server := grpc.NewServer()
pb.RegisterEventServiceServer(server, &Handler{C: s.Collector})
s.s = server
return server.Serve(lis)
return s.s.Serve(lis)
}

func (s Service) Name() string {
func (*Service) Name() string {
return "GRPC"
}

func (s Service) Shutdown(ctx context.Context) error {
func (s *Service) Shutdown(context.Context) error {
s.s.GracefulStop()
return nil
}
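The gRPC service now builds its `grpc.Server` and registers the handler inside `NewGRPCService`, leaving `Init` to just listen and serve; with pointer receivers, `Shutdown` stops the same server instance that `Init` started (the old value receiver assigned the server to a copy of `Service`, so the instance later asked to stop might not be the one serving). A rough usage sketch; `runGRPC` and the import path for the service package are assumptions.

```go
package example

import (
	"context"

	"github.com/odpf/raccoon/collection"
	grpcservice "github.com/odpf/raccoon/services/grpc"
)

// runGRPC is hypothetical wiring: the server is constructed once, Init blocks
// while serving, and Shutdown stops that same server when the context ends.
func runGRPC(ctx context.Context, collector collection.Collector) error {
	svc := grpcservice.NewGRPCService(collector)

	errCh := make(chan error, 1)
	go func() { errCh <- svc.Init(ctx) }()

	select {
	case <-ctx.Done():
		return svc.Shutdown(context.Background())
	case err := <-errCh:
		return err
	}
}
```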
17 changes: 11 additions & 6 deletions services/pprof/service.go
@@ -3,6 +3,7 @@ package pprof
import (
"context"
"net/http"

// enable pprof https://pkg.go.dev/net/http/pprof#pkg-overview
_ "net/http/pprof"
)
@@ -11,16 +12,20 @@ type Service struct {
s *http.Server
}

func (s Service) Init(ctx context.Context) error {
server := &http.Server{Addr: "localhost:6060", Handler: nil}
s.s = server
return server.ListenAndServe()
func NewPprofService() *Service {
return &Service{
s: &http.Server{Addr: "localhost:6060", Handler: nil},
}
}

func (s *Service) Init(context.Context) error {
return s.s.ListenAndServe()
}

func (s Service) Name() string {
func (*Service) Name() string {
return "pprof"
}

func (s Service) Shutdown(ctx context.Context) error {
func (s *Service) Shutdown(ctx context.Context) error {
return s.s.Shutdown(ctx)
}
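The pprof service gets the same treatment: the `http.Server` is created once in `NewPprofService`, and `Init`/`Shutdown` use pointer receivers so they act on that one instance. The sketch below shows how such services might be started together; the `Service` interface shape is inferred from the methods in these diffs and `startAll` is hypothetical.

```go
package example

import (
	"context"
	"fmt"
)

// Assumed shape of the interface these services satisfy, inferred from the
// Init/Name/Shutdown methods shown in the diffs above.
type Service interface {
	Init(ctx context.Context) error
	Name() string
	Shutdown(ctx context.Context) error
}

// startAll is a hypothetical helper: each service blocks inside Init, so every
// one runs on its own goroutine and failures are reported with the service name.
func startAll(ctx context.Context, services []Service, errCh chan<- error) {
	for _, svc := range services {
		go func(s Service) {
			if err := s.Init(ctx); err != nil {
				errCh <- fmt.Errorf("%s: %w", s.Name(), err)
			}
		}(svc)
	}
}
```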