Skip to content

Commit

Permalink
Issue ReactiveX#565: Thread Local Context Propagator Support (Reactiv…
Browse files Browse the repository at this point in the history
  • Loading branch information
pulkitmehra authored and RobWin committed Jan 27, 2020
1 parent be4e8ed commit b07d61d
Show file tree
Hide file tree
Showing 31 changed files with 1,533 additions and 97 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
kotlin.code.style=official
systemProp.file.encoding=UTF-8
systemProp.sun.jnu.encoding=UTF-8
systemProp.sun.jnu.encoding=UTF-8
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package io.github.resilience4j.bulkhead;

import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static java.util.stream.Collectors.toMap;

/**
* Abstraction to retrieve, copy and clean up values across thread boundary. This class is
* specifically use for propagating {@link ThreadLocal} across different thread boundaries.
*
* @param <T> value type that is copied across thread boundary.
*/
public interface ContextPropagator<T> {

/**
* Retrieves value from the currently executing thread. This method should produce values (as
* Supplier) that needs to be propagated to new thread.
*
* @return a Supplier producing the value from current thread
*/
Supplier<Optional<T>> retrieve();

/**
* Copies value from the parent thread into new executing thread. This method is passed with the
* values received from method {@link ContextPropagator#retrieve()} in the parent thread.
*
* @return a Consumer to set values in new thread.
*/
Consumer<Optional<T>> copy();

/**
* CleanUp value before thread execution finish. This method is passed with the values received
* from method {@link ContextPropagator#retrieve()} in the parent thread.
*
* @return a Consumer to cleanUp values.
*/
Consumer<Optional<T>> clear();

/**
* Method decorates supplier to copy variables across thread boundary.
*
* @param propagator the instance of {@link ContextPropagator}
* @param supplier the supplier to be decorated
* @param <T> the type of variable that cross thread boundary
* @return decorated supplier of type T
*/
static <T> Supplier<T> decorateSupplier(ContextPropagator propagator,
Supplier<T> supplier) {
final Optional value = (Optional) propagator.retrieve().get();
return () -> {
try {
propagator.copy().accept(value);
return supplier.get();
} finally {
propagator.clear().accept(value);
}
};
}

/**
* Method decorates supplier to copy variables across thread boundary.
*
* @param propagators the instance of {@link ContextPropagator} should be non null.
* @param supplier the supplier to be decorated
* @param <T> the type of variable that cross thread boundary
* @return decorated supplier of type T
*/
static <T> Supplier<T> decorateSupplier(List<? extends ContextPropagator> propagators,
Supplier<T> supplier) {

Objects.requireNonNull(propagators, "ContextPropagator list should be non null");

//Create identity map of <ContextPropagator,Optional Supplier value>, if we have duplicate ContextPropagators then last one wins.
final Map<? extends ContextPropagator, Object> values = propagators.stream()
.collect(toMap(
p -> p, //key as ContextPropagator instance itself
p -> p.retrieve().get(), //Supplier Optional value
(first, second) -> second, //Merge function, this simply choose later value in key collision
HashMap::new)); //type of map

return () -> {
try {
values.forEach((p, v) -> p.copy().accept(v));
return supplier.get();
} finally {
values.forEach((p, v) -> p.clear().accept(v));
}
};
}

/**
* Method decorates runnable to copy variables across thread boundary.
*
* @param propagators the instance of {@link ContextPropagator}
* @param runnable the runnable to be decorated
* @param <T> the type of variable that cross thread boundary
* @return decorated supplier of type T
*/
static <T> Runnable decorateRunnable(List<? extends ContextPropagator> propagators,
Runnable runnable) {
Objects.requireNonNull(propagators, "ContextPropagator list should be non null");

//Create identity map of <ContextPropagator,Optional Supplier value>, if we have duplicate ContextPropagators then last one wins.
final Map<? extends ContextPropagator, Object> values = propagators.stream()
.collect(toMap(
p -> p, //key as ContextPropagator instance itself
p -> p.retrieve().get(), //Supplier Optional value
(first, second) -> second, //Merge function, this simply choose later value in key collision
HashMap::new)); //type of map

return () -> {
try {
values.forEach((p, v) -> p.copy().accept(v));
runnable.run();
} finally {
values.forEach((p, v) -> p.clear().accept(v));
}
};
}

/**
* Method decorates runnable to copy variables across thread boundary.
*
* @param propagator the instance of {@link ContextPropagator}
* @param runnable the runnable to be decorated
* @param <T> the type of variable that cross thread boundary
* @return decorated supplier of type T
*/
static <T> Runnable decorateRunnable(ContextPropagator propagator,
Runnable runnable) {
final Optional value = (Optional) propagator.retrieve().get();
return () -> {
try {
propagator.copy().accept(value);
runnable.run();
} finally {
propagator.clear().accept(value);
}
};
}

/**
* An empty context propagator.
*
* @param <T> type.
* @return an empty {@link ContextPropagator}
*/
static <T> ContextPropagator<T> empty() {
return new EmptyContextPropagator<>();
}

/**
* A convenient implementation of empty {@link ContextPropagator}
*
* @param <T> type of class.
*/
class EmptyContextPropagator<T> implements ContextPropagator<T> {

@Override
public Supplier<Optional<T>> retrieve() {
return () -> Optional.empty();
}

@Override
public Consumer<Optional<T>> copy() {
return (t) -> {
};
}

@Override
public Consumer<Optional<T>> clear() {
return (t) -> {
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,16 @@
*/
package io.github.resilience4j.bulkhead;

import io.github.resilience4j.core.lang.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static io.github.resilience4j.core.ClassUtils.instantiateClassDefConstructor;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toList;

/**
* A {@link ThreadPoolBulkheadConfig} configures a {@link Bulkhead}
Expand All @@ -39,6 +48,7 @@ public class ThreadPoolBulkheadConfig {
private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
private Duration keepAliveDuration = DEFAULT_KEEP_ALIVE_DURATION;
private boolean writableStackTraceEnabled = DEFAULT_WRITABLE_STACK_TRACE_ENABLED;
private List<? extends ContextPropagator> contextPropagators = new ArrayList<>();

private ThreadPoolBulkheadConfig() {
}
Expand Down Expand Up @@ -82,19 +92,21 @@ public int getMaxThreadPoolSize() {
return maxThreadPoolSize;
}

public int getCoreThreadPoolSize() {
return coreThreadPoolSize;
}
public int getCoreThreadPoolSize() { return coreThreadPoolSize; }

public boolean isWritableStackTraceEnabled() {
return writableStackTraceEnabled;
}

public static class Builder {
public List<? extends ContextPropagator> getContextPropagator() {
return contextPropagators;
}

public static class Builder {
private Class<? extends ContextPropagator>[] contextPropagatorClasses = new Class[0];
private List<? extends ContextPropagator> contextPropagators = new ArrayList<>();
private ThreadPoolBulkheadConfig config;


public Builder(ThreadPoolBulkheadConfig bulkheadConfig) {
this.config = bulkheadConfig;
}
Expand Down Expand Up @@ -133,6 +145,26 @@ public Builder coreThreadPoolSize(int coreThreadPoolSize) {
return this;
}

/**
* Configures the context propagator class.
*
* @return the BulkheadConfig.Builder
*/
public final Builder contextPropagator(
@Nullable Class<? extends ContextPropagator>... contextPropagatorClasses) {
this.contextPropagatorClasses = contextPropagatorClasses != null
? contextPropagatorClasses
: new Class[0];
return this;
}

public final Builder contextPropagator(ContextPropagator... contextPropagators) {
this.contextPropagators = contextPropagators != null ?
Arrays.stream(contextPropagators).collect(toList()) :
new ArrayList<>();
return this;
}

/**
* Configures the capacity of the queue.
*
Expand Down Expand Up @@ -188,6 +220,16 @@ public ThreadPoolBulkheadConfig build() {
throw new IllegalArgumentException(
"maxThreadPoolSize must be a greater than or equals to coreThreadPoolSize");
}
if (contextPropagatorClasses.length > 0) {
config.contextPropagators.addAll((List)stream(contextPropagatorClasses)
.map(c -> instantiateClassDefConstructor(c))
.collect(toList()));
}
//setting bean of type context propagator overrides the class type.
if (contextPropagators.size() > 0){
config.contextPropagators.addAll((List)this.contextPropagators);
}

return config;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.bulkhead.ContextPropagator;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
import io.github.resilience4j.bulkhead.event.BulkheadEvent;
Expand All @@ -32,6 +33,7 @@
import io.vavr.collection.HashMap;
import io.vavr.collection.Map;

import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.Supplier;

Expand Down Expand Up @@ -137,14 +139,15 @@ public FixedThreadPoolBulkhead(String name, Supplier<ThreadPoolBulkheadConfig> c
public <T> CompletableFuture<T> submit(Callable<T> callable) {
final CompletableFuture<T> promise = new CompletableFuture<>();
try {
CompletableFuture.supplyAsync(() -> {

CompletableFuture.supplyAsync(ContextPropagator.decorateSupplier(config.getContextPropagator(),() -> {
try {
publishBulkheadEvent(() -> new BulkheadOnCallPermittedEvent(name));
return callable.call();
} catch (Exception e) {
throw new CompletionException(e);
}
}, executorService).whenComplete((result, throwable) -> {
}), executorService).whenComplete((result, throwable) -> {
publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
if (throwable != null) {
promise.completeExceptionally(throwable);
Expand All @@ -165,14 +168,14 @@ public <T> CompletableFuture<T> submit(Callable<T> callable) {
@Override
public void submit(Runnable runnable) {
try {
CompletableFuture.runAsync(() -> {
CompletableFuture.runAsync(ContextPropagator.decorateRunnable(config.getContextPropagator(),() -> {
try {
publishBulkheadEvent(() -> new BulkheadOnCallPermittedEvent(name));
runnable.run();
} catch (Exception e) {
throw new CompletionException(e);
}
}, executorService).whenComplete((voidResult, throwable) -> publishBulkheadEvent(
}), executorService).whenComplete((voidResult, throwable) -> publishBulkheadEvent(
() -> new BulkheadOnCallFinishedEvent(name)));
} catch (RejectedExecutionException rejected) {
publishBulkheadEvent(() -> new BulkheadOnCallRejectedEvent(name));
Expand Down
Loading

0 comments on commit b07d61d

Please sign in to comment.