## Understanding Fan-in

```mermaid
graph LR
    A[Input 1] --> C(Fan-in)
    B[Input 2] --> C
    C --> D[Output]


Fan-in pattern giống kiểu như là mình gôm nhiều channels lại thành 1 channel, mục tiêu là để handle nhiều channels cùng lúc.

## Ví dụ kinh điển của Rob Pike
Ví dụ này đã được giới thiệu ở phần 4, Select Basics

In [4]:
func boring(msg string) <-chan string {
    c := make(chan string)
    go func() {
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c
}

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            }
        }
    }()
    return c
}

func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

Joe 0
Ann 0
Ann 1
Ann 2
Joe 1
Joe 2
Ann 3
Joe 3
Ann 4
Joe 4
You're both boring; I'm leaving.


## Why Fan-in?

### 1. Merge Independent Sources

In [None]:
func fetchFromServers(servers ...string) <-chan Response {
    c := make(chan Response)
    
    // Launch a goroutine for each server
    for _, server := range servers {
        go func(s string) {
            resp := fetch(s)
            c <- resp
        }(server)
    }
    
    return c
}

### 2. Simplify Consumer Code

In [None]:
// Without fan-in - complex
for {
    select {
    case msg1 := <-ch1:
        process(msg1)
    case msg2 := <-ch2:
        process(msg2)
    case msg3 := <-ch3:
        process(msg3)
    }
}

// With fan-in - simple
merged := fanIn(ch1, ch2, ch3)
for msg := range merged {
    process(msg)
}

## Fan-in Variations

### 1. Two-Channel Fan-in
Đây là dạng đơn giản nhất, chỉ gom 2 channels lại với nhau

In [7]:
func fanIn2(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case msg := <-input1:
                c <- msg
            case msg := <-input2:
                c <- msg
            }
        }
    }()
    return c
}

### 2. Variadic Fan-in
Gom nhiều channels lại với nhau dùng variadic của Go

In [None]:
func fanIn(inputs ...<-chan string) <-chan string {
    c := make(chan string)
    
    // Start a goroutine for each input channel
    for _, input := range inputs {
        go func(in <-chan string) {
            for msg := range in {
                c <- msg
            }
        }(input)
    }
    
    return c
}

### 3. Sequenced Fan-in

In [10]:
type Message struct {
    str  string
    wait chan bool
}

func fanInSequenced(input1, input2 <-chan Message) <-chan Message {
    c := make(chan Message)
    go func() {
        for {
            select {
            case msg1 := <-input1:
                c <- msg1
            case msg2 := <-input2:
                c <- msg2
            }
        }
    }()
    return c
}

func boring(msg string) <-chan Message {
    waitForIt := make(chan bool)
    c := make(chan Message)
    go func() {
        for i := 0; ; i++ {
            c <- Message{
                str:  fmt.Sprintf("%s %d", msg, i),
                wait: waitForIt,
            }
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
            <-waitForIt // waits for permission before sending next
        }
    }()
    return c
}

func main() {
    c := fanInSequenced(boring("Joe"), boring("Ann"))

    for i := 0; i <= 5; i++ {
        msg := <-c              // get next message from either producer
        fmt.Println(msg.str)    // consume it
        msg.wait <- true        // signal producer to send next
    }
    fmt.Println("You're both boring; I'm leaving.")
}

Joe 0
Ann 0
Joe 1
Ann 1
Ann 2
Joe 2
You're both boring; I'm leaving.


### Các điểm cần note:

#### Message struct

In [None]:
type Message struct {
    str  string
    wait chan bool
}

- `str`: đây là payload
- `wait`: là 1 tín hiệu trả về cho producer 

#### boring(msg string)

In [None]:
func boring(msg string) <-chan Message {
    waitForIt := make(chan bool)
    c := make(chan Message)
    go func() {
        for i := 0; ; i++ {
            c <- Message{
                str:  fmt.Sprintf("%s %d", msg, i),
                wait: waitForIt,
            }
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
            <-waitForIt
        }
    }()
    return c
}

- Mỗi producer goroutine sẽ tạo ra các message kiểu như "Joe 0", 'Joe 1"....
- Sau mỗi message được gửi tới channel c, thì nó sẽ bị block bởi `<-waitForIt`, chờ đợi ack của consumer
- Điều này sẽ không làm quá tải cho consumer, vì gorouting không thể gửi value kế kiêp khi mà consumer chưa cho phép

#### fanInSequenced

- Merge 2 channel lại thành 1

#### Khi nào sử dụng patter này?

- Khi producer generate data nhanh quá, làm đầy buffer quá nhanh mà consumer tiêu thụ không kịp
- Producer phải tạm dừng cho đến khi consumer hoàn thành
- Comnsumer tiêu thụ đều nhau từ 2 producer

### 4. Fan-in with Done Channel

In [17]:
func fanInWithDone(done <-chan struct{}, inputs ...<-chan int) <-chan int {
    c := make(chan int)
    var wg sync.WaitGroup
    
    // Multiplexer function
    multiplex := func(input <-chan int) {
        defer wg.Done()
        for {
            select {
            case v := <-input:
                select {
                case c <- v:
                case <-done:
                    return
                }
            case <-done:
                return
            }
        }
    }
    
    // Launch multiplexer for each input
    wg.Add(len(inputs))
    for _, input := range inputs {
        go multiplex(input)
    }
    
    // Close output when all multiplexers finish
    go func() {
        wg.Wait()
        close(c)
    }()
    
    return c
}

func main() {
    done := make(chan struct{})

    // Simulate two input streams
    input1 := make(chan int)
    input2 := make(chan int)

    go func() {
        for i := 0; i < 3; i++ {
            input1 <- i
            time.Sleep(1000 * time.Millisecond)
        }
        close(input1)
    }()

    go func() {
        for i := 100; i < 103; i++ {
            input2 <- i
            time.Sleep(1500 * time.Millisecond)
        }
        close(input2)
    }()

    out := fanInWithDone(done, input1, input2)

    for v := range out {
        fmt.Println("Got:", v)
        if v == 100 { // simulate cancellation mid-stream
            close(done)
        }
    }

    fmt.Println("Finished.")
}

Got: 0
Got: 100
Finished.


### Các điểm cần note

#### 1. `done <-chan struct{}`

- Đây là 1 signal channel, đóng hoặc gửi `done` để đóng toàn bộ goroutine
- Đảm bảo dừng toàn bộ goroutine kể cả vài input channel đang active hoặc bị block

#### 2. `multiplex` function

In [None]:
multiplex := func(input <-chan int) {
    defer wg.Done()
    for {
        select {
        case v := <-input:
            select {
            case c <- v:
            case <-done:
                return
            }
        case <-done:
            return
        }
    }
}

- Đọc dư liệu từ input channel
- Forward giá trị đó vào c channel
- Nếu có done thì exit
- Select bên trong đảm bảo không block sending tới channel c

#### 3. Gọi multiplexers

In [None]:
wg.Add(len(inputs))
for _, input := range inputs {
    go multiplex(input)
}

- Tạo 1 goroutine với mỗi input
- `wg.Add` để track có bao nhiêu goroutine

#### 4. Đóng output

In [None]:
go func() {
    wg.Wait()
    close(c)
}()

- Chờ toàn bộ multiplexer thoat trước khi đóng channel c
- Ngăn chặn việc gửi data tới 1 channel đã được đóng

## Advanced Fan-in Patterns

## Real-World Applications

## Performance Considerations

## Common Pitfalls

## Best Practices