Skip to content

Commit

Permalink
Rework observability into logEvents.events + logEvents.log for full c…
Browse files Browse the repository at this point in the history
…ustomizability
  • Loading branch information
PabloSzx committed Aug 15, 2022
1 parent 9d1bdf2 commit 7cecccb
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 77 deletions.
5 changes: 0 additions & 5 deletions .changeset/ten-games-nail.md

This file was deleted.

5 changes: 5 additions & 0 deletions .changeset/unlucky-seas-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@soundxyz/redis-pubsub": major
---

Rework observability into logEvents.events + logEvents.log for full customizability
8 changes: 2 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"scripts": {
"prepublishOnly": "concurrently -r \"bob-ts -f interop -i src -d dist --no-sourcemap\" \"tsc -p tsconfig.build.json\"",
"release": "changeset publish",
"test": "c8 --100 --include=src --exclude=src/promise.ts ava",
"test": "c8 --check-coverage --lines=100 --statements=100 --branches=97 --functions=95 --include=src --exclude=src/promise.ts ava",
"test:watch": "ava --watch",
"test:watch:coverage": "bob-watch -w src test package.json -c \"pnpm test\""
},
Expand Down Expand Up @@ -63,24 +63,20 @@
"graphql-ez": "^0.15.1",
"graphql-ws": "^5.9.1",
"ioredis": "^5.0.5",
"pino": "^8.3.1",
"pino": "^8.4.1",
"prettier": "^2.6.2",
"typescript": "^4.7.2",
"wait-for-expect": "^3.0.2",
"zod": "^3.17.3"
},
"peerDependencies": {
"ioredis": "^5.0.5",
"pino": "*",
"zod": "^3.17.3"
},
"peerDependenciesMeta": {
"ioredis": {
"optional": true
},
"pino": {
"optional": true
},
"zod": {
"optional": true
}
Expand Down
10 changes: 5 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

120 changes: 71 additions & 49 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Redis } from "ioredis";
import type { Logger } from "pino";
import { parse, stringify } from "superjson";
import type { ZodSchema, ZodTypeDef } from "zod";
import {
Expand All @@ -9,21 +8,36 @@ import {
pubsubDeferredPromise,
} from "./promise";

export type LogLevel = "silent" | "info" | "tracing";
export type EventParamsObject = Record<string, string | number | boolean | null | undefined>;

export type LogEventArgs = { message: string; code: EventCodes; params: EventParamsObject };

export type LoggedEvents = Partial<
Record<EventCodes, string | boolean | null | ((args: LogEventArgs) => void)>
>;

function defaultLog({ message }: LogEventArgs) {
console.log(message);
}

export interface RedisPubSubOptions {
publisher: Redis;
subscriber: Redis;
logger: Logger;
/**
* @default "silent"
*/
logLevel?: LogLevel;
onParseError?: (err: unknown) => void;
/**
* Customize or disable the specified event codes messages
* Enable and customize observability
*/
customizeEventCodes?: Partial<Record<EventCodes, string | boolean | null>>;
logEvents?: {
/**
* Set specific events to enable logging
*/
events: LoggedEvents;

/**
* @default console.log
*/
log?: (args: LogEventArgs) => void;
};
}

export const EventCodes = {
Expand All @@ -48,15 +62,9 @@ export type EventCodes = typeof EventCodes[keyof typeof EventCodes];
export function RedisPubSub({
publisher,
subscriber,
logger,
logLevel = "silent",
onParseError = logger.error,
customizeEventCodes,
logEvents,
onParseError = console.error,
}: RedisPubSubOptions) {
const intLogLevel = logLevel === "silent" ? 0 : logLevel === "info" ? 1 : 2;

const customizedEventCodes = { ...EventCodes, ...customizeEventCodes };

interface DataPromise {
current: PubSubDeferredPromise<unknown>;
unsubscribe: () => Promise<void>;
Expand All @@ -77,43 +85,57 @@ export function RedisPubSub({
const subscribedChannels: Record<string, boolean | Promise<void>> = {};
const unsubscribingChannels: Record<string, Promise<void> | false> = {};

const enabledLogEvents = logEvents?.events;

const logMessage = logEvents
? function logMessage(code: EventCodes, params: EventParamsObject) {
const eventValue = logEvents.events[code];

if (!eventValue) return;

const log = typeof eventValue === "function" ? eventValue : logEvents.log || defaultLog;

const codeMessageValue = typeof eventValue === "string" ? eventValue : code;

let paramsString = "";

for (const key in params) {
let value = params[key];

if (value === undefined) continue;

if (value === "") value = "null";

paramsString += " " + key + "=" + value;
}

log({
code,
message: `[${codeMessageValue}]${paramsString}`,
params,
});
}
: () => void 0;

return {
createChannel,
unsubscribeAll,
close,
};

function getTracing() {
if (intLogLevel < 2) return null;

const start = performance.now();

return () => `${(performance.now() - start).toFixed()}ms`;
}

function logMessage(code: EventCodes, paramsObject: Record<string, string | number>) {
let codeValue = customizedEventCodes[code];

if (!codeValue) return;

if (typeof codeValue !== "string") codeValue = EventCodes[code];

let params = "";

for (const key in paramsObject) {
params += " " + key + "=" + paramsObject[key];
}

logger.info(`[${codeValue}]${params}`);
}

async function onMessage(channel: string, message: string) {
const tracing = getTracing();
const tracing = enabledLogEvents?.SUBSCRIPTION_MESSAGE_EXECUTION_TIME ? getTracing() : null;

const subscription = subscriptionsMap[channel];

if (!subscription?.dataPromises.size) {
if (intLogLevel) {
if (enabledLogEvents?.SUBSCRIPTION_MESSAGE_WITHOUT_SUBSCRIBERS) {
logMessage("SUBSCRIPTION_MESSAGE_WITHOUT_SUBSCRIBERS", { channel });
}
return;
Expand All @@ -128,7 +150,7 @@ export function RedisPubSub({
return;
}

if (intLogLevel) {
if (enabledLogEvents?.SUBSCRIPTION_MESSAGE_WITHOUT_SUBSCRIBERS) {
logMessage("SUBSCRIPTION_MESSAGE_WITH_SUBSCRIBERS", {
channel,
subscribers: subscription.dataPromises.size,
Expand All @@ -154,7 +176,7 @@ export function RedisPubSub({

if (subscribed) {
if (typeof subscribed === "boolean") {
if (intLogLevel) {
if (enabledLogEvents?.SUBSCRIBE_REUSE_REDIS_SUBSCRIPTION) {
logMessage("SUBSCRIBE_REUSE_REDIS_SUBSCRIPTION", {
channel,
});
Expand All @@ -165,13 +187,13 @@ export function RedisPubSub({
return subscribed;
}

const tracing = getTracing();
const tracing = enabledLogEvents?.SUBSCRIBE_EXECUTION_TIME ? getTracing() : null;

return (subscribedChannels[channel] = subscriber.subscribe(channel).then(
() => {
subscribedChannels[channel] = true;

if (intLogLevel) {
if (enabledLogEvents?.SUBSCRIBE_REDIS) {
logMessage("SUBSCRIBE_REDIS", {
channel,
});
Expand Down Expand Up @@ -208,13 +230,13 @@ export function RedisPubSub({
return subcribed.then(unsubscribe);

function unsubscribe() {
const tracing = getTracing();
const tracing = enabledLogEvents?.UNSUBSCRIBE_EXECUTION_TIME ? getTracing() : null;

return (unsubscribingChannels[channel] = subscriber.unsubscribe(channel).then(
() => {
unsubscribingChannels[channel] = subscribedChannels[channel] = false;

if (intLogLevel) {
if (enabledLogEvents?.UNSUBSCRIBE_REDIS) {
logMessage("UNSUBSCRIBE_REDIS", {
channel,
});
Expand Down Expand Up @@ -352,7 +374,7 @@ export function RedisPubSub({
abortSignal.addEventListener(
"abort",
(abortListener = () => {
if (intLogLevel) {
if (enabledLogEvents?.SUBSCRIPTION_ABORTED) {
logMessage("SUBSCRIPTION_ABORTED", {
channel,
subscribers: dataPromises.size,
Expand All @@ -377,7 +399,7 @@ export function RedisPubSub({
abortSignal.removeEventListener("abort", abortListener);
}

if (intLogLevel) {
if (enabledLogEvents?.UNSUBSCRIBE_CHANNEL) {
logMessage("UNSUBSCRIBE_CHANNEL", {
channel,
subscribers: dataPromises.size,
Expand Down Expand Up @@ -408,7 +430,7 @@ export function RedisPubSub({

for (const value of dataPromise.current.values as Output[]) {
if (filter && !filter(value)) {
if (intLogLevel) {
if (enabledLogEvents?.SUBSCRIPTION_MESSAGE_FILTERED_OUT) {
logMessage("SUBSCRIPTION_MESSAGE_FILTERED_OUT", {
channel,
});
Expand All @@ -420,7 +442,7 @@ export function RedisPubSub({
}

if (dataPromise.current.isDone) {
if (intLogLevel) {
if (enabledLogEvents?.SUBSCRIPTION_FINISHED) {
logMessage("SUBSCRIPTION_FINISHED", {
channel,
});
Expand Down Expand Up @@ -482,7 +504,7 @@ export function RedisPubSub({
) {
await Promise.all(
values.map(async ({ value, identifier }) => {
const tracing = getTracing();
const tracing = enabledLogEvents?.PUBLISH_MESSAGE_EXECUTION_TIME ? getTracing() : null;

let parsedValue: Input | Output;

Expand All @@ -497,7 +519,7 @@ export function RedisPubSub({

await publisher.publish(channel, stringify(parsedValue));

if (intLogLevel) {
if (enabledLogEvents?.PUBLISH_MESSAGE) {
logMessage("PUBLISH_MESSAGE", {
channel,
});
Expand Down
35 changes: 28 additions & 7 deletions test/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
import { RedisPubSub, RedisPubSubOptions } from "../src";
import { LoggedEvents, RedisPubSub, RedisPubSubOptions } from "../src";
import Pino from "pino";
import Redis from "ioredis";

export const getPubsub = (options?: Partial<RedisPubSubOptions>) => {
const logger = Pino({
level: process.env.CI ? "warn" : "info",
});
const logEverything: Required<LoggedEvents> = {
PUBLISH_MESSAGE: true,
PUBLISH_MESSAGE_EXECUTION_TIME: true,
SUBSCRIBE_EXECUTION_TIME: true,
SUBSCRIBE_REDIS: true,
SUBSCRIBE_REUSE_REDIS_SUBSCRIPTION: true,
SUBSCRIPTION_ABORTED: true,
SUBSCRIPTION_FINISHED: true,
SUBSCRIPTION_MESSAGE_EXECUTION_TIME: true,
SUBSCRIPTION_MESSAGE_FILTERED_OUT: true,
SUBSCRIPTION_MESSAGE_WITH_SUBSCRIBERS: true,
SUBSCRIPTION_MESSAGE_WITHOUT_SUBSCRIBERS: true,
UNSUBSCRIBE_CHANNEL: true,
UNSUBSCRIBE_EXECUTION_TIME: true,
UNSUBSCRIBE_REDIS: true,
};

export const logger = Pino({
level: process.env.CI ? "warn" : "info",
});

export const getPubsub = (options?: Partial<RedisPubSubOptions>) => {
const publisher = new Redis({
port: 6389,
});
Expand All @@ -17,8 +34,12 @@ export const getPubsub = (options?: Partial<RedisPubSubOptions>) => {
const pubSub = RedisPubSub({
publisher,
subscriber,
logger,
logLevel: "tracing",
logEvents: {
events: logEverything,
log({ message }) {
logger.info(message);
},
},
...options,
});

Expand Down

0 comments on commit 7cecccb

Please sign in to comment.