Skip to content

Commit 3af40bb

Browse files
process subscriptions improvements - when collecting persistent subscriptions and delivering msg for non-persistent
1 parent a1dd722 commit 3af40bb

File tree

3 files changed

+43
-29
lines changed

3 files changed

+43
-29
lines changed

application/src/main/java/org/thingsboard/mqtt/broker/service/processing/MsgDispatcherServiceImpl.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -150,38 +150,26 @@ private void processPersistentSubscriptions(PublishMsgWithId publishMsgWithId, P
150150

151151
PersistentMsgSubscriptions processBasicAndCollectPersistentSubscriptions(MsgSubscriptions msgSubscriptions,
152152
PublishMsgProto publishMsgProto) {
153-
List<Subscription> applicationSubscriptions = null;
154-
List<Subscription> deviceSubscriptions = null;
153+
final PersistentMsgSubscriptions persistentSubscriptions = new PersistentMsgSubscriptions(
154+
null, null, msgSubscriptions.getAllApplicationSharedSubscriptions()
155+
);
155156
long startTime = System.nanoTime();
156157

157158
if (!CollectionUtils.isEmpty(msgSubscriptions.getCommonSubscriptions())) {
158-
int commonSubsSize = msgSubscriptions.getCommonSubscriptions().size();
159-
applicationSubscriptions = initArrayList(commonSubsSize);
160-
deviceSubscriptions = initArrayList(commonSubsSize);
161-
processSubscriptions(msgSubscriptions.getCommonSubscriptions(), publishMsgProto,
162-
applicationSubscriptions, deviceSubscriptions);
159+
processSubscriptions(msgSubscriptions.getCommonSubscriptions(), publishMsgProto, persistentSubscriptions);
163160
}
164-
165161
if (!CollectionUtils.isEmpty(msgSubscriptions.getTargetDeviceSharedSubscriptions())) {
166-
int targetDeviceSharedSubsSize = msgSubscriptions.getTargetDeviceSharedSubscriptions().size();
167-
applicationSubscriptions = initSubscriptionListIfNull(applicationSubscriptions, targetDeviceSharedSubsSize);
168-
deviceSubscriptions = initSubscriptionListIfNull(deviceSubscriptions, targetDeviceSharedSubsSize);
169-
processSubscriptions(msgSubscriptions.getTargetDeviceSharedSubscriptions(), publishMsgProto,
170-
applicationSubscriptions, deviceSubscriptions);
162+
processSubscriptions(msgSubscriptions.getTargetDeviceSharedSubscriptions(), publishMsgProto, persistentSubscriptions);
171163
}
172164

173165
if (publishMsgProcessingTimerStats != null) {
174166
publishMsgProcessingTimerStats.logNotPersistentMessagesProcessing(startTime, TimeUnit.NANOSECONDS);
175167
}
176-
return new PersistentMsgSubscriptions(
177-
deviceSubscriptions,
178-
applicationSubscriptions,
179-
msgSubscriptions.getAllApplicationSharedSubscriptions()
180-
);
168+
return persistentSubscriptions;
181169
}
182170

183171
private void processSubscriptions(List<Subscription> subscriptions, PublishMsgProto publishMsgProto,
184-
List<Subscription> applicationSubscriptions, List<Subscription> deviceSubscriptions) {
172+
final PersistentMsgSubscriptions persistentMsgSubscriptions) {
185173
boolean nonPersistentByPubQos = publishMsgProto.getQos() == MqttQoS.AT_MOST_ONCE.value();
186174
if (nonPersistentByPubQos) {
187175
if (processSubscriptionsInParallel) {
@@ -194,13 +182,25 @@ private void processSubscriptions(List<Subscription> subscriptions, PublishMsgPr
194182
}
195183
}
196184
} else {
185+
persistentMsgSubscriptions.setDeviceSubscriptions(initSubscriptionListIfNull(persistentMsgSubscriptions.getDeviceSubscriptions(), subscriptions.size()));
186+
persistentMsgSubscriptions.setApplicationSubscriptions(initSubscriptionListIfNull(persistentMsgSubscriptions.getApplicationSubscriptions(), subscriptions.size()));
197187
if (processSubscriptionsInParallel) {
198188
subscriptions
199189
.parallelStream()
200-
.forEach(subscription -> processSubscription(subscription, publishMsgProto, applicationSubscriptions, deviceSubscriptions));
190+
.forEach(subscription -> processSubscription(
191+
subscription,
192+
publishMsgProto,
193+
persistentMsgSubscriptions.getApplicationSubscriptions(),
194+
persistentMsgSubscriptions.getDeviceSubscriptions())
195+
);
201196
} else {
202197
for (Subscription subscription : subscriptions) {
203-
processSubscription(subscription, publishMsgProto, applicationSubscriptions, deviceSubscriptions);
198+
processSubscription(
199+
subscription,
200+
publishMsgProto,
201+
persistentMsgSubscriptions.getApplicationSubscriptions(),
202+
persistentMsgSubscriptions.getDeviceSubscriptions()
203+
);
204204
}
205205
}
206206
}

application/src/main/java/org/thingsboard/mqtt/broker/service/processing/data/PersistentMsgSubscriptions.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,21 @@
1515
*/
1616
package org.thingsboard.mqtt.broker.service.processing.data;
1717

18+
import lombok.AllArgsConstructor;
1819
import lombok.Data;
19-
import lombok.RequiredArgsConstructor;
2020
import org.springframework.util.CollectionUtils;
2121
import org.thingsboard.mqtt.broker.service.subscription.Subscription;
2222

2323
import java.util.List;
2424
import java.util.Set;
2525

2626
@Data
27-
@RequiredArgsConstructor
27+
@AllArgsConstructor
2828
public class PersistentMsgSubscriptions {
2929

30-
private final List<Subscription> deviceSubscriptions;
31-
private final List<Subscription> applicationSubscriptions;
32-
private final Set<Subscription> allApplicationSharedSubscriptions;
30+
private List<Subscription> deviceSubscriptions;
31+
private List<Subscription> applicationSubscriptions;
32+
private Set<Subscription> allApplicationSharedSubscriptions;
3333

3434
public boolean isNotEmpty() {
3535
return !CollectionUtils.isEmpty(deviceSubscriptions) ||

application/src/test/java/org/thingsboard/mqtt/broker/service/processing/MsgDispatcherServiceImplTest.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,29 +311,39 @@ public void testProcessBasicAndCollectPersistentSubscriptions() {
311311
ClientSessionInfo clientSessionInfo3 = mock(ClientSessionInfo.class);
312312
ClientSessionInfo clientSessionInfo4 = mock(ClientSessionInfo.class);
313313
ClientSessionInfo clientSessionInfo5 = mock(ClientSessionInfo.class);
314+
ClientSessionInfo clientSessionInfo6 = mock(ClientSessionInfo.class);
315+
ClientSessionInfo clientSessionInfo7 = mock(ClientSessionInfo.class);
314316

315317
when(clientSessionInfo1.isPersistent()).thenReturn(true);
316318
when(clientSessionInfo2.isPersistent()).thenReturn(true);
317319
when(clientSessionInfo5.isPersistent()).thenReturn(false);
320+
when(clientSessionInfo6.isPersistent()).thenReturn(true);
321+
when(clientSessionInfo7.isPersistent()).thenReturn(true);
318322

319323
when(clientSessionInfo1.getType()).thenReturn(ClientType.APPLICATION);
320324
when(clientSessionInfo2.getType()).thenReturn(ClientType.APPLICATION);
321325
when(clientSessionInfo5.getType()).thenReturn(ClientType.DEVICE);
326+
when(clientSessionInfo6.getType()).thenReturn(ClientType.DEVICE);
327+
when(clientSessionInfo7.getType()).thenReturn(ClientType.DEVICE);
322328

323329
mockClientSessionGetClientId(clientSessionInfo1, "clientId1");
324330
mockClientSessionGetClientId(clientSessionInfo2, "clientId2");
331+
mockClientSessionGetClientId(clientSessionInfo6, "clientId6");
332+
mockClientSessionGetClientId(clientSessionInfo7, "clientId7");
325333

326334
MsgSubscriptions msgSubscriptions = new MsgSubscriptions(
327335
List.of(
328336
new Subscription("test/topic/1", 1, clientSessionInfo1),
329-
new Subscription("test/+/1", 2, clientSessionInfo2)
337+
new Subscription("test/+/1", 2, clientSessionInfo2),
338+
new Subscription("test/+/1", 1, clientSessionInfo7)
330339
),
331340
Set.of(
332341
new Subscription("#", 2, clientSessionInfo3),
333342
new Subscription("test/#", 0, clientSessionInfo4)
334343
),
335344
List.of(
336-
new Subscription("test/topic/#", 1, clientSessionInfo5)
345+
new Subscription("test/topic/#", 1, clientSessionInfo5),
346+
new Subscription("+/topic/1", 2, clientSessionInfo6)
337347
)
338348
);
339349
QueueProtos.PublishMsgProto publishMsgProto = QueueProtos.PublishMsgProto
@@ -346,7 +356,11 @@ public void testProcessBasicAndCollectPersistentSubscriptions() {
346356
msgDispatcherService.processBasicAndCollectPersistentSubscriptions(msgSubscriptions, publishMsgProto);
347357

348358
assertEquals(persistentMsgSubscriptions.getAllApplicationSharedSubscriptions(), msgSubscriptions.getAllApplicationSharedSubscriptions());
349-
assertTrue(persistentMsgSubscriptions.getDeviceSubscriptions().isEmpty());
359+
360+
assertEquals(2, persistentMsgSubscriptions.getDeviceSubscriptions().size());
361+
List<String> devClientIds = getClientIds(persistentMsgSubscriptions.getDeviceSubscriptions().stream());
362+
assertTrue(devClientIds.containsAll(List.of("clientId6", "clientId7")));
363+
350364
assertEquals(2, persistentMsgSubscriptions.getApplicationSubscriptions().size());
351365
List<String> appClientIds = getClientIds(persistentMsgSubscriptions.getApplicationSubscriptions().stream());
352366
assertTrue(appClientIds.containsAll(List.of("clientId1", "clientId2")));

0 commit comments

Comments
 (0)