Skip to content

Commit

Permalink
Address review comments from Karsten
Browse files Browse the repository at this point in the history
* Remove time point typedefs in favor of using the types directly.
* Use condition_variable for waiting
* Minor formatting updates

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp committed Apr 2, 2021
1 parent bffc11e commit daadf5b
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 41 deletions.
1 change: 0 additions & 1 deletion rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ if(BUILD_TESTING)
ament_add_gmock(test_time_controller_clock
test/rosbag2_cpp/test_time_controller_clock.cpp)
if(TARGET test_time_controller_clock)
# ament_target_dependencies(test_player_clock rosbag2_cpp)
target_link_libraries(test_time_controller_clock ${PROJECT_NAME})
endif()
endif()
Expand Down
11 changes: 4 additions & 7 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,20 @@ namespace rosbag2_cpp
class PlayerClock
{
public:
// Type representing the current time as according to the playback
typedef rcutils_time_point_value_t ROSTimePoint;
/**
* Type representing an arbitrary steady time, used to measure real-time durations
* This type is never exposed by the PlayerClock - it is only used as input to the PlayerClock.
*/
typedef std::chrono::time_point<std::chrono::steady_clock> SteadyTimePoint;
typedef std::function<SteadyTimePoint()> NowFunction;
typedef std::function<std::chrono::steady_clock::time_point()> NowFunction;

ROSBAG2_CPP_PUBLIC
virtual ~PlayerClock() = default;

/**
* Calculate and return current ROSTimePoint based on starting time, playback rate, pause state.
* Calculate and return current rcutils_time_point_value_t based on starting time, playback rate, pause state.
*/
ROSBAG2_CPP_PUBLIC
virtual ROSTimePoint now() const = 0;
virtual rcutils_time_point_value_t now() const = 0;

/**
* Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock
Expand All @@ -60,7 +57,7 @@ class PlayerClock
* The user should not take action based on this sleep until it returns true.
*/
ROSBAG2_CPP_PUBLIC
virtual bool sleep_until(ROSTimePoint until) = 0;
virtual bool sleep_until(rcutils_time_point_value_t until) = 0;

/**
* Return the current playback rate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ class TimeControllerClock : public PlayerClock
*/
ROSBAG2_CPP_PUBLIC
TimeControllerClock(
ROSTimePoint starting_time,
rcutils_time_point_value_t starting_time,
double rate = 1.0,
NowFunction now_fn = std::chrono::steady_clock::now);

ROSBAG2_CPP_PUBLIC
virtual ~TimeControllerClock();

/**
* Calculate and return current ROSTimePoint based on starting time, playback rate, pause state.
* Calculate and return current rcutils_time_point_value_t based on starting time, playback rate, pause state.
*/
ROSBAG2_CPP_PUBLIC
ROSTimePoint now() const override;
rcutils_time_point_value_t now() const override;

/**
* Try to sleep (non-busy) the current thread until the provided time is reached - according to this Clock
Expand All @@ -65,7 +65,7 @@ class TimeControllerClock : public PlayerClock
* The user should not take action based on this sleep until it returns true.
*/
ROSBAG2_CPP_PUBLIC
bool sleep_until(ROSTimePoint until) override;
bool sleep_until(rcutils_time_point_value_t until) override;

/**
* Return the current playback rate.
Expand Down
31 changes: 15 additions & 16 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
Expand All @@ -33,8 +34,8 @@ class TimeControllerClockImpl
*/
struct TimeReference
{
PlayerClock::ROSTimePoint ros;
PlayerClock::SteadyTimePoint steady;
rcutils_time_point_value_t ros;
std::chrono::steady_clock::time_point steady;
};

TimeControllerClockImpl() = default;
Expand All @@ -46,27 +47,28 @@ class TimeControllerClockImpl
return std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count();
}

PlayerClock::ROSTimePoint steady_to_ros(PlayerClock::SteadyTimePoint steady_time)
rcutils_time_point_value_t steady_to_ros(std::chrono::steady_clock::time_point steady_time)
{
return reference.ros + static_cast<rcutils_duration_value_t>(
rate * duration_nanos(steady_time - reference.steady));
}

PlayerClock::SteadyTimePoint ros_to_steady(PlayerClock::ROSTimePoint ros_time)
std::chrono::steady_clock::time_point ros_to_steady(rcutils_time_point_value_t ros_time)
{
const rcutils_duration_value_t diff_nanos = static_cast<rcutils_duration_value_t>(
const auto diff_nanos = static_cast<rcutils_duration_value_t>(
(ros_time - reference.ros) / rate);
return reference.steady + std::chrono::nanoseconds(diff_nanos);
}

double rate = 1.0;
PlayerClock::NowFunction now_fn;
std::mutex mutex;
std::condition_variable cv;
PlayerClock::NowFunction now_fn RCPPUTILS_TSA_GUARDED_BY(mutex);
double rate RCPPUTILS_TSA_GUARDED_BY(mutex) = 1.0;
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(mutex);
};

