An easy-to-use, distributed, extensible task/job queue framework for #golang
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
amqp
docs
redis
.gitignore
.travis.yml
LICENSE
README.md
backend.go interface change Dec 30, 2015
backend_local.go
backend_local_test.go
backend_test_suite.go
bench_test.go
bridge.go
bridge_local.go
bridge_local_test.go
bridge_remote.go
bridge_remote_test.go
bridge_test.go
broker.go
broker_local.go
broker_local_test.go
broker_test_suite.go
codec_json_safe.go
config.go
dingo.go
dingo_amqp_redis_test.go improve converge Dec 27, 2015
dingo_amqp_test.go
dingo_api_test.go
dingo_local_test.go rename Dec 29, 2015
dingo_multi_test.go
dingo_redis_test.go
dingo_single_test.go
err.go
err_test.go
event.go
example_test.go
header.go
header_test.go
id_maker.go
id_maker_test.go
invoker.go 'dingo' consumes 'transport' Dec 21, 2015
invoker_generic.go
invoker_generic_test.go
invoker_lazy.go
invoker_lazy_test.go
invoker_test.go
mapper.go
mapper_test.go
marshaller.go
marshaller_custom.go
marshaller_gob.go
marshaller_json.go
marshaller_test.go
mgr.go
mgr_test.go
mux.go
mux_test.go
object.go
option.go
report.go
report_test.go
result.go
result_test.go
routine.go
routine_test.go
task.go
task_test.go
worker.go
worker_test.go

README.md

dingo

GoDoc Build Status Coverage Status

I initiated this project after machinery, which is a great library and tends to provide a replacement of Celery in #golang. The reasons to create (yet) another task library are:

  • To make sending tasks as easy as possible
  • Await and receive reports through channels. (channel is a natural way to represent asynchronous results)
  • I want to get familiar with those concepts of #golang: interface, routine, channel, and a distributed task framework is a good topic for practice, :)

One important concept I learned from Celery and inherited in Dingo is that Caller and Worker could share the same codebase.

When you send a task message in Celery, that message will not contain any source code, but only the name of the task you want to execute. This works similarly to how host names work on the internet: every worker maintains a mapping of task names to their actual functions, called the task registry.

Below is a quicklink to go through this README:

Quick Demo

Here is a quick demo of this project in local mode as a background job pool:

package main

import (
	"fmt"
	"github.com/mission-liao/dingo"
)

func main() {
	// initiate a local app
	app, err := dingo.NewApp("local", nil)
	if err != nil {
		return
	}
	// register a worker function
	err = app.Register("add", func(a int, b int) int {
		return a + b
	})
	if err != nil {
		return
	}

	// allocate workers for that function: 2 workers, sharing 1 report channel.
	_, err = app.Allocate("add", 2, 1)

	// wrap the report channel with dingo.Result
	result := dingo.NewResult(app.Call("add", dingo.DefaultOption(), 2, 3))
	err = result.Wait(0)
	if err != nil {
		return
	}
    // set callback like promise in javascript
	result.OnOK(func(sum int) {
		fmt.Printf("result is: %v\n", sum)
	})

	// release resource
	err = app.Close()
	if err != nil {
		return
	}
}

Features

Invoking Worker Functions with Arbitary Signatures

(Almost) ANY Function Can Be Your Dingo

These functions can be used as worker functions by dingo:

type Person struct {
  ID int
  Name string
}
func NewEmployee(p *Person, age int) (failed bool) { ... } // struct, OK
func GetEmployees(age int) (employees map[string]*Person) { ... } // map of struct, OK
func DeleteEmployees(names []string) (count int) { ... } // slice, OK
func DoNothing () { ... } // OK

Idealy, you don't have to rewrite your function to fit any specific signature, it's piece of cake to adapt a function to Dingo.

Below is to explain why some types can't be supported by Dingo: The most compatible exchange format is []byte, to marshall in/out your parameters to []byte, we rely these builtin encoders:

  • encoding/json
  • encoding/gob

Type info are deduced from the signatures of worker functions you register. With those type info, parameters are unmarshalled from []byte to cloeset type. A type correction procedure would be applied on those parameters before invoking.

Obviously, it's hard (not impossible) to handle all types in #golang, these are unsupported by Dingo as far as I know:

  • interface: unmarshalling requires concrete types. (so error can be marshalled, but can't be un-marshalled)
  • chan: haven't tried yet
  • private field in struct: they are ignore by json/gob, but it's still possible to support them by providing customized marshaller and invoker. (please search 'ExampleCustomMarshaller' for details)

Stateful Worker Functions

Dingo Remembers things

Wanna create a worker function with states? Two ways to did this in Dingo:

  • The reflect package allow us to invoke a method of struct, so you can initiate an instance of your struct to hold every global and provide its method as worker functions.
  • create a closure to enclose states or globals

Refer to Stateful Worker Functions for more details.

Two Way Binding with Worker Functions

Throwing and Catching with Your Dingo

Besides sending arguments, return values from worker functions can also be accessed. Every time you initiate a task, you will get a report channel.

reports, err := app.Call("yourTask", nil, arg1, arg2 ...)

// synchronous waiting
r := <-reports

// asynchronous waiting
go func () {
  r := <-reports
}()

// access the return values
if r.OK() {
  var ret []interface{} = r.Return()
  ret[0].(int) // by type assertion
}

Or using:

A Distributed Task Framework with Local Mode

Dingo @Home, or Anywhere

You would prefer a small, local worker pool at early development stage, and transfer to a distributed one when stepping in production. In dingo, there is nothing much to do for transfering (besides debugging, :( )

You've seen a demo for local mode, and it's easy to make it distributed by attaching corresponding components at caller-side and worker-side. A demo: caller and worker.

In short, at Caller side, you need to:

  • register worker functions for tasks
  • config default-option, id-maker, marshaller for tasks if needed.
  • attach Producer, Store

And at Worker side, you need to:

  • register the same worker function as the one on Caller side for tasks
  • config marshaller for tasks if needed, the marshaller used for Caller and Worker should be sync.
  • attach Consumer (or NamedConsumer), Reporter
  • allocate worker routines

Customizable

Personalize Your Dingo

Many core behaviors can be customized:

Development Environment Setup

There is no dependency manager in this project, you need to install them by yourself.

go get github.com/streadway/amqp
go get github.com/garyburd/redigo/redis
go get github.com/stretchr/testify
go get github.com/satori/go.uuid

Install Redis and Rabbitmq, then unittest @ the root folder of dingo

go test -v ./...