@@ -15,6 +15,10 @@
*/
package org.jitsi.impl.neomedia.rtcp.termination.strategies;

import java.util.*;
import java.util.concurrent.*;
import javax.media.rtp.*;
import javax.media.rtp.rtcp.*;
import net.sf.fmj.media.rtp.*;
import net.sf.fmj.media.rtp.util.*;
import org.jitsi.impl.neomedia.*;
@@ -26,10 +30,6 @@
import org.jitsi.util.*;
import org.jitsi.util.function.*;

import javax.media.rtp.*;
import java.util.*;
import java.util.concurrent.*;

/**
*
* The <tt>BasicRTCPTerminationStrategy</tt> "gateways" PLIs, FIRs, NACKs,
@@ -69,6 +69,12 @@
private static final RTCPReportBlock[] MIN_RTCP_REPORT_BLOCKS_ARRAY
= new RTCPReportBlock[MIN_RTCP_REPORT_BLOCKS];

/**
* The maximum transmission unit (MTU) to be assumed by
* {@code BasicRTCPTerminationStrategy}.
*/
private static int MTU = 1024 + 256;

/**
* The RTP stats map that holds RTP statistics about all the streams that
* this <tt>BasicRTCPTerminationStrategy</tt> (as a
@@ -233,12 +239,12 @@ public PacketTransformer getRTCPTransformer()

/**
* Runs in the reporting thread and it generates RTCP reports for the
* associated <tt>MediaStream</tt>.
* associated {@code MediaStream}.
*
* @return the <tt>RawPacket</tt> representing the RTCP compound packet to
* inject to the <tt>MediaStream</tt>.
* @return a {@code List} of {@code RawPacket}s representing the RTCP
* compound packets to inject into the {@code MediaStream}.
*/
public RawPacket report()
public List<RawPacket> report()
{
garbageCollector.cleanup();

@@ -281,12 +287,25 @@ public RawPacket report()
// SDES
RTCPSDESPacket sdes = makeSDES();

// Prepare the RTCPCompoundPacket to return.
RTCPCompoundPacket compound = compound(rrs, srs, sdes, remb);
// Build the RTCPCompoundPackets to return.
List<RTCPCompoundPacket> compounds = compound(rrs, srs, sdes, remb);

// Build the RTCPCompoundPacket and return the RawPacket to inject into
// the MediaStream.
return generator.apply(compound);
// Return the RawPackets to inject into the MediaStream.
List<RawPacket> rawPkts;

if (compounds.isEmpty())
{
rawPkts = null;
}
else
{
rawPkts = new ArrayList<RawPacket>(compounds.size());
for (RTCPCompoundPacket compound : compounds)
{
rawPkts.add(generator.apply(compound));
}
}
return rawPkts;
}

