Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed Issue ros/roslisp#12 #16

Merged
merged 8 commits into from Jun 25, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/slave.lisp
Expand Up @@ -194,9 +194,20 @@ Right now, the transport must be TCPROS and the return value is the socket."
status. Change *xmlrpc-timeout* to increase wait-time." :format-arguments (list uri topic) ))
(handler-case
(return
(dbind (protocol address port) (with-function-timeout *xmlrpc-timeout* (lambda () (ros-rpc-call uri "requestTopic" topic (list (list "TCPROS")))))
(dbind (protocol address port)
;; Check if it's our publisher if that's the case don't request the topic
;; using a ros-rpc-call since it would deadlock and time out
(if (equal uri *xml-rpc-caller-api*)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a minor comment on Lisp style: I would only use IF if there is just one small expression in the if and else block. For multiple lines, I would always use COND. But that's really just a minor thing.

(dbind (code msg vals) (|requestTopic| *ros-node-name* topic (list (list "TCPROS")))
(when (<= code 0)
(cerror "Ignore and continue" 'ros-rpc-error
:call (cons "requestTopic" (list *ros-node-name* topic (list (list "TCPROS"))))
:uri uri :code code :message msg :vals vals))
vals)
(with-function-timeout *xmlrpc-timeout*
(lambda () (ros-rpc-call uri "requestTopic" topic (list (list "TCPROS"))))))
(if (string= protocol "TCPROS")
(setup-tcpros-subscription address port topic)
(ros-error (roslisp tcp) "Protocol ~a did not equal TCPROS... skipping connection" protocol))))
(function-timeout () ;;just retry
nil))))
nil))))
2 changes: 1 addition & 1 deletion src/sockets.lisp
Expand Up @@ -51,7 +51,7 @@
(defun tcp-connect (hostname port)
"Helper that connects over TCP to this host and port, and returns 1) The stream 2) The socket"
(let ((connection (make-instance 'inet-socket :type :stream :protocol :tcp))
(ip-address (get-ip-address hostname)))
(ip-address (get-ip-address hostname)))
(ros-debug (roslisp tcp) "~&Connecting to ~a ~a" ip-address port)
(socket-connect connection ip-address port)
(values (socket-make-stream connection :output t :input t :element-type '(unsigned-byte 8)) connection)))
Expand Down
197 changes: 115 additions & 82 deletions src/tcpros.lisp
Expand Up @@ -94,7 +94,7 @@
(close stream :abort abort)
(socket-close connection)))
(ros-debug (roslisp tcp) "Accepted TCP connection ~a" connection)

(mvbind (header parse-error) (ignore-errors (parse-tcpros-header stream))
;; Any errors guaranteed to be handled in the first cond clause

Expand Down Expand Up @@ -130,20 +130,22 @@


(defun handle-topic-connection (header connection stream)
"Handle topic connection by checking md5 sum, sending back a response header, then adding this socket to the publication list for this topic."
"Handle topic connection by checking md5 sum, sending back a response header, then adding this socket to the publication list for this topic. If the connection comes from this caller no response needs to be send."
(bind-from-header ((topic "topic") (md5 "md5sum") (uri "callerid")) header
(let ((pub (gethash topic *publications*)))
(tcpros-header-assert pub "unknown-topic")
(let ((my-md5 (md5sum topic)))
(let ((my-md5 (md5sum topic))
(caller-id (caller-id)))
(tcpros-header-assert (or (equal md5 "*") (equal md5 my-md5)) "md5sums do not match for ~a: ~a vs ~a" topic md5 my-md5)

;; Now we must send back the response
(send-tcpros-header stream
"type" (ros-datatype topic)
"callerid" (caller-id)
"message_definition" (message-definition topic)
"latching" (if (is-latching pub) "1" "0")
"md5sum" my-md5))
;; Send a response if you didn't subscribed to yourself
(unless (equal uri caller-id)
(send-tcpros-header stream
"type" (ros-datatype topic)
"callerid" caller-id
"message_definition" (message-definition topic)
"latching" (if (is-latching pub) "1" "0")
"md5sum" my-md5)))

;; Add this subscription to the list for the topic
(let ((sub (make-subscriber-connection :subscriber-socket connection :subscriber-stream stream
Expand All @@ -156,89 +158,121 @@
(tcpros-write (last-message pub) stream))))))



(defparameter *setup-tcpros-subscription-max-retry* 3)

(defun setup-tcpros-subscription (hostname port topic)
"Connect to the publisher at the given address and do the header exchange, then start a thread that will deserialize messages onto the queue for this topic."
(check-type hostname string)
(dotimes (retry-count *setup-tcpros-subscription-max-retry* (error 'simple-error :format-control "Timeout when
trying to communicate with publisher ~a:~a for topic ~a, check publisher node
status. Change *tcp-timeout* to increase wait-time."
:format-arguments (list hostname
port topic)))
(when (> retry-count 0) (ros-warn (roslisp tcpros) "Failed to communicate
with publisher ~a:~a for topic ~a, retrying: ~a" hostname port
topic retry-count))
(handler-case
(return
(mvbind (str connection) (tcp-connect hostname port)
(mvbind (stream connection) (tcp-connect hostname port)
(ros-debug (roslisp tcp) "~&Successfully connected to ~a:~a for topic ~a" hostname port topic)

;; Check if we try to subscribe to our own publisher
(if (and (equal hostname *tcp-server-hostname*)
(equal port *tcp-server-port*))
(setup-tcpros-subscription-to-self hostname port topic connection stream)
(setup-tcpros-subscription-to-strangers hostname port topic connection stream))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) Great function name...


(values stream connection)))


(defun setup-tcpros-subscription-to-self (hostname port topic connection stream)
"Helper function for setting up a tcpros-subscription with a publisher that
uses the same tcp-server."
(mvbind (sub known) (gethash topic *subscriptions*)
(assert known nil "Topic ~a unknown. This error should have been caught earlier!" topic)

(send-tcpros-header stream "topic" topic
"md5sum" (md5sum topic)
"type" (ros-datatype topic)
"callerid" (caller-id))

;; Spawn a dedicated thread to deserialize messages off the socket onto the queue
(spawn-connection-thread hostname port topic stream connection (buffer sub))))


(defun setup-tcpros-subscription-to-strangers (hostname port topic connection stream)
"Helper function for setting up a tcpros-subscriptions with a publisher that doesn't
uses this tcp-server."
(dotimes (retry-count *setup-tcpros-subscription-max-retry*
(error 'simple-error
:format-control "Timeout when trying to communicate with publisher ~a:~a for topic ~a, check publisher node status. Change *tcp-timeout* to increase wait-time."
:format-arguments (list hostname port topic)))
(when (> retry-count 0)
(ros-warn (roslisp tcpros)
"Failed to communicate with publisher ~a:~a for topic ~a, retrying: ~a"
hostname port topic retry-count))
(handler-case

(mvbind (sub known) (gethash topic *subscriptions*)
(assert known nil "Topic ~a unknown. This error should have been caught earlier!" topic)
(let ((buffer (buffer sub))
(topic-class-name (get-topic-class-name topic)))

;; Send header and receive response
(send-tcpros-header str
"topic" topic
"md5sum" (md5sum topic)
"type" (ros-datatype topic)
"callerid" (caller-id))
(let ((response (with-function-timeout *tcp-timeout* (lambda () (parse-tcpros-header str)))))

(when (assoc "error" response :test #'equal)
(roslisp-error "During TCPROS handshake, publisher sent error message ~a" (cdr (assoc "error" response :test #'equal))))

;; TODO need to do something with the response, handle AnyMsg (see tcpros.py)

;; Spawn a dedicated thread to deserialize messages off the socket onto the queue
(let ((connection-thread
(sb-thread:make-thread
#'(lambda ()
(block thread-block
(unwind-protect
(handler-bind
((error #'(lambda (c)
(unless *break-on-socket-errors*
(ros-debug (roslisp tcp)
"Received error ~a when reading connection to ~a:~a on topic ~a. Connection terminated."
c hostname port topic)
(return-from thread-block nil)))))


(loop
(unless (eq *node-status* :running)
(error "Node status is ~a" *node-status*))

;; Read length (ignored)
(dotimes (i 4)
(read-byte str))


(let ((msg (deserialize topic-class-name str)))

(let ((num-dropped (enqueue msg buffer)))
(ros-debug (roslisp tcp) (> num-dropped 0) "Dropped ~a messages on topic ~a" num-dropped topic)))))

;; Always close the connection before leaving the thread
(socket-close connection))))
:name (format nil "Roslisp thread for subscription to topic ~a published from ~a:~a"
topic hostname port)
)))
(assert (eq (mutex-owner *ros-lock*) *current-thread*)
nil "Code assumption violated; not holding lock in setup-tcpros-subscription")
(ros-debug (roslisp deserialization-thread) "Adding deserialization thread for connection on topic ~a to ~a:~a" topic hostname port)
(push connection-thread *deserialization-threads*)))))

;; Send header and receive response
(send-tcpros-header stream
"topic" topic
"md5sum" (md5sum topic)
"type" (ros-datatype topic)
"callerid" (caller-id))

(let ((response (with-function-timeout *tcp-timeout*
(lambda () (parse-tcpros-header stream)))))
(when (assoc "error" response :test #'equal)
(roslisp-error "During TCPROS handshake, publisher sent error message ~a"
(cdr (assoc "error" response :test #'equal))))

;; TODO need to do something with the response, handle AnyMsg (see tcpros.py)

;; Spawn a dedicated thread to deserialize messages off the socket onto the queue
(spawn-connection-thread hostname port topic stream connection (buffer sub)))

;; If nothing failed return from dotimes
(return))

(malformed-tcpros-header (c)
(send-tcpros-header str "error" (msg c))
(send-tcpros-header stream "error" (msg c))
(socket-close connection)
(error c)))))
(function-timeout () ;;just retry
nil))) )
(error c))
(function-timeout () ;;just retry
nil))))


(defun spawn-connection-thread (hostname port topic stream connection buffer)
"Spawns a dedicated thread to deserialize messages off the socket onto the queue and
adds it to the deserialization-threads."
(let ((connection-thread
(sb-thread:make-thread
#'(lambda ()
(block thread-block
(unwind-protect
(handler-bind
((error #'(lambda (c)
(unless *break-on-socket-errors*
(ros-debug (roslisp tcp)
"Received error ~a when reading connection to ~a:~a on topic ~a. Connection terminated."
c hostname port topic)
(return-from thread-block nil)))))

(loop
(unless (eq *node-status* :running)
(error "Node status is ~a" *node-status*))

;; Read length (ignored)
(dotimes (i 4)
(read-byte stream))

(let ((msg (deserialize (get-topic-class-name topic) stream)))
(let ((num-dropped (enqueue msg buffer)))
(ros-debug (roslisp tcp) (> num-dropped 0)
"Dropped ~a messages on topic ~a" num-dropped topic)))))

;; Always close the connection before leaving the thread
(socket-close connection))))
:name (format nil "Roslisp thread for subscription to topic ~a published from ~a:~a"
topic hostname port))))

(assert (eq (mutex-owner *ros-lock*) *current-thread*)
nil "Code assumption violated; not holding lock in setup-tcpros-subscription")
(ros-debug (roslisp deserialization-thread)
"Adding deserialization thread for connection on topic ~a to ~a:~a" topic hostname port)
(push connection-thread *deserialization-threads*)))


(defvar *stream-error-in-progress* nil)
Expand Down Expand Up @@ -305,7 +339,6 @@
(socket-close connection))))))))



(define-condition service-error (simple-error) ())

(defun handle-single-service-request (stream request-class-name callback)
Expand Down