Skip to content

Commit

Permalink
travis: Fix build
Browse files Browse the repository at this point in the history
* fix travis configuration
* run all tests inside a container
  • Loading branch information
dtheodor committed Sep 20, 2018
1 parent 7f63f6d commit 9be4ea8
Show file tree
Hide file tree
Showing 18 changed files with 141 additions and 109 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.vimrc
kafka.json
*dump.rdb
librdkafka.json
rafka
test/dump.rdb
test/.bundle
vendor
29 changes: 15 additions & 14 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
branches:
only:
- master
- debian/stretch
language: go
go:
- 1.10.x
- "1.11.x"
services:
- docker
- docker
before_install:
- git clone https://github.com/skroutz/kafka-cluster-testbed.git && cd kafka-cluster-testbed
&& docker-compose up --build -d && cd $TRAVIS_BUILD_DIR
- wget -qO - http://packages.confluent.io/deb/3.3/archive.key | sudo apt-key add -
- sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/4.1 stable
main"
- sudo apt-get -y update && sudo apt-get install -y librdkafka-dev
- curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
install:
- dep ensure -v
- cp test/kafka.test.json kafka.json
- cd test && docker-compose up --build -d && cd $TRAVIS_BUILD_DIR
- curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
- dep ensure -v
before_script:
- git clone https://github.com/skroutz/kafka-cluster-testbed.git kafkaclustertestbed
- cd kafkaclustertestbed
- docker-compose up --no-start zoo1 zoo2 zoo3 kc1.docker kc2.docker
- docker-compose start
- cd $TRAVIS_BUILD_DIR
script:
- make
- make test
notifications:
slack:
secure: KDA5GK4A6P3rBWlS+UpU5jVTXKWlbljEB9cpdkX23geCPZXuhYKsr50wXPACN0cCLwH+v3LPyfBS7UGCP1I9OjK0/7ersOc+laQl9R75oNTxrlNgVsi9y23cNtBHmBpqFUAYNsXH7Why4+AdF6n/PnlOTFgUgiUwL5X8CIIYmRdOWCsQVCv7ZV1JzGUx7E3fXRr5QIWlqh7/xTGcQoyuKr11Rb/H4Q1hIA5OPgmecfjeMCsnXTv73OqFYoqEj5Kk2koRPFw7Z3G4QecIPdkhApA+M037gjZWCzXXiDysfgESDtE3XAgj4rMNnUMwTH8C68ftH1LtGd5eBwp98wmtj4mMJKue0RQrgxBqoxsyHpZJJ2dSERh78zy+G6guzm7EXkb8hy+OMJr1MZhWZ1FjLpZxQdKVF0cOgGvn+C0qgna8418kfZRBqosK2aHFPW2FjFMEOK0FkCNSE3g8uiobS0plZMTu7Cwu3uI95nmJ0x+05w7nFZM9CkPa2ZE1rcneKDJmNuoexP3TlCxX1FY8MXjC+XIVGGNSiA/tgRD+uivrZicY6NLMfy7WFLez1nBZPDGer0Uj1SCgw0M0wh3vKfwqZQuzhuHVgDNHKvPKdnswut3tuG7Mx83H9XNrm7OVCseDskFkRg+aUcquTWF2gPsJQW+FkvJ3gBUMOZqUaM4=
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@
[[constraint]]
name = "github.com/go-redis/redis"
version = "6.5.3"

[[constraint]]
branch = "master"
name = "github.com/agis/spawn"
33 changes: 28 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,20 +1,43 @@
.PHONY: install build test lint fmt clean
.PHONY: install dep build test teste2e testunit lint fmt clean run-rafka run-rafka-local testunit-local teste2e-local

install: fmt test
default: fmt install test

install:
go install -v

build: fmt test
dep:
dep ensure -v

build: fmt
go build -v

test:
testunit-local:
go test -race

teste2e-local:
cd test && bundle install --frozen && ./end-to-end -v

lint:
golint

fmt:
! gofmt -d -e -s *.go 2>&1 | tee /dev/tty | read
test -z `go fmt 2>&1`

clean:
go clean

run-rafka-local: build
./rafka -c test/librdkafka.test.json

