Skip to content

Commit

Permalink
PR IntelRealSense#11128 from Ohad: DDS Image reception on client side
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel committed Nov 23, 2022
2 parents 63c7634 + 09f6fd5 commit 70ce534
Show file tree
Hide file tree
Showing 13 changed files with 700 additions and 78 deletions.
51 changes: 49 additions & 2 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "software-device.h"
#include <librealsense2/h/rs_internal.h>
#include <realdds/topics/device-info/device-info-msg.h>
#include <realdds/topics/image/image-msg.h>
#endif //BUILD_WITH_DDS

#include <librealsense2/utilities/json.h>
Expand Down Expand Up @@ -521,11 +522,55 @@ namespace librealsense
software_sensor::open( profiles );
}

void start( frame_callback_ptr callback ) override
{
//For each stream in sensor_base::get_active_streams() set callback function that will call on_video_frame
for ( auto & profile : sensor_base::get_active_streams() )
{
//stream is the dds_stream matching the librealsense active stream
auto & stream = _streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )];
stream->start_streaming( [p = profile, this]( realdds::topics::device::image && dds_frame ) {
rs2_stream_profile prof = { p.get() };
rs2_software_video_frame rs2_frame;

//Copying from dds into LibRS space, same as copy from USB backend.
//TODO - use memory pool or some other frame allocator
rs2_frame.pixels = new uint8_t[dds_frame.size];
if ( !rs2_frame.pixels )
throw std::runtime_error( "Could not allocate memory for new frame" );
memcpy( rs2_frame.pixels, dds_frame.raw_data.data(), dds_frame.size );

rs2_frame.deleter = []( void * ptr) { delete[] ptr; };
rs2_frame.stride = dds_frame.size / dds_frame.height;
rs2_frame.bpp = rs2_frame.stride / dds_frame.width;
rs2_frame.timestamp = frame_counter * 1000.0 / p->get_framerate(); // TODO - timestamp from dds
rs2_frame.domain = RS2_TIMESTAMP_DOMAIN_HARDWARE_CLOCK; // TODO - timestamp domain from options?
rs2_frame.frame_number = frame_counter++; // TODO - frame_number from dds
rs2_frame.profile = &prof;
rs2_frame.depth_units = 0.001f; //TODO - depth unit from dds, if needed
on_video_frame( rs2_frame );
} );
}

software_sensor::start( callback );
}

void stop()
{
for ( auto & profile : sensor_base::get_active_streams() )
{
//stream is the dds_stream matching the librealsense active stream
auto & stream = _streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )];
stream->stop_streaming();
}

software_sensor::stop();
}

