Skip to content
Permalink
Branch: master
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 573 lines (435 sloc) 13.8 KB

02.6 - Goroutines and channels

Goroutines

Concurrency is not parallelism

  • Rob "Commander" Pike

With that said, let's look at one of Go's main selling points, the goroutine. go function(a, b) runs the function in parallel and continues with the rest of the program.

// 02.6-01-goroutine1.go
package main

import "fmt"

func PrintMe(t int, count int) {
    for i := 0; i < count; i++ {
        fmt.Printf("Printing from %d\n", t)
    }
}

func main() {

    go PrintMe(0, 100)

    fmt.Println("Main finished!")
}

But we never see anything printed. main returns before goroutine is spun up and start printing:

$ go run 02.6-01-goroutine1.go
Main finished!

Lesson learned: Always wait for goroutines to finish! (if applicable).

Continuing the C tradition, we can wait for a key-press before ending main.

// 02.6-02-goroutine2.go
package main

import "fmt"

func PrintMe(t int, count int) {
    for i := 0; i < count; i++ {
        fmt.Printf("Printing from %d\n", t)
    }
}

func main() {

    go PrintMe(0, 10)

    // Wait for a keypress
    fmt.Scanln()
    fmt.Println("Main finished!")
}

This time we can see the goroutine's output:

$ go run 02.6-02-goroutine2.go
Printing from 0
Printing from 0
Printing from 0
Printing from 0
Printing from 0
Printing from 0
Printing from 0
Printing from 0
Printing from 0
Printing from 0
e
Main finished!

Spawning anonymous goroutines

We can also spawn new goroutines on the spot:

// 02.6-03-goroutine3.go
package main

import "fmt"

func main() {

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Printf("Printing from %d\n", 0)
        }
    }()

    // Wait for a keypress
    fmt.Scanln()
    fmt.Println("Main finished!")
}

Channels

Channels go hand-in-hand with gorotuines. They are typed. For example if we create a channel of type int, we can only use it to transfer ints. Values are transfered using <-. Channels must be created before use.

Let's make a channel in honor of famous hacker 4chan and use it to transfer some numbers around:

// 02.6-04-channel1.go
// This will not run
package main

import "fmt"

func main() {

    fourChan := make(chan int)

    i1 := 10

    // Send i1 to channel
    fourChan <- i1
    fmt.Printf("Sent %d to channel\n", i1)

    // Receive int from channel
    i2 := <-fourChan
    fmt.Printf("Received %d from channel\n", i2)
}

But it doesn't work:

$ go run 02.6-04-channel1.go
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        Z:/Go/src/Hacking-with-Go/code/02/02.6/02.6-04-channel1.go:12 +0x75
exit status 2

[Unbuffered] Channels will not start until the other side is ready.

Our channel's "other side" is also in main and the channel is unbuffered (we will talk about it in a bit). Meaning there's nothing on the other side listening.

We can either send or receive the data in a goroutine (or both):

// 02.6-05-channel2.go
package main

import "fmt"

func main() {

    fourChan := make(chan int)

    go func() {
        // Send i1 to channel
        i1 := 10
        fourChan <- i1 // fourChan <- 10
        fmt.Printf("Sent %d to channel\n", i1)
    }()

    go func() {
        // Receive int from channel
        i2 := <-fourChan
        fmt.Println(i2)
        fmt.Printf("Received %d from channel\n", i2)
    }()

    // Wait for goroutines to finish
    fmt.Scanln()
    fmt.Println("Main Finished!")
}

This time we have another goroutine listening on the other side:

$ go run 02.6-05-channel2.go
10
Received 10 from channel
Sent 10 to channel
e
Main Finished!

Buffered channels

Buffered channels have capacity and only block when the buffer is full. Buffer size (as far as I know) is specified during declaration:

  • bc := make(chan int, 10) makes an int channel with size 10.

Using buffered channels we can send and receive in main:

// 02.6-06-channel3.go
package main

import "fmt"

func main() {

    fourChan := make(chan int, 2)

    // Send 10 to channel
    fourChan <- 10
    fmt.Printf("Sent %d to channel\n", 10)

    // Receive int from channel
    // We can also receive directly
    fmt.Printf("Received %d from channel\n", <-fourChan)
}

If the channel goes over capacity, we get the same fatal runtime error as before.

Closing channels

Channels can be closed. To close a channel we can use close(fourChan).

Sending items to a closed channel will cause a panic.

Checking channel status

When reading from channels, we can also get a second return value:

  • i1, ok := <- fourChan

If channel is open ok will be true. false means channel is closed.

Reading from a closed channel will return a zero value (e.g. 0 for most number types). See this example. i2 is 10 before reading something from a closed channel. After it's 0.

// 02.6-07-channel4.go
package main

import "fmt"

func main() {

    fourChan := make(chan int, 2)

    close(fourChan)

    i2 := 10
    fmt.Println("i2 before reading from closed channel", i2) // 10
    i2, ok := <-fourChan
    fmt.Printf("i2: %d - ok: %t", i2, ok) // i2: 10 - ok: false
}

Reading from channels with range

Use a range in a for to receive values from the channel in a loop until it closes like for i:= range fourChan.

// 02-08-channel5.go
package main

import "fmt"

func main() {

    fourChan := make(chan int, 10)

    go func() {
        // Send 0-9 to channel
        for i := 0; i < 10; i++ {
            fourChan <- i
        }
    }()

    go func() {
        // Receive from channel
        for v := range fourChan {
            fmt.Println(v)
        }
    }()

    // Wait for goroutines to finish
    fmt.Scanln()
    fmt.Println("Main Finished!")
}

