Skip to content

Commit

Permalink
provides non-thread safe stream id supplying
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed May 4, 2020
1 parent c9f3323 commit 310fabb
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 7 deletions.
10 changes: 4 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,11 @@
package io.rsocket.core;

import io.netty.util.collection.IntObjectMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class StreamIdSupplier {
private static final int MASK = 0x7FFFFFFF;

private static final AtomicLongFieldUpdater<StreamIdSupplier> STREAM_ID =
AtomicLongFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
private volatile long streamId;
private long streamId;

// Visible for testing
StreamIdSupplier(int streamId) {
Expand All @@ -41,7 +38,8 @@ static StreamIdSupplier serverSupplier() {
int nextStreamId(IntObjectMap<?> streamIds) {
int streamId;
do {
streamId = (int) STREAM_ID.addAndGet(this, 2) & MASK;
this.streamId += 2;
streamId = (int) (this.streamId & MASK);
} while (streamId == 0 || streamIds.containsKey(streamId));
return streamId;
}
Expand Down
Expand Up @@ -55,7 +55,6 @@
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

Expand Down

0 comments on commit 310fabb

Please sign in to comment.