Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code organization #22

Merged
merged 7 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ libraryDependencies += "com.softwaremill.jox" % "core" % "0.0.2"
### Rendezvous channel

```java
import jox.Channel;
import com.softwaremill.jox.Channel;

class Demo1 {
public static void main(String[] args) throws InterruptedException {
Expand Down Expand Up @@ -73,7 +73,7 @@ class Demo1 {
### Buffered channel

```java
import jox.Channel;
import com.softwaremill.jox.Channel;

class Demo2 {
public static void main(String[] args) throws InterruptedException {
Expand Down Expand Up @@ -109,7 +109,7 @@ or `null` / the received value.
Channels can also be inspected whether they are closed, using the `isClosed()`, `isDone()` and `isError()` methods.

```java
import jox.Channel;
import com.softwaremill.jox.Channel;

class Demo3 {
public static void main(String[] args) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package jox;
package com.softwaremill.jox;

import com.softwaremill.jox.Channel;
import org.openjdk.jmh.annotations.*;

import java.util.concurrent.ArrayBlockingQueue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package jox;
package com.softwaremill.jox;

import com.softwaremill.jox.Channel;
import org.openjdk.jmh.annotations.*;

import java.util.concurrent.Exchanger;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package jox
package com.softwaremill.jox

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package jox
package com.softwaremill.jox

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package jox
package com.softwaremill.jox

const val OPERATIONS_PER_INVOCATION = 1_000_000
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package jox;
package com.softwaremill.jox;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
Expand All @@ -8,10 +8,46 @@
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

import static jox.CellState.*;
import static jox.Segment.findAndMoveForward;
import static com.softwaremill.jox.CellState.*;
import static com.softwaremill.jox.Segment.findAndMoveForward;

public class Channel<T> {
/**
* Channel is a thread-safe data structure which exposes three basic operations:
* <p>
* - {@link Channel#send(Object)}-ing a value to the channel
* - {@link Channel#receive()}-ing a value from the channel
* - closing the channel using {@link Channel#done()} or {@link Channel#error(Throwable)}
* <p>
* There are two channel flavors:
* <p>
* - rendezvous channels, where senders and receivers must meet to exchange values
* - buffered channels, where a given number of sent elements might be buffered, before subsequent sends block
* <p>
* The no-argument {@link Channel} constructor creates a rendezvous channel, while a buffered channel can be created
* by providing a positive integer to the constructor. A rendezvous channel behaves like a buffered channel with
* buffer size 0.
* <p>
* In a rendezvous channel, senders and receivers block, until a matching party arrives (unless one is already waiting).
* Similarly, buffered channels block if the buffer is full (in case of senders), or in case of receivers, if the
* buffer is empty and there are no waiting senders.
* <p>
* All blocking operations behave properly upon interruption.
* <p>
* Channels might be closed, either because no more elements will be produced by the source (using
* {@link Channel#done()}), or because there was an error while producing or processing the received elements (using
* {@link Channel#error(Throwable)}).
* <p>
* After closing, no more elements can be sent to the channel. If the channel is "done", any pending sends will be
* completed normally. If the channel is in an "error" state, pending sends will be interrupted and will return with
* the reason for the closure.
* <p>
* In case the channel is closed, one of the {@link ChannelClosedException}s is thrown. Alternatively, you can call
* the less type-safe, but more exception-safe {@link Channel#sendSafe(Object)} and {@link Channel#receiveSafe()}
* methods, which do not throw in case the channel is closed, but return one of the {@link ChannelClosed} values.
*
* @param <T> The type of the elements processed by the channel.
*/
public final class Channel<T> {
/*
Inspired by the "Fast and Scalable Channels in Kotlin Coroutines" paper (https://arxiv.org/abs/2211.04986), and
the Kotlin implementation (https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt).
Expand Down Expand Up @@ -73,11 +109,20 @@ operations on these (previous) segments, and we'll end up wanting to remove such
private final boolean isRendezvous;


/**
* Creates a rendezvous channel.
*/
public Channel() {
this(0);
}

/**
* Creates a buffered channel (when capacity is positive), or a rendezvous channel if the capacity is 0.
* Capacity cannot be negative.
*/
public Channel(int capacity) {
assert capacity >= 0 : "Capacity must be non-negative.";

this.capacity = capacity;
isRendezvous = capacity == 0L;

Expand Down Expand Up @@ -540,7 +585,7 @@ public void done() {
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is already closed.
*/
public Object doneSafe() {
return closeSafe(new ChannelClosed.ChannelDone());
return closeSafe(new ChannelDone());
}

/**
Expand Down Expand Up @@ -575,7 +620,7 @@ public void error(Throwable reason) {
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is already closed.
*/
public Object errorSafe(Throwable reason) {
return closeSafe(new ChannelClosed.ChannelError(reason));
return closeSafe(new ChannelError(reason));
}

private Object closeSafe(ChannelClosed channelClosed) {
Expand All @@ -591,7 +636,7 @@ private Object closeSafe(ChannelClosed channelClosed) {
// closing the segment chain guarantees that no new segment beyond `lastSegment` will be created
var lastSegment = sendSegment.get().close();

if (channelClosed instanceof ChannelClosed.ChannelError) {
if (channelClosed instanceof ChannelError) {
// closing all cells, as this is an error
closeCellsUntil(0, lastSegment);
} else {
Expand Down Expand Up @@ -682,15 +727,15 @@ public boolean isClosed() {
}

public boolean isDone() {
return closedReason.get() instanceof ChannelClosed.ChannelDone;
return closedReason.get() instanceof ChannelDone;
}

/**
* @return {@code null} if the channel is not closed, or if it's closed with {@link ChannelClosed.ChannelDone}.
* @return {@code null} if the channel is not closed, or if it's closed with {@link ChannelDone}.
*/
public Throwable isError() {
var reason = closedReason.get();
if (reason instanceof ChannelClosed.ChannelError e) {
if (reason instanceof ChannelError e) {
return e.cause();
} else {
return null;
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/com/softwaremill/jox/ChannelClosed.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.softwaremill.jox;

/**
* Returned by {@link Channel#sendSafe(Object)} and {@link Channel#receiveSafe()} when the channel is closed.
*/
public sealed interface ChannelClosed permits ChannelDone, ChannelError {
ChannelClosedException toException();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.softwaremill.jox;

/**
* Thrown by {@link Channel#send(Object)} and {@link Channel#receive()} when the channel is closed.
*/
public sealed class ChannelClosedException extends RuntimeException permits ChannelDoneException, ChannelErrorException {
public ChannelClosedException() {}

public ChannelClosedException(Throwable cause) {
super(cause);
}
}

8 changes: 8 additions & 0 deletions core/src/main/java/com/softwaremill/jox/ChannelDone.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.softwaremill.jox;

public record ChannelDone() implements ChannelClosed {
@Override
public ChannelClosedException toException() {
return new ChannelDoneException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.softwaremill.jox;

public final class ChannelDoneException extends ChannelClosedException {}
8 changes: 8 additions & 0 deletions core/src/main/java/com/softwaremill/jox/ChannelError.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.softwaremill.jox;

public record ChannelError(Throwable cause) implements ChannelClosed {
@Override
public ChannelClosedException toException() {
return new ChannelErrorException(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.softwaremill.jox;

public final class ChannelErrorException extends ChannelClosedException {
public ChannelErrorException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package jox;
package com.softwaremill.jox;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down
19 changes: 0 additions & 19 deletions core/src/main/java/jox/ChannelClosed.java

This file was deleted.

24 changes: 0 additions & 24 deletions core/src/main/java/jox/ChannelClosedException.java

This file was deleted.

3 changes: 3 additions & 0 deletions core/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module com.softwaremill.jox {
exports com.softwaremill.jox;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package jox;
package com.softwaremill.jox;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.concurrent.ExecutionException;

import static jox.TestUtil.forkVoid;
import static jox.TestUtil.scoped;
import static com.softwaremill.jox.TestUtil.forkVoid;
import static com.softwaremill.jox.TestUtil.scoped;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;

Expand Down Expand Up @@ -109,7 +109,7 @@ void shouldNotReceiveFromAChannelInCaseOfAnError() throws InterruptedException {
var r2 = c.receiveSafe();

// then
assertInstanceOf(ChannelClosed.ChannelError.class, r1);
assertInstanceOf(ChannelClosed.ChannelError.class, r2);
assertInstanceOf(ChannelError.class, r1);
assertInstanceOf(ChannelError.class, r2);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package jox;
package com.softwaremill.jox;

import org.junit.jupiter.api.Test;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.SECONDS;
import static jox.TestUtil.*;
import static com.softwaremill.jox.TestUtil.*;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package jox;
package com.softwaremill.jox;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
Expand All @@ -8,7 +8,7 @@
import java.util.Set;
import java.util.concurrent.*;

import static jox.TestUtil.*;
import static com.softwaremill.jox.TestUtil.*;
import static org.junit.jupiter.api.Assertions.*;

public class ChannelRendezvousTest {
Expand Down Expand Up @@ -146,10 +146,10 @@ void pendingReceivesShouldGetNotifiedThatChannelIsDone() throws InterruptedExcep
c.done();

// then
assertEquals(new ChannelClosed.ChannelDone(), f.get());
assertEquals(new ChannelDone(), f.get());

// should be rejected immediately
assertEquals(new ChannelClosed.ChannelDone(), c.receiveSafe());
assertEquals(new ChannelDone(), c.receiveSafe());
});
}

Expand All @@ -165,10 +165,10 @@ void pendingSendsShouldGetNotifiedThatChannelIsErrored() throws InterruptedExcep
c.error(new RuntimeException());

// then
assertInstanceOf(ChannelClosed.ChannelError.class, f.get());
assertInstanceOf(ChannelError.class, f.get());

// should be rejected immediately
assertInstanceOf(ChannelClosed.ChannelError.class, c.sendSafe(2));
assertInstanceOf(ChannelError.class, c.sendSafe(2));
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package jox;
package com.softwaremill.jox;

import java.util.*;
import java.util.concurrent.*;

import static jox.TestUtil.*;
import static com.softwaremill.jox.TestUtil.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -166,7 +166,7 @@ private List<String> drainChannel(StructuredTaskScope<Object> scope, Channel<Str
var result = new ArrayList<String>();
while (true) {
var e = ch.receiveSafe();
if (e instanceof ChannelClosed.ChannelDone) {
if (e instanceof ChannelDone) {
return result;
} else {
result.add((String) e);
Expand Down
Loading