Skip to content

Commit

Permalink
much better approach for general async java handling #1681
Browse files Browse the repository at this point in the history
theres no need to overload karate with async, the scenarios can be wildly different
but the patterns will be similar - and thats the example in this commit
since you can pass params and even functions from a karate test
the possibilities are many - and waiting for multiple messages is demonstrated
especially how you can wait until a user-defined condition
  • Loading branch information
ptrthomas committed Jul 18, 2021
1 parent eb5a6fc commit 03a0c8f
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 63 deletions.
6 changes: 0 additions & 6 deletions karate-core/src/main/java/com/intuit/karate/EventContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,5 @@
public interface EventContext {

void signal(Object arg);

void signalAppend(Object arg);

Object signalCollect();

void signalClear();

}
Original file line number Diff line number Diff line change
Expand Up @@ -735,23 +735,6 @@ public void signal(Object o) {
getEngine().signal(JsValue.toJava(v));
}

@Override
public void signalAppend(Object o) {
Value v = Value.asValue(o);
getEngine().signalAppend(JsValue.toJava(v));
}

@Override
public Object signalCollect() {
Object result = getEngine().signalCollect();
return JsValue.fromJava(result);
}

@Override
public void signalClear() {
getEngine().signalClear();
}

public Object sizeOf(Value v) {
if (v.hasArrayElements()) {
return v.getArraySize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,6 @@ public void mockAfterScenario() {
// websocket / async =======================================================
//
private List<WebSocketClient> webSocketClients;
private final List signalCollector = new ArrayList();
private CompletableFuture SIGNAL = new CompletableFuture();

public WebSocketClient webSocket(WebSocketOptions options) {
Expand All @@ -747,18 +746,6 @@ public synchronized void signal(Object result) {
SIGNAL.complete(result);
}

public synchronized void signalAppend(Object result) {
signalCollector.add(result);
}

public synchronized Object signalCollect() {
return signalCollector;
}

public synchronized void signalClear() {
signalCollector.clear();
}

public void listen(String exp) {
Variable v = evalKarateExpression(exp);
int timeout = v.getAsInt();
Expand Down
72 changes: 48 additions & 24 deletions karate-demo/src/test/java/mock/async/QueueConsumer.java
Original file line number Diff line number Diff line change
@@ -1,55 +1,79 @@
package mock.async;

import com.intuit.karate.EventContext;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class QueueConsumer {

private static final Logger logger = LoggerFactory.getLogger(QueueConsumer.class);

public static final String QUEUE_NAME = "MOCK.ASYNC";

private final Connection connection;
private final MessageConsumer consumer;
private final Session session;

// in more complex tests or for re-usability, this field and append() /
// collect() / clear() methods can be in a separate / static class
private final List messages = new ArrayList();

public synchronized void append(Object message) {
messages.add(message);
if (condition.test(message)) {
logger.debug("condition met, will signal completion");
future.complete(Boolean.TRUE);
} else {
logger.debug("condition not met, will continue waiting");
}
}

public synchronized List collect() {
return messages;
}

private CompletableFuture future = new CompletableFuture();
private Predicate condition = o -> true; // just a default

// note how you can pass data in from the test for very dynamic checks
public List waitUntilCount(int count) {
condition = o -> messages.size() == count;
try {
future.get(5000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error("wait timed out: {}", e + "");
}
return messages;
}

public QueueConsumer() {
this.connection = QueueUtils.getConnection();
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE_NAME);
consumer = session.createConsumer(destination);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public void listen(EventContext context) {
setMessageListener(message -> {
TextMessage tm = (TextMessage) message;
try {
context.signalAppend(tm.getText());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

public void setMessageListener(MessageListener ml) {
try {
consumer.setMessageListener(ml);
consumer.setMessageListener(message -> {
TextMessage tm = (TextMessage) message;
try {
// this is where we "collect" messages for assertions later
append(tm.getText());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}

}
}
10 changes: 7 additions & 3 deletions karate-demo/src/test/java/mock/async/main.feature
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ Feature:

Scenario:
* def QueueConsumer = Java.type('mock.async.QueueConsumer')
# this will start listening to messages and collecting them
* def queue = new QueueConsumer()
* queue.listen(karate)

* def port = karate.start('mock.feature').port
* url 'http://localhost:' + port
* path 'send';
* method get
* status 200

* java.lang.Thread.sleep(1000)
* def messages = karate.signalCollect()
# * java.lang.Thread.sleep(1000)
# * def messages = queue.collect()

# smarter wait instead of the above two lines
* def messages = queue.waitUntilCount(3)

* match messages == ['first', 'second', 'third']

0 comments on commit 03a0c8f

Please sign in to comment.