1
1
/*
2
- * Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
2
+ * Copyright (c) 2022, 2023, Oracle and/or its affiliates. All rights reserved.
3
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
4
*
5
5
* This code is free software; you can redistribute it and/or modify it
@@ -58,11 +58,16 @@ public void request(long n) { }
58
58
public void cancel () { }
59
59
};
60
60
61
+ static final int SUBSCRIBED = 1 ;
62
+ static final int REGISTERED = 2 ;
63
+ static final int COMPLETED = 4 ;
64
+ static final int CANCELLED = 8 ;
65
+ static final int UNREGISTERED = 16 ;
66
+
61
67
static final AtomicLong IDS = new AtomicLong ();
62
68
final long id = IDS .incrementAndGet ();
63
69
final BodySubscriber <T > userSubscriber ;
64
- final AtomicBoolean completed = new AtomicBoolean ();
65
- final AtomicBoolean subscribed = new AtomicBoolean ();
70
+ private volatile int state ;
66
71
final ReentrantLock subscriptionLock = new ReentrantLock ();
67
72
volatile SubscriptionWrapper subscription ;
68
73
volatile Throwable withError ;
@@ -83,14 +88,55 @@ public void request(long n) {
83
88
@ Override
84
89
public void cancel () {
85
90
try {
86
- subscription .cancel ();
87
- onCancel ();
91
+ try {
92
+ subscription .cancel ();
93
+ } finally {
94
+ if (markCancelled ()) {
95
+ onCancel ();
96
+ }
97
+ }
88
98
} catch (Throwable t ) {
89
99
onError (t );
90
100
}
91
101
}
92
102
}
93
103
104
+ private final boolean markState (final int flag ) {
105
+ int state = this .state ;
106
+ if ((state & flag ) == flag ) {
107
+ return false ;
108
+ }
109
+ synchronized (this ) {
110
+ state = this .state ;
111
+ if ((state & flag ) == flag ) {
112
+ return false ;
113
+ }
114
+ state = this .state = (state | flag );
115
+ }
116
+ assert (state & flag ) == flag ;
117
+ return true ;
118
+ }
119
+
120
+ private boolean markSubscribed () {
121
+ return markState (SUBSCRIBED );
122
+ }
123
+
124
+ private boolean markCancelled () {
125
+ return markState (CANCELLED );
126
+ }
127
+
128
+ private boolean markCompleted () {
129
+ return markState (COMPLETED );
130
+ }
131
+
132
+ private boolean markRegistered () {
133
+ return markState (REGISTERED );
134
+ }
135
+
136
+ private boolean markUnregistered () {
137
+ return markState (UNREGISTERED );
138
+ }
139
+
94
140
final long id () { return id ; }
95
141
96
142
@ Override
@@ -101,8 +147,9 @@ public boolean needsExecutor() {
101
147
// propagate the error to the user subscriber, even if not
102
148
// subscribed yet.
103
149
private void propagateError (Throwable t ) {
150
+ var state = this .state ;
104
151
assert t != null ;
105
- assert completed . get () ;
152
+ assert ( state & COMPLETED ) != 0 ;
106
153
try {
107
154
// if unsubscribed at this point, it will not
108
155
// get subscribed later - so do it now and
@@ -111,7 +158,7 @@ private void propagateError(Throwable t) {
111
158
// subscription is finished before calling onError;
112
159
subscriptionLock .lock ();
113
160
try {
114
- if (subscribed . compareAndSet ( false , true )) {
161
+ if (markSubscribed ( )) {
115
162
userSubscriber .onSubscribe (NOP );
116
163
}
117
164
} finally {
@@ -125,34 +172,139 @@ private void propagateError(Throwable t) {
125
172
}
126
173
}
127
174
175
+ /**
176
+ * This method attempts to mark the state of this
177
+ * object as registered, and then call the
178
+ * {@link #register()} method.
179
+ * <p>
180
+ * The state will be marked as registered, and the
181
+ * {@code register()} method will be called only
182
+ * if not already registered or unregistered,
183
+ * or cancelled, or completed.
184
+ *
185
+ * @return {@code true} if {@link #register()} was called,
186
+ * false otherwise.
187
+ */
188
+ protected final boolean tryRegister () {
189
+ subscriptionLock .lock ();
190
+ try {
191
+ int state = this .state ;
192
+ if ((state & (REGISTERED | UNREGISTERED | CANCELLED | COMPLETED )) != 0 ) return false ;
193
+ if (markRegistered ()) {
194
+ register ();
195
+ return true ;
196
+ }
197
+ } finally {
198
+ subscriptionLock .unlock ();
199
+ }
200
+ return false ;
201
+ }
202
+
203
+ /**
204
+ * This method attempts to mark the state of this
205
+ * object as unregistered, and then call the
206
+ * {@link #unregister()} method.
207
+ * <p>
208
+ * The {@code unregister()} method will be called only
209
+ * if already registered and not yet unregistered.
210
+ * Whether {@code unregister()} is called or not,
211
+ * the state is marked as unregistered, to prevent
212
+ * {@link #tryRegister()} from calling {@link #register()}
213
+ * after {@link #tryUnregister()} has been called.
214
+ *
215
+ * @return {@code true} if {@link #unregister()} was called,
216
+ * false otherwise.
217
+ */
218
+ protected final boolean tryUnregister () {
219
+ subscriptionLock .lock ();
220
+ try {
221
+ int state = this .state ;
222
+ if ((state & REGISTERED ) == 0 ) {
223
+ markUnregistered ();
224
+ return false ;
225
+ }
226
+ if (markUnregistered ()) {
227
+ unregister ();
228
+ return true ;
229
+ }
230
+ } finally {
231
+ subscriptionLock .unlock ();
232
+ }
233
+ return false ;
234
+ }
235
+
236
+ /**
237
+ * This method can be implemented by subclasses
238
+ * to perform registration actions. It will not be
239
+ * called if already registered or unregistered.
240
+ * @apiNote
241
+ * This method is called while holding a subscription
242
+ * lock.
243
+ * @see #tryRegister()
244
+ */
245
+ protected void register () {
246
+ assert subscriptionLock .isHeldByCurrentThread ();
247
+ }
248
+
249
+ /**
250
+ * This method can be implemented by subclasses
251
+ * to perform unregistration actions. It will not be
252
+ * called if not already registered, or already unregistered.
253
+ * @apiNote
254
+ * This method is called while holding a subscription
255
+ * lock.
256
+ * @see #tryUnregister()
257
+ */
258
+ protected void unregister () {
259
+ assert subscriptionLock .isHeldByCurrentThread ();
260
+ }
261
+
128
262
/**
129
263
* Called when the subscriber cancels its subscription.
130
264
* @apiNote
131
265
* This method may be used by subclasses to perform cleanup
132
266
* actions after a subscription has been cancelled.
267
+ * @implSpec
268
+ * This method calls {@link #tryUnregister()}
133
269
*/
134
- protected void onCancel () { }
270
+ protected void onCancel () {
271
+ // If the subscription is cancelled the
272
+ // subscriber may or may not get completed.
273
+ // Therefore we need to unregister it
274
+ tryUnregister ();
275
+ }
135
276
136
277
/**
137
278
* Called right before the userSubscriber::onSubscribe is called.
138
279
* @apiNote
139
280
* This method may be used by subclasses to perform cleanup
140
- * related actions after a subscription has been succesfully
281
+ * related actions after a subscription has been successfully
141
282
* accepted.
283
+ * This method is called while holding a subscription
284
+ * lock.
285
+ * @implSpec
286
+ * This method calls {@link #tryRegister()}
142
287
*/
143
- protected void onSubscribed () { }
288
+ protected void onSubscribed () {
289
+ tryRegister ();
290
+ }
144
291
145
292
/**
146
293
* Complete the subscriber, either normally or exceptionally
147
294
* ensure that the subscriber is completed only once.
148
295
* @param t a throwable, or {@code null}
296
+ * @implSpec
297
+ * If not {@linkplain #completed()} yet, this method
298
+ * calls {@link #tryUnregister()}
149
299
*/
150
- protected void complete (Throwable t ) {
151
- if (completed .compareAndSet (false , true )) {
300
+ public final void complete (Throwable t ) {
301
+ if (markCompleted ()) {
302
+ tryUnregister ();
152
303
t = withError = Utils .getCompletionCause (t );
153
304
if (t == null ) {
154
305
try {
155
- assert subscribed .get ();
306
+ var state = this .state ;
307
+ assert (state & SUBSCRIBED ) != 0 ;
156
308
userSubscriber .onComplete ();
157
309
} catch (Throwable x ) {
158
310
// Simply propagate the error by calling
@@ -179,10 +331,45 @@ protected void complete(Throwable t) {
179
331
* {@return true if this subscriber has already completed, either normally
180
332
* or abnormally}
181
333
*/
182
- public boolean completed () {
183
- return completed .get ();
334
+ public final boolean completed () {
335
+ int state = this .state ;
336
+ return (state & COMPLETED ) != 0 ;
184
337
}
185
338
339
+ /**
340
+ * {@return true if this subscriber has already subscribed}
341
+ */
342
+ public final boolean subscribed () {
343
+ int state = this .state ;
344
+ return (state & SUBSCRIBED ) != 0 ;
345
+ }
346
+
347
+ /**
348
+ * {@return true if this subscriber has already been registered}
349
+ */
350
+ public final boolean registered () {
351
+ int state = this .state ;
352
+ return (state & REGISTERED ) != 0 ;
353
+ }
354
+
355
+ /**
356
+ * {@return true if this subscriber has already been unregistered}
357
+ */
358
+ public final boolean unregistered () {
359
+ int state = this .state ;
360
+ return (state & UNREGISTERED ) != 0 ;
361
+ }
362
+
363
+ /**
364
+ * {@return true if this subscriber's subscription has already
365
+ * been cancelled}
366
+ */
367
+ public final boolean cancelled () {
368
+ int state = this .state ;
369
+ return (state & CANCELLED ) != 0 ;
370
+ }
371
+
372
+
186
373
@ Override
187
374
public CompletionStage <T > getBody () {
188
375
return userSubscriber .getBody ();
@@ -194,7 +381,7 @@ public void onSubscribe(Flow.Subscription subscription) {
194
381
// subscription is finished before calling onError;
195
382
subscriptionLock .lock ();
196
383
try {
197
- if (subscribed . compareAndSet ( false , true )) {
384
+ if (markSubscribed ( )) {
198
385
onSubscribed ();
199
386
SubscriptionWrapper wrapped = new SubscriptionWrapper (subscription );
200
387
userSubscriber .onSubscribe (this .subscription = wrapped );
@@ -208,8 +395,9 @@ public void onSubscribe(Flow.Subscription subscription) {
208
395
209
396
@ Override
210
397
public void onNext (List <ByteBuffer > item ) {
211
- assert subscribed .get ();
212
- if (completed .get ()) {
398
+ var state = this .state ;
399
+ assert (state & SUBSCRIBED ) != 0 ;
400
+ if ((state & COMPLETED ) != 0 ) {
213
401
SubscriptionWrapper subscription = this .subscription ;
214
402
if (subscription != null ) {
215
403
subscription .subscription .cancel ();
@@ -222,6 +410,7 @@ public void onNext(List<ByteBuffer> item) {
222
410
public void onError (Throwable throwable ) {
223
411
complete (throwable );
224
412
}
413
+
225
414
@ Override
226
415
public void onComplete () {
227
416
complete (null );
0 commit comments