Skip to content

Commit

Permalink
[FLINK-9784] Fix inconsistent use of 'static' in AsyncIOExample.java
Browse files Browse the repository at this point in the history
This closes apache#6298.
  • Loading branch information
nekrassov authored and sampath s committed Jul 26, 2018
1 parent e9d09ee commit 753d82b
Showing 1 changed file with 19 additions and 47 deletions.
Expand Up @@ -33,16 +33,17 @@
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -116,10 +117,7 @@ public void cancel() {
private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
private static final long serialVersionUID = 2098635244857937717L;

private static ExecutorService executorService;
private static Random random;

private int counter;
private transient ExecutorService executorService;

/**
* The result of multiplying sleepFactor with a random float is used to pause
Expand All @@ -145,57 +143,31 @@ private static class SampleAsyncFunction extends RichAsyncFunction<Integer, Stri
public void open(Configuration parameters) throws Exception {
super.open(parameters);

synchronized (SampleAsyncFunction.class) {
if (counter == 0) {
executorService = Executors.newFixedThreadPool(30);

random = new Random();
}

++counter;
}
executorService = Executors.newFixedThreadPool(30);
}

@Override
public void close() throws Exception {
super.close();

synchronized (SampleAsyncFunction.class) {
--counter;

if (counter == 0) {
executorService.shutdown();

try {
if (!executorService.awaitTermination(shutdownWaitTS, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
}

@Override
public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) throws Exception {
executorService.submit(new Runnable() {
@Override
public void run() {
// wait for while to simulate async operation here
long sleep = (long) (random.nextFloat() * sleepFactor);
try {
Thread.sleep(sleep);

if (random.nextFloat() < failRatio) {
resultFuture.completeExceptionally(new Exception("wahahahaha..."));
} else {
resultFuture.complete(
Collections.singletonList("key-" + (input % 10)));
}
} catch (InterruptedException e) {
resultFuture.complete(new ArrayList<String>(0));
public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
executorService.submit(() -> {
// wait for while to simulate async operation here
long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
try {
Thread.sleep(sleep);

if (ThreadLocalRandom.current().nextFloat() < failRatio) {
resultFuture.completeExceptionally(new Exception("wahahahaha..."));
} else {
resultFuture.complete(
Collections.singletonList("key-" + (input % 10)));
}
} catch (InterruptedException e) {
resultFuture.complete(new ArrayList<>(0));
}
});
}
Expand Down

0 comments on commit 753d82b

Please sign in to comment.