### 多线程生产者消费者模型

In [2]:
using System.Threading;
using System.Collections.Concurrent;
// BlockingCollection: a ConcurrentQueue with built-in signalling (semaphore), shared by the two threads below.
var queue = new BlockingCollection<Message>(new ConcurrentQueue<Message>());
Thread sender = new Thread(SendMessageThread);
Thread receiver = new Thread(ReceiveMessageThread);

// The value passed to Start is handed to the thread method, which uses it as the id in its log lines.
sender.Start(1);
receiver.Start(1);

// Wait for the producer to finish, pause briefly, then interrupt the consumer:
// its loop has no exit condition and only ends through its ThreadInterruptedException handler.
sender.Join();
Thread.Sleep(100);
receiver.Interrupt();
receiver.Join();

// Producer thread body: enqueues 20 numbered messages on the shared queue,
// logging each one and pausing 100 ms between sends.
// arg: the boxed int sender id supplied through Thread.Start.
void SendMessageThread(object? arg)
{
    var id = (int)arg!;

    var i = 1;
    while (i <= 20)
    {
        var message = new Message(id, i.ToString());
        queue.Add(message);
        Console.WriteLine($"Thread {id} sent {i}");
        Thread.Sleep(100);
        i++;
    }
}

// Consumer thread body: takes messages from the shared queue and logs them
// until the thread is interrupted, then logs the interruption and returns.
// id: value supplied through Thread.Start, used only as a label in the log lines.
void ReceiveMessageThread(object? id)
{
    try
    {
        for (;;)
        {
            var message = queue.Take();
            var line = $"Thread {id} received {message.Content} from {message.FromId}";
            Console.WriteLine(line);
            Thread.Sleep(1);
        }
    }
    catch (ThreadInterruptedException)
    {
        // Interruption is the only way out of the loop above.
        var notice = $"Thread {id} interrupted";
        Console.WriteLine(notice);
    }
}

/// <summary>A message handed from a sender to a receiver.</summary>
/// <param name="FromId">Id of the sender that created the message.</param>
/// <param name="Content">Message text; the senders in this file store their loop counter here.</param>
record Message(int FromId, string Content);

Thread 1 sent 1
Thread 1 received 1 from 1
Thread 1 sent 2
Thread 1 received 2 from 1
Thread 1 sent 3
Thread 1 received 3 from 1
Thread 1 received 4 from 1
Thread 1 sent 4
Thread 1 received 5 from 1
Thread 1 sent 5
Thread 1 received 6 from 1
Thread 1 sent 6
Thread 1 received 7 from 1
Thread 1 sent 7
Thread 1 sent 8
Thread 1 received 8 from 1
Thread 1 sent 9
Thread 1 received 9 from 1
Thread 1 received 10 from 1
Thread 1 sent 10
Thread 1 sent 11
Thread 1 received 11 from 1
Thread 1 sent 12
Thread 1 received 12 from 1
Thread 1 sent 13
Thread 1 received 13 from 1
Thread 1 sent 14
Thread 1 received 14 from 1
Thread 1 sent 15
Thread 1 received 15 from 1
Thread 1 sent 16
Thread 1 received 16 from 1
Thread 1 sent 17
Thread 1 received 17 from 1
Thread 1 sent 18
Thread 1 received 18 from 1
Thread 1 sent 19
Thread 1 received 19 from 1
Thread 1 sent 20
Thread 1 received 20 from 1
Thread 1 interrupted


### 使用Channel

In [3]:
using System.Threading.Channels;

// Alternative (disabled): a bounded channel with explicit options.
// var option = new BoundedChannelOptions(10)
// {
//     FullMode = BoundedChannelFullMode.Wait, // when the channel is full: wait (block), drop the newest, drop the oldest, or drop the item being written
//     SingleReader = true,
//     SingleWriter = true
// };
var channel = Channel.CreateUnbounded<Message>();

Thread sender = new Thread(SendMessageThread);
Thread receiver = new Thread(ReceiveMessageThread);

// The value passed to Start is handed to the thread method, which uses it as the id in its log lines.
sender.Start(1);
receiver.Start(1);

// Wait for the producer to finish, pause briefly, then interrupt the consumer:
// its polling loop has no exit condition and only ends through its ThreadInterruptedException handler.
sender.Join();
Thread.Sleep(100);
receiver.Interrupt();
receiver.Join();

// Producer thread body: tries to write 20 numbered messages to the shared channel,
// logging each accepted write and pausing 100 ms between attempts.
// arg: the boxed int sender id supplied through Thread.Start.
void SendMessageThread(object? arg)
{
    var id = (int)arg!;
    var writer = channel.Writer;

    for (var i = 1; i <= 20; ++i)
    {
        var accepted = writer.TryWrite(new Message(id, i.ToString()));
        if (accepted)
        {
            Console.WriteLine($"Thread {id} sent {i}");
        }

        Thread.Sleep(100);
    }
}

