Skip to content
This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

Commit

Permalink
better job tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfy-j committed Jan 7, 2019
1 parent 2684579 commit c9df9db
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 110 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -5,4 +5,5 @@ vendor/
*.db
clover.xml
.DS_Store
tests/.rr-test.yml
tests/.rr-test.yml
go.sum
59 changes: 43 additions & 16 deletions .travis.yml
Expand Up @@ -8,28 +8,55 @@ go:
services:
- docker

before_install:
- cd tests
- docker-compose up -d
- cd ..
- go version
- sudo add-apt-repository -y ppa:ondrej/php && sudo apt-get update
- sudo apt-get install -y php7.1-cli php7.1-xml php7.1-xdebug php7.1-dom
- sudo cp `which php7.1` `which php`
- go get ./...
- go build -o rr tests/main.go

install:
- sudo cp `which php7.1` `which php`
- export GO111MODULE=on
- go mod download
- php -v
- composer self-update
- composer install --no-interaction --ignore-platform-reqs
- php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');"
- php composer-setup.php
- php composer.phar install --no-interaction --prefer-source

before_script:
- ./rr serve -c tests/.rr.yaml &
- cd tests
- docker-compose up -d
- go build -o rr main.go
- ./rr serve -c .rr.yaml &
- cd ..

script:
- go test -v -race -cover -coverprofile=jobs.txt -covermode=atomic
- go test -v -race -cover ./cpool -coverprofile=cpool.txt -covermode=atomic
- go test -v -race -cover ./broker/local -coverprofile=local.txt -covermode=atomic
- go test -v -race -cover ./broker/beanstalk -coverprofile=beanstalk.txt -covermode=atomic
- vendor/bin/phpunit --coverage-clover=coverage.xml

