Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/backend/services/comment.action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import { and, eq, inArray } from "sqlkit";
import { CommentPresentation } from "../models/domain-models";
import { inngest } from "@/lib/inngest";
import { assertCommentResourceExists } from "./notifications.payload";
import { publishMessage } from "@/lib/pusher/pusher.server";
import {
publishMessage,
REALTIME_PUSHER_EVENTS,
} from "@/lib/pusher/pusher.server";

const sql = String.raw;

Expand Down Expand Up @@ -71,7 +74,7 @@ export const createMyComment = async (

void publishMessage(
`resource.${resource_type}.${resource_id}`,
"comment.created",
REALTIME_PUSHER_EVENTS.COMMENT_CREATED,
{ scope: "comments" },
);

Expand Down Expand Up @@ -106,7 +109,7 @@ export const updateMyComment = async (

void publishMessage(
`resource.${existing.resource_type}.${existing.resource_id}`,
"comment.updated",
REALTIME_PUSHER_EVENTS.COMMENT_UPDATED,
{ scope: "comments" },
);

Expand Down Expand Up @@ -164,7 +167,7 @@ export const deleteMyComment = async (

void publishMessage(
`resource.${root.resource_type}.${root.resource_id}`,
"comment.deleted",
REALTIME_PUSHER_EVENTS.COMMENT_DELETED,
{ scope: "comments" },
);

Expand Down
11 changes: 7 additions & 4 deletions src/components/comment-section.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ import { Button } from "./ui/button";
import { Skeleton } from "./ui/skeleton";
import { Textarea } from "./ui/textarea";
import getFileUrl from "@/utils/getFileUrl";
import { listenChannel } from "@/lib/pusher/pusher.client";
import {
listenChannel,
REALTIME_PUSHER_EVENTS,
} from "@/lib/pusher/pusher.client";

const Context = React.createContext<
{ mutatingId?: string; setMutatingId: (id?: string) => void } | undefined
Expand Down Expand Up @@ -166,9 +169,9 @@ export const CommentSection = (props: {
});
};
return listenChannel(channelName, {
"comment.created": invalidate,
"comment.updated": invalidate,
"comment.deleted": invalidate,
[REALTIME_PUSHER_EVENTS.COMMENT_CREATED]: invalidate,
[REALTIME_PUSHER_EVENTS.COMMENT_UPDATED]: invalidate,
[REALTIME_PUSHER_EVENTS.COMMENT_DELETED]: invalidate,
});
}, [props.resource_id, props.resource_type, queryClient]);

Expand Down
10 changes: 8 additions & 2 deletions src/components/providers/RealtimeProvider.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"use client";