// Consumer thread body: polls the shared channel with TryRead, logging every message
// it gets and sleeping 1 ms between polls, until the thread is interrupted.
// id: value supplied through Thread.Start, used only as a label in the log lines.
void ReceiveMessageThread(object? id)
{
    var reader = channel.Reader;

    try
    {
        for (;;)
        {
            var gotOne = reader.TryRead(out var message);
            if (gotOne)
            {
                Console.WriteLine($"Thread {id} received {message.Content} from {message.FromId}");
            }

            Thread.Sleep(1);
        }
    }
    catch (ThreadInterruptedException)
    {
        // Interruption is the only way out of the polling loop above.
        Console.WriteLine($"Thread {id} interrupted");
    }
}


Thread 1 sent 1
Thread 1 received 1 from 1
Thread 1 sent 2
Thread 1 received 2 from 1
Thread 1 sent 3
Thread 1 received 3 from 1
Thread 1 sent 4
Thread 1 received 4 from 1
Thread 1 sent 5
Thread 1 received 5 from 1
Thread 1 sent 6
Thread 1 received 6 from 1
Thread 1 sent 7
Thread 1 received 7 from 1
Thread 1 sent 8
Thread 1 received 8 from 1
Thread 1 sent 9
Thread 1 received 9 from 1
Thread 1 sent 10
Thread 1 received 10 from 1
Thread 1 sent 11
Thread 1 received 11 from 1
Thread 1 sent 12
Thread 1 received 12 from 1
Thread 1 sent 13
Thread 1 received 13 from 1
Thread 1 sent 14
Thread 1 received 14 from 1
Thread 1 sent 15
Thread 1 received 15 from 1
Thread 1 sent 16
Thread 1 received 16 from 1
Thread 1 sent 17
Thread 1 received 17 from 1
Thread 1 sent 18
Thread 1 received 18 from 1
Thread 1 sent 19
Thread 1 received 19 from 1
Thread 1 sent 20
Thread 1 received 20 from 1
Thread 1 interrupted


### 改造成异步方法

In [7]:
using System.Threading.Channels;
using System.Threading.Tasks;

// Alternative (disabled): a bounded channel with explicit options.
// var option = new BoundedChannelOptions(10)
// {
//     FullMode = BoundedChannelFullMode.Wait, // when the channel is full: wait (block), drop the newest, drop the oldest, or drop the item being written
//     SingleReader = true,
//     SingleWriter = true
// };
var channel = Channel.CreateUnbounded<Message>();

var cts = new CancellationTokenSource();

// Both tasks start running as soon as they are created.
var sender = SendMessageAsync(channel.Writer,1);
var receiver = ReceiveMessageAsync(channel.Reader, 2, cts.Token);

try
{
    await sender;

    // Short grace period before the receiver is told to stop.
    await Task.Delay(100);
}
finally
{
    // Runs even when the sender faults: otherwise the receiver would stay
    // blocked in ReadAsync forever and the token source would never be disposed.
    cts.Cancel();
    await receiver;
    cts.Dispose();
}

// Asynchronous producer: writes 20 numbered messages to the given writer,
// logging each one and waiting 100 ms between writes.
// writer: channel endpoint to write to.
// id: sender id stored in every message and shown in the log lines.
async Task SendMessageAsync(ChannelWriter<Message> writer, int id)
{
    var i = 1;
    while (i <= 20)
    {
        var message = new Message(id, i.ToString());
        await writer.WriteAsync(message);
        Console.WriteLine($"Thread {id} sent {i}");
        await Task.Delay(100);
        i++;
    }
}

// Asynchronous consumer: reads messages from the given reader and logs them
// until the token is cancelled, then logs the cancellation and completes normally.
// reader: channel endpoint to read from.
// id: label used in the log lines.
// token: cancels the pending read and thereby ends the loop.
async Task ReceiveMessageAsync(ChannelReader<Message> reader,int id,CancellationToken token)
{
    try
    {
        // ReadAsync(token) is the single cancellation point. Checking
        // token.IsCancellationRequested in the loop condition as well would let
        // the loop end silently, skipping the log line below, whenever the
        // cancellation landed between two reads.
        while(true)
        {
            var message = await reader.ReadAsync(token);
            Console.WriteLine($"Thread {id} received {message.Content} from {message.FromId}");
        }
    }
    catch(OperationCanceledException)
    {
        Console.WriteLine($"Thread {id} task canceled");
    }
}