/**
@@ -303,45 +322,220 @@ public RawPacket report()
* @return a new {@code RTCPCompoundPacket} consisting of the specified
* {@code srs}, {@code rrs}, {@code sdes}, and {@code others}
*/
private RTCPCompoundPacket compound(
private List<RTCPCompoundPacket> compound(
List<RTCPRRPacket> rrs,
List<RTCPSRPacket> srs,
RTCPSDESPacket sdes,
RTCPPacket... others)
{
Collection<RTCPPacket> rtcps = new ArrayList<RTCPPacket>();

// SRs
if (srs != null && !srs.isEmpty())
{
rtcps.addAll(srs);

// SRs can carry the report blocks of the RRs and thus reduce the
// SRs may carry the report blocks of the RRs and thus reduce the
// compound's size.
if (rrs != null && !rrs.isEmpty())
moveReportBlocks(rrs, srs);
}
// RRs
if (rrs != null && !rrs.isEmpty())
{
rtcps.addAll(rrs);
}

// RTCP packets other than SR, RR, and SDES.
if (others.length > 0)
List<RTCPCompoundPacket> compounds
= new ArrayList<RTCPCompoundPacket>();
RTCPCompoundPacket prevCompound = null;
int prevCompoundLen = 0;
RTCPSDESPacket prevSDESPkt = null;

do
{
for (RTCPPacket other : others)
// SR or RR.
RTCPPacket report;
int ssrc;

if (srs != null && !srs.isEmpty())
{
if (other != null)
rtcps.add(other);
RTCPSRPacket sr = srs.remove(0);

report = sr;
ssrc = sr.ssrc;
}
else if (rrs != null && !rrs.isEmpty())
{
RTCPRRPacket rr = rrs.remove(0);

report = rr;
ssrc = rr.ssrc;
}
else
{
break;
}

List<RTCPPacket> rtcps = new ArrayList<RTCPPacket>();

rtcps.add(report);

// RTCP packets other than SR, RR, and SDES such as REMB.
if (others.length > 0)
{
for (int i = 0; i < others.length; ++i)
{
RTCPPacket other = others[i];

if (other != null)
{
rtcps.add(other);
// We've included the current other in the current
// compound packet and we don't want to include it in
// subsequent compound packets.
others[i] = null;
}
}
}

// SDES with CNAME for the SSRC of the SR or RR.
RTCPSDESItem cnameItem = findCNAMEItem(sdes, ssrc);
RTCPSDESPacket sdesPkt = null;

if (cnameItem != null)
{
RTCPSDES sdesOfReport = new RTCPSDES();

sdesOfReport.items = new RTCPSDESItem[] { cnameItem };
sdesOfReport.ssrc = ssrc;
sdesPkt = new RTCPSDESPacket(new RTCPSDES[] { sdesOfReport });
rtcps.add(sdesPkt);
}

RTCPCompoundPacket compound
= new RTCPCompoundPacket(
rtcps.toArray(new RTCPPacket[rtcps.size()]));
int compoundLen = compound.calcLength();

// Try to merge the current compound packet into the previous
// compound packet.
if (prevCompound != null)
{
int len = prevCompoundLen + compoundLen;
boolean mergeSDES = false;

if (prevSDESPkt != null && sdesPkt != null)
{
// The SDES packet of the previous compound packet can be
// merged with the SDES packet of the current compound
// packet.
len -= 4;
mergeSDES = true;
}
if (len <= MTU)
{
int prevCompoundPktCount = prevCompound.packets.length;
int compoundPktCount = compound.packets.length;

if (prevSDESPkt != null)
{
// We'll count the SDES packet of the previous compound
// packet separately.
--prevCompoundPktCount;
}

// Merge the SDES of the current compound packet into
// the SDES of the previous compound packet.
if (mergeSDES)
{
RTCPSDES[] newPrevChunks
= new RTCPSDES[
prevSDESPkt.sdes.length
+ sdesPkt.sdes.length];

System.arraycopy(
prevSDESPkt.sdes, 0,
newPrevChunks, 0,
prevSDESPkt.sdes.length);
System.arraycopy(
sdesPkt.sdes, 0,
newPrevChunks, prevSDESPkt.sdes.length,
sdesPkt.sdes.length);
prevSDESPkt.sdes = newPrevChunks;

// We've merged the SDES packet of the current compound
// packet into the SDES packet of the previous compound
// packet.
--compoundPktCount;
}

// Merge the current compound packet into the previous
// compound packet.
int newPrevCompoundPktCount
= prevCompoundPktCount + compoundPktCount;

if (prevSDESPkt != null)
++newPrevCompoundPktCount;

RTCPPacket[] newPrevCompoundPkts
= new RTCPPacket[newPrevCompoundPktCount];

System.arraycopy(
prevCompound.packets, 0,
newPrevCompoundPkts, 0,
prevCompoundPktCount);
System.arraycopy(
compound.packets, 0,
newPrevCompoundPkts, prevCompoundPktCount,
compoundPktCount);
// Keep SDES at the end.
if (prevSDESPkt != null)
{
newPrevCompoundPkts[newPrevCompoundPkts.length - 1]
= prevSDESPkt;
}
prevCompound.packets = newPrevCompoundPkts;

// The current compound packet has been merged into the
// prevous compound packet.
prevCompoundLen = prevCompound.calcLength();
compound = null;
}
}
if (compound != null)
{
compounds.add(compound);
prevCompound = compound;
prevCompoundLen = compoundLen;
prevSDESPkt = sdesPkt;
}
}
while (true);

// SDES
rtcps.add(sdes);
return compounds;
}

return
new RTCPCompoundPacket(rtcps.toArray(new RTCPPacket[rtcps.size()]));
/**
* Finds the first {@code RTCPSDESItem} within a specific
* {@code RTCPSDESPacket} which specifies the CNAME of a specific
* synchronization source identifier (SSRC).
*
* @param sdes the {@code RTCPSDESPacket} to search through
* @param ssrc the synchronization source identifier (SSRC) to find the
* first CNAME of
* @return the first {@code RTCPSDESItem} within {@code sdes} which
* specifies the CNAME of {@code ssrc} if any; otherwise, {@code null}
*/
private RTCPSDESItem findCNAMEItem(RTCPSDESPacket sdes, int ssrc)
{
RTCPSDES[] chunks = sdes.sdes;

for (RTCPSDES chunk : chunks)
{
if (chunk.ssrc == ssrc)
{
RTCPSDESItem[] items = chunk.items;

for (RTCPSDESItem item : items)
{
if (item.type == RTCPSDESItem.CNAME)
return item;
}
}
}
return null;
}

/**
@@ -701,63 +895,80 @@ private List<RTCPSRPacket> makeSRs(long time)
*/
private RTCPSDESPacket makeSDES()
{
Collection<RTCPSDES> sdesChunks = new ArrayList<RTCPSDES>();

// Create an SDES for our own SSRC.
RTCPSDES ownSDES = new RTCPSDES();

SSRCInfo ourinfo
= getStream().getStreamRTPManager().getSSRCCache().ourssrc;
ownSDES.ssrc = (int) getLocalSSRC();
Collection<RTCPSDESItem> ownItems = new ArrayList<RTCPSDESItem>();
ownItems.add(new RTCPSDESItem(
RTCPSDESItem.CNAME, ourinfo.sourceInfo.getCNAME()));

// CNAME
ownItems.add(
new RTCPSDESItem(
RTCPSDESItem.CNAME,
ourinfo.sourceInfo.getCNAME()));

// Throttle the source description bandwidth. See RFC3550#6.3.9
// Allocation of Source Description Bandwidth.

if (sdesCounter % 3 == 0)
{
if (ourinfo.name != null && ourinfo.name.getDescription() != null)
ownItems.add(new RTCPSDESItem(RTCPSDESItem.NAME, ourinfo.name
.getDescription()));
if (ourinfo.email != null && ourinfo.email.getDescription() != null)
ownItems.add(new RTCPSDESItem(RTCPSDESItem.EMAIL, ourinfo.email
.getDescription()));
if (ourinfo.phone != null && ourinfo.phone.getDescription() != null)
ownItems.add(new RTCPSDESItem(RTCPSDESItem.PHONE, ourinfo.phone
.getDescription()));
if (ourinfo.loc != null && ourinfo.loc.getDescription() != null)
ownItems.add(new RTCPSDESItem(RTCPSDESItem.LOC, ourinfo.loc
.getDescription()));
if (ourinfo.tool != null && ourinfo.tool.getDescription() != null)
ownItems.add(new RTCPSDESItem(RTCPSDESItem.TOOL, ourinfo.tool
.getDescription()));
if (ourinfo.note != null && ourinfo.note.getDescription() != null)
ownItems.add(new RTCPSDESItem(RTCPSDESItem.NOTE, ourinfo.note
.getDescription()));
}
SourceDescription sd;
String d;

if ((sd = ourinfo.name) != null
&& (d = sd.getDescription()) != null)
{
ownItems.add(new RTCPSDESItem(RTCPSDESItem.NAME, d));
}
if ((sd = ourinfo.email) != null
&& (d = sd.getDescription()) != null)
{
ownItems.add(new RTCPSDESItem(RTCPSDESItem.EMAIL, d));
}
if ((sd = ourinfo.phone) != null
&& (d = sd.getDescription()) != null)
{
ownItems.add(new RTCPSDESItem(RTCPSDESItem.PHONE, d));
}
if ((sd = ourinfo.loc) != null
&& (d = sd.getDescription()) != null)
{
ownItems.add(new RTCPSDESItem(RTCPSDESItem.LOC, d));
}
if ((sd = ourinfo.tool) != null
&& (d = sd.getDescription()) != null)
{
ownItems.add(new RTCPSDESItem(RTCPSDESItem.TOOL, d));
}
if ((sd = ourinfo.note) != null
&& (d = sd.getDescription()) != null)
{
ownItems.add(new RTCPSDESItem(RTCPSDESItem.NOTE, d));
}
}
sdesCounter++;

RTCPSDES ownSDES = new RTCPSDES();

ownSDES.items = ownItems.toArray(new RTCPSDESItem[ownItems.size()]);
ownSDES.ssrc = (int) getLocalSSRC();

Collection<RTCPSDES> chunks = new ArrayList<RTCPSDES>();

sdesChunks.add(ownSDES);
chunks.add(ownSDES);

for (Map.Entry<Integer, byte[]> entry : cnameRegistry.entrySet())
{
RTCPSDES sdes = new RTCPSDES();
sdes.ssrc = entry.getKey();

sdes.items
= new RTCPSDESItem[]
{
new RTCPSDESItem(RTCPSDESItem.CNAME, entry.getValue())
};
sdes.ssrc = entry.getKey();
chunks.add(sdes);
}

return
new RTCPSDESPacket(
sdesChunks.toArray(new RTCPSDES[sdesChunks.size()]));
return new RTCPSDESPacket(chunks.toArray(new RTCPSDES[chunks.size()]));
}

