diff --git a/config-layers/test/in/erail/route/OpenAPI3RouteBuilder.properties b/config-layers/test/in/erail/route/OpenAPI3RouteBuilder.properties
index e045839..0918a30 100644
--- a/config-layers/test/in/erail/route/OpenAPI3RouteBuilder.properties
+++ b/config-layers/test/in/erail/route/OpenAPI3RouteBuilder.properties
@@ -2,5 +2,7 @@
securityEnable=false
services+=\
/in/erail/service/BroadcastService,\
- /in/erail/service/BinaryBodyService
+ /in/erail/service/BinaryBodyService,\
+ /in/erail/service/ProcessorCheckService
+
diff --git a/config-layers/test/in/erail/route/openapi3.json b/config-layers/test/in/erail/route/openapi3.json
index f03758a..b270a24 100644
--- a/config-layers/test/in/erail/route/openapi3.json
+++ b/config-layers/test/in/erail/route/openapi3.json
@@ -11,14 +11,26 @@
"summary": "Send message to all subscriber of topicName",
"description": "Send message to all subscriber of topicName",
"operationId": "API_V1_BROADCAST_SERVICE",
- "parameters": [{
- "name": "topicName",
- "in": "path",
- "required": true,
- "schema": {
- "type": "string"
+ "parameters": [
+ {
+ "name": "topicName",
+ "in": "path",
+ "required": true,
+ "schema": {
+ "type": "string"
+ }
}
- }],
+ ],
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/broadcastMessage"
+ }
+ }
+ },
+ "required": true
+ },
"responses": {
"200": {
"description": "Success Response",
@@ -30,41 +42,45 @@
}
}
}
- },
- "requestBody": {
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/broadcastMessage"
- }
- }
- },
- "required": true
}
},
- "parameters": [{
- "name": "topicName",
- "in": "path",
- "description": "Name of topic on which to broadcast message",
- "required": true,
- "schema": {
- "type": "string"
+ "parameters": [
+ {
+ "name": "topicName",
+ "in": "path",
+ "description": "Name of topic on which to broadcast message",
+ "required": true,
+ "schema": {
+ "type": "string"
+ }
}
- }]
+ ]
},
"/broadcastv2/{topicName}": {
"post": {
"summary": "Send message to all subscriber of topicName",
"description": "Send message to all subscriber of topicName",
"operationId": "API_V1_BROADCAST_V2_SERVICE",
- "parameters": [{
- "name": "topicName",
- "in": "path",
- "required": true,
- "schema": {
- "type": "string"
+ "parameters": [
+ {
+ "name": "topicName",
+ "in": "path",
+ "required": true,
+ "schema": {
+ "type": "string"
+ }
}
- }],
+ ],
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/broadcastMessage"
+ }
+ }
+ },
+ "required": true
+ },
"responses": {
"200": {
"description": "Success Response",
@@ -76,27 +92,30 @@
}
}
}
- },
- "requestBody": {
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/broadcastMessage"
- }
- }
- },
- "required": true
}
},
- "parameters": [{
- "name": "topicName",
- "in": "path",
- "description": "Name of topic on which to broadcast message",
- "required": true,
- "schema": {
- "type": "string"
+ "parameters": [
+ {
+ "name": "topicName",
+ "in": "path",
+ "description": "Name of topic on which to broadcast message",
+ "required": true,
+ "schema": {
+ "type": "string"
+ }
+ }
+ ]
+ },
+ "/processcheck": {
+ "get": {
+ "summary": "Check Processor",
+ "operationId": "API_V1_PROCESSOR_CHECK",
+ "responses": {
+ "200": {
+ "description": "Success"
+ }
}
- }]
+ }
}
},
"components": {
diff --git a/config-layers/test/in/erail/service/BaseService.properties b/config-layers/test/in/erail/service/BaseService.properties
new file mode 100644
index 0000000..08c55c0
--- /dev/null
+++ b/config-layers/test/in/erail/service/BaseService.properties
@@ -0,0 +1,7 @@
+#/in/erail/service/BaseService
+
+vertx=/io/vertx/core/Vertx
+enable=true
+log=true
+preProcessProcessors=/in/erail/service/processor/LoadSubjectProcessor1
+postProcessProcessors=/in/erail/service/processor/AddHeaderProcessor1
diff --git a/config-layers/test/in/erail/service/BinaryBodyService.properties b/config-layers/test/in/erail/service/BinaryBodyService.properties
index 8e79b3b..13f5cd9 100644
--- a/config-layers/test/in/erail/service/BinaryBodyService.properties
+++ b/config-layers/test/in/erail/service/BinaryBodyService.properties
@@ -1,8 +1,6 @@
#/in/erail/service/BinaryBodyService
$class=in.erail.service.BinaryBodyService
+$basedOn=/in/erail/service/BaseService
operationId=API_V1_BROADCAST_V2_SERVICE
serviceUniqueId=API_V1_BROADCAST_V2_SERVICE
-vertx=/io/vertx/core/Vertx
-enable=true
-log=true
diff --git a/config-layers/test/in/erail/service/BroadcastService.properties b/config-layers/test/in/erail/service/BroadcastService.properties
index 0d5d9f5..ef7eb00 100644
--- a/config-layers/test/in/erail/service/BroadcastService.properties
+++ b/config-layers/test/in/erail/service/BroadcastService.properties
@@ -1,8 +1,6 @@
#/in/erail/service/BroadcastService
$class=in.erail.service.BroadcastService
+$basedOn=/in/erail/service/BaseService
operationId=API_V1_BROADCAST_SERVICE
serviceUniqueId=API_V1_BROADCAST_SERVICE
-vertx=/io/vertx/core/Vertx
-enable=true
-log=true
\ No newline at end of file
diff --git a/config-layers/test/in/erail/service/ProcessorCheckService.properties b/config-layers/test/in/erail/service/ProcessorCheckService.properties
new file mode 100644
index 0000000..d092bfe
--- /dev/null
+++ b/config-layers/test/in/erail/service/ProcessorCheckService.properties
@@ -0,0 +1,8 @@
+#/in/erail/service/ProcessorCheckService
+$class=in.erail.service.ProcessorCheckService
+$basedOn=/in/erail/service/BaseService
+
+operationId=API_V1_PROCESSOR_CHECK
+serviceUniqueId=API_V1_PROCESSOR_CHECK
+preProcessProcessors+=/in/erail/service/processor/LoadSubjectProcessor2
+postProcessProcessors+=/in/erail/service/processor/AddHeaderProcessor2
diff --git a/config-layers/test/in/erail/service/processor/AddHeaderProcessor1.properties b/config-layers/test/in/erail/service/processor/AddHeaderProcessor1.properties
new file mode 100644
index 0000000..e9fcb47
--- /dev/null
+++ b/config-layers/test/in/erail/service/processor/AddHeaderProcessor1.properties
@@ -0,0 +1,3 @@
+#/in/erail/service/processor/AddHeaderProcessor1
+$class=in.erail.service.processor.AddHeaderProcessor
+message=Header1
diff --git a/config-layers/test/in/erail/service/processor/AddHeaderProcessor2.properties b/config-layers/test/in/erail/service/processor/AddHeaderProcessor2.properties
new file mode 100644
index 0000000..52fdc02
--- /dev/null
+++ b/config-layers/test/in/erail/service/processor/AddHeaderProcessor2.properties
@@ -0,0 +1,3 @@
+#/in/erail/service/processor/AddHeaderProcessor2
+$class=in.erail.service.processor.AddHeaderProcessor
+message=Header2
diff --git a/config-layers/test/in/erail/service/processor/LoadSubjectProcessor1.properties b/config-layers/test/in/erail/service/processor/LoadSubjectProcessor1.properties
new file mode 100644
index 0000000..19a7bae
--- /dev/null
+++ b/config-layers/test/in/erail/service/processor/LoadSubjectProcessor1.properties
@@ -0,0 +1,3 @@
+#/in/erail/service/processor/LoadSubjectProcessor1
+$class=in.erail.service.processor.LoadSubjectProcessor
+message=Subject1
diff --git a/config-layers/test/in/erail/service/processor/LoadSubjectProcessor2.properties b/config-layers/test/in/erail/service/processor/LoadSubjectProcessor2.properties
new file mode 100644
index 0000000..94faa02
--- /dev/null
+++ b/config-layers/test/in/erail/service/processor/LoadSubjectProcessor2.properties
@@ -0,0 +1,3 @@
+#/in/erail/service/processor/LoadSubjectProcessor2
+$class=in.erail.service.processor.LoadSubjectProcessor
+message=Subject2
diff --git a/pom.xml b/pom.xml
index a9bede9..0f8efa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
4.0.0
in.erail
api-framework
- 2.4.3
+ 2.4.4
jar
@@ -57,6 +57,12 @@
jar
+
+
+ --pinentry-mode
+ loopback
+
+
diff --git a/src/main/java/in/erail/model/Event.java b/src/main/java/in/erail/model/Event.java
new file mode 100644
index 0000000..07a7db4
--- /dev/null
+++ b/src/main/java/in/erail/model/Event.java
@@ -0,0 +1,39 @@
+package in.erail.model;
+
+/**
+ *
+ * @author vinay
+ */
+public class Event {
+
+ private RequestEvent request;
+ private ResponseEvent response;
+
+ public Event() {
+ this(new RequestEvent(), new ResponseEvent());
+ }
+
+ public Event(RequestEvent pRequest, ResponseEvent pResponse) {
+ this.request = pRequest;
+ this.response = pResponse;
+ }
+
+ public RequestEvent getRequest() {
+ return request;
+ }
+
+ public Event setRequest(RequestEvent pRequest) {
+ this.request = pRequest;
+ return this;
+ }
+
+ public ResponseEvent getResponse() {
+ return response;
+ }
+
+ public Event setResponse(ResponseEvent pResponse) {
+ this.response = pResponse;
+ return this;
+ }
+
+}
diff --git a/src/main/java/in/erail/model/RequestEvent.java b/src/main/java/in/erail/model/RequestEvent.java
index d6d828b..3948d55 100644
--- a/src/main/java/in/erail/model/RequestEvent.java
+++ b/src/main/java/in/erail/model/RequestEvent.java
@@ -28,77 +28,87 @@ public class RequestEvent {
private byte[] mBody = new byte[0];
private boolean mIsBase64Encoded = false;
private Map mPrincipal;
+ private Object mSubject;
public String getResource() {
return mResource;
}
- public void setResource(String pResource) {
+ public RequestEvent setResource(String pResource) {
this.mResource = pResource;
+ return this;
}
public String getPath() {
return mPath;
}
- public void setPath(String pPath) {
+ public RequestEvent setPath(String pPath) {
this.mPath = pPath;
+ return this;
}
public HttpMethod getHttpMethod() {
return mHttpMethod;
}
- public void setHttpMethod(HttpMethod pHttpMethod) {
+ public RequestEvent setHttpMethod(HttpMethod pHttpMethod) {
this.mHttpMethod = pHttpMethod;
+ return this;
}
public Map getHeaders() {
return mHeaders;
}
- public void setHeaders(Map pHeaders) {
+ public RequestEvent setHeaders(Map pHeaders) {
this.mHeaders = pHeaders;
+ return this;
}
public Map getMultiValueHeaders() {
return mMultiValueHeaders;
}
- public void setMultiValueHeaders(Map pMultiValueHeaders) {
+ public RequestEvent setMultiValueHeaders(Map pMultiValueHeaders) {
this.mMultiValueHeaders = pMultiValueHeaders;
+ return this;
}
public Map getQueryStringParameters() {
return mQueryStringParameters;
}
- public void setQueryStringParameters(Map pQueryStringParameters) {
+ public RequestEvent setQueryStringParameters(Map pQueryStringParameters) {
this.mQueryStringParameters = pQueryStringParameters;
+ return this;
}
public Map getMultiValueQueryStringParameters() {
return mMultiValueQueryStringParameters;
}
- public void setMultiValueQueryStringParameters(Map pMultiValueQueryStringParameters) {
+ public RequestEvent setMultiValueQueryStringParameters(Map pMultiValueQueryStringParameters) {
this.mMultiValueQueryStringParameters = pMultiValueQueryStringParameters;
+ return this;
}
public Map getPathParameters() {
return mPathParameters;
}
- public void setPathParameters(Map pPathParameters) {
+ public RequestEvent setPathParameters(Map pPathParameters) {
this.mPathParameters = pPathParameters;
+ return this;
}
public Map getStageVariables() {
return mStageVariables;
}
- public void setStageVariables(Map pStageVariables) {
+ public RequestEvent setStageVariables(Map pStageVariables) {
this.mStageVariables = pStageVariables;
+ return this;
}
@SuppressWarnings("rawtypes")
@@ -107,24 +117,27 @@ public Map getRequestContext() {
}
@SuppressWarnings("rawtypes")
- public void setRequestContext(Map pRequestContext) {
+ public RequestEvent setRequestContext(Map pRequestContext) {
this.mRequestContext = pRequestContext;
+ return this;
}
public boolean isIsBase64Encoded() {
return mIsBase64Encoded;
}
- public void setIsBase64Encoded(boolean pIsBase64Encoded) {
+ public RequestEvent setIsBase64Encoded(boolean pIsBase64Encoded) {
this.mIsBase64Encoded = pIsBase64Encoded;
+ return this;
}
public byte[] getBody() {
return mBody;
}
- public void setBody(byte[] pBody) {
+ public RequestEvent setBody(byte[] pBody) {
this.mBody = pBody;
+ return this;
}
public String bodyAsString() {
@@ -138,8 +151,9 @@ public Map getPrincipal() {
return mPrincipal;
}
- public void setPrincipal(Map pPrincipal) {
+ public RequestEvent setPrincipal(Map pPrincipal) {
this.mPrincipal = pPrincipal;
+ return this;
}
@Override
@@ -147,4 +161,13 @@ public String toString() {
return JsonObject.mapFrom(this).toString();
}
+ public Object getSubject() {
+ return mSubject;
+ }
+
+ public RequestEvent setSubject(Object pSubject) {
+ this.mSubject = pSubject;
+ return this;
+ }
+
}
diff --git a/src/main/java/in/erail/model/ResponseEvent.java b/src/main/java/in/erail/model/ResponseEvent.java
index 3151388..a56a339 100644
--- a/src/main/java/in/erail/model/ResponseEvent.java
+++ b/src/main/java/in/erail/model/ResponseEvent.java
@@ -85,13 +85,14 @@ public ResponseEvent setMultiValueHeaders(Map pValue) {
a.addAll(b);
return a;
});
-
+
return this;
}
/**
* Return copy of headers map
- * @return
+ *
+ * @return
*/
public Map getMultiValueHeaders() {
@@ -135,7 +136,8 @@ public ResponseEvent setHeaders(Map pValue) {
/**
* Return copy of Header Map
- * @return
+ *
+ * @return
*/
public Map getHeaders() {
@@ -164,6 +166,7 @@ public ResponseEvent setContentType(String pContentType) {
/**
* Predefined Content Type
+ *
* @param pMediaType
* @return Response Event
*/
@@ -181,9 +184,13 @@ public String headerValue(String pHeaderName) {
return mMultiValueHeaders.get(pHeaderName);
}
+ public void removeHeader(String pHeaderName) {
+ mMultiValueHeaders.remove(pHeaderName);
+ }
+
@Override
public String toString() {
return JsonObject.mapFrom(this).toString();
}
-
+
}
diff --git a/src/main/java/in/erail/service/RESTService.java b/src/main/java/in/erail/service/RESTService.java
index bd6c57e..fd1a0a4 100644
--- a/src/main/java/in/erail/service/RESTService.java
+++ b/src/main/java/in/erail/service/RESTService.java
@@ -1,5 +1,6 @@
package in.erail.service;
+import in.erail.model.Event;
import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
import io.reactivex.Maybe;
@@ -14,9 +15,21 @@ public interface RESTService {
String getServiceUniqueId();
- Maybe process(RequestEvent pRequest);
+ default Class extends RequestEvent> getRequestEventClass() {
+ return RequestEvent.class;
+ }
+
+ default Class extends ResponseEvent> getResponseEventClass() {
+ return ResponseEvent.class;
+ }
+
+ default Event createEvent(RequestEvent pRequest) throws InstantiationException, IllegalAccessException {
+ return new Event(pRequest, getResponseEventClass().newInstance());
+ }
+
+ Maybe handleEvent(Event pEvent);
String getAuthority();
-
+
boolean isSecure();
}
diff --git a/src/main/java/in/erail/service/RESTServiceImpl.java b/src/main/java/in/erail/service/RESTServiceImpl.java
index b3150ab..0adbb8a 100644
--- a/src/main/java/in/erail/service/RESTServiceImpl.java
+++ b/src/main/java/in/erail/service/RESTServiceImpl.java
@@ -6,21 +6,24 @@
import io.vertx.reactivex.core.Vertx;
import org.apache.logging.log4j.Logger;
import in.erail.glue.annotation.StartService;
+import in.erail.model.Event;
import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.reactivex.Maybe;
+import io.reactivex.MaybeSource;
+import io.reactivex.MaybeTransformer;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.vertx.reactivex.core.eventbus.Message;
+import java.util.Arrays;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
*
* @author vinay
*/
-public abstract class RESTServiceImpl implements RESTService {
-
- private static final ResponseEvent DEFAULT_REPONSE_EVENT = new ResponseEvent();
+public abstract class RESTServiceImpl implements RESTService, MaybeTransformer {
private String mOperationId;
private String mServiceUniqueId;
@@ -28,12 +31,19 @@ public abstract class RESTServiceImpl implements RESTService {
private boolean mEnable = false;
private Logger mLog;
private Scheduler mScheduler = Schedulers.io();
- private ResponseEvent mDefaultResponseEvent = DEFAULT_REPONSE_EVENT;
- private boolean secure = false;
- private String authority;
+ private Event mDefaultEvent;
+ private boolean mSecure = false;
+ private String mAuthority;
+ private Class extends RequestEvent> mRequestEventClass = RequestEvent.class;
+ private Class extends ResponseEvent> mResponseEventClass = ResponseEvent.class;
+ private MaybeTransformer mPreProcessProcessors[];
+ private MaybeTransformer mPostProcessProcessors[];
@StartService
- public void start() {
+ public void start() throws InstantiationException, IllegalAccessException {
+
+ mDefaultEvent = new Event(getRequestEventClass().newInstance(), getResponseEventClass().newInstance());
+
if (mEnable) {
getVertx()
.eventBus()
@@ -52,13 +62,14 @@ public void start() {
public Single handleRequest(Message pMessage) {
return Single
.just(pMessage)
- .map(m -> pMessage.body().mapTo(RequestEvent.class))
- .flatMapMaybe(req -> process(req))
- .toSingle(getDefaultResponseEvent())
- .map(resp -> JsonObject.mapFrom(resp))
+ .map(m -> pMessage.body().mapTo(getRequestEventClass()))
+ .map(this::createEvent)
+ .flatMapMaybe(this::handleEvent)
+ .toSingle(getDefaultEvent())
+ .map(resp -> JsonObject.mapFrom(resp.getResponse()))
.doOnSuccess(resp -> pMessage.reply(resp))
.doOnError(err -> {
- ResponseEvent resp = new ResponseEvent()
+ ResponseEvent resp = getResponseEventClass().newInstance()
.setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
.setMediaType(MediaType.PLAIN_TEXT_UTF_8)
.setBody(ExceptionUtils.getMessage(err).getBytes());
@@ -68,6 +79,32 @@ public Single handleRequest(Message pMessage) {
.onErrorReturnItem(new JsonObject());
}
+ @Override
+ public Maybe handleEvent(Event pEvent) {
+ return Maybe
+ .just(pEvent)
+ .compose(composePipeline(getPreProcessProcessors()))
+ .compose(this)
+ .compose(composePipeline(getPostProcessProcessors()));
+ }
+
+ protected MaybeTransformer composePipeline(MaybeTransformer[] pProcessors) {
+
+ if (pProcessors == null || pProcessors.length == 0) {
+ return (Maybe pEvent) -> pEvent;
+ }
+
+ return (Maybe pEvent) -> Arrays
+ .stream(pProcessors)
+ .reduce(pEvent, (acc, p) -> acc.compose(p), (a, b) -> a);
+ }
+
+ @Override
+ public final MaybeSource apply(Maybe pRequest) {
+ return process(pRequest);
+ }
+ public abstract MaybeSource process(Maybe pEvent);
+
@Override
public String getOperationId() {
return mOperationId;
@@ -118,29 +155,64 @@ public void setScheduler(Scheduler pScheduler) {
this.mScheduler = pScheduler;
}
- public ResponseEvent getDefaultResponseEvent() {
- return mDefaultResponseEvent;
+ public Event getDefaultEvent() {
+ return mDefaultEvent;
}
- public void setDefaultResponseEvent(ResponseEvent pDefaultResponseEvent) {
- this.mDefaultResponseEvent = pDefaultResponseEvent;
+ public void setDefaultEvent(Event pDefaultEvent) {
+ this.mDefaultEvent = pDefaultEvent;
}
@Override
public boolean isSecure() {
- return secure;
+ return mSecure;
}
public void setSecure(boolean pSecure) {
- this.secure = pSecure;
+ this.mSecure = pSecure;
}
@Override
public String getAuthority() {
- return authority;
+ return mAuthority;
}
public void setAuthority(String pAuthority) {
- this.authority = pAuthority;
+ this.mAuthority = pAuthority;
+ }
+
+ @Override
+ public Class extends RequestEvent> getRequestEventClass() {
+ return mRequestEventClass;
}
+
+ public void setRequestEventClass(Class extends RequestEvent> pRequestEventClass) {
+ this.mRequestEventClass = pRequestEventClass;
+ }
+
+ @Override
+ public Class extends ResponseEvent> getResponseEventClass() {
+ return mResponseEventClass;
+ }
+
+ public void setResponseEventClass(Class extends ResponseEvent> pResponseEventClass) {
+ this.mResponseEventClass = pResponseEventClass;
+ }
+
+ public MaybeTransformer[] getPreProcessProcessors() {
+ return mPreProcessProcessors;
+ }
+
+ public void setPreProcessProcessors(MaybeTransformer[] pPreProcessProcessors) {
+ this.mPreProcessProcessors = pPreProcessProcessors;
+ }
+
+ public MaybeTransformer[] getPostProcessProcessors() {
+ return mPostProcessProcessors;
+ }
+
+ public void setPostProcessProcessors(MaybeTransformer[] pPostProcessProcessors) {
+ this.mPostProcessProcessors = pPostProcessProcessors;
+ }
+
}
diff --git a/src/test/java/in/erail/service/BinaryBodyService.java b/src/test/java/in/erail/service/BinaryBodyService.java
index dd8d0eb..6423682 100644
--- a/src/test/java/in/erail/service/BinaryBodyService.java
+++ b/src/test/java/in/erail/service/BinaryBodyService.java
@@ -2,12 +2,14 @@
import com.google.common.base.Strings;
import com.google.common.net.MediaType;
+import in.erail.model.Event;
import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
import in.erail.test.TestConstants;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.Maybe;
+import io.reactivex.MaybeSource;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
@@ -18,22 +20,23 @@
public class BinaryBodyService extends RESTServiceImpl {
@Override
- public Maybe process(RequestEvent pRequest) {
+ public MaybeSource process(Maybe pRequest) {
+ return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
+ }
+
+ protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
String topicName = pRequest.getPathParameters().get(TestConstants.Service.Broadcast.APIMessage.PARAM_TOPIC_NAME);
if (Strings.isNullOrEmpty(topicName)) {
- return Maybe.just(new ResponseEvent()
- .setStatusCode(HttpResponseStatus.BAD_REQUEST.code()));
+ pRespone.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
+ return;
}
- ResponseEvent response = new ResponseEvent();
- response.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
+ pRespone.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
JsonObject jsonBody = new JsonObject(Buffer.buffer(pRequest.getBody()));
String bodyContent = jsonBody.getString("data");
- response.setBody(bodyContent.getBytes());
- return Maybe.just(response);
+ pRespone.setBody(bodyContent.getBytes());
}
-
}
diff --git a/src/test/java/in/erail/service/BroadcastService.java b/src/test/java/in/erail/service/BroadcastService.java
index 6413804..6bf49b2 100644
--- a/src/test/java/in/erail/service/BroadcastService.java
+++ b/src/test/java/in/erail/service/BroadcastService.java
@@ -2,12 +2,14 @@
import com.google.common.base.Strings;
import com.google.common.net.MediaType;
+import in.erail.model.Event;
import in.erail.model.RequestEvent;
import in.erail.model.ResponseEvent;
import in.erail.test.TestConstants;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.Maybe;
+import io.reactivex.MaybeSource;
import io.vertx.core.json.JsonObject;
/**
@@ -17,12 +19,16 @@
public class BroadcastService extends RESTServiceImpl {
@Override
- public Maybe process(RequestEvent pRequest) {
+ public MaybeSource process(Maybe pRequest) {
+ return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
+ }
+ protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
String topicName = pRequest.getPathParameters().get(TestConstants.Service.Broadcast.APIMessage.PARAM_TOPIC_NAME);
if (Strings.isNullOrEmpty(topicName)) {
- return Maybe.just(new ResponseEvent().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()));
+ pRespone.setStatusCode(HttpResponseStatus.BAD_REQUEST.code());
+ return;
}
JsonObject bodyJson = new JsonObject(pRequest.bodyAsString());
@@ -33,11 +39,7 @@ public Maybe process(RequestEvent pRequest) {
getLog().debug(() -> String.format("Message[%s] published on [%s]", bodyJson.toString(), topicName));
- ResponseEvent response = new ResponseEvent();
- response.setBody(TestConstants.Service.Message.successMessage().toString().getBytes());
- response.setMediaType(MediaType.JSON_UTF_8);
-
- return Maybe.just(response);
+ pRespone.setBody(TestConstants.Service.Message.successMessage().toString().getBytes());
+ pRespone.setMediaType(MediaType.JSON_UTF_8);
}
-
}
diff --git a/src/test/java/in/erail/service/ProcessorCheckService.java b/src/test/java/in/erail/service/ProcessorCheckService.java
new file mode 100644
index 0000000..201a4d7
--- /dev/null
+++ b/src/test/java/in/erail/service/ProcessorCheckService.java
@@ -0,0 +1,26 @@
+package in.erail.service;
+
+import com.google.common.net.MediaType;
+import in.erail.model.Event;
+
+import in.erail.model.RequestEvent;
+import in.erail.model.ResponseEvent;
+import io.reactivex.Maybe;
+import io.reactivex.MaybeSource;
+
+/**
+ *
+ * @author vinay
+ */
+public class ProcessorCheckService extends RESTServiceImpl {
+
+@Override
+ public MaybeSource process(Maybe pRequest) {
+ return pRequest.doOnSuccess(e -> handle(e.getRequest(), e.getResponse()));
+ }
+
+ protected void handle(RequestEvent pRequest, ResponseEvent pRespone) {
+ pRespone.setMediaType(MediaType.PLAIN_TEXT_UTF_8);
+ pRespone.setBody(pRequest.getSubject().toString().getBytes());
+ }
+}
diff --git a/src/test/java/in/erail/service/ProcessorCheckServiceTest.java b/src/test/java/in/erail/service/ProcessorCheckServiceTest.java
new file mode 100644
index 0000000..139a214
--- /dev/null
+++ b/src/test/java/in/erail/service/ProcessorCheckServiceTest.java
@@ -0,0 +1,60 @@
+package in.erail.service;
+
+import com.google.common.net.HttpHeaders;
+import com.google.common.net.MediaType;
+import in.erail.server.Server;
+import in.erail.test.TestConstants;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.Timeout;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import org.junit.Rule;
+import in.erail.glue.Glue;
+
+/**
+ *
+ * @author vinay
+ */
+@RunWith(VertxUnitRunner.class)
+public class ProcessorCheckServiceTest {
+
+ @Rule
+ public Timeout rule = Timeout.seconds(2000);
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testProcess(TestContext context) {
+
+ Async async = context.async();
+
+ Server server = Glue.instance().resolve("/in/erail/server/Server");
+
+ //Broadcast Request
+ String json = new JsonObject().put("data", "testdata").toString();
+ server
+ .getVertx()
+ .createHttpClient()
+ .get(server.getHttpServerOptions().getPort(), server.getHttpServerOptions().getHost(), "/v1/processcheck")
+ .putHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
+ .putHeader(HttpHeaders.ORIGIN, "https://test.com")
+ .putHeader(HttpHeaders.CONTENT_LENGTH, Integer.toString(json.length()))
+ .putHeader(HttpHeaders.AUTHORIZATION, TestConstants.ACCESS_TOKEN)
+ .handler(response -> {
+ context.assertEquals(response.statusCode(), 200, response.statusMessage());
+ context.assertEquals(response.getHeader("ProcessorHeader"), "Header1Header2");
+ context.assertTrue(MediaType.parse(response.getHeader(HttpHeaders.CONTENT_TYPE)).equals(MediaType.PLAIN_TEXT_UTF_8));
+ response.bodyHandler((event) -> {
+ context.assertEquals(event.toString(), "Subject1Subject2");
+ async.countDown();
+ });
+ })
+ .write(json)
+ .end();
+
+ }
+
+}
diff --git a/src/test/java/in/erail/service/processor/AddHeaderProcessor.java b/src/test/java/in/erail/service/processor/AddHeaderProcessor.java
new file mode 100644
index 0000000..b251b06
--- /dev/null
+++ b/src/test/java/in/erail/service/processor/AddHeaderProcessor.java
@@ -0,0 +1,36 @@
+package in.erail.service.processor;
+
+import java.util.Optional;
+
+import in.erail.model.Event;
+import io.reactivex.Maybe;
+import io.reactivex.MaybeSource;
+import io.reactivex.MaybeTransformer;
+
+/**
+ *
+ * @author vinay
+ */
+public class AddHeaderProcessor implements MaybeTransformer {
+
+ private String mMessage;
+
+ @Override
+ public MaybeSource apply(Maybe pEvent) {
+ return pEvent.map(r -> {
+ String msg = Optional.ofNullable(r.getResponse().headerValue("ProcessorHeader")).orElse("") + getMessage();
+ r.getResponse().removeHeader("ProcessorHeader");
+ r.getResponse().addHeader("ProcessorHeader", msg);
+ return r;
+ });
+ }
+
+ public String getMessage() {
+ return mMessage;
+ }
+
+ public void setMessage(String pMessage) {
+ this.mMessage = pMessage;
+ }
+
+}
diff --git a/src/test/java/in/erail/service/processor/LoadSubjectProcessor.java b/src/test/java/in/erail/service/processor/LoadSubjectProcessor.java
new file mode 100644
index 0000000..2678d8e
--- /dev/null
+++ b/src/test/java/in/erail/service/processor/LoadSubjectProcessor.java
@@ -0,0 +1,33 @@
+package in.erail.service.processor;
+
+import in.erail.model.Event;
+import io.reactivex.Maybe;
+import io.reactivex.MaybeSource;
+import io.reactivex.MaybeTransformer;
+import java.util.Optional;
+
+/**
+ *
+ * @author vinay
+ */
+public class LoadSubjectProcessor implements MaybeTransformer {
+
+ private String mMessage;
+
+ @Override
+ public MaybeSource apply(Maybe pEvent) {
+ return pEvent.map(e -> {
+ e.getRequest().setSubject(Optional.ofNullable(e.getRequest().getSubject()).orElse("") + getMessage());
+ return e;
+ });
+ }
+
+ public String getMessage() {
+ return mMessage;
+ }
+
+ public void setMessage(String pMessage) {
+ this.mMessage = pMessage;
+ }
+
+}