Skip to content
Permalink
Browse files

Offload I/O from ScheduledExecutorService thread in PaceMaker. (#173)

* Offload I/O from ScheduledExecutorService thread in PaceMaker.
  • Loading branch information...
mstyura authored and bgrozev committed Feb 13, 2019
1 parent 5aec41f commit 80471b751a95edca4c5f88a88657c12081fcc895
Showing with 63 additions and 104 deletions.
  1. +2 −1 src/main/java/org/ice4j/ice/Agent.java
  2. +61 −103 src/main/java/org/ice4j/ice/ConnectivityCheckClient.java
@@ -379,7 +379,8 @@ public Agent(Level loggingLevel, String ufragPrefix)
SecureRandom random = new SecureRandom();

connCheckServer = new ConnectivityCheckServer(this);
connCheckClient = new ConnectivityCheckClient(this, agentTasksScheduler);
connCheckClient = new ConnectivityCheckClient(
this, agentTasksScheduler, agentTasksExecutor);

//add the FINGERPRINT attribute to all messages.
System.setProperty(StackProperties.ALWAYS_SIGN, "true");
@@ -18,16 +18,17 @@
package org.ice4j.ice;

import java.net.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.logging.*;

import org.ice4j.*;
import org.ice4j.attribute.*;
import org.ice4j.message.*;
import org.ice4j.socket.*;
import org.ice4j.stack.*;
import org.ice4j.util.*;
import org.ice4j.util.Logger; //Disambiguation

/**
@@ -57,10 +58,15 @@
private final Agent parentAgent;

/**
* A scheduled executor service to perform scheduled task of the client
* A scheduled executor service to perform periodic tasks of the client
*/
private final ScheduledExecutorService scheduledExecutorService;

/**
* An executor service to perform background tasks of the client
*/
private final ExecutorService executorService;

/**
* The <tt>StunStack</tt> that we will use for connectivity checks.
*/
@@ -97,13 +103,17 @@
* @param parentAgent the <tt>Agent</tt> that is creating this instance.
* @param scheduledExecutorService the <tt>ScheduledExecutorService</tt>
* to execute clients tasks
* @param executorService the <tt>ExecutorService</tt> to execute
* background tasks of connectivity check client
*/
public ConnectivityCheckClient(
Agent parentAgent,
ScheduledExecutorService scheduledExecutorService)
ScheduledExecutorService scheduledExecutorService,
ExecutorService executorService)
{
this.parentAgent = parentAgent;
this.scheduledExecutorService = scheduledExecutorService;
this.executorService = executorService;
logger = new Logger(classLogger, parentAgent.getLogger());

stunStack = this.parentAgent.getStunStack();
@@ -869,92 +879,14 @@ public void processTimeout(StunTimeoutEvent ev)
* A class to control periodically scheduled runnable that actually sends
* the checks for a particular check list in the pace defined in RFC 5245.
*/
private final class PaceMaker
private final class PaceMaker extends PeriodicRunnable
{
/**
* Indicates whether this <tt>Runnable</tt> should continue running.
*/
private final AtomicBoolean cancelled = new AtomicBoolean();

/**
* Sends connectivity checks at the pace determined by the {@link
* Agent#calculateTa()} method and using either the trigger check queue
* or the regular check lists.
*/
private final Runnable connectivityChecker = new Runnable()
{
@Override
public void run()
{
if (cancelled.get())
{
return;
}

CandidatePair pairToCheck = checkList.popTriggeredCheck();

//if there are no triggered checks, go for an ordinary one.
if (pairToCheck == null)
{
pairToCheck = checkList.getNextOrdinaryPairToCheck();
}

if (pairToCheck != null)
{
/*
* Since we suspect that it is possible to
* startCheckForPair, processSuccessResponse and only
* then setStateInProgress, we'll synchronize. The
* synchronization root is the one of the
* CandidatePair#setState method.
*/
synchronized (pairToCheck)
{
TransactionID transactionID
= startCheckForPair(pairToCheck);

if (transactionID == null)
{
logger.info(
"Pair failed: "
+ pairToCheck.toShortString());
pairToCheck.setStateFailed();
}
else
{
pairToCheck.setStateInProgress(transactionID);
}
}
}
else
{
/*
* We are done sending checks for this list. We'll set
* its final state in either the processResponse(),
* processTimeout() or processFailure() method.
*/
logger.finest("will skip a check beat.");
checkList.fireEndOfOrdinaryChecks();
}

if (!cancelled.get())
{
schedule();
}
}
};

/**
* The {@link CheckList} that this <tt>PaceMaker</tt> will be running
* checks for.
*/
private final CheckList checkList;

/**
* Scheduled instance of {@link #connectivityChecker} runnable
*/
private ScheduledFuture<?> scheduledCheck;

/**
* Creates a new {@link PaceMaker} for this
* <tt>ConnectivityCheckClient</tt>.
@@ -964,37 +896,63 @@ public void run()
*/
public PaceMaker(CheckList checkList)
{
super(scheduledExecutorService, executorService);
this.checkList = checkList;
}

/**
* Cancel execution of current and further connectivity checks
* Sends connectivity checks at the pace determined by the {@link
* Agent#calculateTa()} method and using either the trigger check queue
* or the regular check lists.
*/
void cancel()
@Override
protected void run()
{
cancelled.set(true);
CandidatePair pairToCheck = checkList.popTriggeredCheck();

final ScheduledFuture<?> scheduledCheck = this.scheduledCheck;
if (scheduledCheck != null)
//if there are no triggered checks, go for an ordinary one.
if (pairToCheck == null)
{
scheduledCheck.cancel(true);
pairToCheck = checkList.getNextOrdinaryPairToCheck();
}
}

/**
* Schedules execution of checks for check list
*/
void schedule()
{
if (cancelled.get())
if (pairToCheck != null)
{
/*
* Since we suspect that it is possible to
* startCheckForPair, processSuccessResponse and only
* then setStateInProgress, we'll synchronize. The
* synchronization root is the one of the
* CandidatePair#setState method.
*/
synchronized (pairToCheck)
{
TransactionID transactionID
= startCheckForPair(pairToCheck);

if (transactionID == null)
{
logger.info(
"Pair failed: "
+ pairToCheck.toShortString());
pairToCheck.setStateFailed();
}
else
{
pairToCheck.setStateInProgress(transactionID);
}
}
}
else
{
return;
/*
* We are done sending checks for this list. We'll set
* its final state in either the processResponse(),
* processTimeout() or processFailure() method.
*/
logger.finest("will skip a check beat.");
checkList.fireEndOfOrdinaryChecks();
}
scheduledCheck
= scheduledExecutorService.schedule(
connectivityChecker,
getNextWaitInterval(),
TimeUnit.MILLISECONDS);
}

/**
@@ -1004,7 +962,7 @@ void schedule()
* @return the number milliseconds to wait before we send the next
* check.
*/
private long getNextWaitInterval()
protected Duration getDelayUntilNextRun()
{
int activeCheckLists = parentAgent.getActiveCheckListCount();

@@ -1015,7 +973,7 @@ private long getNextWaitInterval()
activeCheckLists = 1;
}

return parentAgent.calculateTa() * activeCheckLists;
return Duration.ofMillis(parentAgent.calculateTa() * activeCheckLists);
}
}

0 comments on commit 80471b7

Please sign in to comment.
You can’t perform that action at this time.