Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closable evicting queue #64

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 32 additions & 51 deletions src/main/java/org/ice4j/ice/harvest/SinglePortUdpHarvester.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.ice4j.socket.*;
import org.ice4j.stack.*;
import org.ice4j.util.*;
import org.ice4j.util.concurrent.*;

import java.io.*;
import java.net.*;
Expand Down Expand Up @@ -467,8 +468,8 @@ private class MySocket
/**
* The FIFO which acts as a buffer for this socket.
*/
private final ArrayBlockingQueue<Buffer> queue
= new ArrayBlockingQueue<>(QUEUE_SIZE);
private final ClosableEvictingQueue<Buffer> queue
= new ClosableEvictingQueue<>(QUEUE_SIZE);

/**
* The {@link QueueStatistics} instance optionally used to collect and
Expand All @@ -481,12 +482,6 @@ private class MySocket
*/
private SocketAddress remoteAddress;

/**
* The flag which indicates that this <tt>DatagramSocket</tt> has been
* closed.
*/
private boolean closed = false;

/**
* Initializes a new <tt>MySocket</tt> instance with the given
* remote address.
Expand Down Expand Up @@ -518,27 +513,24 @@ public MySocket(SocketAddress remoteAddress)
*/
private void addBuffer(Buffer buf)
{
synchronized (queue)
{
// Drop the first rather than the current packet, so that
// receivers can notice the loss earlier.
if (queue.size() == QUEUE_SIZE)
{
logger.info("Dropping a packet because the queue is full.");
if (queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
}
queue.poll();
}

queue.offer(buf);
Buffer drop = null;
try {
drop = queue.insert(buf);
} catch(QueueClosedException e) {
pool.offer(buf);
return;
}
if (drop != null) {
logger.info("Dropping a packet because the queue is full.");
if (queueStatistics != null)
{
queueStatistics.add(System.currentTimeMillis());
queueStatistics.remove(System.currentTimeMillis());
}

queue.notifyAll();
pool.offer(drop);
}
if (queueStatistics != null)
{
queueStatistics.add(System.currentTimeMillis());
}
}

Expand Down Expand Up @@ -584,13 +576,7 @@ public SocketAddress getLocalSocketAddress()
@Override
public void close()
{
closed = true;

synchronized (queue)
{
// Wake up any threads still in receive()
queue.notifyAll();
}
queue.close();

// We could be called by the super-class constructor, in which
// case this.removeAddress is not initialized yet.
Expand All @@ -614,29 +600,24 @@ public void receive(DatagramPacket p)

while (buf == null)
{
if (closed)
try
{
buf = queue.take();
}
catch (QueueClosedException e)
{
throw new SocketException("Socket closed");

synchronized (queue)
}
catch (InterruptedException e)
{
if (queue.isEmpty())
{
try
{
queue.wait();
}
catch (InterruptedException ie)
{}
}

buf = queue.poll();
if (queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
}
}
}

if (queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
}

byte[] pData = p.getData();

// XXX Should we use p.setData() here with a buffer of our own?
Expand Down
158 changes: 70 additions & 88 deletions src/main/java/org/ice4j/util/PacketQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/
package org.ice4j.util;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.Logger; // Disambiguation.

import org.ice4j.util.concurrent.*;

/**
* An abstract queue of packets. This is meant to eventually be able to be used
* in the following classes (in ice4j and libjitsi) in place of their ad-hoc
Expand Down Expand Up @@ -65,9 +66,9 @@ public static boolean logDroppedPacket(int numDroppedPackets)
}

/**
* The underlying {@link Queue} which holds packets.
* The underlying {@link ClosableEvictingQueue} which holds packets.
*/
private final Queue<T> queue;
private final ClosableEvictingQueue<T> queue;

/**
* Whether this {@link PacketQueue} should store the {@code byte[]} or
Expand Down Expand Up @@ -106,16 +107,11 @@ public static boolean logDroppedPacket(int numDroppedPackets)
*/
private final String id;

/**
* Whether this queue has been closed.
*/
private boolean closed = false;

/**
* The number of packets which were dropped from this {@link PacketQueue} as
* a result of a packet being added while the queue is at full capacity.
*/
private int numDroppedPackets = 0;
private AtomicInteger numDroppedPackets = new AtomicInteger();

/**
* Initializes a new {@link PacketQueue} instance.
Expand Down Expand Up @@ -167,7 +163,7 @@ public PacketQueue(int capacity, boolean copy,
this.copy = copy;
this.capacity = capacity;
this.id = id;
queue = new ArrayBlockingQueue<>(capacity);
queue = new ClosableEvictingQueue<>(capacity);

queueStatistics
= enableStatistics ? new QueueStatistics(id) : null;
Expand Down Expand Up @@ -215,25 +211,24 @@ private void runInReadingThread()
return;
}

while (!closed)
while (true)
{
T pkt;

synchronized (queue)
try
{
pkt = queue.poll();
if (pkt == null)
{
try
{
queue.wait(100);
}
catch (InterruptedException ie)
{
}
continue;
}
pkt = queue.take();
}
catch (QueueClosedException e)
{
break;
}
catch (InterruptedException ie)
{
pkt = null;
}
if (pkt == null)
continue;

if (queueStatistics != null)
{
Expand Down Expand Up @@ -319,37 +314,35 @@ private byte[] getByteArray(int len)
*/
private void doAdd(T pkt)
{
if (closed)
T drop = null;
try
{
drop = queue.insert(pkt);
}
catch (QueueClosedException e)
{
return;
}

synchronized (queue)
if (drop != null)
{
if (queue.size() >= capacity)
if (queueStatistics != null)
{
// Drop from the head of the queue.
T p = queue.poll();
if (p != null)
{
if (queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
}
if (logDroppedPacket(++numDroppedPackets))
{
logger.warning(
"Packets dropped (id=" + id + "): " + numDroppedPackets);
}
}
queueStatistics.remove(System.currentTimeMillis());
}

if (queueStatistics != null)
int num = this.numDroppedPackets.incrementAndGet();
if (logDroppedPacket(num))
{
queueStatistics.add(System.currentTimeMillis());
logger.warning(
"Packets dropped (id=" + id + "): " + num);
}
queue.offer(pkt);
}

queue.notifyAll();
if (queueStatistics != null)
{
queueStatistics.add(System.currentTimeMillis());
}

}

/**
Expand All @@ -369,30 +362,25 @@ public T get()
"Trying to read from a queue with a configured handler.");
}

while (true)
T pkt;
try
{
if (closed)
return null;
synchronized (queue)
{
T pkt = queue.poll();
if (pkt != null)
{
if (queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
}
return pkt;
}
pkt = queue.take();
}
catch (QueueClosedException e)
{
return null;
}
catch (InterruptedException e)
{
return null;
}

try
{
queue.wait();
}
catch (InterruptedException ie)
{}
}
if (queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
}
return pkt;
}

/**
Expand All @@ -404,9 +392,6 @@ public T get()
*/
public T poll()
{
if (closed)
return null;

if (handler != null)
{
// If the queue was configured with a handler, it is running its
Expand All @@ -416,29 +401,26 @@ public T poll()
"Trying to read from a queue with a configured handler.");
}

synchronized (queue)
T pkt;
try
{
T pkt = queue.poll();
if (pkt != null && queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
}

return pkt;
pkt = queue.poll();
}
catch (QueueClosedException e)
{
return null;
}
if (pkt != null && queueStatistics != null)
{
queueStatistics.remove(System.currentTimeMillis());
}

return pkt;
}

public void close()
{
if (!closed)
{
closed = true;

synchronized (queue)
{
queue.notifyAll();
}
}
queue.close();
}

/**
Expand Down
Loading