diff --git a/README.org b/README.org new file mode 100644 index 0000000..2274160 --- /dev/null +++ b/README.org @@ -0,0 +1,3 @@ +This is the full sourcecode for the book, "Concurrency in Go" published by O'Reilly. + +For errata and more information, please refer to the book's hub at [[http://katherine.cox-buday.com/concurrency-in-go]]. diff --git a/an-introduction-to-concurrency/why-is-concurrency-hard/deadlocks-livelocks-and-starvation/livelock/fig-livelock-hallway.go b/an-introduction-to-concurrency/why-is-concurrency-hard/deadlocks-livelocks-and-starvation/livelock/fig-livelock-hallway.go new file mode 100644 index 0000000..168471d --- /dev/null +++ b/an-introduction-to-concurrency/why-is-concurrency-hard/deadlocks-livelocks-and-starvation/livelock/fig-livelock-hallway.go @@ -0,0 +1,59 @@ +package main + +import ( + "bytes" + "fmt" + "sync" + "sync/atomic" + "time" +) + +func main() { + cadence := sync.NewCond(&sync.Mutex{}) + go func() { + for range time.Tick(1 * time.Millisecond) { + cadence.Broadcast() + } + }() + + takeStep := func() { + cadence.L.Lock() + cadence.Wait() + cadence.L.Unlock() + } + + tryDir := func(dirName string, dir *int32, out *bytes.Buffer) bool { // <1> + fmt.Fprintf(out, " %v", dirName) + atomic.AddInt32(dir, 1) // <2> + takeStep() // <3> + if atomic.LoadInt32(dir) == 1 { + fmt.Fprint(out, ". Success!") + return true + } + takeStep() + atomic.AddInt32(dir, -1) // <4> + return false + } + + var left, right int32 + tryLeft := func(out *bytes.Buffer) bool { return tryDir("left", &left, out) } + tryRight := func(out *bytes.Buffer) bool { return tryDir("right", &right, out) } + walk := func(walking *sync.WaitGroup, name string) { + var out bytes.Buffer + defer func() { fmt.Println(out.String()) }() + defer walking.Done() + fmt.Fprintf(&out, "%v is trying to scoot:", name) + for i := 0; i < 5; i++ { // <1> + if tryLeft(&out) || tryRight(&out) { // <2> + return + } + } + fmt.Fprintf(&out, "\n%v tosses her hands up in exasperation!", name) + } + + var peopleInHallway sync.WaitGroup // <3> + peopleInHallway.Add(2) + go walk(&peopleInHallway, "Alice") + go walk(&peopleInHallway, "Barbara") + peopleInHallway.Wait() +} diff --git a/an-introduction-to-concurrency/why-is-concurrency-hard/deadlocks-livelocks-and-starvation/starvation/fig-example.go b/an-introduction-to-concurrency/why-is-concurrency-hard/deadlocks-livelocks-and-starvation/starvation/fig-example.go new file mode 100644 index 0000000..a28e1b2 --- /dev/null +++ b/an-introduction-to-concurrency/why-is-concurrency-hard/deadlocks-livelocks-and-starvation/starvation/fig-example.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +func main() { + var wg sync.WaitGroup + var sharedLock sync.Mutex + const runtime = 1 * time.Second + + greedyWorker := func() { + defer wg.Done() + + var count int + for begin := time.Now(); time.Since(begin) <= runtime; { + sharedLock.Lock() + time.Sleep(3 * time.Nanosecond) + sharedLock.Unlock() + count++ + } + + fmt.Printf("Greedy worker was able to execute %v work loops\n", count) + } + + politeWorker := func() { + defer wg.Done() + + var count int + for begin := time.Now(); time.Since(begin) <= runtime; { + sharedLock.Lock() + time.Sleep(1 * time.Nanosecond) + sharedLock.Unlock() + + sharedLock.Lock() + time.Sleep(1 * time.Nanosecond) + sharedLock.Unlock() + + sharedLock.Lock() + time.Sleep(1 * time.Nanosecond) + sharedLock.Unlock() + + count++ + } + + fmt.Printf("Polite worker was able to execute %v work loops.\n", count) + } + + wg.Add(2) + go greedyWorker() + go politeWorker() + + wg.Wait() +} diff --git a/an-introduction-to-concurrency/why-is-concurrency-hard/memory-access-synchronization/fig-basic-memory-access-sync.go b/an-introduction-to-concurrency/why-is-concurrency-hard/memory-access-synchronization/fig-basic-memory-access-sync.go new file mode 100644 index 0000000..e5eea11 --- /dev/null +++ b/an-introduction-to-concurrency/why-is-concurrency-hard/memory-access-synchronization/fig-basic-memory-access-sync.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var memoryAccess sync.Mutex // <1> + var value int + go func() { + memoryAccess.Lock() // <2> + value++ + memoryAccess.Unlock() // <3> + }() + + memoryAccess.Lock() // <4> + if value == 0 { + fmt.Printf("the value is %v.\n", value) + } else { + fmt.Printf("the value is %v.\n", value) + } + memoryAccess.Unlock() // <5> +} diff --git a/an-introduction-to-concurrency/why-is-concurrency-hard/race-conditions/fig-basic-race-condition.go b/an-introduction-to-concurrency/why-is-concurrency-hard/race-conditions/fig-basic-race-condition.go new file mode 100644 index 0000000..6b5d712 --- /dev/null +++ b/an-introduction-to-concurrency/why-is-concurrency-hard/race-conditions/fig-basic-race-condition.go @@ -0,0 +1,15 @@ +package main + +import ( + "fmt" +) + +func main() { + var data int + go func() { // <1> + data++ + }() + if data == 0 { + fmt.Printf("the value is %v.\n", data) + } +} diff --git a/concurrency-at-scale/error-propagation/fig-error-propagation-corrected.go b/concurrency-at-scale/error-propagation/fig-error-propagation-corrected.go new file mode 100644 index 0000000..a2fe754 --- /dev/null +++ b/concurrency-at-scale/error-propagation/fig-error-propagation-corrected.go @@ -0,0 +1,85 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/exec" + "runtime/debug" +) + +type MyError struct { + Inner error + Message string + StackTrace string + Misc map[string]interface{} +} + +func wrapError(err error, messagef string, msgArgs ...interface{}) MyError { + return MyError{ + Inner: err, //<1> + Message: fmt.Sprintf(messagef, msgArgs...), + StackTrace: string(debug.Stack()), // <2> + Misc: make(map[string]interface{}), // <3> + } +} + +func (err MyError) Error() string { + return err.Message +} + +// "lowlevel" module + +type LowLevelErr struct { + error +} + +func isGloballyExec(path string) (bool, error) { + info, err := os.Stat(path) + if err != nil { + return false, LowLevelErr{(wrapError(err, err.Error()))} // <1> + } + return info.Mode().Perm()&0100 == 0100, nil +} + +// "intermediate" module + +type IntermediateErr struct { + error +} + +func runJob(id string) error { + const jobBinPath = "/bad/job/binary" + isExecutable, err := isGloballyExec(jobBinPath) + if err != nil { + return IntermediateErr{wrapError( + err, + "cannot run job %q: requisite binaries not available", + id, + )} // <1> + } else if isExecutable == false { + return wrapError(nil, "cannot run job %q: requisite binaries are not executable", id) + } + + return exec.Command(jobBinPath, "--id="+id).Run() +} + +func handleError(key int, err error, message string) { + log.SetPrefix(fmt.Sprintf("[logID: %v]: ", key)) + log.Printf("%#v", err) + fmt.Printf("[%v] %v", key, message) +} + +func main() { + log.SetOutput(os.Stdout) + log.SetFlags(log.Ltime | log.LUTC) + + err := runJob("1") + if err != nil { + msg := "There was an unexpected issue; please report this as a bug." + if _, ok := err.(IntermediateErr); ok { + msg = err.Error() + } + handleError(1, err, msg) + } +} diff --git a/concurrency-at-scale/error-propagation/fig-error-propagation.go b/concurrency-at-scale/error-propagation/fig-error-propagation.go new file mode 100644 index 0000000..8de97d7 --- /dev/null +++ b/concurrency-at-scale/error-propagation/fig-error-propagation.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/exec" + "runtime/debug" +) + +type MyError struct { + Inner error + Message string + StackTrace string + Misc map[string]interface{} +} + +func wrapError(err error, messagef string, msgArgs ...interface{}) MyError { + return MyError{ + Inner: err, //<1> + Message: fmt.Sprintf(messagef, msgArgs...), + StackTrace: string(debug.Stack()), // <2> + Misc: make(map[string]interface{}), // <3> + } +} + +func (err MyError) Error() string { + return err.Message +} + +// "lowlevel" module + +type LowLevelErr struct { + error +} + +func isGloballyExec(path string) (bool, error) { + info, err := os.Stat(path) + if err != nil { + return false, LowLevelErr{(wrapError(err, err.Error()))} // <1> + } + return info.Mode().Perm()&0100 == 0100, nil +} + +// "intermediate" module + +type IntermediateErr struct { + error +} + +func runJob(id string) error { + const jobBinPath = "/bad/job/binary" + isExecutable, err := isGloballyExec(jobBinPath) + if err != nil { + return err // <1> + } else if isExecutable == false { + return wrapError(nil, "job binary is not executable") + } + + return exec.Command(jobBinPath, "--id="+id).Run() // <1> +} + +func handleError(key int, err error, message string) { + log.SetPrefix(fmt.Sprintf("[logID: %v]: ", key)) + log.Printf("%#v", err) // <3> + fmt.Printf("[%v] %v", key, message) +} + +func main() { + log.SetOutput(os.Stdout) + log.SetFlags(log.Ltime | log.LUTC) + + err := runJob("1") + if err != nil { + msg := "There was an unexpected issue; please report this as a bug." + if _, ok := err.(IntermediateErr); ok { // <1> + msg = err.Error() + } + handleError(1, err, msg) // <2> + } +} diff --git a/concurrency-at-scale/healing-unhealthy-goroutines/fig-example-steward-usage.go b/concurrency-at-scale/healing-unhealthy-goroutines/fig-example-steward-usage.go new file mode 100644 index 0000000..dcd2cd9 --- /dev/null +++ b/concurrency-at-scale/healing-unhealthy-goroutines/fig-example-steward-usage.go @@ -0,0 +1,110 @@ +package main + +import ( + "log" + "os" + "time" +) + +func main() { + var or func(channels ...<-chan interface{}) <-chan interface{} + or = func(channels ...<-chan interface{}) <-chan interface{} { // <1> + switch len(channels) { + case 0: // <2> + return nil + case 1: // <3> + return channels[0] + } + + orDone := make(chan interface{}) + go func() { // <4> + defer close(orDone) + + switch len(channels) { + case 2: // <5> + select { + case <-channels[0]: + case <-channels[1]: + } + default: // <6> + select { + case <-channels[0]: + case <-channels[1]: + case <-channels[2]: + case <-or(append(channels[3:], orDone)...): // <6> + } + } + }() + return orDone + } + type startGoroutineFn func( + done <-chan interface{}, + pulseInterval time.Duration, + ) (heartbeat <-chan interface{}) // <1> + + newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { // <2> + return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} { + heartbeat := make(chan interface{}) + go func() { + defer close(heartbeat) + + var wardDone chan interface{} + var wardHeartbeat <-chan interface{} + startWard := func() { // <3> + wardDone = make(chan interface{}) // <4> + wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) // <5> + } + startWard() + pulse := time.Tick(pulseInterval) + + monitorLoop: + for { + timeoutSignal := time.After(timeout) + + for { // <6> + select { + case <-pulse: + select { + case heartbeat <- struct{}{}: + default: + } + case <-wardHeartbeat: // <7> + continue monitorLoop + case <-timeoutSignal: // <8> + log.Println("steward: ward unhealthy; restarting") + close(wardDone) + startWard() + continue monitorLoop + case <-done: + return + } + } + } + }() + + return heartbeat + } + } + log.SetOutput(os.Stdout) + log.SetFlags(log.Ltime | log.LUTC) + + doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} { + log.Println("ward: Hello, I'm irresponsible!") + go func() { + <-done // <1> + log.Println("ward: I am halting.") + }() + return nil + } + doWorkWithSteward := newSteward(4*time.Second, doWork) // <2> + + done := make(chan interface{}) + time.AfterFunc(9*time.Second, func() { // <3> + log.Println("main: halting steward and ward.") + close(done) + }) + + for range doWorkWithSteward(done, 4*time.Second) { + } //<4> + log.Println("Done") +} diff --git a/concurrency-at-scale/healing-unhealthy-goroutines/fig-more-complicated-ward.go b/concurrency-at-scale/healing-unhealthy-goroutines/fig-more-complicated-ward.go new file mode 100644 index 0000000..07cad43 --- /dev/null +++ b/concurrency-at-scale/healing-unhealthy-goroutines/fig-more-complicated-ward.go @@ -0,0 +1,194 @@ +package main + +import ( + "fmt" + "log" + "os" + "time" +) + +func main() { + var or func(channels ...<-chan interface{}) <-chan interface{} + or = func(channels ...<-chan interface{}) <-chan interface{} { // <1> + switch len(channels) { + case 0: // <2> + return nil + case 1: // <3> + return channels[0] + } + + orDone := make(chan interface{}) + go func() { // <4> + defer close(orDone) + + switch len(channels) { + case 2: // <5> + select { + case <-channels[0]: + case <-channels[1]: + } + default: // <6> + select { + case <-channels[0]: + case <-channels[1]: + case <-channels[2]: + case <-or(append(channels[3:], orDone)...): // <6> + } + } + }() + return orDone + } + type startGoroutineFn func( + done <-chan interface{}, + pulseInterval time.Duration, + ) (heartbeat <-chan interface{}) // <1> + + newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { // <2> + return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} { + heartbeat := make(chan interface{}) + go func() { + defer close(heartbeat) + + var wardDone chan interface{} + var wardHeartbeat <-chan interface{} + startWard := func() { // <3> + wardDone = make(chan interface{}) // <4> + wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) // <5> + } + startWard() + pulse := time.Tick(pulseInterval) + + monitorLoop: + for { + timeoutSignal := time.After(timeout) + + for { // <6> + select { + case <-pulse: + select { + case heartbeat <- struct{}{}: + default: + } + case <-wardHeartbeat: // <7> + continue monitorLoop + case <-timeoutSignal: // <8> + log.Println("steward: ward unhealthy; restarting") + close(wardDone) + startWard() + continue monitorLoop + case <-done: + return + } + } + } + }() + + return heartbeat + } + } + take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} { + takeStream := make(chan interface{}) + go func() { + defer close(takeStream) + for i := 0; i < num; i++ { + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + //bridge := func( + // done <-chan interface{}, + // chanStream <-chan <-chan interface{}, + //) <-chan interface{} { + // valStream := make(chan interface{}) // <1> + // go func() { + // defer close(valStream) + // for { // <2> + // var stream <-chan interface{} + // select { + // case maybeStream, ok := <-chanStream: + // if ok == false { + // return + // } + // stream = maybeStream + // case <-done: + // return + // } + // for val := range orDone(done, stream) { // <3> + // select { + // case valStream <- val: + // case <-done: + // } + // } + // } + // }() + // return valStream + //} + doWorkFn := func( + done <-chan interface{}, + intList ...int, + ) (startGoroutineFn, <-chan interface{}) { // <1> + intChanStream := make(chan (<-chan interface{})) // <2> + intStream := bridge(done, intChanStream) + doWork := func( + done <-chan interface{}, + pulseInterval time.Duration, + ) <-chan interface{} { // <3> + intStream := make(chan interface{}) // <4> + heartbeat := make(chan interface{}) + go func() { + defer close(intStream) + select { + case intChanStream <- intStream: // <5> + case <-done: + return + } + + pulse := time.Tick(pulseInterval) + + for { + valueLoop: + for _, intVal := range intList { + if intVal < 0 { + log.Printf("negative value: %v\n", intVal) // <6> + return + } + + for { + select { + case <-pulse: + select { + case heartbeat <- struct{}{}: + default: + } + case intStream <- intVal: + continue valueLoop + case <-done: + return + } + } + } + } + }() + return heartbeat + } + return doWork, intStream + } + log.SetFlags(log.Ltime | log.LUTC) + log.SetOutput(os.Stdout) + + done := make(chan interface{}) + defer close(done) + + doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5) // <1> + doWorkWithSteward := newSteward(1*time.Millisecond, doWork) // <2> + doWorkWithSteward(done, 1*time.Hour) // <3> + + for intVal := range take(done, intStream, 6) { // <4> + fmt.Printf("Received: %v\n", intVal) + } +} diff --git a/concurrency-at-scale/heartbeats/bad_concurrent_test.go b/concurrency-at-scale/heartbeats/bad_concurrent_test.go new file mode 100644 index 0000000..8609544 --- /dev/null +++ b/concurrency-at-scale/heartbeats/bad_concurrent_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "testing" + "time" +) + +func DoWork(done <-chan interface{}, nums ...int) (<-chan interface{}, <-chan int) { + heartbeat := make(chan interface{}, 1) + intStream := make(chan int) + go func() { + defer close(heartbeat) + defer close(intStream) + + time.Sleep(2 * time.Second) // <1> + + for _, n := range nums { + select { + case heartbeat <- struct{}{}: + default: + } + + select { + case <-done: + return + case intStream <- n: + } + } + }() + + return heartbeat, intStream +} +func TestDoWork_GeneratesAllNumbers(t *testing.T) { + done := make(chan interface{}) + defer close(done) + + intSlice := []int{0, 1, 2, 3, 5} + _, results := DoWork(done, intSlice...) + + for i, expected := range intSlice { + select { + case r := <-results: + if r != expected { + t.Errorf("index %v: expected %v, but received %v,", i, expected, r) + } + case <-time.After(1 * time.Second): // <1> + t.Fatal("test timed out") + } + } +} diff --git a/concurrency-at-scale/heartbeats/concurrent_test.go b/concurrency-at-scale/heartbeats/concurrent_test.go new file mode 100644 index 0000000..2fa9256 --- /dev/null +++ b/concurrency-at-scale/heartbeats/concurrent_test.go @@ -0,0 +1,50 @@ +package main + +import ( + "testing" + "time" +) + +func DoWork(done <-chan interface{}, nums ...int) (<-chan interface{}, <-chan int) { + heartbeat := make(chan interface{}, 1) + intStream := make(chan int) + go func() { + defer close(heartbeat) + defer close(intStream) + + time.Sleep(2 * time.Second) // <1> + + for _, n := range nums { + select { + case heartbeat <- struct{}{}: + default: + } + + select { + case <-done: + return + case intStream <- n: + } + } + }() + + return heartbeat, intStream +} + +func TestDoWork_GeneratesAllNumbers(t *testing.T) { + done := make(chan interface{}) + defer close(done) + + intSlice := []int{0, 1, 2, 3, 5} + heartbeat, results := DoWork(done, intSlice...) + + <-heartbeat // <1> + + i := 0 + for r := range results { + if expected := intSlice[i]; r != expected { + t.Errorf("index %v: expected %v, but received %v,", i, expected, r) + } + i++ + } +} diff --git a/concurrency-at-scale/heartbeats/dowork_test.go b/concurrency-at-scale/heartbeats/dowork_test.go new file mode 100644 index 0000000..b2da6e3 --- /dev/null +++ b/concurrency-at-scale/heartbeats/dowork_test.go @@ -0,0 +1,32 @@ +package main + +import ( + "testing" + "time" +) + +func DoWork(done <-chan interface{}, nums ...int) (<-chan interface{}, <-chan int) { + heartbeat := make(chan interface{}, 1) + intStream := make(chan int) + go func() { + defer close(heartbeat) + defer close(intStream) + + time.Sleep(2 * time.Second) // <1> + + for _, n := range nums { + select { + case heartbeat <- struct{}{}: + default: + } + + select { + case <-done: + return + case intStream <- n: + } + } + }() + + return heartbeat, intStream +} diff --git a/concurrency-at-scale/heartbeats/fig-interval-heartbeat-misbehaving-goroutine.go b/concurrency-at-scale/heartbeats/fig-interval-heartbeat-misbehaving-goroutine.go new file mode 100644 index 0000000..2c02f3e --- /dev/null +++ b/concurrency-at-scale/heartbeats/fig-interval-heartbeat-misbehaving-goroutine.go @@ -0,0 +1,72 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + doWork := func( + done <-chan interface{}, + pulseInterval time.Duration, + ) (<-chan interface{}, <-chan time.Time) { + heartbeat := make(chan interface{}) + results := make(chan time.Time) + go func() { + pulse := time.Tick(pulseInterval) + workGen := time.Tick(2 * pulseInterval) + + sendPulse := func() { + select { + case heartbeat <- struct{}{}: + default: + } + } + sendResult := func(r time.Time) { + for { + select { + case <-pulse: + sendPulse() + case results <- r: + return + } + } + } + + for i := 0; i < 2; i++ { // <1> + select { + case <-done: + return + case <-pulse: + sendPulse() + case r := <-workGen: + sendResult(r) + } + } + }() + return heartbeat, results + } + + done := make(chan interface{}) + time.AfterFunc(10*time.Second, func() { close(done) }) + + const timeout = 2 * time.Second + heartbeat, results := doWork(done, timeout/2) + for { + select { + case _, ok := <-heartbeat: + if ok == false { + return + } + fmt.Println("pulse") + case r, ok := <-results: + if ok == false { + return + } + fmt.Printf("results %v\n", r) + case <-time.After(timeout): + fmt.Println("worker goroutine is not healthy!") + return + } + } +} diff --git a/concurrency-at-scale/heartbeats/fig-interval-heartbeat.go b/concurrency-at-scale/heartbeats/fig-interval-heartbeat.go new file mode 100644 index 0000000..f8280a8 --- /dev/null +++ b/concurrency-at-scale/heartbeats/fig-interval-heartbeat.go @@ -0,0 +1,75 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + doWork := func( + done <-chan interface{}, + pulseInterval time.Duration, + ) (<-chan interface{}, <-chan time.Time) { + heartbeat := make(chan interface{}) // <1> + results := make(chan time.Time) + go func() { + defer close(heartbeat) + defer close(results) + + pulse := time.Tick(pulseInterval) // <2> + workGen := time.Tick(2 * pulseInterval) // <3> + + sendPulse := func() { + select { + case heartbeat <- struct{}{}: + default: // <4> + } + } + sendResult := func(r time.Time) { + for { + select { + case <-done: + return + case <-pulse: // <5> + sendPulse() + case results <- r: + return + } + } + } + + for { + select { + case <-done: + return + case <-pulse: // <5> + sendPulse() + case r := <-workGen: + sendResult(r) + } + } + }() + return heartbeat, results + } + done := make(chan interface{}) + time.AfterFunc(10*time.Second, func() { close(done) }) // <1> + + const timeout = 2 * time.Second // <2> + heartbeat, results := doWork(done, timeout/2) // <3> + for { + select { + case _, ok := <-heartbeat: // <4> + if ok == false { + return + } + fmt.Println("pulse") + case r, ok := <-results: // <5> + if ok == false { + return + } + fmt.Printf("results %v\n", r.Second()) + case <-time.After(timeout): // <6> + return + } + } +} diff --git a/concurrency-at-scale/heartbeats/fig-work-unit-pulse.go b/concurrency-at-scale/heartbeats/fig-work-unit-pulse.go new file mode 100644 index 0000000..2ab9d06 --- /dev/null +++ b/concurrency-at-scale/heartbeats/fig-work-unit-pulse.go @@ -0,0 +1,53 @@ +package main + +import ( + "fmt" + "math/rand" +) + +func main() { + doWork := func(done <-chan interface{}) (<-chan interface{}, <-chan int) { + heartbeatStream := make(chan interface{}, 1) // <1> + workStream := make(chan int) + go func() { + defer close(heartbeatStream) + defer close(workStream) + + for i := 0; i < 10; i++ { + select { // <2> + case heartbeatStream <- struct{}{}: + default: // <3> + } + + select { + case <-done: + return + case workStream <- rand.Intn(10): + } + } + }() + + return heartbeatStream, workStream + } + + done := make(chan interface{}) + defer close(done) + + heartbeat, results := doWork(done) + for { + select { + case _, ok := <-heartbeat: + if ok { + fmt.Println("pulse") + } else { + return + } + case r, ok := <-results: + if ok { + fmt.Printf("results %v\n", r) + } else { + return + } + } + } +} diff --git a/concurrency-at-scale/heartbeats/interval_concurrent_test.go b/concurrency-at-scale/heartbeats/interval_concurrent_test.go new file mode 100644 index 0000000..662f35d --- /dev/null +++ b/concurrency-at-scale/heartbeats/interval_concurrent_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "testing" + "time" +) + +func DoWork( + done <-chan interface{}, + pulseInterval time.Duration, + nums ...int, +) (<-chan interface{}, <-chan int) { + heartbeat := make(chan interface{}, 1) + intStream := make(chan int) + go func() { + defer close(heartbeat) + defer close(intStream) + + time.Sleep(2 * time.Second) + + pulse := time.Tick(pulseInterval) + numLoop: // <2> + for _, n := range nums { + for { // <1> + select { + case <-done: + return + case <-pulse: + select { + case heartbeat <- struct{}{}: + default: + } + case intStream <- n: + continue numLoop // <3> + } + } + } + }() + + return heartbeat, intStream +} + +func TestDoWork_GeneratesAllNumbers(t *testing.T) { + done := make(chan interface{}) + defer close(done) + + intSlice := []int{0, 1, 2, 3, 5} + const timeout = 2 * time.Second + heartbeat, results := DoWork(done, timeout/2, intSlice...) + + <-heartbeat // <4> + + i := 0 + for { + select { + case r, ok := <-results: + if ok == false { + return + } else if expected := intSlice[i]; r != expected { + t.Errorf("index %v: expected %v, but received %v,", i, expected, r) + } + i++ + case <-heartbeat: // <5> + case <-time.After(timeout): + t.Fatal("test timed out") + } + } +} diff --git a/concurrency-at-scale/rate-limiting/fig-multi-rate-limit.go b/concurrency-at-scale/rate-limiting/fig-multi-rate-limit.go new file mode 100644 index 0000000..2bf88e4 --- /dev/null +++ b/concurrency-at-scale/rate-limiting/fig-multi-rate-limit.go @@ -0,0 +1,105 @@ +package main + +import ( + "context" + "golang.org/x/time/rate" + "log" + "os" + "sort" + "sync" + "time" +) + +func main() { + defer log.Printf("Done.") + log.SetOutput(os.Stdout) + log.SetFlags(log.Ltime | log.LUTC) + + apiConnection := Open() + var wg sync.WaitGroup + wg.Add(20) + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + err := apiConnection.ReadFile(context.Background()) + if err != nil { + log.Printf("cannot ReadFile: %v", err) + } + log.Printf("ReadFile") + }() + } + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + err := apiConnection.ResolveAddress(context.Background()) + if err != nil { + log.Printf("cannot ResolveAddress: %v", err) + } + log.Printf("ResolveAddress") + }() + } + + wg.Wait() +} +func Per(eventCount int, duration time.Duration) rate.Limit { + return rate.Every(duration / time.Duration(eventCount)) +} +func Open() *APIConnection { + secondLimit := rate.NewLimiter(Per(2, time.Second), 1) // <1> + minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10) // <2> + return &APIConnection{ + rateLimiter: MultiLimiter(secondLimit, minuteLimit), // <3> + } +} + +type APIConnection struct { + rateLimiter RateLimiter +} + +func (a *APIConnection) ReadFile(ctx context.Context) error { + if err := a.rateLimiter.Wait(ctx); err != nil { + return err + } + // Pretend we do work here + return nil +} + +func (a *APIConnection) ResolveAddress(ctx context.Context) error { + if err := a.rateLimiter.Wait(ctx); err != nil { + return err + } + // Pretend we do work here + return nil +} + +type RateLimiter interface { // <1> + Wait(context.Context) error + Limit() rate.Limit +} + +func MultiLimiter(limiters ...RateLimiter) *multiLimiter { + byLimit := func(i, j int) bool { + return limiters[i].Limit() < limiters[j].Limit() + } + sort.Slice(limiters, byLimit) // <2> + return &multiLimiter{limiters: limiters} +} + +type multiLimiter struct { + limiters []RateLimiter +} + +func (l *multiLimiter) Wait(ctx context.Context) error { + for _, l := range l.limiters { + if err := l.Wait(ctx); err != nil { + return err + } + } + return nil +} + +func (l *multiLimiter) Limit() rate.Limit { + return l.limiters[0].Limit() // <3> +} diff --git a/concurrency-at-scale/rate-limiting/fig-no-rate-limit.go b/concurrency-at-scale/rate-limiting/fig-no-rate-limit.go new file mode 100644 index 0000000..09c815c --- /dev/null +++ b/concurrency-at-scale/rate-limiting/fig-no-rate-limit.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "log" + "os" + "sync" +) + +func main() { + defer log.Printf("Done.") + log.SetOutput(os.Stdout) + log.SetFlags(log.Ltime | log.LUTC) + + apiConnection := Open() + var wg sync.WaitGroup + wg.Add(20) + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + err := apiConnection.ReadFile(context.Background()) + if err != nil { + log.Printf("cannot ReadFile: %v", err) + } + log.Printf("ReadFile") + }() + } + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + err := apiConnection.ResolveAddress(context.Background()) + if err != nil { + log.Printf("cannot ResolveAddress: %v", err) + } + log.Printf("ResolveAddress") + }() + } + + wg.Wait() +} +func Open() *APIConnection { + return &APIConnection{} +} + +type APIConnection struct{} + +func (a *APIConnection) ReadFile(ctx context.Context) error { + // Pretend we do work here + return nil +} + +func (a *APIConnection) ResolveAddress(ctx context.Context) error { + // Pretend we do work here + return nil +} diff --git a/concurrency-at-scale/rate-limiting/fig-simple-rate-limit.go b/concurrency-at-scale/rate-limiting/fig-simple-rate-limit.go new file mode 100644 index 0000000..3f943f5 --- /dev/null +++ b/concurrency-at-scale/rate-limiting/fig-simple-rate-limit.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "golang.org/x/time/rate" + "log" + "os" + "sync" +) + +func main() { + defer log.Printf("Done.") + log.SetOutput(os.Stdout) + log.SetFlags(log.Ltime | log.LUTC) + + apiConnection := Open() + var wg sync.WaitGroup + wg.Add(20) + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + err := apiConnection.ReadFile(context.Background()) + if err != nil { + log.Printf("cannot ReadFile: %v", err) + } + log.Printf("ReadFile") + }() + } + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + err := apiConnection.ResolveAddress(context.Background()) + if err != nil { + log.Printf("cannot ResolveAddress: %v", err) + } + log.Printf("ResolveAddress") + }() + } + + wg.Wait() +} +func Open() *APIConnection { + return &APIConnection{ + rateLimiter: rate.NewLimiter(rate.Limit(1), 1), // <1> + } +} + +type APIConnection struct { + rateLimiter *rate.Limiter +} + +func (a *APIConnection) ReadFile(ctx context.Context) error { + if err := a.rateLimiter.Wait(ctx); err != nil { // <2> + return err + } + // Pretend we do work here + return nil +} + +func (a *APIConnection) ResolveAddress(ctx context.Context) error { + if err := a.rateLimiter.Wait(ctx); err != nil { // <2> + return err + } + // Pretend we do work here + return nil +} diff --git a/concurrency-at-scale/rate-limiting/fig-tiered-rate-limit.go b/concurrency-at-scale/rate-limiting/fig-tiered-rate-limit.go new file mode 100644 index 0000000..85e4818 --- /dev/null +++ b/concurrency-at-scale/rate-limiting/fig-tiered-rate-limit.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "golang.org/x/time/rate" + "log" + "os" + "sort" + "sync" + "time" +) + +func main() { + defer log.Printf("Done.") + log.SetOutput(os.Stdout) + log.SetFlags(log.Ltime | log.LUTC) + + apiConnection := Open() + var wg sync.WaitGroup + wg.Add(20) + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + err := apiConnection.ReadFile(context.Background()) + if err != nil { + log.Printf("cannot ReadFile: %v", err) + } + log.Printf("ReadFile") + }() + } + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + err := apiConnection.ResolveAddress(context.Background()) + if err != nil { + log.Printf("cannot ResolveAddress: %v", err) + } + log.Printf("ResolveAddress") + }() + } + + wg.Wait() +} +func Per(eventCount int, duration time.Duration) rate.Limit { + return rate.Every(duration / time.Duration(eventCount)) +} + +type RateLimiter interface { // <1> + Wait(context.Context) error + Limit() rate.Limit +} + +func MultiLimiter(limiters ...RateLimiter) *multiLimiter { + byLimit := func(i, j int) bool { + return limiters[i].Limit() < limiters[j].Limit() + } + sort.Slice(limiters, byLimit) // <2> + return &multiLimiter{limiters: limiters} +} + +type multiLimiter struct { + limiters []RateLimiter +} + +func (l *multiLimiter) Wait(ctx context.Context) error { + for _, l := range l.limiters { + if err := l.Wait(ctx); err != nil { + return err + } + } + return nil +} + +func (l *multiLimiter) Limit() rate.Limit { + return l.limiters[0].Limit() // <3> +} diff --git a/concurrency-patterns-in-go/confinement/fig-confinement-ad-hoc.go b/concurrency-patterns-in-go/confinement/fig-confinement-ad-hoc.go new file mode 100644 index 0000000..6199c52 --- /dev/null +++ b/concurrency-patterns-in-go/confinement/fig-confinement-ad-hoc.go @@ -0,0 +1,23 @@ +package main + +import ( + "fmt" +) + +func main() { + data := make([]int, 4) + + loopData := func(handleData chan<- int) { + defer close(handleData) + for i := range data { + handleData <- data[i] + } + } + + handleData := make(chan int) + go loopData(handleData) + + for num := range handleData { + fmt.Println(num) + } +} diff --git a/concurrency-patterns-in-go/confinement/fig-confinement-ownership.go b/concurrency-patterns-in-go/confinement/fig-confinement-ownership.go new file mode 100644 index 0000000..551dfee --- /dev/null +++ b/concurrency-patterns-in-go/confinement/fig-confinement-ownership.go @@ -0,0 +1,28 @@ +package main + +import ( + "fmt" +) + +func main() { + chanOwner := func() <-chan int { + results := make(chan int, 5) // <1> + go func() { + defer close(results) + for i := 0; i <= 5; i++ { + results <- i + } + }() + return results + } + + consumer := func(results <-chan int) { // <3> + for result := range results { + fmt.Printf("Received: %d\n", result) + } + fmt.Println("Done receiving!") + } + + results := chanOwner() // <2> + consumer(results) +} diff --git a/concurrency-patterns-in-go/confinement/fig-confinement-structs.go b/concurrency-patterns-in-go/confinement/fig-confinement-structs.go new file mode 100644 index 0000000..a14063a --- /dev/null +++ b/concurrency-patterns-in-go/confinement/fig-confinement-structs.go @@ -0,0 +1,27 @@ +package main + +import ( + "bytes" + "fmt" + "sync" +) + +func main() { + printData := func(wg *sync.WaitGroup, data []byte) { + defer wg.Done() + + var buff bytes.Buffer + for _, b := range data { + fmt.Fprintf(&buff, "%c", b) + } + fmt.Println(buff.String()) + } + + var wg sync.WaitGroup + wg.Add(2) + data := []byte("golang") + go printData(&wg, data[:3]) // <1> + go printData(&wg, data[3:]) // <2> + + wg.Wait() +} diff --git a/concurrency-patterns-in-go/error-handling/fig-patterns-imporoper-err-handling.go b/concurrency-patterns-in-go/error-handling/fig-patterns-imporoper-err-handling.go new file mode 100644 index 0000000..fdd956e --- /dev/null +++ b/concurrency-patterns-in-go/error-handling/fig-patterns-imporoper-err-handling.go @@ -0,0 +1,35 @@ +package main + +import ( + "fmt" + "net/http" +) + +func main() { + checkStatus := func(done <-chan interface{}, urls ...string) <-chan *http.Response { + responses := make(chan *http.Response) + go func() { + defer close(responses) + for _, url := range urls { + resp, err := http.Get(url) + if err != nil { + fmt.Println(err) // <1> + continue + } + select { + case <-done: + return + case responses <- resp: + } + } + }() + return responses + } + + done := make(chan interface{}) + defer close(done) + + for response := range checkStatus(done, "https://www.google.com", "https://badhost") { + fmt.Printf("Response: %v\n", response.Status) + } +} diff --git a/concurrency-patterns-in-go/error-handling/fig-patterns-proper-err-handling.go b/concurrency-patterns-in-go/error-handling/fig-patterns-proper-err-handling.go new file mode 100644 index 0000000..08c5a17 --- /dev/null +++ b/concurrency-patterns-in-go/error-handling/fig-patterns-proper-err-handling.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "net/http" +) + +func main() { + type Result struct { // <1> + Error error + Response *http.Response + } + checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result { // <2> + results := make(chan Result) + go func() { + defer close(results) + + for _, url := range urls { + var result Result + resp, err := http.Get(url) + result = Result{Error: err, Response: resp} // <3> + select { + case <-done: + return + case results <- result: // <4> + } + } + }() + return results + } + + done := make(chan interface{}) + defer close(done) + + for result := range checkStatus(done, "https://www.google.com", "https://badhost") { + if result.Error != nil { // <5> + fmt.Printf("error: %v", result.Error) + continue + } + fmt.Printf("Response: %v\n", result.Response.Status) + } +} diff --git a/concurrency-patterns-in-go/error-handling/fig-stop-after-three-errors.go b/concurrency-patterns-in-go/error-handling/fig-stop-after-three-errors.go new file mode 100644 index 0000000..5f04bb7 --- /dev/null +++ b/concurrency-patterns-in-go/error-handling/fig-stop-after-three-errors.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "net/http" +) + +func main() { + type Result struct { // <1> + Error error + Response *http.Response + } + checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result { // <2> + results := make(chan Result) + go func() { + defer close(results) + + for _, url := range urls { + var result Result + resp, err := http.Get(url) + result = Result{Error: err, Response: resp} // <3> + select { + case <-done: + return + case results <- result: // <4> + } + } + }() + return results + } + done := make(chan interface{}) + defer close(done) + + errCount := 0 + for result := range checkStatus(done, "a", "https://www.google.com", "b", "c", "d") { + if result.Error != nil { + fmt.Printf("error: %v\n", result.Error) + errCount++ + if errCount >= 3 { + fmt.Println("Too many errors, breaking!") + break + } + continue + } + fmt.Printf("Response: %v\n", result.Response.Status) + } +} diff --git a/concurrency-patterns-in-go/fan-out-fan-in/fig-fan-out-naive-prime-finder.go b/concurrency-patterns-in-go/fan-out-fan-in/fig-fan-out-naive-prime-finder.go new file mode 100644 index 0000000..19e6de7 --- /dev/null +++ b/concurrency-patterns-in-go/fan-out-fan-in/fig-fan-out-naive-prime-finder.go @@ -0,0 +1,131 @@ +package main + +import ( + "fmt" + "math/rand" + "runtime" + "sync" + "time" +) + +func main() { + repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} { + valueStream := make(chan interface{}) + go func() { + defer close(valueStream) + for { + select { + case <-done: + return + case valueStream <- fn(): + } + } + }() + return valueStream + } + take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} { + takeStream := make(chan interface{}) + go func() { + defer close(takeStream) + for i := 0; i < num; i++ { + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int { + intStream := make(chan int) + go func() { + defer close(intStream) + for v := range valueStream { + select { + case <-done: + return + case intStream <- v.(int): + } + } + }() + return intStream + } + primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan interface{} { + primeStream := make(chan interface{}) + go func() { + defer close(primeStream) + for integer := range intStream { + integer -= 1 + prime := true + for divisor := integer - 1; divisor > 1; divisor-- { + if integer%divisor == 0 { + prime = false + break + } + } + + if prime { + select { + case <-done: + return + case primeStream <- integer: + } + } + } + }() + return primeStream + } + fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} { // <1> + var wg sync.WaitGroup // <2> + multiplexedStream := make(chan interface{}) + + multiplex := func(c <-chan interface{}) { // <3> + defer wg.Done() + for i := range c { + select { + case <-done: + return + case multiplexedStream <- i: + } + } + } + + // Select from all the channels + wg.Add(len(channels)) // <4> + for _, c := range channels { + go multiplex(c) + } + + // Wait for all the reads to complete + go func() { // <5> + wg.Wait() + close(multiplexedStream) + }() + + return multiplexedStream + } + + done := make(chan interface{}) + defer close(done) + + start := time.Now() + + rand := func() interface{} { return rand.Intn(50000000) } + + randIntStream := toInt(done, repeatFn(done, rand)) + + numFinders := runtime.NumCPU() + fmt.Printf("Spinning up %d prime finders.\n", numFinders) + finders := make([]<-chan interface{}, numFinders) + fmt.Println("Primes:") + for i := 0; i < numFinders; i++ { + finders[i] = primeFinder(done, randIntStream) + } + + for prime := range take(done, fanIn(done, finders...), 10) { + fmt.Printf("\t%d\n", prime) + } + + fmt.Printf("Search took: %v", time.Since(start)) +} diff --git a/concurrency-patterns-in-go/fan-out-fan-in/fig-naive-prime-finder.go b/concurrency-patterns-in-go/fan-out-fan-in/fig-naive-prime-finder.go new file mode 100644 index 0000000..1c44a9c --- /dev/null +++ b/concurrency-patterns-in-go/fan-out-fan-in/fig-naive-prime-finder.go @@ -0,0 +1,91 @@ +package main + +import ( + "fmt" + "math/rand" + "time" +) + +func main() { + repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} { + valueStream := make(chan interface{}) + go func() { + defer close(valueStream) + for { + select { + case <-done: + return + case valueStream <- fn(): + } + } + }() + return valueStream + } + take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} { + takeStream := make(chan interface{}) + go func() { + defer close(takeStream) + for i := 0; i < num; i++ { + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int { + intStream := make(chan int) + go func() { + defer close(intStream) + for v := range valueStream { + select { + case <-done: + return + case intStream <- v.(int): + } + } + }() + return intStream + } + primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan interface{} { + primeStream := make(chan interface{}) + go func() { + defer close(primeStream) + for integer := range intStream { + integer -= 1 + prime := true + for divisor := integer - 1; divisor > 1; divisor-- { + if integer%divisor == 0 { + prime = false + break + } + } + + if prime { + select { + case <-done: + return + case primeStream <- integer: + } + } + } + }() + return primeStream + } + rand := func() interface{} { return rand.Intn(50000000) } + + done := make(chan interface{}) + defer close(done) + + start := time.Now() + + randIntStream := toInt(done, repeatFn(done, rand)) + fmt.Println("Primes:") + for prime := range take(done, primeFinder(done, randIntStream), 10) { + fmt.Printf("\t%d\n", prime) + } + + fmt.Printf("Search took: %v", time.Since(start)) +} diff --git a/concurrency-patterns-in-go/pipelines/best-practices-for-constructing-pipelines/fig-pipelines-chan-stream-processing.go b/concurrency-patterns-in-go/pipelines/best-practices-for-constructing-pipelines/fig-pipelines-chan-stream-processing.go new file mode 100644 index 0000000..0999f47 --- /dev/null +++ b/concurrency-patterns-in-go/pipelines/best-practices-for-constructing-pipelines/fig-pipelines-chan-stream-processing.go @@ -0,0 +1,62 @@ +package main + +import ( + "fmt" +) + +func main() { + generator := func(done <-chan interface{}, integers ...int) <-chan int { + intStream := make(chan int) + go func() { + defer close(intStream) + for _, i := range integers { + select { + case <-done: + return + case intStream <- i: + } + } + }() + return intStream + } + + multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int { + multipliedStream := make(chan int) + go func() { + defer close(multipliedStream) + for i := range intStream { + select { + case <-done: + return + case multipliedStream <- i * multiplier: + } + } + }() + return multipliedStream + } + + add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int { + addedStream := make(chan int) + go func() { + defer close(addedStream) + for i := range intStream { + select { + case <-done: + return + case addedStream <- i + additive: + } + } + }() + return addedStream + } + + done := make(chan interface{}) + defer close(done) + + intStream := generator(done, 1, 2, 3, 4) + pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2) + + for v := range pipeline { + fmt.Println(v) + } +} diff --git a/concurrency-patterns-in-go/pipelines/fig-adding-additional-stage-to-pipeline.go b/concurrency-patterns-in-go/pipelines/fig-adding-additional-stage-to-pipeline.go new file mode 100644 index 0000000..6a81f82 --- /dev/null +++ b/concurrency-patterns-in-go/pipelines/fig-adding-additional-stage-to-pipeline.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" +) + +func main() { + multiply := func(values []int, multiplier int) []int { + multipliedValues := make([]int, len(values)) + for i, v := range values { + multipliedValues[i] = v * multiplier + } + return multipliedValues + } + add := func(values []int, additive int) []int { + addedValues := make([]int, len(values)) + for i, v := range values { + addedValues[i] = v + additive + } + return addedValues + } + ints := []int{1, 2, 3, 4} + for _, v := range multiply(add(multiply(ints, 2), 1), 2) { + fmt.Println(v) + } +} diff --git a/concurrency-patterns-in-go/pipelines/fig-functional-pipeline-combination.go b/concurrency-patterns-in-go/pipelines/fig-functional-pipeline-combination.go new file mode 100644 index 0000000..a941e8c --- /dev/null +++ b/concurrency-patterns-in-go/pipelines/fig-functional-pipeline-combination.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" +) + +func main() { + multiply := func(values []int, multiplier int) []int { + multipliedValues := make([]int, len(values)) + for i, v := range values { + multipliedValues[i] = v * multiplier + } + return multipliedValues + } + add := func(values []int, additive int) []int { + addedValues := make([]int, len(values)) + for i, v := range values { + addedValues[i] = v + additive + } + return addedValues + } + + ints := []int{1, 2, 3, 4} + for _, v := range add(multiply(ints, 2), 1) { + fmt.Println(v) + } +} diff --git a/concurrency-patterns-in-go/pipelines/fig-pipelines-func-stream-processing.go b/concurrency-patterns-in-go/pipelines/fig-pipelines-func-stream-processing.go new file mode 100644 index 0000000..a7d2cfd --- /dev/null +++ b/concurrency-patterns-in-go/pipelines/fig-pipelines-func-stream-processing.go @@ -0,0 +1,20 @@ +package main + +import ( + "fmt" +) + +func main() { + multiply := func(value, multiplier int) int { + return value * multiplier + } + + add := func(value, additive int) int { + return value + additive + } + + ints := []int{1, 2, 3, 4} + for _, v := range ints { + fmt.Println(multiply(add(multiply(v, 2), 1), 2)) + } +} diff --git a/concurrency-patterns-in-go/pipelines/some-handy-generators/fig-take-and-repeat-pipeline.go b/concurrency-patterns-in-go/pipelines/some-handy-generators/fig-take-and-repeat-pipeline.go new file mode 100644 index 0000000..f21d924 --- /dev/null +++ b/concurrency-patterns-in-go/pipelines/some-handy-generators/fig-take-and-repeat-pipeline.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" +) + +func main() { + repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} { + valueStream := make(chan interface{}) + go func() { + defer close(valueStream) + for { + for _, v := range values { + select { + case <-done: + return + case valueStream <- v: + } + } + } + }() + return valueStream + } + take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} { + takeStream := make(chan interface{}) + go func() { + defer close(takeStream) + for i := 0; i < num; i++ { + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + done := make(chan interface{}) + defer close(done) + + for num := range take(done, repeat(done, 1), 10) { + fmt.Printf("%v ", num) + } +} diff --git a/concurrency-patterns-in-go/pipelines/some-handy-generators/fig-utilizing-string-stage.go b/concurrency-patterns-in-go/pipelines/some-handy-generators/fig-utilizing-string-stage.go new file mode 100644 index 0000000..fca905f --- /dev/null +++ b/concurrency-patterns-in-go/pipelines/some-handy-generators/fig-utilizing-string-stage.go @@ -0,0 +1,61 @@ +package main + +import ( + "fmt" +) + +func main() { + take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} { + takeStream := make(chan interface{}) + go func() { + defer close(takeStream) + for i := 0; i < num; i++ { + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} { + valueStream := make(chan interface{}) + go func() { + defer close(valueStream) + for { + for _, v := range values { + select { + case <-done: + return + case valueStream <- v: + } + } + } + }() + return valueStream + } + toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string { + stringStream := make(chan string) + go func() { + defer close(stringStream) + for v := range valueStream { + select { + case <-done: + return + case stringStream <- v.(string): + } + } + }() + return stringStream + } + done := make(chan interface{}) + defer close(done) + + var message string + for token := range toString(done, take(done, repeat(done, "I", "am."), 5)) { + message += token + } + + fmt.Printf("message: %s...", message) +} diff --git a/concurrency-patterns-in-go/pipelines/some-handy-generators/pipelines_test.go b/concurrency-patterns-in-go/pipelines/some-handy-generators/pipelines_test.go new file mode 100644 index 0000000..6dffd35 --- /dev/null +++ b/concurrency-patterns-in-go/pipelines/some-handy-generators/pipelines_test.go @@ -0,0 +1,100 @@ +package pipelines + +import () + +func BenchmarkGeneric(b *testing.B) { + repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} { + valueStream := make(chan interface{}) + go func() { + defer close(valueStream) + for { + for _, v := range values { + select { + case <-done: + return + case valueStream <- v: + } + } + } + }() + return valueStream + } + take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} { + takeStream := make(chan interface{}) + go func() { + defer close(takeStream) + for i := 0; i < num; i++ { + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string { + stringStream := make(chan string) + go func() { + defer close(stringStream) + for v := range valueStream { + select { + case <-done: + return + case stringStream <- v.(string): + } + } + }() + return stringStream + } + done := make(chan interface{}) + defer close(done) + + b.ResetTimer() + for range toString(done, take(done, repeat(done, "a"), b.N)) { + } +} + +func BenchmarkTyped(b *testing.B) { + repeat := func(done <-chan interface{}, values ...string) <-chan string { + valueStream := make(chan string) + go func() { + defer close(valueStream) + for { + for _, v := range values { + select { + case <-done: + return + case valueStream <- v: + } + } + } + }() + return valueStream + } + + take := func(done <-chan interface{}, valueStream <-chan string, num int) <-chan string { + takeStream := make(chan string) + go func() { + defer close(takeStream) + for i := num; i > 0 || i == -1; { + if i != -1 { + i-- + } + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + + done := make(chan interface{}) + defer close(done) + + b.ResetTimer() + for range take(done, repeat(done, "a"), b.N) { + } +} diff --git a/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-goroutine-leaks-cancellation.go b/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-goroutine-leaks-cancellation.go new file mode 100644 index 0000000..ea0f550 --- /dev/null +++ b/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-goroutine-leaks-cancellation.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} { // <1> + terminated := make(chan interface{}) + go func() { + defer fmt.Println("doWork exited.") + defer close(terminated) + for { + select { + case s := <-strings: + // Do something interesting + fmt.Println(s) + case <-done: // <2> + return + } + } + }() + return terminated + } + + done := make(chan interface{}) + terminated := doWork(done, nil) + + go func() { // <3> + // Cancel the operation after 1 second. + time.Sleep(1 * time.Second) + fmt.Println("Canceling doWork goroutine...") + close(done) + }() + + <-terminated // <4> + fmt.Println("Done.") +} diff --git a/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-goroutine-leaks-example.go b/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-goroutine-leaks-example.go new file mode 100644 index 0000000..a7b8c09 --- /dev/null +++ b/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-goroutine-leaks-example.go @@ -0,0 +1,22 @@ +package main + +import () + +func main() { + doWork := func(strings <-chan string) <-chan interface{} { + completed := make(chan interface{}) + go func() { + defer fmt.Println("doWork exited.") + defer close(completed) + for s := range strings { + // Do something interesting + fmt.Println(s) + } + }() + return completed + } + + doWork(nil) + // Perhaps more work is done here + fmt.Println("Done.") +} diff --git a/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-leak-from-blocked-channel-write-solved.go b/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-leak-from-blocked-channel-write-solved.go new file mode 100644 index 0000000..39b4bc2 --- /dev/null +++ b/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-leak-from-blocked-channel-write-solved.go @@ -0,0 +1,37 @@ +package main + +import ( + "fmt" + "math/rand" + "time" +) + +func main() { + newRandStream := func(done <-chan interface{}) <-chan int { + randStream := make(chan int) + go func() { + defer fmt.Println("newRandStream closure exited.") + defer close(randStream) + for { + select { + case randStream <- rand.Int(): + case <-done: + return + } + } + }() + + return randStream + } + + done := make(chan interface{}) + randStream := newRandStream(done) + fmt.Println("3 random ints:") + for i := 1; i <= 3; i++ { + fmt.Printf("%d: %d\n", i, <-randStream) + } + close(done) + + // Simulate ongoing work + time.Sleep(1 * time.Second) +} diff --git a/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-leak-from-blocked-channel-write.go b/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-leak-from-blocked-channel-write.go new file mode 100644 index 0000000..098e14b --- /dev/null +++ b/concurrency-patterns-in-go/preventing-goroutine-leaks/fig-leak-from-blocked-channel-write.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "math/rand" +) + +func main() { + newRandStream := func() <-chan int { + randStream := make(chan int) + go func() { + defer fmt.Println("newRandStream closure exited.") // <1> + defer close(randStream) + for { + randStream <- rand.Int() + } + }() + + return randStream + } + + randStream := newRandStream() + fmt.Println("3 random ints:") + for i := 1; i <= 3; i++ { + fmt.Printf("%d: %d\n", i, <-randStream) + } +} diff --git a/concurrency-patterns-in-go/queuing/buffering_test.go b/concurrency-patterns-in-go/queuing/buffering_test.go new file mode 100644 index 0000000..33b3831 --- /dev/null +++ b/concurrency-patterns-in-go/queuing/buffering_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "bufio" + "io" + "io/ioutil" + "log" + "os" + "testing" +) + +func BenchmarkUnbufferedWrite(b *testing.B) { + performWrite(b, tmpFileOrFatal()) +} + +func BenchmarkBufferedWrite(b *testing.B) { + bufferredFile := bufio.NewWriter(tmpFileOrFatal()) + performWrite(b, bufio.NewWriter(bufferredFile)) +} + +func tmpFileOrFatal() *os.File { + file, err := ioutil.TempFile("", "tmp") + if err != nil { + log.Fatal("error: %v", err) + } + return file +} + +func performWrite(b *testing.B, writer io.Writer) { + repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} { + valueStream := make(chan interface{}) + go func() { + defer close(valueStream) + for { + for _, v := range values { + select { + case <-done: + return + case valueStream <- v: + } + } + } + }() + return valueStream + } + take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} { + takeStream := make(chan interface{}) + go func() { + defer close(takeStream) + for i := 0; i < num; i++ { + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + + done := make(chan interface{}) + defer close(done) + + b.ResetTimer() + for bt := range take(done, repeat(done, byte(0)), b.N) { + writer.Write([]byte{bt.(byte)}) + } +} diff --git a/concurrency-patterns-in-go/the-bridge-channel/fig-bridge-channel.go b/concurrency-patterns-in-go/the-bridge-channel/fig-bridge-channel.go new file mode 100644 index 0000000..5c8a05a --- /dev/null +++ b/concurrency-patterns-in-go/the-bridge-channel/fig-bridge-channel.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" +) + +func main() { + orDone := func(done, c <-chan interface{}) <-chan interface{} { + valStream := make(chan interface{}) + go func() { + defer close(valStream) + for { + select { + case <-done: + return + case v, ok := <-c: + if ok == false { + return + } + select { + case valStream <- v: + case <-done: + } + } + } + }() + return valStream + } + bridge := func( + done <-chan interface{}, + chanStream <-chan <-chan interface{}, + ) <-chan interface{} { + valStream := make(chan interface{}) // <1> + go func() { + defer close(valStream) + for { // <2> + var stream <-chan interface{} + select { + case maybeStream, ok := <-chanStream: + if ok == false { + return + } + stream = maybeStream + case <-done: + return + } + for val := range orDone(done, stream) { // <3> + select { + case valStream <- val: + case <-done: + } + } + } + }() + return valStream + } + genVals := func() <-chan <-chan interface{} { + chanStream := make(chan (<-chan interface{})) + go func() { + defer close(chanStream) + for i := 0; i < 10; i++ { + stream := make(chan interface{}, 1) + stream <- i + close(stream) + chanStream <- stream + } + }() + return chanStream + } + + for v := range bridge(nil, genVals()) { + fmt.Printf("%v ", v) + } +} diff --git a/concurrency-patterns-in-go/the-context-package/fig-greeter-with-context.go b/concurrency-patterns-in-go/the-context-package/fig-greeter-with-context.go new file mode 100644 index 0000000..9f01e41 --- /dev/null +++ b/concurrency-patterns-in-go/the-context-package/fig-greeter-with-context.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" +) + +func main() { + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) // <1> + defer cancel() + + wg.Add(1) + go func() { + defer wg.Done() + + if err := printGreeting(ctx); err != nil { + fmt.Printf("cannot print greeting: %v\n", err) + cancel() // <2> + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := printFarewell(ctx); err != nil { + fmt.Printf("cannot print farewell: %v\n", err) + } + }() + + wg.Wait() +} + +func printGreeting(ctx context.Context) error { + greeting, err := genGreeting(ctx) + if err != nil { + return err + } + fmt.Printf("%s world!\n", greeting) + return nil +} + +func printFarewell(ctx context.Context) error { + farewell, err := genFarewell(ctx) + if err != nil { + return err + } + fmt.Printf("%s world!\n", farewell) + return nil +} + +func genGreeting(ctx context.Context) (string, error) { + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) // <3> + defer cancel() + + switch locale, err := locale(ctx); { + case err != nil: + return "", err + case locale == "EN/US": + return "hello", nil + } + return "", fmt.Errorf("unsupported locale") +} + +func genFarewell(ctx context.Context) (string, error) { + switch locale, err := locale(ctx); { + case err != nil: + return "", err + case locale == "EN/US": + return "goodbye", nil + } + return "", fmt.Errorf("unsupported locale") +} + +func locale(ctx context.Context) (string, error) { + select { + case <-ctx.Done(): + return "", ctx.Err() // <4> + case <-time.After(1 * time.Minute): + } + return "EN/US", nil +} diff --git a/concurrency-patterns-in-go/the-context-package/fig-greeter-with-done-chan.go b/concurrency-patterns-in-go/the-context-package/fig-greeter-with-done-chan.go new file mode 100644 index 0000000..14fbbeb --- /dev/null +++ b/concurrency-patterns-in-go/the-context-package/fig-greeter-with-done-chan.go @@ -0,0 +1,80 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +func main() { + var wg sync.WaitGroup + done := make(chan interface{}) + defer close(done) + + wg.Add(1) + go func() { + defer wg.Done() + if err := printGreeting(done); err != nil { + fmt.Printf("%v", err) + return + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := printFarewell(done); err != nil { + fmt.Printf("%v", err) + return + } + }() + + wg.Wait() +} + +func printGreeting(done <-chan interface{}) error { + greeting, err := genGreeting(done) + if err != nil { + return err + } + fmt.Printf("%s world!\n", greeting) + return nil +} + +func printFarewell(done <-chan interface{}) error { + farewell, err := genFarewell(done) + if err != nil { + return err + } + fmt.Printf("%s world!\n", farewell) + return nil +} + +func genGreeting(done <-chan interface{}) (string, error) { + switch locale, err := locale(done); { + case err != nil: + return "", err + case locale == "EN/US": + return "hello", nil + } + return "", fmt.Errorf("unsupported locale") +} + +func genFarewell(done <-chan interface{}) (string, error) { + switch locale, err := locale(done); { + case err != nil: + return "", err + case locale == "EN/US": + return "goodbye", nil + } + return "", fmt.Errorf("unsupported locale") +} + +func locale(done <-chan interface{}) (string, error) { + select { + case <-done: + return "", fmt.Errorf("canceled") + case <-time.After(1 * time.Minute): + } + return "EN/US", nil +} diff --git a/concurrency-patterns-in-go/the-or-channel/fig-or-channel.go b/concurrency-patterns-in-go/the-or-channel/fig-or-channel.go new file mode 100644 index 0000000..b9352f1 --- /dev/null +++ b/concurrency-patterns-in-go/the-or-channel/fig-or-channel.go @@ -0,0 +1,57 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + var or func(channels ...<-chan interface{}) <-chan interface{} + or = func(channels ...<-chan interface{}) <-chan interface{} { // <1> + switch len(channels) { + case 0: // <2> + return nil + case 1: // <3> + return channels[0] + } + + orDone := make(chan interface{}) + go func() { // <4> + defer close(orDone) + + switch len(channels) { + case 2: // <5> + select { + case <-channels[0]: + case <-channels[1]: + } + default: // <6> + select { + case <-channels[0]: + case <-channels[1]: + case <-channels[2]: + case <-or(append(channels[3:], orDone)...): // <6> + } + } + }() + return orDone + } + sig := func(after time.Duration) <-chan interface{} { // <1> + c := make(chan interface{}) + go func() { + defer close(c) + time.Sleep(after) + }() + return c + } + + start := time.Now() // <2> + <-or( + sig(2*time.Hour), + sig(5*time.Minute), + sig(1*time.Second), + sig(1*time.Hour), + sig(1*time.Minute), + ) + fmt.Printf("done after %v", time.Since(start)) // <3> +} diff --git a/gos-concurrency-building-blocks/channels/fig-chan-ownership.go b/gos-concurrency-building-blocks/channels/fig-chan-ownership.go new file mode 100644 index 0000000..ccab98e --- /dev/null +++ b/gos-concurrency-building-blocks/channels/fig-chan-ownership.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" +) + +func main() { + chanOwner := func() <-chan int { + resultStream := make(chan int, 5) // <1> + go func() { // <2> + defer close(resultStream) // <3> + for i := 0; i <= 5; i++ { + resultStream <- i + } + }() + return resultStream // <4> + } + + resultStream := chanOwner() + for result := range resultStream { // <5> + fmt.Printf("Received: %d\n", result) + } + fmt.Println("Done receiving!") +} diff --git a/gos-concurrency-building-blocks/channels/fig-chan-recv-multi-value.go b/gos-concurrency-building-blocks/channels/fig-chan-recv-multi-value.go new file mode 100644 index 0000000..834ee56 --- /dev/null +++ b/gos-concurrency-building-blocks/channels/fig-chan-recv-multi-value.go @@ -0,0 +1,14 @@ +package main + +import ( + "fmt" +) + +func main() { + stringStream := make(chan string) + go func() { + stringStream <- "Hello channels!" + }() + salutation, ok := <-stringStream // <1> + fmt.Printf("(%v): %v", ok, salutation) +} diff --git a/gos-concurrency-building-blocks/channels/fig-iterating-over-channel.go b/gos-concurrency-building-blocks/channels/fig-iterating-over-channel.go new file mode 100644 index 0000000..6b81fd1 --- /dev/null +++ b/gos-concurrency-building-blocks/channels/fig-iterating-over-channel.go @@ -0,0 +1,19 @@ +package main + +import ( + "fmt" +) + +func main() { + intStream := make(chan int) + go func() { + defer close(intStream) // <1> + for i := 1; i <= 5; i++ { + intStream <- i + } + }() + + for integer := range intStream { // <2> + fmt.Printf("%v ", integer) + } +} diff --git a/gos-concurrency-building-blocks/channels/fig-reading-from-closed-channel.go b/gos-concurrency-building-blocks/channels/fig-reading-from-closed-channel.go new file mode 100644 index 0000000..6d64d80 --- /dev/null +++ b/gos-concurrency-building-blocks/channels/fig-reading-from-closed-channel.go @@ -0,0 +1,12 @@ +package main + +import ( + "fmt" +) + +func main() { + intStream := make(chan int) + close(intStream) + integer, ok := <-intStream // <1> + fmt.Printf("(%v): %v", ok, integer) +} diff --git a/gos-concurrency-building-blocks/channels/fig-simple-chan.go b/gos-concurrency-building-blocks/channels/fig-simple-chan.go new file mode 100644 index 0000000..c3f24c5 --- /dev/null +++ b/gos-concurrency-building-blocks/channels/fig-simple-chan.go @@ -0,0 +1,13 @@ +package main + +import ( + "fmt" +) + +func main() { + stringStream := make(chan string) + go func() { + stringStream <- "Hello channels!" // <1> + }() + fmt.Println(<-stringStream) // <2> +} diff --git a/gos-concurrency-building-blocks/channels/fig-unblocking-goroutines.go b/gos-concurrency-building-blocks/channels/fig-unblocking-goroutines.go new file mode 100644 index 0000000..0f4f812 --- /dev/null +++ b/gos-concurrency-building-blocks/channels/fig-unblocking-goroutines.go @@ -0,0 +1,23 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + begin := make(chan interface{}) + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + <-begin // <1> + fmt.Printf("%v has begun\n", i) + }(i) + } + + fmt.Println("Unblocking goroutines...") + close(begin) // <2> + wg.Wait() +} diff --git a/gos-concurrency-building-blocks/channels/fig-using-buffered-chans.go b/gos-concurrency-building-blocks/channels/fig-using-buffered-chans.go new file mode 100644 index 0000000..f47ad09 --- /dev/null +++ b/gos-concurrency-building-blocks/channels/fig-using-buffered-chans.go @@ -0,0 +1,26 @@ +package main + +import ( + "bytes" + "fmt" + "os" +) + +func main() { + var stdoutBuff bytes.Buffer // <1> + defer stdoutBuff.WriteTo(os.Stdout) // <2> + + intStream := make(chan int, 4) // <3> + go func() { + defer close(intStream) + defer fmt.Fprintln(&stdoutBuff, "Producer Done.") + for i := 0; i < 5; i++ { + fmt.Fprintf(&stdoutBuff, "Sending: %d\n", i) + intStream <- i + } + }() + + for integer := range intStream { + fmt.Fprintf(&stdoutBuff, "Received %v.\n", integer) + } +} diff --git a/gos-concurrency-building-blocks/goroutines/fig-ctx-switch_test.go b/gos-concurrency-building-blocks/goroutines/fig-ctx-switch_test.go new file mode 100644 index 0000000..a31ee19 --- /dev/null +++ b/gos-concurrency-building-blocks/goroutines/fig-ctx-switch_test.go @@ -0,0 +1,35 @@ +package main + +import ( + "sync" + "testing" +) + +func BenchmarkContextSwitch(b *testing.B) { + var wg sync.WaitGroup + begin := make(chan struct{}) + c := make(chan struct{}) + + var token struct{} + sender := func() { + defer wg.Done() + <-begin // <1> + for i := 0; i < b.N; i++ { + c <- token // <2> + } + } + receiver := func() { + defer wg.Done() + <-begin // <1> + for i := 0; i < b.N; i++ { + <-c // <3> + } + } + + wg.Add(2) + go sender() + go receiver() + b.StartTimer() // <4> + close(begin) // <5> + wg.Wait() +} diff --git a/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure-loop-correct.go b/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure-loop-correct.go new file mode 100644 index 0000000..791ff9a --- /dev/null +++ b/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure-loop-correct.go @@ -0,0 +1,18 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var wg sync.WaitGroup + for _, salutation := range []string{"hello", "greetings", "good day"} { + wg.Add(1) + go func(salutation string) { // <1> + defer wg.Done() + fmt.Println(salutation) + }(salutation) // <2> + } + wg.Wait() +} diff --git a/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure-loop.go b/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure-loop.go new file mode 100644 index 0000000..d08e433 --- /dev/null +++ b/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure-loop.go @@ -0,0 +1,18 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var wg sync.WaitGroup + for _, salutation := range []string{"hello", "greetings", "good day"} { + wg.Add(1) + go func() { + defer wg.Done() + fmt.Println(salutation) // <1> + }() + } + wg.Wait() +} diff --git a/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure.go b/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure.go new file mode 100644 index 0000000..a2c5726 --- /dev/null +++ b/gos-concurrency-building-blocks/goroutines/fig-goroutine-closure.go @@ -0,0 +1,18 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var wg sync.WaitGroup + salutation := "hello" + wg.Add(1) + go func() { + defer wg.Done() + salutation = "welcome" // <1> + }() + wg.Wait() + fmt.Println(salutation) +} diff --git a/gos-concurrency-building-blocks/goroutines/fig-goroutine-size.go b/gos-concurrency-building-blocks/goroutines/fig-goroutine-size.go new file mode 100644 index 0000000..2029c3e --- /dev/null +++ b/gos-concurrency-building-blocks/goroutines/fig-goroutine-size.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "runtime" + "sync" +) + +func main() { + memConsumed := func() uint64 { + runtime.GC() + var s runtime.MemStats + runtime.ReadMemStats(&s) + return s.Sys + } + + var c <-chan interface{} + var wg sync.WaitGroup + noop := func() { wg.Done(); <-c } // <1> + + const numGoroutines = 1e4 // <2> + wg.Add(numGoroutines) + before := memConsumed() // <3> + for i := numGoroutines; i > 0; i-- { + go noop() + } + wg.Wait() + after := memConsumed() // <4> + fmt.Printf("%.3fkb", float64(after-before)/numGoroutines/1000) +} diff --git a/gos-concurrency-building-blocks/goroutines/fig-join-point.go b/gos-concurrency-building-blocks/goroutines/fig-join-point.go new file mode 100644 index 0000000..3edd1de --- /dev/null +++ b/gos-concurrency-building-blocks/goroutines/fig-join-point.go @@ -0,0 +1,17 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var wg sync.WaitGroup + sayHello := func() { + defer wg.Done() + fmt.Println("hello") + } + wg.Add(1) + go sayHello() + wg.Wait() // <1> +} diff --git a/gos-concurrency-building-blocks/the-defer-statement/fig-defer-before-panic.go b/gos-concurrency-building-blocks/the-defer-statement/fig-defer-before-panic.go new file mode 100644 index 0000000..fe69d7d --- /dev/null +++ b/gos-concurrency-building-blocks/the-defer-statement/fig-defer-before-panic.go @@ -0,0 +1,10 @@ +package main + +import ( + "fmt" +) + +func main() { + defer func() { fmt.Println("before the panic") }() + panic("paniced") +} diff --git a/gos-concurrency-building-blocks/the-select-statement/fig-select-blocking.go b/gos-concurrency-building-blocks/the-select-statement/fig-select-blocking.go new file mode 100644 index 0000000..381b492 --- /dev/null +++ b/gos-concurrency-building-blocks/the-select-statement/fig-select-blocking.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + start := time.Now() + c := make(chan interface{}) + go func() { + time.Sleep(5 * time.Second) + close(c) // <1> + }() + + fmt.Println("Blocking on read...") + select { + case <-c: // <2> + fmt.Printf("Unblocked %v later.\n", time.Since(start)) + } +} diff --git a/gos-concurrency-building-blocks/the-select-statement/fig-select-default-clause.go b/gos-concurrency-building-blocks/the-select-statement/fig-select-default-clause.go new file mode 100644 index 0000000..ee4c693 --- /dev/null +++ b/gos-concurrency-building-blocks/the-select-statement/fig-select-default-clause.go @@ -0,0 +1,17 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + start := time.Now() + var c1, c2 <-chan int + select { + case <-c1: + case <-c2: + default: + fmt.Printf("In default after %v\n\n", time.Since(start)) + } +} diff --git a/gos-concurrency-building-blocks/the-select-statement/fig-select-for-select-default.go b/gos-concurrency-building-blocks/the-select-statement/fig-select-for-select-default.go new file mode 100644 index 0000000..da78fc1 --- /dev/null +++ b/gos-concurrency-building-blocks/the-select-statement/fig-select-for-select-default.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + done := make(chan interface{}) + go func() { + time.Sleep(5 * time.Second) + close(done) + }() + + workCounter := 0 +loop: + for { + select { + case <-done: + break loop + default: + } + + // Simulate work + workCounter++ + time.Sleep(1 * time.Second) + } + + fmt.Printf("Achieved %v cycles of work before signalled to stop.\n", workCounter) +} diff --git a/gos-concurrency-building-blocks/the-select-statement/fig-select-timeouts.go b/gos-concurrency-building-blocks/the-select-statement/fig-select-timeouts.go new file mode 100644 index 0000000..31c8b50 --- /dev/null +++ b/gos-concurrency-building-blocks/the-select-statement/fig-select-timeouts.go @@ -0,0 +1,15 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + var c <-chan int + select { + case <-c: // <1> + case <-time.After(1 * time.Second): + fmt.Println("Timed out.") + } +} diff --git a/gos-concurrency-building-blocks/the-select-statement/fig-select-uniform-distribution.go b/gos-concurrency-building-blocks/the-select-statement/fig-select-uniform-distribution.go new file mode 100644 index 0000000..f721def --- /dev/null +++ b/gos-concurrency-building-blocks/the-select-statement/fig-select-uniform-distribution.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" +) + +func main() { + c1 := make(chan interface{}) + close(c1) + c2 := make(chan interface{}) + close(c2) + + var c1Count, c2Count int + for i := 1000; i >= 0; i-- { + select { + case <-c1: + c1Count++ + case <-c2: + c2Count++ + } + } + + fmt.Printf("c1Count: %d\nc2Count: %d\n", c1Count, c2Count) +} diff --git a/gos-concurrency-building-blocks/the-sync-package/cond/fig-cond-based-queue.go b/gos-concurrency-building-blocks/the-sync-package/cond/fig-cond-based-queue.go new file mode 100644 index 0000000..6615da7 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/cond/fig-cond-based-queue.go @@ -0,0 +1,32 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +func main() { + c := sync.NewCond(&sync.Mutex{}) // <1> + queue := make([]interface{}, 0, 10) // <2> + + removeFromQueue := func(delay time.Duration) { + time.Sleep(delay) + c.L.Lock() // <8> + queue = queue[1:] // <9> + fmt.Println("Removed from queue") + c.L.Unlock() // <10> + c.Signal() // <11> + } + + for i := 0; i < 10; i++ { + c.L.Lock() // <3> + for len(queue) == 2 { // <4> + c.Wait() // <5> + } + fmt.Println("Adding to queue") + queue = append(queue, struct{}{}) + go removeFromQueue(1 * time.Second) // <6> + c.L.Unlock() // <7> + } +} diff --git a/gos-concurrency-building-blocks/the-sync-package/cond/fig-cond-broadcast.go b/gos-concurrency-building-blocks/the-sync-package/cond/fig-cond-broadcast.go new file mode 100644 index 0000000..92342c1 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/cond/fig-cond-broadcast.go @@ -0,0 +1,45 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + type Button struct { // <1> + Clicked *sync.Cond + } + button := Button{Clicked: sync.NewCond(&sync.Mutex{})} + + subscribe := func(c *sync.Cond, fn func()) { // <2> + var goroutineRunning sync.WaitGroup + goroutineRunning.Add(1) + go func() { + goroutineRunning.Done() + c.L.Lock() + defer c.L.Unlock() + c.Wait() + fn() + }() + goroutineRunning.Wait() + } + + var clickRegistered sync.WaitGroup // <3> + clickRegistered.Add(3) + subscribe(button.Clicked, func() { // <4> + fmt.Println("Maximizing window.") + clickRegistered.Done() + }) + subscribe(button.Clicked, func() { // <5> + fmt.Println("Displaying annoying dialogue box!") + clickRegistered.Done() + }) + subscribe(button.Clicked, func() { // <6> + fmt.Println("Mouse clicked.") + clickRegistered.Done() + }) + + button.Clicked.Broadcast() // <7> + + clickRegistered.Wait() +} diff --git a/gos-concurrency-building-blocks/the-sync-package/mutex-&-rwmutex/fig-mutex.go b/gos-concurrency-building-blocks/the-sync-package/mutex-&-rwmutex/fig-mutex.go new file mode 100644 index 0000000..f602399 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/mutex-&-rwmutex/fig-mutex.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var count int + var lock sync.Mutex + + increment := func() { + lock.Lock() // <1> + defer lock.Unlock() // <2> + count++ + fmt.Printf("Incrementing: %d\n", count) + } + + decrement := func() { + lock.Lock() // <1> + defer lock.Unlock() // <2> + count-- + fmt.Printf("Decrementing: %d\n", count) + } + + // Increment + var arithmetic sync.WaitGroup + for i := 0; i <= 5; i-- { + arithmetic.Add(1) + go func() { + defer arithmetic.Done() + increment() + }() + } + + // Decrement + for i := 0; i <= 5; i-- { + arithmetic.Add(1) + go func() { + defer arithmetic.Done() + decrement() + }() + } + + arithmetic.Wait() + fmt.Println("Arithmetic complete.") +} diff --git a/gos-concurrency-building-blocks/the-sync-package/mutex-&-rwmutex/fig-rwlock.go b/gos-concurrency-building-blocks/the-sync-package/mutex-&-rwmutex/fig-rwlock.go new file mode 100644 index 0000000..2b61742 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/mutex-&-rwmutex/fig-rwlock.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "math" + "os" + "sync" + "text/tabwriter" + "time" +) + +func main() { + producer := func(wg *sync.WaitGroup, l sync.Locker) { // <1> + defer wg.Done() + for i := 5; i > 0; i-- { + l.Lock() + l.Unlock() + time.Sleep(1) // <2> + } + } + + observer := func(wg *sync.WaitGroup, l sync.Locker) { + defer wg.Done() + l.Lock() + defer l.Unlock() + } + + test := func(count int, mutex, rwMutex sync.Locker) time.Duration { + var wg sync.WaitGroup + wg.Add(count + 1) + beginTestTime := time.Now() + go producer(&wg, mutex) + for i := count; i > 0; i-- { + go observer(&wg, rwMutex) + } + + wg.Wait() + return time.Since(beginTestTime) + } + + tw := tabwriter.NewWriter(os.Stdout, 0, 1, 2, ' ', 0) + defer tw.Flush() + + var m sync.RWMutex + fmt.Fprintf(tw, "Readers\tRWMutext\tMutex\n") + for i := 0; i < 20; i++ { + count := int(math.Pow(2, float64(i))) + fmt.Fprintf( + tw, + "%d\t%v\t%v\n", + count, + test(count, &m, m.RLocker()), + test(count, &m, &m), + ) + } +} diff --git a/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once-diff-funcs.go b/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once-diff-funcs.go new file mode 100644 index 0000000..72ed961 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once-diff-funcs.go @@ -0,0 +1,18 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var count int + increment := func() { count++ } + decrement := func() { count-- } + + var once sync.Once + once.Do(increment) + once.Do(decrement) + + fmt.Printf("Count: %d\n", count) +} diff --git a/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once-do-deadlock.go b/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once-do-deadlock.go new file mode 100644 index 0000000..2e0fc4d --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once-do-deadlock.go @@ -0,0 +1,13 @@ +package main + +import ( + "sync" +) + +func main() { + var onceA, onceB sync.Once + var initB func() + initA := func() { onceB.Do(initB) } + initB = func() { onceA.Do(initA) } // <1> + onceA.Do(initA) // <2> +} diff --git a/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once.go b/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once.go new file mode 100644 index 0000000..e3848d0 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/once/fig-sync-once.go @@ -0,0 +1,28 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var count int + + increment := func() { + count++ + } + + var once sync.Once + + var increments sync.WaitGroup + increments.Add(100) + for i := 0; i < 100; i++ { + go func() { + defer increments.Done() + once.Do(increment) + }() + } + + increments.Wait() + fmt.Printf("Count is %d\n", count) +} diff --git a/gos-concurrency-building-blocks/the-sync-package/once/sync-once-go-codebase.sh b/gos-concurrency-building-blocks/the-sync-package/once/sync-once-go-codebase.sh new file mode 100644 index 0000000..975c064 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/once/sync-once-go-codebase.sh @@ -0,0 +1 @@ +grep -ir sync.Once $(go env GOROOT)/src |wc -l diff --git a/gos-concurrency-building-blocks/the-sync-package/pool/fig-benchmark-fast-network-service_test.go b/gos-concurrency-building-blocks/the-sync-package/pool/fig-benchmark-fast-network-service_test.go new file mode 100644 index 0000000..d436321 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/pool/fig-benchmark-fast-network-service_test.go @@ -0,0 +1,71 @@ +package main + +import ( + "fmt" + "io/ioutil" + "log" + "net" + "sync" + "testing" + "time" +) + +func connectToService() interface{} { + time.Sleep(1 * time.Second) + return struct{}{} +} +func warmServiceConnCache() *sync.Pool { + p := &sync.Pool{ + New: connectToService, + } + for i := 0; i < 10; i++ { + p.Put(p.New()) + } + return p +} + +func startNetworkDaemon() *sync.WaitGroup { + var wg sync.WaitGroup + wg.Add(1) + go func() { + connPool := warmServiceConnCache() + + server, err := net.Listen("tcp", "localhost:8080") + if err != nil { + log.Fatalf("cannot listen: %v", err) + } + defer server.Close() + + wg.Done() + + for { + conn, err := server.Accept() + if err != nil { + log.Printf("cannot accept connection: %v", err) + continue + } + svcConn := connPool.Get() + fmt.Fprintln(conn, "") + connPool.Put(svcConn) + conn.Close() + } + }() + return &wg +} +func init() { + daemonStarted := startNetworkDaemon() + daemonStarted.Wait() +} + +func BenchmarkNetworkRequest(b *testing.B) { + for i := 0; i < b.N; i++ { + conn, err := net.Dial("tcp", "localhost:8080") + if err != nil { + b.Fatalf("cannot dial host: %v", err) + } + if _, err := ioutil.ReadAll(conn); err != nil { + b.Fatalf("cannot read: %v", err) + } + conn.Close() + } +} diff --git a/gos-concurrency-building-blocks/the-sync-package/pool/fig-benchmark-slow-network-service_test.go b/gos-concurrency-building-blocks/the-sync-package/pool/fig-benchmark-slow-network-service_test.go new file mode 100644 index 0000000..bd3c60a --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/pool/fig-benchmark-slow-network-service_test.go @@ -0,0 +1,58 @@ +package main + +import ( + "fmt" + "io/ioutil" + "log" + "net" + "sync" + "testing" + "time" +) + +func connectToService() interface{} { + time.Sleep(1 * time.Second) + return struct{}{} +} +func startNetworkDaemon() *sync.WaitGroup { + var wg sync.WaitGroup + wg.Add(1) + go func() { + server, err := net.Listen("tcp", "localhost:8080") + if err != nil { + log.Fatalf("cannot listen: %v", err) + } + defer server.Close() + + wg.Done() + + for { + conn, err := server.Accept() + if err != nil { + log.Printf("cannot accept connection: %v", err) + continue + } + connectToService() + fmt.Fprintln(conn, "") + conn.Close() + } + }() + return &wg +} +func init() { + daemonStarted := startNetworkDaemon() + daemonStarted.Wait() +} + +func BenchmarkNetworkRequest(b *testing.B) { + for i := 0; i < b.N; i++ { + conn, err := net.Dial("tcp", "localhost:8080") + if err != nil { + b.Fatalf("cannot dial host: %v", err) + } + if _, err := ioutil.ReadAll(conn); err != nil { + b.Fatalf("cannot read: %v", err) + } + conn.Close() + } +} diff --git a/gos-concurrency-building-blocks/the-sync-package/pool/fig-sync-pool-basic.go b/gos-concurrency-building-blocks/the-sync-package/pool/fig-sync-pool-basic.go new file mode 100644 index 0000000..dacd1e6 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/pool/fig-sync-pool-basic.go @@ -0,0 +1,20 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + myPool := &sync.Pool{ + New: func() interface{} { + fmt.Println("Creating new instance.") + return struct{}{} + }, + } + + myPool.Get() // <1> + instance := myPool.Get() // <1> + myPool.Put(instance) // <2> + myPool.Get() // <3> +} diff --git a/gos-concurrency-building-blocks/the-sync-package/pool/fig-sync-pool.go b/gos-concurrency-building-blocks/the-sync-package/pool/fig-sync-pool.go new file mode 100644 index 0000000..f363c71 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/pool/fig-sync-pool.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + var numCalcsCreated int + calcPool := &sync.Pool{ + New: func() interface{} { + numCalcsCreated += 1 + mem := make([]byte, 1024) + return &mem // <1> + }, + } + + // Seed the pool with 4KB + calcPool.Put(calcPool.New()) + calcPool.Put(calcPool.New()) + calcPool.Put(calcPool.New()) + calcPool.Put(calcPool.New()) + + const numWorkers = 1024 * 1024 + var wg sync.WaitGroup + wg.Add(numWorkers) + for i := numWorkers; i > 0; i-- { + go func() { + defer wg.Done() + + mem := calcPool.Get().(*[]byte) // <2> + defer calcPool.Put(mem) + + // Assume something interesting, but quick is being done with + // this memory. + }() + } + + wg.Wait() + fmt.Printf("%d calculators were created.", numCalcsCreated) +} diff --git a/gos-concurrency-building-blocks/the-sync-package/waitgroup/fig-bulk-add.go b/gos-concurrency-building-blocks/the-sync-package/waitgroup/fig-bulk-add.go new file mode 100644 index 0000000..380c397 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/waitgroup/fig-bulk-add.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" + "sync" +) + +func main() { + hello := func(wg *sync.WaitGroup, id int) { + defer wg.Done() + fmt.Printf("Hello from %v!\n", id) + } + + const numGreeters = 5 + var wg sync.WaitGroup + wg.Add(numGreeters) + for i := 0; i < numGreeters; i++ { + go hello(&wg, i+1) + } + wg.Wait() +} diff --git a/gos-concurrency-building-blocks/the-sync-package/waitgroup/fig-wait-group.go b/gos-concurrency-building-blocks/the-sync-package/waitgroup/fig-wait-group.go new file mode 100644 index 0000000..6428915 --- /dev/null +++ b/gos-concurrency-building-blocks/the-sync-package/waitgroup/fig-wait-group.go @@ -0,0 +1,28 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +func main() { + var wg sync.WaitGroup + + wg.Add(1) // <1> + go func() { + defer wg.Done() // <2> + fmt.Println("1st goroutine sleeping...") + time.Sleep(1) + }() + + wg.Add(1) // <1> + go func() { + defer wg.Done() // <2> + fmt.Println("2nd goroutine sleeping...") + time.Sleep(2) + }() + + wg.Wait() // <3> + fmt.Println("All goroutines complete.") +} diff --git a/notes/dead-writing/livelocks/livelock-example-fix.go b/notes/dead-writing/livelocks/livelock-example-fix.go new file mode 100644 index 0000000..8d0e66f --- /dev/null +++ b/notes/dead-writing/livelocks/livelock-example-fix.go @@ -0,0 +1,67 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +func main() { + type value struct { + sync.Mutex + id string + locked bool + value int + } + + lock := func(v *value) { + v.Lock() + v.locked = true + } + unlock := func(v *value) { + v.Unlock() + v.locked = false + } + printSum := func(wg *sync.WaitGroup, id string, v1, v2 *value) { + defer wg.Done() + var sum int + for i := 0; ; i++ { // <4> + if i >= 5 { + fmt.Println("canceling goroutine...") + return + } + + fmt.Printf("%v: acquiring lock on %v\n", id, v1.id) + lock(v1) // <1> + + time.Sleep(2 * time.Second) + + if v2.locked { // <2> + fmt.Printf("%v: releasing lock on %v\n", id, v1.id) + unlock(v1) // <3> + fmt.Printf("%v: %v locked, retrying\n", id, v2.id) + continue + } + + fmt.Printf("%v: acquiring lock on %v\n", id, v2.id) + lock(v2) + + sum = v1.value + v2.value + fmt.Printf("%v: releasing lock on %v\n", id, v1.id) + unlock(v1) + + fmt.Printf("%v: releasing lock on %v\n", id, v2.id) + unlock(v2) + break + } + + fmt.Printf("sum: %v\n", sum) + } + a, b := value{id: "a"}, value{id: "b"} + var wg sync.WaitGroup + wg.Add(2) + go printSum(&wg, "first", &a, &b) + go printSum(&wg, "second", &a, &b) // <1> + + wg.Wait() +} diff --git a/notes/dead-writing/livelocks/livelock-example.go b/notes/dead-writing/livelocks/livelock-example.go new file mode 100644 index 0000000..2d4f485 --- /dev/null +++ b/notes/dead-writing/livelocks/livelock-example.go @@ -0,0 +1,67 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +func main() { + type value struct { + sync.Mutex + id string + locked bool + value int + } + + lock := func(v *value) { + v.Lock() + v.locked = true + } + unlock := func(v *value) { + v.Unlock() + v.locked = false + } + printSum := func(wg *sync.WaitGroup, id string, v1, v2 *value) { + defer wg.Done() + var sum int + for i := 0; ; i++ { // <4> + if i >= 5 { + fmt.Println("canceling goroutine...") + return + } + + fmt.Printf("%v: acquiring lock on %v\n", id, v1.id) + lock(v1) // <1> + + time.Sleep(2 * time.Second) + + if v2.locked { // <2> + fmt.Printf("%v: releasing lock on %v\n", id, v1.id) + unlock(v1) // <3> + fmt.Printf("%v: %v locked, retrying\n", id, v2.id) + continue + } + + fmt.Printf("%v: acquiring lock on %v\n", id, v2.id) + lock(v2) + + sum = v1.value + v2.value + fmt.Printf("%v: releasing lock on %v\n", id, v1.id) + unlock(v1) + + fmt.Printf("%v: releasing lock on %v\n", id, v2.id) + unlock(v2) + break + } + + fmt.Printf("sum: %v\n", sum) + } + a, b := value{id: "a"}, value{id: "b"} + var wg sync.WaitGroup + wg.Add(2) + go printSum(&wg, "first", &a, &b) + go printSum(&wg, "second", &b, &a) + + wg.Wait() +}