Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming protocols #16

Merged
merged 2 commits into from
Nov 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,7 @@ public void cancel() {
});
break;
case ProtocolMessages.NACK:
// subscriber.onError(new MuonException("Stream does not exist"));
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
// sendRequest(n);
subscriber.onError(new MuonException("Stream does not exist"));
}

@Override
public void cancel() {
// sendCancel();
subscriber.onError(new MuonException("Stream does not exist"));
}
});
subscriber.onError(new MuonException("Stream does not exist"));
break;
case ProtocolMessages.DATA:
subscriber.onNext(codecs.decode(msg.getPayload(), msg.getContentType(), type));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@
public interface RequestResponseClientProtocolStack extends
TransportClientSource, CodecsSource, ServiceConfigurationSource {

default <X,R> MuonFuture<Response<R>> request(String uri, X payload, Class<R> responseType) throws URISyntaxException {
return request(new URI(uri), payload, responseType);
default <R> MuonFuture<Response<R>> request(String uri, Class<R> responseType) {
return request(uri, new Object(), responseType);
}

default <X,R> MuonFuture<Response<R>> request(String uri, X payload, Class<R> responseType) {
try {
return request(new URI(uri), payload, responseType);
} catch (URISyntaxException ex) {
throw new MuonException("URI is incorrect", ex);
}
}

default <X,R> MuonFuture<Response<R>> request(URI uri, X payload, Class<R> responseType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ class ReactiveStreamClientProtocolSpec extends Specification {
TransportMessage.ChannelOperation.NORMAL))

then:
1 * sub.onSubscribe(_)
1 * sub.onError(_ as MuonException)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public static void main(String[] args) throws URISyntaxException, InterruptedExc

Map data = new HashMap<>();

Response<List> ret = muon.request("request://awesomeService", data, List.class).get();
Response ret = muon.request("request://tckservice/discover", data, List.class).get();

System.out.println("Server responds " + ret.getPayload().get(0));
System.out.println("Server responds " + ret.getPayload());
muon.shutdown();
}

Expand Down
194 changes: 109 additions & 85 deletions muon-tck/src/main/java/com/simplicity/services/TCKService.java
Original file line number Diff line number Diff line change
@@ -1,94 +1,118 @@
package com.simplicity.services;

import io.muoncore.Discovery;
import io.muoncore.Muon;
import io.muoncore.ServiceDescriptor;
import io.muoncore.SingleTransportMuon;
import io.muoncore.codec.Codecs;
import io.muoncore.codec.json.JsonOnlyCodecs;
import io.muoncore.config.AutoConfiguration;
import io.muoncore.extension.amqp.*;
import io.muoncore.extension.amqp.discovery.AmqpDiscovery;
import io.muoncore.extension.amqp.discovery.ServiceCache;
import io.muoncore.extension.amqp.rabbitmq09.RabbitMq09ClientAmqpConnection;
import io.muoncore.extension.amqp.rabbitmq09.RabbitMq09QueueListenerFactory;
import io.muoncore.protocol.requestresponse.Response;
import io.muoncore.transport.MuonTransport;
import org.reactivestreams.Publisher;
import reactor.rx.Streams;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static io.muoncore.protocol.requestresponse.server.HandlerPredicates.path;

/**
* An implementation of the Muon HTTP TCK Resources to prove compatibility of the library
*/
public class TCKService {

// public static void main(String[] args) throws URISyntaxException, KeyManagementException, NoSuchAlgorithmException, IOException {
//
// final OldMuon muon = new MuonBuilder()
// .withServiceIdentifier("tck")
// .withTags("my-tag", "tck-service")
// .build();
//
// muon.start();
//
// outboundResourcesSetup(muon);
//
// inboundResourcesSetup(muon);
//
// streamPublisher(muon);
//
// }
//
// private static void outboundResourcesSetup(final OldMuon muon) {
//
// final Map storedata = new HashMap();
//
// muon.onQuery("/invokeresponse-store", Map.class, queryEvent -> MuonFutures.immediately(storedata));
//
// muon.onCommand("/invokeresponse", Map.class, queryEvent -> {
//
// String url = (String) queryEvent.getDecodedContent().get("resource");
//
// MuonFuture<MuonClient.MuonResult<Map>> rsult = muon.query(url, Map.class);
//
// return MuonFutures.fromPublisher(
// Streams.wrap(rsult.toPublisher()).map(mapMuonResult -> {
// Map data;
// data = mapMuonResult.getResponseEvent().getDecodedContent();
// storedata.clear();
// storedata.putAll(data);
//
// return data;
// }));
// }
// );
// }
//
// private static void streamPublisher(OldMuon muon) {
// Publisher<Long> pub = Streams.range(1, 10);
// muon.streamSource("/myStream", Long.class, pub);
// }
//
// private static void inboundResourcesSetup(final OldMuon muon) {
// muon.onQuery("/echo", Map.class, new MuonService.MuonQueryListener<Map>() {
// @Override
// public MuonFuture onQuery(MuonResourceEvent<Map> queryEvent) {
// Map obj = queryEvent.getDecodedContent();
//
// obj.put("method", "GET");
//
// return MuonFutures.immediately(obj);
// }
// });
//
// muon.onCommand("/echo", Map.class, new MuonService.MuonCommandListener<Map>() {
// @Override
// public MuonFuture<Map> onCommand(MuonResourceEvent<Map> queryEvent) {
// String method = queryEvent.getHeaders().get("METHOD");
//
// Map obj = queryEvent.getDecodedContent();
//
// obj.put("method", method);
//
// return MuonFutures.immediately(obj);
// }
// });
//
// muon.onQuery("/discover", Map.class, new MuonService.MuonQueryListener<Map>() {
// @Override
// public MuonFuture<List<String>> onQuery(MuonResourceEvent<Map> queryEvent) {
// List<String> ids = new ArrayList<String>();
//
// for (ServiceDescriptor desc: muon.discoverServices()) {
// ids.add(desc.getIdentifier());
// }
//
// return MuonFutures.immediately(ids);
// }
// });
// }
public static void main(String[] args) throws URISyntaxException, KeyManagementException, NoSuchAlgorithmException, IOException, InterruptedException {

String serviceName = "tckservice";

AmqpConnection connection = new RabbitMq09ClientAmqpConnection("amqp://muon:microservices@localhost");
QueueListenerFactory queueFactory = new RabbitMq09QueueListenerFactory(connection.getChannel());
ServiceQueue serviceQueue = new DefaultServiceQueue(serviceName, connection);
AmqpChannelFactory channelFactory = new DefaultAmqpChannelFactory(serviceName, queueFactory, connection);

MuonTransport svc1 = new AMQPMuonTransport(
"amqp://muon:microservices@localhost", serviceQueue, channelFactory);

AutoConfiguration config = new AutoConfiguration();
config.setServiceName(serviceName);
config.setAesEncryptionKey("abcde12345678906");

Muon muon = new SingleTransportMuon(config, createDiscovery(), svc1);

//allow discovery settle time.
Thread.sleep(5000);

outboundResourcesSetup(muon);

inboundResourcesSetup(muon);

streamPublisher(muon);

}

private static void outboundResourcesSetup(final Muon muon) {

final Map storedata = new HashMap();

muon.handleRequest(path("/invokeresponse-store"), Map.class, queryEvent -> queryEvent.ok(storedata) );

muon.handleRequest(path("/invokeresponse"), Map.class, queryEvent -> {

String url = (String) queryEvent.getRequest().getPayload().get("resource");

Response<Map> rsult = null;
try {
rsult = muon.request(url, Map.class).get();
storedata.clear();
storedata.putAll(rsult.getPayload());
queryEvent.ok(rsult.getPayload());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
);
}

private static void streamPublisher(Muon muon) {
Publisher<Long> pub = Streams.range(1, 10);
muon.publishSource("/myStream", pub);
}

private static void inboundResourcesSetup(final Muon muon) {
muon.handleRequest(path("/echo"), Map.class, queryEvent -> {
Map obj = queryEvent.getRequest().getPayload();

obj.put("method", "GET");

queryEvent.ok(obj);
});

muon.handleRequest(path("/discover"), Map.class, request ->
request.ok(
muon.getDiscovery().getKnownServices().stream().map(ServiceDescriptor::getIdentifier).collect(Collectors.toList())));
}

private static Discovery createDiscovery() throws URISyntaxException, KeyManagementException, NoSuchAlgorithmException, IOException {

AmqpConnection connection = new RabbitMq09ClientAmqpConnection("amqp://muon:microservices@localhost");
QueueListenerFactory queueFactory = new RabbitMq09QueueListenerFactory(connection.getChannel());
Codecs codecs = new JsonOnlyCodecs();

AmqpDiscovery discovery = new AmqpDiscovery(queueFactory, connection, new ServiceCache(), codecs);
discovery.start();
return discovery;
}

}

This file was deleted.

Loading