TimeControllerClock::TimeControllerClock(
ROSTimePoint starting_time,
rcutils_time_point_value_t starting_time,
double rate,
NowFunction now_fn)
: impl_(std::make_unique<TimeControllerClockImpl>())
Expand All @@ -81,22 +83,19 @@ TimeControllerClock::TimeControllerClock(
TimeControllerClock::~TimeControllerClock()
{}

PlayerClock::ROSTimePoint TimeControllerClock::now() const
rcutils_time_point_value_t TimeControllerClock::now() const
{
std::lock_guard<std::mutex> lock(impl_->mutex);
return impl_->steady_to_ros(impl_->now_fn());
}

bool TimeControllerClock::sleep_until(ROSTimePoint until)
bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until)
{
SteadyTimePoint steady_until;
{
std::lock_guard<std::mutex> lock(impl_->mutex);
steady_until = impl_->ros_to_steady(until);
std::unique_lock<std::mutex> lock(impl_->mutex);
const auto steady_until = impl_->ros_to_steady(until);
impl_->cv.wait_until(lock, steady_until);
}
// TODO(emersonknapp) - when we have methods that can change timeflow during a sleep,
// it will probably be better to use a condition_variable::wait_until
std::this_thread::sleep_until(steady_until);
return now() >= until;
}

Expand Down
8 changes: 3 additions & 5 deletions rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
#include "rosbag2_cpp/clocks/time_controller_clock.hpp"

using namespace testing; // NOLINT
using SteadyTimePoint = rosbag2_cpp::PlayerClock::SteadyTimePoint;
using ROSTimePoint = rosbag2_cpp::PlayerClock::ROSTimePoint;
using SteadyTimePoint = std::chrono::steady_clock::time_point;
using NowFunction = rosbag2_cpp::PlayerClock::NowFunction;


class TimeControllerClockTest : public Test
{
public:
Expand All @@ -32,14 +30,14 @@ class TimeControllerClockTest : public Test
};
}

ROSTimePoint as_nanos(const SteadyTimePoint & time)
rcutils_time_point_value_t as_nanos(const SteadyTimePoint & time)
{
return std::chrono::duration_cast<std::chrono::nanoseconds>(time.time_since_epoch()).count();
}

NowFunction now_fn;
SteadyTimePoint return_time; // defaults to 0
ROSTimePoint ros_start_time = 0;
rcutils_time_point_value_t ros_start_time = 0;
};

TEST_F(TimeControllerClockTest, steadytime_precision)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct SerializedBagMessage
std::string topic_name;
};

typedef std::shared_ptr<SerializedBagMessage> SerializedBagMessagePtr;
typedef std::shared_ptr<SerializedBagMessage> SerializedBagMessageSharedPtr;

} // namespace rosbag2_storage

Expand Down
9 changes: 3 additions & 6 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void Player::load_storage_content(const PlayOptions & options)

void Player::enqueue_up_to_boundary(uint64_t boundary)
{
rosbag2_storage::SerializedBagMessagePtr message;
rosbag2_storage::SerializedBagMessageSharedPtr message;
for (size_t i = message_queue_.size_approx(); i < boundary; i++) {
if (!reader_->has_next()) {
break;
Expand All @@ -168,7 +168,7 @@ void Player::play_messages_from_queue()

void Player::play_messages_until_queue_empty()
{
rosbag2_storage::SerializedBagMessagePtr message;
rosbag2_storage::SerializedBagMessageSharedPtr message;
while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
Expand Down Expand Up @@ -216,10 +216,7 @@ void Player::prepare_publishers(const PlayOptions & options)

void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value_t starting_time)
{
float rate = 1.0;
if (options.rate > 0.0) {
rate = options.rate;
}
float rate = options.rate > 0.0 ? options.rate : 1.0;
clock_ = std::make_unique<rosbag2_cpp::TimeControllerClock>(starting_time, rate);
}

Expand Down
2 changes: 1 addition & 1 deletion rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Player
static const std::chrono::milliseconds queue_read_wait_period_;

std::shared_ptr<rosbag2_cpp::Reader> reader_;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessagePtr> message_queue_;
moodycamel::ReaderWriterQueue<rosbag2_storage::SerializedBagMessageSharedPtr> message_queue_;
mutable std::future<void> storage_loading_future_;
std::shared_ptr<Rosbag2Node> rosbag2_transport_;
std::unordered_map<std::string, std::shared_ptr<GenericPublisher>> publishers_;
Expand Down

0 comments on commit daadf5b

Please sign in to comment.