Permalink
Browse files

Adding init status for first state and fixing socket timeout issue

  • Loading branch information...
jerome.vervier
jerome.vervier committed Oct 26, 2018
1 parent 1051765 commit fc135f4c2d656d4354bdafe2ad804abb97531f60
@@ -176,6 +176,8 @@
private Map<AttributeRef, ResteasyWebTarget> controllersTargetMap = new HashMap<>();
private Map<AttributeRef, ScheduledFuture> controllerHeartbeat = new HashMap<>();
private Map<AttributeRef, Boolean> initStatusDone = new HashMap<>();
@Override
public void init(Container container) throws Exception {
super.init(container);
@@ -201,10 +203,12 @@ protected void doStop(Container container) {
@Override
protected void doLinkProtocolConfiguration(AssetAttribute protocolConfiguration) {
LOG.fine("### Adding new protocol " + protocolConfiguration.getReferenceOrThrow() + " (" + protocolConfiguration.getNameOrThrow() + ")");
String baseURL = protocolConfiguration.getMetaItem(META_PROTOCOL_BASE_URI).flatMap(AbstractValueHolder::getValueAsString)
.orElseThrow(() -> new IllegalArgumentException("Missing or invalid required meta item: " + META_PROTOCOL_BASE_URI));
WebTargetBuilder webTargetBuilder = new WebTargetBuilder(baseURL);
WebTargetBuilder webTargetBuilder = new WebTargetBuilder(baseURL, 70000);
String username = protocolConfiguration.getMetaItem(META_PROTOCOL_USERNAME).flatMap(AbstractValueHolder::getValueAsString).orElse(null);
String password = protocolConfiguration.getMetaItem(META_PROTOCOL_PASSWORD).flatMap(AbstractValueHolder::getValueAsString).orElse(null);
@@ -259,6 +263,7 @@ protected void doLinkAttribute(AssetAttribute attribute, AssetAttribute protocol
* Build Sensor Status info for polling request
*/
if (sensorName != null) {
LOG.fine("### Adding new sensor [" + deviceName + "," + sensorName + "] linked to " + protocolConfiguration.getReferenceOrThrow() + " (" + protocolConfiguration.getNameOrThrow() + ")");
controllersMap.get(protocolConfiguration.getReferenceOrThrow()).addSensor(attribute.getReferenceOrThrow(), new ControllerSensor(deviceName, sensorName));
//Properly stop previously existing polling on device name --> use of false parameter
@@ -268,6 +273,11 @@ protected void doLinkAttribute(AssetAttribute attribute, AssetAttribute protocol
pollingSensorList.get(pollingKey).cancel(true);
}
this.initStatusDone.put(attribute.getReferenceOrThrow(), false);
//Get initial status of sensor
this.collectInitialStatus(attribute.getReferenceOrThrow(), deviceName, sensorName, protocolConfiguration.getReferenceOrThrow());
//Put new polling on a new device name or update previous
this.schedulePollingTask(pollingKey);
}
@@ -336,6 +346,65 @@ protected void processLinkedAttributeWrite(AttributeEvent event, AssetAttribute
return commandsMap;
}
private void collectInitialStatus(AttributeRef attributeRef, String deviceName, String sensorName, AttributeRef controllerRef) {
this.executorService
.schedule(() -> this.executeInitialStatus(attributeRef, deviceName, sensorName, controllerRef,response -> onInitialStatusResponse(attributeRef, deviceName, sensorName, controllerRef, response)),
0);
}
private void executeInitialStatus(AttributeRef attributeRef, String deviceName, String sensorName, AttributeRef controllerRef, Consumer<Response> responseConsumer) {
withLock(getProtocolName() + "::executeInitialStatus::" + attributeRef, () -> {
LOG.info("### Initial status check for " + attributeRef.getAttributeName() + " [" + deviceName + "," + sensorName + "] ...");
HttpClientProtocol.HttpClientRequest checkRequest = RequestBuilder.buildStatusRequest(deviceName, Arrays.asList(sensorName), this.controllersTargetMap.get(controllerRef));
Response response = null;
try {
response = checkRequest.invoke(null);
} catch (ProcessingException e) {
LOG.log(Level.SEVERE, "### Initial status for " + attributeRef.getAttributeName() + " [" + deviceName + "," + sensorName + "] doesn't succeed", e);
}
responseConsumer.accept(response);
});
}
private void onInitialStatusResponse(AttributeRef attributeRef, String deviceName, String sensorName, AttributeRef controllerRef, Response response) {
if(response != null) {
if (response.getStatusInfo().equals(Response.Status.OK)) {
String responseBodyAsString = response.readEntity(String.class);
LOG.fine("### New sensor [" + sensorName + "] status received");
LOG.finer("### Status request body response : " + responseBodyAsString);
Optional<ArrayValue> arrayValue = Values.parse(responseBodyAsString).flatMap(Values::getArray);
Optional<List<ObjectValue>> statuses = Values.getArrayElements(arrayValue.orElse(null), ObjectValue.class, false, false);
if (!statuses.isPresent()) {
LOG.warning("### Status response is not a JSON array or empty: " + responseBodyAsString);
} else {
statuses.get().forEach(status -> {
String name = status.getString("name").orElse(null);
String value = status.getString("value").orElse(null);
this.updateAttributeValue(attributeRef, value);
this.initStatusDone.put(attributeRef, true);
});
}
} else {
LOG.severe("### Status code for initial status received error : " + response.getStatus() + " --> " + response.getStatusInfo().getReasonPhrase());
}
} else {
LOG.warning("### Initial status check return a null value for " + attributeRef.getAttributeName() + " [" + deviceName + "," + sensorName + "]");
}
if(!this.initStatusDone.get(attributeRef)) {
collectInitialStatus(attributeRef, deviceName, sensorName, controllerRef);
}
}
/**
* Compute the polling request for a given deviceName and controller. Method check all registered sensor's (linked to the Protocol) and collect all sensor's name
* to put them into polling request
@@ -344,7 +413,7 @@ protected void processLinkedAttributeWrite(AttributeEvent event, AssetAttribute
* @return {@link ScheduledFuture} task to keep a track on
*/
private ScheduledFuture computePollingTask(PollingKey pollingKey) {
return withLockReturning(getProtocolName() + "::computePollingTask", () -> {
return withLockReturning(getProtocolName() + "::computePollingTask::" + pollingKey.getControllerAgentRef() + "::" + pollingKey.getDeviceName(), () -> {
List<String> sensorNameList = this.controllersMap.get(pollingKey.getControllerAgentRef()).collectSensorNameLinkedToDeviceName(pollingKey.getDeviceName());
if (sensorNameList.isEmpty()) {
@@ -447,6 +516,7 @@ private void onPollingResponse(PollingKey pollingKey, List<String> sensorNameLis
* @param value
*/
private void updateAttributeValue(AttributeRef attributeRef, String value) {
LOG.fine("### Updating attribute " + attributeRef + " with value " + value);
AttributeValueType attributeType = this.linkedAttributes.get(attributeRef).getTypeOrThrow();
ValueType valueType = attributeType.getValueType();
@@ -490,7 +560,7 @@ private void onAttributeWriteResponse(Response response) {
LOG.severe("### Linked attribute Write request return with an error (different from 204) : " + response.getStatusInfo().getReasonPhrase());
}
} else {
LOG.warning("### Response set to null");
LOG.warning("### Response set to null on Write");
}
}
@@ -114,6 +114,25 @@
);
}
public static HttpClientProtocol.HttpClientRequest buildStatusRequest(String deviceName, List<String> sensorsName, ResteasyWebTarget webTarget) {
MultivaluedMap<String, String> queryParam = new MultivaluedHashMap<>();
queryParam.addAll("name", sensorsName);
return new HttpClientProtocol.HttpClientRequest(
webTarget,
"/rest/devices/" + deviceName + "/status",
"GET",
getDefaultHeaders(),
queryParam,
null,
false,
false,
null,
MediaType.APPLICATION_JSON
);
}
public static HttpClientProtocol.HttpClientRequest buildCheckRequest(ResteasyWebTarget webTarget) {
return new HttpClientProtocol.HttpClientRequest(
webTarget,
@@ -69,13 +69,24 @@ public WebTargetBuilder(String uri) {
this(ResteasyUriBuilder.fromUri(uri));
}
public WebTargetBuilder(String uri, long overrideSocketTimeout) {
this(ResteasyUriBuilder.fromUri(uri), overrideSocketTimeout);
}
public WebTargetBuilder(URI uri) {
this(ResteasyUriBuilder.fromUri(uri));
}
public WebTargetBuilder(UriBuilder uri) {
if (client == null) {
initClient();
initClient(null);
}
this.uri = uri;
}
public WebTargetBuilder(UriBuilder uri, long overrideSocketTimeout) {
if (client == null) {
initClient(overrideSocketTimeout);
}
this.uri = uri;
}
@@ -193,14 +204,14 @@ public ResteasyWebTarget build() {
return target;
}
protected static void initClient() {
protected static void initClient(Long overrideSocketTimeout) {
if (client != null) {
return;
}
ResteasyClientBuilder clientBuilder = new ResteasyClientBuilder()
.connectionPoolSize(CONNECTION_POOL_SIZE)
.connectionCheckoutTimeout(CONNECTION_CHECKOUT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS)
.socketTimeout(CONNECTION_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS)
.socketTimeout(overrideSocketTimeout == null ? CONNECTION_TIMEOUT_MILLISECONDS : overrideSocketTimeout, TimeUnit.MILLISECONDS)
.establishConnectionTimeout(CONNECTION_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS)
.register(new JacksonConfig());

0 comments on commit fc135f4

Please sign in to comment.