Skip to content

Commit

Permalink
Latent support for SSM
Browse files Browse the repository at this point in the history
  • Loading branch information
lelegard committed Jul 3, 2018
1 parent 6e9149d commit 315b9b4
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 60 deletions.
7 changes: 7 additions & 0 deletions src/libtsduck/tsIPAddress.h
Expand Up @@ -168,6 +168,13 @@ namespace ts {
//!
bool isMulticast() const { return IN_MULTICAST(_addr); }

//!
//! Check if the address is a source specific multicast (SSM) address.
//! Note: SSM addresses are in the range 232.0.0.0/8.
//! @return True if the address is an SSM address, false otherwise.
//!
bool isSSM() const { return (_addr & 0xFF000000) == 0xE8000000; }

//!
//! Check if this object is set to a valid address (ie not AnyAddress).
//! @return True if this object is set to a valid address (ie not AnyAddress),
Expand Down
73 changes: 62 additions & 11 deletions src/libtsduck/tsUDPReceiver.cpp
Expand Up @@ -40,6 +40,7 @@ ts::UDPReceiver::UDPReceiver(ts::Report& report, bool with_short_options, bool d
_with_short_options(with_short_options),
_dest_as_param(dest_as_param),
_receiver_specified(false),
_use_ssm(false),
_dest_addr(),
_local_address(),
_reuse_port(false),
Expand All @@ -60,11 +61,11 @@ ts::UDPReceiver::UDPReceiver(ts::Report& report, bool with_short_options, bool d
void ts::UDPReceiver::defineOptions(ts::Args& args) const
{
if (_dest_as_param) {
// [address:]port is a mandatory parameter.
// [[source@]address:]port is a mandatory parameter.
args.option(u"", 0, Args::STRING, 1, 1);
}
else {
// [address:]port is an option.
// [[source@]address:]port is an option.
args.option(u"ip-udp", _with_short_options ? 'i' : 0, Args::STRING);
}

Expand All @@ -75,6 +76,7 @@ void ts::UDPReceiver::defineOptions(ts::Args& args) const
args.option(u"no-reuse-port", 0);
args.option(u"reuse-port", _with_short_options ? 'r' : 0);
args.option(u"source", _with_short_options ? 's' : 0, Args::STRING);
args.option(u"ssm", 0);
}


Expand All @@ -84,12 +86,13 @@ void ts::UDPReceiver::defineOptions(ts::Args& args) const

void ts::UDPReceiver::addHelp(ts::Args& args) const
{
// Format the help text for the [address:]port parameter / option.
// Format the help text for the [[source@]address:]port parameter / option.
const UString destText(args.helpLines(_dest_as_param ? 1 : 2,
u"The parameter [address:]port describes the destination of UDP packets to receive. "
u"The 'port' part is mandatory and specifies the UDP port to listen on. "
u"The 'address' part is optional. It specifies an IP multicast address to listen on. "
u"It can be also a host name that translates to a multicast address."));
u"It can be also a host name that translates to a multicast address. "
u"An optional source address can be specified as 'source@address:port' in the case of SSM."));
UString destParameter;
UString destOption;

Expand Down Expand Up @@ -144,7 +147,12 @@ void ts::UDPReceiver::addHelp(ts::Args& args) const
u" useful when several sources send packets to the same destination address\n"
u" and port. Accepting all packets could result in a corrupted stream and\n"
u" only one sender shall be accepted. Options --first-source and --source\n"
u" are mutually exclusive.\n";
u" are mutually exclusive.\n"
u"\n"
u" --ssm\n"
u" Force the usage of Source-Specific Multicast (SSM) using the source which\n"
u" is specified by the option --source. The --ssm option is implicit when the\n"
u" syntax 'source@address:port' is used.\n";

if (_dest_as_param) {
args.setHelp(help + args.getHelp());
Expand All @@ -162,7 +170,7 @@ void ts::UDPReceiver::addHelp(ts::Args& args) const
bool ts::UDPReceiver::load(ts::Args& args)
{
// Get destination address.
const UString destination(args.value(_dest_as_param ? u"" : u"ip-udp"));
UString destination(args.value(_dest_as_param ? u"" : u"ip-udp"));
_receiver_specified = !destination.empty();

// When --ip-udp is specified as an option, the presence of a UDP received is optional.
Expand All @@ -174,20 +182,48 @@ bool ts::UDPReceiver::load(ts::Args& args)
// General UDP options.
_reuse_port = !args.present(u"no-reuse-port");
_default_interface = args.present(u"default-interface");
_use_ssm = args.present(u"ssm");
_use_first_source = args.present(u"first-source");
_recv_bufsize = args.intValue<size_t>(u"buffer-size", 0);

// Check the presence of the '@' indicating a source address.
const size_t sep = destination.find(u'@');
_use_source.clear();
if (sep != UString::NPOS) {
// Resolve source address.
if (!_use_source.resolve(destination.substr(0, sep), args)) {
return false;
}
// Force SSM.
_use_ssm = true;
// Remove the source from the string.
destination.erase(0, sep + 1);
}

// Resolve destination address.
if (!_dest_addr.resolve(destination, args)) {
return false;
}

// If a destination address is specified, it must be a multicast address
// If a destination address is specified, it must be a multicast address.
if (_dest_addr.hasAddress() && !_dest_addr.isMulticast()) {
args.error(u"address %s is not multicast", {_dest_addr.toString()});
return false;
}

// In case of SSM, it should be in the SSM range, but let it a warning only.
if (_use_ssm && !_dest_addr.hasAddress()) {
args.error(u"multicast group address is missing with SSM");
return false;
}
if (_use_ssm && !_dest_addr.isSSM()) {
args.warning(u"address %s is not an SSM address", {_dest_addr.toString()});
}
if (_use_ssm && _use_first_source) {
args.error(u"SSM and --first-source are mutually exclusive");
return false;
}

// The destination port is mandatory
if (!_dest_addr.hasPort()) {
args.error(u"no UDP port specified in %s", {destination});
Expand All @@ -210,7 +246,11 @@ bool ts::UDPReceiver::load(ts::Args& args)

// Translate optional source address.
UString source(args.value(u"source"));
if (source.empty()) {
if (_use_source.hasAddress() && !source.empty()) {
args.error(u"SSM source address specified twice");
return false;
}
else if (source.empty()) {
_use_source.clear();
}
else if (!_use_source.resolve(source, args)) {
Expand All @@ -225,6 +265,10 @@ bool ts::UDPReceiver::load(ts::Args& args)
args.error(u"--first-source and --source are mutually exclusive");
return false;
}
if (_use_ssm && !_use_source.hasAddress()) {
args.error(u"missing source address with --ssm");
return false;
}

return true;
}
Expand All @@ -237,6 +281,7 @@ bool ts::UDPReceiver::load(ts::Args& args)
void ts::UDPReceiver::setParameters(const SocketAddress& localAddress, bool reusePort, size_t bufferSize)
{
_receiver_specified = true;
_use_ssm = false;
_dest_addr.clear();
_dest_addr.setPort(localAddress.port());
_local_address = localAddress;
Expand Down Expand Up @@ -281,17 +326,23 @@ bool ts::UDPReceiver::open(ts::Report& report)
(_recv_bufsize <= 0 || setReceiveBufferSize(_recv_bufsize, report)) &&
bind(local_addr, report);

// Optional SSM source address.
IPAddress ssm_source;
if (_use_ssm) {
ssm_source = _use_source;
}

// Join multicast group.
if (ok && _dest_addr.hasAddress()) {
if (_default_interface) {
ok = addMembershipDefault(_dest_addr, report);
ok = addMembershipDefault(_dest_addr, ssm_source, report);
}
else if (_local_address.hasAddress()) {
ok = addMembership(_dest_addr, _local_address, report);
ok = addMembership(_dest_addr, _local_address, ssm_source, report);
}
else {
// By default, listen on all interfaces.
ok = addMembershipAll(_dest_addr, report);
ok = addMembershipAll(_dest_addr, ssm_source, report);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/libtsduck/tsUDPReceiver.h
Expand Up @@ -104,13 +104,14 @@ namespace ts {
bool _with_short_options;
bool _dest_as_param;
bool _receiver_specified; // An address is specified.
bool _use_ssm; // Use source-specific multicast.
SocketAddress _dest_addr; // Expected destination of packets.
IPAddress _local_address; // Local address on which to listen.
bool _reuse_port; // Reuse port socket option.
bool _default_interface; // Use default local interface.
bool _use_first_source; // Use socket address of first received packet to filter subsequent packets.
size_t _recv_bufsize; // Socket receive buffer size.
SocketAddress _use_source; // Filter on this socket address of sender.
SocketAddress _use_source; // Filter on this socket address of sender (can be a simple filter of an SSM source).
SocketAddress _first_source; // Socket address of first received packet.
std::set<SocketAddress> _sources; // Set of all detected packet sources.

Expand Down
83 changes: 58 additions & 25 deletions src/libtsduck/tsUDPSocket.cpp
Expand Up @@ -49,7 +49,8 @@ ts::UDPSocket::UDPSocket(bool auto_open, Report& report) :
Socket(),
_local_address(),
_default_destination(),
_mcast()
_mcast(),
_ssmcast()
{
if (auto_open) {
// Returned value ignored on purpose, the socket is marked as closed in the object on error.
Expand Down Expand Up @@ -101,10 +102,7 @@ bool ts::UDPSocket::close(Report& report)
{
// Leave all multicast groups.
if (isOpen()) {
for (MReqSet::const_iterator it = _mcast.begin(); it != _mcast.end(); ++it) {
::setsockopt(getSocket(), IPPROTO_IP, IP_DROP_MEMBERSHIP, TS_SOCKOPT_T(&it->req), sizeof(it->req));
}
_mcast.clear();
dropMembership(report);
}

// Close socket
Expand Down Expand Up @@ -268,23 +266,45 @@ bool ts::UDPSocket::setBroadcastIfRequired(const IPAddress destination, Report&
// Join one multicast group on one local interface.
//----------------------------------------------------------------------------

bool ts::UDPSocket::addMembership(const IPAddress& multicast, const IPAddress& local, Report& report)
bool ts::UDPSocket::addMembership(const IPAddress& multicast, const IPAddress& local, const IPAddress& source, Report& report)
{
// Verbose message about joining the group.
UString groupString;
if (source.hasAddress()) {
groupString = source.toString() + u"@";
}
groupString += multicast.toString();
if (local.hasAddress()) {
report.verbose(u"joining multicast group %s from local address %s", {multicast.toString(), local.toString()});
report.verbose(u"joining multicast group %s from local address %s", {groupString, local.toString()});
}
else {
report.verbose(u"joining multicast group %s from default interface", {multicast.toString()});
report.verbose(u"joining multicast group %s from default interface", {groupString});
}

MReq req(multicast, local);
if (::setsockopt(getSocket(), IPPROTO_IP, IP_ADD_MEMBERSHIP, TS_SOCKOPT_T(&req.req), sizeof(req.req)) != 0) {
report.error(u"error adding multicast membership to %s from local address %s: %s", {multicast.toString(), local.toString(), SocketErrorCodeMessage()});
return false;
// Now join the group.
if (source.hasAddress()) {
// Source-specific multicast (SSM).
SSMReq req(multicast, local, source);
if (::setsockopt(getSocket(), IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, TS_SOCKOPT_T(&req.data), sizeof(req.data)) != 0) {
report.error(u"error adding SSM membership to %s from local address %s: %s", {groupString, local.toString(), SocketErrorCodeMessage()});
return false;
}
else {
_ssmcast.insert(req);
return true;
}
}
else {
_mcast.insert(req);
return true;
// Standard multicast.
MReq req(multicast, local);
if (::setsockopt(getSocket(), IPPROTO_IP, IP_ADD_MEMBERSHIP, TS_SOCKOPT_T(&req.data), sizeof(req.data)) != 0) {
report.error(u"error adding multicast membership to %s from local address %s: %s", {groupString, local.toString(), SocketErrorCodeMessage()});
return false;
}
else {
_mcast.insert(req);
return true;
}
}
}

Expand All @@ -293,17 +313,17 @@ bool ts::UDPSocket::addMembership(const IPAddress& multicast, const IPAddress& l
// Join one multicast group, let the system select the local interface.
//----------------------------------------------------------------------------

bool ts::UDPSocket::addMembershipDefault(const IPAddress& multicast, Report& report)
bool ts::UDPSocket::addMembershipDefault(const IPAddress& multicast, const IPAddress& source, Report& report)
{
return addMembership(multicast, IPAddress(), report);
return addMembership(multicast, IPAddress(), source, report);
}


//----------------------------------------------------------------------------
// Join one multicast group on all local interfaces.
//----------------------------------------------------------------------------

bool ts::UDPSocket::addMembershipAll(const IPAddress& multicast, Report& report)
bool ts::UDPSocket::addMembershipAll(const IPAddress& multicast, const IPAddress& source, Report& report)
{
// There is no implicit way to listen on all interfaces.
// If no local address is specified, we must get the list
Expand All @@ -320,7 +340,7 @@ bool ts::UDPSocket::addMembershipAll(const IPAddress& multicast, Report& report)
bool ok = true;
for (size_t i = 0; i < loc_if.size(); ++i) {
if (loc_if[i].hasAddress()) {
ok = addMembership(multicast, loc_if[i], report) && ok;
ok = addMembership(multicast, loc_if[i], source, report) && ok;
}
}
return ok;
Expand All @@ -329,28 +349,41 @@ bool ts::UDPSocket::addMembershipAll(const IPAddress& multicast, Report& report)

//----------------------------------------------------------------------------
// Leave all multicast groups.
// Return true on success, false on error.
//----------------------------------------------------------------------------

bool ts::UDPSocket::dropMembership(Report& report)
{
bool ok = true;
for (MReqSet::const_iterator it = _mcast.begin(); it != _mcast.end(); ++it) {
report.verbose(u"leaving multicast group %s from local address %s", {IPAddress(it->req.imr_multiaddr).toString(), IPAddress(it->req.imr_interface).toString()});
if (::setsockopt(getSocket(), IPPROTO_IP, IP_DROP_MEMBERSHIP, TS_SOCKOPT_T(&it->req), sizeof(it->req)) != 0) {
report.error(u"error dropping multicast membership: " + SocketErrorCodeMessage());

// Drop all standard multicast groups.
for (auto it = _mcast.begin(); it != _mcast.end(); ++it) {
report.verbose(u"leaving multicast group %s from local address %s",
{IPAddress(it->data.imr_multiaddr).toString(), IPAddress(it->data.imr_interface).toString()});
if (::setsockopt(getSocket(), IPPROTO_IP, IP_DROP_MEMBERSHIP, TS_SOCKOPT_T(&it->data), sizeof(it->data)) != 0) {
report.error(u"error dropping multicast membership: %s", {SocketErrorCodeMessage()});
ok = false;
}
}

// Drop all source-specific multicast groups.
for (auto it = _ssmcast.begin(); it != _ssmcast.end(); ++it) {
report.verbose(u"leaving multicast group %s@%s from local address %s",
{IPAddress(it->data.imr_sourceaddr).toString(), IPAddress(it->data.imr_multiaddr).toString(), IPAddress(it->data.imr_interface).toString()});
if (::setsockopt(getSocket(), IPPROTO_IP, IP_DROP_SOURCE_MEMBERSHIP, TS_SOCKOPT_T(&it->data), sizeof(it->data)) != 0) {
report.error(u"error dropping multicast membership: %s", {SocketErrorCodeMessage()});
ok = false;
}
}

_mcast.clear();
_ssmcast.clear();

return ok;
}


//----------------------------------------------------------------------------
// Send a message to a destination address and port.
// Address and port are mandatory in SocketAddress.
// Return true on success, false on error.
//----------------------------------------------------------------------------

bool ts::UDPSocket::send(const void* data, size_t size, const SocketAddress& dest, Report& report)
Expand Down

0 comments on commit 315b9b4

Please sign in to comment.