Skip to content

Commit

Permalink
Merge pull request #5749 from dmytro-landiak/queue-factories-fix
Browse files Browse the repository at this point in the history
[3.3.3] Queue factories improvements
  • Loading branch information
ikulikov committed Dec 20, 2021
2 parents 82dc38a + c195fc2 commit 3ccc0bd
Show file tree
Hide file tree
Showing 18 changed files with 150 additions and 72 deletions.
Expand Up @@ -102,7 +102,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProd

@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
Expand All @@ -112,7 +112,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {

@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}

@Override
Expand Down Expand Up @@ -182,13 +182,13 @@ public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageSta

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
Expand All @@ -63,6 +64,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;

private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
Expand All @@ -77,14 +79,16 @@ public AwsSqsTbCoreQueueFactory(TbAwsSqsSettings sqsSettings,
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes) {
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.sqsSettings = sqsSettings;
this.coreSettings = coreSettings;
this.transportApiSettings = transportApiSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;

this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
Expand All @@ -95,7 +99,7 @@ public AwsSqsTbCoreQueueFactory(TbAwsSqsSettings sqsSettings,

@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
}

@Override
Expand All @@ -105,7 +109,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProd

@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
Expand All @@ -115,7 +119,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {

@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}

@Override
Expand All @@ -139,7 +143,7 @@ public TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> createTransportA

@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getResponsesTopic());
}

@Override
Expand Down Expand Up @@ -172,13 +176,13 @@ public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageSta

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
Expand All @@ -57,6 +58,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbAwsSqsSettings sqsSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;

private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
Expand All @@ -68,13 +70,15 @@ public AwsSqsTbRuleEngineQueueFactory(PartitionService partitionService, TbQueue
TbServiceInfoProvider serviceInfoProvider,
TbAwsSqsSettings sqsSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.ruleEngineSettings = ruleEngineSettings;
this.sqsSettings = sqsSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;

this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
Expand All @@ -84,17 +88,17 @@ public AwsSqsTbRuleEngineQueueFactory(PartitionService partitionService, TbQueue

@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
}

@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
Expand All @@ -104,7 +108,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {

@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}

@Override
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
Expand All @@ -51,34 +52,39 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
private final TbAwsSqsSettings sqsSettings;
private final TbQueueCoreSettings coreSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRuleEngineSettings ruleEngineSettings;

private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;

public AwsSqsTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbAwsSqsSettings sqsSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes) {
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRuleEngineSettings ruleEngineSettings) {
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.sqsSettings = sqsSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;

this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes());
this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
}

@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
TbAwsSqsProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic());

TbAwsSqsConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings,
transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
Expand All @@ -96,7 +102,7 @@ public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQu

@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic());
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
}

@Override
Expand Down Expand Up @@ -126,5 +132,8 @@ private void destroy() {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
}
}
Expand Up @@ -131,7 +131,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleE
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}

Expand All @@ -151,7 +151,7 @@ public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotif
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-core-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}

Expand Down Expand Up @@ -328,5 +328,8 @@ private void destroy() {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
}
}

0 comments on commit 3ccc0bd

Please sign in to comment.