Skip to content

Producer Close Exception when the producer is closed #97

@Gsantomaggio

Description

@Gsantomaggio

When the Producer TCP connection is closed by the management UI or network problems the fucntionproducer.close() raises:

System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.NetworkStream'.
   at System.Net.Sockets.NetworkStream.<ThrowIfDisposed>g__ThrowObjectDisposedException|63_0()
   at System.Net.Sockets.NetworkStream.WriteAsync(ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)

It is enough to kill the connection using the management UI:

using System.Net;
using RabbitMQ.Stream.Client;
using System.Text.Json;


namespace example;

public class Reliable
{
    public class Conn
    {
        public string name { get; set; }
    }

    private async Task<int> HttpKillConnections()
    {
        using var handler = new HttpClientHandler
        {
            Credentials = new NetworkCredential("guest", "guest"),
        };
        using var client = new HttpClient(handler);
        var result = await client.GetAsync("http://localhost:15672/api/connections");
        var json = await result.Content.ReadAsStringAsync();
        var connections = JsonSerializer.Deserialize<IEnumerable<Conn>>(json);
        if (connections == null) return 0;
        foreach (var conn in connections)
        {
            await client.DeleteAsync($"http://localhost:15672/api/connections/{conn.name}");
        }

        return connections.Count();
    }

    public async Task RaiseConnectionError()
    {
        var config = new StreamSystemConfig { };
        const string stream = "mystream-r";
        var system = await StreamSystem.Create(config);
        await system.CreateStream(new StreamSpec(stream));

       
        var consumer = await system.CreateConsumer(
            new ConsumerConfig
            {
                Reference = "my_consumer1",
                Stream = stream,
                OffsetSpec = new OffsetTypeFirst(),
                MetadataHandler = update => { Console.WriteLine($"ConsumerConfig:{update.Stream}"); },
                ConnectionClosedHandler = async s =>
                {
                    Console.WriteLine($"consumer Closed");
                    await Task.CompletedTask;
                },
                MessageHandler = async (consumer, ctx, message) => { await Task.CompletedTask; }
            });

        var p = new ProducerConfig
        {
            Reference = "producer1",
            Stream = stream,
            MetadataHandler = update =>
            {
                Console.WriteLine($"Meta data update {update}");

            },
            ConnectionClosedHandler = s =>
            {
                Console.WriteLine($"From External Close {s}");
                return Task.CompletedTask;
            },
        };

        var producer = await system.CreateProducer(p);
        Thread.Sleep(5000);
        var r = HttpKillConnections();

        Console.WriteLine($"Connection count: {r.Result}");
      
        Thread.Sleep(1000);
        try
        {
           await producer.Close();
            await consumer.Close();
        }
        catch (Exception e)
        {
            // Exception here
            Console.WriteLine(e);
            throw;
        }
       await system.DeleteStream(stream);
        Console.WriteLine("Close");
    }
}

just execute RaiseConnectionError() method.

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions