Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make select clauses immutable & reusable #26

Merged
merged 7 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public Object sendSafe(T value) throws InterruptedException {

/**
* @return If {@code select} & {@code selectClause} is {@code null}: {@code null} when the value was sent, or
* {@link ChannelClosed}, when the channel is closed. Otherwise, might also return {@link StoredSelect}.
* {@link ChannelClosed}, when the channel is closed. Otherwise, might also return {@link StoredSelectClause}.
*/
private Object doSend(T value, SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
if (value == null) {
Expand Down Expand Up @@ -234,7 +234,7 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
// not cleaning the previous segments - the close procedure might still need it
return closedReason.get();
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
// we stored a select instance - there's no matching receive, not clearing the previous segment
return ss;
}
Expand All @@ -248,7 +248,7 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
* @param i The index within the {@code segment}.
* @param s Index of the reserved cell.
* @param value The value to send.
* @return One of {@link SendResult}, or {@link StoredSelect} if {@code select} is not {@code null}.
* @return One of {@link SendResult}, or {@link StoredSelectClause} if {@code select} is not {@code null}.
*/
private Object updateCellSend(Segment segment, int i, long s, T value, SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
Expand All @@ -262,7 +262,7 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns
if (select != null) {
// cell is empty, no receiver, and we are in a select -> store the select instance
// and await externally; the value to send is stored in the selectClause
var storedSelect = new StoredSelect(select, segment, i, true, selectClause);
var storedSelect = new StoredSelectClause(select, segment, i, true, selectClause, value);
if (segment.casCell(i, state, storedSelect)) {
return storedSelect;
}
Expand Down Expand Up @@ -305,9 +305,14 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns
return SendResult.FAILED;
}
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
// Setting the payload first, before the memory barrier created by potentially setting `SelectInstance.state`.
// The state is the read in select's main thread. Since we have this send-cell exclusively, no other thread
// will attempt to call `setPayload`.
ss.setPayload(value);

// a select clause is waiting -> trying to resume
if (ss.getSelect().trySelect(ss, value)) {
if (ss.getSelect().trySelect(ss)) {
segment.setCell(i, DONE);
return SendResult.RESUMED;
} else {
Expand Down Expand Up @@ -357,7 +362,7 @@ public Object receiveSafe() throws InterruptedException {

/**
* @return If {@code select} & {@code selectClause} is {@code null}: the received value, or {@link ChannelClosed},
* when the channel is closed. Otherwise, might also return {@link StoredSelect}.
* when the channel is closed. Otherwise, might also return {@link StoredSelectClause}.
*/
private Object doReceive(SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
Expand Down Expand Up @@ -404,7 +409,7 @@ private Object doReceive(SelectInstance select, SelectClause<?> selectClause) th
- awaiting on the continuation is interrupted, in which case the exception propagates outside of this method
- we stored the given select instance (in an empty / in-buffer cell)
*/
if (!(result instanceof StoredSelect)) {
if (!(result instanceof StoredSelectClause)) {
segment.cleanPrev();
}
if (result != ReceiveResult.FAILED) {
Expand All @@ -424,7 +429,7 @@ private Object doReceive(SelectInstance select, SelectClause<?> selectClause) th
* @param i The index within the {@code segment}.
* @param r Index of the reserved cell.
* @param select The select instance of which this receive is part of, or {@code null} (along with {@code selectClause}) if this is a direct receive call.
* @return Either a state-result ({@link ReceiveResult}), {@link StoredSelect} in case {@code select} is not {@code null}, or the received value.
* @return Either a state-result ({@link ReceiveResult}), {@link StoredSelectClause} in case {@code select} is not {@code null}, or the received value.
*/
private Object updateCellReceive(Segment segment, int i, long r, SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
Expand All @@ -437,7 +442,7 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance
if (select != null) {
// cell is empty, no sender, and we are in a select -> store the select instance
// and await externally
var storedSelect = new StoredSelect(select, segment, i, false, selectClause);
var storedSelect = new StoredSelectClause(select, segment, i, false, selectClause, null);
if (segment.casCell(i, state, storedSelect)) {
expandBuffer();
return storedSelect;
Expand Down Expand Up @@ -485,14 +490,14 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance
}
// else: CAS unsuccessful, repeat
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
// resolving a potential race with `expandBuffer`
if (segment.casCell(i, state, RESUMING)) {
// a send clause is waiting -> trying to resume
if (ss.getSelect().trySelect(ss, 0)) {
if (ss.getSelect().trySelect(ss)) {
segment.setCell(i, DONE);
expandBuffer();
return ss.getClause().getPayload();
return ss.getPayload();
} else {
// when select fails (another clause is selected, select is interrupted, closed etc.) -> trying with a new one
// the state will be set to INTERRUPTED_SEND by the cleanup, meanwhile everybody else will observe RESUMING
Expand Down Expand Up @@ -596,11 +601,11 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
// must be a receiver continuation - another buffer expansion already happened
return ExpandBufferResult.DONE;
}
case StoredSelect ss when ss.isSender() -> {
case StoredSelectClause ss when ss.isSender() -> {
if (segment.casCell(i, state, RESUMING)) {
// a send clause is waiting -> trying to resume
if (ss.getSelect().trySelect(ss, 0)) {
segment.setCell(i, new Buffered(ss.getClause().getPayload()));
if (ss.getSelect().trySelect(ss)) {
segment.setCell(i, new Buffered(ss.getPayload()));
return ExpandBufferResult.DONE;
} else {
// select unsuccessful -> trying with a new one
Expand All @@ -610,7 +615,7 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
}
// else: CAS unsuccessful, repeat
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
// must be a receiver clause of the select - another buffer expansion already happened
return ExpandBufferResult.DONE;
}
Expand Down Expand Up @@ -815,7 +820,7 @@ private void updateCellClose(Segment segment, int i) {
return;
}
}
case StoredSelect ss -> {
case StoredSelectClause ss -> {
ss.getSelect().channelClosed(closedReason.get());
// not setting the state & updating counters, as each non-selected stored select cell will be
// cleaned up, setting an interrupted state (and informing the segment)
Expand Down Expand Up @@ -872,7 +877,7 @@ public SelectClause<T> receiveClause() {
* the current channel, and transform it using the provided {@code callback}.
*/
public <U> SelectClause<U> receiveClause(Function<T, U> callback) {
return new SelectClause<>(null) {
return new SelectClause<>() {
@Override
Channel<?> getChannel() {
return Channel.this;
Expand All @@ -889,9 +894,9 @@ Object register(SelectInstance select) {
}

@Override
U transformedRawValue() {
U transformedRawValue(Object rawValue) {
//noinspection unchecked
return callback.apply((T) getPayload());
return callback.apply((T) rawValue);
}
};
}
Expand All @@ -909,7 +914,7 @@ public SelectClause<Void> sendClause(T value) {
* to the current channel, and return the value of the provided callback as the clause's result.
*/
public <U> SelectClause<U> sendClause(T value, Supplier<U> callback) {
return new SelectClause<>(value) {
return new SelectClause<>() {
@Override
Channel<?> getChannel() {
return Channel.this;
Expand All @@ -928,18 +933,13 @@ Object register(SelectInstance select) {
}

@Override
U transformedRawValue() {
U transformedRawValue(Object rawValue) {
return callback.get();
}

@Override
void setPayload(Object payload) {
// ignoring, the payload is set during creation
}
};
}

void cleanupStoredSelect(Segment segment, int i, boolean isSender) {
void cleanupStoredSelectClause(Segment segment, int i, boolean isSender) {
// We treat the cell as if it was interrupted - the code is same as in `Continuation.await`;
// there's no need to resolve races with `SelectInstance.trySelect`, as cleanup is called either when a clause
// is selected, a channel is closed, or during re-registration. In all cases `trySelect` would fail.
Expand Down Expand Up @@ -1007,8 +1007,8 @@ public String toString() {
case Buffered b -> sb.append("V(").append(b.value()).append(")");
case Continuation c when c.isSender() -> sb.append("WS(").append(c.getPayload()).append(")");
case Continuation c -> sb.append("WR");
case StoredSelect ss when ss.isSender() -> sb.append("SS");
case StoredSelect ss -> sb.append("SR");
case StoredSelectClause ss when ss.isSender() -> sb.append("SS");
case StoredSelectClause ss -> sb.append("SR");
default -> throw new IllegalStateException("Unexpected value: " + state);
}
if (i != Segment.SEGMENT_SIZE - 1) sb.append(",");
Expand Down
Loading
Loading