Skip to content

Commit

Permalink
feat: add redis cluster and tls extra options support
Browse files Browse the repository at this point in the history
  • Loading branch information
alekitto committed Apr 13, 2024
1 parent 63639ad commit aa9b7ae
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 34 deletions.
5 changes: 5 additions & 0 deletions .changeset/dull-frogs-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@logto/core": minor
---

add support for Redis Cluster and extra TLS options for Redis connections
45 changes: 32 additions & 13 deletions packages/core/src/caches/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import { createMockUtils } from '@logto/shared/esm';
import Sinon from 'sinon';

import { EnvSet } from '#src/env-set/index.js';

const { jest } = import.meta;
const { mockEsm } = createMockUtils(jest);
Expand All @@ -18,19 +15,21 @@ mockEsm('redis', () => ({
disconnect: mockFunction,
on: mockFunction,
}),
createCluster: () => ({
set: mockFunction,
get: mockFunction,
del: mockFunction,
sendCommand: async () => 'PONG',
connect: mockFunction,
disconnect: mockFunction,
on: mockFunction,
}),
}));

const { RedisCache } = await import('./index.js');

const stubRedisUrl = (url?: string) =>
Sinon.stub(EnvSet, 'values').value({
...EnvSet.values,
redisUrl: url,
});
const { RedisCache, RedisClusterCache } = await import('./index.js');