run-rafka: dep
docker-compose -f test/docker-compose.yml up --no-start --build
docker-compose -f test/docker-compose.yml start

testunit: run-rafka
docker-compose -f test/docker-compose.yml exec rafka make testunit-local

teste2e: run-rafka
docker-compose -f test/docker-compose.yml exec rafka make teste2e-local

test: run-rafka
docker-compose -f test/docker-compose.yml exec rafka make testunit-local teste2e-local
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Getting Started
```
3. Run it:
```shell
$ rafka -k kafka.json.sample
$ rafka -c librdkafka.json.sample
[rafka] 2017/06/26 11:07:23 Spawning Consumer Manager (librdkafka 0.11.0)...
[server] 2017/06/26 11:07:23 Listening on 0.0.0.0:6380
```
Expand Down
File renamed without changes.
37 changes: 17 additions & 20 deletions rafka.go → main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ func main() {
Value: 6380,
},
cli.StringFlag{
Name: "kafka, k",
Usage: "Load librdkafka configuration from `FILE`",
Value: "kafka.json",
Name: "config, c",
Usage: "Path to librdkafka configuration file",
Value: "librdkafka.json",
},
}

app.Before = func(c *cli.Context) error {
if c.String("kafka") == "" {
return cli.NewExitError("No librdkafka configuration provided!", 1)
if c.String("config") == "" {
return cli.NewExitError("No librdkafka configuration provided", 1)
}

f, err := os.Open(c.String("kafka"))
f, err := os.Open(c.String("config"))
if err != nil {
return err
}
Expand Down Expand Up @@ -165,43 +165,40 @@ func main() {
}

func run(c *cli.Context) {
l := log.New(os.Stderr, "[rafka] ", log.Ldate|log.Ltime)
var serverWg, managerWg sync.WaitGroup
logger := log.New(os.Stderr, "[rafka] ", log.Ldate|log.Ltime)
ctx := context.Background()
serverCtx, serverCancel := context.WithCancel(ctx)
managerCtx, managerCancel := context.WithCancel(ctx)

signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)

ctx := context.Background()

_, rdkafkaVer := rdkafka.LibraryVersion()
l.Printf("Spawning Consumer Manager (librdkafka %s) | config: %v...", rdkafkaVer, cfg)
var managerWg sync.WaitGroup
managerCtx, managerCancel := context.WithCancel(ctx)
manager := NewConsumerManager(managerCtx, cfg)
logger.Printf("Spawning Consumer Manager (librdkafka %s) | config: %v...", rdkafkaVer, cfg)

manager := NewConsumerManager(managerCtx, cfg)
managerWg.Add(1)
go func() {
defer managerWg.Done()
manager.Run()
}()

var serverWg sync.WaitGroup
serverCtx, serverCancel := context.WithCancel(ctx)
rafka := NewServer(serverCtx, manager, 5*time.Second)

server := NewServer(manager, 5*time.Second)
serverWg.Add(1)
go func() {
defer serverWg.Done()
err := rafka.ListenAndServe(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port))
err := server.ListenAndServe(serverCtx, fmt.Sprintf("%s:%d", cfg.Host, cfg.Port))
if err != nil {
log.Fatal(err)
}

}()

<-shutdown
l.Println("Received shutdown signal. Shutting down...")
logger.Println("Received shutdown signal. Shutting down...")
serverCancel()
serverWg.Wait()
managerCancel()
managerWg.Wait()
l.Println("All components shut down. Bye!")
logger.Println("All components shut down. Bye!")
}
31 changes: 19 additions & 12 deletions rafka_test.go → main_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"log"
"os"
Expand All @@ -11,19 +12,20 @@ import (
"testing"
"time"

"github.com/agis/spawn"
rdkafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/go-redis/redis"
)

func TestMain(m *testing.M) {
cfg.Port = 6382
cmd := spawn.New(main, "-p", "6383", "-c", "test/librdkafka.test.json")

var wg sync.WaitGroup
wg.Add(1)
go func() {
main()
wg.Done()
}()
// start rafka
ctx, cancel := context.WithCancel(context.Background())
err := cmd.Start(ctx)
if err != nil {
log.Fatal(err)
}

c := newClient("wait:for-rafka")
serverReady := false
Expand All @@ -38,8 +40,13 @@ func TestMain(m *testing.M) {
}

result := m.Run()
shutdown <- os.Interrupt
wg.Wait()

cancel()
err = cmd.Wait()
if err != nil {
log.Fatal(err)
}

os.Exit(result)
}

Expand Down Expand Up @@ -114,7 +121,7 @@ func TestConsumerOffsetCommit(t *testing.T) {

// RPUSHX
func TestProduceErr(t *testing.T) {
c := newClient("some:producer")
c := newClient("some:producer:" + t.Name())

_, err := c.RPushX("invalid-arg", "a msg").Result()
if err == nil {
Expand All @@ -128,7 +135,7 @@ func TestProduceErr(t *testing.T) {
}

func TestProduceWithKey(t *testing.T) {
c := newClient("some:producer")
c := newClient("some:producer:" + t.Name())

_, err := c.RPushX("topic:foo:bar", "a msg").Result()
if err != nil {
Expand Down Expand Up @@ -261,7 +268,7 @@ func TestParseConfig(t *testing.T) {
func newClient(id string) *redis.Client {
return redis.NewClient(&redis.Options{
// TODO Add the ability to read host and port from a cfg file
Addr: fmt.Sprintf("%s:%d", "localhost", 6382),
Addr: fmt.Sprintf("%s:%d", "localhost", 6383),
OnConnect: setName(id)})
}

Expand Down
35 changes: 18 additions & 17 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,25 @@ import (
)

type Server struct {
log *log.Logger
manager *ConsumerManager
ctx context.Context // TODO(agis): make this a function param
inFlight sync.WaitGroup // TODO(agis): make this a local var
timeout time.Duration
log *log.Logger
manager *ConsumerManager

// clientByID contains the currently connected clients to the server.
// default timeout for consumer poll
timeout time.Duration

// currently connected clients
clientByID sync.Map // map[string]*Client
}

func NewServer(ctx context.Context, manager *ConsumerManager, timeout time.Duration) *Server {
func NewServer(manager *ConsumerManager, timeout time.Duration) *Server {
return &Server{
ctx: ctx,
manager: manager,
timeout: timeout,
log: log.New(os.Stderr, "[server] ", log.Ldate|log.Ltime),
}
}

func (s *Server) handleConn(conn net.Conn) {
func (s *Server) Handle(ctx context.Context, conn net.Conn) {
c := NewClient(conn, s.manager)
defer c.Close()

Expand Down Expand Up @@ -110,7 +109,7 @@ func (s *Server) handleConn(conn net.Conn) {
ConsLoop:
for {
select {
case <-s.ctx.Done():
case <-ctx.Done():
writeErr = writer.WriteError("CONS Server shutdown")
break ConsLoop
case <-ticker.C:
Expand Down Expand Up @@ -286,15 +285,17 @@ func (s *Server) handleConn(conn net.Conn) {
}
}

func (s *Server) ListenAndServe(hostport string) error {
func (s *Server) ListenAndServe(ctx context.Context, hostport string) error {
var inflightWg sync.WaitGroup

listener, err := net.Listen("tcp", hostport)
if err != nil {
return err
}
s.log.Print("Listening on " + hostport)

go func() {
<-s.ctx.Done() // unblock Accept()
<-ctx.Done() // unblock Accept()
listener.Close()

closeFunc := func(id, client interface{}) bool {
Expand All @@ -317,7 +318,7 @@ func (s *Server) ListenAndServe(hostport string) error {
Loop:
for {
select {
case <-s.ctx.Done():
case <-ctx.Done():
break Loop
default:
conn, err := listener.Accept()
Expand All @@ -328,17 +329,17 @@ Loop:
s.log.Println("Accept error: ", err)
}
} else {
s.inFlight.Add(1)
inflightWg.Add(1)
go func() {
defer s.inFlight.Done()
s.handleConn(conn)
defer inflightWg.Done()
s.Handle(ctx, conn)
}()
}
}
}

s.log.Println("Waiting for in-flight connections...")
s.inFlight.Wait()
inflightWg.Wait()
s.log.Println("Bye")
return nil
}
Expand Down

0 comments on commit 9be4ea8

Please sign in to comment.