Skip to content

Commit

Permalink
Use explicit CLOSING and CLOSED states
Browse files Browse the repository at this point in the history
  • Loading branch information
io7m committed Apr 27, 2023
1 parent c4d0388 commit 2621047
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ public enum HBState

CLIENT_POLLING_EVENTS_SUCCEEDED,

/**
* The client has been instructed to close.
*/

CLIENT_CLOSING,

/**
* The client has been closed.
*/
Expand Down
4 changes: 4 additions & 0 deletions com.io7m.hibiscus.basic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<groupId>com.io7m.junreachable</groupId>
<artifactId>com.io7m.junreachable.core</artifactId>
</dependency>
<dependency>
<groupId>com.io7m.jcip</groupId>
<artifactId>com.io7m.jcip.annotations</artifactId>
</dependency>

<dependency>
<groupId>org.osgi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@
import com.io7m.hibiscus.api.HBResultType;
import com.io7m.hibiscus.api.HBState;
import com.io7m.junreachable.UnreachableCodeException;
import net.jcip.annotations.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.io7m.hibiscus.api.HBState.CLIENT_CLOSED;
import static com.io7m.hibiscus.api.HBState.CLIENT_CLOSING;
import static com.io7m.hibiscus.api.HBState.CLIENT_CONNECTED;
import static com.io7m.hibiscus.api.HBState.CLIENT_DISCONNECTED;
import static com.io7m.hibiscus.api.HBState.CLIENT_EXECUTING_COMMAND;
Expand Down Expand Up @@ -73,13 +73,14 @@ public abstract class HBClientSynchronousAbstract<
private static final Logger LOG =
LoggerFactory.getLogger(HBClientSynchronousAbstract.class);

private final SubmissionPublisher<E> events;
private final SubmissionPublisher<HBState> state;
private final AtomicReference<HBState> stateNow;
private final AtomicBoolean closedExternal;
private final AtomicBoolean closedInternal;
private final SubmissionPublisher<E> eventPublisher;
private final SubmissionPublisher<HBState> statePublisher;

@GuardedBy("stateLock")
private HBState stateNow;
private final HBClientHandlerType<X, C, R, RS, RF, E, CR> disconnectedHandler;
private final HBDirectExecutor executor;
private final Object stateLock;
private HBClientHandlerType<X, C, R, RS, RF, E, CR> handler;

/**
Expand All @@ -97,24 +98,23 @@ protected HBClientSynchronousAbstract(

this.executor =
new HBDirectExecutor();
this.events =
this.eventPublisher =
new SubmissionPublisher<>(this.executor, Flow.defaultBufferSize());
this.state =
this.statePublisher =
new SubmissionPublisher<>(this.executor, Flow.defaultBufferSize());

this.stateLock =
new Object();
this.stateNow =
new AtomicReference<>(CLIENT_DISCONNECTED);
this.closedExternal =
new AtomicBoolean(false);
this.closedInternal =
new AtomicBoolean(false);
CLIENT_DISCONNECTED;
this.handler =
this.disconnectedHandler;
}

@Override
public final boolean isClosed()
{
return this.closedExternal.get();
return this.stateNow() == CLIENT_CLOSED;
}

@Override
Expand All @@ -126,19 +126,21 @@ public final boolean isConnected()
@Override
public final Flow.Publisher<E> events()
{
return this.events;
return this.eventPublisher;
}

@Override
public final Flow.Publisher<HBState> state()
{
return this.state;
return this.statePublisher;
}

@Override
public final HBState stateNow()
{
return this.stateNow.get();
synchronized (this.stateLock) {
return this.stateNow;
}
}

@Override
Expand All @@ -150,7 +152,7 @@ public final void pollEvents()
this.publishState(CLIENT_POLLING_EVENTS);

try {
this.handler.onPollEvents().forEach(this.events::submit);
this.handler.onPollEvents().forEach(this.eventPublisher::submit);
LOG.debug("polling events succeeded");
this.publishState(CLIENT_POLLING_EVENTS_SUCCEEDED);
} catch (final Exception e) {
Expand Down Expand Up @@ -199,35 +201,38 @@ public final HBResultType<RS, RF> login(

private void checkNotClosed()
{
if (this.closedInternal.get()) {
throw new IllegalStateException("Client is closed!");
synchronized (this.stateLock) {
final var stateNow = this.stateNow();
if (stateNow == CLIENT_CLOSING || stateNow == CLIENT_CLOSED) {
throw new IllegalStateException("Client is closed!");
}
}
}

private void publishState(
final HBState newState)
{
if (newState == CLIENT_CLOSED) {
this.logStateChange(newState);
this.stateNow.set(newState);
this.state.submit(newState);
return;
final HBState stateThen;
synchronized (this.stateLock) {
stateThen = this.stateNow();
if (stateThen == CLIENT_CLOSED) {
return;
}
}

if (this.closedInternal.get()) {
return;
logStateChange(stateThen, newState);
synchronized (this.stateLock) {
this.stateNow = newState;
}

this.logStateChange(newState);
this.stateNow.set(newState);
this.state.submit(newState);
this.statePublisher.submit(newState);
}

private void logStateChange(
private static void logStateChange(
final HBState oldState,
final HBState newState)
{
if (LOG.isTraceEnabled()) {
LOG.trace("state {} -> {}", this.stateNow.get(), newState);
LOG.trace("state {} -> {}", oldState, newState);
}
}

Expand Down Expand Up @@ -282,28 +287,34 @@ public final void disconnect()
@Override
public final void close()
{
/*
* We track an "internal" and "external" closed state flag because we
* want to use the internal flag to prevent multiple close attempts, but
* we don't want to advertise the "closed" state externally until all
* resources have actually been closed.
*/

if (this.closedInternal.compareAndSet(false, true)) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("close");
}
synchronized (this.stateLock) {
final var state = this.stateNow;
if (state == CLIENT_CLOSING || state == CLIENT_CLOSED) {
return;
}
this.stateNow = CLIENT_CLOSING;
}

try {
if (LOG.isTraceEnabled()) {
LOG.trace("close");
}

this.publishState(CLIENT_CLOSED);
this.statePublisher.submit(CLIENT_CLOSED);

try {
this.state.close();
} finally {
this.events.close();
}
try {
this.statePublisher.close();
} finally {
this.closedExternal.set(true);
this.eventPublisher.close();
}
} finally {
synchronized (this.stateLock) {
this.stateNow = CLIENT_CLOSED;
}
logStateChange(CLIENT_CLOSING, CLIENT_CLOSED);

if (LOG.isTraceEnabled()) {
LOG.trace("close completed");
}
}
}
Expand Down
1 change: 1 addition & 0 deletions com.io7m.hibiscus.basic/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

requires transitive com.io7m.hibiscus.api;

requires com.io7m.jcip.annotations;
requires org.slf4j;
requires com.io7m.junreachable.core;

Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@
<artifactId>com.io7m.junreachable.core</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>com.io7m.jcip</groupId>
<artifactId>com.io7m.jcip.annotations</artifactId>
<version>2.0.0</version>
</dependency>

<!-- OSGi metadata and build. -->
<dependency>
Expand Down

0 comments on commit 2621047

Please sign in to comment.