Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
74ab680
feat: add TracingChannel support with argument sanitization
logaretm Mar 25, 2026
8cbedb4
test: add comprehensive unit tests for sanitizeArgs
logaretm Mar 25, 2026
26cf19a
refactor: use ? placeholder for redacted args
logaretm Mar 25, 2026
73aa8fb
refactor: move OTel command metrics to TracingChannel subscribers
logaretm Mar 25, 2026
65a0ba2
refactor: remove NoopCommandMetrics class
logaretm Mar 25, 2026
556dc38
refactor: move all inline OTel metrics to diagnostics_channel events
logaretm Mar 25, 2026
d3f7742
refactor: consolidate all OTel metric classes into channel subscribers
logaretm Mar 25, 2026
5f9cc04
test: remove no-op channel publish tests
logaretm Mar 25, 2026
b1248bd
refactor: use CHANNELS map consistently and tracingChannel API for su…
logaretm Mar 25, 2026
3644481
chore: remove dead types and interfaces from opentelemetry module
logaretm Mar 26, 2026
fea0d3e
refactor: deduplicate diagnostics_channel loading
logaretm Mar 26, 2026
0b58464
refactor: rename BatchTraceContext to BatchOperationContext
logaretm Mar 26, 2026
5b8c217
chore: remove unused CONNECTION_WAIT_START event
logaretm Mar 26, 2026
1b588a7
fix: restore wasReady guard on connection count decrement
logaretm Mar 26, 2026
f381a5e
feat: export all channel names and event types for APM consumers
logaretm Mar 26, 2026
e848850
refactor: convert pool wait time to TracingChannel
logaretm Mar 26, 2026
bc11aa5
fix: consistent serverPort typing across all event interfaces
logaretm Mar 26, 2026
f2f618b
refactor: add getTracingChannel helper with auto-resolved context types
logaretm Mar 26, 2026
80c70d7
refactor: stop exporting dc, use getTracingChannel and getChannel ins…
logaretm Mar 26, 2026
17c98f0
refactor: unify trace functions into single generic trace()
logaretm Mar 26, 2026
794e3ad
refactor: store unsubscriber closures instead of handler references
logaretm Mar 26, 2026
1364ebe
chore: delete noop-metrics.ts
logaretm Mar 26, 2026
94abf86
fix: deduplicate COMMAND_REPLY events and merge subscriber
logaretm Mar 26, 2026
1bdc4ee
fix: reject tracing promise on pool acquire timeout
logaretm Mar 26, 2026
3dfaa23
chore: remove noop-meter.ts
logaretm Mar 26, 2026
0d90da4
fix: sanitize COMMAND_REPLY args and catch pool trace promise
logaretm Mar 26, 2026
11d2b40
fix: reject pending wait traces on pool destroy
logaretm Mar 26, 2026
264286d
style: move const noop after imports in pool.ts
logaretm Mar 26, 2026
d9f870d
fix: E2E test failures from refactor
logaretm Mar 26, 2026
d4fa6a2
fix: record connection wait time when client is immediately available
logaretm Mar 26, 2026
625146f
style: clean up comments, remove em dashes, be concise
logaretm Mar 26, 2026
fad7566
fix(perf): optimize channel acquisition
logaretm Mar 26, 2026
f870361
refactor: deduplicate command error reporting, use TracingChannel err…
logaretm Mar 26, 2026
1ea1db2
fix: filter redirection errors at call site, not in #recordError
logaretm Mar 27, 2026
5e5d25f
fix: use ctx.origin instead of ctx.internal for redirection deduplica…
logaretm Mar 27, 2026
82a03ee
fix: guard against disabled metrics in OTelMetrics constructor
logaretm Mar 27, 2026
7927297
fix: treat MULTI as atomic operation, remove per-command tracing
logaretm Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,23 @@ export { SetOptions, CLIENT_KILL_FILTERS, FAILOVER_MODES, CLUSTER_SLOT_STATES, C

export { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';
export { OpenTelemetry } from './lib/opentelemetry';

export {
CHANNELS,
type ChannelEvents,
type CommandTraceContext,
type BatchCommandTraceContext,
type BatchOperationContext,
type ConnectTraceContext,
type ConnectionReadyEvent,
type ConnectionClosedEvent,
type ConnectionRelaxedTimeoutEvent,
type ConnectionHandoffEvent,
type ConnectionWaitContext,
type ClientErrorEvent,
type MaintenanceNotificationEvent,
type PubSubMessageEvent,
type CacheRequestEvent,
type CacheEvictionEvent,
type CommandReplyEvent,
} from './lib/client/tracing';
29 changes: 8 additions & 21 deletions packages/client/lib/client/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EventEmitter } from 'stream';
import RedisClient from '.';
import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types';
import { BasicCommandParser } from './parser';
import { OTelMetrics, CSC_RESULT, CSC_EVICTION_REASON } from '../opentelemetry';
import { publish, CHANNELS } from './tracing';

/**
* A snapshot of cache statistics.
Expand Down Expand Up @@ -558,33 +558,20 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
// If instanceof is "too slow", can add a "type" and then use an "as" cast to call proper getters.
if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1"
this.#statsCounter.recordHits(1);
OTelMetrics.instance.clientSideCacheMetrics.recordCacheRequest(
CSC_RESULT.HIT,
client._clientId,
);
OTelMetrics.instance.clientSideCacheMetrics.recordNetworkBytesSaved(
cacheEntry.value,
client._clientId,
);
publish(CHANNELS.CACHE_REQUEST, () => ({ result: 'hit', clientId: client._clientId }));

return structuredClone(cacheEntry.value);
} else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b2
// This counts as a miss since the value hasn't been fully loaded yet.
this.#statsCounter.recordMisses(1);
OTelMetrics.instance.clientSideCacheMetrics.recordCacheRequest(
CSC_RESULT.MISS,
client._clientId,
);
publish(CHANNELS.CACHE_REQUEST, () => ({ result: 'miss', clientId: client._clientId }));
reply = await cacheEntry.promise;
} else {
throw new Error("unknown cache entry type");
}
} else { // 3/3a
this.#statsCounter.recordMisses(1);
OTelMetrics.instance.clientSideCacheMetrics.recordCacheRequest(
CSC_RESULT.MISS,
client._clientId,
);
publish(CHANNELS.CACHE_REQUEST, () => ({ result: 'miss', clientId: client._clientId }));

const startTime = performance.now();
const promise = fn();
Expand Down Expand Up @@ -640,7 +627,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.clear(false);
// Record invalidations as server-initiated evictions
if (oldSize > 0) {
OTelMetrics.instance.clientSideCacheMetrics.recordCacheEviction(CSC_EVICTION_REASON.INVALIDATION, oldSize);
publish(CHANNELS.CACHE_EVICTION, () => ({ reason: 'invalidation', count: oldSize }));
}
this.emit("invalidate", key);

Expand All @@ -661,7 +648,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.#keyToCacheKeySetMap.delete(key.toString());
if (deletedCount > 0) {
// Record invalidations as server-initiated evictions
OTelMetrics.instance.clientSideCacheMetrics.recordCacheEviction(CSC_EVICTION_REASON.INVALIDATION, deletedCount);
publish(CHANNELS.CACHE_EVICTION, () => ({ reason: 'invalidation', count: deletedCount }));
}
}

Expand Down Expand Up @@ -692,7 +679,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.delete(cacheKey);
this.#statsCounter.recordEvictions(1);
// Entry failed validation - this is TTL expiry since invalidation marks are handled separately
OTelMetrics.instance.clientSideCacheMetrics.recordCacheEviction(CSC_EVICTION_REASON.TTL);
publish(CHANNELS.CACHE_EVICTION, () => ({ reason: 'ttl', count: 1 }));
this.emit("cache-evict", cacheKey);

return undefined;
Expand Down Expand Up @@ -731,7 +718,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.deleteOldest();
this.#statsCounter.recordEvictions(1);
// Eviction due to cache capacity limit
OTelMetrics.instance.clientSideCacheMetrics.recordCacheEviction(CSC_EVICTION_REASON.FULL);
publish(CHANNELS.CACHE_EVICTION, () => ({ reason: 'full', count: 1 }));
}

this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry);
Expand Down
21 changes: 9 additions & 12 deletions packages/client/lib/client/enterprise-maintenance-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import { setTimeout } from "node:timers/promises";
import { RedisTcpSocketOptions } from "./socket";
import diagnostics_channel from "node:diagnostics_channel";
import { RedisArgument } from "../RESP/types";
import { OTelMetrics } from "../opentelemetry";
import { METRIC_ERROR_ORIGIN } from "../opentelemetry/types";
import { publish, CHANNELS } from "./tracing";

type RedisType = RedisClient<any, any, any, any, any>;

Expand Down Expand Up @@ -124,12 +123,12 @@ export default class EnterpriseMaintenanceManager {
errorHandler: (error: Error) => {
dbgMaintenance("handshake failed:", error);

OTelMetrics.instance.resiliencyMetrics.recordClientErrors({
publish(CHANNELS.ERROR, () => ({
error,
origin: METRIC_ERROR_ORIGIN.CLIENT,
origin: 'client',
internal: true,
clientId,
});
}));

if (options.maintNotifications === "enabled") {
throw error;
Expand Down Expand Up @@ -160,10 +159,10 @@ export default class EnterpriseMaintenanceManager {

const type = String(push[0]);

OTelMetrics.instance.resiliencyMetrics.recordMaintenanceNotifications(
type,
this.#client._clientId,
);
publish(CHANNELS.MAINTENANCE, () => ({
notification: type,
clientId: this.#client._clientId,
}));

emitDiagnostics({
type,
Expand Down Expand Up @@ -306,9 +305,7 @@ export default class EnterpriseMaintenanceManager {
dbgMaintenance("Resume writing");
this.#client._unpause();
this.#onMigrated();
OTelMetrics.instance.connectionBasicMetrics.recordConnectionHandoff(
this.#client._clientId,
);
publish(CHANNELS.CONNECTION_HANDOFF, () => ({ clientId: this.#client._clientId }));
};

#onMigrating = () => {
Expand Down
Loading