From df16e2762a82cba75cbf9ce9586f4f01a50bf115 Mon Sep 17 00:00:00 2001 From: Vinay Kumar Date: Tue, 15 Jan 2019 12:58:49 +0000 Subject: [PATCH] Added Event --- src/main/java/in/erail/model/Event.java | 39 +++++++++ .../java/in/erail/service/RESTService.java | 16 +++- .../in/erail/service/RESTServiceImpl.java | 87 ++++++++++--------- .../in/erail/service/BinaryBodyService.java | 15 ++-- .../in/erail/service/BroadcastService.java | 17 ++-- .../erail/service/ProcessorCheckService.java | 14 +-- .../service/processor/AddHeaderProcessor.java | 18 ++-- .../processor/LoadSubjectProcessor.java | 11 ++- 8 files changed, 137 insertions(+), 80 deletions(-) create mode 100644 src/main/java/in/erail/model/Event.java diff --git a/src/main/java/in/erail/model/Event.java b/src/main/java/in/erail/model/Event.java new file mode 100644 index 0000000..07a7db4 --- /dev/null +++ b/src/main/java/in/erail/model/Event.java @@ -0,0 +1,39 @@ +package in.erail.model; + +/** + * + * @author vinay + */ +public class Event { + + private RequestEvent request; + private ResponseEvent response; + + public Event() { + this(new RequestEvent(), new ResponseEvent()); + } + + public Event(RequestEvent pRequest, ResponseEvent pResponse) { + this.request = pRequest; + this.response = pResponse; + } + + public RequestEvent getRequest() { + return request; + } + + public Event setRequest(RequestEvent pRequest) { + this.request = pRequest; + return this; + } + + public ResponseEvent getResponse() { + return response; + } + + public Event setResponse(ResponseEvent pResponse) { + this.response = pResponse; + return this; + } + +} diff --git a/src/main/java/in/erail/service/RESTService.java b/src/main/java/in/erail/service/RESTService.java index b1479a5..fd1a0a4 100644 --- a/src/main/java/in/erail/service/RESTService.java +++ b/src/main/java/in/erail/service/RESTService.java @@ -1,9 +1,9 @@ package in.erail.service; +import in.erail.model.Event; import in.erail.model.RequestEvent; import in.erail.model.ResponseEvent; import io.reactivex.Maybe; -import io.reactivex.MaybeSource; /** * @@ -15,7 +15,19 @@ public interface RESTService { String getServiceUniqueId(); - Maybe handleEvent(RequestEvent pRequest); + default Class getRequestEventClass() { + return RequestEvent.class; + } + + default Class getResponseEventClass() { + return ResponseEvent.class; + } + + default Event createEvent(RequestEvent pRequest) throws InstantiationException, IllegalAccessException { + return new Event(pRequest, getResponseEventClass().newInstance()); + } + + Maybe handleEvent(Event pEvent); String getAuthority(); diff --git a/src/main/java/in/erail/service/RESTServiceImpl.java b/src/main/java/in/erail/service/RESTServiceImpl.java index 99463e2..0adbb8a 100644 --- a/src/main/java/in/erail/service/RESTServiceImpl.java +++ b/src/main/java/in/erail/service/RESTServiceImpl.java @@ -6,6 +6,7 @@ import io.vertx.reactivex.core.Vertx; import org.apache.logging.log4j.Logger; import in.erail.glue.annotation.StartService; +import in.erail.model.Event; import in.erail.model.RequestEvent; import in.erail.model.ResponseEvent; import io.netty.handler.codec.http.HttpResponseStatus; @@ -22,9 +23,7 @@ * * @author vinay */ -public abstract class RESTServiceImpl implements RESTService, MaybeTransformer { - - private static final ResponseEvent DEFAULT_REPONSE_EVENT = new ResponseEvent(); +public abstract class RESTServiceImpl implements RESTService, MaybeTransformer { private String mOperationId; private String mServiceUniqueId; @@ -32,15 +31,19 @@ public abstract class RESTServiceImpl implements RESTService, MaybeTransformer mRequestEventClass = RequestEvent.class; - private MaybeTransformer mPreProcessProcessors[]; - private MaybeTransformer mPostProcessProcessors[]; + private Class mResponseEventClass = ResponseEvent.class; + private MaybeTransformer mPreProcessProcessors[]; + private MaybeTransformer mPostProcessProcessors[]; @StartService - public void start() { + public void start() throws InstantiationException, IllegalAccessException { + + mDefaultEvent = new Event(getRequestEventClass().newInstance(), getResponseEventClass().newInstance()); + if (mEnable) { getVertx() .eventBus() @@ -60,12 +63,13 @@ public Single handleRequest(Message pMessage) { return Single .just(pMessage) .map(m -> pMessage.body().mapTo(getRequestEventClass())) + .map(this::createEvent) .flatMapMaybe(this::handleEvent) - .toSingle(getDefaultResponseEvent()) - .map(resp -> JsonObject.mapFrom(resp)) + .toSingle(getDefaultEvent()) + .map(resp -> JsonObject.mapFrom(resp.getResponse())) .doOnSuccess(resp -> pMessage.reply(resp)) .doOnError(err -> { - ResponseEvent resp = new ResponseEvent() + ResponseEvent resp = getResponseEventClass().newInstance() .setStatusCode(HttpResponseStatus.BAD_REQUEST.code()) .setMediaType(MediaType.PLAIN_TEXT_UTF_8) .setBody(ExceptionUtils.getMessage(err).getBytes()); @@ -76,43 +80,30 @@ public Single handleRequest(Message pMessage) { } @Override - public Maybe handleEvent(RequestEvent pRequest) { - + public Maybe handleEvent(Event pEvent) { return Maybe - .just(pRequest) - .compose(this::preProcess) + .just(pEvent) + .compose(composePipeline(getPreProcessProcessors())) .compose(this) - .compose(this::postProcess); + .compose(composePipeline(getPostProcessProcessors())); } - public MaybeSource preProcess(Maybe pRequest) { + protected MaybeTransformer composePipeline(MaybeTransformer[] pProcessors) { - if (getPreProcessProcessors() == null || getPreProcessProcessors().length == 0) { - return pRequest; + if (pProcessors == null || pProcessors.length == 0) { + return (Maybe pEvent) -> pEvent; } - return Arrays - .stream(getPreProcessProcessors()) - .reduce(pRequest, (acc, p) -> acc.compose(p), (a, b) -> a); + return (Maybe pEvent) -> Arrays + .stream(pProcessors) + .reduce(pEvent, (acc, p) -> acc.compose(p), (a, b) -> a); } @Override - public MaybeSource apply(Maybe pRequest) { + public final MaybeSource apply(Maybe pRequest) { return process(pRequest); } - - public abstract MaybeSource process(Maybe pRequest); - - public Maybe postProcess(Maybe pResponse) { - - if (getPostProcessProcessors() == null || getPostProcessProcessors().length == 0) { - return pResponse; - } - - return Arrays - .stream(getPostProcessProcessors()) - .reduce(pResponse, (acc, p) -> acc.compose(p), (a, b) -> a); - } + public abstract MaybeSource process(Maybe pEvent); @Override public String getOperationId() { @@ -164,12 +155,12 @@ public void setScheduler(Scheduler pScheduler) { this.mScheduler = pScheduler; } - public ResponseEvent getDefaultResponseEvent() { - return mDefaultResponseEvent; + public Event getDefaultEvent() { + return mDefaultEvent; } - public void setDefaultResponseEvent(ResponseEvent pDefaultResponseEvent) { - this.mDefaultResponseEvent = pDefaultResponseEvent; + public void setDefaultEvent(Event pDefaultEvent) { + this.mDefaultEvent = pDefaultEvent; } @Override @@ -190,6 +181,7 @@ public void setAuthority(String pAuthority) { this.mAuthority = pAuthority; } + @Override public Class getRequestEventClass() { return mRequestEventClass; } @@ -198,19 +190,28 @@ public void setRequestEventClass(Class pRequestEventClas this.mRequestEventClass = pRequestEventClass; } - public MaybeTransformer[] getPreProcessProcessors() { + @Override + public Class getResponseEventClass() { + return mResponseEventClass; + } + + public void setResponseEventClass(Class pResponseEventClass) { + this.mResponseEventClass = pResponseEventClass; + } + + public MaybeTransformer[] getPreProcessProcessors() { return mPreProcessProcessors; } - public void setPreProcessProcessors(MaybeTransformer[] pPreProcessProcessors) { + public void setPreProcessProcessors(MaybeTransformer[] pPreProcessProcessors) { this.mPreProcessProcessors = pPreProcessProcessors; } - public MaybeTransformer[] getPostProcessProcessors() { + public MaybeTransformer[] getPostProcessProcessors() { return mPostProcessProcessors; } - public void setPostProcessProcessors(MaybeTransformer[] pPostProcessProcessors) { + public void setPostProcessProcessors(MaybeTransformer[] pPostProcessProcessors) { this.mPostProcessProcessors = pPostProcessProcessors; } diff --git a/src/test/java/in/erail/service/BinaryBodyService.java b/src/test/java/in/erail/service/BinaryBodyService.java index b4f8f19..6423682 100644 --- a/src/test/java/in/erail/service/BinaryBodyService.java +++ b/src/test/java/in/erail/service/BinaryBodyService.java @@ -2,6 +2,7 @@ import com.google.common.base.Strings; import com.google.common.net.MediaType; +import in.erail.model.Event; import in.erail.model.RequestEvent; import in.erail.model.ResponseEvent; @@ -19,23 +20,23 @@ public class BinaryBodyService extends RESTServiceImpl { @Override - public MaybeSource process(Maybe pRequest) { - return pRequest.map(this::handle); + public MaybeSource process(Maybe pRequest) { + return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse())); } - protected ResponseEvent handle(RequestEvent pRequest) { + protected void handle(RequestEvent pRequest, ResponseEvent pRespone) { String topicName = pRequest.getPathParameters().get(TestConstants.Service.Broadcast.APIMessage.PARAM_TOPIC_NAME); if (Strings.isNullOrEmpty(topicName)) { - return new ResponseEvent().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()); + pRespone.setStatusCode(HttpResponseStatus.BAD_REQUEST.code()); + return; } - ResponseEvent response = new ResponseEvent(); - response.setMediaType(MediaType.PLAIN_TEXT_UTF_8); + pRespone.setMediaType(MediaType.PLAIN_TEXT_UTF_8); JsonObject jsonBody = new JsonObject(Buffer.buffer(pRequest.getBody())); String bodyContent = jsonBody.getString("data"); - return response.setBody(bodyContent.getBytes()); + pRespone.setBody(bodyContent.getBytes()); } } diff --git a/src/test/java/in/erail/service/BroadcastService.java b/src/test/java/in/erail/service/BroadcastService.java index f741e8c..6bf49b2 100644 --- a/src/test/java/in/erail/service/BroadcastService.java +++ b/src/test/java/in/erail/service/BroadcastService.java @@ -2,6 +2,7 @@ import com.google.common.base.Strings; import com.google.common.net.MediaType; +import in.erail.model.Event; import in.erail.model.RequestEvent; import in.erail.model.ResponseEvent; @@ -18,15 +19,16 @@ public class BroadcastService extends RESTServiceImpl { @Override - public MaybeSource process(Maybe pRequest) { - return pRequest.map(this::handle); + public MaybeSource process(Maybe pRequest) { + return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse())); } - protected ResponseEvent handle(RequestEvent pRequest) { + protected void handle(RequestEvent pRequest, ResponseEvent pRespone) { String topicName = pRequest.getPathParameters().get(TestConstants.Service.Broadcast.APIMessage.PARAM_TOPIC_NAME); if (Strings.isNullOrEmpty(topicName)) { - return new ResponseEvent().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()); + pRespone.setStatusCode(HttpResponseStatus.BAD_REQUEST.code()); + return; } JsonObject bodyJson = new JsonObject(pRequest.bodyAsString()); @@ -37,10 +39,7 @@ protected ResponseEvent handle(RequestEvent pRequest) { getLog().debug(() -> String.format("Message[%s] published on [%s]", bodyJson.toString(), topicName)); - ResponseEvent response = new ResponseEvent(); - response.setBody(TestConstants.Service.Message.successMessage().toString().getBytes()); - response.setMediaType(MediaType.JSON_UTF_8); - - return response; + pRespone.setBody(TestConstants.Service.Message.successMessage().toString().getBytes()); + pRespone.setMediaType(MediaType.JSON_UTF_8); } } diff --git a/src/test/java/in/erail/service/ProcessorCheckService.java b/src/test/java/in/erail/service/ProcessorCheckService.java index 66e55eb..201a4d7 100644 --- a/src/test/java/in/erail/service/ProcessorCheckService.java +++ b/src/test/java/in/erail/service/ProcessorCheckService.java @@ -1,6 +1,7 @@ package in.erail.service; import com.google.common.net.MediaType; +import in.erail.model.Event; import in.erail.model.RequestEvent; import in.erail.model.ResponseEvent; @@ -13,14 +14,13 @@ */ public class ProcessorCheckService extends RESTServiceImpl { - @Override - public MaybeSource process(Maybe pRequest) { - return pRequest.map(this::handle); +@Override + public MaybeSource process(Maybe pRequest) { + return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse())); } - protected ResponseEvent handle(RequestEvent pRequest) { - ResponseEvent response = new ResponseEvent(); - response.setMediaType(MediaType.PLAIN_TEXT_UTF_8); - return response.setBody(pRequest.getSubject().toString().getBytes()); + protected void handle(RequestEvent pRequest, ResponseEvent pRespone) { + pRespone.setMediaType(MediaType.PLAIN_TEXT_UTF_8); + pRespone.setBody(pRequest.getSubject().toString().getBytes()); } } diff --git a/src/test/java/in/erail/service/processor/AddHeaderProcessor.java b/src/test/java/in/erail/service/processor/AddHeaderProcessor.java index b3fa657..b251b06 100644 --- a/src/test/java/in/erail/service/processor/AddHeaderProcessor.java +++ b/src/test/java/in/erail/service/processor/AddHeaderProcessor.java @@ -1,25 +1,27 @@ package in.erail.service.processor; -import in.erail.model.ResponseEvent; +import java.util.Optional; + +import in.erail.model.Event; import io.reactivex.Maybe; import io.reactivex.MaybeSource; import io.reactivex.MaybeTransformer; -import java.util.Optional; /** * * @author vinay */ -public class AddHeaderProcessor implements MaybeTransformer { +public class AddHeaderProcessor implements MaybeTransformer { private String mMessage; @Override - public MaybeSource apply(Maybe pResponse) { - return pResponse.map(r -> { - String msg = Optional.ofNullable(r.headerValue("ProcessorHeader")).orElse("") + getMessage(); - r.removeHeader("ProcessorHeader"); - return r.addHeader("ProcessorHeader", msg); + public MaybeSource apply(Maybe pEvent) { + return pEvent.map(r -> { + String msg = Optional.ofNullable(r.getResponse().headerValue("ProcessorHeader")).orElse("") + getMessage(); + r.getResponse().removeHeader("ProcessorHeader"); + r.getResponse().addHeader("ProcessorHeader", msg); + return r; }); } diff --git a/src/test/java/in/erail/service/processor/LoadSubjectProcessor.java b/src/test/java/in/erail/service/processor/LoadSubjectProcessor.java index b20bbef..2678d8e 100644 --- a/src/test/java/in/erail/service/processor/LoadSubjectProcessor.java +++ b/src/test/java/in/erail/service/processor/LoadSubjectProcessor.java @@ -1,6 +1,6 @@ package in.erail.service.processor; -import in.erail.model.RequestEvent; +import in.erail.model.Event; import io.reactivex.Maybe; import io.reactivex.MaybeSource; import io.reactivex.MaybeTransformer; @@ -10,13 +10,16 @@ * * @author vinay */ -public class LoadSubjectProcessor implements MaybeTransformer { +public class LoadSubjectProcessor implements MaybeTransformer { private String mMessage; @Override - public MaybeSource apply(Maybe pRequest) { - return pRequest.map(e -> e.setSubject(Optional.ofNullable(e.getSubject()).orElse("") + getMessage())); + public MaybeSource apply(Maybe pEvent) { + return pEvent.map(e -> { + e.getRequest().setSubject(Optional.ofNullable(e.getRequest().getSubject()).orElse("") + getMessage()); + return e; + }); } public String getMessage() {