Skip to content

Commit

Permalink
ACCUMULO-4615: Updated status gathering to timeout per task. Ensured …
Browse files Browse the repository at this point in the history
…shutdown before continuing.
  • Loading branch information
jschmidt10 authored and Jeffrey Schmidt committed Feb 22, 2018
1 parent 4459496 commit ce3ffae
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 46 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java 100644 → 100755
Expand Up @@ -249,6 +249,7 @@ public enum Property {
"The time between adjustments of the coordinator thread pool"),
MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1", PropertyType.COUNT,
"The number of threads to use when fetching the tablet server status for balancing."),
MASTER_STATUS_TIMEOUT("master.status.timeout", "6s", PropertyType.TIMEDURATION, "The timeout used by master when collecting tserver statuses."),
MASTER_METADATA_SUSPENDABLE("master.metadata.suspendable", "false", PropertyType.BOOLEAN, "Allow tablets for the " + MetadataTable.NAME
+ " table to be suspended via table.suspend.duration."),

Expand Down
122 changes: 76 additions & 46 deletions server/master/src/main/java/org/apache/accumulo/master/Master.java 100644 → 100755
Expand Up @@ -32,6 +32,8 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -95,6 +97,7 @@
import org.apache.accumulo.master.replication.ReplicationDriver;
import org.apache.accumulo.master.replication.WorkDriver;
import org.apache.accumulo.master.state.TableCounts;
import org.apache.accumulo.master.status.TimeoutTaskExecutor;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.HighlyAvailableService;
Expand Down Expand Up @@ -1080,60 +1083,87 @@ private long balanceTablets() {

private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(Set<TServerInstance> currentServers) {
long start = System.currentTimeMillis();

int threads = Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE), 1);
ExecutorService tp = Executors.newFixedThreadPool(threads);
final SortedMap<TServerInstance,TabletServerStatus> result = new TreeMap<>();
for (TServerInstance serverInstance : currentServers) {
final TServerInstance server = serverInstance;
tp.submit(new Runnable() {
@Override
public void run() {
try {
Thread t = Thread.currentThread();
String oldName = t.getName();
try {
t.setName("Getting status from " + server);
TServerConnection connection = tserverSet.getConnection(server);
if (connection == null)
throw new IOException("No connection to " + server);
TabletServerStatus status = connection.getTableMap(false);
result.put(server, status);
} finally {
t.setName(oldName);
}
} catch (Exception ex) {
log.error("unable to get tablet server status {} {}", server, ex.toString());
log.debug("unable to get tablet server status {}", server, ex);
if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
log.warn("attempting to stop {}", server);
try {
TServerConnection connection = tserverSet.getConnection(server);
if (connection != null) {
connection.halt(masterLock);
}
} catch (TTransportException e) {
// ignore: it's probably down
} catch (Exception e) {
log.info("error talking to troublesome tablet server", e);
}
badServers.remove(server);
}
}
long timeout = getConfiguration().getTimeInMillis(Property.MASTER_STATUS_TIMEOUT);
final SortedMap<TServerInstance,TabletServerStatus> results = new ConcurrentSkipListMap<>();

This comment has been minimized.

Copy link
@ivakegg

ivakegg Feb 26, 2018

I am trying to figure out what will happen now. If a tserver's status has not yet been added to the list, I guess the balancer will simply not include that tserver in the balancing and will send the tablets to other tservers. Then in a subsequent round the tserver may pop back into the list and the balancer will send tablets back. Are we setting ourselves up for a thrashing situation here?

This comment has been minimized.

Copy link
@ivakegg

ivakegg Feb 26, 2018

Ah, I see now. The magic is in the executor.complete() method which insures all of the tasks have completed or timed out resulting in the server being added to the "badservers" list.


try (TimeoutTaskExecutor<TabletServerStatus,GetTServerStatus> executor = new TimeoutTaskExecutor<>(Executors.newFixedThreadPool(threads), timeout,
currentServers.size())) {
executor.onException((task, e) -> log.error("Exception occurred while getting status from " + task.getServer(), e));

This comment has been minimized.

Copy link
@ivakegg

ivakegg Feb 26, 2018

Shouldn't we be adding the tservers to the bad servers here?

executor.onSuccess((task, result) -> {
if (result != null) {
results.put(task.getServer(), result);
}
});
}
tp.shutdown();
try {
tp.awaitTermination(getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT) * 2, TimeUnit.MILLISECONDS);
executor.onTimeout((task) -> {
log.warn("Timed out while fetching status from " + task.getServer());
badServers.get(task.getServer()).getAndIncrement();
});

for (TServerInstance server : currentServers) {
executor.submit(new GetTServerStatus(server));
}

executor.complete();
} catch (InterruptedException e) {
log.debug("Interrupted while fetching status");
Thread.currentThread().interrupt();
}

synchronized (badServers) {
badServers.keySet().retainAll(currentServers);
badServers.keySet().removeAll(result.keySet());
badServers.keySet().removeAll(results.keySet());
}
log.debug(String.format("Finished gathering information from %d servers in %.2f seconds", results.size(), (System.currentTimeMillis() - start) / 1000.));
return results;
}

private class GetTServerStatus implements Callable<TabletServerStatus> {
private final TServerInstance server;

public GetTServerStatus(TServerInstance server) {
this.server = server;
}

public TServerInstance getServer() {
return server;
}

@Override
public TabletServerStatus call() throws Exception {
try {
Thread t = Thread.currentThread();
String oldName = t.getName();
try {
t.setName("Getting status from " + server);
LiveTServerSet.TServerConnection connection = tserverSet.getConnection(server);
if (connection == null)
throw new IOException("No connection to " + server);
return connection.getTableMap(false);
} finally {
t.setName(oldName);
}
} catch (Exception ex) {
log.error("unable to get tablet server status {} {}", server, ex.toString());
log.debug("unable to get tablet server status {}", server, ex);
if (badServers.get(server).incrementAndGet() > Master.MAX_BAD_STATUS_COUNT) {
log.warn("attempting to stop {}", server);
try {
LiveTServerSet.TServerConnection connection = tserverSet.getConnection(server);
if (connection != null) {
connection.halt(masterLock);
}
} catch (TTransportException e) {
// ignore: it's probably down
} catch (Exception e) {
log.info("error talking to troublesome tablet server", e);
}
badServers.remove(server);
}

This comment has been minimized.

Copy link
@ivakegg

ivakegg Feb 26, 2018

We should be re-throwing the exception if we are not stoppin-n-removing the bad server. Perhaps we should consider moving the catch(Exception ex) sections into the exception and timeout handlers instead.

return null;
}
}
log.debug(String.format("Finished gathering information from %d servers in %.2f seconds", result.size(), (System.currentTimeMillis() - start) / 1000.));
return result;
}

public void run() throws IOException, InterruptedException, KeeperException {
Expand Down
@@ -0,0 +1,124 @@
package org.apache.accumulo.master.status;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* Runs one or more tasks with a timeout per task (instead of for all tasks).
*
* @param <T> The return type of the submitted Callables.
* @param <C> The type of the Callables.
*/
public class TimeoutTaskExecutor<T, C extends Callable<T>> implements AutoCloseable {

private final static Logger log = LoggerFactory.getLogger(TimeoutTaskExecutor.class);

private final long timeout;
private final ExecutorService executorService;
private final List<WrappedTask> wrappedTasks;

private BiConsumer<C, T> successCallback;
private BiConsumer<C, Exception> exceptionCallback;
private Consumer<C> timeoutCallback;

public TimeoutTaskExecutor(ExecutorService executorService, long timeout, int expectedNumCallables) {
this.executorService = executorService;
this.timeout = timeout;
this.wrappedTasks = Lists.newArrayListWithExpectedSize(expectedNumCallables);
}

/**
* Submits a new task to the underlying executor.
*
* @param callable
*/
public void submit(C callable) {
WrappedTask wt = new WrappedTask(callable, executorService.submit(callable));
wrappedTasks.add(wt);
}

/**
* Registers the callback to use on successful tasks.
*
* @param callback
*/
public void onSuccess(BiConsumer<C, T> callback) {
this.successCallback = callback;
}

/**
* Registers the callback to use on tasks that generating an exception.
*
* @param callback
*/
public void onException(BiConsumer<C, Exception> callback) {
this.exceptionCallback = callback;
}

/**
* Registers the callback to use on tasks that timed out.
*
* @param callback
*/
public void onTimeout(Consumer<C> callback) {
this.timeoutCallback = callback;
}

/**
* Completes all the current tasks by dispatching to tha appropriate callback.
*
* @throws InterruptedException If interrupted while awaiting callable results.
*/
public void complete() throws InterruptedException {
Preconditions.checkState(successCallback != null, "Must set a success callback before completing " + this);
Preconditions.checkState(exceptionCallback != null, "Must set an exception callback before completing " + this);
Preconditions.checkState(timeoutCallback != null, "Must set a timeout callback before completing " + this);

for (WrappedTask wt : wrappedTasks) {
try {
successCallback.accept(wt.callable, wt.future.get(timeout, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
throw e;
} catch (ExecutionException e) {
exceptionCallback.accept(wt.callable, e);
} catch (TimeoutException e) {
wt.future.cancel(true);
timeoutCallback.accept(wt.callable);
}

This comment has been minimized.

Copy link
@ivakegg

ivakegg Feb 26, 2018

Should we add a catch(Exception e) general case?

}
}

@Override
public void close() {
try {
executorService.shutdownNow();
} catch (Exception e) {
log.warn("Error while shutting down " + this, e);
}
}

/*
* A wrapper for keeping a callable together with it's corresponding future and results.
*/
private class WrappedTask {
public C callable;
public Future<T> future;

public WrappedTask(C callable, Future<T> future) {
this.callable = callable;
this.future = future;
}
}
}
@@ -0,0 +1,96 @@
package org.apache.accumulo.master.status;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Collection;
import java.util.concurrent.*;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

public class TimeoutTaskExecutorTest {

private TimeoutTaskExecutor<String, DummyTask> executor;
private long timeout = 100;

private Collection<String> results;
private Collection<DummyTask> timeouts;

@Before
public void setup() {
int numThreads = 2;
executor = new TimeoutTaskExecutor<>(Executors.newFixedThreadPool(numThreads), timeout, 3);

results = Lists.newArrayList();
timeouts = Lists.newArrayList();

executor.onSuccess((task, result) -> results.add(result));
executor.onTimeout((task) -> timeouts.add(task));
executor.onException((task, ex) -> fail("Unexpected exception"));
}

@Test
public void shouldExecuteTasks() throws InterruptedException {
executor.submit(new DummyTask("one", 0));
executor.submit(new DummyTask("two", 0));

executor.complete();

assertThat(results.contains("one"), is(true));
assertThat(results.contains("two"), is(true));
assertThat(timeouts.isEmpty(), is(true));
}

@Test
public void shouldReportTimedOutTasks() throws InterruptedException {
executor.submit(new DummyTask("successful", 0));
executor.submit(new DummyTask("timeout", timeout * 2));

executor.complete();

DummyTask task = Iterables.get(timeouts, 0);

assertThat(timeouts.size(), is(1));
assertThat(task.result, is("timeout"));
}

@Test
public void slowTasksShouldNotPreventOthersFromRunning() throws Exception {
// Clog up the threadpool with slow running tasks
executor.submit(new DummyTask("slow task 1", Long.MAX_VALUE));
executor.submit(new DummyTask("slow task 2", Long.MAX_VALUE));
executor.submit(new DummyTask("slow task 3", Long.MAX_VALUE));
executor.submit(new DummyTask("good task", 0L));

executor.complete();

assertThat(results.size(), is(1));
assertThat(Iterables.getFirst(results, null), is("good task"));
}

@After
public void tearDown() {
executor.close();
}

private class DummyTask implements Callable<String> {
private final String result;
private final long timeout;

public DummyTask(String result, long timeout) {
this.result = result;
this.timeout = timeout;
}

@Override
public String call() throws Exception {
Thread.sleep(timeout);
return result;
}
}
}

0 comments on commit ce3ffae

Please sign in to comment.