If we attempt to read something from an open channel and there's nothing there, the program will block and wait until it gets something. We can use channels to sync goroutines instead of waiting for Scanln. Here's our example from 02.6-03-goroutine3.go:

// 02.6-09-channel6.go
package main

import "fmt"

func main() {

    c := make(chan bool)

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Printf("Printing from %d\n", 0)
        }

        // Send true to channel when we are done
        c <- true
    }()

    // Main will wait until it receives something from c
    <-c
}

select

Another way to wait for channels to be ready is using select. select has some cases. It will block until one of the cases is ready and runs it. If multiple are ready, it will choose one at random.

// 02.6-10-channel7.go
package main

import "fmt"

func main() {

    c := make(chan int, 2)

    for i := 0; i < 10; i++ {

        select {
        case c <- i:
            // If we can write to channel, send something to it
            fmt.Println("Sent to channel", i)
        case i2 := <-c:
            // If we can read from channel, read from it and print
            fmt.Println("Received from channel", i2)
        default:
            // This is run when nothing else can be done
        }
    }
}

Break is never reached because there's always something to do. Increase the size of the channel and re-run the program a few times to see select's randomness when multiple choices are valid.

$ go run 02.6-10-channel7.go
Sent to channel 0
Received from channel 0
Sent to channel 2
Sent to channel 3
Received from channel 2
Received from channel 3
Sent to channel 6
Received from channel 6
Sent to channel 8
Sent to channel 9

If channel is unbuffered, default is always triggered because there's nothing listening on the other side.

Directed channels

Channels can be directed. Meaning you can only read or write to them.

  • c1 := make(chan<- int): write-only int channel.
  • c2 := make(<-chan int): read-only int channel.

However, declaring directed channels is not useful. Because if we can never write to a read-only channel, it will never have data. Instead they are used when passing channels to functions/goroutines.

Rewriting 02.6-05-channel2.go using directed channels:

// 02.6-11-channel8.go
package main

import "fmt"

// Directed write-only channel
func Sender(c chan<- int) {
    for i := 0; i < 10; i++ {
        fmt.Println("Sent", i)
        c <- i
    }
}

func Receiver(c <-chan int) {
    for i := range c {
        fmt.Println("Received", i)
    }
}

func main() {

    fourChan := make(chan int)

    go Sender(fourChan)
    go Receiver(fourChan)

    // Wait for goroutines to finish
    fmt.Scanln()
    fmt.Println("Main Finished!")
}
$ go run 02.6-11-channel8.go
Sent 0
Sent 1
Received 0
Received 1
Sent 2
Sent 3
Received 2
Received 3
Sent 4
Sent 5
Received 4
Received 5
Sent 6
Sent 7
Received 6
Received 7
Sent 8
Sent 9
Received 8
Received 9
d
Main Finished!

Synching goroutines

In our previous example, we used both Scanln and a blocking channel to force main wait for goroutines to finish. There's a better way of doing this using sync.WaitGroup.

Let's assume we are generating a list of strings that need to processed. To take advantage of Go's concurrency model, we spawn a goroutine to generate the list and send each to a channel. Then we read from the channel and spawn a new goroutine for each string and process it.

This way we can start processing the generated strings as they are being generated and we do not have to create a large string slice to hold the results.

// 02.6-12-waitgroup1.go
package main

import "fmt"

// generateStrings generated n strings and sends them to channel.
// Channel is closed when string generation is done.
func generateStrings(n int, c chan<- string) {

    // Close channel when done
    defer close(c)
    // Generate strings
    for i := 0; i < n; i++ {
        c <- fmt.Sprintf("String #%d", i)
    }
}

// consumeString reads strings from channel and prints them.
func consumeString(s string) {
    fmt.Printf("Consumed %s\n", s)
}

func main() {
    // Create channel
    c := make(chan string)
    // Generate strings
    go generateStrings(10, c)

    for {
        select {
        // Read from channel
        case s, ok := <-c:
            // If channel is closed stop processing and return
            if !ok {
                fmt.Println("Processing finished")
                return
            }
            // Consume the string read from channel
            go consumeString(s)
        }
    }
}

This looks correct but it's not. Not all strings are consumed. Because the channel is closed and we return from main when generateStrings is done. However, not all consumerString goroutines are done when by then. We need to find a way to signal main to wait until all goroutines have returned.

sync.WaitGroup

We accomplish this with sync.WaitGroup. Before spawning each consumerString goroutine we wg.Add(1) to it. Every time a consumerString goroutine is finished, we subtract the counter by one with wg.Done() and then we wait before returning with wg.Wait() which blocks execution until the counter is zero.

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

// generateStrings generated n strings and sends them to channel.
// Channel is closed when string generation is done.
func generateStrings(n int, c chan<- string) {

    // Close channel when done
    defer close(c)
    // Generate strings
    for i := 0; i < n; i++ {
        c <- fmt.Sprintf("String #%d", i)
    }
}

// consumeString reads strings from channel and prints them.
func consumeString(s string) {
    // Decrease waitgroup's counter by one
    defer wg.Done()
    fmt.Printf("Consumed %s\n", s)
}

func main() {
    // Create channel
    c := make(chan string)
    // Generate strings
    go generateStrings(10, c)

    for {
        select {
        // Read from channel
        case s, ok := <-c:
            // If channel is closed stop processing and return
            if !ok {
                // Wait for all goroutines to finish
                wg.Wait()
                // Return
                fmt.Println("Processing finished")
                return
            }
            // Increase wg counter by one for each goroutine
            // Note this is happening inside main before spawning the goroutine
            wg.Add(1)
            // Consume the string
            go consumeString(s)
        }
    }
}

Continue reading ⇒ 02.7 - Error handling

You can’t perform that action at this time.