/**
@@ -1219,37 +1430,40 @@ public void maybeReport()
return;
}

// Make the RTCP reports for the assoc. <tt>MediaStream</tt>.
RawPacket rawPacket = report();
// Make the RTCP reports for the assoc. MediaStream.
List<RawPacket> rawPkts = report();

if (rawPacket == null)
if (rawPkts == null || rawPkts.isEmpty())
{
// Nothing was generated.
return;
}

try
for (RawPacket rawPkt : rawPkts)
{
getStream().injectPacket(rawPacket, false, true);
try
{
getStream().injectPacket(rawPkt, false, true);

// TODO update transmission stats.
/*if (ssrcInfo instanceof SendSSRCInfo)
// TODO update transmission stats.
/*if (ssrcInfo instanceof SendSSRCInfo)
{
((SendSSRCInfo) ssrcInfo).stats.total_rtcp++;
cache.sm.transstats.rtcp_sent++;
}
cache.updateavgrtcpsize(rawPacket.getLength());
if (cache.initial)
cache.initial = false;
if (!cache.rtcpsent)
cache.rtcpsent = true;*/
}
catch (TransmissionFailedException e)
{
((SendSSRCInfo) ssrcInfo).stats.total_rtcp++;
cache.sm.transstats.rtcp_sent++;
logger.error(e);
/*cache.sm.defaultstats
.update(OverallStats.TRANSMITFAILED, 1);
cache.sm.transstats.transmit_failed++;*/
}
cache.updateavgrtcpsize(rawPacket.getLength());
if (cache.initial)
cache.initial = false;
if (!cache.rtcpsent)
cache.rtcpsent = true;*/
}
catch (TransmissionFailedException e)
{
logger.error(e);
/*cache.sm.defaultstats
.update(OverallStats.TRANSMITFAILED, 1);
cache.sm.transstats.transmit_failed++;*/
}
}