describe('RedisCache', () => {
it('should successfully construct with no REDIS_URL', async () => {
stubRedisUrl();
const cache = new RedisCache();

expect(cache.client).toBeUndefined();
Expand All @@ -47,8 +46,28 @@ describe('RedisCache', () => {
it('should successfully construct with a Redis client', async () => {
for (const url of ['1', 'redis://url']) {
jest.clearAllMocks();
stubRedisUrl(url);
const cache = new RedisCache();
const cache = new RedisCache(url);

expect(cache.client).toBeTruthy();

// eslint-disable-next-line no-await-in-loop
await Promise.all([
cache.set('foo', 'bar'),
cache.get('foo'),
cache.delete('foo'),
cache.connect(),
cache.disconnect(),
]);

// Do sanity check only
expect(mockFunction).toBeCalledTimes(6);
}
});

it('should successfully construct with a Redis Cluster client', async () => {
for (const url of ['redis://url', 'redis:?host=h1&host=h2&host=h3']) {
jest.clearAllMocks();
const cache = new RedisClusterCache(new URL(url));

expect(cache.client).toBeTruthy();

Expand Down
128 changes: 109 additions & 19 deletions packages/core/src/caches/index.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
import fs from 'node:fs';

import { appInsights } from '@logto/app-insights/node';
import { type Optional, conditional, yes } from '@silverhand/essentials';
import { createClient, type RedisClientType } from 'redis';
import { type Optional, conditional, yes, trySafe } from '@silverhand/essentials';
import { createClient, createCluster, type RedisClientType, type RedisClusterType } from 'redis';

import { EnvSet } from '#src/env-set/index.js';
import { consoleLog } from '#src/utils/console.js';

import { type CacheStore } from './types.js';

export class RedisCache implements CacheStore {
readonly client?: RedisClientType;

constructor() {
const { redisUrl } = EnvSet.values;

if (redisUrl) {
this.client = createClient({
url: conditional(!yes(redisUrl) && redisUrl),
});
this.client.on('error', (error) => {
void appInsights.trackException(error);
});
}
}
abstract class RedisCacheBase implements CacheStore {
readonly client?: RedisClientType | RedisClusterType;

async set(key: string, value: string, expire: number = 30 * 60) {
await this.client?.set(key, value, {
Expand All @@ -40,7 +29,7 @@ export class RedisCache implements CacheStore {
async connect() {
if (this.client) {
await this.client.connect();
const pong = await this.client.ping();
const pong = await this.ping();

if (pong === 'PONG') {
consoleLog.info('[CACHE] Connected to Redis');
Expand All @@ -56,6 +45,107 @@ export class RedisCache implements CacheStore {
consoleLog.info('[CACHE] Disconnected from Redis');
}
}

protected getSocketOptions(url: URL) {
const certFile = url.searchParams.get('cert');
const keyFile = url.searchParams.get('key');
const caFile = url.searchParams.get('certroot');

return {
rejectUnauthorized: yes(url.searchParams.get('reject_unauthorized')),
tls: url.protocol === 'rediss',
cert: certFile ? fs.readFileSync(certFile).toString() : undefined,
key: keyFile ? fs.readFileSync(keyFile).toString() : undefined,
ca: caFile ? fs.readFileSync(caFile).toString() : undefined,
reconnectStrategy: (retries: number, cause: Error) => {
if ('code' in cause && cause.code === 'SELF_SIGNED_CERT_IN_CHAIN') {
// This will throw only if reject unauthorized is true (default).
// Doesn't make sense to retry.
return false;
}

return Math.min(retries * 50, 500);
},
};
}

protected abstract ping(): Promise<string | undefined>;
}

export class RedisCache extends RedisCacheBase {
readonly client?: RedisClientType;

constructor(redisUrl?: string | undefined) {
super();

if (redisUrl) {
this.client = createClient({
url: conditional(!yes(redisUrl) && redisUrl),
socket: trySafe(() => this.getSocketOptions(new URL(redisUrl))),
});

this.client.on('error', (error) => {
void appInsights.trackException(error);
});
}
}

protected async ping(): Promise<string | undefined> {
return this.client?.ping();
}
}

export const redisCache = new RedisCache();
export class RedisClusterCache extends RedisCacheBase {
readonly client?: RedisClusterType;

constructor(connectionUrl: URL) {
super();

/* eslint-disable @silverhand/fp/no-mutating-methods */
const hosts = [];
if (connectionUrl.host) {
hosts.push(connectionUrl.host);
}
hosts.push(...connectionUrl.searchParams.getAll('host'));
/* eslint-enable @silverhand/fp/no-mutating-methods */

const rootNodes = hosts.map((host) => {
return {
url: 'redis://' + host,
};
});

this.client = createCluster({
rootNodes,
useReplicas: true,
defaults: {
socket: this.getSocketOptions(connectionUrl),
username: connectionUrl.username,
password: connectionUrl.password,
},
});

this.client.on('error', (error) => {
void appInsights.trackException(error);
});
}

protected async ping(): Promise<string | undefined> {
return this.client?.sendCommand(undefined, true, ['PING']);
}
}

const redisCacheFactory = (): RedisCacheBase => {
const { redisUrl } = EnvSet.values;

if (redisUrl) {
const url = new URL(redisUrl);
if (yes(url.searchParams.get('redis_cluster'))) {
return new RedisClusterCache(url);
}
}

return new RedisCache(redisUrl);
};

export const redisCache = redisCacheFactory();
4 changes: 2 additions & 2 deletions packages/core/src/tenants/Tenant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import koaLogger from 'koa-logger';
import mount from 'koa-mount';
import type Provider from 'oidc-provider';

import { type RedisCache } from '#src/caches/index.js';
import { type CacheStore } from '#src/caches/types.js';
import { WellKnownCache } from '#src/caches/well-known.js';
import { AdminApps, EnvSet, UserApps } from '#src/env-set/index.js';
import { createCloudConnectionLibrary } from '#src/libraries/cloud-connection.js';
Expand Down Expand Up @@ -35,7 +35,7 @@ import type TenantContext from './TenantContext.js';
import { getTenantDatabaseDsn } from './utils.js';

export default class Tenant implements TenantContext {
static async create(id: string, redisCache: RedisCache, customDomain?: string): Promise<Tenant> {
static async create(id: string, redisCache: CacheStore, customDomain?: string): Promise<Tenant> {
// Treat the default database URL as the management URL
const envSet = new EnvSet(id, await getTenantDatabaseDsn(id));
// Custom endpoint is used for building OIDC issuer URL when the request is a custom domain
Expand Down

0 comments on commit aa9b7ae

Please sign in to comment.