RxSocket uses System.IO.Pipelines together with Rx (Reactive Extensions) to handle socket connections and network streams.
It draws inspiration from davidfowl's project.
The ISocketService interface of TcpService provides both client and server socket APIs.
It also provides observable streams that can be subscribed to for both the sender and the receiver.
It is provided under the MIT License.
Simple TCP server with System.IO.Pipelines
Simple asynchronous, event-based Receiver and Sender
Provides a retry policy
To install RxSocketWithIoPipelines from within Visual Studio, search for RxSocketWithIoPipeLines in the NuGet Package Manager UI, or run the following command in the Package Manager Console:
Install-Package RxSocketWithIoPipeLines -Version 1.0.0
// Server usage sample: resolve the socket service, attach console logging to
// each of its observables, then start the server.
var tcpServer = serviceProvider.GetRequiredService<ISocketService>();

// Every subscription below applies SubscribeOn(TaskPoolScheduler.Default) and
// prints the stream's items, its error, and its completion to the console.

// Accepted: prints the LocalEndPoint carried by each item.
tcpServer.Accepted
    .SubscribeOn(TaskPoolScheduler.Default)
    .Subscribe(
        r => Console.WriteLine($"Server started. Listening at {r.LocalEndPoint}"),
        ex => Console.WriteLine(ex),
        () => Console.WriteLine("Server Accepted completed"));

// Disconnected: prints each item as-is.
tcpServer.Disconnected
    .SubscribeOn(TaskPoolScheduler.Default)
    .Subscribe(
        r => Console.WriteLine($"{r} Server stooped."),
        ex => Console.WriteLine(ex),
        () => Console.WriteLine("Server Disconnected completed"));

// Error: prints the Method plus the message and stack trace of the Exception
// carried by each item.
tcpServer.Error
    .SubscribeOn(TaskPoolScheduler.Default)
    .Subscribe(
        r => Console.WriteLine($"Error happend:{r.Method}, {r.Exception.Message},{r.Exception.StackTrace}"),
        ex => Console.WriteLine(ex),
        () => Console.WriteLine("Server Error completed"));

// Reciever (spelling is the API's own): prints the Message and EndPoint of each item.
tcpServer.Reciever
    .SubscribeOn(TaskPoolScheduler.Default)
    .Subscribe(
        r => Console.WriteLine($"Receive:{r.Message} from [{r.EndPoint}]"),
        ex => Console.WriteLine(ex),
        () => Console.WriteLine("Socket receiver completed"));

// Sender: prints the Message of each item.
tcpServer.Sender
    .SubscribeOn(TaskPoolScheduler.Default)
    .Subscribe(
        r => Console.WriteLine($"Self Sending Message:{r.Message}"),
        ex => Console.WriteLine(ex),
        () => Console.WriteLine("Socket sender completed"));

// Start only after all handlers are attached. tcpServer has already been
// dereferenced by every subscription above, so a null-conditional call here
// could not guard anything; with a null receiver it would merely hand null to
// `await`, which throws as well.
await tcpServer.StartAsync();
// Client usage sample: resolve the client socket, connect in the background,
// then drive it from the console ("r" = reconnect, "exit" = quit, anything
// else is sent as a message; an empty line or end of input ends the loop).
var client = _serviceProvider.GetRequiredService<IClientSocket>();

// First connection attempt runs in the background so the console loop below
// can start immediately. A failure is printed and flagged in _needReConnecting.
Task.Run(async () => await client.ConnectAsync(_serverConfig.IpAddress, _serverConfig.Port)
        .ConfigureAwait(false))
    .ContinueWith(tresult =>
    {
        if (tresult.IsFaulted)
        {
            Console.WriteLine($"connect {_serverConfig.IpAddress}:{_serverConfig.Port} faulted");
            // Print the failure reason; reading Exception also marks it as observed.
            Console.WriteLine(tresult.Exception?.GetBaseException().Message);
            _needReConnecting = true;
        }
    });

string line;

// Console.ReadLine() returns null once standard input is exhausted; treat that
// like an empty line so the loop ends instead of spinning on null forever.
while (!string.IsNullOrEmpty(line = Console.ReadLine()))
{
    if (line == "r")
    {
        Console.WriteLine("Reconnecting...");
        client.Disconnect();

        // Reconnect in the background and report the outcome when it is known.
        Task.Run(async () => await client.ConnectAsync(_serverConfig.IpAddress, _serverConfig.Port))
            .ContinueWith(tresult =>
            {
                if (tresult.IsFaulted)
                {
                    Console.WriteLine("Connected Faulted.");
                    // Print the failure reason; reading Exception also marks it as observed.
                    Console.WriteLine(tresult.Exception?.GetBaseException().Message);
                }
                else
                {
                    Console.WriteLine($"IsConnected = {client.IsConnected}");
                }
            });
    }
    else if (line == "exit")
    {
        client.Disconnect();
        break;
    }
    else
    {
        Console.WriteLine($"{line} Sending..");

        // Deliberately not awaited so the prompt stays responsive. The explicit
        // discard replaces a ConfigureAwait(false) call that had no effect
        // without an await.
        // NOTE(review): the 3 is presumably the retry count and failures are
        // presumably reported through ErrorMessageCallback - confirm against
        // IClientSocket.SendAsync.
        _ = client.SendAsync(line, 3, ErrorMessageCallback);
    }
}