void close() override
{
//dds_device::close expects <stream_uid, stream_index> pairs
realdds::dds_streams streams_to_close;
for( auto profile : sensor_base::get_active_streams() )
for( auto & profile : sensor_base::get_active_streams() )
{
streams_to_close.push_back(
_streams[sid_index( profile->get_unique_id(), profile->get_stream_index() )] );
Expand All @@ -543,6 +588,8 @@ namespace librealsense
std::shared_ptr< realdds::dds_device > const & _dev;
std::string _name;
std::map< sid_index, std::shared_ptr< realdds::dds_stream > > _streams;

int frame_counter = 0;
};

// This is the rs2 device; it proxies to an actual DDS device that does all the actual
Expand Down
28 changes: 23 additions & 5 deletions third-party/realdds/include/realdds/dds-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@
#pragma once

#include "dds-stream-base.h"
#include "dds-stream-profile.h"

#include <memory>
#include <string>
#include <vector>
#include <functional>

namespace realdds {

namespace topics {
namespace device {
class image;
} // namespace device
} // namespace topics

class dds_subscriber;
class dds_topic_reader;

// Represents a stream of information (images, motion data, etc..) from a single source received via the DDS system.
// A stream can have several profiles, i.e different data frequency, image resolution, etc..
Expand All @@ -25,12 +32,23 @@ class dds_stream : public dds_stream_base
protected:
dds_stream( std::string const & stream_name, std::string const & sensor_name );

// dds_stream_base
public:
bool is_open() const override;
bool is_streaming() const override;
bool is_open() const override { return !! _reader; }
virtual void open( std::string const & topic_name, std::shared_ptr< dds_subscriber > const & );
virtual void close();

bool is_streaming() const override { return _on_data_available != nullptr; }
typedef std::function< void( topics::device::image && f) > on_data_available_callback;
void start_streaming( on_data_available_callback cb );
void stop_streaming();

std::shared_ptr< dds_topic > const & get_topic() const override;

protected:
void handle_frames();

std::shared_ptr< dds_topic_reader > _reader;
on_data_available_callback _on_data_available = nullptr;
};

class dds_video_stream : public dds_stream
Expand Down
33 changes: 32 additions & 1 deletion third-party/realdds/include/realdds/topics/image/image-msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@
#include <vector>


namespace eprosima {
namespace fastdds {
namespace dds {
struct SampleInfo;
}
} // namespace fastdds
} // namespace eprosima

namespace realdds {


class dds_participant;
class dds_topic;
class dds_topic_reader;


namespace topics {
Expand All @@ -34,14 +43,36 @@ class image
public:
using type = raw::device::imagePubSubType;

image( const raw::device::image & );
image() = default;

//Disable copy
image( const image & ) = delete;
image & operator=( const image & ) = delete;

//move is OK
image( image && ) = default;
image( raw::device::image && );
image & operator=( image && ) = default;
image & operator=( raw::device::image && );

bool is_valid() const { return width != 0 && height != 0; }
void invalidate() { width = 0; }

static std::shared_ptr< dds_topic > create_topic( std::shared_ptr< dds_participant > const & participant,
char const * topic_name );

// This helper method will take the next sample from a reader.
//
// Returns true if successful. Make sure you still check is_valid() in case the sample info isn't!
// Returns false if no more data is available.
// Will throw if an unexpected error occurs.
//
//Note - copies the data.
//TODO - add an API for a function that loans the data and enables the user to free it later.
static bool take_next( dds_topic_reader &,
image * output,
eprosima::fastdds::dds::SampleInfo * optional_info = nullptr );

std::vector<uint8_t> raw_data;
int width = 0;
int height = 0;
Expand Down
62 changes: 58 additions & 4 deletions third-party/realdds/src/dds-device-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@
#include <realdds/dds-participant.h>
#include <realdds/dds-topic-reader.h>
#include <realdds/dds-topic-writer.h>

#include <realdds/dds-subscriber.h>
#include <realdds/topics/flexible/flexible-msg.h>
#include <librealsense2/utilities/time/timer.h>

#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>

#include <librealsense2/utilities/time/timer.h>
#include <librealsense2/utilities/json.h>

#include <third-party/json.hpp>

#include <cassert>

#include <librealsense2/utilities/json.h>

using nlohmann::json;


namespace realdds {


namespace {

enum class state_type
Expand Down Expand Up @@ -64,6 +67,7 @@ dds_device::impl::impl( std::shared_ptr< dds_participant > const & participant,
: _info( info )
, _guid( guid )
, _participant( participant )
, _subscriber( std::make_shared< dds_subscriber >( participant ) )
{
}

Expand All @@ -82,6 +86,56 @@ void dds_device::impl::run()
_running = true;
}

void dds_device::impl::open( const dds_stream_profiles & profiles )
{
if ( profiles.empty() )
DDS_THROW( runtime_error, "must provide at least one profile" );
using nlohmann::json;
auto stream_profiles = json();
for ( auto & profile : profiles )
{
auto stream = profile->stream();
if ( !stream )
DDS_THROW( runtime_error, "profile (" + profile->to_string() + ") is not part of any stream" );
if ( stream_profiles.find( stream->name() ) != stream_profiles.end() )
DDS_THROW( runtime_error, "more than one profile found for stream '" + stream->name() + "'" );

stream_profiles[stream->name()] = profile->to_json();

_streams[stream->name()]->open( _info.topic_root + '/' + stream->name(), _subscriber );
}

json j = {
{ "id", "open-streams" },
{ "stream-profiles", stream_profiles },
};

write_control_message( j );
}

void dds_device::impl::close( dds_streams const & streams )
{
if ( streams.empty() )
DDS_THROW( runtime_error, "must provide at least one stream" );
using nlohmann::json;
auto stream_names = json::array();
for ( auto & stream : streams )
{
if ( !stream )
DDS_THROW( runtime_error, "null stream passed in" );
stream_names += stream->name();

_streams[stream->name()]->close();
}

json j = {
{ "id", "close-streams" },
{ "stream-names", stream_names },
};

write_control_message( j );
}

void dds_device::impl::write_control_message( topics::flexible_msg && msg )
{
assert( _control_writer != nullptr );
Expand Down
7 changes: 5 additions & 2 deletions third-party/realdds/src/dds-device-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#include <realdds/dds-device.h>
#include <realdds/dds-stream-profile.h>
#include <realdds/dds-utilities.h>

#include <realdds/topics/device-info/device-info-msg.h>
#include <realdds/topics/flexible/flexible-msg.h>

#include <fastdds/rtps/common/Guid.h>

#include <map>
Expand All @@ -21,7 +21,7 @@ namespace realdds {

class dds_topic_reader;
class dds_topic_writer;

class dds_subscriber;

namespace topics {
class flexible_msg;
Expand All @@ -34,6 +34,7 @@ class dds_device::impl
topics::device_info const _info;
dds_guid const _guid;
std::shared_ptr< dds_participant > const _participant;
std::shared_ptr< dds_subscriber > _subscriber;

bool _running = false;

Expand All @@ -48,6 +49,8 @@ class dds_device::impl
topics::device_info const & info );

void run();
void open( const dds_stream_profiles & profiles );
void close( const dds_streams & streams );

void write_control_message( topics::flexible_msg && );

Expand Down
41 changes: 2 additions & 39 deletions third-party/realdds/src/dds-device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
#include <realdds/dds-device.h>
#include "dds-device-impl.h"

#include <realdds/topics/flexible/flexible-msg.h>
#include <realdds/dds-exceptions.h>

#include <third-party/json.hpp>

namespace realdds {

Expand Down Expand Up @@ -87,13 +83,11 @@ void dds_device::run()
_impl->run();
}


std::shared_ptr< dds_participant > const& dds_device::participant() const
{
return _impl->_participant;
}


topics::device_info const & dds_device::device_info() const
{
return _impl->_info;
Expand Down Expand Up @@ -121,43 +115,12 @@ size_t dds_device::foreach_stream( std::function< void( std::shared_ptr< dds_str

void dds_device::open( const dds_stream_profiles & profiles )
{
if( profiles.empty() )
DDS_THROW( runtime_error, "must provide at least one profile" );
using nlohmann::json;
auto stream_profiles = json();
for( auto & profile : profiles )
{
auto stream = profile->stream();
if( ! stream )
DDS_THROW( runtime_error, "profile (" + profile->to_string() + ") is not part of any stream" );
if( stream_profiles.find( stream->name() ) != stream_profiles.end() )
DDS_THROW( runtime_error, "more than one profile found for stream '" + stream->name() + "'" );
stream_profiles[stream->name()] = profile->to_json();
}
json j = {
{ "id", "open-streams" },
{ "stream-profiles", stream_profiles },
};
_impl->write_control_message( j );
_impl->open( profiles );
}

void dds_device::close( dds_streams const & streams )
{
if( streams.empty() )
DDS_THROW( runtime_error, "must provide at least one stream" );
using nlohmann::json;
auto stream_names = json::array();
for( auto & stream : streams )
{
if( ! stream )
DDS_THROW( runtime_error, "null stream passed in" );
stream_names += stream->name();
}
json j = {
{ "id", "close-streams" },
{ "stream-names", stream_names },
};
_impl->write_control_message( j );
_impl->close( streams );
}


Expand Down

0 comments on commit 70ce534

Please sign in to comment.