Skip to content

Commit

Permalink
feat: add collector based design (#17)
Browse files Browse the repository at this point in the history
* feat: raccoon http+grpc

* test: completing coding and unit tests

* fix: resolved merge conflicts and changed integration tests

* refactor: addressed review comments

* ci: changing protoc version

* ci: fixing protoc compilation in docker  build

* test: fixing tests

* ci: fixing github build and integration tests

* refactor: Removed pkg directory
combined EventsBatch and CollectRequest
  • Loading branch information
ramey committed Nov 23, 2021
1 parent c5515f8 commit 193ba4d
Show file tree
Hide file tree
Showing 58 changed files with 1,852 additions and 386 deletions.
2 changes: 2 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS=60000
SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS=5000
SERVER_WEBSOCKET_PINGER_SIZE=1

SERVER_GRPC_PORT=8081

WORKER_BUFFER_CHANNEL_SIZE=5
WORKER_BUFFER_FLUSH_TIMEOUT_MS=5000
WORKER_POOL_SIZE=5
Expand Down
2 changes: 2 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ SERVER_WEBSOCKET_PONG_WAIT_INTERVAL_MS=10000
SERVER_WEBSOCKET_WRITE_WAIT_INTERVAL_MS=1000
SERVER_WEBSOCKET_PINGER_SIZE=1

SERVER_GRPC_PORT=8081

WORKER_BUFFER_CHANNEL_SIZE=5
WORKER_BUFFER_FLUSH_TIMEOUT_MS=5000
WORKER_POOL_SIZE=5
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/integration-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ jobs:
run: make docker-run
- run: make install-protoc && make generate-proto
- name: Invoking go test
run: INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=ws://localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" go test ./integration -v
run: INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" GRPC_SERVER_ADDR="localhost:8081" go test ./integration -v
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ coverage
*.idea/
clickstream-service
raccoon
websocket/proto/*.pb.go
pkg/proto/*.pb.go
proto/*.pb.go
.temp
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ FROM golang:1.14

WORKDIR /app
RUN apt-get update && apt-get install unzip --no-install-recommends --assume-yes
RUN PROTOC_ZIP=protoc-3.14.0-linux-x86_64.zip && \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.14.0/$PROTOC_ZIP && \
RUN PROTOC_ZIP=protoc-3.17.3-linux-x86_64.zip && \
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.17.3/$PROTOC_ZIP && \
unzip -o $PROTOC_ZIP -d /usr/local bin/protoc && \
unzip -o $PROTOC_ZIP -d /usr/local 'include/*' && \
rm -f $PROTOC_ZIP
Expand Down
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ install-protoc:
@echo "> installing dependencies"
go get -u github.com/golang/protobuf/proto@v1.4.3
go get -u github.com/golang/protobuf/protoc-gen-go@v1.4.3
go get -u google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1

update-deps:
go mod tidy -v
Expand All @@ -22,12 +23,13 @@ update-deps:
copy-config:
cp .env.sample .env

PROTO_PACKAGE=/websocket/proto
PROTO_PACKAGE=/proto
generate-proto:
rm -rf .temp
mkdir -p .temp
curl -o .temp/proton.tar.gz -L http://api.github.com/repos/odpf/proton/tarball/main; tar xvf .temp/proton.tar.gz -C .temp/ --strip-components 1
protoc --proto_path=.temp/ .temp/odpf/raccoon/*.proto --go_out=./ --go_opt=paths=import --go_opt=Modpf/raccoon/Event.proto=$(PROTO_PACKAGE) --go_opt=Modpf/raccoon/EventRequest.proto=$(PROTO_PACKAGE) --go_opt=Modpf/raccoon/EventResponse.proto=$(PROTO_PACKAGE)
protoc --proto_path=.temp/ .temp/odpf/raccoon/Event.proto .temp/odpf/raccoon/EventRequest.proto .temp/odpf/raccoon/EventResponse.proto --go_out=./ --go_opt=paths=import --go_opt=Modpf/raccoon/Event.proto=$(PROTO_PACKAGE) --go_opt=Modpf/raccoon/EventRequest.proto=$(PROTO_PACKAGE) --go_opt=Modpf/raccoon/EventResponse.proto=$(PROTO_PACKAGE)
protoc --proto_path=.temp/ .temp/odpf/raccoon/*.proto --go-grpc_opt=paths=import --go-grpc_opt=Modpf/raccoon/Event.proto=$(PROTO_PACKAGE) --go-grpc_opt=Modpf/raccoon/EventRequest.proto=$(PROTO_PACKAGE) --go-grpc_opt=Modpf/raccoon/EventResponse.proto=$(PROTO_PACKAGE) --go-grpc_opt=Modpf/raccoon/EventService.proto=$(PROTO_PACKAGE) --go-grpc_out=./

# Build Lifecycle
compile:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ $ make test
# Running integration tests
$ cp .env.test .env
$ make docker-run
$ INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=ws://localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" go test ./integration -v
$ INTEGTEST_BOOTSTRAP_SERVER=localhost:9094 INTEGTEST_HOST=ws://localhost:8080 INTEGTEST_TOPIC_FORMAT="clickstream-%s-log" GRPC_SERVER_ADDR="localhost:8081" go test ./integration -v
```

## Contribute
Expand Down
16 changes: 9 additions & 7 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@ package app
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"raccoon/collection"
"raccoon/config"
raccoonhttp "raccoon/http"
"raccoon/logger"
"raccoon/metrics"
"raccoon/publisher"
ws "raccoon/websocket"
"raccoon/worker"
"syscall"
)

// StartServer starts the server
func StartServer(ctx context.Context, cancel context.CancelFunc) {
wssServer, bufferChannel := ws.CreateServer()
bufferChannel := make(chan *collection.CollectRequest)
httpserver := raccoonhttp.CreateServer(bufferChannel)
logger.Info("Start Server -->")
wssServer.StartHTTPServer(ctx, cancel)
httpserver.StartServers(ctx, cancel)
logger.Info("Start publisher -->")
kPublisher, err := publisher.NewKafka()
if err != nil {
Expand All @@ -32,18 +33,19 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) {
workerPool := worker.CreateWorkerPool(config.Worker.WorkersPoolSize, bufferChannel, config.Worker.DeliveryChannelSize, kPublisher)
workerPool.StartWorkers()
go kPublisher.ReportStats()
go shutDownServer(ctx, cancel, wssServer.HTTPServer, bufferChannel, workerPool, kPublisher)
go shutDownServer(ctx, cancel, httpserver, bufferChannel, workerPool, kPublisher)
}

func shutDownServer(ctx context.Context, cancel context.CancelFunc, wssServer *http.Server, bufferChannel chan ws.EventsBatch, workerPool *worker.Pool, kp *publisher.Kafka) {
func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServer *raccoonhttp.Servers, bufferChannel chan *collection.CollectRequest, workerPool *worker.Pool, kp *publisher.Kafka) {
signalChan := make(chan os.Signal)
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
for {
sig := <-signalChan
switch sig {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
logger.Info(fmt.Sprintf("[App.Server] Received a signal %s", sig))
wssServer.Shutdown(ctx)
httpServer.HTTPServer.Shutdown(ctx)
httpServer.GRPCServer.GracefulStop()
logger.Info("Server shutdown all the listeners")
timedOut := workerPool.FlushWithTimeOut(config.Worker.WorkerFlushTimeout)
if timedOut {
Expand Down
20 changes: 20 additions & 0 deletions collection/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package collection

import (
"context"
"time"

"raccoon/identification"
pb "raccoon/proto"
)

type CollectRequest struct {
ConnectionIdentifier *identification.Identifier
TimeConsumed time.Time
TimePushed time.Time
*pb.EventRequest
}

type Collector interface {
Collect(ctx context.Context, req *CollectRequest) error
}
16 changes: 16 additions & 0 deletions collection/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package collection

import (
"context"

"github.com/stretchr/testify/mock"
)

type MockCollector struct {
mock.Mock
}

func (m *MockCollector) Collect(ctx context.Context, req *CollectRequest) error {
args := m.Called(ctx, req)
return args.Error(0)
}
20 changes: 20 additions & 0 deletions collection/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package collection

import (
"context"
"time"
)

type CollectFunction func(ctx context.Context, req *CollectRequest) error

func (c CollectFunction) Collect(ctx context.Context, req *CollectRequest) error {
return c(ctx, req)
}

func NewChannelCollector(c chan *CollectRequest) Collector {
return CollectFunction(func(ctx context.Context, req *CollectRequest) error {
req.TimePushed = time.Now()
c <- req
return nil
})
}
36 changes: 36 additions & 0 deletions collection/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package collection

import (
"context"
"reflect"
"testing"
)

func TestNewChannelCollector(t *testing.T) {
type args struct {
c chan *CollectRequest
}
c := make(chan *CollectRequest)
tests := []struct {
name string
args args
want Collector
}{
{
name: "Creating collector",
args: args{
c: c,
},
want: CollectFunction(func(ctx context.Context, req *CollectRequest) error {
return nil
}),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewChannelCollector(tt.args.c); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("NewChannelCollector() = %v, want %v", got, tt.want)
}
})
}
}
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func Load() {
logConfigLoader()
publisherKafkaConfigLoader()
serverWsConfigLoader()
serverGRPCConfigLoader()
workerConfigLoader()
metricStatsdConfigLoader()
eventDistributionConfigLoader()
Expand Down
6 changes: 6 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ func TestServerConfig(t *testing.T) {
assert.Equal(t, time.Duration(1)*time.Millisecond, ServerWs.PongWaitInterval)
}

func TestGRPCServerConfig(t *testing.T) {
os.Setenv("SERVER_GRPC_PORT", "8081")
serverGRPCConfigLoader()
assert.Equal(t, "8081", ServerGRPC.Port)
}

func TestDynamicConfigLoad(t *testing.T) {
os.Setenv("PUBLISHER_KAFKA_CLIENT_RANDOM", "anything")
os.Setenv("PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS", "localhost:9092")
Expand Down
13 changes: 13 additions & 0 deletions config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
)

var ServerWs serverWs
var ServerGRPC serverGRPC

type serverWs struct {
AppPort string
Expand All @@ -24,6 +25,10 @@ type serverWs struct {
ConnGroupDefault string
}

type serverGRPC struct {
Port string
}

func serverWsConfigLoader() {
viper.SetDefault("SERVER_WEBSOCKET_PORT", "8080")
viper.SetDefault("SERVER_WEBSOCKET_MAX_CONN", 30000)
Expand Down Expand Up @@ -52,3 +57,11 @@ func serverWsConfigLoader() {
ConnGroupDefault: util.MustGetString("SERVER_WEBSOCKET_CONN_GROUP_DEFAULT"),
}
}

func serverGRPCConfigLoader() {

viper.SetDefault("SERVER_GRPC_PORT", "8081")
ServerGRPC = serverGRPC{
Port: util.MustGetString("SERVER_GRPC_PORT"),
}
}
11 changes: 11 additions & 0 deletions deserialization/deserializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package deserialization

type Deserializer interface {
Deserialize(b []byte, i interface{}) error
}

type DeserializeFunc func(b []byte, i interface{}) error

func (f DeserializeFunc) Deserialize(b []byte, i interface{}) error {
return f(b, i)
}
9 changes: 9 additions & 0 deletions deserialization/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package deserialization

import "encoding/json"

func JSONDeserializer() Deserializer {
return DeserializeFunc(func(b []byte, i interface{}) error {
return json.Unmarshal(b, i)
})
}
27 changes: 27 additions & 0 deletions deserialization/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package deserialization

import (
"reflect"
"testing"
)

func TestJSONDeserializer(t *testing.T) {
tests := []struct {
name string
want Deserializer
}{
{
name: "Creating new JSON Deserializer",
want: DeserializeFunc(func(b []byte, i interface{}) error {
return nil
}),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := JSONDeserializer(); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("JSONDeserializer() = %v, want %v", got, tt.want)
}
})
}
}
20 changes: 20 additions & 0 deletions deserialization/proto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package deserialization

import (
"errors"

"google.golang.org/protobuf/proto"
)

var ErrInvalidProtoMessage = errors.New("invalld proto message")

func ProtoDeserilizer() Deserializer {
return DeserializeFunc(func(b []byte, i interface{}) error {
msg, ok := i.(proto.Message)
if !ok {
return ErrInvalidProtoMessage
}
return proto.Unmarshal(b, msg)
})

}
27 changes: 27 additions & 0 deletions deserialization/proto_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package deserialization

import (
"reflect"
"testing"
)

func TestProtoDeserilizer(t *testing.T) {
tests := []struct {
name string
want Deserializer
}{
{
name: "Create new proto Deserializer",
want: DeserializeFunc(func(b []byte, i interface{}) error {
return nil
}),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ProtoDeserilizer(); reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
t.Errorf("ProtoDeserilizer() = %v, want %v", got, tt.want)
}
})
}
}
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ services:
- kafka
ports:
- "8080:8080"
- "8081:8081"
networks:
- cs-network
2 changes: 1 addition & 1 deletion docs/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"fmt"
"net/http"
pb "raccoon/websocket/proto"
pb "raccoon/proto"
"time"

"github.com/gorilla/websocket"
Expand Down
Loading

0 comments on commit 193ba4d

Please sign in to comment.