after_success:
- bash <(curl -s https://codecov.io/bash) -f coverage.xml
- bash <(curl -s https://codecov.io/bash) -f jobs.txt
- bash <(curl -s https://codecov.io/bash) -f cpool.txt
- bash <(curl -s https://codecov.io/bash) -f local.txt
- bash <(curl -s https://codecov.io/bash) -f beanstalk.txt
- bash <(curl -s https://codecov.io/bash) -f coverage.xml

jobs:
include:
- stage: Test
env: "PHP=7.1"
before_install:
- sudo add-apt-repository -y ppa:ondrej/php
- sudo apt-get update
- sudo apt-get install -y php7.1-cli php7.1-xml php7.1-xdebug
- sudo cp `which php7.1` `which php`
- stage: Test
env: "PHP=7.2"
before_install:
- sudo add-apt-repository -y ppa:ondrej/php
- sudo apt-get update
- sudo apt-get install -y php7.2-cli php7.2-xml php7.2-xdebug
- sudo cp `which php7.2` `which php`
- stage: Test
env: "PHP=7.3"
before_install:
- sudo add-apt-repository -y ppa:ondrej/php
- sudo apt-get update
- sudo apt-get install -y php7.3-cli php7.3-xml php7.3-xdebug
- sudo cp `which php7.3` `which php`
18 changes: 18 additions & 0 deletions Makefile
@@ -0,0 +1,18 @@
all:
@./build.sh
build:
@./build.sh all
clean:
rm -rf protoc-gen-php-grpc
rm -rf rr-grpc
install: all
cp protoc-gen-php-grpc /usr/local/bin/protoc-gen-php-grpc
cp rr-grpc /usr/local/bin/rr-grpc
uninstall:
rm -f /usr/local/bin/protoc-gen-php-grpc
rm -f /usr/local/bin/rr-grpc
test:
go test -v -race -cover
go test -v -race -cover ./cpool
go test -v -race -cover ./broker/local
go test -v -race -cover ./broker/beanstalk
85 changes: 42 additions & 43 deletions broker/beanstalk/tube.go
Expand Up @@ -23,11 +23,13 @@ type tube struct {
cmdTimeout time.Duration
lsn func(event int, ctx interface{})
wait chan interface{}
waitTouch chan interface{}
wg sync.WaitGroup
execPool chan jobs.Handler
err jobs.ErrorHandler
mur sync.Mutex

// active operations
muw sync.RWMutex
wg sync.WaitGroup
execPool chan jobs.Handler
err jobs.ErrorHandler
mur sync.Mutex
}

// create new tube consumer and producer
Expand Down Expand Up @@ -57,14 +59,12 @@ func newTube(
func (t *tube) configure(execPool chan jobs.Handler, err jobs.ErrorHandler) error {
t.execPool = execPool
t.err = err

return nil
}

// run consumers
func (t *tube) serve(prefetch int) {
t.wait = make(chan interface{})
t.waitTouch = make(chan interface{})
atomic.StoreInt32(&t.active, 1)

fetchPool := make(chan interface{}, prefetch)
Expand Down Expand Up @@ -122,7 +122,6 @@ func (t *tube) stop() {

close(t.wait)
t.wg.Wait()
close(t.waitTouch)
}

// put data into pool or return error (no wait)
Expand All @@ -147,41 +146,7 @@ func (t *tube) put(data []byte, attempt int, delay, rrt time.Duration) (id strin
return jid(bid), err
}

// return tube stats
func (t *tube) stat() (stat *jobs.Stat, err error) {
t.wg.Add(1)
defer t.wg.Done()

conn, err := t.connPool.Allocate(t.cmdTimeout)
if err != nil {
return nil, err
}

t.mut.Lock()
t.tube.Conn = conn.(*beanstalk.Conn)
values, err := t.tube.Stats()
t.mut.Unlock()

t.connPool.Release(conn, wrapErr(err))

stat = &jobs.Stat{InternalName: t.tube.Name}

if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil {
stat.Queue = int64(v)
}

if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil {
stat.Active = int64(v)
}

if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil {
stat.Delayed = int64(v)
}

return stat, err
}

// consume job
// consume job todo: refactor
func (t *tube) consume(
conn *beanstalk.Conn,
fetchPool chan interface{},
Expand Down Expand Up @@ -235,6 +200,40 @@ func (t *tube) consume(
t.connPool.Release(conn, wrapErr(err))
}

// return tube stats
func (t *tube) stat() (stat *jobs.Stat, err error) {
t.wg.Add(1)
defer t.wg.Done()

conn, err := t.connPool.Allocate(t.cmdTimeout)
if err != nil {
return nil, err
}

t.mut.Lock()
t.tube.Conn = conn.(*beanstalk.Conn)
values, err := t.tube.Stats()
t.mut.Unlock()

t.connPool.Release(conn, wrapErr(err))

stat = &jobs.Stat{InternalName: t.tube.Name}

if v, err := strconv.Atoi(values["current-jobs-ready"]); err == nil {
stat.Queue = int64(v)
}

if v, err := strconv.Atoi(values["current-jobs-reserved"]); err == nil {
stat.Active = int64(v)
}

if v, err := strconv.Atoi(values["current-jobs-delayed"]); err == nil {
stat.Delayed = int64(v)
}

return stat, err
}

// throw handles service, server and pool events.
func (t *tube) throw(event int, ctx interface{}) {
t.lsn(event, ctx)
Expand Down
92 changes: 58 additions & 34 deletions broker/local/queue.go
Expand Up @@ -8,11 +8,20 @@ import (
)

type queue struct {
active int32
stat *jobs.Stat
jobs chan entry
wg sync.WaitGroup
wait chan interface{}
active int32
stat *jobs.Stat

// job pipeline
jobs chan *entry

// active operations
muw sync.Mutex
wg sync.WaitGroup

// stop channel
wait chan interface{}

// exec handlers
execPool chan jobs.Handler
err jobs.ErrorHandler
}
Expand All @@ -25,28 +34,10 @@ type entry struct {

// create new queue
func newQueue() *queue {
return &queue{stat: &jobs.Stat{}, jobs: make(chan entry)}
}

// add job to the queue
func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
if delay == 0 {
atomic.AddInt64(&q.stat.Queue, 1)
q.jobs <- entry{id: id, job: j}
return
}

atomic.AddInt64(&q.stat.Delayed, 1)

time.Sleep(delay)

atomic.AddInt64(&q.stat.Delayed, ^int64(0))
atomic.AddInt64(&q.stat.Queue, 1)

q.jobs <- entry{id: id, job: j, attempt: attempt}
return &queue{stat: &jobs.Stat{}, jobs: make(chan *entry)}
}

// associate queue with new exec pool
// associate queue with new consume pool
func (q *queue) configure(execPool chan jobs.Handler, err jobs.ErrorHandler) error {
q.execPool = execPool
q.err = err
Expand All @@ -60,15 +51,28 @@ func (q *queue) serve() {
atomic.StoreInt32(&q.active, 1)

for {
select {
case <-q.wait:
e := q.allocateEntry()
if e == nil {
return
case e := <-q.jobs:
q.wg.Add(1)
atomic.AddInt64(&q.stat.Active, 1)

go q.exec(e, <-q.execPool)
}

go q.consume(e, <-q.execPool)
}
}

// allocate one job entry
func (q *queue) allocateEntry() *entry {
q.muw.Lock()
defer q.muw.Unlock()

select {
case <-q.wait:
return nil
case e := <-q.jobs:
atomic.AddInt64(&q.stat.Active, 1)
q.wg.Add(1)

return e
}
}

Expand All @@ -81,11 +85,31 @@ func (q *queue) stop() {
atomic.StoreInt32(&q.active, 0)

close(q.wait)
q.muw.Lock()
q.wg.Wait()
q.muw.Unlock()
}

// add job to the queue
func (q *queue) push(id string, j *jobs.Job, attempt int, delay time.Duration) {
if delay == 0 {
atomic.AddInt64(&q.stat.Queue, 1)
q.jobs <- &entry{id: id, job: j}
return
}

atomic.AddInt64(&q.stat.Delayed, 1)

time.Sleep(delay)

atomic.AddInt64(&q.stat.Delayed, ^int64(0))
atomic.AddInt64(&q.stat.Queue, 1)

q.jobs <- &entry{id: id, job: j, attempt: attempt}
}

// exec singe job
func (q *queue) exec(e entry, handler jobs.Handler) {
// consume singe job
func (q *queue) consume(e *entry, handler jobs.Handler) {
defer atomic.AddInt64(&q.stat.Active, ^int64(0))
defer q.wg.Done()

Expand Down
6 changes: 3 additions & 3 deletions config_test.go
Expand Up @@ -64,7 +64,7 @@ func Test_Config_Hydrate_Pipelines(t *testing.T) {

assert.NoError(t, c.Hydrate(cfg))

assert.Equal(t, "local", c.Pipelines["some"].Broker())
assert.Equal(t, "default", c.Pipelines["some"].String("queue", ""))
assert.Equal(t, "another-default", c.Pipelines["some"].String("another", "another-default"))
assert.Equal(t, "local", c.Pipelines.Get("some").Broker())
assert.Equal(t, "default", c.Pipelines.Get("some").String("queue", ""))
assert.Equal(t, "another-default", c.Pipelines.Get("some").String("another", "another-default"))
}

0 comments on commit c9df9db

Please sign in to comment.