Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions docs/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,30 @@ However, take care **not** to share the `FlowEmit` instance across threads. That
thread-unsafe and should only be used on the calling thread. The lifetime of `FlowEmit` should not extend over the
duration of the invocation of `usingEmit`.

Any asynchronous communication should be best done with `Channel`s. You can then manually forward any elements received
from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`.
For **concurrent emissions** from multiple threads, use `usingChannel` instead:

```java
import com.softwaremill.jox.flows.Flows;
import static com.softwaremill.jox.structured.Scopes.supervised;

void main() throws Exception {
Flows.usingChannel(sink -> {
sink.send(1);

supervised(scope -> {
scope.forkUser(() -> {
sink.send(2);
return null;
});
scope.forkUser(() -> {
sink.send(3);
return null;
});
return null;
});
}).runToList(); // Returns [1, 2, 3] in non-deterministic order
}
```

## Transforming flows: basics

Expand Down
69 changes: 69 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Flows.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,75 @@ public static <T> Flow<T> usingEmit(ThrowingConsumer<FlowEmit<T>> withEmit) {
return new Flow<>(withEmit::accept);
}

/**
* Creates a flow that supports concurrent, thread-safe emissions.
*
* <p>Unlike {@link #usingEmit(ThrowingConsumer)} which provides a thread-unsafe {@link
* FlowEmit}, this method provides a thread-safe {@link Sink} that can be safely used from
* multiple threads concurrently. This makes it suitable for scenarios requiring concurrent
* emission, such as bridging external event sources, parallel processing, or aggregating
* results from multiple background tasks.
*
* <p><b>When to use:</b>
*
* <ul>
* <li>Use {@link #usingEmit} for simple, sequential emission from a single thread
* <li>Use {@code usingChannel} when you need concurrent emission from multiple threads
* </ul>
*
* <p><b>Example usage:</b>
*
* <pre>{@code
* var flow = Flows.usingChannel(sink -> {
* // Emit from the main thread
* sink.send(1);
*
* // Fork background tasks that emit concurrently
* supervised(scope -> {
* scope.forkUser(() -> { sink.send(2); return null; });
* scope.forkUser(() -> { sink.send(3); return null; });
* return null;
* });
* });
*
* var result = flow.runToList(); // Result contains [1, 2, 3] in non-deterministic order
* }</pre>
*
* <p><b>Buffer configuration:</b> The internal channel buffer size is determined by the {@link
* Flow#CHANNEL_BUFFER_SIZE} scoped value, or {@link Channel#DEFAULT_BUFFER_SIZE} if not
* specified.
*
* @param withSink callback that receives a thread-safe {@link Sink} for emitting elements
* @param <T> the type of elements emitted by the flow
* @return a flow that emits elements sent to the sink
* @see #usingEmit(ThrowingConsumer)
*/
public static <T> Flow<T> usingChannel(ThrowingConsumer<Sink<T>> withSink) {
return usingEmit(
emit ->
supervised(
scope -> {
var channel = Flow.<T>newChannelWithBufferSizeFromScope();

scope.fork(
() -> {
try {
withSink.accept(channel);
channel.doneOrClosed();
} catch (Throwable t) {
channel.errorOrClosed(t);
throw t;
}

return null;
});

FlowEmit.channelToEmit(channel, emit);

return null;
}));
}

/**
* Creates a flow using the given {@param source}. An element is emitted for each value received
* from the source. If the source is completed with an error, is it propagated by throwing.
Expand Down
145 changes: 145 additions & 0 deletions flows/src/test/java/com/softwaremill/jox/flows/FlowsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,151 @@ void shouldCreateIteratingFlow() throws Exception {
assertEquals(List.of(1, 2, 3), c.take(3).runToList());
}

@Test
void shouldCreateSimpleChannelFlow() throws Exception {
// given
var flow =
Flows.<Integer>usingChannel(
sink -> {
supervised(
scope -> {
scope.forkUser(
() -> {
sink.send(1);
return null;
});

scope.forkUser(
() -> {
sink.send(2);
return null;
});

return null;
});

sink.send(3);
});

// when
var result = flow.runToList();

// then
assertEquals(3, result.size());
assertTrue(result.containsAll(List.of(1, 2, 3)));
}

@Test
@Timeout(value = 5, unit = TimeUnit.SECONDS)
void shouldCreateChannelFlow() throws Exception {
// given
var flow =
Flows.<Integer>usingChannel(
sink -> {
supervised(
scope -> {
for (var i = 0; i <= 1000; i++) {
final var value = i;

scope.forkUser(
() -> {
Thread.sleep(1000);
sink.send(value);
return null;
});
}

return null;
});
});

// when
var sum = flow.runFold(0, Integer::sum);

// then
assertEquals(500500, sum);
}

@Test
void shouldCreateChannelFlowAndCancelIt() throws Exception {
var flow =
Flows.<Integer>usingChannel(
sink ->
supervised(
scope -> {
for (var i = 0; i <= 10; i++) {
final var value = i;

scope.forkUser(
() -> {
Thread.sleep(10000);
sink.send(value);
return null;
});
}

return null;
}));

supervised(
scope -> {
var cancellable =
scope.forkCancellable(
() -> {
try {
return flow.runFold(0, Integer::sum);
} catch (InterruptedException e) {
return -1;
}
});

Thread.sleep(1000);
var res = cancellable.cancel();
assertEquals(-1, res);
return null;
});
}

@Test
void shouldPropagateErrorFromUsingChannel() {
// given
var flow =
Flows.<Integer>usingChannel(
_ -> {
throw new RuntimeException("boom");
});

// when & then
var exception = assertThrows(Exception.class, flow::runToList);
assertTrue(exception.getMessage().contains("boom"));
}

@Test
void shouldPropagateErrorFromForkInUsingChannel() {
// given
var flow =
Flows.<Integer>usingChannel(
sink -> {
supervised(
scope -> {
scope.forkUser(
() -> {
Thread.sleep(50);
throw new RuntimeException("fork boom");
});

sink.send(1);
sink.send(2);

return null;
});
});

// when & then
var exception = assertThrows(Exception.class, flow::runToList);
assertTrue(exception.getMessage().contains("fork boom"));
}

@Test
void shouldProduceRange() throws Exception {
assertEquals(List.of(1, 2, 3, 4, 5), Flows.range(1, 5, 1).runToList());
Expand Down
Loading