Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/main/java/in/erail/model/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package in.erail.model;

/**
*
* @author vinay
*/
public class Event {

private RequestEvent request;
private ResponseEvent response;

public Event() {
this(new RequestEvent(), new ResponseEvent());
}

public Event(RequestEvent pRequest, ResponseEvent pResponse) {
this.request = pRequest;
this.response = pResponse;
}

public RequestEvent getRequest() {
return request;
}

public Event setRequest(RequestEvent pRequest) {
this.request = pRequest;
return this;
}

public ResponseEvent getResponse() {
return response;
}

public Event setResponse(ResponseEvent pResponse) {
this.response = pResponse;
return this;
}

}
16 changes: 14 additions & 2 deletions src/main/java/in/erail/service/RESTService.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package in.erail.service;

import in.erail.model.Event;
import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;

/**
*
Expand All @@ -15,7 +15,19 @@ public interface RESTService {

String getServiceUniqueId();

Maybe<ResponseEvent> handleEvent(RequestEvent pRequest);
default Class<? extends RequestEvent> getRequestEventClass() {
return RequestEvent.class;
}

default Class<? extends ResponseEvent> getResponseEventClass() {
return ResponseEvent.class;
}

default Event createEvent(RequestEvent pRequest) throws InstantiationException, IllegalAccessException {
return new Event(pRequest, getResponseEventClass().newInstance());
}

Maybe<Event> handleEvent(Event pEvent);

String getAuthority();

Expand Down
87 changes: 44 additions & 43 deletions src/main/java/in/erail/service/RESTServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.vertx.reactivex.core.Vertx;
import org.apache.logging.log4j.Logger;
import in.erail.glue.annotation.StartService;
import in.erail.model.Event;
import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -22,25 +23,27 @@
*
* @author vinay
*/
public abstract class RESTServiceImpl implements RESTService, MaybeTransformer<RequestEvent, ResponseEvent> {

private static final ResponseEvent DEFAULT_REPONSE_EVENT = new ResponseEvent();
public abstract class RESTServiceImpl implements RESTService, MaybeTransformer<Event, Event> {

private String mOperationId;
private String mServiceUniqueId;
private Vertx mVertx;
private boolean mEnable = false;
private Logger mLog;
private Scheduler mScheduler = Schedulers.io();
private ResponseEvent mDefaultResponseEvent = DEFAULT_REPONSE_EVENT;
private Event mDefaultEvent;
private boolean mSecure = false;
private String mAuthority;
private Class<? extends RequestEvent> mRequestEventClass = RequestEvent.class;
private MaybeTransformer<RequestEvent, RequestEvent> mPreProcessProcessors[];
private MaybeTransformer<ResponseEvent, ResponseEvent> mPostProcessProcessors[];
private Class<? extends ResponseEvent> mResponseEventClass = ResponseEvent.class;
private MaybeTransformer<Event, Event> mPreProcessProcessors[];
private MaybeTransformer<Event, Event> mPostProcessProcessors[];

@StartService
public void start() {
public void start() throws InstantiationException, IllegalAccessException {

mDefaultEvent = new Event(getRequestEventClass().newInstance(), getResponseEventClass().newInstance());

if (mEnable) {
getVertx()
.eventBus()
Expand All @@ -60,12 +63,13 @@ public Single<JsonObject> handleRequest(Message<JsonObject> pMessage) {
return Single
.just(pMessage)
.map(m -> pMessage.body().mapTo(getRequestEventClass()))
.map(this::createEvent)
.flatMapMaybe(this::handleEvent)
.toSingle(getDefaultResponseEvent())
.map(resp -> JsonObject.mapFrom(resp))
.toSingle(getDefaultEvent())
.map(resp -> JsonObject.mapFrom(resp.getResponse()))
.doOnSuccess(resp -> pMessage.reply(resp))
.doOnError(err -> {
ResponseEvent resp = new ResponseEvent()
ResponseEvent resp = getResponseEventClass().newInstance()
.setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
.setMediaType(MediaType.PLAIN_TEXT_UTF_8)
.setBody(ExceptionUtils.getMessage(err).getBytes());
Expand All @@ -76,43 +80,30 @@ public Single<JsonObject> handleRequest(Message<JsonObject> pMessage) {
}

@Override
public Maybe<ResponseEvent> handleEvent(RequestEvent pRequest) {

public Maybe<Event> handleEvent(Event pEvent) {
return Maybe
.just(pRequest)
.compose(this::preProcess)
.just(pEvent)
.compose(composePipeline(getPreProcessProcessors()))
.compose(this)
.compose(this::postProcess);
.compose(composePipeline(getPostProcessProcessors()));
}

public MaybeSource<RequestEvent> preProcess(Maybe<RequestEvent> pRequest) {
protected MaybeTransformer<Event, Event> composePipeline(MaybeTransformer<Event, Event>[] pProcessors) {

if (getPreProcessProcessors() == null || getPreProcessProcessors().length == 0) {
return pRequest;
if (pProcessors == null || pProcessors.length == 0) {
return (Maybe<Event> pEvent) -> pEvent;
}

return Arrays
.stream(getPreProcessProcessors())
.reduce(pRequest, (acc, p) -> acc.compose(p), (a, b) -> a);
return (Maybe<Event> pEvent) -> Arrays
.stream(pProcessors)
.reduce(pEvent, (acc, p) -> acc.compose(p), (a, b) -> a);
}

@Override
public MaybeSource<ResponseEvent> apply(Maybe<RequestEvent> pRequest) {
public final MaybeSource<Event> apply(Maybe<Event> pRequest) {
return process(pRequest);
}

public abstract MaybeSource<ResponseEvent> process(Maybe<RequestEvent> pRequest);

public Maybe<ResponseEvent> postProcess(Maybe<ResponseEvent> pResponse) {

if (getPostProcessProcessors() == null || getPostProcessProcessors().length == 0) {
return pResponse;
}

return Arrays
.stream(getPostProcessProcessors())
.reduce(pResponse, (acc, p) -> acc.compose(p), (a, b) -> a);
}
public abstract MaybeSource<Event> process(Maybe<Event> pEvent);

@Override
public String getOperationId() {
Expand Down Expand Up @@ -164,12 +155,12 @@ public void setScheduler(Scheduler pScheduler) {
this.mScheduler = pScheduler;
}

public ResponseEvent getDefaultResponseEvent() {
return mDefaultResponseEvent;
public Event getDefaultEvent() {
return mDefaultEvent;
}

public void setDefaultResponseEvent(ResponseEvent pDefaultResponseEvent) {
this.mDefaultResponseEvent = pDefaultResponseEvent;
public void setDefaultEvent(Event pDefaultEvent) {
this.mDefaultEvent = pDefaultEvent;
}

@Override
Expand All @@ -190,6 +181,7 @@ public void setAuthority(String pAuthority) {
this.mAuthority = pAuthority;
}

@Override
public Class<? extends RequestEvent> getRequestEventClass() {
return mRequestEventClass;
}
Expand All @@ -198,19 +190,28 @@ public void setRequestEventClass(Class<? extends RequestEvent> pRequestEventClas
this.mRequestEventClass = pRequestEventClass;
}

public MaybeTransformer<RequestEvent, RequestEvent>[] getPreProcessProcessors() {
@Override
public Class<? extends ResponseEvent> getResponseEventClass() {
return mResponseEventClass;
}

public void setResponseEventClass(Class<? extends ResponseEvent> pResponseEventClass) {
this.mResponseEventClass = pResponseEventClass;
}

public MaybeTransformer<Event, Event>[] getPreProcessProcessors() {
return mPreProcessProcessors;
}

public void setPreProcessProcessors(MaybeTransformer<RequestEvent, RequestEvent>[] pPreProcessProcessors) {
public void setPreProcessProcessors(MaybeTransformer<Event, Event>[] pPreProcessProcessors) {
this.mPreProcessProcessors = pPreProcessProcessors;
}

public MaybeTransformer<ResponseEvent, ResponseEvent>[] getPostProcessProcessors() {
public MaybeTransformer<Event, Event>[] getPostProcessProcessors() {
return mPostProcessProcessors;
}

public void setPostProcessProcessors(MaybeTransformer<ResponseEvent, ResponseEvent>[] pPostProcessProcessors) {
public void setPostProcessProcessors(MaybeTransformer<Event, Event>[] pPostProcessProcessors) {
this.mPostProcessProcessors = pPostProcessProcessors;
}

Expand Down
15 changes: 8 additions & 7 deletions src/test/java/in/erail/service/BinaryBodyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.base.Strings;
import com.google.common.net.MediaType;
import in.erail.model.Event;

import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
Expand All @@ -19,23 +20,23 @@
public class BinaryBodyService extends RESTServiceImpl {

@Override
public MaybeSource<ResponseEvent> process(Maybe<RequestEvent> pRequest) {
return pRequest.map(this::handle);
public MaybeSource<Event> process(Maybe<Event> pRequest) {
return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
}

protected ResponseEvent handle(RequestEvent pRequest) {
protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
String topicName = pRequest.getPathParameters().get(TestConstants.Service.Broadcast.APIMessage.PARAM_TOPIC_NAME);

if (Strings.isNullOrEmpty(topicName)) {
return new ResponseEvent().setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
pRespone.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
return;
}

ResponseEvent response = new ResponseEvent();
response.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
pRespone.setMediaType(MediaType.PLAIN_TEXT_UTF_8);

JsonObject jsonBody = new JsonObject(Buffer.buffer(pRequest.getBody()));

String bodyContent = jsonBody.getString("data");
return response.setBody(bodyContent.getBytes());
pRespone.setBody(bodyContent.getBytes());
}
}
17 changes: 8 additions & 9 deletions src/test/java/in/erail/service/BroadcastService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.base.Strings;
import com.google.common.net.MediaType;
import in.erail.model.Event;

import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
Expand All @@ -18,15 +19,16 @@
public class BroadcastService extends RESTServiceImpl {

@Override
public MaybeSource<ResponseEvent> process(Maybe<RequestEvent> pRequest) {
return pRequest.map(this::handle);
public MaybeSource<Event> process(Maybe<Event> pRequest) {
return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
}

protected ResponseEvent handle(RequestEvent pRequest) {
protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
String topicName = pRequest.getPathParameters().get(TestConstants.Service.Broadcast.APIMessage.PARAM_TOPIC_NAME);

if (Strings.isNullOrEmpty(topicName)) {
return new ResponseEvent().setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
pRespone.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
return;
}

JsonObject bodyJson = new JsonObject(pRequest.bodyAsString());
Expand All @@ -37,10 +39,7 @@ protected ResponseEvent handle(RequestEvent pRequest) {

getLog().debug(() -> String.format("Message[%s] published on [%s]", bodyJson.toString(), topicName));

ResponseEvent response = new ResponseEvent();
response.setBody(TestConstants.Service.Message.successMessage().toString().getBytes());
response.setMediaType(MediaType.JSON_UTF_8);

return response;
pRespone.setBody(TestConstants.Service.Message.successMessage().toString().getBytes());
pRespone.setMediaType(MediaType.JSON_UTF_8);
}
}
14 changes: 7 additions & 7 deletions src/test/java/in/erail/service/ProcessorCheckService.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package in.erail.service;

import com.google.common.net.MediaType;
import in.erail.model.Event;

import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
Expand All @@ -13,14 +14,13 @@
*/
public class ProcessorCheckService extends RESTServiceImpl {

@Override
public MaybeSource<ResponseEvent> process(Maybe<RequestEvent> pRequest) {
return pRequest.map(this::handle);
@Override
public MaybeSource<Event> process(Maybe<Event> pRequest) {
return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
}

protected ResponseEvent handle(RequestEvent pRequest) {
ResponseEvent response = new ResponseEvent();
response.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
return response.setBody(pRequest.getSubject().toString().getBytes());
protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
pRespone.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
pRespone.setBody(pRequest.getSubject().toString().getBytes());
}
}
18 changes: 10 additions & 8 deletions src/test/java/in/erail/service/processor/AddHeaderProcessor.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
package in.erail.service.processor;

import in.erail.model.ResponseEvent;
import java.util.Optional;

import in.erail.model.Event;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;
import java.util.Optional;

/**
*
* @author vinay
*/
public class AddHeaderProcessor implements MaybeTransformer<ResponseEvent, ResponseEvent> {
public class AddHeaderProcessor implements MaybeTransformer<Event, Event> {

private String mMessage;

@Override
public MaybeSource<ResponseEvent> apply(Maybe<ResponseEvent> pResponse) {
return pResponse.map(r -> {
String msg = Optional.ofNullable(r.headerValue("ProcessorHeader")).orElse("") + getMessage();
r.removeHeader("ProcessorHeader");
return r.addHeader("ProcessorHeader", msg);
public MaybeSource<Event> apply(Maybe<Event> pEvent) {
return pEvent.map(r -> {
String msg = Optional.ofNullable(r.getResponse().headerValue("ProcessorHeader")).orElse("") + getMessage();
r.getResponse().removeHeader("ProcessorHeader");
r.getResponse().addHeader("ProcessorHeader", msg);
return r;
});
}

Expand Down
Loading