Permalink
Browse files

some improvement

  • Loading branch information...
1 parent 80642de commit 13ec655603063e1c0640239fa7e5bdc8c4870188 @kicool committed Apr 20, 2012
Showing with 26 additions and 15 deletions.
  1. +26 −15 lb/loadbalancer.go
View
@@ -2,30 +2,36 @@ package main
import (
"container/heap"
- "fmt"
+ "log"
"math/rand"
"time"
)
var (
nWorker = 4
nBufferedReq = 50
- workFn = func() (r int) {
- r = rand.Int()
- fmt.Println("workFn", r)
+ workFn = func(req *Request) (r Result) {
+ r = Result{rand.Int(), req.id}
+ log.Println("workFn", r.reqId, ":", r.value)
return
}
)
+type Result struct {
+ value int
+ reqId int
+}
+
type Request struct {
- fn func() int // The operation to perform
- r chan int // The channel to return the result
+ fn func(*Request) Result // The operation to perform
+ r chan Result // The channel to return the result
+ id int
}
-func requester(work chan<- Request) {
- r := make(chan int)
+func requester(work chan<- Request, id int) {
+ r := make(chan Result)
for {
- work <- Request{workFn, r}
+ work <- Request{workFn, r, id}
result := <-r
furtherProcess(result)
// fake load
@@ -34,11 +40,12 @@ func requester(work chan<- Request) {
}
func furtherProcess(r interface{}) {
- result := r.(int)
- fmt.Println("furtherProcess", result)
+ result := r.(Result)
+ log.Println("Processing", result.reqId, ":", result.value)
}
type Worker struct {
+ id int
requests chan Request // work to do (buffered channel)
pending int // count of pending tasks
index int // index in the heap
@@ -47,7 +54,8 @@ type Worker struct {
func (w *Worker) work(done chan *Worker) {
for {
req := <-w.requests
- req.r <- req.fn()
+ log.Println("Working", w.id)
+ req.r <- req.fn(&req)
done <- w
}
}
@@ -125,17 +133,18 @@ func (b *Balancer) completed(w *Worker) {
func main() {
// Init
- fmt.Println("Init...")
+ log.Println("Init...")
// Worker Pool
workerpool := make(Pool, nWorker)
for i := 0; i < nWorker; i++ {
workerpool[i] = new(Worker)
+ workerpool[i].id = i
workerpool[i].requests = make(chan Request, nBufferedReq/nWorker)
workerpool[i].pending = 0
workerpool[i].index = -1
}
- fmt.Println("Worker Pool:", len(workerpool))
+ log.Println("Worker Pool:", len(workerpool))
// Balancer
balancer := Balancer{workerpool, make(chan *Worker)}
@@ -148,7 +157,9 @@ func main() {
workPipe := make(chan Request, nBufferedReq)
// Fake requester
- go requester(workPipe)
+ for i := 0; i < 100; i++ {
+ go requester(workPipe, i)
+ }
// GO! GO! GO!
balancer.balance(workPipe)

0 comments on commit 13ec655

Please sign in to comment.