Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SurroundingPublisher and HttpResponse.of(headers, pub, trailers) #4727

Merged
merged 41 commits into from
Aug 21, 2023

Conversation

injae-kim
Copy link
Contributor

@injae-kim injae-kim commented Mar 6, 2023

Related Issue #3959

Motivation:

We have PrependingPublisher to publish ResponseHeaders and body in a StreamMessage.
Similarly, we can add AppendingPublisher to publish body and HTTP trailers more efficiently.

Modifications:

  • Add SurroundingPublisher to append trailers
  • Add HttpResponse.of(header, pub, trailers) that using SurroundingPublisher

Result:

@minwoox minwoox added this to the 1.23.0 milestone Mar 7, 2023
@minwoox minwoox added the sprint Issues for OSS Sprint participants label Mar 7, 2023
Copy link
Member

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, looks good! 😄
Left some suggestions.

@ikhoon ikhoon modified the milestones: 1.23.0, 1.24.0 Mar 15, 2023
@injae-kim
Copy link
Contributor Author

injae-kim commented Mar 20, 2023

Sorry for the late, I'll update SurroundingPublisher within this weekend! (I'm on-call this week..)

📌 Sorry once again for my late 😭 I'll must update within this weekend, until 04/02!!

@injae-kim injae-kim changed the title Add AppendingPublisher and HttpResponse.of(headers, pub, trailers) Add SurroundingPublisher and HttpResponse.of(headers, pub, trailers) Apr 3, 2023
@injae-kim injae-kim requested a review from minwoox April 10, 2023 15:49
@injae-kim
Copy link
Contributor Author

@minwoox , can I get review? 🙇

@ikhoon
Copy link
Contributor

ikhoon commented Apr 28, 2023

Let me review this first. Minwoo is working on another issue.

@injae-kim injae-kim force-pushed the appending-publisher branch 2 times, most recently from ab6abc7 to 97e6d9a Compare May 15, 2023 08:42
@injae-kim
Copy link
Contributor Author

Hmm I'm chekcing failed test on my local env~ I'll find root cause and fix it soon 🙇

@codecov
Copy link

codecov bot commented Aug 5, 2023

Codecov Report

Patch coverage: 77.19% and project coverage change: +0.02% 🎉

Comparison is base (1ce4c69) 74.30% compared to head (732744b) 74.32%.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #4727      +/-   ##
============================================
+ Coverage     74.30%   74.32%   +0.02%     
- Complexity    19600    19625      +25     
============================================
  Files          1682     1683       +1     
  Lines         72260    72585     +325     
  Branches       9242     9304      +62     
============================================
+ Hits          53695    53952     +257     
- Misses        14222    14265      +43     
- Partials       4343     4368      +25     
Files Changed Coverage Δ
.../java/com/linecorp/armeria/common/HttpRequest.java 74.45% <0.00%> (-3.42%) ⬇️
...pring/web/reactive/AbstractServerHttpResponse.java 73.61% <ø> (ø)
...orp/armeria/common/PublisherBasedHttpResponse.java 71.42% <50.00%> (-28.58%) ⬇️
...meria/spring/web/reactive/ChannelSendOperator.java 65.60% <65.60%> (ø)
...spring/web/reactive/ArmeriaServerHttpResponse.java 84.14% <84.61%> (-0.14%) ⬇️
...a/internal/common/stream/SurroundingPublisher.java 87.38% <87.38%> (ø)
...java/com/linecorp/armeria/common/HttpResponse.java 88.55% <100.00%> (+0.19%) ⬆️
...m/linecorp/armeria/common/HttpResponseBuilder.java 76.08% <100.00%> (+5.87%) ⬆️

