Skip to content

Commit

Permalink
Merge pull request thingsboard#66 from ikulikov/feature/aws-sqs-integ…
Browse files Browse the repository at this point in the history
…ration

Feature/aws sqs integration
  • Loading branch information
ashvayka committed Sep 12, 2019
2 parents 888c75c + f5216aa commit 2ff279b
Show file tree
Hide file tree
Showing 44 changed files with 1,638 additions and 14 deletions.
8 changes: 8 additions & 0 deletions application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@
<groupId>org.thingsboard.common.integration</groupId>
<artifactId>mqtt-integrations</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common.integration</groupId>
<artifactId>aws-integrations</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common.integration</groupId>
<artifactId>opcua-integrations</artifactId>
Expand Down Expand Up @@ -323,6 +327,10 @@
<groupId>com.github.ua-parser</groupId>
<artifactId>uap-java</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.converter.DataConverterService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.integration.aws.sqs.AwsSqsIntegration;
import org.thingsboard.integration.api.data.DefaultIntegrationDownlinkMsg;
import org.thingsboard.server.service.integration.rpc.IntegrationRpcService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
Expand Down Expand Up @@ -657,6 +658,8 @@ private ThingsboardPlatformIntegration createThingsboardPlatformIntegration(Inte
return new BasicMqttIntegration();
case AWS_IOT:
return new AwsIotIntegration();
case AWS_SQS:
return new AwsSqsIntegration();
case IBM_WATSON_IOT:
return new IbmWatsonIotIntegration();
case TTN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
Expand Down Expand Up @@ -123,15 +125,23 @@ public void handleContextClosed(ContextClosedEvent event) {
private CustomerService customerService;

private EventLoopGroup eventLoopGroup;
private ScheduledExecutorService scheduledExecutorService;

@PostConstruct
public void init() {
eventLoopGroup = new NioEventLoopGroup();
scheduledExecutorService = Executors.newScheduledThreadPool(3);
}

@PreDestroy
public void destroy() {
eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
}

ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;

@Data
Expand Down Expand Up @@ -306,4 +307,9 @@ public ServerAddress getServerAddress() {
public EventLoopGroup getEventLoopGroup() {
return ctx.getEventLoopGroup();
}

@Override
public ScheduledExecutorService getScheduledExecutorService() {
return ctx.getScheduledExecutorService();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@AllArgsConstructor
public enum IntegrationType {
OCEANCONNECT(false), SIGFOX(false), THINGPARK(false), TMOBILE_IOT_CDP(false), HTTP(false), MQTT(true),
AWS_IOT(true), IBM_WATSON_IOT(true), TTN(true), AZURE_EVENT_HUB(true), OPC_UA(true),
AWS_IOT(true), AWS_SQS(true), IBM_WATSON_IOT(true), TTN(true), AZURE_EVENT_HUB(true), OPC_UA(true),
CUSTOM(false, true), UDP(false, true), TCP(false, true);

IntegrationType(boolean singleton) {
Expand Down
85 changes: 85 additions & 0 deletions common/integration/aws-integrations/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<!--
ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
Copyright © 2016-2019 ThingsBoard, Inc. All Rights Reserved.
NOTICE: All information contained herein is, and remains
the property of ThingsBoard, Inc. and its suppliers,
if any. The intellectual and technical concepts contained
herein are proprietary to ThingsBoard, Inc.
and its suppliers and may be covered by U.S. and Foreign Patents,
patents in process, and are protected by trade secret or copyright law.
Dissemination of this information or reproduction of this material is strictly forbidden
unless prior written permission is obtained from COMPANY.
Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
managers or contractors who have executed Confidentiality and Non-disclosure agreements
explicitly covering such access.
The copyright notice above does not evidence any actual or intended publication
or disclosure of this source code, which includes
information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.common</groupId>
<version>2.4.1PE-SNAPSHOT</version>
<artifactId>integration</artifactId>
</parent>
<groupId>org.thingsboard.common.integration</groupId>
<artifactId>aws-integrations</artifactId>
<packaging>jar</packaging>

<name>Thingsboard Server AWS Integrations</name>
<url>https://thingsboard.io</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../../..</main.dir>
</properties>

<dependencies>
<dependency>
<groupId>org.thingsboard.common.integration</groupId>
<artifactId>integration-api</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common.js</groupId>
<artifactId>js-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/**
* ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
*
* Copyright © 2016-2019 ThingsBoard, Inc. All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of ThingsBoard, Inc. and its suppliers,
* if any. The intellectual and technical concepts contained
* herein are proprietary to ThingsBoard, Inc.
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
*
* Dissemination of this information or reproduction of this material is strictly forbidden
* unless prior written permission is obtained from COMPANY.
*
* Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
* managers or contractors who have executed Confidentiality and Non-disclosure agreements
* explicitly covering such access.
*
* The copyright notice above does not evidence any actual or intended publication
* or disclosure of this source code, which includes
* information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
* ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
* OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
* THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
* AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
* THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
* DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
* OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
*/
package org.thingsboard.integration.aws.sqs;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
import org.thingsboard.integration.api.AbstractIntegration;
import org.thingsboard.integration.api.IntegrationContext;
import org.thingsboard.integration.api.TbIntegrationInitParams;
import org.thingsboard.integration.api.data.UplinkData;
import org.thingsboard.integration.api.data.UplinkMetaData;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/*
* Created by Valerii Sosliuk on 30.05.19
*/
@Slf4j
public class AwsSqsIntegration extends AbstractIntegration<SqsIntegrationMsg> {

private IntegrationContext context;
private SqsIntegrationConfiguration sqsConfiguration;
private AmazonSQS sqs;
private ScheduledFuture taskFuture;
private volatile boolean stopped;

@PostConstruct
public void init(TbIntegrationInitParams params) throws Exception {
super.init(params);
if (!this.configuration.isEnabled()) {
stopped = true;
return;
}
stopped = false;
this.context = params.getContext();
this.sqsConfiguration = mapper.readValue(
mapper.writeValueAsString(configuration.getConfiguration().get("sqsConfiguration")),
SqsIntegrationConfiguration.class);
BasicAWSCredentials awsCreds = new BasicAWSCredentials(sqsConfiguration.getAccessKeyId(), sqsConfiguration.getSecretAccessKey());
sqs = AmazonSQSClientBuilder.standard().withRegion(sqsConfiguration.getRegion())
.withCredentials(new AWSStaticCredentialsProvider(awsCreds)).build();
taskFuture = this.context.getScheduledExecutorService().schedule(this::pollMessages, sqsConfiguration.getPollingPeriodSeconds(), TimeUnit.SECONDS);
}

private void pollMessages() {
if (stopped) {
return;
}
try {
ReceiveMessageRequest sqsRequest = new ReceiveMessageRequest();
sqsRequest.setQueueUrl(sqsConfiguration.getQueueUrl());
sqsRequest.setMaxNumberOfMessages(10);
List<Message> messages = sqs.receiveMessage(sqsRequest).getMessages();
if (!CollectionUtils.isEmpty(messages)) {
for (Message message : messages) {
try {
SqsIntegrationMsg sqsMessage = toSqsIntegrationMsg(message);
process(sqsMessage);
} catch (IOException e) {
log.error("Failed to process message: " + message + ". Reason: " + e.getMessage(), e);
} finally {
sqs.deleteMessage(sqsConfiguration.getQueueUrl(), message.getReceiptHandle());
}
}
this.context.getScheduledExecutorService().submit(this::pollMessages);
} else {
taskFuture = this.context.getScheduledExecutorService().schedule(this::pollMessages, sqsConfiguration.getPollingPeriodSeconds(), TimeUnit.SECONDS);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
persistDebug(context, "Uplink", getUplinkContentType(), e.getMessage(), "ERROR", e);
taskFuture = this.context.getScheduledExecutorService().schedule(this::pollMessages, sqsConfiguration.getPollingPeriodSeconds(), TimeUnit.SECONDS);
}
}

private SqsIntegrationMsg toSqsIntegrationMsg(Message message) throws IOException {
String unescaped = StringEscapeUtils.unescapeJson(message.getBody());
unescaped = StringUtils.removeStart(unescaped, "\"");
unescaped = StringUtils.removeEnd(unescaped, "\"");
JsonNode node = mapper.readTree(unescaped);
SqsIntegrationMsg sqsMsg = new SqsIntegrationMsg(node, metadataTemplate.getKvMap());
return sqsMsg;
}

@Override
public void process(SqsIntegrationMsg message) {
try {
List<UplinkData> uplinkDataList = convertToUplinkDataList(context, message.getPayload(), new UplinkMetaData(getUplinkContentType(), message.getDeviceMetadata()));
if (uplinkDataList != null) {
for (UplinkData data : uplinkDataList) {
processUplinkData(context, data);
log.debug("[{}] Processing uplink data", data);
}
}
if (configuration.isDebugMode()) {
persistDebug(context, "Uplink", getUplinkContentType(), mapper.writeValueAsString(message.getJson()), "OK", null);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
persistDebug(context, "Uplink", getUplinkContentType(), e.getMessage(), "ERROR", e);
}
}

@PreDestroy
public void stop() {
stopped = true;
if (sqs != null) {
sqs.shutdown();
}
if (taskFuture != null) {
taskFuture.cancel(true);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* ThingsBoard, Inc. ("COMPANY") CONFIDENTIAL
*
* Copyright © 2016-2019 ThingsBoard, Inc. All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of ThingsBoard, Inc. and its suppliers,
* if any. The intellectual and technical concepts contained
* herein are proprietary to ThingsBoard, Inc.
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
*
* Dissemination of this information or reproduction of this material is strictly forbidden
* unless prior written permission is obtained from COMPANY.
*
* Access to the source code contained herein is hereby forbidden to anyone except current COMPANY employees,
* managers or contractors who have executed Confidentiality and Non-disclosure agreements
* explicitly covering such access.
*
* The copyright notice above does not evidence any actual or intended publication
* or disclosure of this source code, which includes
* information that is confidential and/or proprietary, and is a trade secret, of COMPANY.
* ANY REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE,
* OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE CODE WITHOUT
* THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED,
* AND IN VIOLATION OF APPLICABLE LAWS AND INTERNATIONAL TREATIES.
* THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION
* DOES NOT CONVEY OR IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS,
* OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT MAY DESCRIBE, IN WHOLE OR IN PART.
*/
package org.thingsboard.integration.aws.sqs;

import lombok.Data;

import java.util.concurrent.TimeUnit;

/*
* Created by Valerii Sosliuk on 03.06.19
*/
@Data
public class SqsIntegrationConfiguration {

private String queueUrl;
private String region;
private String accessKeyId;
private String secretAccessKey;
private Long pollingPeriodSeconds;
}

0 comments on commit 2ff279b

Please sign in to comment.