import { listenChannel } from "@/lib/pusher/pusher.client";
import {
listenChannel,
REALTIME_PUSHER_EVENTS,
} from "@/lib/pusher/pusher.client";
import { useSession } from "@/store/session.atom";
import { useQueryClient } from "@tanstack/react-query";
import React, { PropsWithChildren, useEffect } from "react";
Expand Down Expand Up @@ -28,11 +31,14 @@ export function RealtimeProvider({ children }: PropsWithChildren) {
if (!userId) return;

const channelName = `private-user.${userId}`;
return listenChannel(channelName, "notification.new", () => {
const invalidate = () => {
queryClient.invalidateQueries({ queryKey: ["my-notifications"] });
queryClient.invalidateQueries({
queryKey: ["unread-notification-count"],
});
};
return listenChannel(channelName, {
[REALTIME_PUSHER_EVENTS.NOTIFICATION_NEW]: invalidate,
});
}, [userId, queryClient]);

Expand Down
13 changes: 9 additions & 4 deletions src/lib/inngest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import {
import { persistenceRepository } from "@/backend/persistence/persistence-repositories";
import { ActionException } from "@/backend/services/RepositoryException";
import { buildPersistableNotification } from "@/backend/services/notifications.payload";
import { publishMessage } from "@/lib/pusher/pusher.server";
import {
publishMessage,
REALTIME_PUSHER_EVENTS,
} from "@/lib/pusher/pusher.server";
import { deleteExpiredArticles } from "@/backend/services/article-cleanup-service";

const notificationPayloadSchema = z.object({
Expand Down Expand Up @@ -191,9 +194,11 @@ export const persistNotificationFn = inngest.createFunction(

// Broadcast a lightweight signal so the recipient's browser can invalidate
// its TanStack Query caches without polling.
await publishMessage(`private-user.${data.recipient_id}`, "notification.new", {
scope: "notifications",
});
await publishMessage(
`private-user.${data.recipient_id}`,
REALTIME_PUSHER_EVENTS.NOTIFICATION_NEW,
{ scope: "notifications" },
);

return { success: true };
},
Expand Down
39 changes: 17 additions & 22 deletions src/lib/pusher/pusher.client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
import Pusher from "pusher-js";
import { env } from "@/env";
import type {
RealtimeListenHandlers,
RealtimePusherEvent,
} from "./realtime-events";

export {
REALTIME_PUSHER_EVENTS,
type RealtimeListenHandlers,
type RealtimePusherEvent,
} from "./realtime-events";

let _pusherClient: Pusher | null = null;

Expand All @@ -23,40 +33,25 @@ function getPusherClient(): Pusher | null {
return _pusherClient;
}

type EventHandlers = Record<string, () => void>;

export function listenChannel(
channel: string,
handlers: EventHandlers,
): () => void;
export function listenChannel(
channel: string,
event: string,
handler: () => void,
): () => void;
export function listenChannel(
channel: string,
eventOrHandlers: string | EventHandlers,
handler?: () => void,
handlers: RealtimeListenHandlers,
): () => void {
const handlers: EventHandlers =
typeof eventOrHandlers === "string" && handler !== undefined
? { [eventOrHandlers]: handler }
: (eventOrHandlers as EventHandlers);

const pusher = getPusherClient();
if (!pusher) {
return () => {};
}

const ch = pusher.subscribe(channel);
for (const [event, fn] of Object.entries(handlers)) {
ch.bind(event, fn);
for (const event of Object.keys(handlers) as RealtimePusherEvent[]) {
const fn = handlers[event];
if (fn) ch.bind(event, fn);
}

return () => {
for (const [event, fn] of Object.entries(handlers)) {
ch.unbind(event, fn);
for (const event of Object.keys(handlers) as RealtimePusherEvent[]) {
const fn = handlers[event];
if (fn) ch.unbind(event, fn);
}
pusher.unsubscribe(channel);
};
Expand Down
8 changes: 7 additions & 1 deletion src/lib/pusher/pusher.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import Pusher from "pusher";
import { env } from "@/env";
import type { RealtimePusherEvent } from "./realtime-events";

export {
REALTIME_PUSHER_EVENTS,
type RealtimePusherEvent,
} from "./realtime-events";

/**
* Lazy singleton for the server-side Pusher/Soketi client.
Expand Down Expand Up @@ -28,7 +34,7 @@ export const pusherServer = createPusherServer();
*/
export async function publishMessage(
channel: string,
event: "comment.created" | "comment.updated" | "comment.deleted",
event: RealtimePusherEvent,
data: Record<string, unknown> = {},
): Promise<void> {
console.log(`
Expand Down
18 changes: 18 additions & 0 deletions src/lib/pusher/realtime-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/**
* Canonical Pusher event names for app realtime (invalidate / refetch signals).
* Extend here so client listeners and server triggers stay aligned.
*/
export const REALTIME_PUSHER_EVENTS = {
NOTIFICATION_NEW: "notification.new",
COMMENT_CREATED: "comment.created",
COMMENT_UPDATED: "comment.updated",
COMMENT_DELETED: "comment.deleted",
} as const;

export type RealtimePusherEvent =
(typeof REALTIME_PUSHER_EVENTS)[keyof typeof REALTIME_PUSHER_EVENTS];

/** Handlers object for `listenChannel`: only known event names, each optional. */
export type RealtimeListenHandlers = {
[K in RealtimePusherEvent]?: () => void;
};