Skip to content

Commit 4751dbc

Browse files
authored
fix: use ribbon LB (#2147)
* use ribbon LB Signed-off-by: achmelo <a.chmelo@gmail.com> * retry Signed-off-by: achmelo <a.chmelo@gmail.com> * catch after retry Signed-off-by: achmelo <a.chmelo@gmail.com> * use retryable Signed-off-by: achmelo <a.chmelo@gmail.com> * test WS request context Signed-off-by: achmelo <a.chmelo@gmail.com>
1 parent 0205470 commit 4751dbc

File tree

5 files changed

+67
-43
lines changed

5 files changed

+67
-43
lines changed

gateway-service/src/main/java/org/zowe/apiml/gateway/ribbon/loadbalancer/predicate/RequestHeaderPredicate.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ public class RequestHeaderPredicate extends RequestAwarePredicate {
2424

2525
@Override
2626
public boolean apply(LoadBalancingContext context, DiscoveryEnabledServer server) {
27-
String targetServer = context.getRequestContext().getRequest().getHeader(REQUEST_HEADER_NAME);
28-
if (StringUtils.isEmpty(targetServer)) {
29-
return true;
27+
if (context.getRequestContext().getRequest() != null) {
28+
String targetServer = context.getRequestContext().getRequest().getHeader(REQUEST_HEADER_NAME);
29+
if (StringUtils.isEmpty(targetServer)) {
30+
return true;
31+
}
32+
return server.getInstanceInfo().getInstanceId().equalsIgnoreCase(targetServer);
3033
}
31-
return server.getInstanceInfo().getInstanceId().equalsIgnoreCase(targetServer);
34+
return true;
3235
}
3336

3437
@Override

gateway-service/src/main/java/org/zowe/apiml/gateway/routing/ApimlRoutingConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public DiscoveryClientRouteLocator discoveryClientRouteLocator(DiscoveryClient d
4949
@ConditionalOnProperty(name = "apiml.routing.mode", havingValue = "new")
5050
@Autowired
5151
public DiscoveryClientRouteLocator apimlClientRouteLocator(DiscoveryClient discoveryClient,
52-
ZuulProperties zuulProperties,
53-
RoutedServicesNotifier routedServicesNotifier) {
52+
ZuulProperties zuulProperties,
53+
RoutedServicesNotifier routedServicesNotifier) {
5454
return new NewApimlRouteLocator("", zuulProperties, discoveryClient, routedServicesNotifier);
5555
}
5656

gateway-service/src/main/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandler.java

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import org.springframework.beans.factory.annotation.Autowired;
1414
import org.springframework.beans.factory.annotation.Value;
1515
import org.springframework.cloud.client.ServiceInstance;
16-
import org.springframework.cloud.client.discovery.DiscoveryClient;
16+
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
17+
import org.springframework.context.ApplicationContext;
18+
import org.springframework.retry.annotation.Backoff;
19+
import org.springframework.retry.annotation.Retryable;
1720
import org.springframework.stereotype.Component;
1821
import org.springframework.web.socket.CloseStatus;
1922
import org.springframework.web.socket.SubProtocolCapable;
@@ -24,6 +27,7 @@
2427
import org.zowe.apiml.product.routing.RoutedServices;
2528
import org.zowe.apiml.product.routing.RoutedServicesUser;
2629

30+
import javax.annotation.PostConstruct;
2731
import javax.inject.Singleton;
2832
import java.io.IOException;
2933
import java.net.URI;
@@ -51,26 +55,34 @@ public List<String> getSubProtocols() {
5155

5256
private final Map<String, WebSocketRoutedSession> routedSessions;
5357
private final Map<String, RoutedServices> routedServicesMap = new ConcurrentHashMap<>();
54-
private final DiscoveryClient discovery;
5558
private final WebSocketRoutedSessionFactory webSocketRoutedSessionFactory;
5659
private final WebSocketClientFactory webSocketClientFactory;
5760
private static final String SEPARATOR = "/";
61+
private final LoadBalancerClient lbCLient;
62+
private ApplicationContext context;
63+
private WebSocketProxyServerHandler meAsProxy;
5864

5965
@Autowired
60-
public WebSocketProxyServerHandler(DiscoveryClient discovery, WebSocketClientFactory webSocketClientFactory) {
61-
this.discovery = discovery;
66+
public WebSocketProxyServerHandler(WebSocketClientFactory webSocketClientFactory, LoadBalancerClient lbCLient, ApplicationContext context) {
6267
this.webSocketClientFactory = webSocketClientFactory;
6368
this.routedSessions = new ConcurrentHashMap<>(); // Default
6469
this.webSocketRoutedSessionFactory = new WebSocketRoutedSessionFactoryImpl();
70+
this.lbCLient = lbCLient;
71+
this.context = context;
6572
log.debug("Creating WebSocketProxyServerHandler {} ", this);
6673
}
6774

68-
public WebSocketProxyServerHandler(DiscoveryClient discovery, WebSocketClientFactory webSocketClientFactory,
69-
Map<String, WebSocketRoutedSession> routedSessions, WebSocketRoutedSessionFactory webSocketRoutedSessionFactory) {
70-
this.discovery = discovery;
75+
@PostConstruct
76+
private void initBean() {
77+
meAsProxy = context.getBean(WebSocketProxyServerHandler.class);
78+
}
79+
80+
public WebSocketProxyServerHandler(WebSocketClientFactory webSocketClientFactory,
81+
Map<String, WebSocketRoutedSession> routedSessions, WebSocketRoutedSessionFactory webSocketRoutedSessionFactory, LoadBalancerClient lbCLient) {
7182
this.webSocketClientFactory = webSocketClientFactory;
7283
this.routedSessions = routedSessions;
7384
this.webSocketRoutedSessionFactory = webSocketRoutedSessionFactory;
85+
this.lbCLient = lbCLient;
7486
log.debug("Creating WebSocketProxyServerHandler {}", this);
7587
}
7688

@@ -127,7 +139,17 @@ private void routeToService(WebSocketSession webSocketSession, String serviceId,
127139
return;
128140
}
129141

130-
ServiceInstance serviceInstance = findServiceInstance(serviceId);
142+
try {
143+
meAsProxy.openConn(serviceId, service, webSocketSession, path);
144+
} catch (WebSocketProxyError e) {
145+
log.debug("Error opening WebSocket connection to: {}, {}", service.getServiceUrl(), e.getMessage());
146+
webSocketSession.close(CloseStatus.NOT_ACCEPTABLE.withReason(e.getMessage()));
147+
}
148+
}
149+
150+
@Retryable(value = WebSocketProxyError.class, backoff = @Backoff(value = 1000))
151+
void openConn(String serviceId, RoutedService service, WebSocketSession webSocketSession, String path) throws IOException {
152+
ServiceInstance serviceInstance = this.lbCLient.choose(serviceId);
131153
if (serviceInstance != null) {
132154
openWebSocketConnection(service, serviceInstance, serviceInstance, path, webSocketSession);
133155
} else {
@@ -152,29 +174,16 @@ private String[] getUriParts(WebSocketSession webSocketSession) {
152174
}
153175

154176
private void openWebSocketConnection(RoutedService service, ServiceInstance serviceInstance, Object uri,
155-
String path, WebSocketSession webSocketSession) throws IOException {
177+
String path, WebSocketSession webSocketSession) {
156178
String serviceUrl = service.getServiceUrl();
157179
String targetUrl = getTargetUrl(serviceUrl, serviceInstance, path);
158180

159181
log.debug(String.format("Opening routed WebSocket session from %s to %s with %s by %s", uri.toString(), targetUrl, webSocketClientFactory, this));
160-
try {
161-
WebSocketRoutedSession session = webSocketRoutedSessionFactory.session(webSocketSession, targetUrl, webSocketClientFactory);
162-
routedSessions.put(webSocketSession.getId(), session);
163182

164-
} catch (WebSocketProxyError e) {
165-
log.debug("Error opening WebSocket connection to {}: {}", targetUrl, e.getMessage());
166-
webSocketSession.close(CloseStatus.NOT_ACCEPTABLE.withReason(e.getMessage()));
167-
}
168-
}
183+
WebSocketRoutedSession session = webSocketRoutedSessionFactory.session(webSocketSession, targetUrl, webSocketClientFactory);
184+
routedSessions.put(webSocketSession.getId(), session);
185+
169186

170-
private ServiceInstance findServiceInstance(String serviceId) {
171-
List<ServiceInstance> serviceInstances = this.discovery.getInstances(serviceId);
172-
if (!serviceInstances.isEmpty()) {
173-
// TODO: Is this implementation apropriate?
174-
return serviceInstances.get(0);
175-
} else {
176-
return null;
177-
}
178187
}
179188

180189
@Override

gateway-service/src/test/java/org/zowe/apiml/gateway/ribbon/loadbalancer/predicate/RequestHeaderPredicateTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
import com.netflix.appinfo.InstanceInfo;
1313
import com.netflix.niws.loadbalancer.DiscoveryEnabledServer;
1414
import com.netflix.zuul.context.RequestContext;
15-
import org.junit.jupiter.api.*;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Nested;
17+
import org.junit.jupiter.api.Test;
1618
import org.junit.jupiter.params.ParameterizedTest;
1719
import org.junit.jupiter.params.provider.CsvSource;
1820
import org.springframework.mock.web.MockHttpServletRequest;
@@ -48,7 +50,6 @@ void doesNotFilter() {
4850
}
4951

5052

51-
5253
@Nested
5354
class WhitHeader {
5455

@@ -79,4 +80,15 @@ void filtersOnInstanceId(String headerName) {
7980

8081
}
8182

83+
@Nested
84+
class NotZuulRequest {
85+
86+
@Test
87+
void alwaysValid() {
88+
RequestHeaderPredicate predicate = new RequestHeaderPredicate();
89+
assertTrue(predicate.apply(new LoadBalancingContext("key", null), null));
90+
91+
}
92+
}
93+
8294
}

gateway-service/src/test/java/org/zowe/apiml/gateway/ws/WebSocketProxyServerHandlerTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@
1616
import org.junit.jupiter.params.ParameterizedTest;
1717
import org.junit.jupiter.params.provider.ValueSource;
1818
import org.springframework.cloud.client.ServiceInstance;
19-
import org.springframework.cloud.client.discovery.DiscoveryClient;
19+
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
20+
import org.springframework.test.util.ReflectionTestUtils;
2021
import org.springframework.web.socket.CloseStatus;
2122
import org.springframework.web.socket.WebSocketMessage;
2223
import org.springframework.web.socket.WebSocketSession;
2324
import org.zowe.apiml.product.routing.RoutedService;
2425
import org.zowe.apiml.product.routing.RoutedServices;
2526

2627
import java.net.URI;
27-
import java.util.Collections;
2828
import java.util.HashMap;
2929
import java.util.Map;
3030

@@ -37,26 +37,26 @@
3737

3838
class WebSocketProxyServerHandlerTest {
3939
private WebSocketProxyServerHandler underTest;
40-
private DiscoveryClient discoveryClient;
4140
private WebSocketRoutedSessionFactory webSocketRoutedSessionFactory;
4241
private Map<String, WebSocketRoutedSession> routedSessions;
42+
private LoadBalancerClient lbClient;
4343

4444
@BeforeEach
4545
public void setup() {
46-
discoveryClient = mock(DiscoveryClient.class);
4746
routedSessions = new HashMap<>();
4847
webSocketRoutedSessionFactory = mock(WebSocketRoutedSessionFactory.class);
48+
lbClient = mock(LoadBalancerClient.class);
4949

5050
underTest = new WebSocketProxyServerHandler(
51-
discoveryClient,
5251
mock(WebSocketClientFactory.class),
5352
routedSessions,
54-
webSocketRoutedSessionFactory
53+
webSocketRoutedSessionFactory,
54+
lbClient
5555
);
56+
ReflectionTestUtils.setField(underTest, "meAsProxy", underTest);
5657
}
5758

5859

59-
6060
private ServiceInstance validServiceInstance() {
6161
ServiceInstance validService = mock(ServiceInstance.class);
6262
when(validService.getHost()).thenReturn("gatewayHost");
@@ -85,7 +85,6 @@ void prepareRoutedService() {
8585
when(routesForSpecificValidService.findServiceByGatewayUrl("ws/v1"))
8686
.thenReturn(new RoutedService("ws-v1", "ws/v1", "/valid-service/ws/v1"));
8787
ServiceInstance foundService = validServiceInstance();
88-
when(discoveryClient.getInstances(serviceId)).thenReturn(Collections.singletonList(foundService));
8988

9089
underTest.addRoutedServices(serviceId, routesForSpecificValidService);
9190
}
@@ -104,7 +103,8 @@ void prepareRoutedService() {
104103
@ValueSource(strings = {"wss://gatewayHost:1443/valid-service/ws/v1/valid-path", "wss://gatewayHost:1443/ws/v1/valid-service/valid-path"})
105104
void givenValidRoute(String path) throws Exception {
106105
when(webSocketRoutedSessionFactory.session(any(), any(), any())).thenReturn(mock(WebSocketRoutedSession.class));
107-
106+
ServiceInstance serviceInstance = mock(ServiceInstance.class);
107+
when(lbClient.choose(any())).thenReturn(serviceInstance);
108108
String establishedSessionId = "validAndUniqueId";
109109
when(establishedSession.getId()).thenReturn(establishedSessionId);
110110
when(establishedSession.getUri()).thenReturn(new URI(path));
@@ -173,7 +173,7 @@ void givenInvalidRoute() throws Exception {
173173
@Test
174174
void givenNoInstanceOfTheServiceIsInTheRepository() throws Exception {
175175
when(establishedSession.getUri()).thenReturn(new URI("wss://gatewayHost:1443/service-without-instance/ws/v1/valid-path"));
176-
176+
when(lbClient.choose(any())).thenReturn(null);
177177
RoutedServices routesForSpecificValidService = mock(RoutedServices.class);
178178
when(routesForSpecificValidService.findServiceByGatewayUrl("ws/v1"))
179179
.thenReturn(new RoutedService("api-v1", "api/v1", "/api-v1/api/v1"));

0 commit comments

Comments
 (0)