diff --git a/src/backend/models/domain-models.ts b/src/backend/models/domain-models.ts index b9fc34c..ea16048 100644 --- a/src/backend/models/domain-models.ts +++ b/src/backend/models/domain-models.ts @@ -196,6 +196,7 @@ export enum DIRECTORY_NAME { } export interface Reaction { + id?: string; resource_id: string; resource_type: "ARTICLE" | "COMMENT" | "GIST"; reaction_type: REACTION_TYPE; diff --git a/src/backend/services/comment.action.ts b/src/backend/services/comment.action.ts index fe84df7..d7f6309 100644 --- a/src/backend/services/comment.action.ts +++ b/src/backend/services/comment.action.ts @@ -47,9 +47,11 @@ export const createMyComment = async ( await assertCommentResourceExists(resource_id, resource_type); + const commentId = input.comment_id ?? crypto.randomUUID(); + const created = await persistenceRepository.comment.insert([ { - id: input.comment_id ?? crypto.randomUUID(), + id: commentId, body, resource_id, resource_type, @@ -59,6 +61,7 @@ export const createMyComment = async ( inngest .send({ + id: `notif:comment:${commentId}`, name: "app/notification.requested", data: { actor_id: sessionId, @@ -72,7 +75,7 @@ export const createMyComment = async ( console.error("[inngest] Failed to send notification event:", err); }); - void publishMessage( + await publishMessage( `resource.${resource_type}.${resource_id}`, REALTIME_PUSHER_EVENTS.COMMENT_CREATED, { scope: "comments" }, @@ -107,7 +110,7 @@ export const updateMyComment = async ( data: { body: input.body, updated_at: new Date() }, }); - void publishMessage( + await publishMessage( `resource.${existing.resource_type}.${existing.resource_id}`, REALTIME_PUSHER_EVENTS.COMMENT_UPDATED, { scope: "comments" }, @@ -165,7 +168,7 @@ export const deleteMyComment = async ( where: inArray("id", ids), }); - void publishMessage( + await publishMessage( `resource.${root.resource_type}.${root.resource_id}`, REALTIME_PUSHER_EVENTS.COMMENT_DELETED, { scope: "comments" }, diff --git a/src/backend/services/reaction.actions.ts b/src/backend/services/reaction.actions.ts index cba6f87..ae99c0e 100644 --- a/src/backend/services/reaction.actions.ts +++ b/src/backend/services/reaction.actions.ts @@ -69,9 +69,12 @@ export async function toogleReaction( }; } + const reactionId = crypto.randomUUID(); + // If reaction does not exist, create it await persistenceRepository.reaction.insert([ { + id: reactionId, resource_id: input.resource_id, resource_type: input.resource_type, reaction_type: input.reaction_type, @@ -83,6 +86,7 @@ export async function toogleReaction( // Send notification event for insert path only (log errors, don't fail mutation) inngest .send({ + id: `notif:reaction:${reactionId}`, name: "app/notification.requested", data: { actor_id: sessionUserId, diff --git a/src/lib/inngest.ts b/src/lib/inngest.ts index f8c6349..92fb52a 100644 --- a/src/lib/inngest.ts +++ b/src/lib/inngest.ts @@ -122,13 +122,13 @@ export const persistNotificationFn = inngest.createFunction( id: "persist-notification", triggers: [{ event: "app/notification.requested" }], }, - async ({ event }: { event: { data: NotificationEventData } }) => { + async ({ event, step }) => { const parsed = notificationEventSchema.safeParse(event.data); if (!parsed.success) { return { skipped: true, reason: "invalid-notification-payload" }; } - let data = parsed.data; + let data: NotificationEventData = parsed.data; if (data.reaction_request && data.actor_id) { const built = await buildPersistableNotification({ @@ -182,23 +182,46 @@ export const persistNotificationFn = inngest.createFunction( return { skipped: true, reason: "self-notification" }; } - await persistenceRepository.notification.insert([ - { - recipient_id: data.recipient_id, - actor_id: data.actor_id ?? null, - type: data.type as NotificationType, - payload: (data.payload ?? null) as NotificationPayload | null, - created_at: new Date(), - }, - ]); + const row = { + recipient_id: data.recipient_id, + actor_id: data.actor_id ?? null, + type: data.type as NotificationType, + payload: (data.payload ?? null) as NotificationPayload | null, + created_at: new Date(), + }; - // Broadcast a lightweight signal so the recipient's browser can invalidate - // its TanStack Query caches without polling. - await publishMessage( - `private-user.${data.recipient_id}`, - REALTIME_PUSHER_EVENTS.NOTIFICATION_NEW, - { scope: "notifications" }, - ); + await step.run("insert-notification-row", async () => { + try { + const result = await persistenceRepository.notification.insert([row]); + return { + insertedRow: result?.rows?.[0], + }; + } catch (err) { + return { + insertedRow: null, + message: "Failed to insert notification row", + }; + } + }); + + await step.run("publish-notification-realtime", async () => { + try { + await publishMessage( + `private-user.${data.recipient_id}`, + REALTIME_PUSHER_EVENTS.NOTIFICATION_NEW, + { scope: "notifications" }, + ); + return { + published: true, + message: "Notification published successfully", + }; + } catch (err) { + return { + published: false, + message: "Failed to publish notification", + }; + } + }); return { success: true }; }, diff --git a/src/lib/pusher/pusher.server.ts b/src/lib/pusher/pusher.server.ts index c57b473..c6fcfb9 100644 --- a/src/lib/pusher/pusher.server.ts +++ b/src/lib/pusher/pusher.server.ts @@ -36,17 +36,12 @@ export async function publishMessage( channel: string, event: RealtimePusherEvent, data: Record = {}, -): Promise { - console.log(` - [pusher] Publishing message to channel ${channel} with event ${event} and data ${JSON.stringify(data)} - `); - - pusherServer - ?.trigger(channel, event, data) - .then((data) => { - console.log("[pusher] Published message successfully"); - }) - .catch((err) => { - console.error("[pusher] Failed to publish message:", JSON.stringify(err)); - }); +) { + if (!pusherServer) return; + try { + return await pusherServer.trigger(channel, event, data); + } catch (err) { + console.error("[pusher] Failed to publish message:", JSON.stringify(err)); + return null; + } }