Skip to content
Queue is a generic interface to abstract the details of implementation of queue systems.
Go Makefile
Branch: master
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
amqp
memory Rename semaphore channel var name Oct 17, 2018
test Add advertised window support for memory queue Oct 16, 2018
.gitignore Ignore build/make files. Jun 19, 2018
.travis.yml travis: disable tip testing and shift go versions Sep 28, 2018
LICENSE CI and paperwork May 9, 2018
MAINTAINERS Add MAINTAINERS file Jun 13, 2018
Makefile build: update ci to v1 and use win pkg installer Sep 28, 2018
README.md
appveyor.yml appveyor: do not send coverage data Sep 28, 2018
common.go Improve description for Comply func Sep 25, 2018
example_test.go
job.go Doc improvements Sep 25, 2018
register.go
register_test.go

README.md

go-queue GoDoc Build Status Build status codecov.io Go Report Card

Queue is a generic interface to abstract the details of implementation of queue systems.

Similar to the package database/sql, this package implements a common interface to interact with different queue systems, in a unified way.

Currently, only AMQP queues and an in-memory queue are supported.

Installation

The recommended way to install go-queue is:

go get -u gopkg.in/src-d/go-queue.v1/...

Usage

This example shows how to publish and consume a Job from the in-memory implementation, very useful for unit tests.

The queue implementations to be supported by the NewBroker should be imported as shows the example.

import (
    ...
	"gopkg.in/src-d/go-queue.v1"
	_ "gopkg.in/src-d/go-queue.v1/memory"
)

...

b, _ := queue.NewBroker("memory://")
q, _ := b.Queue("test-queue")

j, _ := queue.NewJob()
if err := j.Encode("hello world!"); err != nil {
    log.Fatal(err)
}

if err := q.Publish(j); err != nil {
    log.Fatal(err)
}

iter, err := q.Consume(1)
if err != nil {
    log.Fatal(err)
}

consumedJob, _ := iter.Next()

var payload string
_ = consumedJob.Decode(&payload)

fmt.Println(payload)
// Output: hello world!

Configuration

AMQP

The list of available variables is:

  • AMQP_BACKOFF_MIN (default: 20ms): Minimum time to wait for retry the connection or queue channel assignment.
  • AMQP_BACKOFF_MAX (default: 30s): Maximum time to wait for retry the connection or queue channel assignment.
  • AMQP_BACKOFF_FACTOR (default: 2): Multiplying factor for each increment step on the retry.
  • AMQP_BURIED_QUEUE_SUFFIX (default: .buriedQueue): Suffix for the buried queue name.
  • AMQP_BURIED_EXCHANGE_SUFFIX (default: .buriedExchange): Suffix for the exchange name.
  • AMQP_BURIED_TIMEOUT (default: 500): Time in milliseconds to wait for new jobs from the buried queue.
  • AMQP_RETRIES_HEADER (default: x-retries): Message header to set the number of retries.
  • AMQP_ERROR_HEADER (default: x-error-type): Message header to set the error type.

License

Apache License Version 2.0, see LICENSE

You can’t perform that action at this time.