... and 23 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Comment on lines 336 to 337
assert upstreamRequested > 0;
if (upstreamRequested < Long.MAX_VALUE) {
Copy link
Contributor Author

@injae-kim injae-kim Aug 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class ChannelSendOperator {

		@Override
		public void subscribe(Subscriber<? super T> writeSubscriber) {
			synchronized (this) {
				Assert.state(this.writeSubscriber == null, "Only one write subscriber supported");
				this.writeSubscriber = writeSubscriber;
				if (this.error != null || this.completed) { // completed: true
					this.writeSubscriber.onSubscribe(Operators.emptySubscription());
					emitCachedSignals(); // 👈👈 
				}

		private boolean emitCachedSignals() {
			T item = this.item;
			this.item = null;
			if (item != null) {
				requiredWriteSubscriber().onNext(item); // 👈👈 although there's no request, onNext cached item!
			}
}

@ikhoon , I found that spring's ChannelSendOperator calls downstream.onNext(item) without any request on below case!

  1. ChannelSendOperator.subscribe()
  2. ChannelSendOperator is already completed
  3. emitCachedSignals()
  4. downstream.onNext(item) // although there's no request, onNext() is invoked!

"ChannelSendOperator(upstream) can downstream.onNext(item) without any request" -> it sounds weird, but we can't change spring ChannelSendOperator's behavior so I just removed assert upstreamRequested > 0; on L336 ~! PTAL 🙇

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, changes still look good to me 👍

Copy link
Contributor

@ikhoon ikhoon Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"ChannelSendOperator(upstream) can downstream.onNext(item) without any request"

It might be not related to this PR but the assertion found the bug. Let me add some logs and debug to know why ChannelSendOperator was completed before subscribe()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ikhoon nim gentle ping 🙇

Can you check debug logs on broken test? I think I still can't check debug logs on CI/CD result.
I want to merge this PR and handle next PRs!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to download artifacts to see debug logs on the summary page of the GitHub Actions job.
Otherwise, let me try to merge #5104 quickly to easily see the test logs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test results on Gradle build scans.

oh it's cool! thanks a lot 🙇 I'll check test result and share to you~!!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspected the problem is a bug in ChannelSendOperator that violates the most important rule 1.1. https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md#1.1
So I forked ChannelSendOperator and modified not to publish the cached item before a request is made. https://github.com/line/armeria/pull/4727/files#r1299525470

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem occurs only in Spring Boot 2 WebFlux. The following command can reproduce it locally.

./gradlew :spring:boot3-webflux-autoconfigure:testClasses :spring:boot2-webflux-autoconfigure:test --tests "*.MatrixVariablesTest"

ChannelSendOperator implementation has not changed between Spring Boot 2 and Spring Boot 3. The main difference was spring-projects/spring-framework#28398 which avoids collecting Flux before passing to ChannelSendOperator. Consequently, WriteBarrier.onComplete() was not invoked before WriteBarrier.subscribe() is called and backpressure looks respected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really thanks for your detailed investigation 🙇 If there's something I have to do, please tell me!

@jrhee17 jrhee17 modified the milestones: 1.25.0, 1.26.0 Aug 16, 2023

// Forked from https://github.com/spring-projects/spring-framework/blob/1e3099759e2d823b6dd1c0c43895abcbe3e02a12/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java
// and modified at L370 for not publishing the cache item before receiving request(n) from the subscriber.
private final Function<Publisher<T>, Publisher<Void>> writeFunction;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual diff with the upstream code.

--- spring-framework/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java	2023-01-15 20:37:55
+++ armeria/spring/boot3-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ChannelSendOperator.java	2023-08-21 11:37:11
@@ -1,4 +1,19 @@
 /*
+ * Copyright 2023 LINE Corporation
+ *
+ * LINE Corporation licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+/*
  * Copyright 2002-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,13 +29,18 @@
  * limitations under the License.
  */
 
-package org.springframework.http.server.reactive;
+package com.linecorp.armeria.spring.web.reactive;
 
 import java.util.function.Function;
 
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+
 import reactor.core.CoreSubscriber;
 import reactor.core.Scannable;
 import reactor.core.publisher.Flux;
@@ -28,11 +48,6 @@
 import reactor.core.publisher.Operators;
 import reactor.util.context.Context;
 
-import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferUtils;
-import org.springframework.lang.Nullable;
-import org.springframework.util.Assert;
-
 /**
  * Given a write function that accepts a source {@code Publisher<T>} to write
  * with and returns {@code Publisher<Void>} for the result, this operator helps
@@ -48,6 +63,8 @@
  */
 public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
 
+	// Forked from https://github.com/spring-projects/spring-framework/blob/1e3099759e2d823b6dd1c0c43895abcbe3e02a12/spring-web/src/main/java/org/springframework/http/server/reactive/ChannelSendOperator.java
+	// and modified at L370 for not publishing the cache item before receiving request(n) from the subscriber.
 	private final Function<Publisher<T>, Publisher<Void>> writeFunction;
 
 	private final Flux<T> source;
@@ -350,7 +367,7 @@
 			synchronized (this) {
 				Assert.state(this.writeSubscriber == null, "Only one write subscriber supported");
 				this.writeSubscriber = writeSubscriber;
-				if (this.error != null || this.completed) {
+				if (this.error != null || (this.completed && this.item == null)) {
 					this.writeSubscriber.onSubscribe(Operators.emptySubscription());
 					emitCachedSignals();
 				}

@jrhee17 jrhee17 modified the milestones: 1.26.0, 1.25.0 Aug 21, 2023
@jrhee17 jrhee17 merged commit d861d34 into line:main Aug 21, 2023
14 checks passed
minwoox pushed a commit that referenced this pull request Oct 25, 2023
Related issue #4816 

### Motivation:
> As a future work of #4727,
users might want to dynamically emit the last message depending on whether the upstream publisher completes successfully or exceptionally.

- On #4816, it's good to add `StreamMessage.endWith(finalizer)`

### Modifications:

- Add `StreamMessage.endWith(finalizer)`

### Result:

- Closes #4816 
- Now user can dynamically emit the last value depending on whether the `StreamMessage` completes successfully or exceptionally by using `StreamMessage.endWith(finalizer)`
Bue-von-hon pushed a commit to Bue-von-hon/armeria that referenced this pull request Nov 10, 2023
Related issue line#4816 

### Motivation:
> As a future work of line#4727,
users might want to dynamically emit the last message depending on whether the upstream publisher completes successfully or exceptionally.

- On line#4816, it's good to add `StreamMessage.endWith(finalizer)`

### Modifications:

- Add `StreamMessage.endWith(finalizer)`

### Result:

- Closes line#4816 
- Now user can dynamically emit the last value depending on whether the `StreamMessage` completes successfully or exceptionally by using `StreamMessage.endWith(finalizer)`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
improvement sprint Issues for OSS Sprint participants
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add AppendingPublisher
5 participants