Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relaxed stream id supplement #814

Merged
merged 2 commits into from May 4, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 11 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,11 @@
package io.rsocket.core;

import io.netty.util.collection.IntObjectMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class StreamIdSupplier {
private static final int MASK = 0x7FFFFFFF;

private static final AtomicLongFieldUpdater<StreamIdSupplier> STREAM_ID =
AtomicLongFieldUpdater.newUpdater(StreamIdSupplier.class, "streamId");
private volatile long streamId;
private long streamId;

// Visible for testing
StreamIdSupplier(int streamId) {
Expand All @@ -38,10 +35,18 @@ static StreamIdSupplier serverSupplier() {
return new StreamIdSupplier(0);
}

/**
* This methods provides new stream id and ensures there is no intersections with already running
* streams. This methods is not thread-safe.
*
* @param streamIds currently running streams store
* @return next stream id
*/
int nextStreamId(IntObjectMap<?> streamIds) {
int streamId;
do {
streamId = (int) STREAM_ID.addAndGet(this, 2) & MASK;
this.streamId += 2;
streamId = (int) (this.streamId & MASK);
} while (streamId == 0 || streamIds.containsKey(streamId));
return streamId;
}
Expand Down