Skip to content

Commit

Permalink
provides extra @NonNullApi annotation for all packages (#826)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlegDokuka committed May 8, 2020
1 parent 79d2ee6 commit cec7a78
Show file tree
Hide file tree
Showing 67 changed files with 199 additions and 79 deletions.
1 change: 0 additions & 1 deletion build.gradle
Expand Up @@ -62,7 +62,6 @@ subprojects {

dependencies {
dependency "ch.qos.logback:logback-classic:${ext['logback.version']}"
dependency "com.google.code.findbugs:jsr305:${ext['findbugs.version']}"
dependency "io.netty:netty-tcnative-boringssl-static:${ext['netty-boringssl.version']}"
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
dependency "org.assertj:assertj-core:${ext['assertj.version']}"
Expand Down
2 changes: 0 additions & 2 deletions rsocket-core/build.gradle
Expand Up @@ -29,8 +29,6 @@ dependencies {

implementation 'org.slf4j:slf4j-api'

compileOnly 'com.google.code.findbugs:jsr305'

testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.junit.jupiter:junit-jupiter-api'
Expand Down
Expand Up @@ -19,7 +19,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.AbstractReferenceCounted;
import io.rsocket.core.DefaultConnectionSetupPayload;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* Exposes information from the {@code SETUP} frame to a server, as well as to client responders.
Expand Down
Expand Up @@ -41,10 +41,10 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

/**
Expand Down
Expand Up @@ -55,7 +55,6 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand All @@ -68,6 +67,7 @@
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/**
Expand Down
Expand Up @@ -39,14 +39,14 @@
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.*;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.netty.util.collection.IntObjectMap;

/** This API is not thread-safe and must be strictly used in serialized fashion */
final class StreamIdSupplier {
private static final int MASK = 0x7FFFFFFF;

Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* Application layer logic generating a Reactive Streams {@code onError} event.
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* The Responder canceled the request but may have started processing it (similar to REJECTED but
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* The connection is being terminated. Sender or Receiver of this frame MUST wait for outstanding
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* The connection is being terminated. Sender or Receiver of this frame MAY close the connection
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

public class CustomRSocketException extends RSocketException {
private static final long serialVersionUID = 7873267740343446585L;
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* The request is invalid.
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* The Setup frame is invalid for the server (it could be that the client is too recent for the old
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* Despite being a valid request, the Responder decided to reject it. The Responder guarantees that
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* The server rejected the resume, it can specify the reason in the payload.
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* The server rejected the setup, it can specify the reason in the payload.
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/** The root of the setup exception hierarchy. */
public abstract class SetupException extends RSocketException {
Expand Down
Expand Up @@ -17,7 +17,7 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorFrameCodec;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

/**
* Some (or all) of the parameters specified by the client are unsupported by the server.
Expand Down
Expand Up @@ -25,12 +25,12 @@
import io.rsocket.frame.FrameLengthCodec;
import io.rsocket.frame.FrameType;
import java.util.Objects;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* A {@link DuplexConnection} implementation that fragments and reassembles {@link ByteBuf}s.
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.SynchronousSink;
import reactor.util.annotation.Nullable;

/**
* The implementation of the RSocket reassembly behavior.
Expand Down Expand Up @@ -83,6 +84,7 @@ public boolean isDisposed() {
return get();
}

@Nullable
synchronized ByteBuf getHeader(int streamId) {
return headers.get(streamId);
}
Expand All @@ -109,14 +111,17 @@ synchronized CompositeByteBuf getData(int streamId) {
return byteBuf;
}

@Nullable
synchronized ByteBuf removeHeader(int streamId) {
return headers.remove(streamId);
}

@Nullable
synchronized CompositeByteBuf removeMetadata(int streamId) {
return metadata.remove(streamId);
}

@Nullable
synchronized CompositeByteBuf removeData(int streamId) {
return data.remove(streamId);
}
Expand Down Expand Up @@ -236,7 +241,6 @@ void reassembleFrame(ByteBuf frame, SynchronousSink<ByteBuf> sink) {
case CANCEL:
case ERROR:
cancelAssemble(streamId);
default:
}

if (!frameType.isFragmentable()) {
Expand Down Expand Up @@ -270,7 +274,7 @@ private ByteBuf assembleFrameWithMetadata(ByteBuf frame, int streamId, ByteBuf h
metadata = PayloadFrameCodec.metadata(frame).retain();
}
} else {
metadata = cm != null ? cm : null;
metadata = cm;
}

ByteBuf data = assembleData(frame, streamId);
Expand Down
Expand Up @@ -2,7 +2,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

public class ExtensionFrameCodec {
private ExtensionFrameCodec() {}
Expand Down Expand Up @@ -49,6 +49,7 @@ public static ByteBuf data(ByteBuf byteBuf) {
return data;
}

@Nullable
public static ByteBuf metadata(ByteBuf byteBuf) {
FrameHeaderCodec.ensureFrameType(FrameType.EXT, byteBuf);

Expand Down
Expand Up @@ -3,6 +3,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import reactor.util.annotation.Nullable;

class FrameBodyCodec {
public static final int FRAME_LENGTH_MASK = 0xFFFFFF;
Expand Down Expand Up @@ -33,9 +34,9 @@ private static int decodeLength(final ByteBuf byteBuf) {
static ByteBuf encode(
ByteBufAllocator allocator,
final ByteBuf header,
ByteBuf metadata,
@Nullable ByteBuf metadata,
boolean hasMetadata,
ByteBuf data) {
@Nullable ByteBuf data) {

final boolean addData;
if (data != null) {
Expand Down
Expand Up @@ -4,7 +4,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.rsocket.Payload;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

class GenericFrameCodec {

Expand Down Expand Up @@ -75,7 +75,7 @@ static ByteBuf encode(
boolean next,
int requestN,
@Nullable ByteBuf metadata,
ByteBuf data) {
@Nullable ByteBuf data) {

final boolean hasMetadata = metadata != null;

Expand Down Expand Up @@ -115,6 +115,7 @@ static ByteBuf data(ByteBuf byteBuf) {
return data;
}

@Nullable
static ByteBuf metadata(ByteBuf byteBuf) {
boolean hasMetadata = FrameHeaderCodec.hasMetadata(byteBuf);
if (!hasMetadata) {
Expand All @@ -136,6 +137,7 @@ static ByteBuf dataWithRequestN(ByteBuf byteBuf) {
return data;
}

@Nullable
static ByteBuf metadataWithRequestN(ByteBuf byteBuf) {
boolean hasMetadata = FrameHeaderCodec.hasMetadata(byteBuf);
if (!hasMetadata) {
Expand Down
Expand Up @@ -2,8 +2,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import javax.annotation.Nullable;
import reactor.util.annotation.Nullable;

public class LeaseFrameCodec {

Expand Down Expand Up @@ -67,6 +66,7 @@ public static int numRequests(final ByteBuf byteBuf) {
return numRequests;
}

@Nullable
public static ByteBuf metadata(final ByteBuf byteBuf) {
FrameHeaderCodec.ensureFrameType(FrameType.LEASE, byteBuf);
if (FrameHeaderCodec.hasMetadata(byteBuf)) {
Expand All @@ -77,7 +77,7 @@ public static ByteBuf metadata(final ByteBuf byteBuf) {
byteBuf.resetReaderIndex();
return metadata;
} else {
return Unpooled.EMPTY_BUFFER;
return null;
}
}
}
Expand Up @@ -8,6 +8,10 @@
public class MetadataPushFrameCodec {

public static ByteBuf encodeReleasingPayload(ByteBufAllocator allocator, Payload payload) {
if (!payload.hasMetadata()) {
throw new IllegalStateException(
"Metadata push requires to have metadata present" + " in the given Payload");
}
final ByteBuf metadata = payload.metadata().retain();
// releasing payload safely since it can be already released wheres we have to release retained
// data and metadata as well
Expand Down
Expand Up @@ -3,6 +3,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.Payload;
import reactor.util.annotation.Nullable;

public class PayloadFrameCodec {

Expand Down Expand Up @@ -37,8 +38,8 @@ public static ByteBuf encode(
boolean fragmentFollows,
boolean complete,
boolean next,
ByteBuf metadata,
ByteBuf data) {
@Nullable ByteBuf metadata,
@Nullable ByteBuf data) {

return GenericFrameCodec.encode(
allocator, FrameType.PAYLOAD, streamId, fragmentFollows, complete, next, 0, metadata, data);
Expand All @@ -48,6 +49,7 @@ public static ByteBuf data(ByteBuf byteBuf) {
return GenericFrameCodec.data(byteBuf);
}

@Nullable
public static ByteBuf metadata(ByteBuf byteBuf) {
return GenericFrameCodec.metadata(byteBuf);
}
Expand Down

0 comments on commit cec7a78

Please sign in to comment.