diff --git a/pom.xml b/pom.xml index b36963670..0e0bf0ff0 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,7 @@ ${project.groupId} ice4j - 1.0-20160310.155819-18 + 1.0-20160323.233049-19 ${project.groupId} diff --git a/src/org/jitsi/impl/neomedia/RTPConnectorOutputStream.java b/src/org/jitsi/impl/neomedia/RTPConnectorOutputStream.java index 6a798639f..717dc18c7 100644 --- a/src/org/jitsi/impl/neomedia/RTPConnectorOutputStream.java +++ b/src/org/jitsi/impl/neomedia/RTPConnectorOutputStream.java @@ -24,7 +24,7 @@ import javax.media.rtp.*; import net.sf.fmj.media.util.*; -import org.jitsi.impl.neomedia.rtp.remotebitrateestimator.*; +import org.ice4j.util.*; import org.jitsi.service.configuration.*; import org.jitsi.service.libjitsi.*; import org.jitsi.service.packetlogging.*; @@ -768,11 +768,23 @@ private class Queue */ final Thread sendThread; + /** + * The instance optionally used to gather and print statistics about + * this queue. + */ + QueueStatistics queueStats = null; + /** * Initializes a new {@link Queue} instance and starts its send thread. */ private Queue() { + if (logger.isTraceEnabled()) + { + queueStats = new QueueStatistics( + getClass().getSimpleName() + "-" + hashCode()); + } + sendThread = new Thread() { @@ -809,12 +821,17 @@ private void write(byte[] buf, int off, int len, Object context) buffer.len = len; buffer.context = context; + long now = System.currentTimeMillis(); if (queue.size() >= PACKET_QUEUE_CAPACITY) { // Drop from the head of the queue. Buffer b = queue.poll(); if (b != null) { + if (queueStats != null) + { + queueStats.remove(now); + } pool.offer(b); numDroppedPackets++; if (logDroppedPacket(numDroppedPackets)) @@ -825,7 +842,11 @@ private void write(byte[] buf, int off, int len, Object context) } } } - queue.offer(buffer); + + if (queue.offer(buffer) && queueStats != null) + { + queueStats.add(now); + } } /** @@ -869,10 +890,19 @@ private void runInSendThread() // The current thread has potentially waited. if (closed) + { break; + } if (buffer == null) + { continue; + } + + if (queueStats != null) + { + queueStats.remove(System.currentTimeMillis()); + } // We will sooner or later process the Buffer. Since this // may take a non-negligible amount of time, do it before diff --git a/src/org/jitsi/impl/neomedia/rtp/remotebitrateestimator/RateStatistics.java b/src/org/jitsi/impl/neomedia/rtp/remotebitrateestimator/RateStatistics.java deleted file mode 100644 index c52631a33..000000000 --- a/src/org/jitsi/impl/neomedia/rtp/remotebitrateestimator/RateStatistics.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright @ 2015 Atlassian Pty Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jitsi.impl.neomedia.rtp.remotebitrateestimator; - -/** - * webrtc/modules/remote_bitrate_estimator/rate_statistics.cc - * webrtc/modules/remote_bitrate_estimator/rate_statistics.h - * - * @author Lyubomir Marinov - */ -public class RateStatistics -{ - /** - * Total count recorded in buckets. - */ - private long accumulatedCount; - - /** - * Counters are kept in buckets (circular buffer), with one bucket per - * millisecond. - */ - private final long[] buckets; - - /** - * Bucket index of oldest counter recorded in buckets. - */ - private int oldestIndex; - - /** - * Oldest time recorded in buckets. - */ - private long oldestTime; - - /** - * To convert counts/ms to desired units. - */ - private final float scale; - - /** - * Initializes a new {@link RateStatistics} instance with a default scale - * if 8000 (i.e. if the input is in bytes, the result will be in bits per - * second). - * @param windowSizeMs window size in ms for the rate estimation - */ - public RateStatistics(int windowSizeMs) - { - this(windowSizeMs, 8000F); - } - - /** - * @param windowSizeMs window size in ms for the rate estimation - * @param scale coefficient to convert counts/ms to desired units. For - * example, if counts represents bytes, use 8*1000 to go to bits/s. - */ - public RateStatistics(int windowSizeMs, float scale) - { - buckets = new long[windowSizeMs + 1]; // N ms in (N+1) buckets. - this.scale = scale / (buckets.length - 1); - } - - private void eraseOld(long nowMs) - { - long newOldestTime = nowMs - buckets.length + 1; - - if (newOldestTime <= oldestTime) - return; - - while (oldestTime < newOldestTime) - { - long countInOldestBucket = buckets[oldestIndex]; - - accumulatedCount -= countInOldestBucket; - buckets[oldestIndex] = 0L; - if (++oldestIndex >= buckets.length) - { - oldestIndex = 0; - } - ++oldestTime; - if (accumulatedCount == 0L) - { - // This guarantees we go through all the buckets at most once, - // even if newOldestTime is far greater than oldestTime. - break; - } - } - oldestTime = newOldestTime; - } - - public long getRate(long nowMs) - { - eraseOld(nowMs); - return (long) (accumulatedCount * scale + 0.5F); - } - - public void update(int count, long nowMs) - { - if (nowMs < oldestTime) // Too old data is ignored. - return; - - eraseOld(nowMs); - - int nowOffset = (int) (nowMs - oldestTime); - int index = oldestIndex + nowOffset; - - if (index >= buckets.length) - index -= buckets.length; - buckets[index] += count; - accumulatedCount += count; - } -} diff --git a/src/org/jitsi/impl/neomedia/rtp/remotebitrateestimator/RemoteBitrateEstimatorSingleStream.java b/src/org/jitsi/impl/neomedia/rtp/remotebitrateestimator/RemoteBitrateEstimatorSingleStream.java index feec04f51..cdceaa5ad 100644 --- a/src/org/jitsi/impl/neomedia/rtp/remotebitrateestimator/RemoteBitrateEstimatorSingleStream.java +++ b/src/org/jitsi/impl/neomedia/rtp/remotebitrateestimator/RemoteBitrateEstimatorSingleStream.java @@ -19,6 +19,7 @@ import net.sf.fmj.media.rtp.util.*; +import org.ice4j.util.*; import org.jitsi.service.neomedia.rtp.*; import org.jitsi.util.concurrent.*; diff --git a/src/org/jitsi/impl/neomedia/rtp/translator/OutputDataStreamImpl.java b/src/org/jitsi/impl/neomedia/rtp/translator/OutputDataStreamImpl.java index 09d4b6802..0c7ba7a35 100644 --- a/src/org/jitsi/impl/neomedia/rtp/translator/OutputDataStreamImpl.java +++ b/src/org/jitsi/impl/neomedia/rtp/translator/OutputDataStreamImpl.java @@ -23,6 +23,7 @@ import net.sf.fmj.media.rtp.*; import net.sf.fmj.media.rtp.RTPHeader; +import org.ice4j.util.*; import org.jitsi.impl.neomedia.*; import org.jitsi.service.libjitsi.*; import org.jitsi.service.neomedia.*; @@ -98,6 +99,8 @@ class OutputDataStreamImpl private int writeQLength; + private final QueueStatistics writeQStats; + /** * The number of packets dropped because a packet was inserted while * {@link #writeQ} was full. @@ -116,6 +119,17 @@ public OutputDataStreamImpl(RTPConnectorImpl connector, boolean data) LibJitsi.getConfigurationService(), REMOVE_RTP_HEADER_EXTENSIONS_PNAME, false); + + if (logger.isTraceEnabled()) + { + writeQStats + = new QueueStatistics( + getClass().getSimpleName() + "-" + hashCode()); + } + else + { + writeQStats = null; + } } /** @@ -390,6 +404,10 @@ public void run() if (writeQHead >= writeQ.length) writeQHead = 0; writeQLength--; + if (writeQStats != null) + { + writeQStats.remove(System.currentTimeMillis()); + } } try @@ -631,6 +649,10 @@ public synchronized void write( if (writeQHead >= writeQ.length) writeQHead = 0; writeQLength--; + if (writeQStats != null) + { + writeQStats.remove(System.currentTimeMillis()); + } numDroppedPackets++; if (RTPConnectorOutputStream.logDroppedPacket(numDroppedPackets)) @@ -657,6 +679,10 @@ public synchronized void write( write.length = len; writeQLength++; + if (writeQStats != null) + { + writeQStats.add(System.currentTimeMillis()); + } if (writeThread == null) createWriteThread(); diff --git a/src/org/jitsi/impl/neomedia/rtp/translator/PushSourceStreamImpl.java b/src/org/jitsi/impl/neomedia/rtp/translator/PushSourceStreamImpl.java index 09e8f3c59..ec61d4acd 100644 --- a/src/org/jitsi/impl/neomedia/rtp/translator/PushSourceStreamImpl.java +++ b/src/org/jitsi/impl/neomedia/rtp/translator/PushSourceStreamImpl.java @@ -22,6 +22,7 @@ import javax.media.*; import javax.media.protocol.*; +import org.ice4j.util.*; import org.jitsi.impl.neomedia.*; import org.jitsi.util.*; @@ -72,6 +73,8 @@ class PushSourceStreamImpl */ private final int readQCapacity; + private final QueueStatistics readQStats; + /** * The number of packets dropped because a packet was inserted while * {@link #readQ} was full. @@ -103,6 +106,16 @@ public PushSourceStreamImpl(RTPConnectorImpl connector, boolean data) readQCapacity = RTPConnectorOutputStream.PACKET_QUEUE_CAPACITY; readQ = new ArrayBlockingQueue<>(readQCapacity); + if (logger.isTraceEnabled()) + { + readQStats + = new QueueStatistics( + getClass().getSimpleName() + "-" + hashCode()); + } + else + { + readQStats = null; + } transferDataThread = new Thread(this, getClass().getName()); transferDataThread.setDaemon(true); @@ -228,6 +241,10 @@ public int read(byte[] buffer, int offset, int length) } readQ.remove(); + if (readQStats != null) + { + readQStats.remove(System.currentTimeMillis()); + } _read = true; readQ.notifyAll(); } @@ -459,9 +476,14 @@ else if (readQSize < readQCapacity) synchronized (readQ) { + long now = System.currentTimeMillis(); if (readQ.size() >= readQCapacity) { readQ.remove(); + if (readQStats != null) + { + readQStats.remove(now); + } numDroppedPackets++; if (RTPConnectorOutputStream.logDroppedPacket( numDroppedPackets)) @@ -474,6 +496,10 @@ else if (readQSize < readQCapacity) if (readQ.offer(pkt)) { + if (readQStats != null) + { + readQStats.add(now); + } // TODO It appears that it is better to not yield based // on whether the read method has read after the last // write. diff --git a/src/org/jitsi/util/RawPacketQueue.java b/src/org/jitsi/util/RawPacketQueue.java new file mode 100644 index 000000000..45223e281 --- /dev/null +++ b/src/org/jitsi/util/RawPacketQueue.java @@ -0,0 +1,105 @@ +/* + * Copyright @ 2015 Atlassian Pty Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jitsi.util; + +import org.ice4j.util.*; +import org.jitsi.impl.neomedia.*; + +/** + * Implements {@link PacketQueue} for {@link RawPacket}. + * + * @author Boris Grozev + */ +public class RawPacketQueue extends PacketQueue +{ + /** + * Initializes a new {@link RawPacketQueue}. See + * {@link PacketQueue#PacketQueue(int, boolean, boolean, String, PacketHandler)} + */ + public RawPacketQueue(int capacity, boolean copy, + boolean enableStatistics, String id, + PacketHandler handler) + { + super(capacity, copy, enableStatistics, id, handler); + } + + /** + * Initializes a new {@link RawPacketQueue}. See + * {@link PacketQueue#PacketQueue(boolean, String, PacketHandler)} + */ + public RawPacketQueue( + boolean enableStatistics, String id, + PacketHandler packetHandler) + { + super(enableStatistics, id, packetHandler); + } + + /** + * Initializes a new {@link RawPacketQueue}. See + * {@link PacketQueue#PacketQueue()} + */ + public RawPacketQueue() + { + super(); + } + + /** + * {@inheritDoc} + */ + @Override + public byte[] getBuffer(RawPacket pkt) + { + return pkt == null ? null : pkt.getBuffer(); + } + + /** + * {@inheritDoc} + */ + @Override + public int getLength(RawPacket pkt) + { + return pkt == null ? 0 : pkt.getLength(); + } + + /** + * {@inheritDoc} + */ + @Override + public int getOffset(RawPacket pkt) + { + return pkt == null ? 0 : pkt.getOffset(); + } + + /** + * {@inheritDoc} + */ + @Override + public Object getContext(RawPacket pkt) + { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + protected RawPacket createPacket( + byte[] buf, int off, int len, Object context) + { + return new RawPacket(buf, off, len); + } +} +