From e33f188b035c7caf01c86df80b869cb9ce80cb4c Mon Sep 17 00:00:00 2001 From: Josh Pieper Date: Fri, 15 Sep 2023 09:51:21 -0400 Subject: [PATCH] Implement socketcan support in the C++ library --- lib/cpp/mjbots/moteus/moteus_transport.h | 643 +++++++++++++++++------ 1 file changed, 473 insertions(+), 170 deletions(-) diff --git a/lib/cpp/mjbots/moteus/moteus_transport.h b/lib/cpp/mjbots/moteus/moteus_transport.h index c7bb641d..0abfcb0c 100644 --- a/lib/cpp/mjbots/moteus/moteus_transport.h +++ b/lib/cpp/mjbots/moteus/moteus_transport.h @@ -16,10 +16,14 @@ #include #include +#include +#include #include +#include #include #include #include +#include #include #include #include @@ -109,40 +113,48 @@ class Transport { virtual void Post(std::function callback) = 0; }; -class Fdcanusb : public Transport { +namespace details { +/// This is just a simple RAII class for managing file descriptors. +class FileDescriptor { public: - struct Options { - bool disable_brs = false; - - uint32_t min_ok_wait_ns = 1000000; - uint32_t min_rcv_wait_ns = 5000000; - - uint32_t rx_extra_wait_ns = 5000000; + FileDescriptor() {} + FileDescriptor(int fd) { fd_ = fd; } + ~FileDescriptor() { + if (fd_ >= 0) { ::close(fd_); } + } - // Send at most this many frames before waiting for responses. -1 - // means no limit. - int max_pipeline = -1; + FileDescriptor& operator=(int fd) { + if (fd_ >= 0) { ::close(fd_); } + fd_ = fd; + return *this; + } - Options() {} - }; + bool operator==(const FileDescriptor& rhs) const { + return fd_ == rhs.fd_; + } - // This is purely to catch out of control queues earlier, as - // typically there will just be 1 outstanding event at a time. - static constexpr int kMaxQueueSize = 2; + operator int() const { + return fd_; + } - // If @p device is empty, attempt to auto-detect a fdcanusb in the - // system. - Fdcanusb(const std::string& device_in, const Options& options = {}) - : options_(options) { - Open(device_in); + int release() { + const auto result = fd_; + fd_ = -1; + return result; } - Fdcanusb(int read_fd, int write_fd, const Options& options = {}) - : options_(options) { - Open(read_fd, write_fd); + private: + int fd_ = -1; +}; + +/// A basic event loop implemented using C++11 primitives. +class ThreadedEventLoop { + public: + ThreadedEventLoop() { + thread_ = std::thread(std::bind(&ThreadedEventLoop::CHILD_Run, this)); } - virtual ~Fdcanusb() { + ~ThreadedEventLoop() { { std::unique_lock lock(mutex_); done_ = true; @@ -150,25 +162,13 @@ class Fdcanusb : public Transport { something_cv_.notify_one(); } thread_.join(); - - if (read_fd_ != write_fd_) { - ::close(write_fd_); - } - ::close(read_fd_); } - virtual void Cycle(const CanFdFrame* frames, - size_t size, - std::vector* replies, - CompletionCallback completed_callback) override { - std::unique_lock lock(mutex_); - work_ = std::bind(&Fdcanusb::CHILD_Cycle, - this, frames, size, replies, completed_callback); - do_something_ = true; - something_cv_.notify_one(); - } + // This is purely to catch out of control queues earlier, as + // typically there will just be 1 outstanding event at a time. + static constexpr int kMaxQueueSize = 2; - virtual void Post(std::function callback) override { + void Post(std::function callback) { std::unique_lock lock(mutex_); event_queue_.push_back(std::move(callback)); if (event_queue_.size() > kMaxQueueSize) { @@ -178,42 +178,82 @@ class Fdcanusb : public Transport { something_cv_.notify_one(); } - static void Fail(const std::string& str) { - throw std::runtime_error(str); - } + private: + void CHILD_Run() { + std::unique_lock lock(mutex_); - static void FailIfErrno(bool terminate) { - if (terminate) { - Fail(::strerror(errno)); + while (true) { + something_cv_.wait(lock, [&]() { + return do_something_ || !event_queue_.empty(); + }); + do_something_ = false; + + if (done_) { + return; + } + + // Do at most one event. + if (!event_queue_.empty()) { + auto top = event_queue_.front(); + event_queue_.pop_front(); + top(); + } } } - static std::string DetectFdcanusb() { - // For now, we'll only do linux like systems. - { - std::ifstream inf("/dev/fdcanusb"); - if (inf.is_open()) { return "/dev/fdcanusb"; } - } + std::thread thread_; - { - glob_t glob_data = {}; - const int result = ::glob( - "/dev/serial/by-id/*fdcanusb*", 0, - nullptr, - &glob_data); + // The following variables are controlled by 'something_mutex'. + std::recursive_mutex mutex_; + std::condition_variable_any something_cv_; - std::string maybe_path; + bool do_something_ = false; + bool done_ = false; - if (result == 0 && glob_data.gl_pathc > 0) { - maybe_path = glob_data.gl_pathv[0]; - } + std::deque> event_queue_; +}; - globfree(&glob_data); +/// A helper base class for transports that want to manage timeout +/// behavior in a similar manner. +class TimeoutTransport : public Transport { + public: + struct Options { + bool disable_brs = false; - if (!maybe_path.empty()) { return maybe_path; } - } + uint32_t min_ok_wait_ns = 1000000; + uint32_t min_rcv_wait_ns = 5000000; - return ""; + uint32_t rx_extra_wait_ns = 5000000; + + // Send at most this many frames before waiting for responses. -1 + // means no limit. + int max_pipeline = -1; + + Options() {} + }; + + TimeoutTransport(const Options& options) : t_options_(options) {} + + virtual void Cycle(const CanFdFrame* frames, + size_t size, + std::vector* replies, + CompletionCallback completed_callback) override { + // The event loop should never be empty here, but we make a copy + // just to assert that. + auto copy = std::atomic_load(&UNPROTECTED_event_loop_); + FailIf(!copy, "unexpected null event loop"); + copy->Post( + std::bind(&TimeoutTransport::CHILD_Cycle, + this, frames, size, replies, completed_callback)); + } + + virtual void Post(std::function callback) override { + // We might have an attempt to post an event while we are being + // destroyed. In that case, just ignore it. + auto copy = std::atomic_load(&UNPROTECTED_event_loop_); + if (copy) { + copy->Post(callback); + } } static int64_t GetNow() { @@ -223,77 +263,37 @@ class Fdcanusb : public Transport { static_cast(ts.tv_nsec); } - private: - void Open(const std::string& device_in) { - std::string device = device_in; - if (device.empty()) { - device = DetectFdcanusb(); - if (device.empty()) { - throw std::runtime_error("Could not detect fdcanusb"); - } - } - - const int fd = ::open(device.c_str(), O_RDWR | O_NOCTTY); - FailIfErrno(fd == -1); - -#ifndef _WIN32 - { - struct serial_struct serial; - FailIfErrno(::ioctl(fd, TIOCGSERIAL, &serial) < 0); - serial.flags |= ASYNC_LOW_LATENCY; - FailIfErrno(::ioctl(fd, TIOCSSERIAL, &serial) < 0); - - struct termios toptions; - FailIfErrno(::tcgetattr(fd, &toptions) < 0); - - // Turn off things that could munge our byte stream to the - // device. - toptions.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG); - toptions.c_oflag &= ~OPOST; + static void Fail(const std::string& message) { + throw std::runtime_error(message); + } - FailIfErrno(::tcsetattr(fd, TCSANOW, &toptions) < 0); - FailIfErrno(::tcsetattr(fd, TCSAFLUSH, &toptions) < 0); - } -#else // _WIN32 - { - COMMTIMEOUTS new_timeouts = {MAXDWORD, 0, 0, 0, 0}; - SetCommTimeouts(fd, &new_timeouts); + static void FailIf(bool terminate, const std::string& message) { + if (terminate) { + Fail(message); } -#endif - - Open(fd, fd); } - void Open(int read_fd, int write_fd) { - read_fd_ = read_fd; - write_fd_ = write_fd; - thread_ = std::thread(std::bind(&Fdcanusb::CHILD_Run, this)); + static void FailIfErrno(bool terminate) { + if (terminate) { + Fail(::strerror(errno)); + } } - void CHILD_Run() { - std::unique_lock lock(mutex_); - while (true) { - something_cv_.wait(lock, [&]() { - return do_something_ || !event_queue_.empty(); - }); - do_something_ = false; + protected: + virtual int CHILD_GetReadFd() const = 0; + virtual void CHILD_SendCanFdFrame(const CanFdFrame&) = 0; - if (done_) { - return; - } - if (work_) { - work_(); - work_ = {}; - } - // Do at most one event. - if (!event_queue_.empty()) { - auto top = event_queue_.front(); - event_queue_.pop_front(); - top(); - } - } - } + struct ConsumeCount { + int rcv = 0; + int ok = 0; + }; + + virtual ConsumeCount CHILD_ConsumeData( + std::vector* replies, + int expected_ok_count, + std::vector* expected_reply_count) = 0; + virtual void CHILD_FlushTransmit() = 0; void CHILD_Cycle(const CanFdFrame* frames, size_t size, @@ -302,8 +302,8 @@ class Fdcanusb : public Transport { if (replies) { replies->clear(); } CHILD_CheckReplies(replies, kFlush, 0, nullptr); - const auto advance = options_.max_pipeline < 0 ? - size : options_.max_pipeline; + const auto advance = t_options_.max_pipeline < 0 ? + size : t_options_.max_pipeline; for (size_t start = 0; start < size; start += advance) { int expected_ok_count = 0; @@ -352,12 +352,12 @@ class Fdcanusb : public Transport { auto end_time = start + (read_delay == kWait ? - std::max(expected_ok_count != 0 ? options_.min_ok_wait_ns : 0, - any_reply_checker() ? options_.min_rcv_wait_ns : 0) : + std::max(expected_ok_count != 0 ? t_options_.min_ok_wait_ns : 0, + any_reply_checker() ? t_options_.min_rcv_wait_ns : 0) : 5000); struct pollfd fds[1] = {}; - fds[0].fd = read_fd_; + fds[0].fd = CHILD_GetReadFd(); fds[0].events = POLLIN; int ok_count = 0; @@ -381,23 +381,8 @@ class Fdcanusb : public Transport { } if (poll_ret == 0) { return; } - // Read into our line buffer. - const int to_read = sizeof(line_buffer_) - line_buffer_pos_; - const int read_ret = ::read( - read_fd_, &line_buffer_[line_buffer_pos_], to_read); - if (read_ret < 0) { - if (errno == EINTR || errno == EAGAIN) { continue; } - FailIfErrno(true); - } - line_buffer_pos_ += read_ret; - - const auto consume_count = CHILD_ConsumeLines( - replies, expected_reply_count); - if (line_buffer_pos_ >= sizeof(line_buffer_)) { - // We overran our line buffer. For now, just drop everything - // and start from 0. - line_buffer_pos_ = 0; - } + const auto consume_count = CHILD_ConsumeData( + replies, expected_ok_count, expected_reply_count); ok_count += consume_count.ok; @@ -410,16 +395,162 @@ class Fdcanusb : public Transport { if (consume_count.rcv || consume_count.ok) { const auto finish_time = GetNow(); - end_time = finish_time + options_.rx_extra_wait_ns; + end_time = finish_time + t_options_.rx_extra_wait_ns; } } } - struct ConsumeCount { - int rcv = 0; - int ok = 0; + // This is protected, because derived classes need to delete it + // before freeing any file descriptors. The public methods of the + // ThreadedEventLoop require no locking, but the shared_ptr itself + // requires either synchronization or access using the atomic std + // library methods. We'll exclusively use the atomic std library + // methods. + std::shared_ptr UNPROTECTED_event_loop_ = + std::make_shared(); + + private: + const Options t_options_; + + std::vector expected_reply_count_; +}; +} + +class Fdcanusb : public details::TimeoutTransport { + public: + struct Options : details::TimeoutTransport::Options { + Options() {} }; + // If @p device is empty, attempt to auto-detect a fdcanusb in the + // system. + Fdcanusb(const std::string& device_in, const Options& options = {}) + : details::TimeoutTransport(options), + options_(options) { + Open(device_in); + } + + // This constructor overload is intended for use in unit tests, + // where the file descriptors will likely be pipes. + Fdcanusb(int read_fd, int write_fd, const Options& options = {}) + : details::TimeoutTransport(options), + options_(options) { + Open(read_fd, write_fd); + } + + virtual ~Fdcanusb() { + event_loop_.reset(); + + if (read_fd_ == write_fd_) { + write_fd_.release(); + } + } + + static std::string DetectFdcanusb() { + // For now, we'll only do linux like systems. + { + std::ifstream inf("/dev/fdcanusb"); + if (inf.is_open()) { return "/dev/fdcanusb"; } + } + + { + glob_t glob_data = {}; + const int result = ::glob( + "/dev/serial/by-id/*fdcanusb*", 0, + nullptr, + &glob_data); + + std::string maybe_path; + + if (result == 0 && glob_data.gl_pathc > 0) { + maybe_path = glob_data.gl_pathv[0]; + } + + globfree(&glob_data); + + if (!maybe_path.empty()) { return maybe_path; } + } + + return ""; + } + + private: + void Open(const std::string& device_in) { + std::string device = device_in; + if (device.empty()) { + device = DetectFdcanusb(); + if (device.empty()) { + throw std::runtime_error("Could not detect fdcanusb"); + } + } + + const int fd = ::open(device.c_str(), O_RDWR | O_NOCTTY); + FailIfErrno(fd == -1); + +#ifndef _WIN32 + { + struct serial_struct serial; + FailIfErrno(::ioctl(fd, TIOCGSERIAL, &serial) < 0); + serial.flags |= ASYNC_LOW_LATENCY; + FailIfErrno(::ioctl(fd, TIOCSSERIAL, &serial) < 0); + + struct termios toptions; + FailIfErrno(::tcgetattr(fd, &toptions) < 0); + + // Turn off things that could munge our byte stream to the + // device. + toptions.c_lflag &= ~(ICANON | ECHO | ECHOE | ISIG); + toptions.c_oflag &= ~OPOST; + + FailIfErrno(::tcsetattr(fd, TCSANOW, &toptions) < 0); + FailIfErrno(::tcsetattr(fd, TCSAFLUSH, &toptions) < 0); + } +#else // _WIN32 + { + // Windows is likely broken for many other reasons, but if we do + // fix all the other problems, this will be necessary. + COMMTIMEOUTS new_timeouts = {MAXDWORD, 0, 0, 0, 0}; + SetCommTimeouts(fd, &new_timeouts); + } +#endif + + Open(fd, fd); + } + + void Open(int read_fd, int write_fd) { + read_fd_ = read_fd; + write_fd_ = write_fd; + } + + virtual int CHILD_GetReadFd() const override { + return read_fd_; + } + + virtual ConsumeCount CHILD_ConsumeData( + std::vector* replies, + int expected_ok_count, + std::vector* expected_reply_count) override { + // Read into our line buffer. + const int to_read = sizeof(line_buffer_) - line_buffer_pos_; + const int read_ret = ::read( + read_fd_, &line_buffer_[line_buffer_pos_], to_read); + if (read_ret < 0) { + if (errno == EINTR || errno == EAGAIN) { return {}; } + FailIfErrno(true); + } + line_buffer_pos_ += read_ret; + + const auto consume_count = CHILD_ConsumeLines( + replies, expected_reply_count); + if (line_buffer_pos_ >= sizeof(line_buffer_)) { + // We overran our line buffer. For now, just drop everything + // and start from 0. + line_buffer_pos_ = 0; + } + + return consume_count; + } + /// Return the number of CAN frames received. ConsumeCount CHILD_ConsumeLines(std::vector* replies, std::vector* expected_reply_count) { @@ -490,7 +621,7 @@ class Fdcanusb : public Transport { if (expected_reply_count) { if (this_frame.source < expected_reply_count->size()) { - (*expected_reply_count)[this_frame.source] = std::min( + (*expected_reply_count)[this_frame.source] = std::max( (*expected_reply_count)[this_frame.source] - 1, 0); } } @@ -521,7 +652,7 @@ class Fdcanusb : public Transport { const size_t capacity_; }; - void CHILD_SendCanFdFrame(const CanFdFrame& frame) { + virtual void CHILD_SendCanFdFrame(const CanFdFrame& frame) override { char buf[256] = {}; Printer p(buf, sizeof(buf)); @@ -556,7 +687,7 @@ class Fdcanusb : public Transport { tx_buffer_size_ += p.size(); } - void CHILD_FlushTransmit() { + virtual void CHILD_FlushTransmit() override { for (size_t n = 0; n < tx_buffer_size_; ) { int ret = ::write(write_fd_, &tx_buffer_[n], tx_buffer_size_ - n); if (ret < 0) { @@ -606,18 +737,13 @@ class Fdcanusb : public Transport { } // This is set in the parent, then used in the child. - std::thread thread_; const Options options_; - int read_fd_ = -1; - int write_fd_ = -1; - // The following variables are controlled by 'something_mutex'. - std::recursive_mutex mutex_; - std::condition_variable_any something_cv_; - bool do_something_ = false; - bool done_ = false; - std::function work_; - std::deque> event_queue_; + // We have these scoped file descriptors first in our member list, + // so they will only be closed after the threaded event loop has + // been destroyed during destruction. + details::FileDescriptor read_fd_; + details::FileDescriptor write_fd_; // The following variables are only used in the child. char line_buffer_[4096] = {}; @@ -625,8 +751,132 @@ class Fdcanusb : public Transport { char tx_buffer_[4096] = {}; size_t tx_buffer_size_ = 0; +}; - std::vector expected_reply_count_; + +class Socketcan : public details::TimeoutTransport { + public: + struct Options : details::TimeoutTransport::Options { + std::string ifname = "can0"; + + Options() {} + }; + + Socketcan(const Options& options) + : details::TimeoutTransport(options), + options_(options) { + socket_ = Open(options_.ifname); + } + + virtual ~Socketcan() { + std::atomic_store(&UNPROTECTED_event_loop_, {}); + } + + private: + static void SetNonblock(int fd) { + int flags = ::fcntl(fd, F_GETFL, 0); + FailIf(flags < 0, "error getting flags"); + flags |= O_NONBLOCK; + FailIf(::fcntl(fd, F_SETFL, flags), "error setting flags"); + } + + static int Open(const std::string& ifname) { + const int fd = ::socket(PF_CAN, SOCK_RAW, CAN_RAW); + FailIf(fd < 0, "error opening CAN socket"); + + SetNonblock(fd); + + struct ifreq ifr = {}; + std::strncpy(&ifr.ifr_name[0], ifname.c_str(), + sizeof(ifr.ifr_name) - 1); + FailIf(::ioctl(fd, SIOCGIFINDEX, &ifr) < 0, + "could not find CAN: " + ifname); + + const int enable_canfd = 1; + FailIf(::setsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, + &enable_canfd, sizeof(enable_canfd)) != 0, + "could not set CAN-FD mode"); + + struct sockaddr_can addr = {}; + addr.can_family = AF_CAN; + addr.can_ifindex = ifr.ifr_ifindex; + FailIf(::bind(fd, + reinterpret_cast(&addr), + sizeof(addr)) < 0, + "could not bind to CAN if"); + + return fd; + } + + virtual int CHILD_GetReadFd() const override { + return socket_; + } + + virtual void CHILD_SendCanFdFrame(const CanFdFrame& frame) override { + struct canfd_frame send_frame = {}; + send_frame.can_id = frame.arbitration_id; + if (send_frame.can_id >= 0x7ff) { + // Set the frame format flag if we need an extended ID. + send_frame.can_id |= (1 << 31); + } + send_frame.len = frame.size; + std::memcpy(send_frame.data, frame.data, frame.size); + + using F = CanFdFrame; + + send_frame.flags = + ((frame.fdcan_frame == F::kDefault || + frame.fdcan_frame == F::kForceOn) ? CANFD_FDF : 0) | + (((frame.brs == F::kDefault && !options_.disable_brs) || + frame.brs == F::kForceOn) ? CANFD_BRS : 0); + + FailIf(::write(socket_, &send_frame, sizeof(send_frame)) < 0, + "error writing CAN"); + } + + virtual ConsumeCount CHILD_ConsumeData( + std::vector* replies, + int expected_ok_count, + std::vector* expected_reply_count) override { + struct canfd_frame recv_frame = {}; + FailIf(::read(socket_, &recv_frame, sizeof(recv_frame)) < 0, + "error reading CAN frame"); + + CanFdFrame this_frame; + this_frame.arbitration_id = recv_frame.can_id & 0x1fffffff; + this_frame.destination = this_frame.arbitration_id & 0x7f; + this_frame.source = (this_frame.arbitration_id >> 8) & 0x7f; + this_frame.can_prefix = (this_frame.arbitration_id >> 16); + + this_frame.brs = (recv_frame.flags & CANFD_BRS) ? + CanFdFrame::kForceOn : CanFdFrame::kForceOff; + this_frame.fdcan_frame = (recv_frame.flags & CANFD_FDF) ? + CanFdFrame::kForceOn : CanFdFrame::kForceOff; + + std::memcpy(this_frame.data, recv_frame.data, recv_frame.len); + this_frame.size = recv_frame.len; + + if (expected_reply_count) { + if (this_frame.source < expected_reply_count->size()) { + (*expected_reply_count)[this_frame.source] = std::max( + (*expected_reply_count)[this_frame.source] - 1, 0); + } + } + + if (replies) { + replies->emplace_back(std::move(this_frame)); + } + + ConsumeCount result; + result.ok = 1; + result.rcv = 1; + return result; + } + + virtual void CHILD_FlushTransmit() override {} + + const Options options_; + details::FileDescriptor socket_; }; @@ -711,6 +961,58 @@ class FdcanusbFactory : public TransportFactory { } }; +class SocketcanFactory : public TransportFactory { + public: + virtual ~SocketcanFactory() {} + + virtual int priority() override { return 11; } + virtual std::string name() override { return "socketcan"; } + + virtual TransportArgPair make(const std::vector& args_in) override { + auto args = args_in; + + Socketcan::Options options; + std::string device; + + { + auto it = std::find(args.begin(), args.end(), "--can-disable-brs"); + if (it != args.end()) { + options.disable_brs = true; + args.erase(it); + } + } + + { + auto it = std::find(args.begin(), args.end(), "--socketcan-iface"); + if (it != args.end()) { + if ((it + 1) != args.end()) { + options.ifname = *(it + 1); + args.erase(it, it + 2); + } else { + throw std::runtime_error("--socketcan-iface requires an interface name"); + } + } + } + + auto result = std::make_shared(options); + return TransportArgPair(result, args); + } + + virtual std::vector cmdline_arguments() override { + return { + { "--socketcan-iface", 1, "socketcan iface name" }, + { "--can-disable-brs", 0, "do not set BRS" }, + }; + } + + virtual bool is_args_set(const std::vector& args) override { + for (const auto& arg : args) { + if (arg == "--socketcan-iface") { return true; } + } + return false; + } +}; + class TransportRegistry { public: template @@ -796,6 +1098,7 @@ class TransportRegistry { private: TransportRegistry() { Register(); + Register(); } std::vector> items_;