Skip to content

Commit

Permalink
feat(muc/sub): correctly handle archive messages delivered via MUC/Sub
Browse files Browse the repository at this point in the history
- extend mock interface with MUC/Sub handling and room creation features
- add tests
- minor refactorings and stylistic fixes
  • Loading branch information
BendingBender committed Aug 18, 2021
1 parent 044ffa7 commit 921f573
Show file tree
Hide file tree
Showing 19 changed files with 618 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ export class ChatMessageListComponent implements OnInit, OnDestroy, OnChanges, A
filter(() => this.isNearBottom()),
takeUntil(this.ngDestroy),
)
.subscribe((message) => this.scheduleScrollToLastMessage());
.subscribe((_) => this.scheduleScrollToLastMessage());

if (this.recipient.messages.length < 10) {
await this.loadMessages(); // in case insufficient old messages are displayed
Expand Down
9 changes: 6 additions & 3 deletions projects/pazznetwork/ngx-chat/src/lib/core/contact.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ describe('contact', () => {
body: '',
direction: Direction.in,
id: '1',
delayed: false
delayed: false,
fromArchive: false,
};
contact.addMessage(message);
contact.addMessage(message);
Expand All @@ -41,7 +42,8 @@ describe('contact', () => {
datetime: new Date(),
body: '',
direction: Direction.in,
delayed: false
delayed: false,
fromArchive: false,
};
contact.addMessage(message);
contact.addMessage(message);
Expand Down Expand Up @@ -85,7 +87,8 @@ describe('contact', () => {
datetime: new Date(new Date(date)),
body: '',
direction: Direction.in,
delayed: false
delayed: false,
fromArchive: false,
};
}

Expand Down
1 change: 1 addition & 0 deletions projects/pazznetwork/ngx-chat/src/lib/core/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export interface Message {
datetime: Date;
id?: string;
delayed: boolean;
fromArchive: boolean;
/**
* if no explicit state is set for the message, use implicit contact message states instead.
*/
Expand Down
5 changes: 3 additions & 2 deletions projects/pazznetwork/ngx-chat/src/lib/ngx-chat.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ export class NgxChatModule {
const multiUserChatPlugin = new MultiUserChatPlugin(xmppChatAdapter, logService, serviceDiscoveryPlugin);
const unreadMessageCountPlugin = new UnreadMessageCountPlugin(
xmppChatAdapter, chatMessageListRegistryService, publishSubscribePlugin, entityTimePlugin, multiUserChatPlugin);
const messagePlugin = new MessagePlugin(xmppChatAdapter, logService);

xmppChatAdapter.addPlugins([
new BookmarkPlugin(publishSubscribePlugin),
new MessageArchivePlugin(xmppChatAdapter, serviceDiscoveryPlugin, multiUserChatPlugin, logService),
new MessagePlugin(xmppChatAdapter, logService),
new MessageArchivePlugin(xmppChatAdapter, serviceDiscoveryPlugin, multiUserChatPlugin, logService, messagePlugin),
messagePlugin,
new MessageUuidPlugin(),
multiUserChatPlugin,
publishSubscribePlugin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,50 @@ import { XmppChatConnectionService } from '../xmpp-chat-connection.service';
import { XmppClientFactoryService } from '../xmpp-client-factory.service';
import { MessageArchivePlugin } from './message-archive.plugin';
import SpyObj = jasmine.SpyObj;
import { MessagePlugin } from './message.plugin';
import { ServiceDiscoveryPlugin } from './service-discovery.plugin';
import { MultiUserChatPlugin, Room } from './multi-user-chat.plugin';

describe('message archive plugin', () => {

let chatConnectionService: XmppChatConnectionService;
let chatAdapter: XmppChatAdapter;
let contactFactory: ContactFactoryService;
let xmppClientMock: SpyObj<Client>;
let contact1: Contact;
let otherContact: Contact;
const otherContactJid = parseJid('someone@else.com');
const userJid = parseJid('me@example.com/myresource');
const roomJid = parseJid('someroom@conference.example.com/mynick');

const validArchiveStanza =
const chatArchiveStanza =
xml('message', {},
xml('result', {xmlns: 'urn:xmpp:mam:2'},
xml('forwarded', {},
xml('delay', {stamp: '2018-07-18T08:47:44.233057Z'}),
xml('message', {to: userJid.toString(), from: 'someone@else.com/resource', type: 'chat'},
xml('message', {from: userJid.toString(), to: otherContactJid.toString(), type: 'chat'},
xml('origin-id', {id: 'id'}),
xml('body', {}, 'message text')))));

const groupChatArchiveStanza =
xml('message', {},
xml('result', {xmlns: 'urn:xmpp:mam:2'},
xml('forwarded', {},
xml('delay', {stamp: '2021-08-17T15:33:25.375401Z'}),
xml('message', {from: roomJid.bare().toString() + '/othernick', type: 'groupchat'},
xml('body', {}, 'group chat!')))));

const mucSubArchiveStanza =
xml('message', {},
xml('result', {xmlns: 'urn:xmpp:mam:2'},
xml('forwarded', {},
xml('delay', {stamp: '2021-08-17T15:33:25.375401Z'}),
xml('message', {},
xml('event', {xmlns: 'http://jabber.org/protocol/pubsub#event'},
xml('items', {node: 'urn:xmpp:mucsub:nodes:messages'},
xml('item', {},
xml('message', {from: roomJid.bare().toString() + '/othernick', type: 'groupchat'},
xml('body', {}, 'group chat!'))))))))); // see: https://xkcd.com/297/

beforeEach(() => {
const mockClientFactory = new MockClientFactory();
xmppClientMock = mockClientFactory.clientInstance;
Expand All @@ -49,31 +74,92 @@ describe('message archive plugin', () => {

contactFactory = TestBed.inject(ContactFactoryService);
chatAdapter = TestBed.inject(XmppChatAdapter);
contact1 = contactFactory.createContact('someone@else.com', 'jon doe');
otherContact = contactFactory.createContact(otherContactJid.toString(), 'jon doe');
});

it('should send a request, create contacts and add messages ', () => {
const messageArchivePlugin = new MessageArchivePlugin(chatAdapter, null, null, testLogService());
it('should handle chat messages from archive by creating contacts and adding messages to contacts', () => {
const serviceDiscoveryPlugin = {
supportsFeature() {
return Promise.resolve(false);
}
} as any as ServiceDiscoveryPlugin;
const messagePlugin = new MessagePlugin(chatAdapter, testLogService());
const messageArchivePlugin = new MessageArchivePlugin(chatAdapter, serviceDiscoveryPlugin, null, testLogService(), messagePlugin);
chatAdapter.addPlugins([messageArchivePlugin]);
chatConnectionService.onOnline(userJid);

chatConnectionService.onStanzaReceived(validArchiveStanza);
chatConnectionService.onStanzaReceived(chatArchiveStanza);

const contacts = chatAdapter.contacts$.getValue();
expect(contacts.length).toEqual(1);
expect(contacts[0].jidBare).toEqual(contact1.jidBare);
expect(contacts.length).toBe(1);
expect(contacts[0].jidBare).toEqual(otherContact.jidBare);

const messages = contacts[0].messages;
expect(messages.length).toEqual(1);
expect(messages[0].body).toEqual('message text');
expect(messages[0].direction).toEqual(Direction.in);
expect(messages.length).toBe(1);
expect(messages[0].body).toBe('message text');
expect(messages[0].direction).toBe(Direction.out);
expect(messages[0].datetime).toEqual(new Date('2018-07-18T08:47:44.233057Z'));
expect(messages[0].fromArchive).toBe(true);
});

it('should handle group chat messages by adding them to appropriate rooms', () => {
const serviceDiscoveryPlugin = {
supportsFeature() {
return Promise.resolve(false);
}
} as unknown as ServiceDiscoveryPlugin;
const logService = testLogService();
const multiUserChatPlugin = new MultiUserChatPlugin(chatAdapter, logService, null);
const messageArchivePlugin = new MessageArchivePlugin(chatAdapter, serviceDiscoveryPlugin, multiUserChatPlugin, logService, null);
chatAdapter.addPlugins([messageArchivePlugin, multiUserChatPlugin]);
chatConnectionService.onOnline(userJid);
multiUserChatPlugin.rooms$.next([new Room(roomJid, logService)]);

chatConnectionService.onStanzaReceived(groupChatArchiveStanza);

const roomMessages = multiUserChatPlugin.rooms$.getValue()[0].messages;

expect(roomMessages.length).toBe(1);

const roomMessage = roomMessages[0];

expect(roomMessage.body).toBe('group chat!');
expect(roomMessage.datetime).toEqual(new Date('2021-08-17T15:33:25.375401Z'));
expect(roomMessage.direction).toBe(Direction.in);
expect(roomMessage.fromArchive).toBe(true);
});

it('should handle MUC/Sub archive stanzas correctly', () => {
const serviceDiscoveryPlugin = {
supportsFeature() {
return Promise.resolve(false);
}
} as unknown as ServiceDiscoveryPlugin;
const logService = testLogService();
const multiUserChatPlugin = new MultiUserChatPlugin(chatAdapter, logService, null);
const messageArchivePlugin = new MessageArchivePlugin(chatAdapter, serviceDiscoveryPlugin, multiUserChatPlugin, logService, null);
chatAdapter.addPlugins([messageArchivePlugin, multiUserChatPlugin]);
chatConnectionService.onOnline(userJid);
multiUserChatPlugin.rooms$.next([new Room(roomJid, logService)]);

chatConnectionService.onStanzaReceived(mucSubArchiveStanza);

const roomMessages = multiUserChatPlugin.rooms$.getValue()[0].messages;

expect(roomMessages.length).toBe(1);

const roomMessage = roomMessages[0];

expect(roomMessage.body).toBe('group chat!');
expect(roomMessage.datetime).toEqual(new Date('2021-08-17T15:33:25.375401Z'));
expect(roomMessage.direction).toBe(Direction.in);
expect(roomMessage.fromArchive).toBe(true);
});

it('should not request messages if message archive plugin is not set ', () => {
chatConnectionService.onOnline(userJid);

chatConnectionService.onStanzaReceived(validArchiveStanza);
chatConnectionService.onStanzaReceived(chatArchiveStanza);

expect(chatAdapter.contacts$.getValue()).toEqual([]);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
import { jid as parseJid, xml } from '@xmpp/client';
import { xml } from '@xmpp/client';
import { Element } from 'ltx';
import { Subject } from 'rxjs';
import { debounceTime, filter } from 'rxjs/operators';
import { Direction } from '../../../../core/message';
import { Recipient } from '../../../../core/recipient';
import { Stanza } from '../../../../core/stanza';
import { IqResponseStanza, Stanza } from '../../../../core/stanza';
import { LogService } from '../../../log.service';
import { XmppChatAdapter } from '../xmpp-chat-adapter.service';
import { AbstractXmppPlugin } from './abstract-xmpp-plugin';
import { MessageUuidPlugin } from './message-uuid.plugin';
import { MultiUserChatPlugin } from './multi-user-chat.plugin';
import { ServiceDiscoveryPlugin } from './service-discovery.plugin';
import { PUBSUB_EVENT_XMLNS } from './publish-subscribe.plugin';
import { MessagePlugin } from './message.plugin';
import { MUC_SUB_EVENT_TYPE } from './muc-sub.plugin';

/**
* https://xmpp.org/extensions/xep-0313.html
* Message Archive Management
*/
export class MessageArchivePlugin extends AbstractXmppPlugin {

private mamMessageReceived$ = new Subject<void>();
private readonly mamMessageReceived$ = new Subject<void>();

constructor(
private chatService: XmppChatAdapter,
private serviceDiscoveryPlugin: ServiceDiscoveryPlugin,
private multiUserChatPlugin: MultiUserChatPlugin,
private logService: LogService,
private readonly chatService: XmppChatAdapter,
private readonly serviceDiscoveryPlugin: ServiceDiscoveryPlugin,
private readonly multiUserChatPlugin: MultiUserChatPlugin,
private readonly logService: LogService,
private readonly messagePlugin: MessagePlugin,
) {
super();

this.chatService.state$
.pipe(filter(state => state === 'online'))
.subscribe(async () => {
if (await this.supportsMessageArchiveManagement()) {
this.requestNewestMessages();
await this.requestNewestMessages();
}
});

Expand All @@ -41,8 +44,8 @@ export class MessageArchivePlugin extends AbstractXmppPlugin {
.subscribe(() => this.chatService.contacts$.next(this.chatService.contacts$.getValue()));
}

private requestNewestMessages() {
this.chatService.chatConnectionService.sendIq(
private async requestNewestMessages(): Promise<void> {
await this.chatService.chatConnectionService.sendIq(
xml('iq', {type: 'set'},
xml('query', {xmlns: 'urn:xmpp:mam:2'},
xml('set', {xmlns: 'http://jabber.org/protocol/rsm'},
Expand All @@ -54,7 +57,7 @@ export class MessageArchivePlugin extends AbstractXmppPlugin {
);
}

async loadMostRecentUnloadedMessages(recipient: Recipient) {
async loadMostRecentUnloadedMessages(recipient: Recipient): Promise<void> {
// for user-to-user chats no to-attribute is necessary, in case of multi-user-chats it has to be set to the bare room jid
const to = recipient.recipientType === 'room' ? recipient.roomJid.toString() : undefined;

Expand Down Expand Up @@ -86,7 +89,7 @@ export class MessageArchivePlugin extends AbstractXmppPlugin {
await this.chatService.chatConnectionService.sendIq(request);
}

async loadAllMessages() {
async loadAllMessages(): Promise<void> {
if (!(await this.supportsMessageArchiveManagement())) {
throw new Error('message archive management not suppported');
}
Expand All @@ -112,7 +115,7 @@ export class MessageArchivePlugin extends AbstractXmppPlugin {
}
}

private async supportsMessageArchiveManagement() {
private async supportsMessageArchiveManagement(): Promise<boolean> {
const supportsMessageArchiveManagement = await this.serviceDiscoveryPlugin.supportsFeature(
this.chatService.chatConnectionService.userJid.bare().toString(), 'urn:xmpp:mam:2');
if (!supportsMessageArchiveManagement) {
Expand All @@ -121,53 +124,56 @@ export class MessageArchivePlugin extends AbstractXmppPlugin {
return supportsMessageArchiveManagement;
}

handleStanza(stanza: Stanza) {
handleStanza(stanza: Stanza): boolean {
if (this.isMamMessageStanza(stanza)) {
this.handleMamMessageStanza(stanza);
return true;
}
return false;
}

private isMamMessageStanza(stanza: Stanza) {
private isMamMessageStanza(stanza: Stanza): boolean {
const result = stanza.getChild('result');
return stanza.name === 'message' && result && result.attrs.xmlns === 'urn:xmpp:mam:2';
return stanza.name === 'message' && result?.attrs.xmlns === 'urn:xmpp:mam:2';
}

private handleMamMessageStanza(stanza: Stanza) {
private handleMamMessageStanza(stanza: Stanza): void {
const forwardedElement = stanza.getChild('result').getChild('forwarded');
const messageElement = forwardedElement.getChild('message');
const delayElement = forwardedElement.getChild('delay');

const eventElement = messageElement.getChild('event', PUBSUB_EVENT_XMLNS);
if (messageElement.getAttr('type') == null && eventElement != null) {
this.handlePubSubEvent(eventElement, delayElement);
} else {
this.handleArchivedMessage(messageElement, delayElement);
}
}

private handleArchivedMessage(messageElement: Stanza, delayEl: Element): void {
const type = messageElement.getAttr('type');
if (type === 'chat') {
// TODO: messagePlugin.handleMessage should be refactored so that it can
// handle messageElement like multiUserChatPlugin.handleRoomMessageStanza
// after refactoring just delegate to messagePlugin.handleMessage(messageElement, forwardedElement.getChild('delay')
const isAddressedToMe = this.chatService.chatConnectionService.userJid.bare()
.equals(parseJid(messageElement.attrs.to).bare());

const messageBody = messageElement.getChildText('body')?.trim();
if (messageBody) {
const contactJid = isAddressedToMe ? messageElement.attrs.from : messageElement.attrs.to;
const contact = this.chatService.getOrCreateContactById(contactJid);
const datetime = new Date(
forwardedElement.getChild('delay').attrs.stamp,
);
const direction = isAddressedToMe ? Direction.in : Direction.out;

contact.addMessage({
direction,
datetime,
body: messageBody,
id: MessageUuidPlugin.extractIdFromStanza(messageElement),
delayed: true,
});
const messageHandled = this.messagePlugin.handleStanza(messageElement, delayEl);
if (messageHandled) {
this.mamMessageReceived$.next();
}
} else if (type === 'groupchat') {
this.multiUserChatPlugin.handleRoomMessageStanza(messageElement, forwardedElement.getChild('delay'));
this.multiUserChatPlugin.handleRoomMessageStanza(messageElement, delayEl);
} else {
throw new Error('unknown archived message type: ' + type);
throw new Error(`unknown archived message type: ${type}`);
}
}

private handlePubSubEvent(eventElement: Element, delayElement: Element): void {
const itemsElement = eventElement.getChild('items');
const itemsNode = itemsElement?.attrs.node;

if (itemsNode !== MUC_SUB_EVENT_TYPE.messages) {
this.logService.warn(`Handling of MUC/Sub message types other than ${MUC_SUB_EVENT_TYPE.messages} isn't implemented yet!`);
return;
}

const itemElements = itemsElement.getChildren('item');
itemElements.forEach((itemEl) => this.handleArchivedMessage(itemEl.getChild('message'), delayElement));
}
}

0 comments on commit 921f573

Please sign in to comment.