Skip to content

Commit

Permalink
Minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Dec 6, 2023
1 parent f813a85 commit a8e41a2
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 42 deletions.
45 changes: 29 additions & 16 deletions src/RabbitMQ.Next/Sockets/EndpointResolver.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Exceptions;
using RabbitMQ.Next.Transport;

namespace RabbitMQ.Next.Sockets;

Expand Down Expand Up @@ -44,32 +45,44 @@ IPAddress FindAddress(IReadOnlyList<IPAddress> address, AddressFamily family)
var ipV6Address = FindAddress(addresses, AddressFamily.InterNetworkV6);
if (ipV6Address != null)
{
var socket = await ConnectAsync(ipV6Address, endpoint.Port, cancellation);
if (socket != null)
{
return new SocketWrapper(socket, endpoint);
}
return await ConnectAsync(ipV6Address, endpoint, cancellation);
}

// 2. Try IP v4
var ipV4Address = FindAddress(addresses, AddressFamily.InterNetwork);
if (ipV4Address != null)
{
var socket = await ConnectAsync(ipV4Address, endpoint.Port, cancellation);
if (socket != null)
{
return new SocketWrapper(socket, endpoint);
}
return await ConnectAsync(ipV4Address, endpoint, cancellation);
}

throw new NotSupportedException("Cannot connect to the endpoint: no supported protocols is available");
}

private static async Task<Socket> ConnectAsync(IPAddress address, int port, CancellationToken cancellation)
private static async Task<ISocket> ConnectAsync(IPAddress address, Endpoint endpoint, CancellationToken cancellation)
{
var ipEndPoint = new IPEndPoint(address, endpoint.Port);
var socket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(ipEndPoint, cancellation);
var stream = ConfigureStream(socket, endpoint);
return new SocketWrapper(socket, stream);
}

private static Stream ConfigureStream(Socket socket, Endpoint endpoint)
{
var endpoint = new IPEndPoint(address, port);
var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(endpoint, cancellation);
return socket;
Stream stream = new NetworkStream(socket)
{
ReadTimeout = 60000,
WriteTimeout = 60000,
};

if (endpoint.UseSsl)
{
var sslStream = new SslStream(stream, false);
sslStream.AuthenticateAsClient(endpoint.Host);

stream = sslStream;
}

return stream;
}
}
33 changes: 7 additions & 26 deletions src/RabbitMQ.Next/Sockets/SocketWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.IO;
using System.Net.Security;
using System.Net.Sockets;
using RabbitMQ.Next.Buffers;

Expand All @@ -8,49 +7,32 @@ namespace RabbitMQ.Next.Sockets;
internal class SocketWrapper : ISocket
{
private readonly Socket socket;
private readonly Stream readStream;
private readonly Stream writeStream;
private readonly Stream stream;

public SocketWrapper(Socket socket, Endpoint endpoint)
public SocketWrapper(Socket socket, Stream stream)
{
this.socket = socket;

Stream stream = new NetworkStream(socket)
{
ReadTimeout = 60000,
WriteTimeout = 60000,
};

if (endpoint.UseSsl)
{
var sslStream = new SslStream(stream, false);
sslStream.AuthenticateAsClient(endpoint.Host);

stream = sslStream;
}

this.readStream = stream;
this.writeStream = stream;
this.stream = stream;
}

public void Send(IMemoryAccessor payload)
{
var current = payload;
while (current != null)
{
current.WriteTo(this.writeStream);
current.WriteTo(this.stream);
current = current.Next;
}

this.writeStream.Flush();
this.stream.Flush();
}

public int Receive(byte[] buffer, int offset, int minBytes)
{
var received = 0;
while (received < minBytes)
{
var readBytes = this.readStream.Read(buffer, offset, buffer.Length - offset);
var readBytes = this.stream.Read(buffer, offset, buffer.Length - offset);
if (readBytes == 0 && this.IsConnectionClosedByServer())
{
throw new SocketException();
Expand All @@ -68,8 +50,7 @@ private bool IsConnectionClosedByServer()

public void Dispose()
{
this.readStream?.Dispose();
this.writeStream?.Dispose();
this.stream.Dispose();
this.socket?.Dispose();
}
}

0 comments on commit a8e41a2

Please sign in to comment.