Thread 1 sent 1
Thread 2 received 1 from 1
Thread 1 sent 2
Thread 2 received 2 from 1
Thread 1 sent 3
Thread 2 received 3 from 1
Thread 1 sent 4
Thread 2 received 4 from 1
Thread 1 sent 5
Thread 2 received 5 from 1
Thread 1 sent 6
Thread 2 received 6 from 1
Thread 1 sent 7
Thread 2 received 7 from 1
Thread 1 sent 8
Thread 2 received 8 from 1
Thread 1 sent 9
Thread 2 received 9 from 1
Thread 1 sent 10
Thread 2 received 10 from 1
Thread 1 sent 11
Thread 2 received 11 from 1
Thread 1 sent 12
Thread 2 received 12 from 1
Thread 1 sent 13
Thread 2 received 13 from 1
Thread 1 sent 14
Thread 2 received 14 from 1
Thread 1 sent 15
Thread 2 received 15 from 1
Thread 1 sent 16
Thread 2 received 16 from 1
Thread 1 sent 17
Thread 2 received 17 from 1
Thread 1 sent 18
Thread 2 received 18 from 1
Thread 1 sent 19
Thread 2 received 19 from 1
Thread 1 sent 20
Thread 2 received 20 from 1
Thread 2 task canceled


In [11]:
using System.Threading.Channels;
using System.Threading.Tasks;

// Alternative (disabled): a bounded channel with explicit options.
// var option = new BoundedChannelOptions(10)
// {
//     FullMode = BoundedChannelFullMode.Wait, // when the channel is full: wait (block), drop the newest, drop the oldest, or drop the item being written
//     SingleReader = true,
//     SingleWriter = true
// };
var channel = Channel.CreateUnbounded<Message>();


// Two producers and two consumers share the same channel.
var sender1 = SendMessageAsync(channel.Writer,1);
var sender2 = SendMessageAsync(channel.Writer,3);
var receiver1 = ReceiveMessageAsync(channel.Reader, 2);
var receiver2 = ReceiveMessageAsync(channel.Reader, 4);

try
{
    await Task.WhenAll(sender1,sender2);
}
finally
{
    // Completing the writer is the end-of-stream signal for the readers.
    // It runs even when a sender faults; otherwise both receivers would wait forever.
    // No delay is needed first: items already written stay readable after
    // completion, and the readers finish only once the channel is drained.
    channel.Writer.Complete();
    await Task.WhenAll(receiver1,receiver2);
}
// Asynchronous producer: writes 20 numbered messages to the given writer,
// logging each one and waiting 100 ms between writes.
// writer: channel endpoint to write to.
// id: sender id stored in every message and shown in the log lines.
async Task SendMessageAsync(ChannelWriter<Message> writer, int id)
{
    var i = 1;
    do
    {
        var message = new Message(id, i.ToString());
        await writer.WriteAsync(message);
        Console.WriteLine($"Thread {id} sent {i}");
        await Task.Delay(100);
        i++;
    }
    while (i <= 20);
}

// Asynchronous consumer: logs every message it obtains from the reader and
// returns once the channel has been completed and drained.
// reader: channel endpoint to read from.
// id: label used in the log lines.
//
// An earlier variant looped on reader.Completion.IsCompleted, called ReadAsync()
// and caught ChannelClosedException to detect the end of the stream. Waiting for
// data and then draining it needs no exception handling.
async Task ReceiveMessageAsync(ChannelReader<Message> reader,int id)
{
    // WaitToReadAsync yields false once the writer is completed and nothing is left to read.
    while (await reader.WaitToReadAsync())
    {
        // Another reader may take an item first, so TryRead can come back empty.
        while (reader.TryRead(out var message))
        {
            Console.WriteLine($"Thread {id} received {message.Content} from {message.FromId}");
        }
    }
}


Thread 1 sent 1
Thread 3 sent 1
Thread 2 received 1 from 1
Thread 2 received 1 from 3
Thread 1 sent 2
Thread 3 sent 2
Thread 2 received 2 from 3
Thread 2 received 2 from 1
Thread 3 sent 3
Thread 1 sent 3
Thread 2 received 3 from 1
Thread 4 received 3 from 3
Thread 1 sent 4
Thread 3 sent 4
Thread 2 received 4 from 1
Thread 4 received 4 from 3
Thread 1 sent 5
Thread 4 received 5 from 3
Thread 3 sent 5
Thread 2 received 5 from 1
Thread 1 sent 6
Thread 3 sent 6
Thread 4 received 6 from 1
Thread 4 received 6 from 3
Thread 3 sent 7
Thread 1 sent 7
Thread 4 received 7 from 3
Thread 2 received 7 from 1
Thread 1 sent 8
Thread 4 received 8 from 3
Thread 3 sent 8
Thread 2 received 8 from 1
Thread 1 sent 9
Thread 3 sent 9
Thread 4 received 9 from 1
Thread 2 received 9 from 3
Thread 3 sent 10
Thread 1 sent 10
Thread 4 received 10 from 3
Thread 2 received 10 from 1
Thread 1 sent 11
Thread 3 sent 11
Thread 4 received 11 from 1
Thread 2 received 11 from 3
Thread 1 sent 12
Thread 3 sent 12
Thread 4 rec