Skip to content

Commit

Permalink
Synchronize message sending
Browse files Browse the repository at this point in the history
Issue: SPR-12516
(backport for commit b796c1)
  • Loading branch information
rstoyanchev committed Dec 8, 2014
1 parent ac5c361 commit 9cb1569
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 51 deletions.
Expand Up @@ -70,7 +70,7 @@ public abstract class AbstractHttpSockJsSession extends AbstractSockJsSession {

private final Object responseLock = new Object();

private volatile boolean requestInitialized;
private volatile boolean readyToSend;


private final Queue<String> messageCache;
Expand Down Expand Up @@ -195,18 +195,24 @@ public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse r
this.localAddress = request.getLocalAddress();
this.remoteAddress = request.getRemoteAddress();

this.response = response;
this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response);

synchronized (this.responseLock) {
try {
this.response = response;
this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response);
this.asyncRequestControl.start(-1);

// Let "our" handler know before sending the open frame to the remote handler
delegateConnectionEstablished();
writePrelude(request, response);
writeFrame(SockJsFrame.openFrame());
if (isStreaming() && !isClosed()) {
startAsyncRequest();

if (isStreaming()) {
writePrelude(request, response);
writeFrame(SockJsFrame.openFrame());
flushCache();
this.readyToSend = true;
}
else {
writeFrame(SockJsFrame.openFrame());
}
}
catch (Throwable ex) {
Expand All @@ -219,17 +225,6 @@ public void handleInitialRequest(ServerHttpRequest request, ServerHttpResponse r
protected void writePrelude(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
}

private void startAsyncRequest() {
this.asyncRequestControl.start(-1);
if (this.messageCache.size() > 0) {
flushCache();
}
else {
scheduleHeartbeat();
}
this.requestInitialized = true;
}

/**
* Handle all requests, except the first one, to receive messages on a SockJS
* HTTP transport based session.
Expand All @@ -249,12 +244,27 @@ public void handleSuccessiveRequest(ServerHttpRequest request, ServerHttpRespons
try {
if (isClosed()) {
response.getBody().write(SockJsFrame.closeFrameGoAway().getContentBytes());
return;
}
this.response = response;
this.frameFormat = frameFormat;
this.asyncRequestControl = request.getAsyncRequestControl(response);
writePrelude(request, response);
startAsyncRequest();
this.asyncRequestControl.start(-1);

if (isStreaming()) {
writePrelude(request, response);
flushCache();
this.readyToSend = true;
}
else {
if (this.messageCache.isEmpty()) {
scheduleHeartbeat();
this.readyToSend = true;
}
else {
flushCache();
}
}
}
catch (Throwable ex) {
tryCloseWithSockJsTransportError(ex, CloseStatus.SERVER_ERROR);
Expand All @@ -266,32 +276,24 @@ public void handleSuccessiveRequest(ServerHttpRequest request, ServerHttpRespons

@Override
protected final void sendMessageInternal(String message) throws SockJsTransportFailureException {
this.messageCache.add(message);
tryFlushCache();
}

private boolean tryFlushCache() throws SockJsTransportFailureException {
synchronized (this.responseLock) {
if (this.messageCache.isEmpty()) {
logger.trace("Nothing to flush in session=" + this.getId());
return false;
}
this.messageCache.add(message);
if (logger.isTraceEnabled()) {
logger.trace(this.messageCache.size() + " message(s) to flush in session " + this.getId());
}
if (isActive() && this.requestInitialized) {
if (isActive() && this.readyToSend) {
if (logger.isTraceEnabled()) {
logger.trace("Session is active, ready to flush.");
}
cancelHeartbeat();
flushCache();
return true;
return;
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Session is not active, not ready to flush.");
}
return false;
return;
}
}
}
Expand All @@ -312,7 +314,7 @@ protected void resetRequest() {

ServerHttpAsyncRequestControl control = this.asyncRequestControl;
this.asyncRequestControl = null;
this.requestInitialized = false;
this.readyToSend = false;
this.response = null;

updateLastActiveTime();
Expand Down
Expand Up @@ -18,13 +18,9 @@

import java.util.Map;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.SockJsException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
import org.springframework.web.socket.sockjs.transport.SockJsServiceConfig;

Expand Down Expand Up @@ -53,7 +49,7 @@ protected boolean isStreaming() {

@Override
protected void flushCache() throws SockJsTransportFailureException {
do {
while (!getMessageCache().isEmpty()) {
String message = getMessageCache().poll();
SockJsMessageCodec messageCodec = getSockJsServiceConfig().getMessageCodec();
SockJsFrame frame = SockJsFrame.messageFrame(messageCodec, message);
Expand All @@ -73,7 +69,6 @@ protected void flushCache() throws SockJsTransportFailureException {
break;
}
}
while (!getMessageCache().isEmpty());
scheduleHeartbeat();
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,25 +16,27 @@

package org.springframework.web.socket.sockjs.transport.handler;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.sql.Date;

import org.junit.Before;
import org.junit.Test;

import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.socket.AbstractHttpRequestTests;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;
import org.springframework.web.socket.sockjs.frame.SockJsFrameFormat;
import org.springframework.web.socket.sockjs.transport.session.AbstractSockJsSession;
import org.springframework.web.socket.sockjs.transport.session.PollingSockJsSession;
import org.springframework.web.socket.sockjs.transport.session.StreamingSockJsSession;
import org.springframework.web.socket.sockjs.transport.session.StubSockJsServiceConfig;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

/**
* Test fixture for {@link AbstractHttpSendingTransportHandler} and sub-classes.
*
Expand Down Expand Up @@ -75,7 +77,7 @@ public void handleRequestXhr() throws Exception {
assertFalse("Polling request should complete after open frame", this.servletRequest.isAsyncStarted());
verify(this.webSocketHandler).afterConnectionEstablished(session);

resetResponse();
resetRequestAndResponse();
transportHandler.handleRequest(this.request, this.response, this.webSocketHandler, session);

assertTrue("Polling request should remain open", this.servletRequest.isAsyncStarted());
Expand Down
Expand Up @@ -82,7 +82,7 @@ public void handleInitialRequest() throws Exception {
this.session.handleInitialRequest(this.request, this.response, this.frameFormat);

assertEquals("hhh\no", this.servletResponse.getContentAsString());
assertFalse(this.servletRequest.isAsyncStarted());
assertTrue(this.servletRequest.isAsyncStarted());

verify(this.webSocketHandler).afterConnectionEstablished(this.session);
}
Expand Down Expand Up @@ -119,7 +119,7 @@ public TestAbstractHttpSockJsSession(SockJsServiceConfig config, WebSocketHandle

@Override
protected boolean isStreaming() {
return false;
return true;
}

@Override
Expand Down

0 comments on commit 9cb1569

Please sign in to comment.