Skip to content

Commit

Permalink
Offload I/O from ScheduledExecutorService thread in Retransmitter. (#172
Browse files Browse the repository at this point in the history
)

* Offload I/O from ScheduledExecutorService thread in Retransmitter.

* Use single thread executor for timer.

* Use single thread for timer

* Fixed typo
  • Loading branch information
mstyura authored and bbaldino committed Feb 12, 2019
1 parent f9a357e commit 5aec41f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 96 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/ice4j/ice/Agent.java
Expand Up @@ -121,7 +121,7 @@ public class Agent
* The ScheduledExecutorService to execute Agent's scheduled tasks
*/
private static final ScheduledExecutorService agentTasksScheduler
= ExecutorFactory.createCPUBoundScheduledExecutor(
= ExecutorFactory.createSingleThreadScheduledExecutor(
"ice4j.Agent-timer-", 60, TimeUnit.SECONDS);

/**
Expand Down
129 changes: 34 additions & 95 deletions src/main/java/org/ice4j/stack/StunClientTransaction.java
Expand Up @@ -18,6 +18,7 @@
package org.ice4j.stack;

import java.io.*;
import java.time.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;
Expand Down Expand Up @@ -74,12 +75,21 @@ public class StunClientTransaction
public static final int DEFAULT_ORIGINAL_WAIT_INTERVAL = 100;

/**
* The pool of <tt>Thread</tt>s which retransmit
* The pool of <tt>Thread</tt>s which schedules retransmission of
* <tt>StunClientTransaction</tt>s.
*/
private static final ScheduledExecutorService retransmissionThreadPool
= ExecutorFactory.createCPUBoundScheduledExecutor(
"ice4j.StunClientTransaction-", 60, TimeUnit.SECONDS);
private static final ScheduledExecutorService retransmissionTimer
= ExecutorFactory.createSingleThreadScheduledExecutor(
"ice4j.StunClientTransaction-timer-", 60, TimeUnit.SECONDS);

/**
* The pool of <tt>Thread</tt>s which retransmits
* <tt>StunClientTransaction</tt>s.
*/
private static final ExecutorService retransmissionExecutor
= ExecutorFactory.createCachedThreadPool(
"ice4j.StunClientTransaction-executor-");


/**
* Maximum number of retransmissions. Once this number is reached and if no
Expand Down Expand Up @@ -415,7 +425,7 @@ public TransportAddress getRemoteAddress()
* If no response is received by 1.6 seconds after the last request has been
* sent, we consider the transaction to have failed.
*/
private final class Retransmitter
private final class Retransmitter extends PeriodicRunnable
{
/**
* Current number of retransmission attempts
Expand All @@ -427,30 +437,28 @@ private final class Retransmitter
*/
private int nextRetransmissionDelay = originalWaitInterval;

/**
* Currently scheduled retransmission task
*/
private ScheduledFuture<?> retransmissionFuture;
protected Retransmitter()
{
super(retransmissionTimer, retransmissionExecutor);
}

/**
* The scheduled runnable that perform retransmit attempt
*/
private final Runnable retransmissionAttempt = new Runnable()
@Override
protected Duration getDelayUntilNextRun()
{
@Override
public void run()
{
if (cancelled.get())
{
return;
}
return Duration.ofMillis(nextRetransmissionDelay);
}

retransmissionCounter++;
@Override
protected void run()
{
retransmissionCounter++;

int curWaitInterval = nextRetransmissionDelay;
nextRetransmissionDelay
= Math.min(maxWaitInterval, 2 * nextRetransmissionDelay);
int curWaitInterval = nextRetransmissionDelay;
nextRetransmissionDelay
= Math.min(maxWaitInterval, 2 * nextRetransmissionDelay);

if (retransmissionCounter <= maxRetransmissions)
{
try
{
logger.fine(
Expand All @@ -470,87 +478,18 @@ public void run()
"A client tran retransmission failed",
ex);
}

if(!cancelled.get())
{
reschedule();
}
}

private void reschedule()
else
{
if (retransmissionCounter < maxRetransmissions)
{
retransmissionFuture = retransmissionThreadPool.schedule(
retransmissionAttempt,
nextRetransmissionDelay,
TimeUnit.MILLISECONDS);
}
else
{
// before stating that a transaction has timeout-ed we
// should first wait for a reception of the response
nextRetransmissionDelay =
Math.min(maxWaitInterval, 2* nextRetransmissionDelay);

retransmissionFuture = retransmissionThreadPool.schedule(
transactionTimedOut,
nextRetransmissionDelay,
TimeUnit.MILLISECONDS);
}
}
};

/**
* Scheduled runnable to time-out STUN transaction
*/
private final Runnable transactionTimedOut = new Runnable()
{
@Override
public void run()
{
if (cancelled.get())
{
return;
}

stackCallback.removeClientTransaction(
StunClientTransaction.this);

responseCollector.processTimeout(
new StunTimeoutEvent(
stackCallback,
getRequest(), getLocalAddress(), getTransactionID()));
}
};

/**
* Schedules STUN transaction retransmission
*/
void schedule()
{
if (retransmissionFuture != null)
{
return;
}

retransmissionFuture = retransmissionThreadPool.schedule(
retransmissionAttempt,
nextRetransmissionDelay,
TimeUnit.MILLISECONDS);
}

/**
* Cancels the transaction. Once this method is called the transaction
* is considered terminated and will stop retransmissions.
*/
void cancel()
{
final ScheduledFuture<?> retransmissionFuture =
this.retransmissionFuture;
if (retransmissionFuture != null)
{
retransmissionFuture.cancel(true);
nextRetransmissionDelay = -1;
}
}
}
Expand Down

0 comments on commit 5aec41f

Please sign in to comment.