Skip to content

Commit

Permalink
🌟 Added link preview service (#2204)
Browse files Browse the repository at this point in the history
* 🌟 added link preview service

* unit tests

* moved links preview callback handler to message service

* added publish message in realtime helper

* publish message in realtime after preview generation

* publish message only to channels
  • Loading branch information
rezk2ll authored and Labels Bot committed Jun 2, 2022
1 parent d278e56 commit d9782bb
Show file tree
Hide file tree
Showing 27 changed files with 618 additions and 71 deletions.
4 changes: 4 additions & 0 deletions twake/backend/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"@types/node-uuid": "^0.0.28",
"@types/pdf-image": "^2.0.1",
"@types/pino": "^6.3.2",
"@types/probe-image-size": "^7.0.1",
"@types/pump": "^1.1.1",
"@types/socket.io-client": "^1.4.34",
"@types/supertest": "2.0.4",
Expand Down Expand Up @@ -134,6 +135,8 @@
"fluent-ffmpeg": "^2.1.2",
"fold-to-ascii": "^5.0.0",
"generate-password": "^1.6.0",
"get-website-favicon": "^0.0.7",
"html-metadata-parser": "^2.0.4",
"jsonwebtoken": "^8.5.1",
"lodash": "^4.17.21",
"match-all": "^1.2.6",
Expand All @@ -148,6 +151,7 @@
"ora": "^5.4.0",
"pdf2pic": "^2.1.4",
"pino": "^6.8.0",
"probe-image-size": "^7.2.3",
"pump": "^3.0.0",
"redis": "3",
"reflect-metadata": "^0.1.13",
Expand Down
1 change: 1 addition & 0 deletions twake/backend/node/src/@types/get-website-favicon.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
declare module "get-website-favicon";
19 changes: 14 additions & 5 deletions twake/backend/node/src/services/global-resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,19 @@ import { MobilePushService } from "./notifications/services/mobile-push";
import { ChannelMemberPreferencesServiceImpl } from "./notifications/services/channel-preferences";
import { ChannelThreadUsersServiceImpl } from "./notifications/services/channel-thread-users";
import { PushServiceAPI } from "../core/platform/services/push/api";
import { PreviewProcessService } from "./previews/services/processing/service";
import { PreviewServiceAPI } from "./previews/types";
import { PreviewProcessService } from "./previews/services/files/processing/service";
import { LinkPreviewServiceAPI, PreviewServiceAPI } from "./previews/types";
import { CronAPI } from "../core/platform/services/cron/api";
import WebSocketAPI from "../core/platform/services/websocket/provider";
import TrackerAPI from "../core/platform/services/tracker/provider";
import { ApplicationHooksService } from "./applications/services/hooks";
import { OnlineServiceAPI } from "./online/api";
import OnlineServiceImpl from "./online/service";
import { PreviewEngine } from "./previews/services/engine";
import { PreviewEngine } from "./previews/services/files/engine";
import KnowledgeGraphService from "../core/platform/services/knowledge-graph";
import { ChannelsPubsubListener } from "./channels/services/pubsub";
import { LinkPreviewProcessService } from "./previews/services/links/processing/service";
import { LinkPreviewEngine } from "./previews/services/links/engine";

type PlatformServices = {
auth: AuthServiceAPI;
Expand Down Expand Up @@ -105,7 +107,10 @@ type TwakeServices = {
preferences: UserNotificationPreferencesAPI;
mobilePush: MobilePushService;
};
preview: PreviewServiceAPI;
preview: {
files: PreviewServiceAPI;
links: LinkPreviewServiceAPI;
};
messages: {
messages: MessageThreadMessagesServiceAPI;
threads: MessageThreadsServiceAPI;
Expand Down Expand Up @@ -167,6 +172,7 @@ class GlobalResolver {
});

await new PreviewEngine().init();
await new LinkPreviewEngine().init();

this.services = {
workspaces: await new WorkspaceServiceImpl().init(),
Expand All @@ -183,7 +189,10 @@ class GlobalResolver {
preferences: await new NotificationPreferencesService().init(),
mobilePush: await new MobilePushService().init(),
},
preview: await new PreviewProcessService().init(),
preview: {
files: await new PreviewProcessService().init(),
links: await new LinkPreviewProcessService().init(),
},
messages: {
messages: await new ThreadMessagesService().init(platform),
threads: await new ThreadsService().init(platform),
Expand Down
13 changes: 13 additions & 0 deletions twake/backend/node/src/services/messages/entities/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ export class Message {
workspace_id: string;
channel_id: string;
};

@Column("links", "encoded_json")
links: null | MessageLinks[];
}

export type MessageReaction = { count: number; name: string; users: string[] };
Expand Down Expand Up @@ -132,3 +135,13 @@ export type MessageWithUsers = Message & {
users?: UserObject[];
application?: Partial<Application>;
};

export type MessageLinks = {
title: string;
description: string | null;
domain: string;
img: string | null;
favicon: string | null;
img_width: number | null;
img_height: number | null;
};
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import _ from "lodash";
import { StatisticsMessageProcessor } from "../../../statistics/pubsub/messages";
import { MessageToHooksProcessor } from "./processors/message-to-hooks";
import gr from "../../../global-resolver";
import { MessageLinksPreviewFinishedProcessor } from "./processors/links";
import { Message } from "../../entities/messages";

export class MessagesEngine implements Initializable {
private channelViewProcessor: ChannelViewProcessor;
Expand All @@ -26,6 +28,7 @@ export class MessagesEngine implements Initializable {
private messageToHooks: MessageToHooksProcessor;

private threadRepository: Repository<Thread>;
private messageRepository: Repository<Message>;

constructor() {
this.channelViewProcessor = new ChannelViewProcessor();
Expand Down Expand Up @@ -68,6 +71,7 @@ export class MessagesEngine implements Initializable {

async init(): Promise<this> {
this.threadRepository = await gr.database.getRepository<Thread>("threads", Thread);
this.messageRepository = await gr.database.getRepository<Message>("messages", Message);

await this.channelViewProcessor.init();
await this.channelMarkedViewProcessor.init();
Expand All @@ -77,6 +81,9 @@ export class MessagesEngine implements Initializable {

gr.platformServices.pubsub.processor.addHandler(new ChannelSystemActivityMessageProcessor());
gr.platformServices.pubsub.processor.addHandler(new StatisticsMessageProcessor());
gr.platformServices.pubsub.processor.addHandler(
new MessageLinksPreviewFinishedProcessor(this.messageRepository, this.threadRepository),
);

localEventBus.subscribe("message:saved", async (e: MessageLocalEvent) => {
this.dispatchMessage(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
} from "../../../../../../core/platform/framework/api/crud-service";
import { getThreadMessagePath } from "../../../../web/realtime";
import gr from "../../../../../global-resolver";
import { publishMessageInRealtime } from "../../../utils";

export class ChannelViewProcessor {
repository: Repository<MessageChannelRef>;
Expand Down Expand Up @@ -111,24 +112,7 @@ export class ChannelViewProcessor {
}

//Publish message in realtime
const room = `/companies/${participant.company_id}/workspaces/${participant.workspace_id}/channels/${participant.id}/feed`;
const type = "message";
const entity = message.resource;
const context = message.context;
localEventBus.publish("realtime:publish", {
topic: message.created
? RealtimeEntityActionType.Created
: RealtimeEntityActionType.Updated,
event: {
type: type,
room: ResourcePath.get(room),
resourcePath: getThreadMessagePath(context as ThreadExecutionContext) + "/" + entity.id,
entity: entity,
result: message.created
? new CreateResult<Message>(type, entity)
: new UpdateResult<Message>(type, entity),
},
} as RealtimeLocalBusEvent<Message>);
publishMessageInRealtime(message, participant);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { logger, TwakeContext } from "../../../../../../core/platform/framework";
import { PubsubHandler } from "../../../../../../core/platform/services/pubsub/api";
import { Message } from "../../../../entities/messages";
import Repository from "../../../../../../core/platform/services/database/services/orm/repository/repository";
import { LinkPreviewPubsubCallback } from "../../../../../previews/types";
import { Thread } from "../../../../entities/threads";
import { publishMessageInRealtime } from "../../../utils";

export class MessageLinksPreviewFinishedProcessor
implements PubsubHandler<LinkPreviewPubsubCallback, string>
{
constructor(
private MessageRepository: Repository<Message>,
private ThreadRepository: Repository<Thread>,
) {}
readonly name = "MessageLinksPreviewFinishedProcessor";
readonly topics = {
in: "services:preview:links:callback",
};

readonly options = {
unique: true,
ack: true,
};

init?(context?: TwakeContext): Promise<this> {
throw new Error("Method not implemented.");
}

validate(message: LinkPreviewPubsubCallback): boolean {
return !!(message && message.previews && message.previews.length);
}

async process(localMessage: LinkPreviewPubsubCallback): Promise<string> {
logger.info(
`${this.name} - updating message links with generated previews: ${localMessage.previews.length}`,
);

const entity = await this.MessageRepository.findOne({
thread_id: localMessage.message.resource.thread_id,
id: localMessage.message.resource.thread_id,
});

if (!entity) {
logger.error(`${this.name} - message not found`);
return "";
}

entity.links = localMessage.previews;

await this.MessageRepository.save(entity);

const thread: Thread = await this.ThreadRepository.findOne({
id: localMessage.message.resource.thread_id,
});

if (!thread) {
logger.error(`${this.name} - thread not found`);
return "";
}

const updatedMessage = {
...localMessage.message,
resource: entity,
};

for (const participant of thread.participants.filter(p => p.type === "channel")) {
publishMessageInRealtime(updatedMessage, participant);
}

return "done";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
} from "../../../../../../core/platform/framework/api/crud-service";
import { Message } from "../../../../entities/messages";
import gr from "../../../../../global-resolver";
import { publishMessageInRealtime } from "../../../utils";

export class UserInboxViewProcessor {
repositoryRef: Repository<MessageUserInboxRef>;
Expand Down Expand Up @@ -87,24 +88,7 @@ export class UserInboxViewProcessor {
//Publish message in realtime

//TODO send a thread object instead of a message object
const room = `/companies/${channelParticipant.company_id}/users/${userParticipant.id}/inbox`;
const type = "message";
const entity = message.resource;
const context = message.context;
localEventBus.publish("realtime:publish", {
topic: message.created
? RealtimeEntityActionType.Created
: RealtimeEntityActionType.Updated,
event: {
type: type,
room: ResourcePath.get(room),
resourcePath: getThreadMessagePath(context as ThreadExecutionContext) + "/" + entity.id,
entity: entity,
result: message.created
? new CreateResult<Message>(type, entity)
: new UpdateResult<Message>(type, entity),
},
} as RealtimeLocalBusEvent<Message>);
publishMessageInRealtime(message, channelParticipant);
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion twake/backend/node/src/services/messages/services/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ import { UserObject } from "../../user/web/types";
import { formatUser } from "../../../utils/users";
import gr from "../../global-resolver";
import { getDefaultMessageInstance } from "../../../utils/messages";
import { buildMessageListPagination, getMentions } from "./utils";
import { buildMessageListPagination, getLinks, getMentions } from "./utils";
import { localEventBus } from "../../../core/platform/framework/pubsub";
import {
KnowledgeGraphEvents,
KnowledgeGraphGenericEventPayload,
} from "../../../core/platform/services/knowledge-graph/types";
import { MessageUserInboxRef } from "../entities/message-user-inbox-refs";
import { MessageUserInboxRefReversed } from "../entities/message-user-inbox-refs-reversed";
import { LinkPreviewPubsubRequest } from "../../../services/previews/types";

export class ThreadMessagesService implements MessageThreadMessagesServiceAPI {
version: "1";
Expand Down Expand Up @@ -545,6 +546,19 @@ export class ThreadMessagesService implements MessageThreadMessagesServiceAPI {
},
])
async onSaved(message: Message, options: { created?: boolean }, context: ThreadExecutionContext) {
const messageLinks = getLinks(message);

gr.platformServices.pubsub.publish<LinkPreviewPubsubRequest>("services:preview:links", {
data: {
links: messageLinks,
message: {
context,
resource: message,
created: options?.created,
},
},
});

if (options.created && !message.ephemeral) {
await gr.services.messages.threads.addReply(message.thread_id);
}
Expand Down
56 changes: 54 additions & 2 deletions twake/backend/node/src/services/messages/services/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import { FindOptions } from "../../../core/platform/services/database/services/orm/repository/repository";
import { Pagination } from "../../../core/platform/framework/api/crud-service";
import {
CreateResult,
Pagination,
UpdateResult,
} from "../../../core/platform/framework/api/crud-service";
import { Message } from "../entities/messages";
import { specialMention } from "../types";
import { MessageLocalEvent, specialMention, ThreadExecutionContext } from "../types";
import User from "../../../services/user/entities/user";
import { RealtimeEntityActionType } from "../../../core/platform/services/realtime/types";
import { getThreadMessagePath } from "../web/realtime";
import { ResourcePath } from "../../../core/platform/services/realtime/types";
import { RealtimeLocalBusEvent } from "../../../core/platform/services/realtime/types";
import { localEventBus } from "../../../core/platform/framework/pubsub";
import { ParticipantObject } from "../entities/threads";

export const buildMessageListPagination = (
pagination: Pagination,
Expand Down Expand Up @@ -48,3 +58,45 @@ export const getMentions = async (
specials: (globalOutput || []).map(g => (g || "").trim().split("@").pop()) as specialMention[],
};
};

/**
* extracts the links from a message
*
* @param {Message} messageResource - The message to be parsed
* @returns {String} - links found in the message
*/
export const getLinks = (messageResource: Message): string[] => {
const links = (messageResource.text || "").match(/https?:\/\/[^ ]+/gm);
return links || [];
};

/**
* Publish a message to the realtime bus
*
* @param {MessageLocalEvent} message - The event to be published
* @param {ParticipantObject} participant - The participant
*/
export const publishMessageInRealtime = (
message: MessageLocalEvent,
participant: ParticipantObject,
): void => {
if (participant.type !== "channel") return;

const room = `/companies/${participant.company_id}/workspaces/${participant.workspace_id}/channels/${participant.id}/feed`;
const type = "message";
const entity = message.resource;
const context = message.context;

localEventBus.publish("realtime:publish", {
topic: message.created ? RealtimeEntityActionType.Created : RealtimeEntityActionType.Updated,
event: {
type,
room: ResourcePath.get(room),
resourcePath: getThreadMessagePath(context as ThreadExecutionContext) + "/" + entity.id,
entity,
result: message.created
? new CreateResult<Message>(type, entity)
: new UpdateResult<Message>(type, entity),
},
} as RealtimeLocalBusEvent<Message>);
};
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { PreviewPubsubHandler } from "../../api";
import { logger, TwakeContext } from "../../../../core/platform/framework";
import { PreviewClearPubsubRequest, PreviewPubsubCallback } from "../../types";
import gr from "../../../global-resolver";
import { PreviewPubsubHandler } from "../../../api";
import { logger, TwakeContext } from "../../../../../core/platform/framework";
import { PreviewClearPubsubRequest, PreviewPubsubCallback } from "../../../types";
import gr from "../../../../global-resolver";

/**
* Clear thumbnails when the delete task is called
Expand Down

0 comments on commit d9782bb

Please sign in to comment.