Skip to content

Commit

Permalink
relaxed stream id supplement (#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegDokuka committed May 4, 2020
1 parent 4fa7312 commit c9a4f06
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,11 @@
package io.rsocket.core;

import io.netty.util.collection.IntObjectMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class StreamIdSupplier {
private static final int MASK = 0x7FFFFFFF;

private static final AtomicLongFieldUpdater<StreamIdSupplier> STREAM_ID =
AtomicLongFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
private volatile long streamId;
private long streamId;

// Visible for testing
StreamIdSupplier(int streamId) {
Expand All @@ -38,10 +35,18 @@ static StreamIdSupplier serverSupplier() {
return new StreamIdSupplier(0);
}

/**
* This methods provides new stream id and ensures there is no intersections with already running
* streams. This methods is not thread-safe.
*
* @param streamIds currently running streams store
* @return next stream id
*/
int nextStreamId(IntObjectMap<?> streamIds) {
int streamId;
do {
streamId = (int) STREAM_ID.addAndGet(this, 2) & MASK;
this.streamId += 2;
streamId = (int) (this.streamId & MASK);
} while (streamId == 0 || streamIds.containsKey(streamId));
return streamId;
}
Expand Down

0 comments on commit c9a4f06

Please sign in to comment.