Skip to content

Commit

Permalink
refactor: rename collection to collector (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravisuhag committed Jun 18, 2024
1 parent cd0fe92 commit a9f57c7
Show file tree
Hide file tree
Showing 15 changed files with 43 additions and 43 deletions.
6 changes: 3 additions & 3 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

pubsubsdk "cloud.google.com/go/pubsub"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
Expand All @@ -30,7 +30,7 @@ type Publisher interface {

// StartServer starts the server
func StartServer(ctx context.Context, cancel context.CancelFunc) {
bufferChannel := make(chan collection.CollectRequest, config.Worker.ChannelSize)
bufferChannel := make(chan collector.CollectRequest, config.Worker.ChannelSize)
httpServices := services.Create(bufferChannel)
logger.Info("Start Server -->")
httpServices.Start(ctx, cancel)
Expand All @@ -50,7 +50,7 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) {
go shutDownServer(ctx, cancel, httpServices, bufferChannel, workerPool, publisher)
}

func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collection.CollectRequest, workerPool *worker.Pool, pub Publisher) {
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collector.CollectRequest, workerPool *worker.Pool, pub Publisher) {
signalChan := make(chan os.Signal)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
for {
Expand Down
2 changes: 1 addition & 1 deletion collection/collector.go → collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package collection
package collector

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion collection/mock.go → collector/mock.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package collection
package collector

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion collection/service.go → collector/service.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package collection
package collector

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion collection/service_test.go → collector/service_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package collection
package collector

import (
"reflect"
Expand Down
8 changes: 4 additions & 4 deletions services/grpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"time"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/identification"
"github.com/raystack/raccoon/logger"
Expand All @@ -15,7 +15,7 @@ import (
)

type Handler struct {
C collection.Collector
C collector.Collector
pb.UnimplementedEventServiceServer
}

Expand Down Expand Up @@ -49,7 +49,7 @@ func (h *Handler) SendEvent(ctx context.Context, req *pb.SendEventRequest) (*pb.
h.sendEventCounters(req.Events, identifier.Group)

responseChannel := make(chan *pb.SendEventResponse, 1)
h.C.Collect(ctx, &collection.CollectRequest{
h.C.Collect(ctx, &collector.CollectRequest{
ConnectionIdentifier: identifier,
TimeConsumed: timeConsumed,
SendEventRequest: req,
Expand All @@ -59,7 +59,7 @@ func (h *Handler) SendEvent(ctx context.Context, req *pb.SendEventRequest) (*pb.

}

func (h *Handler) Ack(responseChannel chan *pb.SendEventResponse, reqGuid, connGroup string) collection.AckFunc {
func (h *Handler) Ack(responseChannel chan *pb.SendEventResponse, reqGuid, connGroup string) collector.AckFunc {
switch config.Event.Ack {
case config.Asynchronous:
responseChannel <- &pb.SendEventResponse{
Expand Down
6 changes: 3 additions & 3 deletions services/grpc/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"reflect"
"testing"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
Expand All @@ -23,7 +23,7 @@ func (v void) Write(_ []byte) (int, error) {

func TestHandler_SendEvent(t *testing.T) {
type fields struct {
C collection.Collector
C collector.Collector
UnimplementedEventServiceServer pb.UnimplementedEventServiceServer
}
type args struct {
Expand All @@ -33,7 +33,7 @@ func TestHandler_SendEvent(t *testing.T) {

logger.SetOutput(void{})
metrics.SetVoid()
collector := new(collection.MockCollector)
collector := new(collector.MockCollector)
ctx := context.Background()
meta := metadata.MD{}
meta.Set(config.ServerWs.ConnGroupHeader, "group")
Expand Down
6 changes: 3 additions & 3 deletions services/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ import (
"fmt"
"net"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
pb "github.com/raystack/raccoon/proto"
"google.golang.org/grpc"
)

type Service struct {
Collector collection.Collector
Collector collector.Collector
s *grpc.Server
}

func NewGRPCService(c collection.Collector) *Service {
func NewGRPCService(c collector.Collector) *Service {
server := grpc.NewServer()
pb.RegisterEventServiceServer(server, &Handler{C: c})
return &Service{
Expand Down
10 changes: 5 additions & 5 deletions services/rest/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net/http"
"time"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/deserialization"
"github.com/raystack/raccoon/identification"
Expand All @@ -28,10 +28,10 @@ type serDe struct {
}
type Handler struct {
serDeMap map[string]*serDe
collector collection.Collector
collector collector.Collector
}

func NewHandler(collector collection.Collector) *Handler {
func NewHandler(collector collector.Collector) *Handler {
serDeMap := make(map[string]*serDe)
serDeMap[ContentJSON] = &serDe{
serializer: serialization.SerializeJSON,
Expand Down Expand Up @@ -128,7 +128,7 @@ func (h *Handler) RESTAPIHandler(rw http.ResponseWriter, r *http.Request) {
h.sendEventCounters(req.Events, identifier.Group)

resChannel := make(chan struct{}, 1)
h.collector.Collect(r.Context(), &collection.CollectRequest{
h.collector.Collect(r.Context(), &collector.CollectRequest{
ConnectionIdentifier: identifier,
TimeConsumed: timeConsumed,
SendEventRequest: req,
Expand All @@ -137,7 +137,7 @@ func (h *Handler) RESTAPIHandler(rw http.ResponseWriter, r *http.Request) {
<-resChannel
}

func (h *Handler) Ack(rw http.ResponseWriter, resChannel chan struct{}, s serialization.SerializeFunc, reqGuid string, connGroup string) collection.AckFunc {
func (h *Handler) Ack(rw http.ResponseWriter, resChannel chan struct{}, s serialization.SerializeFunc, reqGuid string, connGroup string) collector.AckFunc {
res := &Response{
SendEventResponse: &pb.SendEventResponse{},
}
Expand Down
6 changes: 3 additions & 3 deletions services/rest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/metrics"
"github.com/raystack/raccoon/middleware"
Expand All @@ -15,11 +15,11 @@ import (
)

type Service struct {
Collector collection.Collector
Collector collector.Collector
s *http.Server
}

func NewRestService(c collection.Collector) *Service {
func NewRestService(c collector.Collector) *Service {
pingChannel := make(chan connection.Conn, config.ServerWs.ServerMaxConn)
wh := websocket.NewHandler(pingChannel, c)
go websocket.Pinger(pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval)
Expand Down
10 changes: 5 additions & 5 deletions services/rest/websocket/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/config"
"github.com/raystack/raccoon/deserialization"
"github.com/raystack/raccoon/logger"
Expand All @@ -23,7 +23,7 @@ type serDe struct {
type Handler struct {
upgrader *connection.Upgrader
serdeMap map[int]*serDe
collector collection.Collector
collector collector.Collector
PingChannel chan connection.Conn
}

Expand All @@ -41,7 +41,7 @@ func getSerDeMap() map[int]*serDe {
return serDeMap
}

func NewHandler(pingC chan connection.Conn, collector collection.Collector) *Handler {
func NewHandler(pingC chan connection.Conn, collector collector.Collector) *Handler {
ugConfig := connection.UpgraderConfig{
ReadBufferSize: config.ServerWs.ReadBufferSize,
WriteBufferSize: config.ServerWs.WriteBufferSize,
Expand Down Expand Up @@ -115,7 +115,7 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) {
metrics.Increment("batches_read_total", map[string]string{"status": "success", "conn_group": conn.Identifier.Group, "reason": "NA"})
h.sendEventCounters(payload.Events, conn.Identifier.Group)

h.collector.Collect(r.Context(), &collection.CollectRequest{
h.collector.Collect(r.Context(), &collector.CollectRequest{
ConnectionIdentifier: conn.Identifier,
TimeConsumed: timeConsumed,
SendEventRequest: payload,
Expand All @@ -124,7 +124,7 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) {
}
}

func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serialization.SerializeFunc, messageType int, reqGuid string, timeConsumed time.Time) collection.AckFunc {
func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serialization.SerializeFunc, messageType int, reqGuid string, timeConsumed time.Time) collector.AckFunc {
switch config.Event.Ack {
case config.Asynchronous:
writeSuccessResponse(conn, s, messageType, reqGuid)
Expand Down
6 changes: 3 additions & 3 deletions services/rest/websocket/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
pb "github.com/raystack/raccoon/proto"
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestNewHandler(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewHandler(tt.args.pingC, &collection.MockCollector{}); !reflect.DeepEqual(got, tt.want) {
if got := NewHandler(tt.args.pingC, &collector.MockCollector{}); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewHandler() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestHandler_GETHandlerWSEvents(t *testing.T) {
}

func getRouter(hlr *Handler) http.Handler {
collector := new(collection.MockCollector)
collector := new(collector.MockCollector)
collector.On("Collect", mock.Anything, mock.Anything).Return(nil)
router := mux.NewRouter()
subRouter := router.PathPrefix("/api/v1").Subrouter()
Expand Down
6 changes: 3 additions & 3 deletions services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"net/http"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/services/grpc"
"github.com/raystack/raccoon/services/pprof"
Expand Down Expand Up @@ -43,8 +43,8 @@ func (s *Services) Shutdown(ctx context.Context) {
}
}

func Create(b chan collection.CollectRequest) Services {
c := collection.NewChannelCollector(b)
func Create(b chan collector.CollectRequest) Services {
c := collector.NewChannelCollector(b)
return Services{
b: []bootstrapper{
grpc.NewGRPCService(c),
Expand Down
6 changes: 3 additions & 3 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/logger"
"github.com/raystack/raccoon/metrics"
pb "github.com/raystack/raccoon/proto"
Expand All @@ -25,13 +25,13 @@ type Producer interface {
type Pool struct {
Size int
deliveryChannelSize int
EventsChannel <-chan collection.CollectRequest
EventsChannel <-chan collector.CollectRequest
producer Producer
wg sync.WaitGroup
}

// CreateWorkerPool create new Pool struct given size and EventsChannel worker.
func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, producer Producer) *Pool {
func CreateWorkerPool(size int, eventsChannel <-chan collector.CollectRequest, deliveryChannelSize int, producer Producer) *Pool {
return &Pool{
Size: size,
deliveryChannelSize: deliveryChannelSize,
Expand Down
8 changes: 4 additions & 4 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"
"time"

"github.com/raystack/raccoon/collection"
"github.com/raystack/raccoon/collector"
"github.com/raystack/raccoon/identification"
pb "github.com/raystack/raccoon/proto"
"github.com/stretchr/testify/assert"
Expand All @@ -14,7 +14,7 @@ import (
)

func TestWorker(t *testing.T) {
request := &collection.CollectRequest{
request := &collector.CollectRequest{
ConnectionIdentifier: identification.Identifier{
ID: "12345",
Group: "viewer",
Expand All @@ -31,7 +31,7 @@ func TestWorker(t *testing.T) {
m.On("Timing", "processing.latency", mock.Anything, "")
m.On("Count", "kafka_messages_delivered_total", 0, "success=true")
m.On("Count", "kafka_messages_delivered_total", 0, "success=false")
bc := make(chan collection.CollectRequest, 2)
bc := make(chan collector.CollectRequest, 2)
worker := Pool{
Size: 1,
deliveryChannelSize: 0,
Expand All @@ -54,7 +54,7 @@ func TestWorker(t *testing.T) {
t.Run("Flush", func(t *testing.T) {
t.Run("Should block until all messages is processed", func(t *testing.T) {
kp := mockKafkaPublisher{}
bc := make(chan collection.CollectRequest, 2)
bc := make(chan collector.CollectRequest, 2)
m := &mockMetric{}
m.On("Timing", "processing.latency", mock.Anything, "")
m.On("Count", "kafka_messages_delivered_total", 0, "success=false")
Expand Down

0 comments on commit a9f57c7

Please sign in to comment.