Skip to content

Commit

Permalink
server: first pass at synchronizing tests with push notifications
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre-Alexandre Meyer <pierre@mouraf.org>
  • Loading branch information
pierre committed May 1, 2018
1 parent b4605c7 commit f26bccc
Show file tree
Hide file tree
Showing 21 changed files with 516 additions and 213 deletions.
@@ -1,7 +1,9 @@
/*
* Copyright 2010-2013 Ning, Inc.
* Copyright 2014-2018 Groupon, Inc
* Copyright 2014-2018 The Billing Project, LLC
*
* Ning licenses this file to you under the Apache License, version 2.0
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
Expand Down Expand Up @@ -78,4 +80,52 @@ public UUID getObjectId() {
public String getMetaData() {
return metaData;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("NotificationJson{");
sb.append("eventType='").append(eventType).append('\'');
sb.append(", accountId=").append(accountId);
sb.append(", objectType='").append(objectType).append('\'');
sb.append(", objectId=").append(objectId);
sb.append(", metaData='").append(metaData).append('\'');
sb.append('}');
return sb.toString();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final NotificationJson that = (NotificationJson) o;

if (eventType != null ? !eventType.equals(that.eventType) : that.eventType != null) {
return false;
}
if (accountId != null ? !accountId.equals(that.accountId) : that.accountId != null) {
return false;
}
if (objectType != null ? !objectType.equals(that.objectType) : that.objectType != null) {
return false;
}
if (objectId != null ? !objectId.equals(that.objectId) : that.objectId != null) {
return false;
}
return metaData != null ? metaData.equals(that.metaData) : that.metaData == null;
}

@Override
public int hashCode() {
int result = eventType != null ? eventType.hashCode() : 0;
result = 31 * result + (accountId != null ? accountId.hashCode() : 0);
result = 31 * result + (objectType != null ? objectType.hashCode() : 0);
result = 31 * result + (objectId != null ? objectId.hashCode() : 0);
result = 31 * result + (metaData != null ? metaData.hashCode() : 0);
return result;
}
}
@@ -0,0 +1,56 @@
/*
* Copyright 2014-2018 Groupon, Inc
* Copyright 2014-2018 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.killbill.billing.jaxrs;

import javax.servlet.Servlet;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;

public class CallbackServer {

private static final int SERVER_PORT = 8087;
private static final String CALLBACK_ENDPOINT = "/callmeback";

private final Server server;
private final String callbackEndpoint;
private final Servlet servlet;

public CallbackServer(final Servlet servlet) {
this.callbackEndpoint = CALLBACK_ENDPOINT;
this.servlet = servlet;
this.server = new Server(SERVER_PORT);
}

public void startServer() throws Exception {
final ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
server.setHandler(context);
context.addServlet(new ServletHolder(servlet), callbackEndpoint);
server.start();
}

public void stopServer() throws Exception {
server.stop();
}

public static String getServletEndpoint() {
return "http://127.0.0.1:" + SERVER_PORT + CALLBACK_ENDPOINT;
}
}
@@ -0,0 +1,159 @@
/*
* Copyright 2014-2018 Groupon, Inc
* Copyright 2014-2018 The Billing Project, LLC
*
* The Billing Project licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.killbill.billing.jaxrs;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.Iterator;
import java.util.Stack;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.killbill.billing.jaxrs.json.NotificationJson;
import org.killbill.billing.notification.plugin.api.ExtBusEventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.io.CharStreams;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class CallbackServlet extends HttpServlet {

private static final Logger log = LoggerFactory.getLogger(CallbackServlet.class);

private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Joiner SPACE_JOINER = Joiner.on(" ");
private static final long DELAY = 60000;

// Cross tenants (for now)
private final Collection<ExtBusEventType> nextExpectedEvent = new Stack<ExtBusEventType>();

private boolean isListenerFailed = false;
private String listenerFailedMsg;
private boolean completed = true;

@Override
protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
final String body = CharStreams.toString(new InputStreamReader(request.getInputStream(), "UTF-8"));
response.setStatus(HttpServletResponse.SC_OK);

final NotificationJson notification = objectMapper.readValue(body, NotificationJson.class);
log.info("Got notification: {}", notification);
assertEqualsNicely(notification.getEventType() == null ? null : ExtBusEventType.valueOf(notification.getEventType()));
notifyIfStackEmpty();
}

public void assertListenerStatus() {
// Bail early
if (isListenerFailed) {
log.error(listenerFailedMsg);
Assert.fail(listenerFailedMsg);
}

try {
assertTrue(isCompleted(DELAY));
} catch (final Exception e) {
fail("assertListenerStatus didn't complete", e);
}

if (isListenerFailed) {
log.error(listenerFailedMsg);
Assert.fail(listenerFailedMsg);
}
}

public synchronized void reset() {
nextExpectedEvent.clear();
completed = true;

isListenerFailed = false;
listenerFailedMsg = null;
}

public void pushExpectedEvents(final ExtBusEventType... events) {
for (final ExtBusEventType event : events) {
pushExpectedEvent(event);
}
}

public synchronized void pushExpectedEvent(final ExtBusEventType next) {
nextExpectedEvent.add(next);
log.info("Stacking expected event {}, got [{}]", next, SPACE_JOINER.join(nextExpectedEvent));
completed = false;
}

private synchronized boolean isCompleted(final long timeout) {
long waitTimeMs = timeout;
do {
try {
final long before = System.currentTimeMillis();
wait(100);
final long after = System.currentTimeMillis();
waitTimeMs -= (after - before);
} catch (final Exception ignore) {
return false;
}
} while (waitTimeMs > 0 && !completed);

if (!completed) {
log.error("CallbackServlet did not complete in " + timeout + " ms, remaining events are " + SPACE_JOINER.join(nextExpectedEvent));
}
return completed;
}

private synchronized void notifyIfStackEmpty() {
if (nextExpectedEvent.isEmpty()) {
log.debug("CallbackServlet EMPTY");
completed = true;
notify();
}
}

private synchronized void assertEqualsNicely(final ExtBusEventType received) {
boolean foundIt = false;
final Iterator<ExtBusEventType> it = nextExpectedEvent.iterator();
while (it.hasNext()) {
final ExtBusEventType ev = it.next();
if (ev == received) {
it.remove();
foundIt = true;
log.info("Found expected event: {}", received);
break;
}
}
if (!foundIt) {
final String errorMsg = "CallbackServlet: received unexpected event " + received + "; remaining expected events [" + SPACE_JOINER.join(nextExpectedEvent) + "]";
log.error(errorMsg);
failed(errorMsg);
}
}

private void failed(final String msg) {
this.isListenerFailed = true;
this.listenerFailedMsg = msg;
}
}

0 comments on commit f26bccc

Please sign in to comment.