Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

support tcp_nodelay

support latching
  • Loading branch information...
commit f3ab50a10c8f9d9d9bc9470c84878ab46657f002 1 parent 4d4d85d
ZOETROPE authored
14 RosSharp.Tests/Topic/PublisherTest.cs
View
@@ -24,7 +24,7 @@ public void Initialize()
[HostType("Moles")]
public void AddTopic_Success()
{
- MRosTopicClient<std_msgs.String>.AllInstances.StartAsyncSocket = (t1, t2) => Task.Factory.StartNew(() => { });
+ MRosTopicClient<std_msgs.String>.AllInstances.StartAsyncSocketBoolean = (t1, t2, t3) => Task.Factory.StartNew(() => { });
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var pub = new Publisher<std_msgs.String>("test", "testnode");
@@ -36,8 +36,8 @@ public void AddTopic_Success()
[HostType("Moles")]
public void AddTopic_Error()
{
- MRosTopicClient<std_msgs.String>.AllInstances.StartAsyncSocket =
- (t1, t2) => Task.Factory.StartNew(() => {throw new InvalidOperationException("Start Error");});
+ MRosTopicClient<std_msgs.String>.AllInstances.StartAsyncSocketBoolean =
+ (t1, t2, t3) => Task.Factory.StartNew(() => { throw new InvalidOperationException("Start Error"); });
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var pub = new Publisher<std_msgs.String>("test", "testnode");
@@ -51,8 +51,8 @@ public void AddTopic_Error()
[HostType("Moles")]
public void AddTopic_MultipleTopicSuccess()
{
- MRosTopicClient<std_msgs.String>.AllInstances.StartAsyncSocket =
- (t1, t2) => Task.Factory.StartNew(() => { });
+ MRosTopicClient<std_msgs.String>.AllInstances.StartAsyncSocketBoolean =
+ (t1, t2, t3) => Task.Factory.StartNew(() => { });
MRosTopicClient<std_msgs.String>.AllInstances.SendTaskAsyncTMessage =
(t1, t2) => Task.Factory.StartNew(() => t2.SerializeLength);
@@ -72,8 +72,8 @@ public void AddTopic_MultipleTopicSuccess()
[HostType("Moles")]
public void OnNext_Success()
{
- MRosTopicClient<std_msgs.String>.AllInstances.StartAsyncSocket =
- (t1, t2) => Task.Factory.StartNew(() => { });
+ MRosTopicClient<std_msgs.String>.AllInstances.StartAsyncSocketBoolean =
+ (t1, t2, t3) => Task.Factory.StartNew(() => { });
MRosTopicClient<std_msgs.String>.AllInstances.SendTaskAsyncTMessage =
(t1, t2) => Task.Factory.StartNew(() => t2.SerializeLength);
4 RosSharp.Tests/Topic/SubscriberTest.cs
View
@@ -29,8 +29,8 @@ public void Subscribe_Success()
MSlaveClient.AllInstances.RequestTopicAsyncStringStringListOfProtocolInfo =
(t1, t2, t3, t4) => Task.Factory.StartNew(() => new TopicParam() {HostName = "localhost", PortNumber = 12345, ProtocolName = "TCPROS"});
MRosTopicServer<std_msgs.String>.ConstructorStringStringUri = (t1, t2, t3, t4) => { };
- MRosTopicServer<std_msgs.String>.AllInstances.StartAsyncTopicParam =
- (t1, t2) => Task.Factory.StartNew(() => (IObservable<std_msgs.String>)publisher);
+ MRosTopicServer<std_msgs.String>.AllInstances.StartAsyncTopicParamBoolean =
+ (t1, t2, t3) => Task.Factory.StartNew(() => (IObservable<std_msgs.String>)publisher);
var sub = new Subscriber<std_msgs.String>("testtopic", "test");
13 RosSharp.Tests/Transport/TcpRosHeaderTest.cs
View
@@ -1,4 +1,5 @@
-using System.IO;
+using System.Collections.Generic;
+using System.IO;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using RosSharp.Topic;
using RosSharp.Transport;
@@ -136,6 +137,16 @@ public void Deserialize_NotContainsEqual()
var ex = AssertEx.Throws<RosTopicException>(() => TcpRosHeaderSerializer.Deserialize(ms));
ex.Message.Is("Header does not contain '='");
}
+
+ [TestMethod]
+ public void TcpRosHeader_HasMember()
+ {
+ dynamic x = new TcpRosHeader(new Dictionary<string, string>() { { "tcp_nodelay", "true" }, { "md5sum", "0000" } });
+
+ Assert.AreEqual(x.HasMember("tcp_nodelay"), true);
+ Assert.AreEqual(x.HasMember("md5sum"), true);
+ Assert.AreEqual(x.HasMember("topic"), false);
+ }
}
}
2  RosSharp/Master/MasterServer.cs
View
@@ -61,7 +61,7 @@ public sealed class MasterServer : MarshalByRefObject, IMaster, IParameterServer
public MasterServer(int portNumber)
{
_channel = new HttpServerChannel("master", portNumber, new XmlRpcServerFormatterSinkProvider());
-
+
var tmp = new Uri(_channel.GetChannelUri());
MasterUri = new Uri("http://" + RosManager.HostName + ":" + tmp.Port);
6 RosSharp/Node/INode.cs
View
@@ -49,8 +49,9 @@ public interface INode : IDisposable
/// </summary>
/// <typeparam name="TMessage">Topic Message Type</typeparam>
/// <param name="topicName">Topic Name</param>
+ /// <param name="nodelay">false: Socket uses the Nagle algorithm</param>
/// <returns>Subscriber</returns>
- Task<Subscriber<TMessage>> CreateSubscriberAsync<TMessage>(string topicName)
+ Task<Subscriber<TMessage>> CreateSubscriberAsync<TMessage>(string topicName, bool nodelay = true)
where TMessage : IMessage, new();
/// <summary>
@@ -58,8 +59,9 @@ Task<Subscriber<TMessage>> CreateSubscriberAsync<TMessage>(string topicName)
/// </summary>
/// <typeparam name="TMessage">Topic Message Type</typeparam>
/// <param name="topicName">Topic Name</param>
+ /// <param name="latching">true: send the latest published message when subscribed topic</param>
/// <returns>Publisher</returns>
- Task<Publisher<TMessage>> CreatePublisherAsync<TMessage>(string topicName)
+ Task<Publisher<TMessage>> CreatePublisherAsync<TMessage>(string topicName, bool latching = false)
where TMessage : IMessage, new();
/// <summary>
8 RosSharp/Node/RosNode.cs
View
@@ -142,7 +142,7 @@ public void Dispose()
_disposed = true;
}
- public Task<Subscriber<TMessage>> CreateSubscriberAsync<TMessage>(string topicName)
+ public Task<Subscriber<TMessage>> CreateSubscriberAsync<TMessage>(string topicName, bool nodelay = true)
where TMessage : IMessage, new()
{
if (_disposed) throw new ObjectDisposedException("RosNode");
@@ -155,7 +155,7 @@ public Task<Subscriber<TMessage>> CreateSubscriberAsync<TMessage>(string topicNa
_logger.InfoFormat("Create Subscriber: {0}", topicName);
- var subscriber = new Subscriber<TMessage>(topicName, NodeId);
+ var subscriber = new Subscriber<TMessage>(topicName, NodeId, nodelay);
_topicContainer.AddSubscriber(subscriber);
subscriber.Disposing += DisposeSubscriber;
@@ -186,7 +186,7 @@ public Task<Subscriber<TMessage>> CreateSubscriberAsync<TMessage>(string topicNa
return tcs.Task;
}
- public Task<Publisher<TMessage>> CreatePublisherAsync<TMessage>(string topicName)
+ public Task<Publisher<TMessage>> CreatePublisherAsync<TMessage>(string topicName, bool latching = false)
where TMessage : IMessage, new()
{
if (_disposed) throw new ObjectDisposedException("RosNode");
@@ -199,7 +199,7 @@ public Task<Publisher<TMessage>> CreatePublisherAsync<TMessage>(string topicName
_logger.InfoFormat("Create Publisher: {0}", topicName);
- var publisher = new Publisher<TMessage>(topicName, NodeId);
+ var publisher = new Publisher<TMessage>(topicName, NodeId, latching);
_topicContainer.AddPublisher(publisher);
publisher.Disposing += DisposePublisher;
10 RosSharp/Slave/SlaveServer.cs
View
@@ -63,18 +63,16 @@ internal SlaveServer(string nodeId, int portNumber, TopicContainer topicContaine
_topicContainer = topicContainer;
_tcpRosListener = listener;
- //string slaveName = nodeId + "_slave";
- string slaveName = "slave";
+ string slaveName = nodeId + "_slave";
_channel = new HttpServerChannel(slaveName, portNumber, new XmlRpcServerFormatterSinkProvider());
var tmp = new Uri(_channel.GetChannelUri());
- SlaveUri = new Uri("http://" + RosManager.HostName + ":" + tmp.Port + "/");
- //SlaveUri = new Uri("http://" + RosManager.HostName + ":" + tmp.Port + "/" + slaveName);
+ SlaveUri = new Uri("http://" + RosManager.HostName + ":" + tmp.Port + "/" + slaveName);
ChannelServices.RegisterChannel(_channel, false);
- RemotingServices.Marshal(this, "/");
-
+ RemotingServices.Marshal(this, slaveName);
+
Console.WriteLine("SlaveUri={0}", SlaveUri);
}
17 RosSharp/Topic/Publisher.cs
View
@@ -52,15 +52,16 @@ public sealed class Publisher<TMessage> : IPublisher, IObserver<TMessage>, IDisp
private readonly ILog _logger = LogManager.GetCurrentClassLogger();
private readonly ReplaySubject<Unit> _onConnectedSubject = new ReplaySubject<Unit>();
private readonly List<RosTopicClient<TMessage>> _rosTopicClients = new List<RosTopicClient<TMessage>>();
+ private readonly bool _latching;
+ private TMessage _lastPublishedMessage;
- internal Publisher(string topicName, string nodeId)
+ internal Publisher(string topicName, string nodeId, bool latching = false)
{
var dummy = new TMessage();
-
TopicName = topicName;
MessageType = dummy.MessageType;
-
NodeId = nodeId;
+ _latching = latching;
}
public string NodeId { get; private set; }
@@ -108,7 +109,7 @@ public void OnNext(TMessage value)
_logger.Error("SendError");
}
}
-
+ _lastPublishedMessage = value;
}
}
@@ -142,7 +143,7 @@ internal Task AddTopic(Socket socket)
_logger.Debug(m => m("AddTopic: {0}", socket.RemoteEndPoint.ToString()));
var rosTopicClient = new RosTopicClient<TMessage>(NodeId, TopicName);
- return rosTopicClient.StartAsync(socket)
+ return rosTopicClient.StartAsync(socket, _latching)
.ContinueWith(task =>
{
if (task.IsFaulted)
@@ -156,6 +157,12 @@ internal Task AddTopic(Socket socket)
{
_rosTopicClients.Add(rosTopicClient);
}
+
+ if(_latching && _lastPublishedMessage != null)
+ {
+ OnNext(_lastPublishedMessage);
+ }
+
_logger.Debug("OnConnected");
_onConnectedSubject.OnNext(Unit.Default);
}
14 RosSharp/Topic/RosTopicClient.cs
View
@@ -88,17 +88,18 @@ public Task<int> SendTaskAsync(TMessage data)
}
- public Task StartAsync(Socket socket)
+ public Task StartAsync(Socket socket, bool latching = false)
{
_client = new TcpRosClient(socket);
return _client.ReceiveAsync()
.Take(1)
.Timeout(TimeSpan.FromMilliseconds(RosManager.TopicTimeout))
- .Select(OnReceivedHeader).ToTask();
+ .Select(x => OnReceivedHeader(x, latching))
+ .ToTask();
}
- private Unit OnReceivedHeader(byte[] data)
+ private Unit OnReceivedHeader(byte[] data, bool latching)
{
_logger.Debug("OnReceivedHeader");
@@ -121,10 +122,15 @@ private Unit OnReceivedHeader(byte[] data)
throw new RosTopicException("MD5Sum mismatch error");
}
+ if (reqHeader.HasMember("tcp_nodelay"))
+ {
+ _client.SetNodelayOption(reqHeader.tcp_nodelay == "1");
+ }
+
var resHeader = new
{
callerid = NodeId,
- latching = "0",
+ latching = latching ? "1":"0",
md5sum = dummy.Md5Sum,
message_definition = dummy.MessageDefinition,
topic = TopicName,
23 RosSharp/Topic/RosTopicServer.cs
View
@@ -68,7 +68,7 @@ public void Dispose()
#endregion
- public Task<IObservable<TMessage>> StartAsync(TopicParam param)
+ public Task<IObservable<TMessage>> StartAsync(TopicParam param, bool nodelay = true)
{
_client = new TcpRosClient();
@@ -83,9 +83,9 @@ public Task<IObservable<TMessage>> StartAsync(TopicParam param)
{
try
{
- OnConnected().ContinueWith(t2 =>
+ ConnectToPublisherAsync(nodelay).ContinueWith(t2 =>
{
- _logger.Debug("StartAsync OnConnected");
+ _logger.Debug("StartAsync ConnectToPublisherAsync");
if (t2.IsFaulted) tcs.SetException(t2.Exception.InnerException);
else if (t2.IsCanceled) tcs.SetCanceled();
else tcs.SetResult(t2.Result);
@@ -103,7 +103,7 @@ public Task<IObservable<TMessage>> StartAsync(TopicParam param)
}
- private Task<IObservable<TMessage>> OnConnected()
+ private Task<IObservable<TMessage>> ConnectToPublisherAsync(bool nodelay)
{
var last = _client.ReceiveAsync()
.Take(1)
@@ -118,8 +118,8 @@ private Task<IObservable<TMessage>> OnConnected()
callerid = NodeId,
topic = TopicName,
md5sum = dummy.Md5Sum,
- type = dummy.MessageType
- //,latching = "1"
+ type = dummy.MessageType,
+ tcp_nodelay = nodelay ? "1" : "0"
};
var stream = new MemoryStream();
@@ -129,7 +129,7 @@ private Task<IObservable<TMessage>> OnConnected()
_client.SendTaskAsync(stream.ToArray())
.ContinueWith(task =>
{
- _logger.Debug("OnConnected Sent");
+ _logger.Debug("ConnectToPublisherAsync Sent");
if (task.IsFaulted) tcs.SetException(task.Exception.InnerException);
else if (task.IsCanceled) tcs.SetCanceled();
else
@@ -137,10 +137,7 @@ private Task<IObservable<TMessage>> OnConnected()
try
{
var recvHeader = last.Timeout(TimeSpan.FromMilliseconds(RosManager.TopicTimeout)).First();
- tcs.SetResult(OnReceivedHeader(recvHeader));
-
- //rosoutにつながらない。ヘッダのReceiveがタイムアウトする。
- //tcs.SetResult(_client.ReceiveAsync().Select(Deserialize));
+ tcs.SetResult(CreateSubscriber(recvHeader));
}
catch (RosTopicException ex)
{
@@ -158,9 +155,9 @@ private Task<IObservable<TMessage>> OnConnected()
return tcs.Task;
}
- private IObservable<TMessage> OnReceivedHeader(dynamic header)
+ private IObservable<TMessage> CreateSubscriber(dynamic header)
{
- _logger.Debug("OnReceivedHeader");
+ _logger.Debug("CreateSubscriber");
var dummy = new TMessage();
/* roscppでは、topicがない。
7 RosSharp/Topic/Subscriber.cs
View
@@ -56,14 +56,15 @@ public sealed class Subscriber<TMessage> : ISubscriber, IObservable<TMessage>, I
private readonly CompositeDisposable _disposables = new CompositeDisposable();
private readonly ReplaySubject<Unit> _onConnectedSubject = new ReplaySubject<Unit>();
private readonly List<RosTopicServer<TMessage>> _rosTopicServers = new List<RosTopicServer<TMessage>>();
+ private bool _nodelay;
- internal Subscriber(string topicName, string nodeId)
+ internal Subscriber(string topicName, string nodeId, bool nodelay=true)
{
TopicName = topicName;
var dummy = new TMessage();
MessageType = dummy.MessageType;
-
NodeId = nodeId;
+ _nodelay = nodelay;
}
public string NodeId { get; private set; }
@@ -132,7 +133,7 @@ private void ConnectServer(TopicParam param, Uri slaveUri)
_rosTopicServers.Add(server); //TODO: DisconnectServerはない?ロックは不要?
//TODO: StartAsyncが失敗したとき
- server.StartAsync(param).ContinueWith(
+ server.StartAsync(param, _nodelay).ContinueWith(
task =>
{
_logger.Debug("ConnectServer Started");
5 RosSharp/Transport/TcpRosClient.cs
View
@@ -177,6 +177,11 @@ private bool CompleteMessage(int offset, out byte[] current, ref byte[] rest)
return true;
}
+
+ public void SetNodelayOption(bool opt)
+ {
+ _socket.NoDelay = opt;
+ }
}
}
5 RosSharp/Transport/TcpRosHeader.cs
View
@@ -65,6 +65,11 @@ public override bool TryGetMember(GetMemberBinder binder, out object result)
result = _members[binder.Name];
return true;
}
+
+ public bool HasMember(string name)
+ {
+ return _members.ContainsKey(name);
+ }
}
internal static class TcpRosHeaderSerializer
Please sign in to comment.
Something went wrong with that request. Please try again.